瀏覽代碼

[功能修复]修复runner

liuf 1 年之前
父節點
當前提交
675c9bf0e0

+ 1 - 0
src/main/java/com/tmzn/devicelinkykc/constant/RedisConstant.java

@@ -20,6 +20,7 @@ public class RedisConstant {
 
     //全部设备的imei
     public static final String DEVICE_INFO="deviceInfoYKC";
+    public static final String DEVICE_IMEI_PILE_MAP="device_imei_pile_map";
 
     //充电中设备的实时计费信息
     public static final String DEVICE_CHARNGING_INFO="deviceCharngingInfo";

+ 1 - 1
src/main/java/com/tmzn/devicelinkykc/message/DeviceMsgHandle.java

@@ -829,7 +829,7 @@ public class DeviceMsgHandle {
                 }
             }
         }
-        logger.info("{}设备原状态>>>" + statusServiceOneTemp.toString(),imei);
+//        logger.info("{}设备原状态>>>" + statusServiceOneTemp.toString(),imei);
         statusServiceOne.setGunStatus(gunsStatus);
         statusServiceOne.setInsertGunStatus(insertGunStatus);
         statusServiceOne.setOnlineStatus(DeviceOnlineStatus.ONLINE);

+ 3 - 97
src/main/java/com/tmzn/devicelinkykc/taskQueue/DeviceOnlineTask.java

@@ -18,6 +18,7 @@ import com.tmzn.devicelinkykc.service.DeviceService;
 import com.tmzn.devicelinkykc.service.DeviceStatusService;
 import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
 import com.tmzn.devicelinkykc.socket.SocketHandle;
+import com.tmzn.devicelinkykc.taskQueue.runner.DeviceOnlineRunner;
 import com.tmzn.devicelinkykc.transdata.entity.DeviceParam;
 import com.tmzn.devicelinkykc.transdata.entity.opertype.OperEnum;
 import org.slf4j.Logger;
