소스 검색

优化多wvp国标级联推流

648540858 1 년 전
부모
커밋
b4168c02cb
17개의 변경된 파일332개의 추가작업 그리고 405개의 파일을 삭제
  1. 1 0
      src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java
  2. 5 5
      src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
  3. 15 1
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java
  4. 1 11
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
  5. 1 4
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  6. 78 104
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  7. 13 13
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  8. 0 138
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
  9. 1 0
      src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
  10. 1 13
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  11. 17 0
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
  12. 97 0
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
  13. 59 18
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
  14. 26 1
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
  15. 0 97
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java
  16. 4 0
      src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java
  17. 13 0
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -74,6 +74,7 @@ public class VideoManagerConstants {
 	public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
 	public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
 	public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
+	public static final String PUSH_STREAM_ONLINE = "VMP_PUSH_STREAM_ONLINE:";
 
 
 

+ 5 - 5
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java

@@ -28,9 +28,6 @@ public class RedisMsgListenConfig {
 	@Autowired
 	private RedisAlarmMsgListener redisAlarmMsgListener;
 
-	@Autowired
-	private RedisStreamMsgListener redisStreamMsgListener;
-
 	@Autowired
 	private RedisPushStreamStatusMsgListener redisPushStreamStatusMsgListener;
 
@@ -52,6 +49,9 @@ public class RedisMsgListenConfig {
 	@Autowired
 	private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
 
+	@Autowired
+	private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
+
 
 	/**
 	 * redis消息监听器容器 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
@@ -67,14 +67,14 @@ public class RedisMsgListenConfig {
         container.setConnectionFactory(connectionFactory);
 		container.addMessageListener(redisGPSMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_GPS));
 		container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
-		container.addMessageListener(redisStreamMsgListener, new PatternTopic(VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + "PUSH"));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
 		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
 		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
-		container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.WAITE_SEND_PUSH_STREAM));
 		container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
+		container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED));
+		container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE));
         return container;
     }
 }

+ 15 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpItem.java

@@ -132,6 +132,11 @@ public class SendRtpItem {
      */
     private String receiveStream;
 
+    /**
+     * 上级的点播类型
+     */
+    private String sessionName;
+
     public String getIp() {
         return ip;
     }
@@ -332,6 +337,14 @@ public class SendRtpItem {
         this.localIp = localIp;
     }
 
+    public String getSessionName() {
+        return sessionName;
+    }
+
+    public void setSessionName(String sessionName) {
+        this.sessionName = sessionName;
+    }
+
     @Override
     public String toString() {
         return "SendRtpItem{" +
@@ -347,7 +360,7 @@ public class SendRtpItem {
                 ", stream='" + stream + '\'' +
                 ", tcp=" + tcp +
                 ", tcpActive=" + tcpActive +
-                ", localIp=" + localIp +
+                ", localIp='" + localIp + '\'' +
                 ", localPort=" + localPort +
                 ", mediaServerId='" + mediaServerId + '\'' +
                 ", serverId='" + serverId + '\'' +
@@ -360,6 +373,7 @@ public class SendRtpItem {
                 ", rtcp=" + rtcp +
                 ", playType=" + playType +
                 ", receiveStream='" + receiveStream + '\'' +
+                ", sessionName='" + sessionName + '\'' +
                 '}';
     }
 }

+ 1 - 11
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -16,7 +16,6 @@ import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
 import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
@@ -77,9 +76,6 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private DynamicTask dynamicTask;
 
-	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
 	@Autowired
 	private IPlayService playService;
 
@@ -117,13 +113,7 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
 			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-				RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-						sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-						sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-						sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-				redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
-					playService.startSendRtpStreamHand(sendRtpItem, parentPlatform, json, param, callIdHeader);
-				});
+				redisCatchStorage.sendStartSendRtp(sendRtpItem);
 			} else {
 				JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
 				if (startSendRtpStreamResult != null) {

+ 1 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -98,8 +97,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IStreamPushService pushService;
 
-	@Autowired
-	private RedisGbPlayMsgListener redisGbPlayMsgListener;
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -142,7 +139,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
 					if (platform != null) {
 						RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
-						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
+//						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
 					}
 				}else {
 					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());

+ 78 - 104
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -122,7 +122,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     private UserSetting userSetting;
 
     @Autowired
-    private ZLMMediaListManager mediaListManager;
+    private RedisPlatformPushStreamOnlineLister mediaListManager;
 
     @Autowired
     private SipConfig config;
@@ -568,19 +568,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
 
                     }
                 } else if (gbStream != null) {
+                    SendRtpItem sendRtpItem = new SendRtpItem();
+                    if (!userSetting.getUseCustomSsrcForParentInvite() && gb28181Sdp.getSsrc() != null) {
+                        sendRtpItem.setSsrc(gb28181Sdp.getSsrc());
+                    }
 
-                    String ssrc;
-                    if (userSetting.getUseCustomSsrcForParentInvite() || gb28181Sdp.getSsrc() == null) {
-                        // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
-                        ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
-                    }else {
-                        ssrc = gb28181Sdp.getSsrc();
+                    if (tcpActive != null) {
+                        sendRtpItem.setTcpActive(tcpActive);
                     }
-                    SendRtpItem sendRtpItem = new SendRtpItem();
-                    sendRtpItem.setTcpActive(tcpActive);
                     sendRtpItem.setTcp(mediaTransmissionTCP);
                     sendRtpItem.setRtcp(platform.isRtcp());
-                    sendRtpItem.setSsrc(ssrc);
                     sendRtpItem.setPlatformName(platform.getName());
                     sendRtpItem.setPlatformId(platform.getServerGBId());
                     sendRtpItem.setMediaServerId(mediaServerItem.getId());
@@ -593,37 +590,48 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     sendRtpItem.setCallId(callIdHeader.getCallId());
                     sendRtpItem.setFromTag(request.getFromTag());
                     sendRtpItem.setOnlyAudio(false);
-                    sendRtpItem.setPlayType(InviteStreamType.PUSH);
                     sendRtpItem.setStatus(0);
+                    sendRtpItem.setSessionName(sessionName);
 
                     if ("push".equals(gbStream.getStreamType())) {
                         if (streamPushItem != null) {
                             // 从redis查询是否正在接收这个推流
                             OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
 
-                            sendRtpItem.setServerId(pushListItem.getSeverId());
                             if (pushListItem != null) {
+                                sendRtpItem.setServerId(pushListItem.getSeverId());
                                 StreamPushItem transform = streamPushService.transform(pushListItem);
                                 transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
                                 // 推流状态
-                                pushStream(sendRtpItem, mediaServerItem, platform, request);
+                                sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                             }else {
-                                // 未推流 拉起
+                                if (!platform.isStartOfflinePush()) {
+                                    // 平台设置中关闭了拉起离线的推流则直接回复
+                                    try {
+                                        logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", sendRtpItem.getChannelId(), sendRtpItem.getApp(), sendRtpItem.getStream());
+                                        responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
+                                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                                        logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
+                                    }
+                                    return;
+                                }
                                 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                             }
                         }
                     } else if ("proxy".equals(gbStream.getStreamType())) {
                         if (null != proxyByAppAndStream) {
+                            if (sendRtpItem.getSsrc() == null) {
+                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                                String ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                                sendRtpItem.setSsrc(ssrc);
+                            }
                             if (proxyByAppAndStream.isStatus()) {
-                                pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                                        mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+                                sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
                             } else {
                                 //开启代理拉流
                                 notifyProxyStreamOnline(sendRtpItem, mediaServerItem, platform, request);
                             }
                         }
-
-
                     }
                 }
             }
