|
@@ -1,57 +1,50 @@
|
|
|
package com.qlm.netty.handler;
|
|
package com.qlm.netty.handler;
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.jfinal.plugin.activerecord.Db;
|
|
|
|
|
+import com.jfinal.plugin.activerecord.Record;
|
|
|
import com.qlm.netty.model.IndustrialControlData;
|
|
import com.qlm.netty.model.IndustrialControlData;
|
|
|
-import com.qlm.netty.protocol.CustomProtocol;
|
|
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
|
|
import io.netty.handler.timeout.IdleStateEvent;
|
|
import io.netty.handler.timeout.IdleStateEvent;
|
|
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
|
|
+import java.util.Date;
|
|
|
|
|
+
|
|
|
|
|
+/**
|
|
|
|
|
+ * @author wuyingjianwu
|
|
|
|
|
+ * @date 2025/08/29 10:05
|
|
|
|
|
+ * 描述 : Netty服务端处理类
|
|
|
|
|
+ */
|
|
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
|
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
|
|
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
|
|
|
|
|
|
|
|
- // 心跳消息类型
|
|
|
|
|
- private static final byte HEARTBEAT_TYPE = 1;
|
|
|
|
|
- // 数据消息类型
|
|
|
|
|
- private static final byte DATA_TYPE = 2;
|
|
|
|
|
-
|
|
|
|
|
@Override
|
|
@Override
|
|
|
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
|
|
|
|
|
|
+ public void channelActive(ChannelHandlerContext ctx) {
|
|
|
logger.info("客户端连接成功: {}", ctx.channel().remoteAddress());
|
|
logger.info("客户端连接成功: {}", ctx.channel().remoteAddress());
|
|
|
|
|
+ ctx.writeAndFlush("你已成功连接Netty服务端!");
|
|
|
|
|
+
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
|
|
|
|
- CustomProtocol protocol = (CustomProtocol) msg;
|
|
|
|
|
- String deviceId = protocol.getDeviceId();
|
|
|
|
|
-
|
|
|
|
|
- switch (protocol.getType()) {
|
|
|
|
|
- case HEARTBEAT_TYPE:
|
|
|
|
|
- logger.info("收到来自设备 {} 的心跳消息", deviceId);
|
|
|
|
|
- // 回复心跳
|
|
|
|
|
- ctx.writeAndFlush(new CustomProtocol(deviceId, HEARTBEAT_TYPE, new byte[0]));
|
|
|
|
|
- break;
|
|
|
|
|
-
|
|
|
|
|
- case DATA_TYPE:
|
|
|
|
|
- // 解析数据
|
|
|
|
|
- String dataStr = new String(protocol.getContent());
|
|
|
|
|
- IndustrialControlData data = JSON.parseObject(dataStr, IndustrialControlData.class);
|
|
|
|
|
- data.setDeviceId(deviceId);
|
|
|
|
|
-
|
|
|
|
|
- logger.info("收到来自设备 {} 的数据: {}", deviceId, data);
|
|
|
|
|
-
|
|
|
|
|
- // 这里可以添加数据处理逻辑,例如存储到数据库
|
|
|
|
|
- // processIndustrialData(data);
|
|
|
|
|
-
|
|
|
|
|
- // 回复确认消息
|
|
|
|
|
- ctx.writeAndFlush(new CustomProtocol(deviceId, DATA_TYPE, "SUCCESS".getBytes()));
|
|
|
|
|
- break;
|
|
|
|
|
-
|
|
|
|
|
- default:
|
|
|
|
|
- logger.warn("收到未知类型的消息: {}", protocol.getType());
|
|
|
|
|
- break;
|
|
|
|
|
|
|
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
|
|
|
|
+ logger.info("收到来自工控机程序的消息");
|
|
|
|
|
+ try {
|
|
|
|
|
+ logger.info("收到来自工控机程序的消息: {}",msg);
|
|
|
|
|
+ IndustrialControlData data = JSON.parseObject(msg.toString(), IndustrialControlData.class);
|
|
|
|
|
+ // 这里可以添加数据处理逻辑,例如存储到数据库
|
|
|
|
|
+ processIndustrialData(data);
|
|
|
|
|
+ // 回复确认消息
|
|
|
|
|
+ ctx.writeAndFlush("SUCCESS");
|
|
|
|
|
+ }catch (Exception e){
|
|
|
|
|
+ logger.error("处理数据异常: {}",e);
|
|
|
|
|
+ ctx.writeAndFlush("ERROR");
|
|
|
|
|
+ }
|
|
|
|
|
+ finally {
|
|
|
|
|
+ // 释放资源
|
|
|
|
|
+ ReferenceCountUtil.release(msg);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -86,8 +79,36 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
|
|
|
logger.info("客户端断开连接: {}", ctx.channel().remoteAddress());
|
|
logger.info("客户端断开连接: {}", ctx.channel().remoteAddress());
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // 数据处理方法,可根据实际需求实现
|
|
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 处理工控机数据
|
|
|
|
|
+ * @param data
|
|
|
|
|
+ */
|
|
|
private void processIndustrialData(IndustrialControlData data) {
|
|
private void processIndustrialData(IndustrialControlData data) {
|
|
|
// 这里实现数据存储或其他业务逻辑
|
|
// 这里实现数据存储或其他业务逻辑
|
|
|
|
|
+ String deviceId = data.getDeviceId();
|
|
|
|
|
+ Record device = Db.findFirst("select * from t_jz_device where id = ?", deviceId);
|
|
|
|
|
+ if(device == null){
|
|
|
|
|
+ logger.info("设备不存在: {}", deviceId);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ //查询设备详细信息
|
|
|
|
|
+ Record deviceDetail = Db.findFirst("select * from t_device_detail where device_id = ?", deviceId);
|
|
|
|
|
+ Record deviceDetailDb = new Record();
|
|
|
|
|
+ deviceDetailDb.set("device_id", deviceId);
|
|
|
|
|
+ deviceDetailDb.set("product_id", data.getProductId());
|
|
|
|
|
+ deviceDetailDb.set("cpu_usage", data.getCpuUsage());
|
|
|
|
|
+ deviceDetailDb.set("memory_usage", data.getMemoryUsage());
|
|
|
|
|
+ deviceDetailDb.set("disk_free", data.getDiskFreeSpace());
|
|
|
|
|
+ deviceDetailDb.set("current_task_quantity", data.getProductionTaskCount());
|
|
|
|
|
+ deviceDetailDb.set("current_quantity", data.getProductionTaskNum());
|
|
|
|
|
+ deviceDetailDb.set("total_quantity", data.getProductionTaskTotal());
|
|
|
|
|
+ deviceDetailDb.set("uploaded_quantity", data.getProductionTaskTransferNum());
|
|
|
|
|
+ deviceDetailDb.set("product_id",data.getProductId());
|
|
|
|
|
+ if (deviceDetail == null) {
|
|
|
|
|
+ Db.save("t_device_detail", deviceDetailDb);
|
|
|
|
|
+ }else{
|
|
|
|
|
+ deviceDetailDb.set("updated_time",new Date());
|
|
|
|
|
+ Db.update("t_device_detail", "device_id",deviceDetailDb);
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|