浏览代码

修改新版本

wzh 2 年之前
父节点
当前提交
9e524f84d4

+ 1 - 38
ruoyi-admin/src/main/resources/application-dev.yml

@@ -87,41 +87,6 @@ spring:
         max-active: 8
         # #连接池最大阻塞等待时间(使用负值表示没有限制)
         max-wait: -1ms
-  kafka:
-    bootstrap-servers: 113.141.88.28:9092
-    consumer:
-      auto:
-        commit:
-          interval:
-            ms: 1000
-      auto-offset-reset: latest
-      enable-auto-commit: true
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      max-poll-records: 500
-      properties:
-        group:
-          id: msgAdmin
-        request:
-          timeout:
-            ms: 180000
-        session:
-          timeout:
-            ms: 120000
-      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-    listener:
-      concurrency: 1
-      missing-topics-fatal: false
-      type: batch
-    producer:
-      acks: 1
-      batch-size: 16384
-      buffer-memory: 33554432
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      properties:
-        linger:
-          ms: 0
-      retries: 0
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
   datasource:
     type: com.alibaba.druid.pool.DruidDataSource
     driverClassName: com.mysql.cj.jdbc.Driver
@@ -246,10 +211,8 @@ xss:
 knife4j:
   enable: true
 deviceName: 869636060423465
-kafka:
-  topic: deviceMessage
 productKey: 52
 splitTable: true
-
+receiveService: http://127.0.0.1:8052
 
 

+ 1 - 36
ruoyi-admin/src/main/resources/application-prod.yml

@@ -87,41 +87,6 @@ spring:
         max-active: 8
         # #连接池最大阻塞等待时间(使用负值表示没有限制)
         max-wait: -1ms
-  kafka:
-    bootstrap-servers: 113.141.88.28:9092
-    consumer:
-      auto:
-        commit:
-          interval:
-            ms: 1000
-      auto-offset-reset: latest
-      enable-auto-commit: true
-      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      max-poll-records: 500
-      properties:
-        group:
-          id: msgAdmin
-        request:
-          timeout:
-            ms: 180000
-        session:
-          timeout:
-            ms: 120000
-      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-    listener:
-      concurrency: 1
-      missing-topics-fatal: false
-      type: batch
-    producer:
-      acks: 1
-      batch-size: 16384
-      buffer-memory: 33554432
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      properties:
-        linger:
-          ms: 0
-      retries: 0
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
   datasource:
     type: com.alibaba.druid.pool.DruidDataSource
     driverClassName: com.mysql.cj.jdbc.Driver
@@ -251,6 +216,6 @@ kafka:
   topic: deviceMessage
 productKey: 52
 splitTable: true
-
+receiveService: http://127.0.0.1:8052
 
 

+ 1 - 1
ruoyi-admin/src/main/resources/application.yml

@@ -1,3 +1,3 @@
 spring:
   profiles:
-    active: prod
+    active: dev

+ 1 - 0
ruoyi-framework/src/main/java/com/ruoyi/framework/datasource/DynamicDataSourceContextHolder.java

@@ -1,5 +1,6 @@
 package com.ruoyi.framework.datasource;
 
+import com.ruoyi.common.annotation.DataSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

+ 5 - 1
ruoyi-iot/pom.xml

@@ -15,7 +15,11 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.8.6</version>
+        </dependency>
         <!-- 系统模块-->
         <dependency>
             <groupId>com.ruoyi</groupId>

+ 14 - 1
ruoyi-iot/src/main/java/com/ruoyi/iot/constant/IotConstant.java

@@ -12,7 +12,20 @@ public class IotConstant {
 
 
     /**
-     * 预约充电,检查key
+     * 开始充电,检查key
      */
     public static final String START_CHARGE_DIR_REDIS = "startcharge:deviceid:";
+
+
+
+    /**
+     * 取消充电,检查key
+     */
+    public static final String CANCEL_CHARGE_DIR_REDIS = "cancelcharge:deviceid:";
+
+
+    /**
+     * 获取需要获取端口详情的设备
+     */
+    public static final String GET_PORT_CMD_KEY = "getPortCmdKey";
 }

+ 9 - 1
ruoyi-iot/src/main/java/com/ruoyi/iot/controller/DeviceController.java

@@ -8,6 +8,7 @@ import com.ruoyi.common.core.domain.AjaxResult;
 import com.ruoyi.common.core.domain.model.LoginUser;
 import com.ruoyi.iot.domain.WeitiandiDeviceInfo;
 import com.ruoyi.iot.entity.param.DeviceParam;
