package com.qlm.netty.handler;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.jfinal.plugin.activerecord.Db;
import com.jfinal.plugin.activerecord.Record;
import com.qlm.netty.constant.NettyAttributes;
import com.qlm.netty.constant.NettyTopic;
import com.qlm.netty.manager.DeviceSessionManager;
import com.qlm.netty.model.IndustrialControlData;
import com.qlm.netty.model.NettyMessage;
import com.qlm.netty.util.WhitelistValidator;
import com.qlm.tools.WxUtil;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.Date;
import java.util.concurrent.ScheduledFuture;

import static com.qlm.netty.constant.CommandType.DEVICE_INFO_REPORT;

/**
 * Server-side Netty inbound handler.
 *
 * <p>Responsibilities visible in this class:
 * <ul>
 *   <li>reject connections whose remote address fails {@link WhitelistValidator#isValidIp};</li>
 *   <li>parse each inbound message as a JSON {@link NettyMessage} and dispatch it by topic;</li>
 *   <li>track the last heartbeat per channel and close channels that stay silent too long;</li>
 *   <li>persist device reports and the measured network delay through JFinal {@link Db}.</li>
 * </ul>
 *
 * <p>NOTE(review): the {@code Db} calls are made directly from the channel callbacks, i.e. on
 * whatever thread Netty invokes this handler on. If that is an I/O event loop, consider moving
 * the database work to a separate executor.
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);

    /**
     * Target type for JSON parsing. A {@code TypeReference} must be created with its type
     * argument spelled out, because fastjson reads it reflectively from the generic superclass;
     * it is immutable, so a single shared instance is sufficient.
     */
    private static final TypeReference<NettyMessage<Object>> MESSAGE_TYPE =
            new TypeReference<NettyMessage<Object>>() {};

    // Currently not referenced anywhere in this class.
    private ScheduledFuture<?> requestTask;

    // Heartbeat thresholds, in milliseconds.
    private static final long HEARTBEAT_INTERVAL = 30000;  // nominal check interval (not referenced here)
    private static final long SUSPICIOUS_TIMEOUT = 90000;  // no heartbeat for 90s: mark channel suspicious
    private static final long CLOSE_TIMEOUT = 120000;      // no heartbeat for 120s: close the channel

    /**
     * Validates the peer against the whitelist and initialises the heartbeat attributes.
     * Peers that are not whitelisted, or whose address is not an {@link InetSocketAddress},
     * are disconnected immediately.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Guard the cast: a non-IP transport cannot be checked against an IP whitelist.
        if (!(ctx.channel().remoteAddress() instanceof InetSocketAddress)) {
            logger.warn("拒绝非白名单IP连接: {}", ctx.channel().remoteAddress());
            ctx.close();
            return;
        }

        InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
        if (!WhitelistValidator.isValidIp(address)) {
            logger.warn("拒绝非白名单IP连接: {}", address.getAddress().getHostAddress());
            ctx.close();
            return;
        }

        logger.info("客户端连接成功: {}", ctx.channel().remoteAddress());

        // Start the heartbeat clock and clear the suspicious flag.
        ctx.channel().attr(NettyAttributes.LAST_HEARTBEAT_TIME).set(System.currentTimeMillis());
        ctx.channel().attr(NettyAttributes.IS_SUSPICIOUS).set(false);
    }

    /**
     * Parses the inbound object's string form as a {@link NettyMessage} and dispatches it.
     * Parse failures and handler failures are logged separately and never propagate, so a bad
     * message does not tear down the connection. The message is always released.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            String message = msg.toString();
            logger.info("收到客户端消息: {}", message);

            NettyMessage<Object> nettyMessage;
            try {
                nettyMessage = JSON.parseObject(message, MESSAGE_TYPE);
            } catch (RuntimeException e) {
                // Not in the unified format: warn, but keep the connection alive.
                logger.warn("接收到非统一格式消息: {}", e.getMessage());
                return;
            }
            if (nettyMessage == null) {
                // fastjson yields null for empty input or the JSON literal null.
                logger.warn("接收到非统一格式消息: {}", message);
                return;
            }

            try {
                handleNettyMessage(ctx, nettyMessage);
            } catch (RuntimeException e) {
                // A handler failure is not a format problem; log it as such, with the stack trace.
                logger.error("处理消息异常: {}", e.getMessage(), e);
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    /**
     * Dispatches a parsed message by topic.
     *
     * <p>When the message carries a device number, the channel is registered with
     * {@link DeviceSessionManager} and tagged with that number. After topic handling,
     * {@link #handleDeviceInfoResponse} runs for every message regardless of topic.
     */
    private void handleNettyMessage(ChannelHandlerContext ctx, NettyMessage<Object> nettyMessage) {
        String topic = nettyMessage.getTopic();
        String deviceNo = nettyMessage.getDeviceNo();
        String messageId = nettyMessage.getMessageId();

        // Bind this channel to the device so it can be looked up by device number.
        if (deviceNo != null && !deviceNo.isEmpty()) {
            DeviceSessionManager.getInstance().addDeviceSession(deviceNo, ctx);
            ctx.channel().attr(NettyAttributes.DEVICE_ID).set(deviceNo);
        }

        try {
            NettyTopic resolvedTopic = NettyTopic.fromCode(topic);
            if (resolvedTopic == null) {
                // Switching on a null enum would throw a NullPointerException.
                rejectUnsupportedTopic(ctx, messageId, deviceNo, topic);
            } else {
                switch (resolvedTopic) {
                    case HEARTBEAT_RESPONSE:
                        handleHeartbeat(ctx, nettyMessage);
                        break;
                    case HEARTBEAT_REQUEST:
                        // Client-initiated heartbeat: answer immediately.
                        handleClientHeartbeatRequest(ctx, nettyMessage);
                        break;
                    case DEVICE_ACTIVE_REPORT:
                        handleDeviceActiveReport(ctx, nettyMessage);
                        break;
                    case COMMAND_DOWNLOAD_RESPONSE:
                        // Intentionally no action.
                        break;
                    case COMMON_RESPONSE:
                        handleCommonResponse(ctx, nettyMessage);
                        break;
                    default:
                        rejectUnsupportedTopic(ctx, messageId, deviceNo, topic);
                        break;
                }
            }
            handleDeviceInfoResponse(ctx, nettyMessage);
        } catch (IllegalArgumentException e) {
            logger.error("处理消息异常: {}", e.getMessage(), e);
            sendResponse(ctx, messageId, deviceNo, 2, "处理消息异常: " + e.getMessage(), null,
                    NettyTopic.COMMON_RESPONSE.getCode());
        }
    }

    /** Logs an unhandled topic and replies with status code 1 on the common-response topic. */
    private void rejectUnsupportedTopic(ChannelHandlerContext ctx, String messageId,
                                        String deviceNo, String topic) {
        logger.warn("未处理的消息主题: {}", topic);
        sendResponse(ctx, messageId, deviceNo, 1, "未支持的消息主题", null,
                NettyTopic.COMMON_RESPONSE.getCode());
    }

    /** Runs the heartbeat check on reader-idle events; other idle states are ignored. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            switch (event.state()) {
                case READER_IDLE:
                    // Nothing was read for the configured period: inspect the heartbeat age.
                    checkHeartbeat(ctx);
                    break;
                case WRITER_IDLE:
                case ALL_IDLE:
                default:
                    break;
            }
        }
    }

    /**
     * Evaluates how long ago the last heartbeat arrived.
     *
     * <ul>
     *   <li>no recorded heartbeat time: close the channel;</li>
     *   <li>older than {@link #CLOSE_TIMEOUT}: remove the device session and close;</li>
     *   <li>older than {@link #SUSPICIOUS_TIMEOUT}: flag the channel as suspicious (once) and
     *       send a heartbeat request;</li>
     *   <li>otherwise: send a routine heartbeat request.</li>
     * </ul>
     */
    private void checkHeartbeat(ChannelHandlerContext ctx) {
        Long lastHeartbeatTime = ctx.channel().attr(NettyAttributes.LAST_HEARTBEAT_TIME).get();
        if (lastHeartbeatTime == null) {
            ctx.close();
            return;
        }

        long idleTime = System.currentTimeMillis() - lastHeartbeatTime;

        if (idleTime > CLOSE_TIMEOUT) {
            logger.info("客户端 {} 120秒无心跳,关闭连接并标记设备离线", ctx.channel().remoteAddress());
            removeDeviceSession(ctx);
            ctx.close();
            return;
        }

        if (idleTime > SUSPICIOUS_TIMEOUT) {
            Boolean isSuspicious = ctx.channel().attr(NettyAttributes.IS_SUSPICIOUS).get();
            if (isSuspicious == null || !isSuspicious) {
                logger.info("客户端 {} 90秒无心跳,标记为可疑连接", ctx.channel().remoteAddress());
                ctx.channel().attr(NettyAttributes.IS_SUSPICIOUS).set(true);
                ctx.channel().attr(NettyAttributes.SUSPICIOUS_TIME).set(System.currentTimeMillis());
            }
            // Probe the suspicious peer to confirm whether it is still alive.
            sendHeartbeatRequest(ctx);
            logger.debug("向可疑客户端 {} 发送主动心跳请求进行确认", ctx.channel().remoteAddress());
            return;
        }

        // Healthy connection: routine probe on every reader-idle event.
        sendHeartbeatRequest(ctx);
        logger.debug("向客户端 {} 发送定期心跳请求", ctx.channel().remoteAddress());
    }

    /** Writes a heartbeat request addressed to the device bound to this channel (may be null). */
    private void sendHeartbeatRequest(ChannelHandlerContext ctx) {
        String deviceNo = ctx.channel().attr(NettyAttributes.DEVICE_ID).get();
        NettyMessage<Object> heartbeatRequest =
                new NettyMessage<>(NettyTopic.HEARTBEAT_REQUEST.getCode(), deviceNo);
        ctx.writeAndFlush(JSON.toJSONString(heartbeatRequest));
    }

    /**
     * Handles a heartbeat request initiated by the client: refreshes the heartbeat time,
     * clears the suspicious flag and replies with a heartbeat response echoing the message id.
     */
    private void handleClientHeartbeatRequest(ChannelHandlerContext ctx, NettyMessage<?> request) {
        ctx.channel().attr(NettyAttributes.LAST_HEARTBEAT_TIME).set(System.currentTimeMillis());
        ctx.channel().attr(NettyAttributes.IS_SUSPICIOUS).set(false);

        NettyMessage<Object> response =
                new NettyMessage<>(NettyTopic.HEARTBEAT_RESPONSE.getCode(), request.getDeviceNo());
        response.setMessageId(request.getMessageId());
        response.setStatusCode(0);
        response.setMessage("心跳响应成功");
        ctx.writeAndFlush(JSON.toJSONString(response));
    }

    /**
     * Removes the session of the device bound to this channel, if any.
     *
     * <p>NOTE(review): removal is keyed by device id only. If a device reconnects on a new
     * channel before the old one closes, the old channel's teardown may remove the newer
     * session. Confirm against {@code DeviceSessionManager} and compare contexts if needed.
     */
    private void removeDeviceSession(ChannelHandlerContext ctx) {
        String deviceId = ctx.channel().attr(NettyAttributes.DEVICE_ID).get();
        if (deviceId != null) {
            // Hook point for device-offline business logic.
            logger.info("设备 {} 已离线", deviceId);
            DeviceSessionManager.getInstance().removeDeviceSession(deviceId);
        }
    }

    /** Logs the failure, drops the device session and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("发生异常: {}", cause.getMessage(), cause);
        removeDeviceSession(ctx);
        ctx.close();
    }

    /** Drops the device session when the peer disconnects. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        logger.info("客户端断开连接: {}", ctx.channel().remoteAddress());
        removeDeviceSession(ctx);
    }

    /**
     * Builds a response message, serialises it to JSON and writes it to the channel.
     *
     * @param ctx        channel to write to
     * @param messageId  id of the message being answered
     * @param deviceNo   device the response is addressed to
     * @param statusCode status code to report
     * @param message    human-readable status text
     * @param data       optional payload, may be {@code null}
     * @param topicType  topic code of the response
     * @param <T>        payload type
     */
    private <T> void sendResponse(ChannelHandlerContext ctx, String messageId, String deviceNo,
                                  int statusCode, String message, T data, String topicType) {
        NettyMessage<T> response = new NettyMessage<>(topicType, deviceNo);
        response.setMessageId(messageId);
        response.setStatusCode(statusCode);
        response.setMessage(message);
        response.setData(data);

        // Serialise once and reuse the text for both the wire and the log.
        String payload = JSON.toJSONString(response);
        ctx.writeAndFlush(payload);
        logger.debug("发送响应: {}", payload);
    }

    /**
     * Handles a heartbeat response: refreshes the heartbeat time and clears the suspicious
     * flag. No acknowledgement is sent back.
     */
    private void handleHeartbeat(ChannelHandlerContext ctx, NettyMessage<?> message) {
        ctx.channel().attr(NettyAttributes.LAST_HEARTBEAT_TIME).set(System.currentTimeMillis());
        ctx.channel().attr(NettyAttributes.IS_SUSPICIOUS).set(false);

        logger.debug("收到来自 {} 的心跳,消息ID: {}", ctx.channel().remoteAddress(), message.getMessageId());
    }

    /**
     * Handles a report pushed by the device: converts the payload to
     * {@link IndustrialControlData}, stores it, and replies with status 0 on success or
     * status 3 on any failure.
     */
    private void handleDeviceActiveReport(ChannelHandlerContext ctx, NettyMessage<?> message) {
        try {
            Object payload = message.getData();
            if (payload == null) {
                // Fail with an explicit reason instead of an NPE whose message is "null".
                throw new IllegalArgumentException("data is null");
            }
            IndustrialControlData data =
                    JSON.parseObject(payload.toString(), IndustrialControlData.class);
            processIndustrialData(data, message.getDeviceNo());

            sendResponse(ctx, message.getMessageId(), message.getDeviceNo(), 0, "数据上报成功", null,
                    NettyTopic.COMMON_RESPONSE.getCode());
        } catch (Exception e) {
            logger.error("处理设备主动上报异常: {}", e.getMessage(), e);
            sendResponse(ctx, message.getMessageId(), message.getDeviceNo(), 3,
                    "处理数据异常: " + e.getMessage(), null, NettyTopic.COMMON_RESPONSE.getCode());
        }
    }

    /**
     * Computes the delay between the message timestamp and now, and stores it in the device's
     * detail row. Does nothing when the timestamp is absent or zero, when the device number is
     * null, or when the device or its detail row does not exist. Database failures are logged
     * and swallowed.
     */
    private void handleDeviceInfoResponse(ChannelHandlerContext ctx, NettyMessage<?> message) {
        String deviceId = message.getDeviceNo();

        // Boxed so that a missing timestamp is detected instead of failing on unboxing.
        Long timestamp = message.getTimestamp();
        if (timestamp == null || timestamp == 0L) {
            return;
        }
        if (deviceId == null) {
            return;
        }

        long delay = System.currentTimeMillis() - timestamp;
        try {
            Record device = Db.findFirst("select * from t_jz_device where device_no = ?", deviceId);
            if (device == null) {
                return;
            }
            Record deviceDetail = Db.findFirst(
                    "select * from t_device_detail where device_id = ?", WxUtil.getInt("id", device));
            if (deviceDetail == null) {
                return;
            }
            deviceDetail.set("network_delay", delay);
            deviceDetail.set("updated_time", new Date());
            Db.update("t_device_detail", "device_id", deviceDetail);
        } catch (Exception e) {
            logger.error("保存通信延迟信息失败: {}", e.getMessage(), e);
        }
    }

    /**
     * Handles a generic client response. Currently only logs it; business handling such as
     * updating command execution status can be added here.
     */
    private void handleCommonResponse(ChannelHandlerContext ctx, NettyMessage<?> message) {
        logger.debug("收到客户端响应 - 消息ID: {}, 状态码: {}, 消息: {}",
                message.getMessageId(), message.getStatusCode(), message.getMessage());
    }

    /**
     * Stores the reported industrial-control data in {@code t_device_detail}: inserts a row
     * when the device has none, otherwise updates the existing row and its
     * {@code updated_time}. Unknown device numbers are logged and ignored.
     */
    private void processIndustrialData(IndustrialControlData data, String deviceNo) {
        Record device = Db.findFirst("select * from t_jz_device where device_no = ?", deviceNo);
        if (device == null) {
            logger.info("设备不存在: {}", deviceNo);
            return;
        }

        Integer deviceId = WxUtil.getInt("id", device);
        Record deviceDetail = Db.findFirst(
                "select * from t_device_detail where device_id = ?", deviceId);

        Record deviceDetailDb = new Record();
        deviceDetailDb.set("device_id", deviceId);
        deviceDetailDb.set("product_sku", data.getProductSku());
        // Blank usage figures are stored as "0.00".
        deviceDetailDb.set("cpu_usage",
                StringUtils.isNotBlank(data.getCpuUsage()) ? data.getCpuUsage() : "0.00");
        deviceDetailDb.set("memory_usage",
                StringUtils.isNotBlank(data.getMemoryUsage()) ? data.getMemoryUsage() : "0.00");
        deviceDetailDb.set("disk_free", data.getDiskFreeSpace());
        deviceDetailDb.set("current_task_quantity", data.getProductionTaskCount());
        deviceDetailDb.set("current_quantity", data.getProductionTaskNum());
        deviceDetailDb.set("total_quantity", data.getProductionTaskTotal());
        deviceDetailDb.set("uploaded_quantity", data.getProductionTaskTransferNum());

        if (deviceDetail == null) {
            Db.save("t_device_detail", deviceDetailDb);
        } else {
            deviceDetailDb.set("updated_time", new Date());
            Db.update("t_device_detail", "device_id", deviceDetailDb);
        }
    }
}