liuf 3 hónapja
szülő
commit
2ff4d4631f

+ 15 - 15
src/main/java/com/tmzn/devicelinkykc/taskQueue/StartTask.java

@@ -123,20 +123,20 @@ public class StartTask {
     /**
      * todo:考虑压力问题
      */
-    private void getDevicePortDetail(){
-        logger.info("getDevicePortDetail>>>>>>>>>>>>>>>>>>>>>>>>");
-        List<DeviceStatus> list = deviceStatusService.list();
-        //双枪情况下,通过set集合减少循环次数,
-        Set<String> collect = list.stream().map(DeviceStatus::getDeviceImei).collect(Collectors.toSet());
-        collect.stream().forEach(imei->{
-            DataParam dataParam = new DataParam();
-            dataParam.setDeviceId(imei);
-            dataParam.setCcid(imei);
-            JSONObject object1 = new JSONObject();
-            dataParam.setData(object1);
-            dataParam.setType(OperEnum.PortDetail.getType());
-            msgService.sendMsg(dataParam);
-        });
-    }
+//    private void getDevicePortDetail(){
+//        logger.info("getDevicePortDetail>>>>>>>>>>>>>>>>>>>>>>>>");
+//        List<DeviceStatus> list = deviceStatusService.list();
+//        //双枪情况下,通过set集合减少循环次数,
+//        Set<String> collect = list.stream().map(DeviceStatus::getDeviceImei).collect(Collectors.toSet());
+//        collect.stream().forEach(imei->{
+//            DataParam dataParam = new DataParam();
+//            dataParam.setDeviceId(imei);
+//            dataParam.setCcid(imei);
+//            JSONObject object1 = new JSONObject();
+//            dataParam.setData(object1);
+//            dataParam.setType(OperEnum.PortDetail.getType());
+//            msgService.sendMsg(dataParam);
+//        });
+//    }
 }
 

+ 79 - 64
src/main/java/com/tmzn/devicelinkykc/taskQueue/runner/MsgCharngingRunner.java

