|
|
@@ -1,15 +1,14 @@
|
|
|
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
|
|
|
|
|
|
-import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
|
|
+import com.genersoft.iot.vmp.common.StreamInfo;
|
|
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
|
|
import com.genersoft.iot.vmp.conf.SipConfig;
|
|
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
|
|
import com.genersoft.iot.vmp.gb28181.bean.*;
|
|
|
-import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
|
|
|
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
|
|
-import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
|
|
|
import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
|
|
|
+import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
|
|
|
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
|
|
@@ -23,7 +22,12 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
|
|
|
import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
|
|
|
import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
|
|
|
import com.genersoft.iot.vmp.media.zlm.dto.*;
|
|
|
-import com.genersoft.iot.vmp.service.*;
|
|
|
+import com.genersoft.iot.vmp.service.IMediaServerService;
|
|
|
+import com.genersoft.iot.vmp.service.IPlayService;
|
|
|
+import com.genersoft.iot.vmp.service.IStreamProxyService;
|
|
|
+import com.genersoft.iot.vmp.service.IStreamPushService;
|
|
|
+import com.genersoft.iot.vmp.service.bean.InviteErrorCallback;
|
|
|
+import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
|
|
|
import com.genersoft.iot.vmp.service.bean.MessageForPushChannel;
|
|
|
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
|
|
|
import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
|
|
|
@@ -78,6 +82,9 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
@Autowired
|
|
|
private IRedisCatchStorage redisCatchStorage;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ private SSRCFactory ssrcFactory;
|
|
|
+
|
|
|
@Autowired
|
|
|
private DynamicTask dynamicTask;
|
|
|
|
|
|
@@ -90,8 +97,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
@Autowired
|
|
|
private SIPSender sipSender;
|
|
|
|
|
|
- @Autowired
|
|
|
- private AudioBroadcastManager audioBroadcastManager;
|
|
|
+ @Autowired
|
|
|
+ private AudioBroadcastManager audioBroadcastManager;
|
|
|
|
|
|
@Autowired
|
|
|
private ZLMRTPServerFactory zlmrtpServerFactory;
|
|
|
@@ -102,8 +109,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
@Autowired
|
|
|
private ISIPCommander commander;
|
|
|
|
|
|
- @Autowired
|
|
|
- private ZLMRESTfulUtils zlmresTfulUtils;
|
|
|
+ @Autowired
|
|
|
+ private ZLMRESTfulUtils zlmresTfulUtils;
|
|
|
|
|
|
@Autowired
|
|
|
private ZlmHttpHookSubscribe zlmHttpHookSubscribe;
|
|
|
@@ -111,29 +118,25 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
@Autowired
|
|
|
private SIPProcessorObserver sipProcessorObserver;
|
|
|
|
|
|
- @Autowired
|
|
|
- private VideoStreamSessionManager sessionManager;
|
|
|
-
|
|
|
@Autowired
|
|
|
private UserSetting userSetting;
|
|
|
|
|
|
@Autowired
|
|
|
private ZLMMediaListManager mediaListManager;
|
|
|
|
|
|
- @Autowired
|
|
|
- private DeferredResultHolder resultHolder;
|
|
|
+ @Autowired
|
|
|
+ private DeferredResultHolder resultHolder;
|
|
|
|
|
|
- @Autowired
|
|
|
- private ZlmHttpHookSubscribe subscribe;
|
|
|
+ @Autowired
|
|
|
+ private ZlmHttpHookSubscribe subscribe;
|
|
|
|
|
|
- @Autowired
|
|
|
- private SipConfig config;
|
|
|
+ @Autowired
|
|
|
+ private SipConfig config;
|
|
|
|
|
|
@Autowired
|
|
|
private VideoStreamSessionManager streamSession;
|
|
|
|
|
|
|
|
|
-
|
|
|
@Autowired
|
|
|
private RedisGbPlayMsgListener redisGbPlayMsgListener;
|
|
|
|
|
|
@@ -153,7 +156,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
public void process(RequestEvent evt) {
|
|
|
// Invite Request消息实现,此消息一般为级联消息,上级给下级发送请求视频指令
|
|
|
try {
|
|
|
- SIPRequest request = (SIPRequest)evt.getRequest();
|
|
|
+ SIPRequest request = (SIPRequest) evt.getRequest();
|
|
|
String channelId = SipUtils.getChannelIdFromRequest(request);
|
|
|
String requesterId = SipUtils.getUserIdFromFromHeader(request);
|
|
|
CallIdHeader callIdHeader = (CallIdHeader) request.getHeader(CallIdHeader.NAME);
|
|
|
@@ -167,27 +170,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- String ssrc = null;
|
|
|
- SessionDescription sdp = null;
|
|
|
- String ssrcDefault = "0000000000";
|
|
|
- if (channelId == null) {
|
|
|
- // 解析sdp消息, 使用jainsip 自带的sdp解析方式
|
|
|
- String contentString = new String(request.getRawContent());
|
|
|
-
|
|
|
- // jainSip不支持y=字段, 移除以解析。
|
|
|
- int ssrcIndex = contentString.indexOf("y=");
|
|
|
-
|
|
|
- if (ssrcIndex >= 0) {
|
|
|
- //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段
|
|
|
- ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
|
|
- String substring = contentString.substring(0, contentString.indexOf("y="));
|
|
|
- sdp = SdpFactory.getInstance().createSessionDescription(substring);
|
|
|
- } else {
|
|
|
- ssrc = ssrcDefault;
|
|
|
- sdp = SdpFactory.getInstance().createSessionDescription(contentString);
|
|
|
- }
|
|
|
- channelId = sdp.getOrigin().getUsername();
|
|
|
- }
|
|
|
|
|
|
|
|
|
// 查询请求是否来自上级平台\设备
|
|
|
@@ -249,7 +231,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- }else if("proxy".equals(gbStream.getStreamType())){
|
|
|
+ } else if ("proxy".equals(gbStream.getStreamType())) {
|
|
|
proxyByAppAndStream = streamProxyService.getStreamProxyByAppAndStream(gbStream.getApp(), gbStream.getStream());
|
|
|
if (proxyByAppAndStream == null) {
|
|
|
logger.info("[ app={}, stream={} ]找不到zlm {},返回410", gbStream.getApp(), gbStream.getStream(), mediaServerId);
|
|
|
@@ -285,23 +267,21 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
- if (sdp == null || ssrc == null) {
|
|
|
- // 解析sdp消息, 使用jainsip 自带的sdp解析方式
|
|
|
- String contentString = new String(request.getRawContent());
|
|
|
-
|
|
|
- // jainSip不支持y=字段, 移除以解析。
|
|
|
- int ssrcIndex = contentString.indexOf("y=");
|
|
|
- if (ssrcIndex >= 0) {
|
|
|
- //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段
|
|
|
- ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
|
|
- String substring = contentString.substring(0, contentString.indexOf("y="));
|
|
|
- sdp = SdpFactory.getInstance().createSessionDescription(substring);
|
|
|
- } else {
|
|
|
- ssrc = ssrcDefault;
|
|
|
- sdp = SdpFactory.getInstance().createSessionDescription(contentString);
|
|
|
- }
|
|
|
- }
|
|
|
+ // 解析sdp消息, 使用jainsip 自带的sdp解析方式
|
|
|
+ String contentString = new String(request.getRawContent());
|
|
|
|
|
|
+ // jainSip不支持y=字段, 移除以解析。
|
|
|
+ // 检查是否有y字段
|
|
|
+ int ssrcIndex = contentString.indexOf("y=");
|
|
|
+
|
|
|
+ SessionDescription sdp;
|
|
|
+ if (ssrcIndex >= 0) {
|
|
|
+ //ssrc规定长度为10个字节,不取余下长度以避免后续还有“f=”字段
|
|
|
+ String substring = contentString.substring(0, ssrcIndex);
|
|
|
+ sdp = SdpFactory.getInstance().createSessionDescription(substring);
|
|
|
+ } else {
|
|
|
+ sdp = SdpFactory.getInstance().createSessionDescription(contentString);
|
|
|
+ }
|
|
|
String sessionName = sdp.getSessionName().getValue();
|
|
|
|
|
|
Long startTime = null;
|
|
|
@@ -363,7 +343,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
String username = sdp.getOrigin().getUsername();
|
|
|
String addressStr = sdp.getConnection().getAddress();
|
|
|
|
|
|
- logger.info("[上级点播]用户:{}, 通道:{}, 地址:{}:{}, ssrc:{}", username, channelId, addressStr, port, ssrc);
|
|
|
+
|
|
|
Device device = null;
|
|
|
// 通过 channel 和 gbStream 是否为null 值判断来源是直播流合适国标
|
|
|
if (channel != null) {
|
|
|
@@ -387,6 +367,25 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ String ssrc;
|
|
|
+ if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) {
|
|
|
+ // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
|
|
|
+ ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
|
|
|
+ } else {
|
|
|
+ ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
|
|
+ }
|
|
|
+ String streamTypeStr = null;
|
|
|
+ if (mediaTransmissionTCP) {
|
|
|
+ if (tcpActive) {
|
|
|
+ streamTypeStr = "TCP-ACTIVE";
|
|
|
+ } else {
|
|
|
+ streamTypeStr = "TCP-PASSIVE";
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ streamTypeStr = "UDP";
|
|
|
+ }
|
|
|
+ logger.info("[上级Invite] {}, 平台:{}, 通道:{}, 收流地址:{}:{},收流方式:{}, ssrc:{}", sessionName, username, channelId, addressStr, port, streamTypeStr, ssrc);
|
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
device.getDeviceId(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
|
|
|
|
@@ -407,11 +406,10 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
|
|
|
Long finalStartTime = startTime;
|
|
|
Long finalStopTime = stopTime;
|
|
|
- String finalChannelId = channelId;
|
|
|
- ZlmHttpHookSubscribe.Event hookEvent = (mediaServerItemInUSe, responseJSON) -> {
|
|
|
- String app = responseJSON.getString("app");
|
|
|
- String stream = responseJSON.getString("stream");
|
|
|
- logger.info("[上级点播]下级已经开始推流。 回复200OK(SDP), {}/{}", app, stream);
|
|
|
+ InviteErrorCallback<Object> hookEvent = (code, msg, data) -> {
|
|
|
+ StreamInfo streamInfo = (StreamInfo) data;
|
|
|
+ MediaServerItem mediaServerItemInUSe = mediaServerService.getOne(streamInfo.getMediaServerId());
|
|
|
+ logger.info("[上级Invite]下级已经开始推流。 回复200OK(SDP), {}/{}", streamInfo.getApp(), streamInfo.getStream());
|
|
|
// * 0 等待设备推流上来
|
|
|
// * 1 下级已经推流,等待上级平台回复ack
|
|
|
// * 2 推流中
|
|
|
@@ -420,7 +418,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
|
|
|
StringBuffer content = new StringBuffer(200);
|
|
|
content.append("v=0\r\n");
|
|
|
- content.append("o=" + finalChannelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
|
|
|
+ content.append("o=" + channelId + " 0 0 IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
|
|
|
content.append("s=" + sessionName + "\r\n");
|
|
|
content.append("c=IN IP4 " + mediaServerItemInUSe.getSdpIp() + "\r\n");
|
|
|
if ("Playback".equalsIgnoreCase(sessionName)) {
|
|
|
@@ -462,111 +460,118 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
logger.error("[命令发送失败] 国标级联 回复SdpAck", e);
|
|
|
}
|
|
|
};
|
|
|
- SipSubscribe.Event errorEvent = ((event) -> {
|
|
|
+ InviteErrorCallback<Object> errorEvent = ((statusCode, msg, data) -> {
|
|
|
// 未知错误。直接转发设备点播的错误
|
|
|
try {
|
|
|
- Response response = getMessageFactory().createResponse(event.statusCode, evt.getRequest());
|
|
|
- sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
|
|
|
- } catch (ParseException | SipException e) {
|
|
|
+ if (statusCode > 0) {
|
|
|
+ Response response = getMessageFactory().createResponse(statusCode, evt.getRequest());
|
|
|
+ sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), response);
|
|
|
+ }
|
|
|
+ } catch (ParseException | SipException e) {
|
|
|
logger.error("未处理的异常 ", e);
|
|
|
}
|
|
|
});
|
|
|
sendRtpItem.setApp("rtp");
|
|
|
if ("Playback".equalsIgnoreCase(sessionName)) {
|
|
|
sendRtpItem.setPlayType(InviteStreamType.PLAYBACK);
|
|
|
- SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, device.isSsrcCheck(), true);
|
|
|
+ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
|
|
|
sendRtpItem.setStream(ssrcInfo.getStream());
|
|
|
// 写入redis, 超时时回复
|
|
|
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
playService.playBack(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
|
|
|
- DateUtil.formatter.format(end), null, result -> {
|
|
|
- if (result.getCode() != 0) {
|
|
|
- logger.warn("录像回放失败");
|
|
|
- if (result.getEvent() != null) {
|
|
|
- errorEvent.response(result.getEvent());
|
|
|
- }
|
|
|
- redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), finalChannelId, callIdHeader.getCallId(), null);
|
|
|
- try {
|
|
|
- responseAck(request, Response.REQUEST_TIMEOUT);
|
|
|
- } catch (SipException | InvalidArgumentException | ParseException e) {
|
|
|
- logger.error("[命令发送失败] 国标级联 录像回放 发送REQUEST_TIMEOUT: {}", e.getMessage());
|
|
|
- }
|
|
|
+ DateUtil.formatter.format(end),
|
|
|
+ (code, msg, data) -> {
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode()) {
|
|
|
+ hookEvent.run(code, msg, data);
|
|
|
+ } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
|
|
|
+ logger.info("[录像回放]超时, 用户:{}, 通道:{}", username, channelId);
|
|
|
+ redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
+ } else {
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ } else if ("Download".equalsIgnoreCase(sessionName)) {
|
|
|
+ // 获取指定的下载速度
|
|
|
+ Vector sdpMediaDescriptions = sdp.getMediaDescriptions(true);
|
|
|
+ MediaDescription mediaDescription = null;
|
|
|
+ String downloadSpeed = "1";
|
|
|
+ if (sdpMediaDescriptions.size() > 0) {
|
|
|
+ mediaDescription = (MediaDescription) sdpMediaDescriptions.get(0);
|
|
|
+ }
|
|
|
+ if (mediaDescription != null) {
|
|
|
+ downloadSpeed = mediaDescription.getAttribute("downloadspeed");
|
|
|
+ }
|
|
|
+
|
|
|
+ sendRtpItem.setPlayType(InviteStreamType.DOWNLOAD);
|
|
|
+ SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, null, null, device.isSsrcCheck(), true, 0, false, false, device.getStreamModeForParam());
|
|
|
+ sendRtpItem.setStream(ssrcInfo.getStream());
|
|
|
+ // 写入redis, 超时时回复
|
|
|
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
+ playService.download(mediaServerItem, ssrcInfo, device.getDeviceId(), channelId, DateUtil.formatter.format(start),
|
|
|
+ DateUtil.formatter.format(end), Integer.parseInt(downloadSpeed),
|
|
|
+ (code, msg, data) -> {
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode()) {
|
|
|
+ hookEvent.run(code, msg, data);
|
|
|
+ } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
|
|
|
+ logger.info("[录像下载]超时, 用户:{}, 通道:{}", username, channelId);
|
|
|
+ redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
} else {
|
|
|
- if (result.getMediaServerItem() != null) {
|
|
|
- hookEvent.response(result.getMediaServerItem(), result.getResponse());
|
|
|
- }
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
}
|
|
|
});
|
|
|
} else {
|
|
|
sendRtpItem.setPlayType(InviteStreamType.PLAY);
|
|
|
- SsrcTransaction playTransaction = sessionManager.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
|
|
|
- if (playTransaction != null) {
|
|
|
- Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, "rtp", playTransaction.getStream());
|
|
|
- if (!streamReady) {
|
|
|
- boolean hasRtpServer = mediaServerService.checkRtpServer(mediaServerItem, "rtp", playTransaction.getStream());
|
|
|
- if (hasRtpServer) {
|
|
|
- logger.info("[上级点播]已经开启rtpServer但是尚未收到流,开启监听流的到来");
|
|
|
- HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", playTransaction.getStream(), true, "rtsp", mediaServerItem.getId());
|
|
|
- zlmHttpHookSubscribe.addSubscribe(hookSubscribe, hookEvent);
|
|
|
- }else {
|
|
|
- playTransaction = null;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (playTransaction == null) {
|
|
|
- String streamId = null;
|
|
|
- if (mediaServerItem.isRtpEnable()) {
|
|
|
- streamId = String.format("%s_%s", device.getDeviceId(), channelId);
|
|
|
- }
|
|
|
- SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, device.isSsrcCheck(), false);
|
|
|
- logger.info(JSONObject.toJSONString(ssrcInfo));
|
|
|
- sendRtpItem.setStream(ssrcInfo.getStream());
|
|
|
- sendRtpItem.setSsrc(ssrc.equals(ssrcDefault) ? ssrcInfo.getSsrc() : ssrc);
|
|
|
-
|
|
|
- // 写入redis, 超时时回复
|
|
|
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
- playService.play(mediaServerItem, ssrcInfo, device, channelId, hookEvent, errorEvent, (code, msg) -> {
|
|
|
- logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, finalChannelId);
|
|
|
- redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), finalChannelId, callIdHeader.getCallId(), null);
|
|
|
- });
|
|
|
+ String streamId = null;
|
|
|
+ if (mediaServerItem.isRtpEnable()) {
|
|
|
+ streamId = String.format("%s_%s", device.getDeviceId(), channelId);
|
|
|
} else {
|
|
|
- sendRtpItem.setStream(playTransaction.getStream());
|
|
|
- // 写入redis, 超时时回复
|
|
|
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
- JSONObject jsonObject = new JSONObject();
|
|
|
- jsonObject.put("app", sendRtpItem.getApp());
|
|
|
- jsonObject.put("stream", sendRtpItem.getStream());
|
|
|
- hookEvent.response(mediaServerItem, jsonObject);
|
|
|
+ streamId = String.format("%08x", Integer.parseInt(ssrc)).toUpperCase();
|
|
|
}
|
|
|
+ sendRtpItem.setStream(streamId);
|
|
|
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
+ playService.play(mediaServerItem, device.getDeviceId(), channelId, ((code, msg, data) -> {
|
|
|
+ if (code == InviteErrorCode.SUCCESS.getCode()) {
|
|
|
+ hookEvent.run(code, msg, data);
|
|
|
+ } else if (code == InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode() || code == InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode()) {
|
|
|
+ logger.info("[上级点播]超时, 用户:{}, 通道:{}", username, channelId);
|
|
|
+ redisCatchStorage.deleteSendRTPServer(platform.getServerGBId(), channelId, callIdHeader.getCallId(), null);
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
+ } else {
|
|
|
+ errorEvent.run(code, msg, data);
|
|
|
+ }
|
|
|
+ }));
|
|
|
+
|
|
|
}
|
|
|
} else if (gbStream != null) {
|
|
|
- if(ssrc.equals(ssrcDefault))
|
|
|
- {
|
|
|
- SsrcConfig ssrcConfig = mediaServerItem.getSsrcConfig();
|
|
|
- if(ssrcConfig != null)
|
|
|
- {
|
|
|
- ssrc = ssrcConfig.getPlaySsrc();
|
|
|
- ssrcConfig.releaseSsrc(ssrc);
|
|
|
- }
|
|
|
+
|
|
|
+ String ssrc;
|
|
|
+ if (userSetting.getUseCustomSsrcForParentInvite() || ssrcIndex < 0) {
|
|
|
+ // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式
|
|
|
+ ssrc = "Play".equalsIgnoreCase(sessionName) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
|
|
|
+ } else {
|
|
|
+ ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
|
|
|
}
|
|
|
- if("push".equals(gbStream.getStreamType())) {
|
|
|
+
|
|
|
+ if ("push".equals(gbStream.getStreamType())) {
|
|
|
if (streamPushItem != null && streamPushItem.isPushIng()) {
|
|
|
// 推流状态
|
|
|
pushStream(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
} else {
|
|
|
// 未推流 拉起
|
|
|
- notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
+ notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
}
|
|
|
- }else if ("proxy".equals(gbStream.getStreamType())){
|
|
|
+ } else if ("proxy".equals(gbStream.getStreamType())) {
|
|
|
if (null != proxyByAppAndStream) {
|
|
|
- if(proxyByAppAndStream.isStatus()){
|
|
|
- pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
+ if (proxyByAppAndStream.isStatus()) {
|
|
|
+ pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
//开启代理拉流
|
|
|
- notifyStreamOnline(evt, request,gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
+ notifyStreamOnline(evt, request, gbStream, null, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
}
|
|
|
}
|
|
|
@@ -586,42 +591,43 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
* 安排推流
|
|
|
*/
|
|
|
private void pushProxyStream(RequestEvent evt, SIPRequest request, GbStream gbStream, ParentPlatform platform,
|
|
|
- CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
|
|
|
- int port, Boolean tcpActive, boolean mediaTransmissionTCP,
|
|
|
- String channelId, String addressStr, String ssrc, String requesterId) {
|
|
|
- Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
|
- if (streamReady) {
|
|
|
- // 自平台内容
|
|
|
- SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
- gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
|
-
|
|
|
- if (sendRtpItem == null) {
|
|
|
- logger.warn("服务器端口资源不足");
|
|
|
- try {
|
|
|
- responseAck(request, Response.BUSY_HERE);
|
|
|
- } catch (SipException | InvalidArgumentException | ParseException e) {
|
|
|
- logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
|
|
|
- }
|
|
|
- return;
|
|
|
- }
|
|
|
- if (tcpActive != null) {
|
|
|
- sendRtpItem.setTcpActive(tcpActive);
|
|
|
- }
|
|
|
- sendRtpItem.setPlayType(InviteStreamType.PUSH);
|
|
|
- // 写入redis, 超时时回复
|
|
|
- sendRtpItem.setStatus(1);
|
|
|
- sendRtpItem.setCallId(callIdHeader.getCallId());
|
|
|
- sendRtpItem.setFromTag(request.getFromTag());
|
|
|
-
|
|
|
- SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
|
|
|
- if (response != null) {
|
|
|
- sendRtpItem.setToTag(response.getToTag());
|
|
|
+ CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
|
|
|
+ int port, Boolean tcpActive, boolean mediaTransmissionTCP,
|
|
|
+ String channelId, String addressStr, String ssrc, String requesterId) {
|
|
|
+ Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
|
+ if (streamReady != null && streamReady) {
|
|
|
+ // 自平台内容
|
|
|
+ SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
+ gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
|
+
|
|
|
+ if (sendRtpItem == null) {
|
|
|
+ logger.warn("服务器端口资源不足");
|
|
|
+ try {
|
|
|
+ responseAck(request, Response.BUSY_HERE);
|
|
|
+ } catch (SipException | InvalidArgumentException | ParseException e) {
|
|
|
+ logger.error("[命令发送失败] invite 服务器端口资源不足: {}", e.getMessage());
|
|
|
}
|
|
|
- redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (tcpActive != null) {
|
|
|
+ sendRtpItem.setTcpActive(tcpActive);
|
|
|
+ }
|
|
|
+ sendRtpItem.setPlayType(InviteStreamType.PUSH);
|
|
|
+ // 写入redis, 超时时回复
|
|
|
+ sendRtpItem.setStatus(1);
|
|
|
+ sendRtpItem.setCallId(callIdHeader.getCallId());
|
|
|
+ sendRtpItem.setFromTag(request.getFromTag());
|
|
|
+
|
|
|
+ SIPResponse response = sendStreamAck(mediaServerItem, request, sendRtpItem, platform, evt);
|
|
|
+ if (response != null) {
|
|
|
+ sendRtpItem.setToTag(response.getToTag());
|
|
|
+ }
|
|
|
+ redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
+
|
|
|
private void pushStream(RequestEvent evt, SIPRequest request, GbStream gbStream, StreamPushItem streamPushItem, ParentPlatform platform,
|
|
|
CallIdHeader callIdHeader, MediaServerItem mediaServerItem,
|
|
|
int port, Boolean tcpActive, boolean mediaTransmissionTCP,
|
|
|
@@ -629,7 +635,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
// 推流
|
|
|
if (streamPushItem.isSelf()) {
|
|
|
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
|
|
|
- if (streamReady) {
|
|
|
+ if (streamReady != null && streamReady) {
|
|
|
// 自平台内容
|
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
gbStream.getApp(), gbStream.getStream(), channelId, mediaTransmissionTCP, platform.isRtcp());
|
|
|
@@ -661,7 +667,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
|
|
|
} else {
|
|
|
// 不在线 拉起
|
|
|
- notifyStreamOnline(evt, request,gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
+ notifyStreamOnline(evt, request, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
}
|
|
|
|
|
|
@@ -671,6 +677,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
/**
|
|
|
* 通知流上线
|
|
|
*/
|
|
|
@@ -688,7 +695,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
String stream = responseJSON.getString("stream");
|
|
|
logger.info("[上级点播]拉流代理已经就绪, {}/{}", app, stream);
|
|
|
dynamicTask.stop(callIdHeader.getCallId());
|
|
|
- pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
+ pushProxyStream(evt, request, gbStream, platform, callIdHeader, mediaServerItem, port, tcpActive,
|
|
|
mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
|
|
|
});
|
|
|
dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
|
|
|
@@ -707,7 +714,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
} else if ("push".equals(gbStream.getStreamType())) {
|
|
|
if (!platform.isStartOfflinePush()) {
|
|
|
// 平台设置中关闭了拉起离线的推流则直接回复
|
|
|
@@ -811,7 +817,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
// 发送redis消息
|
|
|
redisGbPlayMsgListener.sendMsg(streamPushItem.getServerId(), streamPushItem.getMediaServerId(),
|
|
|
streamPushItem.getApp(), streamPushItem.getStream(), addressStr, port, ssrc, requesterId,
|
|
|
- channelId, mediaTransmissionTCP, platform.isRtcp(),null, responseSendItemMsg -> {
|
|
|
+ channelId, mediaTransmissionTCP, platform.isRtcp(), null, responseSendItemMsg -> {
|
|
|
SendRtpItem sendRtpItem = responseSendItemMsg.getSendRtpItem();
|
|
|
if (sendRtpItem == null || responseSendItemMsg.getMediaServerItem() == null) {
|
|
|
logger.warn("服务器端口资源不足");
|
|
|
@@ -836,7 +842,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
sendRtpItem.setCallId(callIdHeader.getCallId());
|
|
|
|
|
|
sendRtpItem.setFromTag(request.getFromTag());
|
|
|
- SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request,sendRtpItem, platform, evt);
|
|
|
+ SIPResponse response = sendStreamAck(responseSendItemMsg.getMediaServerItem(), request, sendRtpItem, platform, evt);
|
|
|
if (response != null) {
|
|
|
sendRtpItem.setToTag(response.getToTag());
|
|
|
}
|
|
|
@@ -877,8 +883,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
content.append("t=0 0\r\n");
|
|
|
// 非严格模式端口不统一, 增加兼容性,修改为一个不为0的端口
|
|
|
int localPort = sendRtpItem.getLocalPort();
|
|
|
- if(localPort == 0)
|
|
|
- {
|
|
|
+ if (localPort == 0) {
|
|
|
localPort = new Random().nextInt(65535) + 1;
|
|
|
}
|
|
|
content.append("m=video " + localPort + " RTP/AVP 96\r\n");
|
|
|
@@ -953,7 +958,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
if (device != null) {
|
|
|
logger.info("收到设备" + requesterId + "的语音广播Invite请求");
|
|
|
- String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + broadcastCatch.getChannelId();
|
|
|
+ String key = VideoManagerConstants.BROADCAST_WAITE_INVITE + device.getDeviceId() + broadcastCatch.getChannelId();
|
|
|
dynamicTask.stop(key);
|
|
|
try {
|
|
|
responseAck(request, Response.TRYING);
|
|
|
@@ -986,7 +991,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
boolean mediaTransmissionTCP = false;
|
|
|
Boolean tcpActive = null;
|
|
|
for (int i = 0; i < mediaDescriptions.size(); i++) {
|
|
|
- MediaDescription mediaDescription = (MediaDescription)mediaDescriptions.get(i);
|
|
|
+ MediaDescription mediaDescription = (MediaDescription) mediaDescriptions.get(i);
|
|
|
Media media = mediaDescription.getMedia();
|
|
|
|
|
|
Vector mediaFormats = media.getMediaFormats(false);
|
|
|
@@ -1022,7 +1027,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
}
|
|
|
String addressStr = sdp.getOrigin().getAddress();
|
|
|
logger.info("设备{}请求语音流,地址:{}:{},ssrc:{}, {}", requesterId, addressStr, port, ssrc,
|
|
|
- mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP");
|
|
|
+ mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP");
|
|
|
|
|
|
MediaServerItem mediaServerItem = broadcastCatch.getMediaServerItem();
|
|
|
if (mediaServerItem == null) {
|
|
|
@@ -1036,7 +1041,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
return;
|
|
|
}
|
|
|
logger.info("设备{}请求语音流, 收流地址:{}:{},ssrc:{}, {}, 对讲方式:{}", requesterId, addressStr, port, ssrc,
|
|
|
- mediaTransmissionTCP ? (tcpActive? "TCP主动":"TCP被动") : "UDP", sdp.getSessionName().getValue());
|
|
|
+ mediaTransmissionTCP ? (tcpActive ? "TCP主动" : "TCP被动") : "UDP", sdp.getSessionName().getValue());
|
|
|
|
|
|
SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, port, ssrc, requesterId,
|
|
|
device.getDeviceId(), broadcastCatch.getChannelId(),
|
|
|
@@ -1076,7 +1081,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, broadcastCatch.getApp(), broadcastCatch.getStream());
|
|
|
if (streamReady) {
|
|
|
sendOk(device, sendRtpItem, sdp, request, mediaServerItem, mediaTransmissionTCP, ssrc);
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
logger.warn("[语音通话], 未发现待推送的流,app={},stream={}", broadcastCatch.getApp(), broadcastCatch.getStream());
|
|
|
try {
|
|
|
responseAck(request, Response.GONE);
|
|
|
@@ -1093,29 +1098,30 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
} else {
|
|
|
logger.warn("来自无效设备/平台的请求");
|
|
|
try {
|
|
|
- responseAck(request, Response.BAD_REQUEST);; // 不支持的格式,发415
|
|
|
+ responseAck(request, Response.BAD_REQUEST);
|
|
|
+ ; // 不支持的格式,发415
|
|
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
|
|
logger.error("[命令发送失败] invite 来自无效设备/平台的请求, {}", e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc){
|
|
|
+ SIPResponse sendOk(Device device, SendRtpItem sendRtpItem, SessionDescription sdp, SIPRequest request, MediaServerItem mediaServerItem, boolean mediaTransmissionTCP, String ssrc) {
|
|
|
SIPResponse sipResponse = null;
|
|
|
try {
|
|
|
sendRtpItem.setStatus(2);
|
|
|
redisCatchStorage.updateSendRTPSever(sendRtpItem);
|
|
|
StringBuffer content = new StringBuffer(200);
|
|
|
content.append("v=0\r\n");
|
|
|
- content.append("o="+ config.getId() +" "+ sdp.getOrigin().getSessionId() +" " + sdp.getOrigin().getSessionVersion() + " IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
|
|
|
+ content.append("o=" + config.getId() + " " + sdp.getOrigin().getSessionId() + " " + sdp.getOrigin().getSessionVersion() + " IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
|
|
|
content.append("s=Play\r\n");
|
|
|
- content.append("c=IN IP4 "+mediaServerItem.getSdpIp()+"\r\n");
|
|
|
+ content.append("c=IN IP4 " + mediaServerItem.getSdpIp() + "\r\n");
|
|
|
content.append("t=0 0\r\n");
|
|
|
|
|
|
if (mediaTransmissionTCP) {
|
|
|
- content.append("m=audio "+ sendRtpItem.getLocalPort()+" TCP/RTP/AVP 8\r\n");
|
|
|
- }else {
|
|
|
- content.append("m=audio "+ sendRtpItem.getLocalPort()+" RTP/AVP 8\r\n");
|
|
|
+ content.append("m=audio " + sendRtpItem.getLocalPort() + " TCP/RTP/AVP 8\r\n");
|
|
|
+ } else {
|
|
|
+ content.append("m=audio " + sendRtpItem.getLocalPort() + " RTP/AVP 8\r\n");
|
|
|
}
|
|
|
|
|
|
content.append("a=rtpmap:8 PCMA/8000/1\r\n");
|
|
|
@@ -1125,11 +1131,11 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
|
|
|
content.append("a=connection:new\r\n");
|
|
|
if (!sendRtpItem.isTcpActive()) {
|
|
|
content.append("a=setup:active\r\n");
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
content.append("a=setup:passive\r\n");
|
|
|
}
|
|
|
}
|
|
|
- content.append("y="+ ssrc + "\r\n");
|
|
|
+ content.append("y=" + ssrc + "\r\n");
|
|
|
content.append("f=v/////a/1/8/1\r\n");
|
|
|
|
|
|
ParentPlatform parentPlatform = new ParentPlatform();
|