package com.ruoyi.iot.mq; import com.alibaba.fastjson2.JSONObject; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.iot.service.ITChargeRecordService; import com.ruoyi.iot.service.ITDevicePlanService; import com.ruoyi.iot.socket.WebSocket; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.List; @Slf4j public class KafkaConsumer { @Autowired private ITChargeRecordService itChargeRecordService; @Autowired ITDevicePlanService itDevicePlanService; // 消费监听 @KafkaListener(id = "msgAdmin" ,topics = {"${kafka.topic}"}) public void onMessage1(List list) { for (String s : list) { try{ log.info("message is comming,{}",s); JSONObject jsonObject = JSONObject.parseObject(s); Integer type = jsonObject.getInteger("type"); if(type == null){ type = 0; } String imei = jsonObject.getString("imei"); if(type == 101 || type == 116 || type == 103 || type == 96 || type == 113){// JSONObject real_data = jsonObject.getJSONObject("real_data"); JSONObject data = new JSONObject(); data.put("real_data",real_data); data.put("type",type); WebSocket.sendMsg(imei,data); if(type == 103){ itDevicePlanService.runPlanChargeJob(imei,real_data); } if(type == 113){ itChargeRecordService.endChargeNotice(imei); } } }catch (Exception e){ log.error("解析异常",e); } } } }