// MediaServiceImpl.java (15 KB)
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.genersoft.iot.vmp.common.InviteInfo;
  3. import com.genersoft.iot.vmp.common.InviteSessionType;
  4. import com.genersoft.iot.vmp.common.VideoManagerConstants;
  5. import com.genersoft.iot.vmp.conf.UserSetting;
  6. import com.genersoft.iot.vmp.conf.exception.ControllerException;
  7. import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  10. import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  11. import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
  12. import com.genersoft.iot.vmp.media.bean.MediaServer;
  13. import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
  14. import com.alibaba.fastjson2.JSONArray;
  15. import com.alibaba.fastjson2.JSONObject;
  16. import com.genersoft.iot.vmp.common.StreamInfo;
  17. import com.genersoft.iot.vmp.conf.MediaConfig;
  18. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  19. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  20. import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
  21. import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
  22. import com.genersoft.iot.vmp.service.*;
  23. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  24. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  25. import com.genersoft.iot.vmp.utils.DateUtil;
  26. import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
  27. import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
  28. import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
  29. import org.slf4j.Logger;
  30. import org.slf4j.LoggerFactory;
  31. import org.springframework.beans.factory.annotation.Autowired;
  32. import org.springframework.data.redis.core.RedisTemplate;
  33. import org.springframework.stereotype.Service;
  34. import org.springframework.util.ObjectUtils;
  35. import javax.sip.InvalidArgumentException;
  36. import javax.sip.SipException;
  37. import java.text.ParseException;
  38. import java.util.HashMap;
  39. import java.util.List;
  40. import java.util.Map;
  41. @Service
  42. public class MediaServiceImpl implements IMediaService {
  43. private final static Logger logger = LoggerFactory.getLogger(MediaServiceImpl.class);
  44. @Autowired
  45. private IRedisCatchStorage redisCatchStorage;
  46. @Autowired
  47. private IStreamProxyService streamProxyService;
  48. @Autowired
  49. private UserSetting userSetting;
  50. @Autowired
  51. private RedisTemplate<Object, Object> redisTemplate;
  52. @Autowired
  53. private IUserService userService;
  54. @Autowired
  55. private IInviteStreamService inviteStreamService;
  56. @Autowired
  57. private VideoStreamSessionManager sessionManager;
  58. @Autowired
  59. private IVideoManagerStorage storager;
  60. @Autowired
  61. private IDeviceService deviceService;
  62. @Autowired
  63. private ISIPCommanderForPlatform commanderForPlatform;
  64. @Autowired
  65. private ISIPCommander commander;
  66. @Override
  67. public boolean authenticatePlay(String app, String stream, String callId) {
  68. if (app == null || stream == null) {
  69. return false;
  70. }
  71. if ("rtp".equals(app)) {
  72. return true;
  73. }
  74. StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
  75. if (streamAuthorityInfo == null || streamAuthorityInfo.getCallId() == null) {
  76. return true;
  77. }
  78. return streamAuthorityInfo.getCallId().equals(callId);
  79. }
  80. @Override
  81. public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
  82. // 推流鉴权的处理
  83. if (!"rtp".equals(app)) {
  84. StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
  85. if (streamProxyItem != null) {
  86. ResultForOnPublish result = new ResultForOnPublish();
  87. result.setEnable_audio(streamProxyItem.isEnableAudio());
  88. result.setEnable_mp4(streamProxyItem.isEnableMp4());
  89. return result;
  90. }
  91. if (userSetting.getPushAuthority()) {
  92. // 对于推流进行鉴权
  93. Map<String, String> paramMap = urlParamToMap(params);
  94. // 推流鉴权
  95. if (params == null) {
  96. logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
  97. throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
  98. }
  99. String sign = paramMap.get("sign");
  100. if (sign == null) {
  101. logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
  102. throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
  103. }
  104. // 推流自定义播放鉴权码
  105. String callId = paramMap.get("callId");
  106. // 鉴权配置
  107. boolean hasAuthority = userService.checkPushAuthority(callId, sign);
  108. if (!hasAuthority) {
  109. logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
  110. throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
  111. }
  112. StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
  113. streamAuthorityInfo.setCallId(callId);
  114. streamAuthorityInfo.setSign(sign);
  115. // 鉴权通过
  116. redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
  117. }
  118. }
  119. ResultForOnPublish result = new ResultForOnPublish();
  120. result.setEnable_audio(true);
  121. // 是否录像
  122. if ("rtp".equals(app)) {
  123. result.setEnable_mp4(userSetting.getRecordSip());
  124. } else {
  125. result.setEnable_mp4(userSetting.isRecordPushLive());
  126. }
  127. // 国标流
  128. if ("rtp".equals(app)) {
  129. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
  130. // 单端口模式下修改流 ID
  131. if (!mediaServer.isRtpEnable() && inviteInfo == null) {
  132. String ssrc = String.format("%010d", Long.parseLong(stream, 16));
  133. inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
  134. if (inviteInfo != null) {
  135. result.setStream_replace(inviteInfo.getStream());
  136. logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
  137. stream = inviteInfo.getStream();
  138. }
  139. }
  140. // 设置音频信息及录制信息
  141. List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
  142. if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
  143. // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
  144. StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
  145. streamAuthorityInfo.setApp(app);
  146. streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
  147. streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
  148. redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
  149. String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
  150. String channelId = ssrcTransactionForAll.get(0).getChannelId();
  151. DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
  152. if (deviceChannel != null) {
  153. result.setEnable_audio(deviceChannel.getHasAudio());
  154. }
  155. // 如果是录像下载就设置视频间隔十秒
  156. if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
  157. // 获取录像的总时长,然后设置为这个视频的时长
  158. InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
  159. if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
  160. String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
  161. String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
  162. long difference = DateUtil.getDifference(startTime, endTime) / 1000;
  163. result.setMp4_max_second((int) difference);
  164. result.setEnable_mp4(true);
  165. // 设置为2保证得到的mp4的时长是正常的
  166. result.setModify_stamp(2);
  167. }
  168. }
  169. // 如果是talk对讲,则默认获取声音
  170. if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
  171. result.setEnable_audio(true);
  172. }
  173. }
  174. } else if (app.equals("broadcast")) {
  175. result.setEnable_audio(true);
  176. } else if (app.equals("talk")) {
  177. result.setEnable_audio(true);
  178. }
  179. if (app.equalsIgnoreCase("rtp")) {
  180. String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
  181. OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
  182. String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream;
  183. OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
  184. if (otherRtpSendInfo != null || otherPsSendInfo != null) {
  185. result.setEnable_mp4(true);
  186. }
  187. }
  188. return result;
  189. }
  190. private Map<String, String> urlParamToMap(String params) {
  191. HashMap<String, String> map = new HashMap<>();
  192. if (ObjectUtils.isEmpty(params)) {
  193. return map;
  194. }
  195. String[] paramsArray = params.split("&");
  196. if (paramsArray.length == 0) {
  197. return map;
  198. }
  199. for (String param : paramsArray) {
  200. String[] paramArray = param.split("=");
  201. if (paramArray.length == 2) {
  202. map.put(paramArray[0], paramArray[1]);
  203. }
  204. }
  205. return map;
  206. }
  207. @Override
  208. public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
  209. boolean result = false;
  210. // 国标类型的流
  211. if ("rtp".equals(app)) {
  212. result = userSetting.getStreamOnDemand();
  213. // 国标流, 点播/录像回放/录像下载
  214. InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
  215. // 点播
  216. if (inviteInfo != null) {
  217. // 录像下载
  218. if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
  219. return false;
  220. }
  221. // 收到无人观看说明流也没有在往上级推送
  222. if (redisCatchStorage.isChannelSendingRTP(inviteInfo.getChannelId())) {
  223. List<SendRtpItem> sendRtpItems = redisCatchStorage.querySendRTPServerByChannelId(
  224. inviteInfo.getChannelId());
  225. if (!sendRtpItems.isEmpty()) {
  226. for (SendRtpItem sendRtpItem : sendRtpItems) {
  227. ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(sendRtpItem.getPlatformId());
  228. try {
  229. commanderForPlatform.streamByeCmd(parentPlatform, sendRtpItem.getCallId());
  230. } catch (SipException | InvalidArgumentException | ParseException e) {
  231. logger.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
  232. }
  233. redisCatchStorage.deleteSendRTPServer(parentPlatform.getServerGBId(), sendRtpItem.getChannelId(),
  234. sendRtpItem.getCallId(), sendRtpItem.getStream());
  235. if (InviteStreamType.PUSH == sendRtpItem.getPlayType()) {
  236. redisCatchStorage.sendPlatformStopPlayMsg(sendRtpItem,parentPlatform);
  237. }
  238. }
  239. }
  240. }
  241. Device device = deviceService.getDevice(inviteInfo.getDeviceId());
  242. if (device != null) {
  243. try {
  244. // 多查询一次防止已经被处理了
  245. InviteInfo info = inviteStreamService.getInviteInfo(inviteInfo.getType(),
  246. inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  247. if (info != null) {
  248. commander.streamByeCmd(device, inviteInfo.getChannelId(),
  249. inviteInfo.getStream(), null);
  250. } else {
  251. logger.info("[无人观看] 未找到设备的点播信息: {}, 流:{}", inviteInfo.getDeviceId(), stream);
  252. }
  253. } catch (InvalidArgumentException | ParseException | SipException |
  254. SsrcTransactionNotFoundException e) {
  255. logger.error("[无人观看]点播, 发送BYE失败 {}", e.getMessage());
  256. }
  257. } else {
  258. logger.info("[无人观看] 未找到设备: {},流:{}", inviteInfo.getDeviceId(), stream);
  259. }
  260. inviteStreamService.removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  261. inviteInfo.getChannelId(), inviteInfo.getStream());
  262. storager.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
  263. return result;
  264. }
  265. SendRtpItem sendRtpItem = redisCatchStorage.querySendRTPServer(null, null, stream, null);
  266. if (sendRtpItem != null && "talk".equals(sendRtpItem.getApp())) {
  267. return false;
  268. }
  269. } else if ("talk".equals(app) || "broadcast".equals(app)) {
  270. return false;
  271. } else {
  272. // 非国标流 推流/拉流代理
  273. // 拉流代理
  274. StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
  275. if (streamProxyItem != null) {
  276. if (streamProxyItem.isEnableRemoveNoneReader()) {
  277. // 无人观看自动移除
  278. result = true;
  279. streamProxyService.del(app, stream);
  280. String url = streamProxyItem.getUrl() != null ? streamProxyItem.getUrl() : streamProxyItem.getSrcUrl();
  281. logger.info("[{}/{}]<-[{}] 拉流代理无人观看已经移除", app, stream, url);
  282. } else if (streamProxyItem.isEnableDisableNoneReader()) {
  283. // 无人观看停用
  284. result = true;
  285. // 修改数据
  286. streamProxyService.stop(app, stream);
  287. } else {
  288. // 无人观看不做处理
  289. result = false;
  290. }
  291. }
  292. }
  293. return result;
  294. }
  295. }