| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- package com.tmzn.devicelinkykc.taskQueue.runner;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.tmzn.devicelinkykc.constant.Constant;
- import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus;
- import com.tmzn.devicelinkykc.constant.RedisConstant;
- import com.tmzn.devicelinkykc.entity.Device;
- import com.tmzn.devicelinkykc.entity.DeviceStatus;
- import com.tmzn.devicelinkykc.frameMsg.frameType.HeartFrameSend;
- import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame;
- import com.tmzn.devicelinkykc.redis.RedisCache;
- import com.tmzn.devicelinkykc.service.DeviceService;
- import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
- import com.tmzn.devicelinkykc.socket.SocketHandle;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.TimeUnit;
- @Component
- @Slf4j(topic = "MsgHeartRunner")
- public class MsgHeartRunner {
- @Autowired
- private DeviceService deviceService;
- @Autowired
- private LoginFrame loginFrame;
- @Autowired
- private RedisCache redisCache;
- @Autowired
- private HeartFrameSend heartFrameSend;
- @Autowired
- private SocketHandle socketHandle;
- @Async("heartTaskAsyncPool")
- public void heartMsg(Map<String, DeviceConnectionMsg> map) throws Exception {
- log.info("======Heart beat task starting=====");
- //todo return
- //任务处理
- //map拿出来进行心跳包上报,1:直接检查心跳时间是否大于三十秒
- Set<String> devicePileCodes = map.keySet();
- log.info("heart.deviceConnectionSize>>"+devicePileCodes.size());
- devicePileCodes.stream().forEach(devicePileCode -> {
- try{
- DeviceConnectionMsg deviceConnectionMsg = map.get(devicePileCode);
- handlePort(1,devicePileCode,deviceConnectionMsg);
- handlePort(2,devicePileCode,deviceConnectionMsg);
- }catch (Exception e){
- log.info("{}Heart beat task exception"+e.toString(),devicePileCode);
- e.printStackTrace();
- }
- });
- log.info("======Heart beat task ending=====");
- }
- public void handlePort(int portId,String devicePileCode,DeviceConnectionMsg deviceConnectionMsg){
- Long heartTime = deviceConnectionMsg.getHeartTime();
- String portStatusCacheKey = portId==1?RedisConstant.ONLINE_DEVICE_ONE:RedisConstant.ONLINE_DEVICE_TWO;
- //1:当上一次的心跳和本次的心跳相差大于30秒证明已经3次没收到心跳回复了,进行重新登录
- //2:设备第一次上线时未触发过登录,设备连接集合中初始话的心跳时间还是0,符合该判断,进行登录
- if (deviceConnectionMsg.getLoginStatus()== Constant.DEVICE_NOT_LOGIN_STATUS) {
- //刚登录5秒内不发送心跳 避免重复发送LoginMsg
- if ((System.currentTimeMillis() - deviceConnectionMsg.getLoginTime()) < 8 * 1000) {
- return;
- }
- if(!deviceConnectionMsg.getLoginMsgSend()){
- QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
- deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL);
- Device device = deviceService.getOne(deviceQueryWrapper);
- if(device!=null){
- loginFrame.loginMsgSend(deviceConnectionMsg,device);
- }
- }
- }
- DeviceStatus oneDs = redisCache.getCacheMapValue(portStatusCacheKey, devicePileCode);
- if(oneDs==null){
- // log.info("not found ones:{}",devicePileCode);
- return;
- }
- boolean res = heartFrameSend.heartSend(deviceConnectionMsg, oneDs);
- if(res){
- return;
- }
- // if (deviceConnectionMsg.getLoginStatus()== Constant.DEVICE_NOT_LOGIN_STATUS) {
- // if (oneDs != null && oneDs.getOnlineStatus() == DeviceOnlineStatus.ONLINE && (System.currentTimeMillis() - heartTime) > 50 * 1000L) {
- // QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
- // deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL);
- // Device device = deviceService.getOne(deviceQueryWrapper);
- // if(device!=null){
- // // loginFrame.loginMsgSend(deviceConnectionMsg,device);
- // }
- // log.info("heartTaskAsyncPool-1>not longin and heart normal>>" + devicePileCode);
- // return;
- // }
- // }
- if (oneDs.getOnlineStatus() == DeviceOnlineStatus.ONLINE && (System.currentTimeMillis() - heartTime) > 50 * 1000L) {
- log.info("pileCode:" + devicePileCode + " loging... ..." + (System.currentTimeMillis() - heartTime));
- int a = 0;
- if (redisCache.hasKey(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode)) {
- //如果登录次数30秒内登三次就算断开
- a = redisCache.getCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode);
- if (a > 3) {
- log.info("30s no heart {} reset",devicePileCode);
- socketHandle.removeDeviceConnection(devicePileCode);
- } else {
- // QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
- // deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL);
- // Device device = deviceService.getOne(deviceQueryWrapper);
- // if(device!=null){
- //// loginFrame.loginMsgSend(deviceConnectionMsg, device);
- // socketHandle.removeDeviceConnection(devicePileCode);
- // }
- }
- a++;
- redisCache.setCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode, a,32 * 1000, TimeUnit.MILLISECONDS);
- } else {
- redisCache.setCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode, a, 32 * 1000, TimeUnit.MILLISECONDS);
- }
- }
- }
- }
|