liuf преди 5 месеца
родител
ревизия
84286491f7

+ 1 - 0
readme.txt

@@ -17,3 +17,4 @@ docker rm device-link-ykc
 docker rmi device-link-ykc
 
 
+docker-compose up -d --build device-link-ykc

+ 1 - 1
src/main/java/com/tmzn/devicelinkykc/controller/DeviceController.java

@@ -241,7 +241,7 @@ public class DeviceController {
                 deviceStatusQueryWrapper.eq("gun_status",StatusConstant.CHARGING).eq("device_imei",device.getDeviceImei());
                 List<DeviceStatus> list = deviceStatusService.list(deviceStatusQueryWrapper);
                 if (list.size()>0){
-                    return AjaxResult.error("有充电中设备IP和端口无法修改!");
+                 //   return AjaxResult.error("有充电中设备IP和端口无法修改!");
                 }
             }
             deviceList.stream().forEach(device -> {

+ 29 - 1
src/main/java/com/tmzn/devicelinkykc/controller/TestController.java

@@ -27,6 +27,7 @@ import com.tmzn.devicelinkykc.service.DeviceControlerService;
 import com.tmzn.devicelinkykc.service.DeviceStatusService;
 import com.tmzn.devicelinkykc.service.OrderStatusService;
 import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
+import com.tmzn.devicelinkykc.socket.SocketHandle;
 import com.tmzn.devicelinkykc.transdata.entity.DeviceParam;
 import com.tmzn.devicelinkykc.msgparser.MessageParseMgr;
 import com.tmzn.devicelinkykc.util.Encrytion;
@@ -43,8 +44,11 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.RoundingMode;
+import java.net.InetSocketAddress;
+import java.net.Socket;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -85,7 +89,8 @@ public class TestController {
     @Autowired
     private OrderStatusService orderStatusService;
 
-
+    @Autowired
+    private SocketHandle socketHandle;
 
     @Autowired
     private TransMoney transMoney;
@@ -93,6 +98,29 @@ public class TestController {
     public String testMsg() {
         try {
 
+            //集测
+            try {
+
+                socketHandle.addDeviceConnection("119.36.5.9", 30561,"1111", "222","333", "1.6",1);
+
+
+//                Socket socket = new Socket();
+//                socket.connect(new InetSocketAddress("119.36.5.9", 30561), 5000);
+//                socket.setKeepAlive(true);
+
+//                DeviceConnectionMsg deviceConnectionMsg = new DeviceConnectionMsg(socket, deviceId,imei,deviceSn,ver,isDc);
+                //每个设备连接后开启监听接收消息
+                System.out.println("成功");
+               // ykcMsgHandle.startListening(deviceConnectionMsg);
+                //并将连接信息等保存再Map中
+               // deviceConnectionMsgMap.put(deviceId, deviceConnectionMsg);
+            } catch (IOException e) {
+                // 连接失败,捕获IOException异常
+                log.info("{}-{}-{}连接失败: " + e.getMessage());
+            }
+
+            System.out.println(1111);
+
 //            realTimeStatusPushFrame.upStatusDevice(null,);
 
             String pileCode = "51220002000016";

+ 0 - 1
src/main/java/com/tmzn/devicelinkykc/message/DeviceMsgHandle.java

@@ -855,7 +855,6 @@ public class DeviceMsgHandle {
             statusServiceOne.setCreateTime(System.currentTimeMillis());
             statusServiceOne.setUpdateTime(System.currentTimeMillis());
             statusServiceOne.setOnlineStatus(DeviceOnlineStatus.ONLINE);
-            statusServiceOne.setUpdateTime(System.currentTimeMillis());
             statusServiceOne.setPortStatus(portStatus);
             deviceStatusService.saveOrUpdate(statusServiceOne);
         }

+ 16 - 1
src/main/java/com/tmzn/devicelinkykc/socket/SocketHandle.java

@@ -11,6 +11,7 @@ import org.springframework.stereotype.Component;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -32,11 +33,25 @@ public class SocketHandle {
     @Autowired
     private YkcMsgHandle ykcMsgHandle;
 
-    public synchronized boolean addDeviceConnection(String ip, int port, String deviceId, String imei, String deviceSn, String ver,int isDc) throws IOException {
+    HashMap<String,Long> connectTime = new HashMap<>();
+
+
+    public  boolean addDeviceConnection(String ip, int port, String deviceId, String imei, String deviceSn, String ver,int isDc) throws IOException {
         if (deviceConnectionMsgMap.containsKey(deviceId)) {
             log.info("已存在登录{}",imei);
             return  true;
         }
+
+
+        long nowTime = System.currentTimeMillis();
+        if(connectTime.containsKey(imei) && connectTime.get(imei)>0) {
+            if ((nowTime - connectTime.get(imei)) < 10000) {
+                log.info("10秒内不再尝试连接");
+                return false;
+            }
+        }
+        connectTime.put(imei, nowTime);
+
         log.info("请求登录{},版本:{}",imei,ver);
         //思考:这里的Socket的IP和端口从数据库查询到,云快充的device库中加字段保存IP和地址,根据桩后台传的设备SN码确定设备是对接的那个厂家的平台
         //Socket socket = new Socket("114.55.7.88", 8781);

+ 7 - 6
src/main/java/com/tmzn/devicelinkykc/taskQueue/queue/MsgFreeQueue.java

@@ -16,19 +16,20 @@ import java.util.concurrent.BlockingQueue;
 @Slf4j(topic = "MsgFreeQueue")
 public class MsgFreeQueue extends Thread implements ApplicationRunner {
 
-    private BlockingQueue<Map<String, DeviceConnectionMsg>> freeQueue = new ArrayBlockingQueue(1024);
+    private BlockingQueue<Map<String, DeviceConnectionMsg>> freeQueue = new ArrayBlockingQueue(1);
 
 
     @Autowired
     private MsgFreeRunner msgFreeRunner;
 
     public void add(Map<String, DeviceConnectionMsg> object) {
-        try {
-            freeQueue.put(object);
-
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+        //            freeQueue.put(object);
+        // 非阻塞
+        boolean success = freeQueue.offer(object);
+        if (!success) {
+            log.info("队列已满,丢弃free设备状态更新: size={}", object.size());
         }
+
     }
 
     @Override

+ 5 - 6
src/main/java/com/tmzn/devicelinkykc/taskQueue/queue/MsgHeartQueue.java

@@ -16,17 +16,16 @@ import java.util.concurrent.BlockingQueue;
 @Slf4j(topic = "MsgHeartQueue")
 public class MsgHeartQueue extends Thread implements ApplicationRunner {
 
-    private BlockingQueue<Map<String, DeviceConnectionMsg>> heartQueue = new ArrayBlockingQueue(1024);
+    private BlockingQueue<Map<String, DeviceConnectionMsg>> heartQueue = new ArrayBlockingQueue(1);
 
 
     @Autowired
     MsgHeartRunner msgHeartRunner;
     public void add(Map<String, DeviceConnectionMsg> map){
-        try {
-                heartQueue.put(map);
-
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+        //                heartQueue.put(map);
+        boolean success = heartQueue.offer(map);
+        if (!success) {
+            log.info("队列已满,丢弃Heart设备状态更新: size={}", map.size());
         }
     }
 

+ 5 - 6
src/main/java/com/tmzn/devicelinkykc/taskQueue/queue/MsgTranscationQueue.java

@@ -21,18 +21,17 @@ import java.util.concurrent.BlockingQueue;
 @Slf4j(topic = "MsgTranscationQueue")
 public class MsgTranscationQueue extends Thread implements ApplicationRunner {
 
-    private BlockingQueue<Map<String, DeviceConnectionMsg>> freeQueue = new ArrayBlockingQueue(1024);
+    private BlockingQueue<Map<String, DeviceConnectionMsg>> freeQueue = new ArrayBlockingQueue(1);
 
 
     @Autowired
     private MsgTranscationRunner msgTranscationRunner;
 
     public void add(Map<String, DeviceConnectionMsg> object){
-        try {
-            freeQueue.put(object);
-
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+        //            freeQueue.put(object);
+        boolean success = freeQueue.offer(object);
+        if (!success) {
+            log.info("队列已满,丢弃transaction设备状态更新: size={}", object.size());
         }
     }
 

+ 9 - 9
src/main/java/com/tmzn/devicelinkykc/taskQueue/queue/TaskExecutePool.java

@@ -17,9 +17,9 @@ public class TaskExecutePool {
     @Bean
     public Executor heartTaskAsyncPool() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(2); //核心线程数
-        executor.setMaxPoolSize(4);  //最大线程数
-        executor.setQueueCapacity(1000); //队列大小
+        executor.setCorePoolSize(1); //核心线程数
+        executor.setMaxPoolSize(1);  //最大线程数
+        executor.setQueueCapacity(0); //队列大小
         executor.setKeepAliveSeconds(300); //线程最大空闲时间
         executor.setThreadNamePrefix("async-heartExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(一共四种,此处省略)
@@ -41,9 +41,9 @@ public class TaskExecutePool {
     @Bean
     public Executor freeTaskAsyncPool() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(2); //核心线程数
-        executor.setMaxPoolSize(4);  //最大线程数
-        executor.setQueueCapacity(1000); //队列大小
+        executor.setCorePoolSize(1); //核心线程数
+        executor.setMaxPoolSize(1);  //最大线程数
+        executor.setQueueCapacity(0); //队列大小
         executor.setKeepAliveSeconds(300); //线程最大空闲时间
         executor.setThreadNamePrefix("async-freeExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(一共四种,此处省略)
@@ -53,9 +53,9 @@ public class TaskExecutePool {
     @Bean
     public Executor transactionTaskAsyncPool() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(2); //核心线程数
-        executor.setMaxPoolSize(4);  //最大线程数
-        executor.setQueueCapacity(1000); //队列大小
+        executor.setCorePoolSize(1); //核心线程数
+        executor.setMaxPoolSize(1);  //最大线程数
+        executor.setQueueCapacity(0); //队列大小
         executor.setKeepAliveSeconds(300); //线程最大空闲时间
         executor.setThreadNamePrefix("async-transactionExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(一共四种,此处省略)

+ 1 - 1
src/main/java/com/tmzn/devicelinkykc/taskQueue/runner/DeviceOnlineRunner.java

@@ -86,11 +86,11 @@ public class DeviceOnlineRunner {
             QueryWrapper<DeviceStatus> deviceStatusQueryWrapper = new QueryWrapper<>();
             deviceStatusQueryWrapper.eq("online_status", DeviceOnlineStatus.ONLINE);
             List<DeviceStatus> onlineList = deviceStatusService.list(deviceStatusQueryWrapper);
+            long nowTime = System.currentTimeMillis();
 
             onlineList.forEach(deviceStatus->{
                 try{
                     long updateTime = deviceStatus.getUpdateTime();
-                    long nowTime = System.currentTimeMillis();
                     //超过20秒没有更新端口消息的 置为已下线
                     if ((nowTime - updateTime) > GAP_TIME) {
                         logger.info("{}更新时间已超时,下线",deviceStatus.getPileCode());