Browse Source

在redis中添加wvp存活依据,添加推流变化消息

648540858 4 years atrás
parent
commit
b1c92cf4e8

+ 6 - 1
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -8,7 +8,9 @@ package com.genersoft.iot.vmp.common;
  */
 public class VideoManagerConstants {
 	
-	public static final String WVP_SERVER_PREFIX = "VMP_wvp_server";
+	public static final String WVP_SERVER_PREFIX = "VMP_SIGNALLING_SERVER_INFO_";
+
+	public static final String WVP_SERVER_STREAM_PUSH_PREFIX = "VMP_SIGNALLING_STREAM_PUSH_";
 
 	public static final String MEDIA_SERVER_PREFIX = "VMP_MEDIA_SERVER_";
 
@@ -51,4 +53,7 @@ public class VideoManagerConstants {
 	public static final String MEDIA_SSRC_USED_PREFIX = "VMP_media_used_ssrc_";
 
 	public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_media_transaction_";
+
+	//************************** redis 消息*********************************
+	public static final String WVP_MSG_STREAM_PUSH_CHANGE_PREFIX = "WVP_msg_stream_push_change";
 }

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/conf/UserSetup.java

@@ -27,6 +27,8 @@ public class UserSetup {
 
     private Boolean logInDatebase = Boolean.TRUE;
 
+    private String serverId = "000000";
+
     private List<String> interfaceAuthenticationExcludes = new ArrayList<>();
 
     public Boolean getSavePositionHistory() {
@@ -104,4 +106,12 @@ public class UserSetup {
     public void setLogInDatebase(Boolean logInDatebase) {
         this.logInDatebase = logInDatebase;
     }
+
+    public String getServerId() {
+        return serverId;
+    }
+
+    public void setServerId(String serverId) {
+        this.serverId = serverId;
+    }
 }

+ 0 - 25
src/main/java/com/genersoft/iot/vmp/conf/VManagerConfig.java

@@ -1,25 +0,0 @@
-package com.genersoft.iot.vmp.conf;
-
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-
-/**    
- * @description: 获取数据库配置   
- * @author: swwheihei
- * @date:   2020年5月6日 下午2:46:00     
- */
-@Configuration("vmConfig")
-public class VManagerConfig {
-
-	@Value("${spring.application.database:redis}")
-    private String database;
-
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public void setDatabase(String database) {
-        this.database = database;
-    }
-}

+ 18 - 6
src/main/java/com/genersoft/iot/vmp/conf/WVPTimerTask.java

@@ -1,7 +1,10 @@
 package com.genersoft.iot.vmp.conf;
 
+import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
@@ -12,13 +15,22 @@ public class WVPTimerTask {
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
-    private SipConfig sipConfig;
+    private IMediaServerService mediaServerService;
 
     @Autowired
-    private MediaConfig mediaConfig;
+    private UserSetup userSetup;
+
+    @Value("${server.port}")
+    private int serverPort;
+
+    @Autowired
+    private SipConfig sipConfig;
 
-//    @Scheduled(cron="0/2 * *  * * ? ")   //每3秒执行一次
-//    public void execute(){
-////        redisCatchStorage.updateWVPInfo();
-//    }
+    @Scheduled(fixedRate = 2 * 1000)   //每3秒执行一次
+    public void execute(){
+        JSONObject jsonObject = new JSONObject();
+        jsonObject.put("ip", sipConfig.getIp());
+        jsonObject.put("port", serverPort);
+        redisCatchStorage.updateWVPInfo(userSetup.getServerId(), jsonObject, 3);
+    }
 }

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/callback/DeferredResultHolder.java

@@ -37,8 +37,6 @@ public class DeferredResultHolder {
 
 	public static final String CALLBACK_CMD_STOP = "CALLBACK_STOP";
 
-	public static final String CALLBACK_ONVIF = "CALLBACK_ONVIF";
-
 	public static final String CALLBACK_CMD_MOBILEPOSITION = "CALLBACK_MOBILEPOSITION";
 
 	public static final String CALLBACK_CMD_PRESETQUERY = "CALLBACK_PRESETQUERY";

+ 16 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IMediaService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
@@ -56,6 +57,9 @@ public class ZLMHttpHookListener {
 	@Autowired
 	private IMediaServerService mediaServerService;
 
+	@Autowired
+	private IMediaService mediaService;
+
 	@Autowired
 	private ZLMRESTfulUtils zlmresTfulUtils;
 
@@ -295,11 +299,23 @@ public class ZLMHttpHookListener {
 				}
 			}else {
 				if (!"rtp".equals(app) ){
+					// 发送流变化redis消息
+					JSONObject jsonObject = new JSONObject();
+					jsonObject.put("serverId", userSetup.getServerId());
+					jsonObject.put("app", app);
+					jsonObject.put("stream", streamId);
+					jsonObject.put("register", regist);
+					jsonObject.put("mediaServerId", mediaServerId);
+					redisCatchStorage.sendStreamChangeMsg(jsonObject);
+
 					MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
 					if (regist) {
 						zlmMediaListManager.addMedia(mediaServerItem, app, streamId);
+						StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
+						redisCatchStorage.addStream(mediaServerItem, app, streamId, streamInfo);
 					}else {
 						zlmMediaListManager.removeMedia( app, streamId);
+						redisCatchStorage.removeStream(mediaServerItem, app, streamId);
 					}
 				}
 			}

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -47,7 +47,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
     private boolean sslEnabled;
 
     @Value("${server.port}")
-    private String serverPort;
+    private Integer serverPort;
 
     @Autowired
     private MediaConfig mediaConfig;

+ 24 - 1
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatformCatch;
 import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 
 import java.util.List;
 import java.util.Map;
@@ -120,5 +121,27 @@ public interface IRedisCatchStorage {
     /**
      * 在redis添加wvp的信息
      */
-    void updateWVPInfo(JSONObject jsonObject);
+    void updateWVPInfo(String id, JSONObject jsonObject, int time);
+
+    /**
+     * 发送推流生成与推流消失消息
+     * @param jsonObject 消息内容
+     */
+    void sendStreamChangeMsg(JSONObject jsonObject);
+
+    /**
+     * 添加流信息到redis
+     * @param mediaServerItem
+     * @param app
+     * @param streamId
+     */
+    void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo);
+
+    /**
+     * 移除流信息从redis
+     * @param mediaServerItem
+     * @param app
+     * @param streamId
+     */
+    void removeStream(MediaServerItem mediaServerItem, String app, String streamId);
 }

+ 20 - 1
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.dao.DeviceChannelMapper;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
@@ -295,8 +296,26 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     }
 
     @Override
-    public void updateWVPInfo(JSONObject jsonObject) {
+    public void updateWVPInfo(String id, JSONObject jsonObject, int time) {
+        String key = VideoManagerConstants.WVP_SERVER_PREFIX + id;
+        redis.set(key, jsonObject, time);
+    }
+
+    @Override
+    public void sendStreamChangeMsg(JSONObject jsonObject) {
+        String key = VideoManagerConstants.WVP_MSG_STREAM_PUSH_CHANGE_PREFIX;
+        redis.convertAndSend(key, jsonObject.toJSONString());
+    }
 
+    @Override
+    public void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) {
+        String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId();
+        redis.set(key, streamInfo);
     }
 
+    @Override
+    public void removeStream(MediaServerItem mediaServerItem, String app, String streamId) {
+        String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId();
+        redis.del(key);
+    }
 }

+ 6 - 0
src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java

@@ -729,4 +729,10 @@ public class RedisUtil {
         return new ArrayList<>(keys);
     }
 
+    //    ============================== 消息发送与订阅 ==============================
+    public void convertAndSend(String channel, String msg) {
+        redisTemplate.convertAndSend(channel, msg);
+
+    }
+
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/web/ApiCompatibleController.java

@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.web;
+package com.genersoft.iot.vmp.web.gb28181;
 
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.service.IMediaService;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/web/ApiControlController.java

@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.web;
+package com.genersoft.iot.vmp.web.gb28181;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.gb28181.bean.Device;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/web/ApiController.java

@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.web;
+package com.genersoft.iot.vmp.web.gb28181;
 
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.SipConfig;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/web/ApiDeviceController.java

@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.web;
+package com.genersoft.iot.vmp.web.gb28181;
 
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/web/ApiStreamController.java

@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.web;
+package com.genersoft.iot.vmp.web.gb28181;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/web/AuthController.java

@@ -1,4 +1,4 @@
-package com.genersoft.iot.vmp.web;
+package com.genersoft.iot.vmp.web.gb28181;
 
 import com.genersoft.iot.vmp.service.IUserService;
 import com.genersoft.iot.vmp.storager.dao.dto.User;