Bläddra i källkod

1.websocket 订阅 —— qlm

qlm 4 år sedan
förälder
incheckning
edc66c4519

+ 29 - 0
pom.xml

@@ -268,6 +268,27 @@
             <artifactId>kaptcha</artifactId>
             <version>2.3.2</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.jfinal</groupId>
+            <artifactId>activerecord</artifactId>
+            <version>4.8</version>
+        </dependency>
+
+        <!--websocket-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+            <version>1.3.5.RELEASE</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.java-websocket</groupId>
+            <artifactId>Java-WebSocket</artifactId>
+            <version>1.3.8</version>
+        </dependency>
+
+
     </dependencies>
 
     <build>
@@ -276,6 +297,14 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.22.2</version>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
         </plugins>
         <resources>
             <resource>

+ 167 - 0
src/main/java/com/zx/dataservice/config/JfinalActiveRecordConfig.java

@@ -0,0 +1,167 @@
+package com.zx.dataservice.config;
+
+import com.alibaba.druid.pool.DruidDataSource;
+import com.jfinal.plugin.activerecord.ActiveRecordPlugin;
+import com.jfinal.plugin.activerecord.Db;
+import com.jfinal.plugin.activerecord.Record;
+import com.zx.dataservice.utils.PropertiesUtil;
+import com.zx.dataservice.utils.ZXOptions;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
+
+import javax.annotation.PreDestroy;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * Created on 2020/12/04.
+ *
+ * @author qlm
+ */
+@Configuration
+public class JfinalActiveRecordConfig implements ApplicationRunner {
+    /**
+     * 1数据源名称
+     */
+    private static final String GP_DATA_SOURCE_CONFIG = "gp";
+
+    /**
+     * 2数据源名称
+     */
+    private static final String ZQ_DATA_SOURCE_CONFIG = "zq";
+
+
+
+
+    @Bean(name = "gpDataSource")
+//    @ConfigurationProperties(prefix = "spring.datasource.gp")
+    public DruidDataSource gpDataSource()throws SQLException {
+        DruidDataSource dataSource = new DruidDataSource();
+        dataSource.setUrl(PropertiesUtil.getProperty("spring.datasource.test1.jdbc-Url"));
+        dataSource.setUsername(PropertiesUtil.getProperty("spring.datasource.test1.username"));
+        dataSource.setPassword(PropertiesUtil.getProperty("spring.datasource.test1.password"));
+        dataSource.setDriverClassName(PropertiesUtil.getProperty("spring.datasource.test1.driver-class-name"));
+        dataSource.setInitialSize(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.initialSize")));
+        dataSource.setMinIdle(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.minIdle")));
+        dataSource.setMaxActive(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.maxActive")));
+        dataSource.setMaxWait(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.maxWait")));
+        dataSource.setTimeBetweenEvictionRunsMillis(Long.parseLong(PropertiesUtil.getProperty("spring.druid.timeBetweenEvictionRunsMillis")));
+        dataSource.setMinEvictableIdleTimeMillis(Long.parseLong(PropertiesUtil.getProperty("spring.druid.minEvictableIdleTimeMillis")));
+        dataSource.setValidationQuery(PropertiesUtil.getProperty("spring.druid.validationQuery"));
+        dataSource.setTestWhileIdle(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.testWhileIdle")));
+        dataSource.setTestOnBorrow(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.testOnBorrow")));
+        dataSource.setTestOnReturn(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.testOnReturn")));
+        dataSource.setPoolPreparedStatements(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.poolPreparedStatements")));
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.maxPoolPreparedStatementPerConnectionSize")));
+        dataSource.setRemoveAbandoned(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.removeAbandoned")));
+        dataSource.setRemoveAbandonedTimeout(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.removeAbandonedTimeout")));
+        dataSource.setFilters(PropertiesUtil.getProperty("spring.druid.filters"));
+        return dataSource;
+    }
+
+    @Bean(name = "zqDataSource")
+//    @ConfigurationProperties(prefix = "spring.datasource.zq")
+    public DruidDataSource zqDataSource()throws SQLException  {
+        DruidDataSource dataSource = new DruidDataSource();
+        dataSource.setUrl(PropertiesUtil.getProperty("spring.datasource.test2.jdbc-Url"));
+        dataSource.setUsername(PropertiesUtil.getProperty("spring.datasource.test2.username"));
+        dataSource.setPassword(PropertiesUtil.getProperty("spring.datasource.test2.password"));
+        dataSource.setDriverClassName(PropertiesUtil.getProperty("spring.datasource.test2.driver-class-name"));
+        dataSource.setInitialSize(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.initialSize")));
+        dataSource.setMinIdle(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.minIdle")));
+        dataSource.setMaxActive(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.maxActive")));
+        dataSource.setMaxWait(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.maxWait")));
+        dataSource.setTimeBetweenEvictionRunsMillis(Long.parseLong(PropertiesUtil.getProperty("spring.druid.timeBetweenEvictionRunsMillis")));
+        dataSource.setMinEvictableIdleTimeMillis(Long.parseLong(PropertiesUtil.getProperty("spring.druid.minEvictableIdleTimeMillis")));
+        dataSource.setValidationQuery(PropertiesUtil.getProperty("spring.druid.validationQuery"));
+        dataSource.setTestWhileIdle(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.testWhileIdle")));
+        dataSource.setTestOnBorrow(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.testOnBorrow")));
+        dataSource.setTestOnReturn(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.testOnReturn")));
+        dataSource.setPoolPreparedStatements(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.poolPreparedStatements")));
+        dataSource.setMaxPoolPreparedStatementPerConnectionSize(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.maxPoolPreparedStatementPerConnectionSize")));
+        dataSource.setRemoveAbandoned(Boolean.parseBoolean(PropertiesUtil.getProperty("spring.druid.removeAbandoned")));
+        dataSource.setRemoveAbandonedTimeout(Integer.parseInt(PropertiesUtil.getProperty("spring.druid.removeAbandonedTimeout")));
+        return dataSource;
+    }
+
+
+
+    /**
+     * 主数据源  gp
+     * @return
+     */
+    @Bean
+    public ActiveRecordPlugin initgpActiveRecord()  throws SQLException{
+        ActiveRecordPlugin arp = new ActiveRecordPlugin(GP_DATA_SOURCE_CONFIG, gpTransactionAwareDataSourceProxy());
+        arp.start();
+        return arp;
+    }
+
+    /**
+     * 业务数据源  zq
+     * @return
+     */
+    @Bean
+    public ActiveRecordPlugin initzqActiveRecord()throws SQLException  {
+        ActiveRecordPlugin arp = new ActiveRecordPlugin(ZQ_DATA_SOURCE_CONFIG, zqTransactionAwareDataSourceProxy());
+        arp.start();
+        return arp;
+    }
+
+
+    /**
+     * 设置数据源代理
+     */
+    @Bean
+    public TransactionAwareDataSourceProxy gpTransactionAwareDataSourceProxy() throws SQLException{
+        TransactionAwareDataSourceProxy transactionAwareDataSourceProxy = new TransactionAwareDataSourceProxy();
+        transactionAwareDataSourceProxy.setTargetDataSource(gpDataSource());
+        return transactionAwareDataSourceProxy;
+    }
+
+    @Bean
+    public TransactionAwareDataSourceProxy zqTransactionAwareDataSourceProxy() throws SQLException {
+        TransactionAwareDataSourceProxy transactionAwareDataSourceProxy = new TransactionAwareDataSourceProxy();
+        transactionAwareDataSourceProxy.setTargetDataSource(zqDataSource());
+        return transactionAwareDataSourceProxy;
+    }
+
+    /**
+     * 设置事务管理
+     */
+    @Bean(name="mainDataSourceTransactionManager")
+    public DataSourceTransactionManager gpDataSourceTransactionManager()  throws SQLException{
+        DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
+        dataSourceTransactionManager.setDataSource(gpTransactionAwareDataSourceProxy());
+        return dataSourceTransactionManager;
+    }
+
+    @Bean(name="zqDataSourceTransactionManager")
+    public DataSourceTransactionManager zqDataSourceTransactionManager()  throws SQLException{
+        DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager();
+        dataSourceTransactionManager.setDataSource(zqTransactionAwareDataSourceProxy());
+        return dataSourceTransactionManager;
+    }
+
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+
+        List<Record> records = Db.use("gp").find("select * from sys_config ");
+        for (Record record:records) {
+            ZXOptions.set(record.getStr("config_key"),record.getStr("config_value"));
+        }
+
+    }
+
+    @PreDestroy
+    public void destory() throws Exception {
+//        QuantUtil quantUtil = new QuantUtil();
+//        quantUtil.toGetCsqCancel(0);
+//        System.out.println("取消所有订阅~");
+    }
+}

+ 119 - 0
src/main/java/com/zx/dataservice/controller/WebSocketController.java

@@ -0,0 +1,119 @@
+package com.zx.dataservice.controller;
+
+/**
+ * Created by qlm on 2020-12-14.
+ */
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+@ServerEndpoint("/websocket/{userId}")
+//@Component
+public class WebSocketController {
+
+    public static Map<String,Session> map = new HashMap<String,Session>();//根据用户找session
+
+    /**
+     * 连接建立成功时调用
+     * @param userId 登录用户id
+     * @param session  可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
+     * @throws IOException
+     */
+    @OnOpen
+    public void onOpen(Session session, @PathParam("userId") String userId) throws IOException{
+        String[] a = userId.split("&");
+
+        String mUserId = a[0];
+        map.put(mUserId + "",session);       //添加到链接map
+        try{
+            System.out.println(mUserId+"连接成功");
+            sendUnreadMsg(session);
+        }
+        catch (Exception e){
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 连接成功时发送的信息
+     */
+    public static void sendUnreadMsg(Session session){
+            synchronized(session){
+                if(session.isOpen()){
+                    try {
+                        session.getBasicRemote().sendText("连接成功");
+                    }catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+    }
+
+
+    /**
+     * 连接关闭调用的方法
+     */
+    @OnClose
+    public void onClose(Session session){
+        if(map.containsValue(session)){
+            for (Entry<String, Session> entry : map.entrySet()){
+                if(entry.getValue().equals(session)){
+                    map.remove(entry.getKey());
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * 发生错误时调用
+     * @param session
+     * @param error
+     */
+    @OnError
+    public void onError(Session session, Throwable error){
+        if(map.containsValue(session)){
+            for (Entry<String, Session> entry : map.entrySet()) {
+                if(entry.getValue().equals(session)){
+                    map.remove(entry.getKey());
+                    break;
+                }
+            }
+        }
+        error.printStackTrace();
+    }
+
+    /**
+     * 消息发送时调用
+     * @param requestJson
+     * @param session
+     */
+    @OnMessage
+    public void onMessage(String requestJson, Session session) {
+        System.out.println("啦啦啦");
+
+    }
+
+
+    /**
+     */
+    public static void sendSysMsg(String id ,String  xMessage){
+
+        try{
+            // 数据服务器 id   :
+            Session session = map.get( id);               //发送消息给对方
+            if(session!=null){
+                session.getBasicRemote().sendText(xMessage);
+            }
+        }catch(IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+}

+ 10 - 4
src/main/java/com/zx/dataservice/service/impl/ChoiceBondServiceImpl.java

@@ -1,13 +1,11 @@
 package com.zx.dataservice.service.impl;
 
 import com.alibaba.fastjson.JSON;
+import com.zx.dataservice.controller.WebSocketController;
 import com.zx.dataservice.mapper2.ChoiceBondMapper;
 import com.zx.dataservice.pojo.StockRestPojo;
 import com.zx.dataservice.service.ChoiceBondService;
-import com.zx.dataservice.utils.BuyAndSellUtils;
-import com.zx.dataservice.utils.FileBondUtil;
-import com.zx.dataservice.utils.HttpRequest;
-import com.zx.dataservice.utils.TimeUtil;
+import com.zx.dataservice.utils.*;
 import com.zx.dataservice.vo.StockRestRedisVO;
 import com.zx.dataservice.vo.StockRestVO;
 import org.apache.commons.lang3.StringUtils;
@@ -223,6 +221,10 @@ public class ChoiceBondServiceImpl implements ChoiceBondService {
 //                    HttpRequest.httpPostWithjson(url, JSON.toJSONString(subList));
                     if(BuyAndSellUtils.isTransState()){
                         try{
+                            final String[] webArr = ZXOptions.get("ws.key").split(";");//websocket key
+                            for (int j = 0; j < webArr.length; j++) {
+                                new WebSocketController().sendSysMsg(webArr[j],JSON.toJSONString(subRedisList));
+                            }
                             HttpRequest.httpPostWithjson(urlRedis, JSON.toJSONString(subRedisList));
                         }catch (Exception e){
                             log.error("插入redis错误" + e);
@@ -246,6 +248,10 @@ public class ChoiceBondServiceImpl implements ChoiceBondService {
 //                    HttpRequest.httpPostWithjson(url, JSON.toJSONString(subList));
                     if(BuyAndSellUtils.isTransState()) {
                         try{
+                            final String[] webArr = ZXOptions.get("ws.key").split(";");//websocket key
+                            for (int j = 0; j < webArr.length; j++) {
+                                new WebSocketController().sendSysMsg(webArr[j],JSON.toJSONString(subRedisList));
+                            }
                             HttpRequest.httpPostWithjson(urlRedis, JSON.toJSONString(subRedisList));
                         }catch (Exception e){
                             log.error("插入redis错误" + e);

+ 1 - 1
src/main/java/com/zx/dataservice/task/AnalysisBondRestDataTask.java

@@ -30,7 +30,7 @@ public class AnalysisBondRestDataTask {
     }
 
     @Async("executorAfterBond")
-    @Scheduled(cron = "0/10 * 15,16,17,18 * * ? *")
+    @Scheduled(cron = "0/10 * 15,16,17,18 * * ? ")
     public void toAnalysisRestAfterData() {
         doTask("after");
     }

+ 1 - 1
src/main/java/com/zx/dataservice/task/AnalysisStockRestDataTask.java

@@ -30,7 +30,7 @@ public class AnalysisStockRestDataTask {
     }
 
     @Async("executorAfterStock")
-    @Scheduled(cron = "0/10 * 15,16,17,18 * * ? *")
+    @Scheduled(cron = "0/10 * 15,16,17,18 * * ? ")
     public void toAnalysisRestAfterData() {
         doTask("after");
     }

+ 1 - 1
src/main/java/com/zx/dataservice/task/AnalysisTempIndexRestDataTask.java

@@ -30,7 +30,7 @@ public class AnalysisTempIndexRestDataTask {
     }
 
     @Async("executorTempAfterIndex")
-    @Scheduled(cron = "0/10 * 15,16,17,18 * * ? *")
+    @Scheduled(cron = "0/10 * 15,16,17,18 * * ? ")
     public void toAnalysisRestAfterData() {
         doTask("after");
     }

+ 171 - 0
src/main/java/com/zx/dataservice/utils/ZXOptions.java

@@ -0,0 +1,171 @@
+/**
+ * Copyright (c) 2016-2019, Michael Yang 杨福海 (fuhai999@gmail.com).
+ * <p>
+ * Licensed under the GNU Lesser General Public License (LGPL) ,Version 3.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.gnu.org/licenses/lgpl-3.0.txt
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.zx.dataservice.utils;
+
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 一些常量 配置
+ * sys_config 表
+ * 2020-12-17
+ *  by qlm
+ */
+public class ZXOptions {
+
+    private static Logger LOG = LoggerFactory.getLogger(ZXOptions.class);
+
+    private static OptionStore store = new OptionStore() {
+        private final Map<String, String> cache = new ConcurrentHashMap<>();
+
+        @Override
+        public String get(String key) {
+            return cache.get(key);
+        }
+
+        @Override
+        public void put(String key, String value) {
+            if (StringUtils.isBlank(value)) {
+                remove(key);
+            } else {
+                cache.put(key, value);
+            }
+        }
+
+        @Override
+        public void remove(String key) {
+            cache.remove(key);
+        }
+    };
+
+    private static List<OptionChangeListener> LISTENERS = new ArrayList<>();
+
+    public static void set(String key, String value) {
+        if (StringUtils.isBlank(key)) {
+            return;
+        }
+        String oldValue = store.get(key);
+        if (Objects.equals(value, oldValue)) {
+            return;
+        }
+        store.put(key, value);
+        for (OptionChangeListener listener : LISTENERS) {
+            try {
+                listener.onChanged(key, value, oldValue);
+            } catch (Throwable ex) {
+                LOG.error(ex.toString(), ex);
+            }
+        }
+        doFinishedChanged(key, value, oldValue);
+    }
+
+    public static String get(String key) {
+        return store.get(key);
+    }
+
+    public static String get(String key, String defaultvalue) {
+        String v = get(key);
+        return StringUtils.isBlank(v) ? defaultvalue : v;
+    }
+
+    public static boolean getAsBool(String key) {
+        return Boolean.parseBoolean(store.get(key));
+    }
+
+
+
+    public static boolean isTrueOrNull(String key) {
+        String data = get(key);
+        return data == null || "true".equals(data);
+    }
+
+    public static int getAsInt(String key, int defaultValue) {
+        String value = get(key);
+        if (StringUtils.isBlank(value)) {
+            return defaultValue;
+        }
+        try {
+            return Integer.parseInt(value);
+        } catch (Exception ex) {
+            LOG.warn(ex.toString(), ex);
+            return defaultValue;
+        }
+    }
+
+    public static float getAsFloat(String key, float defaultValue) {
+        String value = get(key);
+        if (StringUtils.isBlank(value)) {
+            return defaultValue;
+        }
+        try {
+            return Float.parseFloat(value);
+        } catch (Exception ex) {
+            LOG.warn(ex.toString(), ex);
+            return defaultValue;
+        }
+    }
+
+    public static void addListener(OptionChangeListener listener) {
+        LISTENERS.add(listener);
+    }
+
+    public static void removeListener(OptionChangeListener listener) {
+        LISTENERS.remove(listener);
+    }
+
+
+    public static interface OptionChangeListener {
+        public void onChanged(String key, String newValue, String oldValue);
+    }
+
+
+    private static void doFinishedChanged(String key, String value, String oldValue) {
+
+
+    }
+
+    private static boolean fakeStaticEnable = false;
+    private static String fakeStaticSuffix = "";
+
+    public static String getAppUrlSuffix() {
+        return fakeStaticEnable
+                ? (StringUtils.isBlank(fakeStaticSuffix) ? "" : fakeStaticSuffix)
+                : "";
+    }
+
+    public static OptionStore getStore() {
+        return store;
+    }
+
+
+    public static interface OptionStore {
+
+        public String get(String key);
+
+        public void put(String key, String value);
+
+        public void remove(String key);
+
+    }
+}