+import com.ruoyi.iot.queue.MsgQueue;
 import com.ruoyi.iot.service.DeviceControlerService;
 import com.ruoyi.iot.service.ITChargeRecordService;
 import com.ruoyi.iot.service.ITDeviceRecordService;
@@ -42,7 +43,8 @@ public class DeviceController {
 
     @Autowired
     private IWeitiandiDeviceInfoService iWeitiandiDeviceInfoService;
-
+    @Autowired
+    private MsgQueue msgQueue;
 
     @GetMapping("/testrpc")
     @Anonymous
@@ -143,4 +145,10 @@ public class DeviceController {
         return AjaxResult.success();
     }
 
+    @RequestMapping("/addMsg")
+    public String addMsg(@RequestParam("msg") String msg){
+        msgQueue.add(msg);
+        return "ok";
+    }
+
 }

+ 11 - 0
ruoyi-iot/src/main/java/com/ruoyi/iot/domain/TDevicePlan.java

@@ -2,6 +2,10 @@ package com.ruoyi.iot.domain;
 
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
+import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
 import lombok.Data;
 
 import java.io.Serializable;
@@ -15,6 +19,8 @@ public class TDevicePlan implements Serializable {
 
     /** 创建时间 */
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+    @JsonSerialize(using = LocalDateTimeSerializer.class)
     private LocalDateTime createTime;
 
     /** 预约单次 */
@@ -25,6 +31,8 @@ public class TDevicePlan implements Serializable {
 
     /** 运行时间 */
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+    @JsonSerialize(using = LocalDateTimeSerializer.class)
     private LocalDateTime runTime;
 
     /** 重复执行日期 */
@@ -41,4 +49,7 @@ public class TDevicePlan implements Serializable {
 
     /** ccid */
     private String ccid;
+
+    /*预约端口*/
+    private Integer port;
 }

+ 16 - 8
ruoyi-iot/src/main/java/com/ruoyi/iot/domain/TPlanRecord.java

@@ -1,15 +1,16 @@
 package com.ruoyi.iot.domain;
 
-import com.baomidou.mybatisplus.annotation.FieldFill;
-import com.baomidou.mybatisplus.annotation.IdType;
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.baomidou.mybatisplus.annotation.TableId;
-import com.baomidou.mybatisplus.annotation.TableName;
-import java.io.Serializable;
-import java.time.LocalDateTime;
+import com.baomidou.mybatisplus.annotation.*;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
+import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
 import lombok.Getter;
 import lombok.Setter;
 
+import java.io.Serializable;
+import java.time.LocalDateTime;
+
 /**
  * <p>
  *
@@ -35,6 +36,8 @@ public class TPlanRecord implements Serializable {
      * 运行时间
      */
     @TableField(fill = FieldFill.INSERT)
+    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+    @JsonSerialize(using = LocalDateTimeSerializer.class)
     private LocalDateTime createTime;
 
     /**
@@ -60,10 +63,15 @@ public class TPlanRecord implements Serializable {
      * 原预约的ID
      */
     private Long planId;
-
+    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+    @JsonSerialize(using = LocalDateTimeSerializer.class)
     private LocalDateTime planTime;
+    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
+    @JsonSerialize(using = LocalDateTimeSerializer.class)
     private LocalDateTime checkTime;
 
     private String ccid;
 
+    private Integer port;
+
 }

+ 0 - 1
ruoyi-iot/src/main/java/com/ruoyi/iot/mq/KafkaConsumer.java

@@ -12,7 +12,6 @@ import org.springframework.stereotype.Component;
 
 import java.util.List;
 
-@Component
 @Slf4j
 public class KafkaConsumer {
 

+ 57 - 0
ruoyi-iot/src/main/java/com/ruoyi/iot/queue/MsgQueue.java

@@ -0,0 +1,57 @@
+package com.ruoyi.iot.queue;
+
+import com.ruoyi.iot.util.HutoolHttpUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+@Component
+@Slf4j
+public class MsgQueue extends Thread implements ApplicationRunner {
+
+    private BlockingQueue<String> queue = new ArrayBlockingQueue(1024);
+
+    @Value("${receiveService}")
+    private String receiveService;
+    @Autowired
+    private TaskRunner taskRunner;
+    public void add(String object){
+        try {
+            queue.put(object);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void run() {
+       while (true){
+           try {
+               String take = queue.take();
+                taskRunner.handlerMsg(take);
+           } catch (Exception e) {
+               e.printStackTrace();
+               log.error("执行异常",e);
+           }
+       }
+    }
+
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+            this.start();
+            try {
+                HutoolHttpUtil.get(receiveService+"/receive/reset",new HashMap<>());
+            }catch (Exception e){
+                log.error("连接消息接收服务器失败",e);
+            }
+            log.info("处理器启动");
+    }
+}

+ 31 - 0
ruoyi-iot/src/main/java/com/ruoyi/iot/queue/TaskExecutePool.java

@@ -0,0 +1,31 @@
+package com.ruoyi.iot.queue;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 自定义创建线程池
+ */
+@Configuration
+@EnableAsync
+public class TaskExecutePool {
+    @Bean
+    public Executor myTaskAsyncPool() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        executor.setCorePoolSize(4); //核心线程数
+        executor.setMaxPoolSize(8);  //最大线程数
+        executor.setQueueCapacity(1000); //队列大小
+        executor.setKeepAliveSeconds(300); //线程最大空闲时间
+        executor.setThreadNamePrefix("async-Executor-");
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 拒绝策略(一共四种,此处省略)
+        executor.initialize();
+        return executor;
+    }
+}
+
+

+ 48 - 0
ruoyi-iot/src/main/java/com/ruoyi/iot/queue/TaskRunner.java

@@ -0,0 +1,48 @@
+package com.ruoyi.iot.queue;
+
+import com.alibaba.fastjson2.JSONObject;
+import com.ruoyi.iot.service.ITDevicePlanService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class TaskRunner {
+    @Autowired
+    private ITDevicePlanService itDevicePlanService;
+
+    @Async("myTaskAsyncPool")
+    public void handlerMsg(String s) {
+        try {
+            JSONObject jsonObject = JSONObject.parseObject(s);
+            Integer type = jsonObject.getInteger("type");
+            if (type == null) {
+                type = 0;
+            }
+
+            String imei = jsonObject.getString("imei");
+            if (type.equals(103)) {//
+                JSONObject real_data = jsonObject.getJSONObject("real_data");
+                JSONObject data = new JSONObject();
+                data.put("real_data", real_data);
+                data.put("type", type);
+                itDevicePlanService.runPlanChargeJob(imei, real_data);
+
+            }
+//            /**
+//             * 116端口发生变化
+//             * 113 停止充电
+//             * 104直接开始充电
+//             */
+//            if (type.equals(116) || type.equals(113) || type.equals(104)) {//代表端口发生变化
+//                itDevicePlanService.portChange(imei, jsonObject);
+//            }
+
+
+        }catch (Exception e){
+            log.error("解析异常",e);
+        }
+    }
+}

+ 17 - 4
ruoyi-iot/src/main/java/com/ruoyi/iot/service/DeviceControlerService.java

@@ -2,7 +2,6 @@ package com.ruoyi.iot.service;
 
 import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONObject;
-import com.ruoyi.iot.domain.TChargeRecord;
 import com.ruoyi.iot.entity.param.DeviceParam;
 import com.ruoyi.iot.openfeign.MsgService;
 import com.ruoyi.iot.transdata.DataParam;
@@ -14,8 +13,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.time.LocalDateTime;
-
 @Service
 @Slf4j
 public class DeviceControlerService {
@@ -145,7 +142,7 @@ public class DeviceControlerService {
     /**
      * 预约充电
      */
-    public void planCharge(String deviceId,String ccid,PlanCharge planCharge) {
+    public void planCharge(String deviceId, String ccid, PlanCharge planCharge) {
         DataParam dataParam = new DataParam();
         JSONObject object1 = JSONObject.parseObject(JSON.toJSONString(planCharge));
         dataParam.setData(object1);
@@ -157,4 +154,20 @@ public class DeviceControlerService {
     }
 
 
+    /**
+     * 取消预约充电
+     */
+    public void cancelChargePlan(String deviceId, String ccid, int port) {
+        DataParam dataParam = new DataParam();
+        JSONObject object1 = new JSONObject();
+        object1.put("port",port);
+        dataParam.setData(object1);
+        dataParam.setCcid(ccid);
+        dataParam.setDeviceId(deviceId.toString());
+        dataParam.setType(OperEnum.CancelPlanCHarge.getType());
+        RpcResult rpcResult = msgService.sendMsg(dataParam);
+        log.info("发送取消预约充电的消息,{},端口:{}",deviceId,port);
+    }
+
+
 }

+ 5 - 8
ruoyi-iot/src/main/java/com/ruoyi/iot/service/ITDevicePlanService.java

@@ -5,9 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.mybatisplus.extension.service.IService;
 import com.ruoyi.iot.domain.TDevicePlan;
 import com.ruoyi.iot.domain.dto.TDevicePlanAddNewDTO;
-import com.ruoyi.iot.domain.vo.TDevicePlanVO;
-
-import java.util.List;
+import com.ruoyi.iot.transdata.RpcResult;
 
 /**
  * 处理设备记录仪数据的业务接口
@@ -28,14 +26,13 @@ public interface ITDevicePlanService extends IService<TDevicePlan>  {
      * @return
      */
     void checkReadyToRun();
-    /**
-     * 根据用户id查询预约记录
-     * @return
-     */
-    List<TDevicePlanVO> getStandardByUserId(Long userId);
 
     void checkRunSuccess();
 
+    RpcResult cancelPlan(Long planId);
+
 
     void runPlanChargeJob(String deviceId,JSONObject object);
+
+
 }

+ 1 - 1
ruoyi-iot/src/main/java/com/ruoyi/iot/service/ITPlanRecordService.java

@@ -1,7 +1,7 @@
 package com.ruoyi.iot.service;
 
-import com.ruoyi.iot.domain.TPlanRecord;
 import com.baomidou.mybatisplus.extension.service.IService;
+import com.ruoyi.iot.domain.TPlanRecord;
 
 import java.util.List;
 

+ 192 - 61
ruoyi-iot/src/main/java/com/ruoyi/iot/service/impl/TDevicePlanServiceImpl.java

@@ -2,28 +2,33 @@ package com.ruoyi.iot.service.impl;
 
 import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.toolkit.StringUtils;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.ruoyi.common.core.redis.RedisCache;
 import com.ruoyi.iot.constant.IotConstant;
 import com.ruoyi.iot.domain.TDevicePlan;
-import com.ruoyi.iot.domain.dto.TDevicePlanAddNewDTO;
-import com.ruoyi.iot.domain.vo.TDevicePlanVO;
 import com.ruoyi.iot.domain.TPlanRecord;
+import com.ruoyi.iot.domain.dto.TDevicePlanAddNewDTO;
 import com.ruoyi.iot.entity.param.DeviceParam;
 import com.ruoyi.iot.ex.ServiceException;
 import com.ruoyi.iot.mapper.TDevicePlanMapper;
+import com.ruoyi.iot.mapper.TDeviceRecordMapper;
 import com.ruoyi.iot.service.DeviceControlerService;
 import com.ruoyi.iot.service.ITDevicePlanService;
 import com.ruoyi.iot.service.ITPlanRecordService;
+import com.ruoyi.iot.transdata.RpcResult;
 import com.ruoyi.iot.transdata.entity.PlanCharge;
 import com.ruoyi.iot.util.DateUtil;
 import com.ruoyi.iot.web.ServiceCode;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeanUtils;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -32,7 +37,7 @@ import java.util.concurrent.TimeUnit;
 
 @Slf4j
 @Service
-public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevicePlan> implements ITDevicePlanService {
+public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper, TDevicePlan> implements ITDevicePlanService {
 
     @Autowired
     TDevicePlanMapper tDevicePlanMapper;
@@ -46,23 +51,9 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
     @Autowired
     private ITPlanRecordService itPlanRecordService;
 
-    @Override
-    public void addNew(TDevicePlanAddNewDTO tDevicePlanAddNewDTO) {
-        log.info("开始处理【添加预约】的业务,参数:{}", tDevicePlanAddNewDTO);
-        // 创建TDevicePlan对象
-        TDevicePlan tDevicePlan = new TDevicePlan();
-        // 调用BeanUtils.copyProperties(源,目标)将参数对象中的属性复制到TDevicePlan对象中
-        BeanUtils.copyProperties(tDevicePlanAddNewDTO, tDevicePlan);
-        // 调用Mapper对象的insert()执行插入预约数据
-        log.info("即将执行插入数据,参数:{}", tDevicePlan);
-        tDevicePlan.setStatus(1l);
-        int rows = tDevicePlanMapper.insert(tDevicePlan);
-        if (rows != 1) {
-            String message = "添加预约失败,服务器忙,请稍后再次尝试!";
-            log.warn(message);
-            throw new ServiceException(ServiceCode.ERR_INSERT, message);
-        }
-    }
+    @Autowired
+    private TDeviceRecordMapper tDeviceRecordMapper;
+
 
     /**
      * 检查预约的单子
@@ -77,10 +68,14 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
         List<TDevicePlan> list = this.list(tDevicePlanLambdaQueryWrapper);
         LocalDateTime now = LocalDateTime.now();
         now = now.plus(3, ChronoUnit.MINUTES);
+        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+
+        DateTimeFormatter allFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
         List<TPlanRecord> planList = new ArrayList<>();
         StringBuilder ids = new StringBuilder();
         for (TDevicePlan tDevicePlan : list) {
             Long id = tDevicePlan.getId();
+            LocalDateTime createTime = tDevicePlan.getCreateTime();
             String deviceId = tDevicePlan.getDeviceId();
             Long planType = tDevicePlan.getPlanType();
             String ccid = tDevicePlan.getCcid();
@@ -107,10 +102,12 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
                     tPlanRecord.setStatus(0);
                     tPlanRecord.setPlanId(id);
                     tPlanRecord.setCcid(ccid);
+                    tPlanRecord.setPort(tDevicePlan.getPort());
                     planList.add(tPlanRecord);
                     deviceControlerService.sendPortDetailCmd(deviceParam);
                     redisCache.setCacheObject(redisKey,tDevicePlan);
                     redisCache.expire(redisKey,30, TimeUnit.SECONDS);
+                    log.info("添加定时执行任务,{},{}",tDevicePlan.getId(),tDevicePlan.getDeviceId()+","+tDevicePlan.getRunTime());
                 }
             }else{
                 String repeatDays = tDevicePlan.getRepeatDays();
@@ -119,13 +116,21 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
                 int dayOfWeek = DateUtil.getDayOfWeek(now);
                 boolean contain = Arrays.stream(days).anyMatch(e -> e.equals(dayOfWeek+""));
                 if(contain){
-                    String hour = now.getHour()+"";
-                    String minute = now.getMinute()+"";
-                    if(minute.length() == 1){
-                        minute = "0"+minute;
-                    }
-                    String curMin = hour+":"+minute;
-                    if(curMin.equals(repeatTime)){
+                    repeatTime = repeatTime+":00";
+                    String format = now.format(fmt);
+                    String runTimeStr = format+" "+repeatTime;
+                    LocalDateTime runTime = LocalDateTime.parse(runTimeStr,allFmt);
+                    int result = now.compareTo(runTime);
+                    if(result>0){
+                        if(runTime.compareTo(createTime)<0){
+                            log.info("该预约任务下次再执行,{},{}",tDevicePlan.getId(),tDevicePlan.getDeviceId());
+                            continue;
+                        }
+                        boolean hasRun = checkHasRun(tDevicePlan, now);
+                        if(hasRun){
+                            log.info("该预约任务今日已经执行,{},{}",tDevicePlan.getId(),tDevicePlan.getDeviceId());
+                            continue;
+                        }
                         TPlanRecord tPlanRecord = new TPlanRecord();
                         tPlanRecord.setDeviceId(deviceId);
                         tPlanRecord.setCreateTime(LocalDateTime.now());
@@ -133,10 +138,12 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
                         tPlanRecord.setStatus(0);
                         tPlanRecord.setPlanId(id);
                         tPlanRecord.setCcid(ccid);
+                        tPlanRecord.setPort(tDevicePlan.getPort());
                         planList.add(tPlanRecord);
                         deviceControlerService.sendPortDetailCmd(deviceParam);
                         redisCache.setCacheObject(redisKey,tDevicePlan);
                         redisCache.expire(redisKey,30, TimeUnit.SECONDS);
+                        log.info("添加每日定时任务,{},{}",tDevicePlan.getId(),tDevicePlan.getDeviceId()+","+tDevicePlan.getRepeatTime());
                     }
                 }
             }
@@ -149,15 +156,22 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
         itPlanRecordService.bactchInsert(planList);
     }
 
-    @Override
-    public List<TDevicePlanVO> getStandardByUserId(Long userId) {
-        log.debug("开始处理【根据用戶id查询详情】的业务,参数:{}", userId);
-
-        // 调用Mapper的countByDeviceId()执行查询
-        List<TDevicePlanVO> tChargeRecordVO = tDevicePlanMapper.ListByUserId(userId);
-
-        log.debug("即将返回查询结果:{}", tChargeRecordVO);
-        return (List<TDevicePlanVO>) tChargeRecordVO;
+    private boolean checkHasRun(TDevicePlan tDevicePlan,LocalDateTime now) {
+        LambdaQueryWrapper<TPlanRecord> queryWrapper = Wrappers.lambdaQuery();
+        queryWrapper.orderByDesc(TPlanRecord::getCheckTime);
+        queryWrapper.eq(TPlanRecord::getPlanId, tDevicePlan.getId());
+        queryWrapper.last(" limit 1");
+        TPlanRecord one = itPlanRecordService.getOne(queryWrapper);
+        if (one != null) {
+            LocalDateTime createTime = one.getCreateTime();
+            DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+            String todayStr = now.format(fmt);
+            String runTime = createTime.format(fmt);
+            if (StringUtils.equals(runTime, todayStr)) {
+                return true;
+            }
+        }
+        return false;
     }
 
     /**
@@ -186,6 +200,39 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
         itPlanRecordService.saveOrUpdateBatch(list);
     }
 
+    @Override
+    public RpcResult cancelPlan(Long planId) {
+        RpcResult rpcResult = null;
+        LambdaQueryWrapper<TDevicePlan> objectLambdaQueryWrapper = Wrappers.lambdaQuery();
+        objectLambdaQueryWrapper.eq(TDevicePlan::getId,planId);
+        TDevicePlan tDevicePlan = this.getOne(objectLambdaQueryWrapper);
+        if(tDevicePlan != null){
+            Long status = tDevicePlan.getStatus();
+            if(status.equals(3L)){
+                rpcResult =RpcResult.fail("该预约记录已经取消");
+            }else{
+                tDevicePlan.setStatus(3L);
+                this.updateById(tDevicePlan);
+                log.info("用户取消预约,{},用户ID:{}",tDevicePlan.getId(),tDevicePlan.getUserId());
+                String deviceId = tDevicePlan.getDeviceId();
+                String ccid = tDevicePlan.getCcid();
+                DeviceParam deviceParam = new DeviceParam();
+                deviceParam.setDeviceId(deviceId);
+                deviceParam.setCcid(ccid);
+                String redisKey = IotConstant.CANCEL_CHARGE_DIR_REDIS+""+deviceId;
+                deviceControlerService.sendPortDetailCmd(deviceParam);
+                redisCache.setCacheObject(redisKey,tDevicePlan);
+                redisCache.expire(redisKey,30, TimeUnit.SECONDS);
+                log.info("添加取消用户预约检测,{},{}",tDevicePlan.getId(),tDevicePlan.getDeviceId()+","+tDevicePlan.getRepeatTime());
+                rpcResult = RpcResult.ok();
+            }
+
+        }else{
+            rpcResult =RpcResult.fail("找不到预约记录");
+        }
+        return rpcResult;
+    }
+
     /**
      * 过4分钟的检查任务
      * @param deviceId
@@ -193,8 +240,15 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
      */
     @Override
     public void runPlanChargeJob(String deviceId, JSONObject object) {
+        String cancelKey = IotConstant.CANCEL_CHARGE_DIR_REDIS+""+deviceId;
+        TDevicePlan tDevicePlan = redisCache.getAndDel(cancelKey);
+        if(tDevicePlan !=null){//如果有取消的任务
+            redisCache.deleteObject(cancelKey);
+            cancelPlanCharge(deviceId,object,tDevicePlan);
+            return;
+        }
         String redisKey = IotConstant.PLAN_CHARGE_DIR_REDIS_KEY+""+deviceId;
-        TDevicePlan tDevicePlan = redisCache.getAndDel(redisKey);
+        tDevicePlan = redisCache.getAndDel(redisKey);
         LocalDateTime planTime = LocalDateTime.now();
         if(tDevicePlan != null){//代表预约了任务,需要判断端口状态
             redisCache.deleteObject(redisKey);
@@ -210,6 +264,43 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
 
 
     }
+    @Override
+    public void addNew(TDevicePlanAddNewDTO tDevicePlanAddNewDTO) {
+        log.info("开始处理【添加预约】的业务,参数:{}", tDevicePlanAddNewDTO);
+        // 创建TDevicePlan对象
+        TDevicePlan tDevicePlan = new TDevicePlan();
+        // 调用BeanUtils.copyProperties(源,目标)将参数对象中的属性复制到TDevicePlan对象中
+        BeanUtils.copyProperties(tDevicePlanAddNewDTO, tDevicePlan);
+        // 调用Mapper对象的insert()执行插入预约数据
+        log.info("即将执行插入数据,参数:{}", tDevicePlan);
+        tDevicePlan.setStatus(1L);
+        int rows = tDevicePlanMapper.insert(tDevicePlan);
+        if (rows != 1) {
+            String message = "添加预约失败,服务器忙,请稍后再次尝试!";
+            log.warn(message);
+            throw new ServiceException(ServiceCode.ERR_INSERT, message);
+        }
+    }
+
+
+
+    private void cancelPlanCharge(String deviceId, JSONObject object, TDevicePlan tDevicePlan) {
+        Integer port = tDevicePlan.getPort();
+        String ccid = tDevicePlan.getCcid();
+        Integer port_status = null;
+        if(port == 1){
+            port_status = object.getInteger("port_first_status");
+        }else if(port == 2){
+            port_status = object.getInteger("port_second_status");
+        }
+        if(port_status == null){
+            port_status = 0;
+        }
+        if(port_status == 6){
+            deviceControlerService.cancelChargePlan(deviceId,ccid,port);
+            log.info("端口{}已经预约,发送取消预约指令,{}",port,deviceId);
+        }
+    }
 
     /**
      * 手动开始充电
@@ -218,22 +309,44 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
      * @param planRecord
      */
     private void startCharge(String deviceId, JSONObject object, TPlanRecord planRecord) {
-        Integer port_first_status = object.getInteger("port_first_status");
-        if(port_first_status == null){
-            port_first_status = 0;
-        }
-        if(port_first_status != 2){
-            String ccid = planRecord.getCcid();
-            String deviceId1 = planRecord.getDeviceId();
-            planRecord.setStatus(-2);
-            deviceControlerService.startCharge(deviceId,ccid);
-            log.info("检测未在充电--发送充电指令,{}", deviceId);
+        Long planId = planRecord.getPlanId();
+        LambdaQueryWrapper<TDevicePlan> objectLambdaQueryWrapper = Wrappers.lambdaQuery();
+        objectLambdaQueryWrapper.eq(TDevicePlan::getId,planId);
+        Integer port = planRecord.getPort();
+        TDevicePlan one = this.getOne(objectLambdaQueryWrapper);
+        if(one == null){
+            planRecord.setStatus(-4);
+            log.info("检测端口:{}预约任务未找到,无法执行{}",port, deviceId);
         }else{
-            planRecord.setStatus(3);
-            log.info("检测自动充电--已经充电中,{}", deviceId);
+            Long status = one.getStatus();
+            if(status.equals(3L)){//代表已经取消
+                planRecord.setStatus(-3);
+                log.info("检测端口:{}预约任务已取消,未执行{}",port, deviceId);
+            }else{
+                Integer port_status = null;
+                if(port == 1){
+                    port_status = object.getInteger("port_first_status");
+                }else if(port == 2){
+                    port_status = object.getInteger("port_second_status");
+                }
+                if(port_status == null){
+                    port_status = 0;
+                }
+                if(port_status != 2){
+                    String ccid = planRecord.getCcid();
+                    String deviceId1 = planRecord.getDeviceId();
+                    planRecord.setStatus(-2);
+                    deviceControlerService.startCharge(deviceId,ccid);
+                    log.info("检测端口:{}未在充电--发送充电指令,{}",port, deviceId);
+                }else{
+                    planRecord.setStatus(3);
+                    log.info("检测端口:{}自动充电--已经充电中,{}",port, deviceId);
+                }
+            }
         }
         itPlanRecordService.saveOrUpdate(planRecord);
 
+
     }
 
     /**
@@ -246,35 +359,53 @@ public class TDevicePlanServiceImpl extends ServiceImpl<TDevicePlanMapper,TDevic
     private void startPlanCharge(String deviceId, JSONObject object, TDevicePlan tDevicePlan, LocalDateTime planTime) {
         LocalDateTime plus = planTime.plus(4, ChronoUnit.MINUTES);
         Long id = tDevicePlan.getId();
+        Integer port = tDevicePlan.getPort();
+        LambdaQueryWrapper<TDevicePlan> objectLambdaQueryWrapper = Wrappers.lambdaQuery();
+        Long planId = tDevicePlan.getId();
+        objectLambdaQueryWrapper.eq(TDevicePlan::getId,planId);
+        TDevicePlan one = this.getOne(objectLambdaQueryWrapper);
+        if(one == null){
+            return;
+        }
+        Long status = one.getStatus();
+        if(status.equals(3L)){
+            log.info("检测端口:{},预约任务已取消,未执行{}",port, deviceId);
+            return;
+        }
         String ccid = tDevicePlan.getCcid();
-        Integer port_first_status = object.getInteger("port_first_status");
-        if(port_first_status == null){
-            port_first_status = 0;
+        Integer port_status = null;
+        if(port == 1){
+            port_status = object.getInteger("port_first_status");
+        }else if(port == 2){
+            port_status = object.getInteger("port_second_status");
+        }
+        if(port_status == null){
+            port_status = 0;
         }
-        if(port_first_status == 6){//代表端口已经预约,忽略本次
-            log.info("端口已经预约,忽略本次预约,{}",deviceId);
+        if(port_status == 6){//代表端口已经预约,忽略本次
+            log.info("端口{}已经预约,忽略本次预约,{}",port,deviceId);
             return;
         }
         LambdaQueryWrapper<TPlanRecord> lambdaQueryWrapper = new LambdaQueryWrapper<>();
-        lambdaQueryWrapper.eq(TPlanRecord::getPlanId, id);
+        lambdaQueryWrapper.eq(TPlanRecord::getPlanId, id).orderByDesc(TPlanRecord::getCreateTime).last("limit 1");
         TPlanRecord planRecord = itPlanRecordService.getOne(lambdaQueryWrapper);
-        if(port_first_status == 5){//已经连接
+        if(port_status == 5){//已经连接
             PlanCharge planCharge = new PlanCharge();
             planCharge.setMoney(50000);
-            planCharge.setPort(1);
-            planCharge.setTime(1);
+            planCharge.setPort(port);
+            planCharge.setTime(3);
             deviceControlerService.planCharge(deviceId,ccid,planCharge);
 
             planRecord.setStatus(1);
             planRecord.setPlanTime(planTime);
             planRecord.setCheckTime(plus);
             itPlanRecordService.saveOrUpdate(planRecord);
-            log.info("端口已经连接,发送预约充电指令,{}",deviceId);
-        }else if(port_first_status != 6){//
+            log.info("端口{}已经连接,发送预约充电指令,{}",port,deviceId);
+        }else if(port_status != 6){//
             planRecord.setStatus(-1);
             planRecord.setCheckResult("端口状态未连接,无法充电");
             itPlanRecordService.saveOrUpdate(planRecord);
-            log.info("端口未连接,无法发送完成预约充电,{}",deviceId);
+            log.info("端口{}未连接,无法发送完成预约充电,{}",port,deviceId);
         }
     }
 

+ 2 - 1
ruoyi-iot/src/main/java/com/ruoyi/iot/transdata/entity/opertype/OperEnum.java

@@ -11,7 +11,8 @@ public enum OperEnum {
     PortOper(8,"操作端口"),
     DEVICESTATUS(9,"获取设备状态39761"),
     ResetDevice(10,"重置设备状态"),
-    PlanCHarge(11,"预约充电");
+    PlanCHarge(11,"预约充电"),
+    CancelPlanCHarge(12,"取消预约充电");
     private int type;
 
     private String desc;

+ 25 - 0
ruoyi-iot/src/main/java/com/ruoyi/iot/util/HutoolHttpUtil.java

@@ -0,0 +1,25 @@
+package com.ruoyi.iot.util;
+
+import cn.hutool.http.HttpRequest;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class HutoolHttpUtil {
+
+    public static String get(String url, Map<String, Object> queryParams) throws IOException {
+        return get(url, queryParams, new HashMap<>(1));
+    }
+
+    public static String get(String url, Map<String, Object> queryParams, Map<String, String> headers) throws IOException {
+        String body = HttpRequest.get(url).timeout(3000).form(queryParams).addHeaders(headers).execute().body();
+        return body;
+    }
+
+    public static String post(String url, String json, Map<String, String> headers) {
+        String body = HttpRequest.post(url).body(json).addHeaders(headers).execute().body();
+        return body;
+    }
+    
+}