@@ -649,33 +657,23 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     /**
      * 安排推流
      */
-    private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
-                            CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
-                            int port, Boolean tcpActive, boolean mediaTransmissionTCP,
-                            String channelId, String addressStr, String ssrc, String requesterId) {
-            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+    private void sendProxyStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
             if (streamReady != null && streamReady) {
                 // 自平台内容
-                SendRtpItem sendRtpItem = zlmServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
-                        gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
-
-            if (sendRtpItem == null) {
-                logger.warn("服务器端口资源不足");
-                try {
-                    responseAck(request, Response.BUSY_HERE);
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
+                int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+                if (localPort == 0) {
+                    logger.warn("服务器端口资源不足");
+                    try {
+                        responseAck(request, Response.BUSY_HERE);
+                    } catch (SipException | InvalidArgumentException | ParseException e) {
+                        logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
+                    }
+                    return;
                 }
-                return;
-            }
-            if (tcpActive != null) {
-                sendRtpItem.setTcpActive(tcpActive);
-            }
-            sendRtpItem.setPlayType(InviteStreamType.PUSH);
+            sendRtpItem.setPlayType(InviteStreamType.PROXY);
             // 写入redis, 超时时回复
             sendRtpItem.setStatus(1);
-            sendRtpItem.setCallId(callIdHeader.getCallId());
-            sendRtpItem.setFromTag(request.getFromTag());
             sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
 
             SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
@@ -683,12 +681,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 sendRtpItem.setToTag(response.getToTag());
             }
             redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
         }
-
     }
 
