Browse Source

去除zlm使用redis过期作为心跳超时的方式

648540858 3 years ago
parent
commit
6e90050db4

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -14,8 +14,6 @@ public class VideoManagerConstants {
 
 	public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_";
 
-	public static final String MEDIA_SERVER_KEEPALIVE_PREFIX = "VMP_MEDIA_SERVER_KEEPALIVE_";
-
 	public static final String MEDIA_SERVERS_ONLINE_PREFIX = "VMP_MEDIA_ONLINE_SERVERS_";
 
 	public static final String MEDIA_STREAM_PREFIX = "VMP_MEDIA_STREAM";

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/conf/RedisKeyExpirationEventMessageListener.java

@@ -1,5 +1,6 @@
-package com.genersoft.iot.vmp.conf;
+package com.genersoft.iot.vmp.conf.redis;
 
+import com.genersoft.iot.vmp.conf.UserSetting;
 import org.springframework.data.redis.connection.RedisConnection;
 import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
 import org.springframework.data.redis.listener.RedisMessageListenerContainer;

+ 81 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/event/offline/KeepaliveTimeoutListenerForPlatform.java

@@ -0,0 +1,81 @@
+package com.genersoft.iot.vmp.gb28181.event.offline;
+
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisKeyExpirationEventMessageListener;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+
+/**    
+ * 设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件
+ * @author swwheihei
+ */
+@Component
+public class KeepaliveTimeoutListenerForPlatform extends RedisKeyExpirationEventMessageListener {
+
+    private Logger logger = LoggerFactory.getLogger(KeepaliveTimeoutListenerForPlatform.class);
+
+	@Autowired
+	private EventPublisher publisher;
+
+	@Autowired
+	private UserSetting userSetting;
+
+	@Autowired
+	private SipSubscribe sipSubscribe;
+
+	@Autowired
+	private IVideoManagerStorage storager;
+
+    public KeepaliveTimeoutListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
+        super(listenerContainer, userSetting);
+    }
+
+
+    /**
+     * 监听失效的key
+     * @param message
+     * @param pattern
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        //  获取失效的key
+        String expiredKey = message.toString();
+        // 平台心跳到期,需要重发, 判断是否已经多次未收到心跳回复, 多次未收到,则重新发起注册, 注册尝试多次未得到回复,则认为平台离线
+        String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.PLATFORM_KEEPALIVE_PREFIX + userSetting.getServerId() + "_";
+        String PLATFORM_REGISTER_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_PREFIX + userSetting.getServerId() + "_";
+        String REGISTER_INFO_PREFIX = VideoManagerConstants.PLATFORM_REGISTER_INFO_PREFIX + userSetting.getServerId() + "_";
+        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
+            String platformGbId = expiredKey.substring(PLATFORM_KEEPLIVEKEY_PREFIX.length());
+            ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGbId);
+            if (platform != null) {
+                publisher.platformKeepaliveExpireEventPublish(platformGbId);
+            }
+        }else if (expiredKey.startsWith(PLATFORM_REGISTER_PREFIX)) {
+            String platformGbId = expiredKey.substring(PLATFORM_REGISTER_PREFIX.length(),expiredKey.length());
+            ParentPlatform platform = storager.queryParentPlatByServerGBId(platformGbId);
+            if (platform != null) {
+                publisher.platformRegisterCycleEventPublish(platformGbId);
+            }
+        }else if (expiredKey.startsWith(REGISTER_INFO_PREFIX)) {
+            String callId = expiredKey.substring(REGISTER_INFO_PREFIX.length());
+            if (sipSubscribe.getErrorSubscribe(callId) != null) {
+                SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
+                eventResult.callId = callId;
+                eventResult.msg = "注册超时";
+                eventResult.type = "register timeout";
+                sipSubscribe.getErrorSubscribe(callId).response(eventResult);
+            }
+        }
+    }
+}

+ 0 - 72
src/main/java/com/genersoft/iot/vmp/media/zlm/event/ZLMKeepliveTimeoutListener.java

@@ -1,72 +0,0 @@
-package com.genersoft.iot.vmp.media.zlm.event;
-
-import com.alibaba.fastjson.JSONObject;
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.stereotype.Component;
-
-/**    
- * @description:设备心跳超时监听,借助redis过期特性,进行监听,监听到说明设备心跳超时,发送离线事件
- * @author: swwheihei
- * @date:   2020年5月6日 上午11:35:46     
- */
-@Component
-public class ZLMKeepliveTimeoutListener extends RedisKeyExpirationEventMessageListener {
-
-    private Logger logger = LoggerFactory.getLogger(ZLMKeepliveTimeoutListener.class);
-
-	@Autowired
-	private EventPublisher publisher;
-
-	@Autowired
-	private ZLMRESTfulUtils zlmresTfulUtils;
-
-	@Autowired
-	private UserSetting userSetting;
-
-	@Autowired
-	private IMediaServerService mediaServerService;
-
-    public ZLMKeepliveTimeoutListener(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
-        super(listenerContainer, userSetting);
-    }
-
-
-    /**
-     * 监听失效的key,key格式为keeplive_deviceId
-     * @param message
-     * @param pattern
-     */
-    @Override
-    public void onMessage(Message message, byte[] pattern) {
-        //  获取失效的key
-        String expiredKey = message.toString();
-        String KEEPLIVEKEY_PREFIX = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_";
-        if(!expiredKey.startsWith(KEEPLIVEKEY_PREFIX)){
-        	return;
-        }
-        
-        String mediaServerId = expiredKey.substring(KEEPLIVEKEY_PREFIX.length(),expiredKey.length());
-        logger.info("[zlm心跳到期]:" + mediaServerId);
-        // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
-        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-        JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
-        if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) {
-            logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息", mediaServerId);
-            // 添加zlm信息
-            mediaServerService.updateMediaServerKeepalive(mediaServerId, mediaServerConfig);
-        }else {
-            publisher.zlmOfflineEventPublish(mediaServerId);
-        }
-    }
-}

