Przeglądaj źródła

重构多wvp国标级联机制

648540858 1 rok temu
rodzic
commit
9c6765d44e
21 zmienionych plików z 789 dodań i 575 usunięć
  1. 2 16
      src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java
  2. 205 0
      src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java
  3. 24 0
      src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java
  4. 93 0
      src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java
  5. 87 0
      src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java
  6. 14 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java
  7. 6 8
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  8. 9 36
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  9. 0 4
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  10. 9 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java
  11. 1 1
      src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
  12. 16 5
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
  13. 16 0
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java
  14. 0 97
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java
  15. 0 122
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java
  16. 0 106
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java
  17. 0 99
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java
  18. 0 76
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java
  19. 203 0
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java
  20. 100 0
      src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java
  21. 4 3
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

+ 2 - 16
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisMsgListenConfig.java

@@ -34,23 +34,13 @@ public class RedisMsgListenConfig {
 	@Autowired
 	private RedisPushStreamStatusListMsgListener redisPushStreamListMsgListener;
 
-	@Autowired
-	private RedisPushStreamResponseListener redisPushStreamResponseListener;
 
 	@Autowired
 	private RedisCloseStreamMsgListener redisCloseStreamMsgListener;
 
-	@Autowired
-	private RedisPushStreamCloseResponseListener redisPushStreamCloseResponseListener;
-
-	@Autowired
-	private RedisPlatformWaitPushStreamOnlineListener redisPlatformWaitPushStreamOnlineListener;
-
-	@Autowired
-	private RedisPlatformStartSendRtpListener redisPlatformStartSendRtpListener;
 
 	@Autowired
-	private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
+	private RedisRpcConfig redisRpcConfig;
 
 
 	/**
@@ -69,12 +59,8 @@ public class RedisMsgListenConfig {
 		container.addMessageListener(redisAlarmMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM_RECEIVE));
 		container.addMessageListener(redisPushStreamStatusMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_STATUS_CHANGE));
 		container.addMessageListener(redisPushStreamListMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_PUSH_STREAM_LIST_CHANGE));
-		container.addMessageListener(redisPushStreamResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_RESPONSE));
 		container.addMessageListener(redisCloseStreamMsgListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE));
-		container.addMessageListener(redisPushStreamCloseResponseListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_CLOSE_REQUESTED));
-		container.addMessageListener(redisPlatformStartSendRtpListener, new PatternTopic(VideoManagerConstants.START_SEND_PUSH_STREAM));
-		container.addMessageListener(redisPlatformWaitPushStreamOnlineListener, new PatternTopic(VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED));
-		container.addMessageListener(redisPlatformPushStreamOnlineLister, new PatternTopic(VideoManagerConstants.PUSH_STREAM_ONLINE));
+		container.addMessageListener(redisRpcConfig, new PatternTopic(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY));
         return container;
     }
 }

+ 205 - 0
src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java

@@ -0,0 +1,205 @@
+package com.genersoft.iot.vmp.conf.redis;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.service.redisMsg.control.RedisRpcController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class RedisRpcConfig implements MessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcConfig.class);
+
+    public final static String REDIS_REQUEST_CHANNEL_KEY = "WVP_REDIS_REQUEST_CHANNEL_KEY";
+
+    private final Random random = new Random();
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private RedisRpcController redisRpcController;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        boolean isEmpty = taskQueue.isEmpty();
+        taskQueue.offer(message);
+        if (isEmpty) {
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    try {
+                        RedisRpcMessage redisRpcMessage = JSON.parseObject(new String(msg.getBody()), RedisRpcMessage.class);
+                        if (redisRpcMessage.getRequest() != null) {
+                            handlerRequest(redisRpcMessage.getRequest());
+                        } else if (redisRpcMessage.getResponse() != null){
+                            handlerResponse(redisRpcMessage.getResponse());
+                        } else {
+                            logger.error("[redis rpc 解析失败] {}", JSON.toJSONString(redisRpcMessage));
+                        }
+                    } catch (Exception e) {
+                        logger.error("[redis rpc 解析异常] ", e);
+                    }
+                }
+            });
+        }
+    }
+
+    private void handlerResponse(RedisRpcResponse response) {
+        if (userSetting.getServerId().equals(response.getToId())) {
+            return;
+        }
+        response(response);
+    }
+
+    private void handlerRequest(RedisRpcRequest request) {
+        try {
+            if (userSetting.getServerId().equals(request.getFromId())) {
+                return;
+            }
+            Method method = getMethod(request.getUri());
+            // 没有携带目标ID的可以理解为哪个wvp有结果就哪个回复,携带目标ID,但是如果是不存在的uri则直接回复404
+            if (userSetting.getServerId().equals(request.getToId())) {
+                if (method == null) {
+                    // 回复404结果
+                    RedisRpcResponse response = request.getResponse();
+                    response.setStatusCode(404);
+                    sendResponse(response);
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if(response != null) {
+                    sendResponse(response);
+                }
+            }else {
+                if (method == null) {
+                    return;
+                }
+                RedisRpcResponse response = (RedisRpcResponse)method.invoke(redisRpcController, request);
+                if (response != null) {
+                    sendResponse(response);
+                }
+            }
+        }catch (InvocationTargetException | IllegalAccessException e) {
+            logger.error("[redis rpc ] 处理请求失败 ", e);
+        }
+
+    }
+
+    private Method getMethod(String name) {
+        // 启动后扫描所有的路径注解
+        Method[] methods = redisRpcController.getClass().getMethods();
+        for (Method method : methods) {
+            if (method.getName().equals(name)) {
+                return method;
+            }
+        }
+        return null;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+    private void sendRequest(RedisRpcRequest request){
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setRequest(request);
+        redisTemplate.convertAndSend(REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+
+
+    private final Map<Long, SynchronousQueue<RedisRpcResponse>> topicSubscribers = new ConcurrentHashMap<>();
+    private final Map<Long, CommonCallback<RedisRpcResponse>> callbacks = new ConcurrentHashMap<>();
+
+    public RedisRpcResponse request(RedisRpcRequest request, int timeOut) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        SynchronousQueue<RedisRpcResponse> subscribe = subscribe(request.getSn());
+        try {
+            sendRequest(request);
+            return subscribe.poll(timeOut, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            logger.warn("[redis rpc timeout] uri: {}, sn: {}", request.getUri(), request.getSn(), e);
+        } finally {
+            this.unsubscribe(request.getSn());
+        }
+        return null;
+    }
+
+    public void request(RedisRpcRequest request, CommonCallback<RedisRpcResponse> callback) {
+        request.setSn((long) random.nextInt(1000) + 1);
+        setCallback(request.getSn(), callback);
+        sendRequest(request);
+    }
+
+    public Boolean response(RedisRpcResponse response) {
+        SynchronousQueue<RedisRpcResponse> queue = topicSubscribers.get(response.getSn());
+        CommonCallback<RedisRpcResponse> callback = callbacks.get(response.getSn());
+        if (queue != null) {
+            try {
+                return queue.offer(response, 2, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                logger.error("{}", e.getMessage(), e);
+            }
+        }else if (callback != null) {
+            callback.run(response);
+            callbacks.remove(response.getSn());
+        }
+        return false;
+    }
+
+    private void unsubscribe(long key) {
+        topicSubscribers.remove(key);
+    }
+
+
+    private SynchronousQueue<RedisRpcResponse> subscribe(long key) {
+        SynchronousQueue<RedisRpcResponse> queue = null;
+        if (!topicSubscribers.containsKey(key))
+            topicSubscribers.put(key, queue = new SynchronousQueue<>());
+        return queue;
+    }
+
+    private void setCallback(long key, CommonCallback<RedisRpcResponse> callback)  {
+        if (!callbacks.containsKey(key)) {
+            callbacks.put(key, callback);
+        }
+
+    }
+
+    public void removeCallback(long key)  {
+        callbacks.remove(key);
+    }
+}

+ 24 - 0
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcMessage.java

@@ -0,0 +1,24 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+public class RedisRpcMessage {
+
+    private RedisRpcRequest request;
+
+    private RedisRpcResponse response;
+
+    public RedisRpcRequest getRequest() {
+        return request;
+    }
+
+    public void setRequest(RedisRpcRequest request) {
+        this.request = request;
+    }
+
+    public RedisRpcResponse getResponse() {
+        return response;
+    }
+
+    public void setResponse(RedisRpcResponse response) {
+        this.response = response;
+    }
+}

+ 93 - 0
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcRequest.java

@@ -0,0 +1,93 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 通过redis发送请求
+ */
+public class RedisRpcRequest {
+
+    /**
+     * 来自的WVP ID
+     */
+    private String fromId;
+
+
+    /**
+     * 目标的WVP ID
+     */
+    private String toId;
+
+    /**
+     * 序列号
+     */
+    private long sn;
+
+    /**
+     * 访问的路径
+     */
+    private String uri;
+
+    /**
+     * 参数
+     */
+    private Object param;
+
+    public String getFromId() {
+        return fromId;
+    }
+
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    public String getToId() {
+        return toId;
+    }
+
+    public void setToId(String toId) {
+        this.toId = toId;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Object getParam() {
+        return param;
+    }
+
+    public void setParam(Object param) {
+        this.param = param;
+    }
+
+    public long getSn() {
+        return sn;
+    }
+
+    public void setSn(long sn) {
+        this.sn = sn;
+    }
+
+    @Override
+    public String toString() {
+        return "RedisRpcRequest{" +
+                "fromId='" + fromId + '\'' +
+                ", toId='" + toId + '\'' +
+                ", sn='" + sn + '\'' +
+                ", uri='" + uri + '\'' +
+                ", param=" + param +
+                '}';
+    }
+
+    public RedisRpcResponse getResponse() {
+        RedisRpcResponse response = new RedisRpcResponse();
+        response.setFromId(fromId);
+        response.setToId(toId);
+        response.setSn(sn);
+        response.setUri(uri);
+        return response;
+    }
+}

+ 87 - 0
src/main/java/com/genersoft/iot/vmp/conf/redis/bean/RedisRpcResponse.java

@@ -0,0 +1,87 @@
+package com.genersoft.iot.vmp.conf.redis.bean;
+
+/**
+ * 通过redis发送回复
+ */
+public class RedisRpcResponse {
+
+    /**
+     * 来自的WVP ID
+     */
+    private String fromId;
+
+
+    /**
+     * 目标的WVP ID
+     */
+    private String toId;
+
+
+    /**
+     * 序列号
+     */
+    private long sn;
+
+    /**
+     * 状态码
+     */
+    private int statusCode;
+
+    /**
+     * 访问的路径
+     */
+    private String uri;
+
+    /**
+     * 参数
+     */
+    private Object body;
+
+    public String getFromId() {
+        return fromId;
+    }
+
+    public void setFromId(String fromId) {
+        this.fromId = fromId;
+    }
+
+    public String getToId() {
+        return toId;
+    }
+
+    public void setToId(String toId) {
+        this.toId = toId;
+    }
+
+    public long getSn() {
+        return sn;
+    }
+
+    public void setSn(long sn) {
+        this.sn = sn;
+    }
+
+    public int getStatusCode() {
+        return statusCode;
+    }
+
+    public void setStatusCode(int statusCode) {
+        this.statusCode = statusCode;
+    }
+
+    public String getUri() {
+        return uri;
+    }
+
+    public void setUri(String uri) {
+        this.uri = uri;
+    }
+
+    public Object getBody() {
+        return body;
+    }
+
+    public void setBody(Object body) {
+        this.body = body;
+    }
+}

+ 14 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/AckRequestProcessor.java

@@ -15,9 +15,11 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IDeviceService;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IPlayService;
-import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
+import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.InitializingBean;
@@ -54,6 +56,8 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 
 	@Autowired
     private IRedisCatchStorage redisCatchStorage;
+	@Autowired
+    private IRedisRpcService redisRpcService;
 
 	@Autowired
     private UserSetting userSetting;
@@ -113,7 +117,15 @@ public class AckRequestProcessor extends SIPRequestProcessorParent implements In
 		if (parentPlatform != null) {
 			Map<String, Object> param = getSendRtpParam(sendRtpItem);
 			if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
-				redisCatchStorage.sendStartSendRtp(sendRtpItem);
+//				redisCatchStorage.sendStartSendRtp(sendRtpItem);
+				WVPResult wvpResult = redisRpcService.startSendRtp(sendRtpItem);
+				if (wvpResult.getCode() == 0) {
+					MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(0, sendRtpItem.getApp(), sendRtpItem.getStream(),
+							sendRtpItem.getChannelId(), parentPlatform.getServerGBId(), parentPlatform.getName(), userSetting.getServerId(),
+							sendRtpItem.getMediaServerId());
+					messageForPushChannel.setPlatFormIndex(parentPlatform.getId());
+					redisCatchStorage.sendPlatformStartPlayMsg(messageForPushChannel);
+				}
 			} else {
 				JSONObject startSendRtpStreamResult = sendRtp(sendRtpItem, mediaInfo, param);
 				if (startSendRtpStreamResult != null) {

+ 6 - 8
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -18,7 +18,7 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.RequestStopPushStreamMsg;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -97,6 +97,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 	@Autowired
 	private IStreamPushService pushService;
 
+	@Autowired
+	private IRedisRpcService redisRpcService;
+
 
 	@Override
 	public void afterPropertiesSet() throws Exception {
@@ -134,13 +137,8 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 			if (sendRtpItem.getPlayType().equals(InviteStreamType.PUSH)) {
 				// 查询这路流是否是本平台的
 				StreamPushItem push = pushService.getPush(sendRtpItem.getApp(), sendRtpItem.getStream());
-				if (push!= null && !push.isSelf()) {
-					// 不是本平台的就发送redis消息让其他wvp停止发流
-					ParentPlatform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getPlatformId());
-					if (platform != null) {
-						RequestStopPushStreamMsg streamMsg = RequestStopPushStreamMsg.getInstance(sendRtpItem, platform.getName(), platform.getId());
-//						redisGbPlayMsgListener.sendMsgForStopSendRtpStream(sendRtpItem.getServerId(), streamMsg);
-					}
+				if (!userSetting.getServerId().equals(sendRtpItem.getServerId())) {
+					redisRpcService.stopSendRtp(sendRtpItem);
 				}else {
 					MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
 					redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(),

+ 9 - 36
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -19,7 +19,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.ISIPRequestProcessor
 import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
-import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
@@ -29,7 +29,6 @@ import com.genersoft.iot.vmp.service.bean.ErrorCallback;
 import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisPushStreamResponseListener;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -86,7 +85,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     private IRedisCatchStorage redisCatchStorage;
 
     @Autowired
-    private IInviteStreamService inviteStreamService;
+    private IRedisRpcService redisRpcService;
 
     @Autowired
     private SSRCFactory ssrcFactory;
@@ -94,9 +93,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     @Autowired
     private DynamicTask dynamicTask;
 
-    @Autowired
-    private RedisPushStreamResponseListener redisPushStreamResponseListener;
-
     @Autowired
     private IPlayService playService;
 
@@ -121,9 +117,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
     @Autowired
     private UserSetting userSetting;
 
-    @Autowired
-    private RedisPlatformPushStreamOnlineLister mediaListManager;
-
     @Autowired
     private SipConfig config;
 
@@ -594,15 +587,17 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     sendRtpItem.setSessionName(sessionName);
 
                     if ("push".equals(gbStream.getStreamType())) {
+                        sendRtpItem.setPlayType(InviteStreamType.PUSH);
                         if (streamPushItem != null) {
                             // 从redis查询是否正在接收这个推流
                             OnStreamChangedHookParam pushListItem = redisCatchStorage.getPushListItem(gbStream.getApp(), gbStream.getStream());
-
                             if (pushListItem != null) {
                                 sendRtpItem.setServerId(pushListItem.getSeverId());
+                                sendRtpItem.setMediaServerId(pushListItem.getMediaServerId());
+
                                 StreamPushItem transform = streamPushService.transform(pushListItem);
                                 transform.setSelf(userSetting.getServerId().equals(pushListItem.getSeverId()));
-                                // 推流状态
+                                // 开始推流
                                 sendPushStream(sendRtpItem, mediaServerItem, platform, request);
                             }else {
                                 if (!platform.isStartOfflinePush()) {
@@ -702,8 +697,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 }
                 // 写入redis, 超时时回复
                 sendRtpItem.setStatus(1);
-                sendRtpItem.setFromTag(request.getFromTag());
-                sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
                 SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
                 if (response != null) {
                     sendRtpItem.setToTag(response.getToTag());
@@ -714,7 +707,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     sendRtpItem.setSsrc(ssrc);
                 }
                 redisCatchStorage.updateSendRTPSever(sendRtpItem);
-
             } else {
                 // 不在线 拉起
                 notifyPushStreamOnline(sendRtpItem, mediaServerItem, platform, request);
@@ -769,18 +761,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
         dynamicTask.startDelay(sendRtpItem.getCallId(), () -> {
             logger.info("[ app={}, stream={} ] 等待设备开始推流超时", sendRtpItem.getApp(), sendRtpItem.getStream());
             try {
-                redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
-                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
                 responseAck(request, Response.REQUEST_TIMEOUT); // 超时
             } catch (SipException | InvalidArgumentException | ParseException e) {
                 logger.error("未处理的异常 ", e);
             }
         }, userSetting.getPlatformPlayTimeout());
-        redisCatchStorage.addWaiteSendRtpItem(sendRtpItem, userSetting.getPlatformPlayTimeout());
-        // 添加上线的通知
-        mediaListManager.addChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream(), (sendRtpItemFromRedis) -> {
+        //
+        redisRpcService.waitePushStreamOnline(sendRtpItem, (sendRtpItemFromRedis) -> {
             dynamicTask.stop(sendRtpItem.getCallId());
-            redisPushStreamResponseListener.removeEvent(sendRtpItem.getApp(), sendRtpItem.getStream());
             if (sendRtpItemFromRedis.getServerId().equals(userSetting.getServerId())) {
 
                 int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
@@ -813,19 +801,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 // 其他平台内容
                 otherWvpPushStream(sendRtpItemFromRedis, request, platform);
             }
-        });
 
-        // 添加回复的拒绝或者错误的通知
-        redisPushStreamResponseListener.addEvent(sendRtpItem.getApp(), sendRtpItem.getStream(), response -> {
-            if (response.getCode() != 0) {
-                dynamicTask.stop(sendRtpItem.getCallId());
-                mediaListManager.removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
-                try {
-                    responseAck(request, Response.TEMPORARILY_UNAVAILABLE, response.getMsg());
-                } catch (SipException | InvalidArgumentException | ParseException e) {
-                    logger.error("[命令发送失败] 国标级联 点播回复: {}", e.getMessage());
-                }
-            }
         });
     }
 
@@ -836,12 +812,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
      */
     private void otherWvpPushStream(SendRtpItem sendRtpItem, SIPRequest request, ParentPlatform platform) {
         logger.info("[级联点播]直播流来自其他平台,发送redis消息");
-        // 发送redis消息
-        redisCatchStorage.sendStartSendRtp(sendRtpItem);
+        sendRtpItem = redisRpcService.getSendRtpItem(sendRtpItem);
         // 写入redis, 超时时回复
         sendRtpItem.setStatus(1);
-        sendRtpItem.setCallId(request.getCallIdHeader().getCallId());
-        sendRtpItem.setFromTag(request.getFromTag());
         SIPResponse response = sendStreamAck(request, sendRtpItem, platform);
         if (response != null) {
             sendRtpItem.setToTag(response.getToTag());

+ 0 - 4
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -23,7 +23,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.service.redisMsg.RedisPlatformPushStreamOnlineLister;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.DateUtil;
@@ -100,9 +99,6 @@ public class ZLMHttpHookListener {
     @Autowired
     private EventPublisher eventPublisher;
 
-    @Autowired
-    private RedisPlatformPushStreamOnlineLister zlmMediaListManager;
-
     @Autowired
     private ZlmHttpHookSubscribe subscribe;
 

+ 9 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java

@@ -365,4 +365,13 @@ public class ZLMServerFactory {
         }
         return result;
     }
+
+    public JSONObject stopSendRtpStream(MediaServerItem mediaServerItem, SendRtpItem sendRtpItem) {
+        Map<String, Object> param = new HashMap<>();
+        param.put("vhost", "__defaultVhost__");
+        param.put("app", sendRtpItem.getApp());
+        param.put("stream", sendRtpItem.getStream());
+        param.put("ssrc", sendRtpItem.getSsrc());
+        return zlmresTfulUtils.startSendRtp(mediaServerItem, param);
+    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java

@@ -6,7 +6,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.media.zlm.dto.ServerKeepaliveData;
 import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
 
 import java.util.List;
 
@@ -97,4 +96,5 @@ public interface IMediaServerService {
 
     List<MediaServerItem> getAllWithAssistPort();
 
+    MediaServerItem getMediaServerByAppAndStream(String app, String stream);
 }

+ 16 - 5
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -25,7 +25,6 @@ import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.utils.JsonUtil;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
-import com.genersoft.iot.vmp.vmanager.bean.RecordFile;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
@@ -36,19 +35,15 @@ import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
-import org.springframework.util.Assert;
 import org.springframework.util.ObjectUtils;
 
 import java.io.File;
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 /**
  * 媒体服务器节点管理
@@ -751,6 +746,22 @@ public class MediaServerServiceImpl implements IMediaServerService {
 
     @Override
     public List<MediaServerItem> getAllWithAssistPort() {
+
         return mediaServerMapper.queryAllWithAssistPort();
     }
+
+    @Override
+    public MediaServerItem getMediaServerByAppAndStream(String app, String stream) {
+        List<MediaServerItem> mediaServerItemList = getAllOnline();
+        if (mediaServerItemList.isEmpty()) {
+            return null;
+        }
+        for (MediaServerItem mediaServerItem : mediaServerItemList) {
+            Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, app, stream);
+            if (streamReady) {
+                return mediaServerItem;
+            }
+        }
+        return null;
+    }
 }

+ 16 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/IRedisRpcService.java

@@ -0,0 +1,16 @@
+package com.genersoft.iot.vmp.service.redisMsg;
+
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+
+public interface IRedisRpcService {
+
+    SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem);
+
+    WVPResult startSendRtp(SendRtpItem sendRtpItem);
+
+    void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback);
+
+    WVPResult stopSendRtp(SendRtpItem sendRtpItem);
+}

+ 0 - 97
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformPushStreamOnlineLister.java

@@ -1,97 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.*;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
-import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
-import com.genersoft.iot.vmp.utils.DateUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.text.ParseException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * @author lin
- */
-@Component
-public class RedisPlatformPushStreamOnlineLister implements MessageListener {
-
-    private final Logger logger = LoggerFactory.getLogger("RedisPlatformPushStreamOnlineLister");
-
-    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-    /**
-     * 通过redis消息接收流上线的通知,如果本机由对这个流的监听,则回调
-     */
-    @Override
-    public void onMessage(Message message, byte[] pattern) {
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
-                    sendStreamEvent(sendRtpItem);
-                }
-            });
-        }
-    }
-
-    private final Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
-
-    public void sendStreamEvent(SendRtpItem sendRtpItem) {
-        // 查看推流状态
-        ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
-        if (channelOnlineEventLister != null)  {
-            try {
-                channelOnlineEventLister.run(sendRtpItem);
-            } catch (ParseException e) {
-                logger.error("sendStreamEvent: ", e);
-            }
-            removedChannelOnlineEventLister(sendRtpItem.getApp(), sendRtpItem.getStream());
-        }
-    }
-
-    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
-        this.channelOnPublishEvents.put(app + "_" + stream, callback);
-    }
-
-    public void removedChannelOnlineEventLister(String app, String stream) {
-        this.channelOnPublishEvents.remove(app + "_" + stream);
-    }
-
-    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
-        return this.channelOnPublishEvents.get(app + "_" + stream);
-    }
-
-
-}

+ 0 - 122
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformStartSendRtpListener.java

@@ -1,122 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.alibaba.fastjson2.JSONObject;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * 收到消息后开始给上级发流
- * @author lin
- */
-@Component
-public class RedisPlatformStartSendRtpListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPlatformStartSendRtpListener.class);
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        SendRtpItem sendRtpItem = JSON.parseObject(new String(msg.getBody()), SendRtpItem.class);
-                        sendRtpItem.getMediaServerId();
-                        MediaServerItem mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
-                        if (mediaServer == null) {
-                            return;
-                        }
-                        Map<String, Object> sendRtpParam = getSendRtpParam(sendRtpItem);
-                        sendRtp(sendRtpItem, mediaServer, sendRtpParam);
-
-                    }catch (Exception e) {
-                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
-                    }
-                }
-            });
-        }
-    }
-
-    private Map<String, Object> getSendRtpParam(SendRtpItem sendRtpItem) {
-        String isUdp = sendRtpItem.isTcp() ? "0" : "1";
-        Map<String, Object> param = new HashMap<>(12);
-        param.put("vhost","__defaultVhost__");
-        param.put("app",sendRtpItem.getApp());
-        param.put("stream",sendRtpItem.getStream());
-        param.put("ssrc", sendRtpItem.getSsrc());
-        param.put("dst_url",sendRtpItem.getIp());
-        param.put("dst_port", sendRtpItem.getPort());
-        param.put("src_port", sendRtpItem.getLocalPort());
-        param.put("pt", sendRtpItem.getPt());
-        param.put("use_ps", sendRtpItem.isUsePs() ? "1" : "0");
-        param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
-        param.put("is_udp", isUdp);
-        if (!sendRtpItem.isTcp()) {
-            // udp模式下开启rtcp保活
-            param.put("udp_rtcp_timeout", sendRtpItem.isRtcp()? "1":"0");
-        }
-        return param;
-    }
-
-    private JSONObject sendRtp(SendRtpItem sendRtpItem, MediaServerItem mediaInfo, Map<String, Object> param){
-        JSONObject startSendRtpStreamResult = null;
-        if (sendRtpItem.getLocalPort() != 0) {
-            if (sendRtpItem.isTcpActive()) {
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-            }else {
-                param.put("dst_url", sendRtpItem.getIp());
-                param.put("dst_port", sendRtpItem.getPort());
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-            }
-        }else {
-            if (sendRtpItem.isTcpActive()) {
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpPassive(mediaInfo, param);
-            }else {
-                param.put("dst_url", sendRtpItem.getIp());
-                param.put("dst_port", sendRtpItem.getPort());
-                startSendRtpStreamResult = zlmServerFactory.startSendRtpStream(mediaInfo, param);
-            }
-        }
-        return startSendRtpStreamResult;
-
-    }
-}