-    private void pushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
+    private void sendPushStream(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
         // 推流
         if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
             Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
@@ -712,6 +708,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 if (response != null) {
                     sendRtpItem.setToTag(response.getToTag());
                 }
+                if (sendRtpItem.getSsrc() == null) {
+                    // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                    String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+                    sendRtpItem.setSsrc(ssrc);
+                }
                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
 
             } else {
@@ -719,10 +720,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
             }
         } else {
-            SendRtpItem sendRtpItem = new SendRtpItem();
-            sendRtpItem.setRtcp(platform.isRtcp());
-            sendRtpItem.setTcp(mediaTransmissionTCP);
-            sendRtpItem.setTcpActive();
             // 其他平台内容
             otherWvpPushStream(sendRtpItem, request, platform);
         }
@@ -733,29 +730,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
      */
     private void notifyProxyStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
         // TODO 控制启用以使设备上线
-        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
+        logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
         // 监听流上线
-        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(gbStream.getApp(), gbStream.getStream(), true, "rtsp", mediaServerItem.getId());
+        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", mediaServerItem.getId());
         zlmHttpHookSubscribe.addSubscribe(hookSubscribe, (mediaServerItemInUSe, hookParam) -> {
             OnStreamChangedHookParam streamChangedHookParam = (OnStreamChangedHookParam)hookParam;
             logger.info("[上级点播]拉流代理已经就绪, {}/{}", streamChangedHookParam.getApp(), streamChangedHookParam.getStream());
-            dynamicTask.stop(callIdHeader.getCallId());
-            pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
-                    mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
+            dynamicTask.stop(sendRtpItem.getCallId());
+            sendProxyStream(sendRtpItem, mediaServerItem, platform, request);
         });
-        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
-            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", gbStream.getApp(), gbStream.getStream());
+        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            logger.info("[ app={}, stream={} ] 等待拉流代理流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
             zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
         }, userSetting.getPlatformPlayTimeout());
-        boolean start = streamProxyService.start(gbStream.getApp(), gbStream.getStream());
+        boolean start = streamProxyService.start(sendRtpItem.getApp(), sendRtpItem.getStream());
         if (!start) {
             try {
-                responseAck(request, Response.BUSY_HERE, "channel [" + gbStream.getGbId() + "] offline");
+                responseAck(request, Response.BUSY_HERE, "channel [" + sendRtpItem.getChannelId() + "] offline");
             } catch (SipException | InvalidArgumentException | ParseException e) {
                 logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
             }
             zlmHttpHookSubscribe.removeSubscribe(hookSubscribe);
-            dynamicTask.stop(callIdHeader.getCallId());
+            dynamicTask.stop(sendRtpItem.getCallId());
         }
     }
 
