Przeglądaj źródła

修复流地址返回错误

648540858 3 lat temu
rodzic
commit
bc38f5ef29

+ 24 - 27
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -92,39 +92,36 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 	@Override
 	public void process(RequestEvent evt) {
 		try {
-
 			taskQueue.offer(new HandlerCatchData(evt, null, null));
 			responseAck(evt, Response.OK);
 			if (!taskQueueHandlerRun) {
 				taskQueueHandlerRun = true;
 				taskExecutor.execute(()-> {
-							while (!taskQueue.isEmpty()) {
-								try {
-									HandlerCatchData take = taskQueue.poll();
-									Element rootElement = getRootElement(take.getEvt());
-									String cmd = XmlUtil.getText(rootElement, "CmdType");
-
-									if (CmdType.CATALOG.equals(cmd)) {
-										logger.info("接收到Catalog通知");
-										processNotifyCatalogList(take.getEvt());
-									} else if (CmdType.ALARM.equals(cmd)) {
-										logger.info("接收到Alarm通知");
-										processNotifyAlarm(take.getEvt());
-									} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
-										logger.info("接收到MobilePosition通知");
-										processNotifyMobilePosition(take.getEvt());
-									} else {
-										logger.info("接收到消息:" + cmd);
-									}
-								} catch (DocumentException e) {
-									throw new RuntimeException(e);
-								}
+					while (!taskQueue.isEmpty()) {
+						try {
+							HandlerCatchData take = taskQueue.poll();
+							Element rootElement = getRootElement(take.getEvt());
+							String cmd = XmlUtil.getText(rootElement, "CmdType");
+
+							if (CmdType.CATALOG.equals(cmd)) {
+								logger.info("接收到Catalog通知");
+								processNotifyCatalogList(take.getEvt());
+							} else if (CmdType.ALARM.equals(cmd)) {
+								logger.info("接收到Alarm通知");
+								processNotifyAlarm(take.getEvt());
+							} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+								logger.info("接收到MobilePosition通知");
+								processNotifyMobilePosition(take.getEvt());
+							} else {
+								logger.info("接收到消息:" + cmd);
 							}
-						taskQueueHandlerRun = false;
-						});
+						} catch (DocumentException e) {
+							throw new RuntimeException(e);
+						}
+					}
+				taskQueueHandlerRun = false;
+				});
 			}
-
-
 		} catch (SipException | InvalidArgumentException | ParseException e) {
 			e.printStackTrace();
 		}
@@ -174,7 +171,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
 					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -67,9 +67,9 @@ public class MediaServiceImpl implements IMediaService {
                 JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
                 JSONArray tracks = mediaJSON.getJSONArray("tracks");
                 if (authority) {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId());
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,streamAuthorityInfo.getCallId());
                 }else {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null);
                 }
 
             }

+ 26 - 3
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java

@@ -1,16 +1,21 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * 接收来自redis的GPS更新通知
  * @author lin
@@ -20,13 +25,31 @@ public class RedisGpsMsgListener implements MessageListener {
 
     private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
 
+    private boolean taskQueueHandlerRun = false;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
     @Override
     public void onMessage(@NotNull Message message, byte[] bytes) {
-        // TODO 加消息队列
-        GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
-        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
+                    redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
     }
 }

+ 43 - 25
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java

@@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -21,14 +22,17 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 
 /**
@@ -40,6 +44,8 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
 
     private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
 
+    private boolean taskQueueHandlerRun = false;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
@@ -47,39 +53,51 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
     private IStreamPushService streamPushService;
 
     @Autowired
-    private EventPublisher eventPublisher;
+    private DynamicTask dynamicTask;
 
-    @Autowired
-    private UserSetting userSetting;
 
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
     @Autowired
-    private DynamicTask dynamicTask;
+    private ThreadPoolTaskExecutor taskExecutor;
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
         // TODO 增加队列
         logger.warn("[REDIS 消息-推流设备状态变化]: {}", new String(message.getBody()));
-        //
-        PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class);
-        if (statusChangeFromPushStream == null) {
-            logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
-            return;
-        }
-        // 取消定时任务
-        dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
-        if (statusChangeFromPushStream.isSetAllOffline()) {
-            // 所有设备离线
-            streamPushService.allStreamOffline();
-        }
-        if (statusChangeFromPushStream.getOfflineStreams() != null
-                && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
-            // 更新部分设备离线
-            streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
-        }
-        if (statusChangeFromPushStream.getOnlineStreams() != null &&
-                statusChangeFromPushStream.getOnlineStreams().size() > 0) {
-            // 更新部分设备上线
-            streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
+        taskQueue.offer(message);
+
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
+                    if (statusChangeFromPushStream == null) {
+                        logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
+                        return;
+                    }
+                    // 取消定时任务
+                    dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
+                    if (statusChangeFromPushStream.isSetAllOffline()) {
+                        // 所有设备离线
+                        streamPushService.allStreamOffline();
+                    }
+                    if (statusChangeFromPushStream.getOfflineStreams() != null
+                            && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
+                        // 更新部分设备离线
+                        streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
+                    }
+                    if (statusChangeFromPushStream.getOnlineStreams() != null &&
+                            statusChangeFromPushStream.getOnlineStreams().size() > 0) {
+                        // 更新部分设备上线
+                        streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
         }
     }
 

+ 4 - 4
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -688,21 +688,21 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendMobilePositionMsg(JSONObject jsonObject) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
-        logger.info("[redis 移动位置订阅通知] {}: {}", key, jsonObject.toString());
+        logger.info("[redis发送通知]移动位置 {}: {}", key, jsonObject.toString());
         redis.convertAndSend(key, jsonObject);
     }
 
     @Override
     public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
         String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
-        logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream());
+        logger.info("[redis发送通知]推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
         redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
     }
 
     @Override
     public void sendAlarmMsg(AlarmChannelMessage msg) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
-        logger.info("[redis 报警通知] {}: {}", key, JSON.toJSON(msg));
+        logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg));
         redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
     }
 
@@ -715,7 +715,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendStreamPushRequestedMsgForStatus() {
         String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
-        logger.info("[redis 通知]获取所有推流设备的状态");
+        logger.info("[redis通知]获取所有推流设备的状态");
         JSONObject jsonObject = new JSONObject();
         jsonObject.put(key, key);
         redis.convertAndSend(key, jsonObject);