package com.tmzn.devicelinkykc.taskQueue.runner; import com.alibaba.fastjson2.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.tmzn.devicelinkykc.constant.Constant; import com.tmzn.devicelinkykc.constant.DeviceOnlineStatus; import com.tmzn.devicelinkykc.constant.RedisConstant; import com.tmzn.devicelinkykc.constant.ykc.StatusConstant; import com.tmzn.devicelinkykc.entity.BillingModel; import com.tmzn.devicelinkykc.entity.Device; import com.tmzn.devicelinkykc.entity.DeviceStatus; import com.tmzn.devicelinkykc.entity.OrderStatus; import com.tmzn.devicelinkykc.frameMsg.DataConversion; import com.tmzn.devicelinkykc.frameMsg.TransMoney; import com.tmzn.devicelinkykc.frameMsg.frameType.CharngingPushFrame; import com.tmzn.devicelinkykc.frameMsg.frameType.HeartFrameSend; import com.tmzn.devicelinkykc.frameMsg.frameType.LoginFrame; import com.tmzn.devicelinkykc.frameMsg.frameType.TransactionFlowPushFrame; import com.tmzn.devicelinkykc.message.DeviceMsgHandle; import com.tmzn.devicelinkykc.redis.RedisCache; import com.tmzn.devicelinkykc.service.BillingModelService; import com.tmzn.devicelinkykc.service.DeviceService; import com.tmzn.devicelinkykc.service.OrderStatusService; import com.tmzn.devicelinkykc.socket.DeviceConnectionMsg; import com.tmzn.devicelinkykc.socket.SocketHandle; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.io.IOException; import java.math.BigDecimal; import java.time.Instant; import java.time.ZoneId; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Component @Slf4j(topic = "MsgHeartRunner") public class MsgTranscationRunner { @Autowired private OrderStatusService orderStatusService; @Autowired private SocketHandle socketHandle; @Autowired DeviceMsgHandle deviceMsgHandle; @Autowired TransMoney transMoney; @Autowired BillingModelService billingModelService; @Autowired TransactionFlowPushFrame transactionFlowPushFrame; @Autowired CharngingPushFrame charngingPushFrame; @Autowired RedisCache redisCache; private static final Logger logger = LoggerFactory.getLogger(DeviceMsgHandle.class); public void rebillOne(OrderStatus statusServiceOne) throws IOException { long t = 1730795051000L; if(statusServiceOne.getCreateTime() billingModelQueryWrapper = new QueryWrapper<>(); billingModelQueryWrapper.eq("device_imei", statusServiceOne.getDeviceImei()); BillingModel model = billingModelService.getOne(billingModelQueryWrapper); statusServiceOne.setRetryTime(statusServiceOne.getRetryTime()+1); Map map ; //最终计算结果 String cacheKey = "compute_finish:"+statusServiceOne.getId(); if(redisCache.hasKey(cacheKey)){ map = redisCache.getCacheObject(cacheKey); log.info("{}从缓存获取map{}",statusServiceOne.getDeviceImei(), JSON.toJSONString(map)); }else{ try{ map = transMoney.compute(port, model, statusServiceOne.getCreateTime(), statusServiceOne.getEndTime()); //10分钟内只查询一次 log.info("{}重新获取map{}",statusServiceOne.getDeviceImei(), JSON.toJSONString(map)); redisCache.setCacheObject(cacheKey,map); redisCache.expire(cacheKey,600); }catch (Exception e){ orderStatusService.updateById(statusServiceOne); log.info("计算金额异常{},{},重试第{}次",statusServiceOne.getDeviceImei(),e.getMessage(),statusServiceOne.getRetryTime()); return ; } } byte[] encrypt = new byte[0]; //查询计费模板 encrypt = transactionFlowPushFrame.sendTrans(deviceConnection, statusServiceOne.getTransOrder(), statusServiceOne.getPileCode(), statusServiceOne.getGunsCode(), statusServiceOne.getCreateTime(), statusServiceOne.getEndTime(), model, statusServiceOne.getCard(), map, statusServiceOne.getReasonStopCharging()); if (encrypt == null || encrypt.length <= 0) { logger.info("订单上送重试{} 需要重新上报{}", statusServiceOne.getRetryTime(),statusServiceOne.getPileCode()); orderStatusService.updateById(statusServiceOne); return; } else { logger.info("订单上送成功{}", statusServiceOne.getPileCode()); statusServiceOne.setTransactionOrderReportingActionStatus(StatusConstant.TRANSACTION_ORDER_REPORTING_ACTION_STATUS_OK); statusServiceOne.setOriginalText(encrypt); } orderStatusService.updateById(statusServiceOne); } @Async("transactionTaskAsyncPool") public void transactionMsg(Map map) throws Exception { //处理已结束的订单 未上报的 离线上报 log.info("======离线订单记录补送====="); //任务处理 1.上报交易流水消息后没有响应回复的情况,30秒上送一次三次后停止,五分钟后上送最后一次,不管是否成功都不再上送:有结束时间,充电状态是结束充电的 long t = System.currentTimeMillis()-60 *1000; QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("now_order_status", StatusConstant.NOW_ORDER_STATUS_CHARGING_ENDING).orderByDesc("id"); //充电完成 //已经停止一分钟了 还没有上送成功的 queryWrapper.lt("end_time",t); queryWrapper.eq("transaction_order_reply_status", 0); //未回复的 queryWrapper.lt("retry_time",10); queryWrapper.last("limit 500"); List list = orderStatusService.list(queryWrapper); list.forEach(item->{ try{ log.info("检测订单上报{}--{}",item.getPileCode(),item.getId()); rebillOne(item); }catch (Exception e){ log.info("离线上报失败{},{}",item.getPileCode(),e.getMessage()); } }); log.info("======transaction push task ending====="); } }