MsgTranscationRunner.java 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package com.tmzn.devicelinkykc.taskQueue.runner;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
  4. import com.tmzn.devicelinkykc.constant.Constant;
  5. import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus;
  6. import com.tmzn.devicelinkykc.constant.RedisConstant;
  7. import com.tmzn.devicelinkykc.constant.ykc.StatusConstant;
  8. import com.tmzn.devicelinkykc.entity.BillingModel;
  9. import com.tmzn.devicelinkykc.entity.Device;
  10. import com.tmzn.devicelinkykc.entity.DeviceStatus;
  11. import com.tmzn.devicelinkykc.entity.OrderStatus;
  12. import com.tmzn.devicelinkykc.frameMsg.DataConversion;
  13. import com.tmzn.devicelinkykc.frameMsg.TransMoney;
  14. import com.tmzn.devicelinkykc.frameMsg.frameType.CharngingPushFrame;
  15. import com.tmzn.devicelinkykc.frameMsg.frameType.HeartFrameSend;
  16. import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame;
  17. import com.tmzn.devicelinkykc.frameMsg.frameType.TransactionFlowPushFrame;
  18. import com.tmzn.devicelinkykc.message.DeviceMsgHandle;
  19. import com.tmzn.devicelinkykc.redis.RedisCache;
  20. import com.tmzn.devicelinkykc.service.BillingModelService;
  21. import com.tmzn.devicelinkykc.service.DeviceService;
  22. import com.tmzn.devicelinkykc.service.OrderStatusService;
  23. import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
  24. import com.tmzn.devicelinkykc.socket.SocketHandle;
  25. import lombok.extern.slf4j.Slf4j;
  26. import org.slf4j.Logger;
  27. import org.slf4j.LoggerFactory;
  28. import org.springframework.beans.factory.annotation.Autowired;
  29. import org.springframework.scheduling.annotation.Async;
  30. import org.springframework.stereotype.Component;
  31. import java.io.IOException;
  32. import java.math.BigDecimal;
  33. import java.time.Instant;
  34. import java.time.ZoneId;
  35. import java.util.List;
  36. import java.util.Map;
  37. import java.util.Set;
  38. import java.util.concurrent.TimeUnit;
  39. import java.util.stream.Collectors;
  40. @Component
  41. @Slf4j(topic = "MsgHeartRunner")
  42. public class MsgTranscationRunner {
  43. @Autowired
  44. private OrderStatusService orderStatusService;
  45. @Autowired
  46. private SocketHandle socketHandle;
  47. @Autowired
  48. DeviceMsgHandle deviceMsgHandle;
  49. @Autowired
  50. TransMoney transMoney;
  51. @Autowired
  52. BillingModelService billingModelService;
  53. @Autowired
  54. TransactionFlowPushFrame transactionFlowPushFrame;
  55. @Autowired
  56. CharngingPushFrame charngingPushFrame;
  57. @Autowired
  58. RedisCache redisCache;
  59. private static final Logger logger = LoggerFactory.getLogger(DeviceMsgHandle.class);
  60. public void rebillOne(OrderStatus statusServiceOne) throws IOException {
  61. long t = 1730795051000L;
  62. if(statusServiceOne.getCreateTime()<t){
  63. log.info("{}11月5号之前旧版的订单不上报了吧",statusServiceOne.getPileCode());
  64. return;
  65. }
  66. if(!deviceMsgHandle.checkConnection(statusServiceOne.getPileCode(),statusServiceOne.getDeviceImei())){
  67. log.info("{}重新上报句柄不存在等待下次执行",statusServiceOne.getPileCode());
  68. return;
  69. }
  70. DeviceConnectionMsg deviceConnection = socketHandle.getDeviceConnection(statusServiceOne.getPileCode());
  71. String imei = statusServiceOne.getDeviceImei();
  72. int port = statusServiceOne.getGunsCode();
  73. //获取缓存的最终结果
  74. QueryWrapper<BillingModel> billingModelQueryWrapper = new QueryWrapper<>();
  75. billingModelQueryWrapper.eq("device_imei", statusServiceOne.getDeviceImei());
  76. BillingModel model = billingModelService.getOne(billingModelQueryWrapper);
  77. statusServiceOne.setRetryTime(statusServiceOne.getRetryTime()+1);
  78. Map<String, BigDecimal> map ;
  79. //最终计算结果
  80. String cacheKey = "compute_finish:"+statusServiceOne.getId();
  81. if(redisCache.hasKey(cacheKey)){
  82. map = redisCache.getCacheObject(cacheKey);
  83. log.info("{}从缓存获取map{}",statusServiceOne.getDeviceImei(), JSON.toJSONString(map));
  84. }else{
  85. try{
  86. map = transMoney.compute(port, model, statusServiceOne.getCreateTime(), statusServiceOne.getEndTime());
  87. //10分钟内只查询一次
  88. log.info("{}重新获取map{}",statusServiceOne.getDeviceImei(), JSON.toJSONString(map));
  89. redisCache.setCacheObject(cacheKey,map);
  90. redisCache.expire(cacheKey,600);
  91. }catch (Exception e){
  92. orderStatusService.updateById(statusServiceOne);
  93. log.info("计算金额异常{},{},重试第{}次",statusServiceOne.getDeviceImei(),e.getMessage(),statusServiceOne.getRetryTime());
  94. return ;
  95. }
  96. }
  97. byte[] encrypt = new byte[0];
  98. //查询计费模板
  99. encrypt = transactionFlowPushFrame.sendTrans(deviceConnection, statusServiceOne.getTransOrder(), statusServiceOne.getPileCode(), statusServiceOne.getGunsCode(), statusServiceOne.getCreateTime(), statusServiceOne.getEndTime(), model, statusServiceOne.getCard(), map, statusServiceOne.getReasonStopCharging());
  100. if (encrypt == null || encrypt.length <= 0) {
  101. logger.info("订单上送重试{} 需要重新上报{}", statusServiceOne.getRetryTime(),statusServiceOne.getPileCode());
  102. orderStatusService.updateById(statusServiceOne);
  103. return;
  104. } else {
  105. logger.info("订单上送成功{}", statusServiceOne.getPileCode());
  106. statusServiceOne.setTransactionOrderReportingActionStatus(StatusConstant.TRANSACTION_ORDER_REPORTING_ACTION_STATUS_OK);
  107. statusServiceOne.setOriginalText(encrypt);
  108. }
  109. orderStatusService.updateById(statusServiceOne);
  110. }
  111. @Async("transactionTaskAsyncPool")
  112. public void transactionMsg(Map<String, DeviceConnectionMsg> map) throws Exception {
  113. //处理已结束的订单 未上报的 离线上报
  114. log.info("======离线订单记录补送=====");
  115. //任务处理 1.上报交易流水消息后没有响应回复的情况,30秒上送一次三次后停止,五分钟后上送最后一次,不管是否成功都不再上送:有结束时间,充电状态是结束充电的
  116. long t = System.currentTimeMillis()-60 *1000;
  117. QueryWrapper<OrderStatus> queryWrapper = new QueryWrapper<>();
  118. queryWrapper.eq("now_order_status", StatusConstant.NOW_ORDER_STATUS_CHARGING_ENDING).orderByDesc("id"); //充电完成
  119. //已经停止一分钟了 还没有上送成功的
  120. queryWrapper.lt("end_time",t);
  121. queryWrapper.eq("transaction_order_reply_status", 0); //未回复的
  122. queryWrapper.lt("retry_time",10);
  123. queryWrapper.last("limit 500");
  124. List<OrderStatus> list = orderStatusService.list(queryWrapper);
  125. list.forEach(item->{
  126. try{
  127. log.info("检测订单上报{}--{}",item.getPileCode(),item.getId());
  128. rebillOne(item);
  129. }catch (Exception e){
  130. log.info("离线上报失败{},{}",item.getPileCode(),e.getMessage());
  131. }
  132. });
  133. log.info("======transaction push task ending=====");
  134. }
  135. }