@@ -763,50 +759,28 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
      * 通知流上线
      */
     private void notifyPushStreamOnline(SendRtpItem sendRtpItem, MediaServerItem mediaServerItem, ParentPlatform platform, SIPRequest request) {
-        if (!platform.isStartOfflinePush()) {
-            // 平台设置中关闭了拉起离线的推流则直接回复
-            try {
-                logger.info("[上级点播] 失败,推流设备未推流,channel: {}, app: {}, stream: {}", gbStream.getGbId(), gbStream.getApp(), gbStream.getStream());
-                responseAck(request, Response.TEMPORARILY_UNAVAILABLE, "channel stream not pushing");
-            } catch (SipException | InvalidArgumentException | ParseException e) {
-                logger.error("[命令发送失败] invite 通道未推流: {}", e.getMessage());
-            }
-            return;
-        }
-        // 发送redis消息以使设备上线
-        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
-
+        // 发送redis消息以使设备上线,流上线后被
+        logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", sendRtpItem.getApp(), sendRtpItem.getStream());
         MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
-                gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
-                platform.getName(), userSetting.getServerId(), gbStream.getMediaServerId());
+                sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getChannelId(), sendRtpItem.getPlatformId(),
+                platform.getName(), userSetting.getServerId(), sendRtpItem.getMediaServerId());
         redisCatchStorage.sendStreamPushRequestedMsg(messageForPushChannel);
         // 设置超时
-        dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
-            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
+        dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
+            logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
             try {
-                redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
-                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
+                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                 responseAck(request, Response.REQUEST_TIMEOUT); // 超时
             } catch (SipException | InvalidArgumentException | ParseException e) {
                 logger.error("未处理的异常 ", e);
             }
         }, userSetting.getPlatformPlayTimeout());
-        // 写入redis待发流信息,供其他wvp读取并生成发流信息
-        SendRtpItem sendRtpItemTemp = new SendRtpItem();
-        sendRtpItemTemp.setIp(addressStr);
-        sendRtpItemTemp.setPort(port);
-        sendRtpItemTemp.setSsrc(ssrc);
-        sendRtpItemTemp.setPlatformId(requesterId);
-        sendRtpItemTemp.setPlatformName(platform.getName());
-        sendRtpItemTemp.setTcp(mediaTransmissionTCP);
-        sendRtpItemTemp.setRtcp(platform.isRtcp());
-        sendRtpItemTemp.setTcpActive(tcpActive);
-        sendRtpItemTemp.setPlayType(InviteStreamType.PUSH);
-        redisCatchStorage.addWaiteSendRtpItem(sendRtpItemTemp, userSetting.getPlatformPlayTimeout());
+        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
         // 添加上线的通知
-        mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (sendRtpItemFromRedis) -> {
-            dynamicTask.stop(callIdHeader.getCallId());
-            redisPushStreamResponseListener.removeEvent(gbStream.getApp(), gbStream.getStream());
+        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
+            dynamicTask.stop(sendRtpItem.getCallId());
+            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
             if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
 
                 int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -823,18 +797,18 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     }
                     return;
                 }
-                sendRtpItemTemp.setLocalPort(localPort);
-                sendRtpItemTemp.setLocalIp(ObjectUtils.isEmpty(platform.getSendStreamIp()): );
-                // 写入redis, 超时时回复
-                sendRtpItemTemp.setStatus(1);
-                sendRtpItemTemp.setCallId(callIdHeader.getCallId());
+                sendRtpItem.setLocalPort(localPort);
+                if (!ObjectUtils.isEmpty(platform.getSendStreamIp())) {
+                    sendRtpItem.setLocalIp(platform.getSendStreamIp());
+                }
 
-                sendRtpItemTemp.setFromTag(request.getFromTag());
-                SIPResponse response = sendStreamAck(request, sendRtpItemTemp, platform);
+                // 写入redis, 超时时回复
+                sendRtpItem.setStatus(1);
+                SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                 if (response != null) {
-                    sendRtpItemTemp.setToTag(response.getToTag());
+                    sendRtpItem.setToTag(response.getToTag());
                 }
-                redisCatchStorage.updateSendRTPSever(sendRtpItemTemp);
+                redisCatchStorage.updateSendRTPSever(sendRtpItem);
             } else {
                 // 其他平台内容
                 otherWvpPushStream(sendRtpItemFromRedis, request, platform);
@@ -842,10 +816,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         });
 
         // 添加回复的拒绝或者错误的通知