@@ -46,25 +47,11 @@ import java.util.stream.Collectors;
 @Component
 public class DeviceOnlineTask {
 
-    private static final Logger logger = LoggerFactory.getLogger(DeviceOnlineTask.class);
-    @Autowired
-    private DeviceStatusService deviceStatusService;
-    @Autowired
-    private DeviceService deviceService;
-    @Autowired
-    private SocketHandle socketHandle;
-    @Autowired
-    private RedisCache redisCache;
-    @Autowired
-    private MsgService msgService;
-    @Autowired
-    private DeviceControlerService deviceControlerService;
 
     @Autowired
-    private LoginFrame loginFrame;
+    DeviceOnlineRunner deviceOnlineRunner;
 
     //设备的状态更新间隔时间
-    private static final long gapTime = 20 * 60 * 1000;
 
     public void start() {
         ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(1);
@@ -72,7 +59,7 @@ public class DeviceOnlineTask {
             @Override
             public void run() {
                 try {
-                    resetTheConnection();
+                    deviceOnlineRunner.resetTheConnection();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -82,88 +69,7 @@ public class DeviceOnlineTask {
 
     }
 
-    /**
-     * 获取设备状态
-     */
-    private void getDevicePortDetail() {
-        logger.info("getDevicePortDetail>>>>>>>>>>>>>>>>>>>>>>>>");
-        List<DeviceStatus> list = deviceStatusService.list();
-        //双枪情况下,通过set集合减少循环次数,
-        Set<String> collect = list.stream().map(DeviceStatus::getDeviceImei).collect(Collectors.toSet());
-        collect.stream().forEach(imei -> {
-            DeviceParam dataParam = new DeviceParam();
-            dataParam.setDeviceId(imei);
-            dataParam.setCcid(imei);
-            deviceControlerService.sendPortDetailCmd(dataParam);
-        });
-    }
-
-    public void resetTheConnection() {
-        try {
-            //差正常的设备,禁用设备不管
-            logger.info("设备连接检查");
-            QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
-            deviceQueryWrapper.eq("disabled", DeviceOnlineStatus.NORMAL);
-            List<Device> list = deviceService.list(deviceQueryWrapper);
-            logger.info("查询到设备" + list.size());
-            //云快充设备缓存过滤订阅消息
-            Set<String> imeiList = list.stream().map(device ->
-                    device.getDeviceImei()
-            ).collect(Collectors.toSet());
-            //redisCache.setCacheObject(RedisConstant.DEVICE_INFO, imeiList, 10 * 1000 * 60, TimeUnit.MILLISECONDS);
-            redisCache.setCacheObject(RedisConstant.DEVICE_INFO, imeiList);
 
 
-            QueryWrapper<DeviceStatus> deviceStatusQueryWrapper = new QueryWrapper<>();
-            deviceStatusQueryWrapper.eq("online_status", DeviceOnlineStatus.ONLINE);
-            List<DeviceStatus> onlineList = deviceStatusService.list(deviceStatusQueryWrapper);
 
-            List<DeviceStatus> newOnlineList = onlineList.stream().map(deviceStatus -> {
-                long updateTime = deviceStatus.getUpdateTime();
-                long nowTime = System.currentTimeMillis();
-                if ((nowTime - updateTime) > gapTime) {
-                    if (socketHandle.existDeviceConnection(deviceStatus.getPileCode())) {
-                        redisCache.deleteObject(RedisConstant.KEYS + deviceStatus.getPileCode());
-                        socketHandle.removeDeviceConnection(deviceStatus.getPileCode());
-                    }
-                    deviceStatus.setOnlineStatus(DeviceOnlineStatus.OFFLINE);
-                    //deviceStatus.setGunStatus(StatusConstant.OFFLINE);
-                }
-                return deviceStatus;
-            }).collect(Collectors.toList());
-
-            if (redisCache.hasKey(RedisConstant.DEVICE_INFO)) {
-                Set<String> imeis = redisCache.getCacheObject(RedisConstant.DEVICE_INFO);
-                logger.info("redis-imei>>>" + imeis.size());
-            }
-            logger.info("dataBase device status change:" + newOnlineList.size());
-            deviceStatusService.updateBatchById(newOnlineList);
-
-            Map<String, Device> collect = list.stream().collect(Collectors.toMap(device -> device.getDeviceImei(), device -> device, (exis, repl) -> exis));
-
-            newOnlineList.stream().forEach(deviceStatus -> {
-                //logger.info(" newOnlineList.stream().forEach");
-                if (!socketHandle.existDeviceConnection(deviceStatus.getPileCode())) {
-                    try {
-                        Device device = collect.get(deviceStatus.getDeviceImei());
-                        socketHandle.addDeviceConnection(device.getIp(), device.getPort(), deviceStatus.getPileCode(), deviceStatus.getDeviceImei(), deviceStatus.getDeviceSn(), device.getCommProtocolVer());
-                    } catch (IOException e) {
-                        e.printStackTrace();
-                    }
-                }
-                DeviceConnectionMsg deviceConnection = socketHandle.getDeviceConnection(deviceStatus.getPileCode());
-                //logger.info("deviceConnection");
-                if (deviceConnection.getLoginStatus() == Constant.DEVICE_NOT_LOGIN_STATUS && deviceStatus.getOnlineStatus() == DeviceOnlineStatus.ONLINE) {
-                    //logger.info("DEVICE_NOT_LOGIN_STATUS");
-                    List<Device> collect1 = list.stream().filter(device -> device.getPileCode().equals(deviceConnection.getDeviceId())).collect(Collectors.toList());
-                    if (collect1.size() > 0) {
-                        logger.info("设备连接检查login>>>" + collect1.get(0).getDeviceSn());
-                        loginFrame.loginMsgSend(deviceConnection, collect1.get(0));
-                    }
-                }
-            });
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
 }

+ 152 - 0
src/main/java/com/tmzn/devicelinkykc/taskQueue/runner/DeviceOnlineRunner.java

@@ -0,0 +1,152 @@
+package com.tmzn.devicelinkykc.taskQueue.runner;
+
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.tmzn.devicelinkykc.constant.Constant;
+import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus;
+import com.tmzn.devicelinkykc.constant.PortStatusConstant;
+import com.tmzn.devicelinkykc.constant.RedisConstant;
+import com.tmzn.devicelinkykc.constant.ykc.StatusConstant;
+import com.tmzn.devicelinkykc.entity.Device;
+import com.tmzn.devicelinkykc.entity.DeviceStatus;
+import com.tmzn.devicelinkykc.frameMsg.FrameDataSplicing;
+import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame;
+import com.tmzn.devicelinkykc.frameMsg.frameType.RealTimeStatusPushFrame;
+import com.tmzn.devicelinkykc.openfeign.MsgService;
+import com.tmzn.devicelinkykc.redis.RedisCache;
+import com.tmzn.devicelinkykc.service.DeviceControlerService;
+import com.tmzn.devicelinkykc.service.DeviceService;
+import com.tmzn.devicelinkykc.service.DeviceStatusService;
+import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
+import com.tmzn.devicelinkykc.socket.SocketHandle;
+import com.tmzn.devicelinkykc.taskQueue.DeviceOnlineTask;
+import com.tmzn.devicelinkykc.transdata.entity.DeviceParam;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j(topic = "MsgHeartRunner")
+public class DeviceOnlineRunner {
+
+    private static final Logger logger = LoggerFactory.getLogger(DeviceOnlineTask.class);
+    @Autowired
+    private DeviceStatusService deviceStatusService;
+    @Autowired
+    private DeviceService deviceService;
+    @Autowired
+    private SocketHandle socketHandle;
+    @Autowired
+    private RedisCache redisCache;
+    @Autowired
+    private MsgService msgService;
+    @Autowired
+    private DeviceControlerService deviceControlerService;
+
+    @Autowired
+    private LoginFrame loginFrame;
+
+
+    private static final BigDecimal zero = new BigDecimal("0");
+    //设备的状态更新间隔时间
+    private static final long GAP_TIME = 20 * 60 * 1000;
+
+    public void resetTheConnection() {
+        try {
+            //差正常的设备,禁用设备不管
+            logger.info("设备连接检查");
+            QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
+            deviceQueryWrapper.eq("disabled", DeviceOnlineStatus.NORMAL);
+            List<Device> list = deviceService.list(deviceQueryWrapper);
+            logger.info("查询到设备" + list.size());
+            //云快充设备缓存过滤订阅消息
+            Set<String> imeiList = list.stream().map(device->{
+                      //缓存一下设备的序列号
+                      redisCache.setCacheMapValue(RedisConstant.DEVICE_IMEI_PILE_MAP,device.getDeviceImei(),device.getPileCode());
+                      return  device.getDeviceImei();
+                    }
+            ).collect(Collectors.toSet());
+
+            redisCache.setCacheObject(RedisConstant.DEVICE_INFO, imeiList);
+
+
+            QueryWrapper<DeviceStatus> deviceStatusQueryWrapper = new QueryWrapper<>();
+            deviceStatusQueryWrapper.eq("online_status", DeviceOnlineStatus.ONLINE);
+            List<DeviceStatus> onlineList = deviceStatusService.list(deviceStatusQueryWrapper);
+
+            List<DeviceStatus> newOnlineList = onlineList.stream().map(deviceStatus -> {
+                long updateTime = deviceStatus.getUpdateTime();
+                long nowTime = System.currentTimeMillis();
+                if ((nowTime - updateTime) > GAP_TIME) {
+                    if (socketHandle.existDeviceConnection(deviceStatus.getPileCode())) {
+                        redisCache.deleteObject(RedisConstant.KEYS + deviceStatus.getPileCode());
+                        socketHandle.removeDeviceConnection(deviceStatus.getPileCode());
+                    }
+                    deviceStatus.setOnlineStatus(DeviceOnlineStatus.OFFLINE);
+                    //deviceStatus.setGunStatus(StatusConstant.OFFLINE);
+                }
+                return deviceStatus;
+            }).collect(Collectors.toList());
+
+            if (redisCache.hasKey(RedisConstant.DEVICE_INFO)) {
+                Set<String> imeis = redisCache.getCacheObject(RedisConstant.DEVICE_INFO);
+                logger.info("redis-imei>>>" + imeis.size());
+            }
+            logger.info("dataBase device status change:" + newOnlineList.size());
+            deviceStatusService.updateBatchById(newOnlineList);
+
+            Map<String, Device> collect = list.stream().collect(Collectors.toMap(device -> device.getDeviceImei(), device -> device, (exis, repl) -> exis));
+
+            newOnlineList.stream().forEach(deviceStatus -> {
+                //logger.info(" newOnlineList.stream().forEach");
+                if (!socketHandle.existDeviceConnection(deviceStatus.getPileCode())) {
+                    try {
+                        Device device = collect.get(deviceStatus.getDeviceImei());
+                        socketHandle.addDeviceConnection(device.getIp(), device.getPort(), deviceStatus.getPileCode(), deviceStatus.getDeviceImei(), deviceStatus.getDeviceSn(), device.getCommProtocolVer());
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+                DeviceConnectionMsg deviceConnection = socketHandle.getDeviceConnection(deviceStatus.getPileCode());
+                //logger.info("deviceConnection");
+                if (deviceConnection.getLoginStatus() == Constant.DEVICE_NOT_LOGIN_STATUS && deviceStatus.getOnlineStatus() == DeviceOnlineStatus.ONLINE) {
+                    //logger.info("DEVICE_NOT_LOGIN_STATUS");
+                    List<Device> collect1 = list.stream().filter(device -> device.getPileCode().equals(deviceConnection.getDeviceId())).collect(Collectors.toList());
+                    if (collect1.size() > 0) {
+                        logger.info("设备连接检查login>>>" + collect1.get(0).getDeviceSn());
+                        loginFrame.loginMsgSend(deviceConnection, collect1.get(0));
+                    }
+                }
+            });
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 获取设备状态
+     */
+    private void getDevicePortDetail() {
+        logger.info("getDevicePortDetail>>>>>>>>>>>>>>>>>>>>>>>>");
+        List<DeviceStatus> list = deviceStatusService.list();
+        //双枪情况下,通过set集合减少循环次数,
+        Set<String> collect = list.stream().map(DeviceStatus::getDeviceImei).collect(Collectors.toSet());
+        collect.stream().forEach(imei -> {
+            DeviceParam dataParam = new DeviceParam();
+            dataParam.setDeviceId(imei);
+            dataParam.setCcid(imei);
+            deviceControlerService.sendPortDetailCmd(dataParam);
+        });
+    }
+
+}