PlayServiceImpl.java 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.gb28181.bean.Device;
  7. import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  8. import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
  9. import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
  10. import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
  11. import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
  12. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  13. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  14. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  15. import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  16. import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
  17. import com.genersoft.iot.vmp.service.IMediaService;
  18. import com.genersoft.iot.vmp.service.IPlayService;
  19. import org.slf4j.Logger;
  20. import org.slf4j.LoggerFactory;
  21. import org.springframework.beans.factory.annotation.Autowired;
  22. import org.springframework.http.ResponseEntity;
  23. import org.springframework.stereotype.Service;
  24. import org.springframework.web.context.request.async.DeferredResult;
  25. import javax.sip.message.Response;
  26. import java.util.UUID;
  27. @Service
  28. public class PlayServiceImpl implements IPlayService {
  29. private final static Logger logger = LoggerFactory.getLogger(PlayServiceImpl.class);
  30. @Autowired
  31. private IVideoManagerStorager storager;
  32. @Autowired
  33. private SIPCommander cmder;
  34. @Autowired
  35. private IRedisCatchStorage redisCatchStorage;
  36. @Autowired
  37. private DeferredResultHolder resultHolder;
  38. @Autowired
  39. private ZLMRESTfulUtils zlmresTfulUtils;
  40. @Autowired
  41. private IMediaService mediaService;
  42. @Override
  43. public PlayResult play(String deviceId, String channelId, ZLMHttpHookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent) {
  44. PlayResult playResult = new PlayResult();
  45. Device device = storager.queryVideoDevice(deviceId);
  46. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(deviceId, channelId);
  47. playResult.setDevice(device);
  48. UUID uuid = UUID.randomUUID();
  49. playResult.setUuid(uuid.toString());
  50. DeferredResult<ResponseEntity<String>> result = new DeferredResult<ResponseEntity<String>>();
  51. playResult.setResult(result);
  52. // 录像查询以channelId作为deviceId查询
  53. resultHolder.put(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid, result);
  54. // 超时处理
  55. result.onTimeout(()->{
  56. logger.warn(String.format("设备点播超时,deviceId:%s ,channelId:%s", deviceId, channelId));
  57. // 释放rtpserver
  58. cmder.closeRTPServer(playResult.getDevice(), channelId);
  59. RequestMessage msg = new RequestMessage();
  60. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + playResult.getUuid());
  61. msg.setData("Timeout");
  62. resultHolder.invokeResult(msg);
  63. });
  64. if (streamInfo == null) {
  65. // 发送点播消息
  66. cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
  67. logger.info("收到订阅消息: " + response.toJSONString());
  68. onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
  69. if (hookEvent != null) {
  70. hookEvent.response(response);
  71. }
  72. }, event -> {
  73. RequestMessage msg = new RequestMessage();
  74. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
  75. Response response = event.getResponse();
  76. cmder.closeRTPServer(playResult.getDevice(), channelId);
  77. msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
  78. resultHolder.invokeResult(msg);
  79. if (errorEvent != null) {
  80. errorEvent.response(event);
  81. }
  82. });
  83. } else {
  84. String streamId = streamInfo.getStreamId();
  85. if (streamId == null) {
  86. RequestMessage msg = new RequestMessage();
  87. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
  88. msg.setData(String.format("点播失败, redis缓存streamId等于null"));
  89. resultHolder.invokeResult(msg);
  90. return playResult;
  91. }
  92. JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(streamId);
  93. if (rtpInfo != null && rtpInfo.getBoolean("exist")) {
  94. RequestMessage msg = new RequestMessage();
  95. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
  96. msg.setData(JSON.toJSONString(streamInfo));
  97. resultHolder.invokeResult(msg);
  98. if (hookEvent != null) {
  99. hookEvent.response(JSONObject.parseObject(JSON.toJSONString(streamInfo)));
  100. }
  101. } else {
  102. redisCatchStorage.stopPlay(streamInfo);
  103. storager.stopPlay(streamInfo.getDeviceID(), streamInfo.getChannelId());
  104. cmder.playStreamCmd(device, channelId, (JSONObject response) -> {
  105. logger.info("收到订阅消息: " + response.toJSONString());
  106. onPublishHandlerForPlay(response, deviceId, channelId, uuid.toString());
  107. }, event -> {
  108. cmder.closeRTPServer(playResult.getDevice(), channelId);
  109. RequestMessage msg = new RequestMessage();
  110. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
  111. Response response = event.getResponse();
  112. msg.setData(String.format("点播失败, 错误码: %s, %s", response.getStatusCode(), response.getReasonPhrase()));
  113. resultHolder.invokeResult(msg);
  114. });
  115. }
  116. }
  117. return playResult;
  118. }
  119. @Override
  120. public void onPublishHandlerForPlay(JSONObject resonse, String deviceId, String channelId, String uuid) {
  121. RequestMessage msg = new RequestMessage();
  122. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
  123. StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid);
  124. if (streamInfo != null) {
  125. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  126. if (deviceChannel != null) {
  127. deviceChannel.setStreamId(streamInfo.getStreamId());
  128. storager.startPlay(deviceId, channelId, streamInfo.getStreamId());
  129. }
  130. redisCatchStorage.startPlay(streamInfo);
  131. msg.setData(JSON.toJSONString(streamInfo));
  132. resultHolder.invokeResult(msg);
  133. } else {
  134. logger.warn("设备预览API调用失败!");
  135. msg.setData("设备预览API调用失败!");
  136. resultHolder.invokeResult(msg);
  137. }
  138. }
  139. @Override
  140. public void onPublishHandlerForPlayBack(JSONObject resonse, String deviceId, String channelId, String uuid) {
  141. RequestMessage msg = new RequestMessage();
  142. msg.setId(DeferredResultHolder.CALLBACK_CMD_PlAY + uuid);
  143. StreamInfo streamInfo = onPublishHandler(resonse, deviceId, channelId, uuid);
  144. if (streamInfo != null) {
  145. redisCatchStorage.startPlayback(streamInfo);
  146. msg.setData(JSON.toJSONString(streamInfo));
  147. resultHolder.invokeResult(msg);
  148. } else {
  149. logger.warn("设备预览API调用失败!");
  150. msg.setData("设备预览API调用失败!");
  151. resultHolder.invokeResult(msg);
  152. }
  153. }
  154. public StreamInfo onPublishHandler(JSONObject resonse, String deviceId, String channelId, String uuid) {
  155. String streamId = resonse.getString("stream");
  156. JSONArray tracks = resonse.getJSONArray("tracks");
  157. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream("rtp", streamId, tracks);
  158. streamInfo.setDeviceID(deviceId);
  159. streamInfo.setChannelId(channelId);
  160. return streamInfo;
  161. }
  162. }