-        redisPushStreamResponseListener.addEvent(gbStream.getApp(), gbStream.getStream(), response -> {
+        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
             if (response.getCode() != 0) {
-                dynamicTask.stop(callIdHeader.getCallId());
-                mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
+                dynamicTask.stop(sendRtpItem.getCallId());
+                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                 try {
                     responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
                 } catch (SipException | InvalidArgumentException | ParseException e) {

+ 13 - 13
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -18,14 +18,12 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
-import com.genersoft.iot.vmp.media.zlm.dto.HookType;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
+import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -103,7 +101,7 @@ public class ZLMHttpHookListener {
     private EventPublisher eventPublisher;
 
     @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
+    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
 
     @Autowired
     private ZlmHttpHookSubscribe subscribe;
@@ -130,6 +128,9 @@ public class ZLMHttpHookListener {
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
 
+    @Autowired
+    private IStreamPushService streamPushService;
+
     /**
      * 服务器定时上报时间,上报间隔可配置,默认10s上报一次
      */
@@ -236,11 +237,8 @@ public class ZLMHttpHookListener {
                 // 鉴权通过
                 redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
             }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
         }
 
-
         HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
         result.setEnable_audio(true);
         taskExecutor.execute(() -> {
@@ -465,8 +463,7 @@ public class ZLMHttpHookListener {
                                     || param.getOriginType() == OriginType.RTMP_PUSH.ordinal()
                                     || param.getOriginType() == OriginType.RTC_PUSH.ordinal()) {
                                 param.setSeverId(userSetting.getServerId());
-                                zlmMediaListManager.addPush(param);
-
+                                streamPushService.updatePush(param);
                                 // 冗余数据,自己系统中自用
                                 redisCatchStorage.addPushListItem(param.getApp(), param.getStream(), param);
                             }
@@ -483,10 +480,13 @@ public class ZLMHttpHookListener {
                                 }
                             }
                             GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
-                            if (gbStream != null) {
-//									eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                            // 查找是否关联了国标, 关联了不删除, 置为离线
+                            if (gbStream == null) {
+                                storager.removeMedia(param.getApp(), param.getStream());
+                            }else {
+//                                eventPublisher.catalogEventPublishForStream(null, gbStream, CatalogEvent.OFF);
+                                storager.mediaOffline(param.getApp(), param.getStream());
                             }
-                            zlmMediaListManager.removeMedia(param.getApp(), param.getStream());
                         }
                         GbStream gbStream = storager.getGbStream(param.getApp(), param.getStream());
                         if (gbStream != null) {

+ 0 - 138
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java

@@ -1,138 +0,0 @@
-package com.genersoft.iot.vmp.media.zlm;
-
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @author lin
- */
-@Component
-public class ZLMMediaListManager {
-
-    private Logger logger = LoggerFactory.getLogger("ZLMMediaListManager");
-
-    @Autowired
-    private ZLMRESTfulUtils zlmresTfulUtils;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private GbStreamMapper gbStreamMapper;
-
-    @Autowired
-    private PlatformGbStreamMapper platformGbStreamMapper;
-
-    @Autowired
-    private IStreamPushService streamPushService;
-
-    @Autowired
-    private IStreamProxyService streamProxyService;
-
-    @Autowired
-    private StreamPushMapper streamPushMapper;
-
-    @Autowired
-    private ZlmHttpHookSubscribe subscribe;
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
-
-    public StreamPushItem addPush(OnStreamChangedHookParam onStreamChangedHookParam) {
-        StreamPushItem transform = streamPushService.transform(onStreamChangedHookParam);
-        StreamPushItem pushInDb = streamPushService.getPush(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
-        transform.setPushIng(onStreamChangedHookParam.isRegist());
-        transform.setUpdateTime(DateUtil.getNow());
-        transform.setPushTime(DateUtil.getNow());
-        transform.setSelf(userSetting.getServerId().equals(onStreamChangedHookParam.getSeverId()));
-        if (pushInDb == null) {
-            transform.setCreateTime(DateUtil.getNow());
-            streamPushMapper.add(transform);
-        }else {
-            streamPushMapper.update(transform);
-            gbStreamMapper.updateMediaServer(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), onStreamChangedHookParam.getMediaServerId());
-        }
-        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
-        if ( channelOnlineEventLister != null)  {
-            try {
-                channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
-            } catch (ParseException e) {
-                logger.error("addPush: ", e);
-            }
-            removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
-        }
-        return transform;
-    }
-
-    public void sendStreamEvent(String app, String stream, String mediaServerId) {
-        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
-        // 查看推流状态
-        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
-        if (streamReady != null && streamReady) {
-            ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(app, stream);
-            if (channelOnlineEventLister != null)  {
-                try {
-                    channelOnlineEventLister.run(app, stream, mediaServerId);
-                } catch (ParseException e) {
-                    logger.error("sendStreamEvent: ", e);
-                }
-                removedChannelOnlineEventLister(app, stream);
-            }
-        }
-    }
-
-    public int removeMedia(String app, String streamId) {
-        // 查找是否关联了国标, 关联了不删除, 置为离线
-        GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
-        int result;
-        if (gbStream == null) {
-            result = storager.removeMedia(app, streamId);
-        }else {
-            result =storager.mediaOffline(app, streamId);
-        }
-        return result;
-    }
-
-    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
-        this.channelOnPublishEvents.put(app + "_" + stream, callback);
-    }
-
-    public void removedChannelOnlineEventLister(String app, String stream) {
-        this.channelOnPublishEvents.remove(app + "_" + stream);
-    }
-
-    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
-        return this.channelOnPublishEvents.get(app + "_" + stream);
-    }
-
-}

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -118,4 +118,5 @@ public interface IStreamPushService {
     Map<String, StreamPushItem> getAllAppAndStreamMap();
 
 
+    void updatePush(OnStreamChangedHookParam param);
 }

+ 1 - 13
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -31,7 +31,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.*;
-import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.CloudRecordServiceMapper;
@@ -134,9 +133,6 @@ public class PlayServiceImpl implements IPlayService {
     @Autowired
     private ThreadPoolTaskExecutor taskExecutor;
 
-    @Autowired
-    private RedisGbPlayMsgListener redisGbPlayMsgListener;
-
     @Autowired
     private ZlmHttpHookSubscribe hookSubscribe;
 
@@ -1366,15 +1362,7 @@ public class PlayServiceImpl implements IPlayService {
             param.put("udp_rtcp_timeout", sendRtpItem.isRtcp() ? "1" : "0");
         }
 
-        if (mediaInfo == null) {
-            RequestPushStreamMsg requestPushStreamMsg = RequestPushStreamMsg.getInstance(
-                    sendRtpItem.getMediaServerId(), sendRtpItem.getApp(), sendRtpItem.getStream(),
-                    sendRtpItem.getIp(), sendRtpItem.getPort(), sendRtpItem.getSsrc(), sendRtpItem.isTcp(),
-                    sendRtpItem.getLocalPort(), sendRtpItem.getPt(), sendRtpItem.isUsePs(), sendRtpItem.isOnlyAudio());
-            redisGbPlayMsgListener.sendMsgForStartSendRtpStream(sendRtpItem.getServerId(), requestPushStreamMsg, json -> {
-                startSendRtpStreamHand(sendRtpItem, platform, json, param, callIdHeader);
-            });
-        } else {
+        if (mediaInfo != null) {
             // 如果是严格模式,需要关闭端口占用
             JSONObject startSendRtpStreamResult = null;
             if (sendRtpItem.getLocalPort() != 0) {

+ 17 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -553,4 +553,21 @@ public class StreamPushServiceImpl implements IStreamPushService {
     public Map<String, StreamPushItem> getAllAppAndStreamMap() {
         return streamPushMapper.getAllAppAndStreamMap();
     }
+
+    @Override
+    public void updatePush(OnStreamChangedHookParam param) {
+        StreamPushItem transform = transform(param);
+        StreamPushItem pushInDb = getPush(param.getApp(), param.getStream());
+        transform.setPushIng(param.isRegist());
+        transform.setUpdateTime(DateUtil.getNow());
+        transform.setPushTime(DateUtil.getNow());
+        transform.setSelf(userSetting.getServerId().equals(param.getSeverId()));
+        if (pushInDb == null) {
+            transform.setCreateTime(DateUtil.getNow());
+            streamPushMapper.add(transform);
+        }else {
+            streamPushMapper.update(transform);
+            gbStreamMapper.updateMediaServer(param.getApp(), param.getStream(), param.getMediaServerId());
+        }
+    }
 }

+ 97 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java

@@ -0,0 +1,97 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.GbStream;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamProxyService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
+import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
+import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.text.ParseException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * @author lin
+ */
+@Component
+public class RedisPlatformPushStreamOnlineLister implements MessageListener {
+
+    private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister");
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    /**
+     * 通过redis消息接收流上线的通知,如果本机由对这个流的监听,则回调
+     */
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
+                    sendStreamEvent(sendRtpItem);
+                }
+            });
+        }
+    }
+
+    private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
+
+    public void sendStreamEvent(SendRtpItem sendRtpItem) {
+        // 查看推流状态
+        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (channelOnlineEventLister != null)  {
+            try {
+                channelOnlineEventLister.run(sendRtpItem);
+            } catch (ParseException e) {
+                logger.error("sendStreamEvent: ", e);
+            }
+            removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
+        }
+    }
+
+    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
+        this.channelOnPublishEvents.put(app + "_" + stream, callback);
+    }
+
+    public void removedChannelOnlineEventLister(String app, String stream) {
+        this.channelOnPublishEvents.remove(app + "_" + stream);
+    }
+
+    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
+        return this.channelOnPublishEvents.get(app + "_" + stream);
+    }
+
+
+}

+ 59 - 18
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java

@@ -1,12 +1,16 @@
 package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,6 +22,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
@@ -32,10 +38,10 @@ public class RedisPlatformStartSendRtpListener implements MessageListener {
     private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
 
     @Autowired
-    private UserSetting userSetting;
+    private ZLMServerFactory zlmServerFactory;
 
     @Autowired
-    private ZlmHttpHookSubscribe hookSubscribe;
+    private IMediaServerService mediaServerService;
 
     @Qualifier("taskExecutor")
     @Autowired
@@ -52,23 +58,14 @@ public class RedisPlatformStartSendRtpListener implements MessageListener {
                 while (!taskQueue.isEmpty()) {
                     Message msg = taskQueue.poll();
                     try {
-                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
-                        if (messageForPushChannel == null
-                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
-                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
-                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
-                            continue;
+                        SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
+                        sendRtpItem.getMediaServerId();
+                        MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+                        if (mediaServer == null) {
+                            return;
                         }
-
-                        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
-                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
-                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
-                                null);
-                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
-                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
-
-                        });
-
+                        Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem);
+                        sendRtp(sendRtpItem, mediaServer, sendRtpParam);
 
                     }catch (Exception e) {
                         logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
@@ -78,4 +75,48 @@ public class RedisPlatformStartSendRtpListener implements MessageListener {
             });
         }
     }