+ 0 - 106
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPlatformWaitPushStreamOnlineListener.java

@@ -1,106 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
-import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
-import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * 上级等到设备推流的redis消息
- * @author lin
- */
-@Component
-public class RedisPlatformWaitPushStreamOnlineListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPlatformWaitPushStreamOnlineListener.class);
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private ZlmHttpHookSubscribe hookSubscribe;
-
-    @Autowired
-    private RedisPlatformPushStreamOnlineLister redisPlatformPushStreamOnlineLister;
-
-    @Autowired
-    private SSRCFactory ssrcFactory;
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    /**
-     * 当上级点播时,这里负责监听等到流上线,流上线后如果是在当前服务则直接回调,如果是其他wvp,则由redis消息进行通知
-     */
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS消息-收到上级等到设备推流的redis消息]: {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        MessageForPushChannel messageForPushChannel = JSON.parseObject(new String(msg.getBody()), MessageForPushChannel.class);
-                        if (messageForPushChannel == null
-                                || ObjectUtils.isEmpty(messageForPushChannel.getApp())
-                                || ObjectUtils.isEmpty(messageForPushChannel.getStream())
-                        || userSetting.getServerId().equals(messageForPushChannel.getServerId())){
-                            continue;
-                        }
-
-                        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
-                        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
-                                messageForPushChannel.getApp(), messageForPushChannel.getStream(), true, "rtsp",
-                                null);
-                        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
-                            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
-                            SendRtpItem sendRtpItem = redisCatchStorage.getWaiteSendRtpItem(messageForPushChannel.getApp(), messageForPushChannel.getStream());
-                            if (sendRtpItem.getSsrc() == null) {
-                                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
-                                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
-                                sendRtpItem.setSsrc(ssrc);
-                                sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
-                                sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
-                                redisPlatformPushStreamOnlineLister.sendStreamEvent(sendRtpItem);
-                                // 通知其他wvp, 由RedisPlatformPushStreamOnlineLister接收此监听。
-                                redisCatchStorage.sendPushStreamOnline(sendRtpItem);
-                            }
-                        });
-
-
-                    }catch (Exception e) {
-                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
-                    }
-                }
-            });
-        }
-    }
-}

