package com.tmzn.devicelinkykc.taskQueue.runner; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.tmzn.devicelinkykc.constant.Constant; import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus; import com.tmzn.devicelinkykc.constant.PortStatusConstant; import com.tmzn.devicelinkykc.constant.RedisConstant; import com.tmzn.devicelinkykc.constant.ykc.StatusConstant; import com.tmzn.devicelinkykc.entity.Device; import com.tmzn.devicelinkykc.entity.DeviceStatus; import com.tmzn.devicelinkykc.entity.OrderStatus; import com.tmzn.devicelinkykc.frameMsg.FrameDataSplicing; import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame; import com.tmzn.devicelinkykc.frameMsg.frameType.RealTimeStatusPushFrame; import com.tmzn.devicelinkykc.openfeign.MsgService; import com.tmzn.devicelinkykc.redis.RedisCache; import com.tmzn.devicelinkykc.service.DeviceControlerService; import com.tmzn.devicelinkykc.service.DeviceService; import com.tmzn.devicelinkykc.service.DeviceStatusService; import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg; import com.tmzn.devicelinkykc.socket.SocketHandle; import com.tmzn.devicelinkykc.taskQueue.DeviceOnlineTask; import com.tmzn.devicelinkykc.transdata.entity.DeviceParam; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.io.IOException; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Component @Slf4j(topic = "MsgHeartRunner") public class DeviceOnlineRunner { private static final Logger logger = LoggerFactory.getLogger(DeviceOnlineTask.class); @Autowired private DeviceStatusService deviceStatusService; @Autowired private DeviceService deviceService; @Autowired private SocketHandle socketHandle; @Autowired private RedisCache redisCache; @Autowired private MsgService msgService; @Autowired private DeviceControlerService deviceControlerService; @Autowired private LoginFrame loginFrame; private static final BigDecimal zero = new BigDecimal("0"); //设备的状态更新间隔时间 private static final long GAP_TIME = 20 * 60 * 1000; public void resetTheConnection() { try { //差正常的设备,禁用设备不管 logger.info("设备连接检查"); QueryWrapper deviceQueryWrapper = new QueryWrapper<>(); deviceQueryWrapper.eq("disabled", DeviceOnlineStatus.NORMAL); List list = deviceService.list(deviceQueryWrapper); logger.info("查询到设备" + list.size()); Set imeiList = new HashSet<>(); Map deviceMap = new HashMap<>(); list.forEach(device->{ imeiList.add(device.getDeviceImei()); redisCache.setCacheMapValue(RedisConstant.DEVICE_IMEI_PILE_MAP,device.getDeviceImei(),device.getPileCode()); deviceMap.put(device.getDeviceImei(),device); }); redisCache.setCacheObject(RedisConstant.DEVICE_INFO, imeiList); QueryWrapper deviceStatusQueryWrapper = new QueryWrapper<>(); deviceStatusQueryWrapper.eq("online_status", DeviceOnlineStatus.ONLINE); List onlineList = deviceStatusService.list(deviceStatusQueryWrapper); long nowTime = System.currentTimeMillis(); onlineList.forEach(deviceStatus->{ try{ long updateTime = deviceStatus.getUpdateTime(); //超过20秒没有更新端口消息的 置为已下线 if ((nowTime - updateTime) > GAP_TIME) { logger.info("{}更新时间已超时,下线",deviceStatus.getPileCode()); if (socketHandle.existDeviceConnection(deviceStatus.getPileCode())) { // redisCache.deleteObject(RedisConstant.KEYS + deviceStatus.getPileCode()); redisCache.deleteCacheMapValue(RedisConstant.YKC_KEY_MAP,deviceStatus.getPileCode()); socketHandle.removeDeviceConnection(deviceStatus.getPileCode()); } deviceStatus.setOnlineStatus(DeviceOnlineStatus.OFFLINE); deviceStatusService.updateById(deviceStatus); try103(deviceStatus); return; } Device device = deviceMap.get(deviceStatus.getDeviceImei()); if(device==null){ return; } boolean needLogin = false; if (!socketHandle.existDeviceConnection(deviceStatus.getPileCode())) { logger.info("链接不存在{}",device.getPileCode()); needLogin = true; }else{ DeviceConnectionMsg deviceConnection = socketHandle.getDeviceConnection(deviceStatus.getPileCode()); if (!deviceConnection.getSocket().isConnected()) { logger.info("链接已失效重新登录{}",device.getPileCode()); socketHandle.removeDeviceConnection(deviceStatus.getPileCode()); needLogin = true; } if(deviceConnection.getLoginStatus()== 0 && deviceConnection.getLoginTime()>0 && (System.currentTimeMillis()-deviceConnection.getLoginTime())>60*1000){ logger.info("未应答重新登录{}",device.getPileCode()); needLogin = true; } } if(!needLogin){ return ; } logger.info("桩检测需要自动上线{}",device.getPileCode()); socketHandle.addDeviceConnection(device.getIp(),device.getPort(),device.getPileCode(), device.getDeviceImei(), device.getDeviceSn(),device.getCommProtocolVer(),device.getPileType()); try{ Thread.sleep(300); loginFrame.loginMsgSend(socketHandle.getDeviceConnection(device.getPileCode()), device); }catch (Exception e){ log.info("{}发送登录消息失败"+e.getMessage(),device.getPileCode()); } // deviceControlerService.sendImeiDetail(device.getDeviceImei()); try103(deviceStatus); }catch (Exception e){ log.info("{}检测异常"+e.getMessage(),deviceStatus.getPileCode()); } }); } catch (Exception e) { e.printStackTrace(); } } public void try103(DeviceStatus deviceStatus){ String key = "try103:_"+deviceStatus.getDeviceImei(); if(redisCache.hasKey(key)){ log.info("{}-{}忽略下发103", deviceStatus.getPileCode(), deviceStatus.getGunPort()); return; } log.info("{}-{}-2分钟内无状态下发一次103", deviceStatus.getPileCode(), deviceStatus.getGunPort()); deviceControlerService.sendImeiDetail(deviceStatus.getDeviceImei()); redisCache.setCacheObject(key, System.currentTimeMillis(),120, TimeUnit.SECONDS); } }