@@ -35,6 +35,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 
@@ -78,31 +79,42 @@ public class MsgCharngingRunner {
     private static final BigDecimal zero = new BigDecimal("0");
 
 
+    // 使用AtomicBoolean替代基本的boolean
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
     @Async("charngingTaskAsyncPool")
-    public void chargingMsg(Map<String, DeviceConnectionMsg> map) {
-        log.info("======充电中数据上报检测=====");
-        //这边需要给充电中的状态进行查询流水号,流水号是由云快充启动充电时的下发指令带来存库的,
-        if (map.size() < 1) {
-            return;
-        }
+    public void chargingMsg(Map<String, DeviceConnectionMsg> map)  {
+        try{
+            log.info("======充电中数据上报检测=====");
+            if (!isRunning.compareAndSet(false, true)) {
+                log.info("任务正在执行中,跳过本次处理");
+                return; // 如果任务已在运行,则直接返回
+            }
 
-        //查找所有充电中的订单
-        QueryWrapper<OrderStatus> queryWrapper = new QueryWrapper<>();
-        queryWrapper.eq("now_order_status", 0); //充电中状态上报
-
-        //忽略10天前的订单
-        long nowTime = System.currentTimeMillis();
-        long tm = nowTime-86400*10*1000;
-        //10天前的就算了
-        queryWrapper.gt("create_time",tm ); //充电中状态上报
-        queryWrapper.orderByDesc("id");
-
-        //查询所有充电中设备的最新的订单记录,来上报设备状态消息.........?????????????????
-        List<OrderStatus> list = orderStatusService.list(queryWrapper);
-        if (list.isEmpty()) {
-            log.info("无充电中订单上报");
-            return;
-        }
+//            Thread.sleep(20000);
+
+            //这边需要给充电中的状态进行查询流水号,流水号是由云快充启动充电时的下发指令带来存库的,
+            if (map.size() < 1) {
+                return;
+            }
+
+            //查找所有充电中的订单
+            QueryWrapper<OrderStatus> queryWrapper = new QueryWrapper<>();
+            queryWrapper.eq("now_order_status", 0); //充电中状态上报
+
+            //忽略10天前的订单
+            long nowTime = System.currentTimeMillis();
+            long tm = nowTime-86400*10*1000;
+            //10天前的就算了
+            queryWrapper.gt("create_time",tm ); //充电中状态上报
+            queryWrapper.orderByDesc("id");
+
+            //查询所有充电中设备的最新的订单记录,来上报设备状态消息.........?????????????????
+            List<OrderStatus> list = orderStatusService.list(queryWrapper);
+            if (list.isEmpty()) {
+                log.info("无充电中订单上报");
+                return;
+            }
 
 //        Set<String> devicePileCodes = map.keySet();
 //        QueryWrapper<BillingModel> billWapper = new QueryWrapper<>();
@@ -116,50 +128,53 @@ public class MsgCharngingRunner {
 //        Map<String, BillingModel> billingModelMap = billingModels.stream()
 //                .collect(Collectors.toMap(BillingModel::getPileCode, billingModel -> billingModel));
 
-        //只上传最近的订单
-        Map<String, Boolean> dealMap = new HashMap<>();
-        list.forEach(item -> {
-            try {
-                //检查设备是否在线
-                QueryWrapper<DeviceStatus> deviceStatusQueryWrapper = new QueryWrapper<>();
-                deviceStatusQueryWrapper.eq("pile_code", item.getPileCode());
-                deviceStatusQueryWrapper.eq("gun_port", item.getGunsCode());
+            //只上传最近的订单
+            Map<String, Boolean> dealMap = new HashMap<>();
+            list.forEach(item -> {
+                try {
+                    //检查设备是否在线
+                    QueryWrapper<DeviceStatus> deviceStatusQueryWrapper = new QueryWrapper<>();
+                    deviceStatusQueryWrapper.eq("pile_code", item.getPileCode());
+                    deviceStatusQueryWrapper.eq("gun_port", item.getGunsCode());
 //                deviceStatusQueryWrapper.last("limit 1");
-                DeviceStatus statusServiceOne = deviceStatusService.getOne(deviceStatusQueryWrapper);
-                if(statusServiceOne != null && statusServiceOne.getOnlineStatus()==StatusConstant.OFFLINE){
-                    log.info("{}设备已离线不上报",item.getPileCode());
-                    return;
-                }
-
-                QueryWrapper<BillingModel> billWapper = new QueryWrapper<>();
-                billWapper.eq("pile_code", item.getPileCode());
-                billWapper.eq("device_imei", item.getDeviceImei());
-                BillingModel b = billingModelService.getOne(billWapper);
-                if (b == null) {
-                    log.info("{}上报充电中未匹配计费模型", item.getPileCode());
-                    return;
-                }
-
-                if(!deviceMsgHandle.checkConnection(item.getPileCode(),item.getDeviceImei())){
-                    log.info("{}重新连接句柄不存在等待下次执行",item.getPileCode());
-                    return;
-                }
-
-                DeviceConnectionMsg deviceConnectionMsg = map.get(item.getPileCode());
-
-                String k = item.getPileCode() + "_" + item.getGunsCode();
-                if (dealMap.containsKey(k)) {
-                    log.info("{}本轮已上报充电中只报最后一个订单", k);
-                } else {
-                    reportOne(item, b, deviceConnectionMsg);
-                    dealMap.put(k, true);
+                    DeviceStatus statusServiceOne = deviceStatusService.getOne(deviceStatusQueryWrapper);
+                    if(statusServiceOne != null && statusServiceOne.getOnlineStatus()==StatusConstant.OFFLINE){
+                        log.info("{}设备已离线不上报",item.getPileCode());
+                        return;
+                    }
+
+                    QueryWrapper<BillingModel> billWapper = new QueryWrapper<>();
+                    billWapper.eq("pile_code", item.getPileCode());
+                    billWapper.eq("device_imei", item.getDeviceImei());
+                    BillingModel b = billingModelService.getOne(billWapper);
+                    if (b == null) {
+                        log.info("{}上报充电中未匹配计费模型", item.getPileCode());
+                        return;
+                    }
+
+                    if(!deviceMsgHandle.checkConnection(item.getPileCode(),item.getDeviceImei())){
+                        log.info("{}重新连接句柄不存在等待下次执行",item.getPileCode());
+                        return;
+                    }
+
+                    DeviceConnectionMsg deviceConnectionMsg = map.get(item.getPileCode());
+
+                    String k = item.getPileCode() + "_" + item.getGunsCode();
+                    if (dealMap.containsKey(k)) {
+                        log.info("{}本轮已上报充电中只报最后一个订单", k);
+                    } else {
+                        reportOne(item, b, deviceConnectionMsg);
+                        dealMap.put(k, true);
+                    }
+                } catch (Exception e) {
+                    log.info("{}上报充电中异常", item.getPileCode()+e.getMessage());
+                    e.printStackTrace();
                 }
-            } catch (Exception e) {
-                log.info("{}上报充电中异常", item.getPileCode()+e.getMessage());
-                e.printStackTrace();
-            }
-        });
+            });
 
+        } finally {
+            isRunning.set(false);
+        }
 
         log.info("======Charging status push task ending=====");
     }

+ 26 - 13
src/main/java/com/tmzn/devicelinkykc/taskQueue/runner/MsgHeartRunner.java

@@ -20,6 +20,7 @@ import org.springframework.stereotype.Component;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 
 @Component
@@ -43,25 +44,37 @@ public class MsgHeartRunner {
 
 
 
+    // 使用AtomicBoolean替代基本的boolean
+    private final AtomicBoolean isRunning = new AtomicBoolean(false);
+
     @Async("heartTaskAsyncPool")
     public void heartMsg(Map<String, DeviceConnectionMsg> map) throws Exception {
         log.info("======Heart beat task starting=====");
         //todo return
         //任务处理
+        if (!isRunning.compareAndSet(false, true)) {
+            log.info("任务正在执行中,跳过本次处理");
+            return; // 如果任务已在运行,则直接返回
+        }
+
         //map拿出来进行心跳包上报,1:直接检查心跳时间是否大于三十秒
-        Set<String> devicePileCodes = map.keySet();
-        log.info("heart.deviceConnectionSize>>"+devicePileCodes.size());
-        devicePileCodes.stream().forEach(devicePileCode -> {
-            try{
-                DeviceConnectionMsg deviceConnectionMsg = map.get(devicePileCode);
-                handlePort(1,devicePileCode,deviceConnectionMsg);
-                handlePort(2,devicePileCode,deviceConnectionMsg);
-            }catch (Exception e){
-                log.info("{}Heart beat task exception"+e.toString(),devicePileCode);
-                e.printStackTrace();
-            }
-        });
-        log.info("======Heart beat task ending=====");
+      try{
+          Set<String> devicePileCodes = map.keySet();
+          log.info("heart.deviceConnectionSize>>"+devicePileCodes.size());
+          devicePileCodes.stream().forEach(devicePileCode -> {
+              try{
+                  DeviceConnectionMsg deviceConnectionMsg = map.get(devicePileCode);
+                  handlePort(1,devicePileCode,deviceConnectionMsg);
+                  handlePort(2,devicePileCode,deviceConnectionMsg);
+              }catch (Exception e){
+                  log.info("{}Heart beat task exception"+e.toString(),devicePileCode);
+                  e.printStackTrace();
+              }
+          });
+      }finally {
+          isRunning.set(false);
+          log.info("======Heart beat task ending=====");
+      }
     }
 
     public void handlePort(int portId,String devicePileCode,DeviceConnectionMsg deviceConnectionMsg){