+ 35 - 5
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -8,6 +8,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import org.slf4j.Logger;
@@ -53,6 +54,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
 
     private final static Logger logger = LoggerFactory.getLogger(MediaServerServiceImpl.class);
 
+    private final String zlmKeepaliveKeyPrefix = "zlm-keepalive_";
+
     @Autowired
     private SipConfig sipConfig;
 
@@ -83,10 +86,12 @@ public class MediaServerServiceImpl implements IMediaServerService {
     @Autowired
     private ZLMRTPServerFactory zlmrtpServerFactory;
 
-
     @Autowired
     private EventPublisher publisher;
 
+    @Autowired
+    private DynamicTask dynamicTask;
+
     /**
      * 初始化
      */
@@ -398,11 +403,37 @@ public class MediaServerServiceImpl implements IMediaServerService {
         if (serverItem.isAutoConfig()) {
             setZLMConfig(serverItem, "0".equals(zlmServerConfig.getHookEnable()));
         }
+        final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + serverItem.getId();
+        dynamicTask.stop(zlmKeepaliveKey);
+        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(serverItem), serverItem.getHookAliveInterval() * 1000);
         publisher.zlmOnlineEventPublish(serverItem.getId());
         logger.info("[ZLM] 连接成功 {} - {}:{} ",
                 zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
     }
 
+    class KeepAliveTimeoutRunnable implements Runnable{
+
+        private MediaServerItem serverItem;
+
+        public KeepAliveTimeoutRunnable(MediaServerItem serverItem) {
+            this.serverItem = serverItem;
+        }
+
+        @Override
+        public void run() {
+            logger.info("[zlm心跳到期]:" + serverItem.getId());
+            // 发起http请求验证zlm是否确实无法连接,如果确实无法连接则发送离线事件,否则不作处理
+            JSONObject mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(serverItem);
+            if (mediaServerConfig != null && mediaServerConfig.getInteger("code") == 0) {
+                logger.info("[zlm心跳到期]:{}验证后zlm仍在线,恢复心跳信息,请检查zlm是否可以正常向wvp发送心跳", serverItem.getId());
+                // 添加zlm信息
+                updateMediaServerKeepalive(serverItem.getId(), mediaServerConfig);
+            }else {
+                publisher.zlmOfflineEventPublish(serverItem.getId());
+            }
+        }
+    }
+
 
     @Override
     public void zlmServerOffline(String mediaServerId) {
@@ -429,7 +460,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
         }else {
             clearRTPServer(serverItem);
         }
-
     }
 
 
@@ -625,9 +655,9 @@ public class MediaServerServiceImpl implements IMediaServerService {
                 return;
             }
         }
-        String key = VideoManagerConstants.MEDIA_SERVER_KEEPALIVE_PREFIX + userSetting.getServerId() + "_" + mediaServerId;
-        int hookAliveInterval = mediaServerItem.getHookAliveInterval() + 2;
-        RedisUtil.set(key, data, hookAliveInterval);
+        final String zlmKeepaliveKey = zlmKeepaliveKeyPrefix + mediaServerItem.getId();
+        dynamicTask.stop(zlmKeepaliveKey);
+        dynamicTask.startDelay(zlmKeepaliveKey, new KeepAliveTimeoutRunnable(mediaServerItem), mediaServerItem.getHookAliveInterval() * 1000);
     }
 
     private MediaServerItem getOneFromDatabase(String mediaServerId) {