// PlayServiceImpl.java (37 KB)
  1. package com.genersoft.iot.vmp.service.impl;
  2. import java.math.BigDecimal;
  3. import java.math.RoundingMode;
  4. import java.util.List;
  5. import java.util.Objects;
  6. import java.util.UUID;
  7. import javax.sip.ResponseEvent;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.http.HttpStatus;
  13. import org.springframework.http.ResponseEntity;
  14. import org.springframework.stereotype.Service;
  15. import org.springframework.web.context.request.async.DeferredResult;
  16. import com.alibaba.fastjson.JSON;
  17. import com.alibaba.fastjson.JSONArray;
  18. import com.alibaba.fastjson.JSONObject;
  19. import com.genersoft.iot.vmp.common.StreamInfo;
  20. import com.genersoft.iot.vmp.conf.DynamicTask;
  21. import com.genersoft.iot.vmp.conf.SipConfig;
  22. import com.genersoft.iot.vmp.conf.UserSetting;
  23. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  24. import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
  25. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  26. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  27. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  28. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  29. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
  30. import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  31. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
  32. import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
  33. import com.genersoft.iot.vmp.media.zlm.dto.HookType;
  34. import com.genersoft.iot.vmp.utils.DateUtil;
  35. import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
  36. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  37. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  38. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  39. import com.genersoft.iot.vmp.service.IMediaServerService;
  40. import com.genersoft.iot.vmp.service.IMediaService;
  41. import com.genersoft.iot.vmp.service.IPlayService;
  42. import com.genersoft.iot.vmp.service.bean.InviteTimeOutCallback;
  43. import com.genersoft.iot.vmp.service.bean.PlayBackCallback;
  44. import com.genersoft.iot.vmp.service.bean.PlayBackResult;
  45. import com.genersoft.iot.vmp.service.bean.SSRCInfo;
  46. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  47. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  48. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  49. import com.genersoft.iot.vmp.vmanager.bean.AudioBroadcastResult;
  50. import com.genersoft.iot.vmp.utils.DateUtil;
  51. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  52. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.AudioBroadcastEvent;
  53. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  54. import gov.nist.javax.sip.stack.SIPDialog;
  55. import org.slf4j.Logger;
  56. import org.slf4j.LoggerFactory;
  57. import org.springframework.beans.factory.annotation.Autowired;
  58. import org.springframework.http.HttpStatus;
  59. import org.springframework.http.ResponseEntity;
  60. import org.springframework.stereotype.Service;
  61. import org.springframework.util.ResourceUtils;
  62. import org.springframework.web.context.request.async.DeferredResult;
  63. import javax.sip.ResponseEvent;
  64. import javax.sip.SipException;
  65. import java.io.FileNotFoundException;
  66. import java.math.BigDecimal;
  67. import java.text.ParseException;
  68. import java.math.RoundingMode;
  69. import java.util.*;
  70. import java.util.stream.Collectors;
  71. import java.util.stream.Stream;
  72. @SuppressWarnings(value = {"rawtypes", "unchecked"})
  73. @Service
  74. public class PlayServiceImpl implements IPlayService {
  75. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  76. @Autowired
  77. private IVideoManagerStorage storager;
  78. @Autowired
  79. private SIPCommander cmder;
  80. @Autowired
  81. private AudioBroadcastManager audioBroadcastManager;
  82. @Autowired
  83. private SIPCommanderFroPlatform sipCommanderFroPlatform;
  84. @Autowired
  85. private IRedisCatchStorage redisCatchStorage;
  86. @Autowired
  87. private ZLMRTPServerFactory zlmrtpServerFactory;
  88. @Autowired
  89. private DeferredResultHolder resultHolder;
  90. @Autowired
  91. private ZLMRESTfulUtils zlmresTfulUtils;
  92. @Autowired
  93. private AssistRESTfulUtils assistRESTfulUtils;
  94. @Autowired
  95. private IMediaService mediaService;
  96. @Autowired
  97. private IMediaServerService mediaServerService;
  98. @Autowired
  99. private VideoStreamSessionManager streamSession;
  100. @Autowired
  101. private UserSetting userSetting;
  102. @Autowired
  103. private SipConfig sipConfig;
  104. @Autowired
  105. private DynamicTask dynamicTask;
  106. @Autowired
  107. private ZLMHttpHookSubscribe subscribe;
  108. @Override
  109. public PlayResult play(MediaServerItem mediaServerItem, String deviceId, String channelId,
  110. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  111. Runnable timeoutCallback) {
  112. PlayResult playResult = new PlayResult();
  113. RequestMessage msg = new RequestMessage();
  114. String key = DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId;
  115. msg.setKey(key);
  116. String uuid = UUID.randomUUID().toString();
  117. msg.setId(uuid);
  118. playResult.setUuid(uuid);
  119. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
  120. playResult.setResult(result);
  121. // 录像查询以channelId作为deviceId查询
  122. resultHolder.put(key, uuid, result);
  123. if (mediaServerItem == null) {
  124. WVPResult wvpResult = new WVPResult();
  125. wvpResult.setCode(-1);
  126. wvpResult.setMsg("未找到可用的zlm");
  127. msg.setData(wvpResult);
  128. resultHolder.invokeResult(msg);
  129. return playResult;
  130. }
  131. Device device = redisCatchStorage.getDevice(deviceId);
  132. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  133. playResult.setDevice(device);
  134. result.onCompletion(()->{
  135. // 点播结束时调用截图接口
  136. // TODO 应该在上流时调用更好,结束也可能是错误结束
  137. String path = "snap";
  138. String fileName = deviceId + "_" + channelId + ".jpg";
  139. ResponseEntity responseEntity = (ResponseEntity)result.getResult();
  140. if (responseEntity != null && responseEntity.getStatusCode() == HttpStatus.OK) {
  141. WVPResult wvpResult = (WVPResult)responseEntity.getBody();
  142. if (Objects.requireNonNull(wvpResult).getCode() == 0) {
  143. StreamInfo streamInfoForSuccess = (StreamInfo)wvpResult.getData();
  144. MediaServerItem mediaInfo = mediaServerService.getOne(streamInfoForSuccess.getMediaServerId());
  145. String streamUrl = streamInfoForSuccess.getFmp4();
  146. // 请求截图
  147. logger.info("[请求截图]: " + fileName);
  148. zlmresTfulUtils.getSnap(mediaInfo, streamUrl, 15, 1, path, fileName);
  149. }
  150. }
  151. });
  152. if (streamInfo != null) {
  153. String streamId = streamInfo.getStream();
  154. if (streamId == null) {
  155. WVPResult wvpResult = new WVPResult();
  156. wvpResult.setCode(-1);
  157. wvpResult.setMsg("点播失败, redis缓存streamId等于null");
  158. msg.setData(wvpResult);
  159. resultHolder.invokeAllResult(msg);
  160. return playResult;
  161. }
  162. String mediaServerId = streamInfo.getMediaServerId();
  163. MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
  164. JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaInfo, streamId);
  165. if(rtpInfo.getInteger("code") == 0){
  166. if (rtpInfo.getBoolean("exist")) {
  167. WVPResult wvpResult = new WVPResult();
  168. wvpResult.setCode(0);
  169. wvpResult.setMsg("success");
  170. wvpResult.setData(streamInfo);
  171. msg.setData(wvpResult);
  172. resultHolder.invokeAllResult(msg);
  173. if (hookEvent != null) {
  174. hookEvent.response(mediaServerItem, JSONObject.parseObject(JSON.toJSONString(streamInfo)));
  175. }
  176. }else {
  177. redisCatchStorage.stopPlay(streamInfo);
  178. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  179. streamInfo = null;
  180. }
  181. }else {
  182. //zlm连接失败
  183. redisCatchStorage.stopPlay(streamInfo);
  184. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  185. streamInfo = null;
  186. }
  187. }
  188. if (streamInfo == null) {
  189. String streamId = null;
  190. if (mediaServerItem.isRtpEnable()) {
  191. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  192. }
  193. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
  194. play(mediaServerItem, ssrcInfo, device, channelId, (mediaServerItemInUse, response)->{
  195. if (hookEvent != null) {
  196. hookEvent.response(mediaServerItem, response);
  197. }
  198. }, event -> {
  199. // sip error错误
  200. WVPResult wvpResult = new WVPResult();
  201. wvpResult.setCode(-1);
  202. wvpResult.setMsg(String.format("点播失败, 错误码: %s, %s", event.statusCode, event.msg));
  203. msg.setData(wvpResult);
  204. resultHolder.invokeAllResult(msg);
  205. if (errorEvent != null) {
  206. errorEvent.response(event);
  207. }
  208. }, (code, msgStr)->{
  209. // invite点播超时
  210. WVPResult wvpResult = new WVPResult();
  211. wvpResult.setCode(-1);
  212. if (code == 0) {
  213. wvpResult.setMsg("点播超时,请稍候重试");
  214. }else if (code == 1) {
  215. wvpResult.setMsg("收流超时,请稍候重试");
  216. }
  217. msg.setData(wvpResult);
  218. // 回复之前所有的点播请求
  219. resultHolder.invokeAllResult(msg);
  220. }, uuid);
  221. }
  222. return playResult;
  223. }
  224. @Override
  225. public void play(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
  226. ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
  227. InviteTimeOutCallback timeoutCallback, String uuid) {
  228. String streamId = null;
  229. if (mediaServerItem.isRtpEnable()) {
  230. streamId = String.format("%s_%s", device.getDeviceId(), channelId);
  231. }
  232. if (ssrcInfo == null) {
  233. ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, device.isSsrcCheck(), false);
  234. }
  235. logger.info("[点播开始] deviceId: {}, channelId: {}, SSRC: {}", device.getDeviceId(), channelId, ssrcInfo.getSsrc() );
  236. // 超时处理
  237. String timeOutTaskKey = UUID.randomUUID().toString();
  238. SSRCInfo finalSsrcInfo = ssrcInfo;
  239. dynamicTask.startDelay( timeOutTaskKey,()->{
  240. SIPDialog dialog = streamSession.getDialogByStream(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  241. if (dialog != null) {
  242. logger.info("[点播超时] 收流超时 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  243. timeoutCallback.run(1, "收流超时");
  244. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  245. cmder.streamByeCmd(device.getDeviceId(), channelId, finalSsrcInfo.getStream(), null);
  246. }else {
  247. logger.info("[点播超时] 消息未响应 deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  248. timeoutCallback.run(0, "点播超时");
  249. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  250. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  251. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  252. }
  253. }, userSetting.getPlayTimeout());
  254. final String ssrc = ssrcInfo.getSsrc();
  255. final String stream = ssrcInfo.getStream();
  256. //端口获取失败的ssrcInfo 没有必要发送点播指令
  257. if(ssrcInfo.getPort() <= 0){
  258. logger.info("[点播端口分配异常],deviceId={},channelId={},ssrcInfo={}", device.getDeviceId(), channelId, ssrcInfo);
  259. return;
  260. }
  261. cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channelId, (MediaServerItem mediaServerItemInuse, JSONObject response) -> {
  262. logger.info("收到订阅消息: " + response.toJSONString());
  263. dynamicTask.stop(timeOutTaskKey);
  264. // hook响应
  265. onPublishHandlerForPlay(mediaServerItemInuse, response, device.getDeviceId(), channelId, uuid);
  266. hookEvent.response(mediaServerItemInuse, response);
  267. logger.info("[点播成功] deviceId: {}, channelId: {}", device.getDeviceId(), channelId);
  268. }, (event) -> {
  269. ResponseEvent responseEvent = (ResponseEvent)event.event;
  270. String contentString = new String(responseEvent.getResponse().getRawContent());
  271. // 获取ssrc
  272. int ssrcIndex = contentString.indexOf("y=");
  273. // 检查是否有y字段
  274. if (ssrcIndex >= 0) {
  275. //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
  276. String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
  277. // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
  278. if (ssrc.equals(ssrcInResponse)) {
  279. return;
  280. }
  281. logger.info("[点播消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse );
  282. if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
  283. logger.info("[SIP 消息] SSRC修正 {}->{}", ssrc, ssrcInResponse);
  284. if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
  285. // ssrc 不可用
  286. // 释放ssrc
  287. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  288. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  289. event.msg = "下级自定义了ssrc,但是此ssrc不可用";
  290. event.statusCode = 400;
  291. errorEvent.response(event);
  292. return;
  293. }
  294. // 单端口模式streamId也有变化,需要重新设置监听
  295. if (!mediaServerItem.isRtpEnable()) {
  296. // 添加订阅
  297. HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
  298. subscribe.removeSubscribe(hookSubscribe);
  299. hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
  300. subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
  301. logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
  302. dynamicTask.stop(timeOutTaskKey);
  303. // hook响应
  304. onPublishHandlerForPlay(mediaServerItemInUse, response, device.getDeviceId(), channelId, uuid);
  305. hookEvent.response(mediaServerItemInUse, response);
  306. });
  307. }
  308. // 关闭rtp server
  309. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  310. // 重新开启ssrc server
  311. mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
  312. }
  313. }
  314. }, (event) -> {
  315. dynamicTask.stop(timeOutTaskKey);
  316. mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  317. // 释放ssrc
  318. mediaServerService.releaseSsrc(mediaServerItem.getId(), finalSsrcInfo.getSsrc());
  319. streamSession.remove(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
  320. errorEvent.response(event);
  321. });
  322. }
  323. @Override
  324. public void onPublishHandlerForPlay(MediaServerItem mediaServerItem, JSONObject response, String deviceId, String channelId, String uuid) {
  325. RequestMessage msg = new RequestMessage();
  326. if (uuid != null) {
  327. msg.setId(uuid);
  328. }
  329. msg.setKey(DeferredResultHolder.CALLBACK_CMD_PLAY + deviceId + channelId);
  330. StreamInfo streamInfo = onPublishHandler(mediaServerItem, response, deviceId, channelId);
  331. if (streamInfo != null) {
  332. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  333. if (deviceChannel != null) {
  334. deviceChannel.setStreamId(streamInfo.getStream());
  335. storager.startPlay(deviceId, channelId, streamInfo.getStream());
  336. }
  337. redisCatchStorage.startPlay(streamInfo);
  338. WVPResult wvpResult = new WVPResult();
  339. wvpResult.setCode(0);
  340. wvpResult.setMsg("success");
  341. wvpResult.setData(streamInfo);
  342. msg.setData(wvpResult);
  343. resultHolder.invokeAllResult(msg);
  344. } else {
  345. logger.warn("设备预览API调用失败!");
  346. msg.setData("设备预览API调用失败!");
  347. resultHolder.invokeAllResult(msg);
  348. }
  349. }
  350. @Override
  351. public MediaServerItem getNewMediaServerItem(Device device) {
  352. if (device == null) {
  353. return null;
  354. }
  355. String mediaServerId = device.getMediaServerId();
  356. MediaServerItem mediaServerItem;
  357. if (mediaServerId == null) {
  358. mediaServerItem = mediaServerService.getMediaServerForMinimumLoad();
  359. }else {
  360. mediaServerItem = mediaServerService.getOne(mediaServerId);
  361. }
  362. if (mediaServerItem == null) {
  363. logger.warn("点播时未找到可使用的ZLM...");
  364. }
  365. return mediaServerItem;
  366. }
  367. @Override
  368. public DeferredResult<ResponseEntity<String>> playBack(String deviceId, String channelId, String startTime,
  369. String endTime,InviteStreamCallback inviteStreamCallback,
  370. PlayBackCallback callback) {
  371. Device device = storager.queryVideoDevice(deviceId);
  372. if (device == null) {
  373. return null;
  374. }
  375. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  376. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
  377. return playBack(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, inviteStreamCallback, callback);
  378. }
  379. @Override
  380. public DeferredResult<ResponseEntity<String>> playBack(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo,
  381. String deviceId, String channelId, String startTime,
  382. String endTime, InviteStreamCallback infoCallBack,
  383. PlayBackCallback playBackCallback) {
  384. if (mediaServerItem == null || ssrcInfo == null) {
  385. return null;
  386. }
  387. String uuid = UUID.randomUUID().toString();
  388. String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;
  389. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  390. Device device = storager.queryVideoDevice(deviceId);
  391. if (device == null) {
  392. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  393. return result;
  394. }
  395. resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId, uuid, result);
  396. RequestMessage msg = new RequestMessage();
  397. msg.setId(uuid);
  398. msg.setKey(key);
  399. PlayBackResult<RequestMessage> playBackResult = new PlayBackResult<>();
  400. String playBackTimeOutTaskKey = UUID.randomUUID().toString();
  401. dynamicTask.startDelay(playBackTimeOutTaskKey, ()->{
  402. logger.warn(String.format("设备回放超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  403. playBackResult.setCode(-1);
  404. playBackResult.setData(msg);
  405. playBackCallback.call(playBackResult);
  406. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  407. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  408. if (dialog != null) {
  409. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  410. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  411. }else {
  412. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  413. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  414. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  415. }
  416. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  417. // 回复之前所有的点播请求
  418. playBackCallback.call(playBackResult);
  419. }, userSetting.getPlayTimeout());
  420. cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, infoCallBack,
  421. (InviteStreamInfo inviteStreamInfo) -> {
  422. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  423. dynamicTask.stop(playBackTimeOutTaskKey);
  424. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  425. if (streamInfo == null) {
  426. logger.warn("设备回放API调用失败!");
  427. msg.setData("设备回放API调用失败!");
  428. playBackResult.setCode(-1);
  429. playBackResult.setData(msg);
  430. playBackCallback.call(playBackResult);
  431. return;
  432. }
  433. redisCatchStorage.startPlayback(streamInfo, inviteStreamInfo.getCallId());
  434. msg.setData(JSON.toJSONString(streamInfo));
  435. playBackResult.setCode(0);
  436. playBackResult.setData(msg);
  437. playBackResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  438. playBackResult.setResponse(inviteStreamInfo.getResponse());
  439. playBackCallback.call(playBackResult);
  440. }, event -> {
  441. dynamicTask.stop(playBackTimeOutTaskKey);
  442. msg.setData(String.format("回放失败, 错误码: %s, %s", event.statusCode, event.msg));
  443. playBackResult.setCode(-1);
  444. playBackResult.setData(msg);
  445. playBackResult.setEvent(event);
  446. playBackCallback.call(playBackResult);
  447. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  448. });
  449. return result;
  450. }
  451. @Override
  452. public DeferredResult<ResponseEntity<String>> download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  453. Device device = storager.queryVideoDevice(deviceId);
  454. if (device == null) {
  455. return null;
  456. }
  457. MediaServerItem newMediaServerItem = getNewMediaServerItem(device);
  458. SSRCInfo ssrcInfo = mediaServerService.openRTPServer(newMediaServerItem, null, true, true);
  459. return download(newMediaServerItem, ssrcInfo, deviceId, channelId, startTime, endTime, downloadSpeed,infoCallBack, hookCallBack);
  460. }
  461. @Override
  462. public DeferredResult<ResponseEntity<String>> download(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, String deviceId, String channelId, String startTime, String endTime, int downloadSpeed, InviteStreamCallback infoCallBack, PlayBackCallback hookCallBack) {
  463. if (mediaServerItem == null || ssrcInfo == null) {
  464. return null;
  465. }
  466. String uuid = UUID.randomUUID().toString();
  467. String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;
  468. DeferredResult<ResponseEntity<String>> result = new DeferredResult<>(30000L);
  469. Device device = storager.queryVideoDevice(deviceId);
  470. if (device == null) {
  471. result.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
  472. return result;
  473. }
  474. resultHolder.put(key, uuid, result);
  475. RequestMessage msg = new RequestMessage();
  476. msg.setId(uuid);
  477. msg.setKey(key);
  478. WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  479. msg.setData(wvpResult);
  480. PlayBackResult<RequestMessage> downloadResult = new PlayBackResult<>();
  481. downloadResult.setData(msg);
  482. String downLoadTimeOutTaskKey = UUID.randomUUID().toString();
  483. dynamicTask.startDelay(downLoadTimeOutTaskKey, ()->{
  484. logger.warn(String.format("录像下载请求超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  485. wvpResult.setCode(-1);
  486. wvpResult.setMsg("录像下载请求超时");
  487. downloadResult.setCode(-1);
  488. hookCallBack.call(downloadResult);
  489. SIPDialog dialog = streamSession.getDialogByStream(deviceId, channelId, ssrcInfo.getStream());
  490. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  491. if (dialog != null) {
  492. // 点播超时回复BYE 同时释放ssrc以及此次点播的资源
  493. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  494. }else {
  495. mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
  496. mediaServerService.closeRTPServer(deviceId, channelId, ssrcInfo.getStream());
  497. streamSession.remove(deviceId, channelId, ssrcInfo.getStream());
  498. }
  499. cmder.streamByeCmd(device.getDeviceId(), channelId, ssrcInfo.getStream(), null);
  500. // 回复之前所有的点播请求
  501. hookCallBack.call(downloadResult);
  502. }, userSetting.getPlayTimeout());
  503. cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
  504. inviteStreamInfo -> {
  505. logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
  506. dynamicTask.stop(downLoadTimeOutTaskKey);
  507. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  508. streamInfo.setStartTime(startTime);
  509. streamInfo.setEndTime(endTime);
  510. if (streamInfo == null) {
  511. logger.warn("录像下载API调用失败!");
  512. wvpResult.setCode(-1);
  513. wvpResult.setMsg("录像下载API调用失败");
  514. downloadResult.setCode(-1);
  515. hookCallBack.call(downloadResult);
  516. return ;
  517. }
  518. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  519. wvpResult.setCode(0);
  520. wvpResult.setMsg("success");
  521. wvpResult.setData(streamInfo);
  522. downloadResult.setCode(0);
  523. downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
  524. downloadResult.setResponse(inviteStreamInfo.getResponse());
  525. hookCallBack.call(downloadResult);
  526. }, event -> {
  527. dynamicTask.stop(downLoadTimeOutTaskKey);
  528. downloadResult.setCode(-1);
  529. wvpResult.setCode(-1);
  530. wvpResult.setMsg(String.format("录像下载失败, 错误码: %s, %s", event.statusCode, event.msg));
  531. downloadResult.setEvent(event);
  532. hookCallBack.call(downloadResult);
  533. streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
  534. });
  535. return result;
  536. }
  537. @Override
  538. public StreamInfo getDownLoadInfo(String deviceId, String channelId, String stream) {
  539. StreamInfo streamInfo = redisCatchStorage.queryDownload(deviceId, channelId, stream, null);
  540. if (streamInfo != null) {
  541. if (streamInfo.getProgress() == 1) {
  542. return streamInfo;
  543. }
  544. // 获取当前已下载时长
  545. String mediaServerId = streamInfo.getMediaServerId();
  546. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  547. if (mediaServerItem == null) {
  548. logger.warn("查询录像信息时发现节点已离线");
  549. return null;
  550. }
  551. if (mediaServerItem.getRecordAssistPort() != 0) {
  552. JSONObject jsonObject = assistRESTfulUtils.fileDuration(mediaServerItem, streamInfo.getApp(), streamInfo.getStream(), null);
  553. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  554. long duration = jsonObject.getLong("data");
  555. if (duration == 0) {
  556. streamInfo.setProgress(0);
  557. }else {
  558. String startTime = streamInfo.getStartTime();
  559. String endTime = streamInfo.getEndTime();
  560. long start = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(startTime);
  561. long end = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestamp(endTime);
  562. BigDecimal currentCount = new BigDecimal(duration/1000);
  563. BigDecimal totalCount = new BigDecimal(end-start);
  564. BigDecimal divide = currentCount.divide(totalCount,2, RoundingMode.HALF_UP);
  565. double process = divide.doubleValue();
  566. streamInfo.setProgress(process);
  567. }
  568. }
  569. }
  570. }
  571. return streamInfo;
  572. }
  573. @Override
  574. public void onPublishHandlerForDownload(InviteStreamInfo inviteStreamInfo, String deviceId, String channelId, String uuid) {
  575. RequestMessage msg = new RequestMessage();
  576. msg.setKey(DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId);
  577. msg.setId(uuid);
  578. StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
  579. if (streamInfo != null) {
  580. redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
  581. msg.setData(JSON.toJSONString(streamInfo));
  582. resultHolder.invokeResult(msg);
  583. } else {
  584. logger.warn("设备预览API调用失败!");
  585. msg.setData("设备预览API调用失败!");
  586. resultHolder.invokeResult(msg);
  587. }
  588. }
  589. public StreamInfo onPublishHandler(MediaServerItem mediaServerItem, JSONObject resonse, String deviceId, String channelId) {
  590. String streamId = resonse.getString("stream");
  591. JSONArray tracks = resonse.getJSONArray("tracks");
  592. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(mediaServerItem,"rtp", streamId, tracks, null);
  593. streamInfo.setDeviceID(deviceId);
  594. streamInfo.setChannelId(channelId);
  595. return streamInfo;
  596. }
  597. @Override
  598. public void zlmServerOffline(String mediaServerId) {
  599. // 处理正在向上推流的上级平台
  600. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServer(null);
  601. if (sendRtpItems.size() > 0) {
  602. for (SendRtpItem sendRtpItem : sendRtpItems) {
  603. if (sendRtpItem.getMediaServerId().equals(mediaServerId)) {
  604. ParentPlatform platform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  605. sipCommanderFroPlatform.streamByeCmd(platform, sendRtpItem.getCallId());
  606. }
  607. }
  608. }
  609. // 处理正在观看的国标设备
  610. List<SsrcTransaction> allSsrc = streamSession.getAllSsrc();
  611. if (allSsrc.size() > 0) {
  612. for (SsrcTransaction ssrcTransaction : allSsrc) {
  613. if(ssrcTransaction.getMediaServerId().equals(mediaServerId)) {
  614. cmder.streamByeCmd(ssrcTransaction.getDeviceId(), ssrcTransaction.getChannelId(),
  615. ssrcTransaction.getStream(), null);
  616. }
  617. }
  618. }
  619. }
  620. @Override
  621. public void audioBroadcast(Device device, String channelId, int timeout, AudioBroadcastEvent event) {
  622. if (device == null || channelId == null) {
  623. return;
  624. }
  625. DeviceChannel deviceChannel = storager.queryChannel(device.getDeviceId(), channelId);
  626. if (deviceChannel == null) {
  627. logger.warn("开启语音广播的时候未找到通道: {}", channelId);
  628. event.call("开启语音广播的时候未找到通道");
  629. return;
  630. }
  631. // 查询通道使用状态
  632. if (audioBroadcastManager.exit(device.getDeviceId(), channelId)) {
  633. SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(device.getDeviceId(), channelId, null, null);
  634. if (sendRtpItem != null && sendRtpItem.isOnlyAudio()) {
  635. // 查询流是否存在,不存在则认为是异常状态
  636. MediaServerItem mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  637. Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStreamId());
  638. if (streamReady) {
  639. logger.warn("语音广播已经开启: {}", channelId);
  640. event.call("语音广播已经开启");
  641. return;
  642. }else {
  643. audioBroadcastManager.del(deviceChannel.getDeviceId(),channelId);
  644. redisCatchStorage.deleteSendRTPServer(device.getDeviceId(), channelId, sendRtpItem.getCallId(), sendRtpItem.getStreamId());
  645. }
  646. }
  647. }
  648. // 发送通知
  649. cmder.audioBroadcastCmd(device, channelId, eventResultForOk -> {
  650. // 发送成功
  651. AudioBroadcastCatch audioBroadcastCatch = new AudioBroadcastCatch(device.getDeviceId(), channelId, AudioBroadcastCatchStatus.Ready);
  652. audioBroadcastManager.add(audioBroadcastCatch);
  653. }, eventResultForError -> {
  654. // 发送失败
  655. logger.error("语音广播发送失败: {}:{}", channelId, eventResultForError.msg);
  656. event.call("语音广播发送失败");
  657. stopAudioBroadcast(device.getDeviceId(), channelId);
  658. });
  659. }
  660. @Override
  661. public void stopAudioBroadcast(String deviceId, String channelId){
  662. AudioBroadcastCatch audioBroadcastCatch = audioBroadcastManager.get(deviceId, channelId);
  663. if (audioBroadcastCatch != null) {
  664. try {
  665. SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(deviceId, audioBroadcastCatch.getChannelId(), null, null);
  666. if (sendRtpItem != null) {
  667. redisCatchStorage.deleteSendRTPServer(deviceId, sendRtpItem.getChannelId(), null, null);
  668. MediaServerItem mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
  669. Map<String, Object> param = new HashMap<>();
  670. param.put("vhost", "__defaultVhost__");
  671. param.put("app", sendRtpItem.getApp());
  672. param.put("stream", sendRtpItem.getStreamId());
  673. zlmresTfulUtils.stopSendRtp(mediaInfo, param);
  674. // 立刻结束设备的推流,等待自行结束太慢
  675. zlmresTfulUtils.closeStreams(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStreamId());
  676. }
  677. if (audioBroadcastCatch.getStatus() == AudioBroadcastCatchStatus.Ok) {
  678. cmder.streamByeCmd(audioBroadcastCatch.getDialog(), audioBroadcastCatch.getRequest(), null);
  679. }
  680. audioBroadcastManager.del(deviceId, channelId);
  681. } catch (SipException e) {
  682. throw new RuntimeException(e);
  683. } catch (ParseException e) {
  684. throw new RuntimeException(e);
  685. }
  686. }
  687. }
  688. @Override
  689. public void zlmServerOnline(String mediaServerId) {
  690. // 似乎没啥需要做的
  691. }
  692. }