MsgHeartRunner.java 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package com.tmzn.devicelinkykc.taskQueue.runner;
  2. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  3. import com.tmzn.devicelinkykc.constant.Constant;
  4. import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus;
  5. import com.tmzn.devicelinkykc.constant.RedisConstant;
  6. import com.tmzn.devicelinkykc.entity.Device;
  7. import com.tmzn.devicelinkykc.entity.DeviceStatus;
  8. import com.tmzn.devicelinkykc.frameMsg.frameType.HeartFrameSend;
  9. import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame;
  10. import com.tmzn.devicelinkykc.redis.RedisCache;
  11. import com.tmzn.devicelinkykc.service.DeviceService;
  12. import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
  13. import com.tmzn.devicelinkykc.socket.SocketHandle;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.springframework.beans.factory.annotation.Autowired;
  16. import org.springframework.scheduling.annotation.Async;
  17. import org.springframework.stereotype.Component;
  18. import java.util.Map;
  19. import java.util.Set;
  20. import java.util.concurrent.TimeUnit;
  21. @Component
  22. @Slf4j(topic = "MsgHeartRunner")
  23. public class MsgHeartRunner {
  24. @Autowired
  25. private DeviceService deviceService;
  26. @Autowired
  27. private LoginFrame loginFrame;
  28. @Autowired
  29. private RedisCache redisCache;
  30. @Autowired
  31. private HeartFrameSend heartFrameSend;
  32. @Autowired
  33. private SocketHandle socketHandle;
  34. @Async("heartTaskAsyncPool")
  35. public void heartMsg(Map<String, DeviceConnectionMsg> map) throws Exception {
  36. log.info("======Heart beat task starting=====");
  37. //todo return
  38. //任务处理
  39. //map拿出来进行心跳包上报,1:直接检查心跳时间是否大于三十秒
  40. Set<String> devicePileCodes = map.keySet();
  41. log.info("heart.deviceConnectionSize>>"+devicePileCodes.size());
  42. devicePileCodes.stream().forEach(devicePileCode -> {
  43. try{
  44. DeviceConnectionMsg deviceConnectionMsg = map.get(devicePileCode);
  45. handlePort(1,devicePileCode,deviceConnectionMsg);
  46. handlePort(2,devicePileCode,deviceConnectionMsg);
  47. }catch (Exception e){
  48. log.info("{}Heart beat task exception"+e.toString(),devicePileCode);
  49. e.printStackTrace();
  50. }
  51. });
  52. log.info("======Heart beat task ending=====");
  53. }
  54. public void handlePort(int portId,String devicePileCode,DeviceConnectionMsg deviceConnectionMsg){
  55. Long heartTime = deviceConnectionMsg.getHeartTime();
  56. String portStatusCacheKey = portId==1?RedisConstant.ONLINE_DEVICE_ONE:RedisConstant.ONLINE_DEVICE_TWO;
  57. //1:当上一次的心跳和本次的心跳相差大于30秒证明已经3次没收到心跳回复了,进行重新登录
  58. //2:设备第一次上线时未触发过登录,设备连接集合中初始话的心跳时间还是0,符合该判断,进行登录
  59. if (deviceConnectionMsg.getLoginStatus()== Constant.DEVICE_NOT_LOGIN_STATUS) {
  60. //刚登录5秒内不发送心跳 避免重复发送LoginMsg
  61. if ((System.currentTimeMillis() - deviceConnectionMsg.getLoginTime()) < 8 * 1000) {
  62. return;
  63. }
  64. if(!deviceConnectionMsg.getLoginMsgSend()){
  65. QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
  66. deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL);
  67. Device device = deviceService.getOne(deviceQueryWrapper);
  68. if(device!=null){
  69. loginFrame.loginMsgSend(deviceConnectionMsg,device);
  70. }
  71. }
  72. }
  73. DeviceStatus oneDs = redisCache.getCacheMapValue(portStatusCacheKey, devicePileCode);
  74. if(oneDs==null){
  75. // log.info("not found ones:{}",devicePileCode);
  76. return;
  77. }
  78. boolean res = heartFrameSend.heartSend(deviceConnectionMsg, oneDs);
  79. if(res){
  80. return;
  81. }
  82. // if (deviceConnectionMsg.getLoginStatus()== Constant.DEVICE_NOT_LOGIN_STATUS) {
  83. // if (oneDs != null && oneDs.getOnlineStatus() == DeviceOnlineStatus.ONLINE && (System.currentTimeMillis() - heartTime) > 50 * 1000L) {
  84. // QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
  85. // deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL);
  86. // Device device = deviceService.getOne(deviceQueryWrapper);
  87. // if(device!=null){
  88. // // loginFrame.loginMsgSend(deviceConnectionMsg,device);
  89. // }
  90. // log.info("heartTaskAsyncPool-1>not longin and heart normal>>" + devicePileCode);
  91. // return;
  92. // }
  93. // }
  94. if (oneDs.getOnlineStatus() == DeviceOnlineStatus.ONLINE && (System.currentTimeMillis() - heartTime) > 50 * 1000L) {
  95. log.info("pileCode:" + devicePileCode + " loging... ..." + (System.currentTimeMillis() - heartTime));
  96. int a = 0;
  97. if (redisCache.hasKey(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode)) {
  98. //如果登录次数30秒内登三次就算断开
  99. a = redisCache.getCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode);
  100. if (a > 3) {
  101. log.info("30s no heart {} reset",devicePileCode);
  102. socketHandle.removeDeviceConnection(devicePileCode);
  103. } else {
  104. // QueryWrapper<Device> deviceQueryWrapper = new QueryWrapper<>();
  105. // deviceQueryWrapper.eq("pile_code", devicePileCode).eq("disabled", DeviceOnlineStatus.NORMAL);
  106. // Device device = deviceService.getOne(deviceQueryWrapper);
  107. // if(device!=null){
  108. //// loginFrame.loginMsgSend(deviceConnectionMsg, device);
  109. // socketHandle.removeDeviceConnection(devicePileCode);
  110. // }
  111. }
  112. a++;
  113. redisCache.setCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode, a,32 * 1000, TimeUnit.MILLISECONDS);
  114. } else {
  115. redisCache.setCacheObject(RedisConstant.DEVICE_LOGIN_YKC + devicePileCode, a, 32 * 1000, TimeUnit.MILLISECONDS);
  116. }
  117. }
  118. }
  119. }