Quellcode durchsuchen

优化streamchannge hook以及对推流的识别

648540858 vor 4 Jahren
Ursprung
Commit
f61051c463

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/common/StreamInfo.java

@@ -30,7 +30,7 @@ public class StreamInfo {
     private String rtsps;
     private String rtc;
     private String mediaServerId;
-    private JSONArray tracks;
+    private Object tracks;
 
     public static class TransactionInfo{
         public String callId;
@@ -105,11 +105,11 @@ public class StreamInfo {
         this.rtsp = rtsp;
     }
 
-    public JSONArray getTracks() {
+    public Object getTracks() {
         return tracks;
     }
 
-    public void setTracks(JSONArray tracks) {
+    public void setTracks(Object tracks) {
         this.tracks = tracks;
     }
 

+ 35 - 23
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -3,11 +3,13 @@ package com.genersoft.iot.vmp.media.zlm;
 import java.util.List;
 import java.util.UUID;
 
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IMediaService;
@@ -258,12 +260,13 @@ public class ZLMHttpHookListener {
 	 */
 	@ResponseBody
 	@PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
-	public ResponseEntity<String> onStreamChanged(@RequestBody JSONObject json){
+	public ResponseEntity<String> onStreamChanged(@RequestBody MediaItem item){
 		
 		if (logger.isDebugEnabled()) {
-			logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + json.toString());
+			logger.debug("ZLM HOOK on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
 		}
-		String mediaServerId = json.getString("mediaServerId");
+		String mediaServerId = item.getMediaServerId();
+		JSONObject json = (JSONObject) JSON.toJSON(item);
 		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
@@ -272,13 +275,12 @@ public class ZLMHttpHookListener {
 			}
 
 		}
-
 		// 流消失移除redis play
-		String app = json.getString("app");
-		String streamId = json.getString("stream");
-		String schema = json.getString("schema");
-		JSONArray tracks = json.getJSONArray("tracks");
-		boolean regist = json.getBoolean("regist");
+		String app = item.getApp();
+		String streamId = item.getStream();
+		String schema = item.getSchema();
+		List<MediaItem.MediaTrack> tracks = item.getTracks();
+		boolean regist = item.isRegist();
 		if (tracks != null) {
 			logger.info("[stream: " + streamId + "] on_stream_changed->>" + schema);
 		}
@@ -298,24 +300,34 @@ public class ZLMHttpHookListener {
 					redisCatchStorage.stopPlayback(streamInfo);
 				}
 			}else {
-				if (!"rtp".equals(app) ){
-					// 发送流变化redis消息
-					JSONObject jsonObject = new JSONObject();
-					jsonObject.put("serverId", userSetup.getServerId());
-					jsonObject.put("app", app);
-					jsonObject.put("stream", streamId);
-					jsonObject.put("register", regist);
-					jsonObject.put("mediaServerId", mediaServerId);
-					redisCatchStorage.sendStreamChangeMsg(jsonObject);
+				if (!"rtp".equals(app)){
+
+					boolean pushChange = false;
 
 					MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
 					if (regist) {
-						zlmMediaListManager.addMedia(mediaServerItem, app, streamId);
-						StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
-						redisCatchStorage.addStream(mediaServerItem, app, streamId, streamInfo);
+						if ((item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8)) {
+							pushChange = true;
+							zlmMediaListManager.addMedia(item);
+							StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem, app, streamId, tracks);
+							redisCatchStorage.addPushStream(mediaServerItem, app, streamId, streamInfo);
+						}
 					}else {
-						zlmMediaListManager.removeMedia( app, streamId);
-						redisCatchStorage.removeStream(mediaServerItem, app, streamId);
+						int result = zlmMediaListManager.removeMedia( app, streamId);
+						redisCatchStorage.removePushStream(mediaServerItem, app, streamId);
+						if (result > 0) {
+							pushChange = true;
+						}
+					}
+					if(pushChange) {
+						// 发送流变化redis消息
+						JSONObject jsonObject = new JSONObject();
+						jsonObject.put("serverId", userSetup.getServerId());
+						jsonObject.put("app", app);
+						jsonObject.put("stream", streamId);
+						jsonObject.put("register", regist);
+						jsonObject.put("mediaServerId", mediaServerId);
+						redisCatchStorage.sendStreamChangeMsg(jsonObject);
 					}
 				}
 			}

+ 10 - 3
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java

@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.media.zlm;
 
 import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
@@ -87,6 +88,10 @@ public class ZLMMediaListManager {
         updateMedia(mediaServerItem, app, streamId);
     }
 
+    public void addMedia(MediaItem mediaItem) {
+        storager.updateMedia(streamPushService.transform(mediaItem));
+    }
+
 
     public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
         //使用异步更新推流
@@ -113,14 +118,16 @@ public class ZLMMediaListManager {
     }
 
 
-    public void removeMedia(String app, String streamId) {
+    public int removeMedia(String app, String streamId) {
         // 查找是否关联了国标, 关联了不删除, 置为离线
         StreamProxyItem streamProxyItem = gbStreamMapper.selectOne(app, streamId);
+        int result = 0;
         if (streamProxyItem == null) {
-            storager.removeMedia(app, streamId);
+            result = storager.removeMedia(app, streamId);
         }else {
-            storager.mediaOutline(app, streamId);
+            result =storager.mediaOutline(app, streamId);
         }
+        return result;
     }
 
 //    public void clearAllSessions() {

+ 26 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/MediaItem.java

@@ -4,6 +4,11 @@ import java.util.List;
 
 public class MediaItem {
 
+    /**
+     * 注册/注销
+     */
+    private boolean regist;
+
     /**
      * 应用名
      */
@@ -53,6 +58,11 @@ public class MediaItem {
      */
     private String originUrl;
 
+    /**
+     * 服务器id
+     */
+    private String mediaServerId;
+
     /**
      * GMT unix系统时间戳,单位秒
      */
@@ -78,6 +88,14 @@ public class MediaItem {
      */
     private String vhost;
 
+    public boolean isRegist() {
+        return regist;
+    }
+
+    public void setRegist(boolean regist) {
+        this.regist = regist;
+    }
+
     /**
      * 是否是docker部署, docker部署不会自动更新zlm使用的端口,需要自己手动修改
      */
@@ -376,4 +394,12 @@ public class MediaItem {
     public void setDocker(boolean docker) {
         this.docker = docker;
     }
+
+    public String getMediaServerId() {
+        return mediaServerId;
+    }
+
+    public void setMediaServerId(String mediaServerId) {
+        this.mediaServerId = mediaServerId;
+    }
 }

+ 9 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamProxyItem.java

@@ -17,6 +17,7 @@ public class StreamProxyItem extends GbStream {
     private boolean enable;
     private boolean enable_hls;
     private boolean enable_mp4;
+    private boolean enable_remove_none_reader; // 无人观看时删除
     private String platformGbId;
     private String createTime;
 
@@ -142,4 +143,12 @@ public class StreamProxyItem extends GbStream {
     public void setCreateTime(String createTime) {
         this.createTime = createTime;
     }
+
+    public boolean isEnable_remove_none_reader() {
+        return enable_remove_none_reader;
+    }
+
+    public void setEnable_remove_none_reader(boolean enable_remove_none_reader) {
+        this.enable_remove_none_reader = enable_remove_none_reader;
+    }
 }

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java

@@ -32,7 +32,7 @@ public interface IMediaService {
      * @param stream
      * @return
      */
-    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, JSONArray tracks);
+    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaServerItem, String app, String stream, Object tracks);
 
     /**
      * 根据应用名和流ID获取播放地址, 只是地址拼接,返回的ip使用远程访问ip,适用与zlm与wvp在一台主机的情况
@@ -40,5 +40,5 @@ public interface IMediaService {
      * @param stream
      * @return
      */
-    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr);
+    StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr);
 }

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.service;
 
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.github.pagehelper.PageInfo;
@@ -32,4 +33,6 @@ public interface IStreamPushService {
      * @return
      */
     PageInfo<StreamPushItem> getPushList(Integer page, Integer count);
+
+    StreamPushItem transform(MediaItem item);
 }

+ 3 - 2
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -31,7 +32,7 @@ public class MediaServiceImpl implements IMediaService {
 
 
     @Override
-    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks) {
+    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks) {
         return getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
     }
 
@@ -69,7 +70,7 @@ public class MediaServiceImpl implements IMediaService {
     }
 
     @Override
-    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, JSONArray tracks, String addr) {
+    public StreamInfo getStreamInfoByAppAndStream(MediaServerItem mediaInfo, String app, String stream, Object tracks, String addr) {
         StreamInfo streamInfoResult = new StreamInfo();
         streamInfoResult.setStreamId(stream);
         streamInfoResult.setApp(app);

+ 27 - 22
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -51,33 +51,38 @@ public class StreamPushServiceImpl implements IStreamPushService {
         for (MediaItem item : mediaItems) {
 
             // 不保存国标推理以及拉流代理的流
-            if (item.getOriginType() == 3 || item.getOriginType() == 4 || item.getOriginType() == 5) {
-                continue;
-            }
-            String key = item.getApp() + "_" + item.getStream();
-            StreamPushItem streamPushItem = result.get(key);
-            if (streamPushItem == null) {
-                streamPushItem = new StreamPushItem();
-                streamPushItem.setApp(item.getApp());
-                streamPushItem.setMediaServerId(mediaServerItem.getId());
-                streamPushItem.setStream(item.getStream());
-                streamPushItem.setAliveSecond(item.getAliveSecond());
-                streamPushItem.setCreateStamp(item.getCreateStamp());
-                streamPushItem.setOriginSock(item.getOriginSock());
-                streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
-                streamPushItem.setOriginType(item.getOriginType());
-                streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
-                streamPushItem.setOriginUrl(item.getOriginUrl());
-                streamPushItem.setCreateStamp(item.getCreateStamp());
-                streamPushItem.setAliveSecond(item.getAliveSecond());
-                streamPushItem.setStatus(true);
-                streamPushItem.setVhost(item.getVhost());
-                result.put(key, streamPushItem);
+            if (item.getOriginType() == 1 || item.getOriginType() == 2 || item.getOriginType() == 8) {
+                String key = item.getApp() + "_" + item.getStream();
+                StreamPushItem streamPushItem = result.get(key);
+                if (streamPushItem == null) {
+                    streamPushItem = transform(item);
+                    result.put(key, streamPushItem);
+                }
             }
+
         }
 
         return new ArrayList<>(result.values());
     }
+    @Override
+    public StreamPushItem transform(MediaItem item) {
+        StreamPushItem streamPushItem = new StreamPushItem();
+        streamPushItem.setApp(item.getApp());
+        streamPushItem.setMediaServerId(item.getMediaServerId());
+        streamPushItem.setStream(item.getStream());
+        streamPushItem.setAliveSecond(item.getAliveSecond());
+        streamPushItem.setCreateStamp(item.getCreateStamp());
+        streamPushItem.setOriginSock(item.getOriginSock());
+        streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
+        streamPushItem.setOriginType(item.getOriginType());
+        streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
+        streamPushItem.setOriginUrl(item.getOriginUrl());
+        streamPushItem.setCreateStamp(item.getCreateStamp());
+        streamPushItem.setAliveSecond(item.getAliveSecond());
+        streamPushItem.setStatus(true);
+        streamPushItem.setVhost(item.getVhost());
+        return streamPushItem;
+    }
 
     @Override
     public PageInfo<StreamPushItem> getPushList(Integer page, Integer count) {

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -135,7 +135,7 @@ public interface IRedisCatchStorage {
      * @param app
      * @param streamId
      */
-    void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo);
+    void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo);
 
     /**
      * 移除流信息从redis
@@ -143,5 +143,5 @@ public interface IRedisCatchStorage {
      * @param app
      * @param streamId
      */
-    void removeStream(MediaServerItem mediaServerItem, String app, String streamId);
+    void removePushStream(MediaServerItem mediaServerItem, String app, String streamId);
 }

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorager.java

@@ -353,7 +353,7 @@ public interface IVideoManagerStorager {
 	 * @param app
 	 * @param stream
 	 */
-	void removeMedia(String app, String stream);
+	int removeMedia(String app, String stream);
 
 
 	/**
@@ -366,7 +366,7 @@ public interface IVideoManagerStorager {
 	 * @param app
 	 * @param streamId
 	 */
-	void mediaOutline(String app, String streamId);
+	int mediaOutline(String app, String streamId);
 
 	/**
 	 * 设置平台在线/离线

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/storager/dao/GbStreamMapper.java

@@ -53,7 +53,7 @@ public interface GbStreamMapper {
     @Update("UPDATE gb_stream " +
             "SET status=${status} " +
             "WHERE app=#{app} AND stream=#{stream}")
-    void setStatus(String app, String stream, boolean status);
+    int setStatus(String app, String stream, boolean status);
 
     @Select("SELECT gs.*, pgs.platformId FROM gb_stream gs LEFT JOIN  platform_gb_stream pgs ON gs.app = pgs.app AND gs.stream = pgs.stream WHERE mediaServerId=#{mediaServerId} ")
     List<GbStream> selectAllByMediaServerId(String mediaServerId);

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -308,13 +308,13 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     }
 
     @Override
-    public void addStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) {
+    public void addPushStream(MediaServerItem mediaServerItem, String app, String streamId, StreamInfo streamInfo) {
         String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId();
         redis.set(key, streamInfo);
     }
 
     @Override
-    public void removeStream(MediaServerItem mediaServerItem, String app, String streamId) {
+    public void removePushStream(MediaServerItem mediaServerItem, String app, String streamId) {
         String key = VideoManagerConstants.WVP_SERVER_STREAM_PUSH_PREFIX + app + "_" + streamId + "_" + mediaServerItem.getId();
         redis.del(key);
     }

+ 4 - 4
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStoragerImpl.java

@@ -605,8 +605,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
 	}
 
 	@Override
-	public void removeMedia(String app, String stream) {
-		streamPushMapper.del(app, stream);
+	public int removeMedia(String app, String stream) {
+		return streamPushMapper.del(app, stream);
 	}
 
 	@Override
@@ -615,8 +615,8 @@ public class VideoManagerStoragerImpl implements IVideoManagerStorager {
 	}
 
 	@Override
-	public void mediaOutline(String app, String streamId) {
-		gbStreamMapper.setStatus(app, streamId, false);
+	public int mediaOutline(String app, String streamId) {
+		return gbStreamMapper.setStatus(app, streamId, false);
 	}
 
 	@Override