+
+    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
+        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
+        Map<String, Object> param = new HashMap<>(12);
+        param.put("vhost","__defaultVhost__");
+        param.put("app",sendRtpItem.getApp());
+        param.put("stream",sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+        param.put("dst_url",sendRtpItem.getIp());
+        param.put("dst_port", sendRtpItem.getPort());
+        param.put("src_port", sendRtpItem.getLocalPort());
+        param.put("pt", sendRtpItem.getPt());
+        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
+        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
+        param.put("is_udp", isUdp);
+        if (!sendRtpItem.isTcp()) {
+            // udp模式下开启rtcp保活
+            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
+        }
+        return param;
+    }
+
+    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
+        JSONObject startSendRtpStreamResult = null;
+        if (sendRtpItem.getLocalPort() != 0) {
+            if (sendRtpItem.isTcpActive()) {
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+            }else {
+                param.put("dst_url", sendRtpItem.getIp());
+                param.put("dst_port", sendRtpItem.getPort());
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+            }
+        }else {
+            if (sendRtpItem.isTcpActive()) {
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
+            }else {
+                param.put("dst_url", sendRtpItem.getIp());
+                param.put("dst_port", sendRtpItem.getPort());
+                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
+            }
+        }
+        return startSendRtpStreamResult;
+
+    }
 }