+ 0 - 99
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamCloseResponseListener.java

@@ -1,99 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
-import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
-import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
-import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamPushService;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.stereotype.Component;
-
-import javax.sip.InvalidArgumentException;
-import javax.sip.SipException;
-import java.text.ParseException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * 接收redis发送的结束推流请求
- * @author lin
- */
-@Component
-public class RedisPushStreamCloseResponseListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamCloseResponseListener.class);
-
-    @Autowired
-    private IStreamPushService streamPushService;
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private ISIPCommanderForPlatform commanderFroPlatform;
-
-    @Autowired
-    private UserSetting userSetting;
-
-    @Autowired
-    private IMediaServerService mediaServerService;
-
-    @Autowired
-    private ZLMServerFactory zlmServerFactory;
-
-
-    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
-    public interface PushStreamResponseEvent{
-        void run(MessageForPushChannelResponse response);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS消息-推流结束]: {}", new String(message.getBody()));
-        MessageForPushChannel pushChannel = JSON.parseObject(message.getBody(), MessageForPushChannel.class);
-        StreamPushItem push = streamPushService.getPush(pushChannel.getApp(), pushChannel.getStream());
-        if (push != null) {
-            List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
-                    push.getGbId());
-            if (!sendRtpItems.isEmpty()) {
-                for (SendRtpItem sendRtpItem : sendRtpItems) {
-                    ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
-                    if (parentPlatform != null) {
-                        redisCatchStorage.deleteSendRTPServer(sendRtpItem.getPlatformId(), sendRtpItem.getChannelId(), sendRtpItem.getCallId(), sendRtpItem.getStream());
-                        try {
-                            commanderFroPlatform.streamByeCmd(parentPlatform, sendRtpItem);
-                        } catch (SipException | InvalidArgumentException | ParseException e) {
-                            logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-
-    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
-        responseEvents.put(app + stream, callback);
-    }
-
-    public void removeEvent(String app, String stream) {
-        responseEvents.remove(app + stream);
-    }
-}

+ 0 - 76
src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java

@@ -1,76 +0,0 @@
-package com.genersoft.iot.vmp.service.redisMsg;
-
-import com.alibaba.fastjson2.JSON;
-import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.connection.MessageListener;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.ObjectUtils;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * 接收redis返回的推流结果
- * @author lin
- */
-@Component
-public class RedisPushStreamResponseListener implements MessageListener {
-
-    private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamResponseListener.class);
-
-    private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
-
-    @Qualifier("taskExecutor")
-    @Autowired
-    private ThreadPoolTaskExecutor taskExecutor;
-
-
-    private Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
-
-    public interface PushStreamResponseEvent{
-        void run(MessageForPushChannelResponse response);
-    }
-
-    @Override
-    public void onMessage(Message message, byte[] bytes) {
-        logger.info("[REDIS消息-请求推流结果]: {}", new String(message.getBody()));
-        boolean isEmpty = taskQueue.isEmpty();
-        taskQueue.offer(message);
-        if (isEmpty) {
-            taskExecutor.execute(() -> {
-                while (!taskQueue.isEmpty()) {
-                    Message msg = taskQueue.poll();
-                    try {
-                        MessageForPushChannelResponse response = JSON.parseObject(new String(msg.getBody()), MessageForPushChannelResponse.class);
-                        if (response == null || ObjectUtils.isEmpty(response.getApp()) || ObjectUtils.isEmpty(response.getStream())){
-                            logger.info("[REDIS消息-请求推流结果]:参数不全");
-                            continue;
-                        }
-                        // 查看正在等待的invite消息
-                        if (responseEvents.get(response.getApp() + response.getStream()) != null) {
-                            responseEvents.get(response.getApp() + response.getStream()).run(response);
-                        }
-                    }catch (Exception e) {
-                        logger.warn("[REDIS消息-请求推流结果] 发现未处理的异常, \r\n{}", JSON.toJSONString(message));
-                        logger.error("[REDIS消息-请求推流结果] 异常内容: ", e);
-                    }
-                }
-            });
-        }
-    }
-
-    public void addEvent(String app, String stream, PushStreamResponseEvent callback) {
-        responseEvents.put(app + stream, callback);
-    }
-
-    public void removeEvent(String app, String stream) {
-        responseEvents.remove(app + stream);
-    }
-}

+ 203 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcController.java

@@ -0,0 +1,203 @@
+package com.genersoft.iot.vmp.service.redisMsg.control;
+
+import com.alibaba.fastjson2.JSON;
+import com.alibaba.fastjson2.JSONObject;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
+import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+
+/**
+ * 其他wvp发起的rpc调用,这里的方法被 RedisRpcConfig 通过反射寻找对应的方法名称调用
+ */
+@Component
+public class RedisRpcController {
+
+    private final static Logger logger = LoggerFactory.getLogger(RedisRpcController.class);
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
+    @Autowired
+    private IMediaServerService mediaServerService;
+
+    @Autowired
+    private SendRtpPortManager sendRtpPortManager;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private ZLMServerFactory zlmServerFactory;
+
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+
+    /**
+     * 获取发流的信息
+     */
+    public RedisRpcResponse getSendRtpItem(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 获取发流的信息: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        // 查询本级是否有这个流
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (mediaServerItem == null) {
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        // 自平台内容
+        int localPort = sendRtpPortManager.getNextPort(mediaServerItem);
+        if (localPort == 0) {
+            logger.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        // 写入redis, 超时时回复
+        sendRtpItem.setStatus(1);
+        sendRtpItem.setServerId(userSetting.getServerId());
+        sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
+        if (sendRtpItem.getSsrc() == null) {
+            // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+            String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
+            sendRtpItem.setSsrc(ssrc);
+        }
+        redisCatchStorage.updateSendRTPSever(sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        response.setBody(sendRtpItem);
+        return response;
+    }
+
+    /**
+     * 监听流上线
+     */
+    public RedisRpcResponse waitePushStreamOnline(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 监听流上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        // 查询本级是否有这个流
+        MediaServerItem mediaServerItem = mediaServerService.getMediaServerByAppAndStream(sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (mediaServerItem != null) {
+            logger.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            RedisRpcResponse response = request.getResponse();
+            response.setBody(sendRtpItem);
+            response.setStatusCode(200);
+        }
+        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+
+        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+            logger.info("[redis-rpc] 监听流上线,流已上线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
+            if (sendRtpItem.getSsrc() == null) {
+                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+            RedisRpcResponse response = request.getResponse();
+            response.setBody(sendRtpItem);
+            response.setStatusCode(200);
+            // 手动发送结果
+            sendResponse(response);
+
+        });
+        return null;
+    }
+
+
+    /**
+     * 开始发流
+     */
+    public RedisRpcResponse startSendRtp(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+
+        Boolean streamReady = zlmServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
+        if (!streamReady) {
+            logger.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        JSONObject jsonObject = zlmServerFactory.startSendRtp(mediaServerItem, sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (jsonObject.getInteger("code") == 0) {
+            WVPResult wvpResult = WVPResult.success();
+            response.setBody(wvpResult);
+        }else {
+            WVPResult wvpResult = WVPResult.fail(jsonObject.getInteger("code"), jsonObject.getString("msg"));
+            response.setBody(wvpResult);
+        }
+        return response;
+    }
+
+    /**
+     * 停止发流
+     */
+    public RedisRpcResponse stopSendRtp(RedisRpcRequest request) {
+        SendRtpItem sendRtpItem = JSON.parseObject(request.getParam().toString(), SendRtpItem.class);
+        logger.info("[redis-rpc] 停止推流: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+        MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
+        if (mediaServerItem == null) {
+            logger.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() );
+            RedisRpcResponse response = request.getResponse();
+            response.setStatusCode(200);
+        }
+        JSONObject jsonObject = zlmServerFactory.stopSendRtpStream(mediaServerItem, sendRtpItem);
+        RedisRpcResponse response = request.getResponse();
+        response.setStatusCode(200);
+        if (jsonObject.getInteger("code") == 0) {
+            logger.info("[redis-rpc] 停止推流成功: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() );
+            WVPResult wvpResult = WVPResult.success();
+            response.setBody(wvpResult);
+        }else {
+            int code = jsonObject.getInteger("code");
+            String msg = jsonObject.getString("msg");
+            logger.info("[redis-rpc] 停止推流失败: {}/{}, code: {}, msg: {}", sendRtpItem.getApp(), sendRtpItem.getStream(),code, msg );
+            WVPResult wvpResult = WVPResult.fail(code, msg);
+            response.setBody(wvpResult);
+        }
+        return response;
+    }
+
+    private void sendResponse(RedisRpcResponse response){
+        response.setToId(userSetting.getServerId());
+        RedisRpcMessage message = new RedisRpcMessage();
+        message.setResponse(response);
+        redisTemplate.convertAndSend(RedisRpcConfig.REDIS_REQUEST_CHANNEL_KEY, message);
+    }
+}

+ 100 - 0
src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java

@@ -0,0 +1,100 @@
+package com.genersoft.iot.vmp.service.redisMsg.service;
+
+import com.alibaba.fastjson2.JSON;
+import com.genersoft.iot.vmp.common.CommonCallback;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
+import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
+import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
+import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class RedisRpcServiceImpl implements IRedisRpcService {
+
+
+    @Autowired
+    private RedisRpcConfig redisRpcConfig;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private ZlmHttpHookSubscribe hookSubscribe;
+
+    @Autowired
+    private SSRCFactory ssrcFactory;
+
+    private RedisRpcRequest buildRequest(String uri, Object param) {
+        RedisRpcRequest request = new RedisRpcRequest();
+        request.setFromId(userSetting.getServerId());
+        request.setParam(param);
+        request.setUri(uri);
+        return request;
+    }
+
+    @Override
+    public SendRtpItem getSendRtpItem(SendRtpItem sendRtpItem) {
+
+        RedisRpcRequest request = buildRequest("getSendRtpItem", sendRtpItem);
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
+    }
+
+    @Override
+    public WVPResult startSendRtp(SendRtpItem sendRtpItem) {
+        RedisRpcRequest request = buildRequest("startSendRtp", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+    }
+
+    @Override
+    public WVPResult stopSendRtp(SendRtpItem sendRtpItem) {
+        RedisRpcRequest request = buildRequest("stopSendRtp", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        RedisRpcResponse response = redisRpcConfig.request(request, 10);
+        return JSON.parseObject(response.getBody().toString(), WVPResult.class);
+    }
+
+    @Override
+    public void waitePushStreamOnline(SendRtpItem sendRtpItem, CommonCallback<SendRtpItem> callback) {
+        // 监听流上线。 流上线直接发送sendRtpItem消息给实际的信令处理者
+        HookSubscribeForStreamChange hook = HookSubscribeFactory.on_stream_changed(
+                sendRtpItem.getApp(), sendRtpItem.getStream(), true, "rtsp", null);
+        hookSubscribe.addSubscribe(hook, (MediaServerItem mediaServerItemInUse, HookParam hookParam) -> {
+
+            // 读取redis中的上级点播信息,生成sendRtpItm发送出去
+            if (sendRtpItem.getSsrc() == null) {
+                // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
+                String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItemInUse.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItemInUse.getId());
+                sendRtpItem.setSsrc(ssrc);
+            }
+            sendRtpItem.setMediaServerId(mediaServerItemInUse.getId());
+            sendRtpItem.setLocalIp(mediaServerItemInUse.getSdpIp());
+            sendRtpItem.setServerId(userSetting.getServerId());
+            if (callback != null) {
+                callback.run(sendRtpItem);
+            }
+            hookSubscribe.removeSubscribe(hook);
+        });
+        RedisRpcRequest request = buildRequest("waitePushStreamOnline", sendRtpItem);
+        request.setToId(sendRtpItem.getServerId());
+        redisRpcConfig.request(request, response -> {
+            SendRtpItem sendRtpItemFromOther = JSON.parseObject(response.getBody().toString(), SendRtpItem.class);
+            if (callback != null) {
+                callback.run(sendRtpItemFromOther);
+            }
+        });
+
+    }
+}

+ 4 - 3
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -682,19 +682,20 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void addWaiteSendRtpItem(SendRtpItem sendRtpItem, int platformPlayTimeout) {
         String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
-        redisTemplate.opsForValue().set(key, platformPlayTimeout);
+        redisTemplate.opsForValue().set(key, sendRtpItem);
     }
 
     @Override
     public SendRtpItem getWaiteSendRtpItem(String app, String stream) {
         String key = VideoManagerConstants.WAITE_SEND_PUSH_STREAM + app + "_" + stream;
-        return (SendRtpItem)redisTemplate.opsForValue().get(key);
+        return JsonUtil.redisJsonToObject(redisTemplate, key, SendRtpItem.class);
     }
 
     @Override
     public void sendStartSendRtp(SendRtpItem sendRtpItem) {
         String key = VideoManagerConstants.START_SEND_PUSH_STREAM + sendRtpItem.getApp() + "_" + sendRtpItem.getStream();
-        redisTemplate.opsForValue().set(key, JSON.toJSONString(sendRtpItem));
+        logger.info("[redis发送通知] 通知其他WVP推流 {}: {}/{}->{}", key, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getPlatformId());
+        redisTemplate.convertAndSend(key, JSON.toJSON(sendRtpItem));
     }
 
     @Override