| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package com.tmzn.devicelinkykc.taskQueue.runner;
- import com.alibaba.fastjson2.JSON;
- import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
- import com.tmzn.devicelinkykc.constant.Constant;
- import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus;
- import com.tmzn.devicelinkykc.constant.RedisConstant;
- import com.tmzn.devicelinkykc.constant.ykc.StatusConstant;
- import com.tmzn.devicelinkykc.entity.BillingModel;
- import com.tmzn.devicelinkykc.entity.Device;
- import com.tmzn.devicelinkykc.entity.DeviceStatus;
- import com.tmzn.devicelinkykc.entity.OrderStatus;
- import com.tmzn.devicelinkykc.frameMsg.DataConversion;
- import com.tmzn.devicelinkykc.frameMsg.TransMoney;
- import com.tmzn.devicelinkykc.frameMsg.frameType.CharngingPushFrame;
- import com.tmzn.devicelinkykc.frameMsg.frameType.HeartFrameSend;
- import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame;
- import com.tmzn.devicelinkykc.frameMsg.frameType.TransactionFlowPushFrame;
- import com.tmzn.devicelinkykc.message.DeviceMsgHandle;
- import com.tmzn.devicelinkykc.redis.RedisCache;
- import com.tmzn.devicelinkykc.service.BillingModelService;
- import com.tmzn.devicelinkykc.service.DeviceService;
- import com.tmzn.devicelinkykc.service.OrderStatusService;
- import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg;
- import com.tmzn.devicelinkykc.socket.SocketHandle;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
- import java.math.BigDecimal;
- import java.time.Instant;
- import java.time.ZoneId;
- import java.util.List;
- import java.util.Map;
- import java.util.Set;
- import java.util.concurrent.TimeUnit;
- import java.util.stream.Collectors;
- @Component
- @Slf4j(topic = "MsgHeartRunner")
- public class MsgTranscationRunner {
- @Autowired
- private OrderStatusService orderStatusService;
- @Autowired
- private SocketHandle socketHandle;
- @Autowired
- DeviceMsgHandle deviceMsgHandle;
- @Autowired
- TransMoney transMoney;
- @Autowired
- BillingModelService billingModelService;
- @Autowired
- TransactionFlowPushFrame transactionFlowPushFrame;
- @Autowired
- CharngingPushFrame charngingPushFrame;
- @Autowired
- RedisCache redisCache;
- private static final Logger logger = LoggerFactory.getLogger(DeviceMsgHandle.class);
- public void rebillOne(OrderStatus statusServiceOne) throws IOException {
- long t = 1730795051000L;
- if(statusServiceOne.getCreateTime()<t){
- log.info("{}11月5号之前旧版的订单不上报了吧",statusServiceOne.getPileCode());
- return;
- }
- if(!deviceMsgHandle.checkConnection(statusServiceOne.getPileCode(),statusServiceOne.getDeviceImei())){
- log.info("{}重新上报句柄不存在等待下次执行",statusServiceOne.getPileCode());
- return;
- }
- DeviceConnectionMsg deviceConnection = socketHandle.getDeviceConnection(statusServiceOne.getPileCode());
- String imei = statusServiceOne.getDeviceImei();
- int port = statusServiceOne.getGunsCode();
- //获取缓存的最终结果
- QueryWrapper<BillingModel> billingModelQueryWrapper = new QueryWrapper<>();
- billingModelQueryWrapper.eq("device_imei", statusServiceOne.getDeviceImei());
- BillingModel model = billingModelService.getOne(billingModelQueryWrapper);
- statusServiceOne.setRetryTime(statusServiceOne.getRetryTime()+1);
- Map<String, BigDecimal> map ;
- //最终计算结果
- String cacheKey = "compute_finish:"+statusServiceOne.getId();
- if(redisCache.hasKey(cacheKey)){
- map = redisCache.getCacheObject(cacheKey);
- log.info("{}从缓存获取map{}",statusServiceOne.getDeviceImei(), JSON.toJSONString(map));
- }else{
- try{
- map = transMoney.compute(port, model, statusServiceOne.getCreateTime(), statusServiceOne.getEndTime());
- //10分钟内只查询一次
- log.info("{}重新获取map{}",statusServiceOne.getDeviceImei(), JSON.toJSONString(map));
- redisCache.setCacheObject(cacheKey,map);
- redisCache.expire(cacheKey,600);
- }catch (Exception e){
- orderStatusService.updateById(statusServiceOne);
- log.info("计算金额异常{},{},重试第{}次",statusServiceOne.getDeviceImei(),e.getMessage(),statusServiceOne.getRetryTime());
- return ;
- }
- }
- byte[] encrypt = new byte[0];
- //查询计费模板
- encrypt = transactionFlowPushFrame.sendTrans(deviceConnection, statusServiceOne.getTransOrder(), statusServiceOne.getPileCode(), statusServiceOne.getGunsCode(), statusServiceOne.getCreateTime(), statusServiceOne.getEndTime(), model, statusServiceOne.getCard(), map, statusServiceOne.getReasonStopCharging());
- if (encrypt == null || encrypt.length <= 0) {
- logger.info("订单上送重试{} 需要重新上报{}", statusServiceOne.getRetryTime(),statusServiceOne.getPileCode());
- orderStatusService.updateById(statusServiceOne);
- return;
- } else {
- logger.info("订单上送成功{}", statusServiceOne.getPileCode());
- statusServiceOne.setTransactionOrderReportingActionStatus(StatusConstant.TRANSACTION_ORDER_REPORTING_ACTION_STATUS_OK);
- statusServiceOne.setOriginalText(encrypt);
- }
- orderStatusService.updateById(statusServiceOne);
- }
- @Async("transactionTaskAsyncPool")
- public void transactionMsg(Map<String, DeviceConnectionMsg> map) throws Exception {
- //处理已结束的订单 未上报的 离线上报
- log.info("======离线订单记录补送=====");
- //任务处理 1.上报交易流水消息后没有响应回复的情况,30秒上送一次三次后停止,五分钟后上送最后一次,不管是否成功都不再上送:有结束时间,充电状态是结束充电的
- long t = System.currentTimeMillis()-60 *1000;
- QueryWrapper<OrderStatus> queryWrapper = new QueryWrapper<>();
- queryWrapper.eq("now_order_status", StatusConstant.NOW_ORDER_STATUS_CHARGING_ENDING).orderByDesc("id"); //充电完成
- //已经停止一分钟了 还没有上送成功的
- queryWrapper.lt("end_time",t);
- queryWrapper.eq("transaction_order_reply_status", 0); //未回复的
- queryWrapper.lt("retry_time",10);
- queryWrapper.last("limit 500");
- List<OrderStatus> list = orderStatusService.list(queryWrapper);
- list.forEach(item->{
- try{
- log.info("检测订单上报{}--{}",item.getPileCode(),item.getId());
- rebillOne(item);
- }catch (Exception e){
- log.info("离线上报失败{},{}",item.getPileCode(),e.getMessage());
- }
- });
- log.info("======transaction push task ending=====");
- }
- }
|