+ 26 - 1
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java

@@ -2,12 +2,15 @@ package com.genersoft.iot.vmp.service.redisMsg;
 
 import com.alibaba.fastjson2.JSON;
 import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
 import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -34,14 +37,26 @@ public class RedisPlatformWaitPushStreamOnlineListener implements MessageListene
     @Autowired
     private UserSetting userSetting;
 
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
     @Autowired
     private ZlmHttpHookSubscribe hookSubscribe;
 
+    @Autowired
+    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
     @Qualifier("taskExecutor")
     @Autowired
     private ThreadPoolTaskExecutor taskExecutor;
 
 
+    /**
+     * 当上级点播时,这里负责监听等到流上线,流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知
+     */
     @Override
     public void onMessage(Message message, byte[] bytes) {
         logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
@@ -66,7 +81,17 @@ public class RedisPlatformWaitPushStreamOnlineListener implements MessageListene
                                 null);
                         hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
                             // 读取redis中的上级点播信息,生成sendRtpItm发送出去
-
+                            SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream());
+                            if (sendRtpItem.getSsrc() == null) {
+                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                                sendRtpItem.setSsrc(ssrc);
+                                sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+                                sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+                                redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem);
+                                // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。
+                                redisCatchStorage.sendPushStreamOnline(sendRtpItem);
+                            }
                         });
 
 

