package com.tmzn.devicelinkykc.taskQueue.runner; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.tmzn.devicelinkykc.constant.Constant; import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus; import com.tmzn.devicelinkykc.constant.RedisConstant; import com.tmzn.devicelinkykc.entity.Device; import com.tmzn.devicelinkykc.entity.DeviceStatus; import com.tmzn.devicelinkykc.frameMsg.frameType.HeartFrameSend; import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame; import com.tmzn.devicelinkykc.redis.RedisCache; import com.tmzn.devicelinkykc.service.DeviceService; import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg; import com.tmzn.devicelinkykc.socket.SocketHandle; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @Component @Slf4j(topic = "MsgHeartRunner") public class MsgHeartRunner { @Autowired private DeviceService deviceService; @Autowired private LoginFrame loginFrame; @Autowired private RedisCache redisCache; @Autowired private HeartFrameSend heartFrameSend; @Autowired private SocketHandle socketHandle; @Async("heartTaskAsyncPool") public void heartMsg(Map map) throws Exception { log.info("======Heart beat task starting====="); //todo return //任务处理 //map拿出来进行心跳包上报,1:直接检查心跳时间是否大于三十秒 Set devicePileCodes = map.keySet(); log.info("heart.deviceConnectionSize>>"+devicePileCodes.size()); devicePileCodes.stream().forEach(devicePileCode -> { try{ DeviceConnectionMsg deviceConnectionMsg = map.get(devicePileCode); handlePort(1,devicePileCode,deviceConnectionMsg); handlePort(2,devicePileCode,deviceConnectionMsg); }catch (Exception e){ log.info("{}Heart beat task exception"+e.toString(),devicePileCode); e.printStackTrace(); } }); log.info("======Heart beat task ending====="); } public void handlePort(int portId,String devicePileCode,DeviceConnectionMsg deviceConnectionMsg){ Long heartTime = deviceConnectionMsg.getHeartTime(); String portStatusCacheKey = portId==1?RedisConstant.ONLINE_DEVICE_ONE:RedisConstant.ONLINE_DEVICE_TWO; //1:当上一次的心跳和本次的心跳相差大于30秒证明已经3次没收到心跳回复了,进行重新登录 //2:设备第一次上线时未触发过登录,设备连接集合中初始话的心跳时间还是0,符合该判断,进行登录 if (deviceConnectionMsg.getLoginStatus()== Constant.DEVICE_NOT_LOGIN_STATUS) { //刚登录5秒内不发送心跳 避免重复发送LoginMsg if ((System.currentTimeMillis() - deviceConnectionMsg.getLoginTime()) < 8 * 1000) { return; } if(!deviceConnectionMsg.getLoginMsgSend()){ QueryWrapper deviceQueryWrapper = new QueryWrapper<>(); deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL); Device device = deviceService.getOne(deviceQueryWrapper); if(device!=null){ loginFrame.loginMsgSend(deviceConnectionMsg,device); } } } DeviceStatus oneDs = redisCache.getCacheMapValue(portStatusCacheKey, devicePileCode); if(oneDs==null){ // log.info("not found ones:{}",devicePileCode); return; } boolean res = heartFrameSend.heartSend(deviceConnectionMsg, oneDs); if(res){ return; } // if (deviceConnectionMsg.getLoginStatus()== Constant.DEVICE_NOT_LOGIN_STATUS) { // if (oneDs != null && oneDs.getOnlineStatus() == DeviceOnlineStatus.ONLINE && (System.currentTimeMillis() - heartTime) > 50 * 1000L) { // QueryWrapper deviceQueryWrapper = new QueryWrapper<>(); // deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL); // Device device = deviceService.getOne(deviceQueryWrapper); // if(device!=null){ // // loginFrame.loginMsgSend(deviceConnectionMsg,device); // } // log.info("heartTaskAsyncPool-1>not longin and heart normal>>" + devicePileCode); // return; // } // } if (oneDs.getOnlineStatus() == DeviceOnlineStatus.ONLINE && (System.currentTimeMillis() - heartTime) > 50 * 1000L) { log.info("pileCode:" + devicePileCode + " loging... ..." + (System.currentTimeMillis() - heartTime)); int a = 0; if (redisCache.hasKey(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode)) { //如果登录次数30秒内登三次就算断开 a = redisCache.getCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode); if (a > 3) { log.info("30s no heart {} reset",devicePileCode); socketHandle.removeDeviceConnection(devicePileCode); } else { // QueryWrapper deviceQueryWrapper = new QueryWrapper<>(); // deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL); // Device device = deviceService.getOne(deviceQueryWrapper); // if(device!=null){ //// loginFrame.loginMsgSend(deviceConnectionMsg, device); // socketHandle.removeDeviceConnection(devicePileCode); // } } a++; redisCache.setCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode, a,32 * 1000, TimeUnit.MILLISECONDS); } else { redisCache.setCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode, a, 32 * 1000, TimeUnit.MILLISECONDS); } } } }