+ 0 - 97
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisStreamMsgListener.java

@@ -1,97 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
-import com.genersoft.iot.vmp.media.zlm.dto.ChannelOnlineEvent;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * 接收其他wvp发送流变化通知
- * @author lin
- */
-@Component
-public class RedisStreamMsgListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisStreamMsgListener.class);
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private ZLMMediaListManager zlmMediaListManager;
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        JSONObject steamMsgJson = JSON.parseObject(msg.getBody(), JSONObject.class);
-                        if (steamMsgJson == null) {
-                            logger.warn("[收到redis 流变化]消息解析失败");
-                            continue;
-                        }
-                        String serverId = steamMsgJson.getString("serverId");
-
-                        if (userSetting.getServerId().equals(serverId)) {
-                            // 自己发送的消息忽略即可
-                            continue;
-                        }
-                        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
-                        String app = steamMsgJson.getString("app");
-                        String stream = steamMsgJson.getString("stream");
-                        boolean register = steamMsgJson.getBoolean("register");
-                        String mediaServerId = steamMsgJson.getString("mediaServerId");
-                        OnStreamChangedHookParam onStreamChangedHookParam = new OnStreamChangedHookParam();
-                        onStreamChangedHookParam.setSeverId(serverId);
-                        onStreamChangedHookParam.setApp(app);
-                        onStreamChangedHookParam.setStream(stream);
-                        onStreamChangedHookParam.setRegist(register);
-                        onStreamChangedHookParam.setMediaServerId(mediaServerId);
-                        onStreamChangedHookParam.setCreateStamp(System.currentTimeMillis()/1000);
-                        onStreamChangedHookParam.setAliveSecond(0L);
-                        onStreamChangedHookParam.setTotalReaderCount("0");
-                        onStreamChangedHookParam.setOriginType(0);
-                        onStreamChangedHookParam.setOriginTypeStr("0");
-                        onStreamChangedHookParam.setOriginTypeStr("unknown");
-                        ChannelOnlineEvent channelOnlineEventLister = zlmMediaListManager.getChannelOnlineEventLister(app, stream);
-                        if ( channelOnlineEventLister != null)  {
-                            try {
-                                channelOnlineEventLister.run(app, stream, serverId);;
-                            } catch (ParseException e) {
-                                logger.error("addPush: ", e);
-                            }
-                            zlmMediaListManager.removedChannelOnlineEventLister(app, stream);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS消息-流变化] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS消息-流变化] 异常内容: ", e);
-                    }
-                }
-            });
-        }
-    }
-}

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -219,5 +219,9 @@ public interface IRedisCatchStorage {
 
     void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout);
 
+    SendRtpItem getWaiteSendRtpItem(String app, String stream);
+
     void sendStartSendRtp(SendRtpItem sendRtpItem);
+
+    void sendPushStreamOnline(SendRtpItem sendRtpItem);
 }

+ 13 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -685,9 +685,22 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
         redisTemplate.opsForValue().set(key, platformPlayTimeout);
     }
 
+    @Override
+    public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
+        String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
+        return (SendRtpItem)redisTemplate.opsForValue().get(key);
+    }
+
     @Override
     public void sendStartSendRtp(SendRtpItem sendRtpItem) {
         String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
         redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
     }
+
+    @Override
+    public void sendPushStreamOnline(SendRtpItem sendRtpItem) {
+        String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED;
+        logger.info("[redis发送通知] 流上线 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
+    }
 }