InviteStreamServiceImpl.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.baomidou.dynamic.datasource.annotation.DS;
  4. import com.genersoft.iot.vmp.common.InviteInfo;
  5. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  6. import com.genersoft.iot.vmp.common.InviteSessionType;
  7. import com.genersoft.iot.vmp.common.VideoManagerConstants;
  8. import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
  9. import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
  10. import com.genersoft.iot.vmp.service.IInviteStreamService;
  11. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  12. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  13. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  14. import org.slf4j.Logger;
  15. import org.slf4j.LoggerFactory;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.context.event.EventListener;
  18. import org.springframework.data.redis.core.RedisTemplate;
  19. import org.springframework.scheduling.annotation.Async;
  20. import org.springframework.stereotype.Service;
  21. import java.util.List;
  22. import java.util.Map;
  23. import java.util.concurrent.ConcurrentHashMap;
  24. import java.util.concurrent.CopyOnWriteArrayList;
  25. @Service
  26. @DS("master")
  27. public class InviteStreamServiceImpl implements IInviteStreamService {
  28. private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
  29. private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
  30. @Autowired
  31. private RedisTemplate<Object, Object> redisTemplate;
  32. @Autowired
  33. private IVideoManagerStorage storage;
  34. /**
  35. * 流到来的处理
  36. */
  37. @Async("taskExecutor")
  38. @org.springframework.context.event.EventListener
  39. public void onApplicationEvent(MediaArrivalEvent event) {
  40. // if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
  41. //
  42. // }
  43. }
  44. /**
  45. * 流离开的处理
  46. */
  47. @Async("taskExecutor")
  48. @EventListener
  49. public void onApplicationEvent(MediaDepartureEvent event) {
  50. if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
  51. InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
  52. if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
  53. removeInviteInfo(inviteInfo);
  54. storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
  55. }
  56. }
  57. }
  58. @Override
  59. public void updateInviteInfo(InviteInfo inviteInfo) {
  60. if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
  61. logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
  62. return;
  63. }
  64. InviteInfo inviteInfoForUpdate = null;
  65. if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  66. if (inviteInfo.getDeviceId() == null
  67. || inviteInfo.getChannelId() == null
  68. || inviteInfo.getType() == null
  69. || inviteInfo.getStream() == null
  70. ) {
  71. return;
  72. }
  73. inviteInfoForUpdate = inviteInfo;
  74. } else {
  75. InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  76. inviteInfo.getChannelId(), inviteInfo.getStream());
  77. if (inviteInfoInRedis == null) {
  78. logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
  79. inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  80. return;
  81. }
  82. if (inviteInfo.getStreamInfo() != null) {
  83. inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
  84. }
  85. if (inviteInfo.getSsrcInfo() != null) {
  86. inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
  87. }
  88. if (inviteInfo.getStreamMode() != null) {
  89. inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
  90. }
  91. if (inviteInfo.getReceiveIp() != null) {
  92. inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
  93. }
  94. if (inviteInfo.getReceivePort() != null) {
  95. inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
  96. }
  97. if (inviteInfo.getStatus() != null) {
  98. inviteInfoInRedis.setStatus(inviteInfo.getStatus());
  99. }
  100. inviteInfoForUpdate = inviteInfoInRedis;
  101. }
  102. String key = VideoManagerConstants.INVITE_PREFIX +
  103. ":" + inviteInfoForUpdate.getType() +
  104. ":" + inviteInfoForUpdate.getDeviceId() +
  105. ":" + inviteInfoForUpdate.getChannelId() +
  106. ":" + inviteInfoForUpdate.getStream()+
  107. ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
  108. redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
  109. }
  110. @Override
  111. public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
  112. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  113. if (inviteInfoInDb == null) {
  114. return null;
  115. }
  116. removeInviteInfo(inviteInfoInDb);
  117. String key = VideoManagerConstants.INVITE_PREFIX +
  118. ":" + inviteInfo.getType() +
  119. ":" + inviteInfo.getDeviceId() +
  120. ":" + inviteInfo.getChannelId() +
  121. ":" + stream +
  122. ":" + inviteInfo.getSsrcInfo().getSsrc();
  123. inviteInfoInDb.setStream(stream);
  124. if (inviteInfoInDb.getSsrcInfo() != null) {
  125. inviteInfoInDb.getSsrcInfo().setStream(stream);
  126. }
  127. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  128. return inviteInfoInDb;
  129. }
  130. @Override
  131. public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  132. String key = VideoManagerConstants.INVITE_PREFIX +
  133. ":" + (type != null ? type : "*") +
  134. ":" + (deviceId != null ? deviceId : "*") +
  135. ":" + (channelId != null ? channelId : "*") +
  136. ":" + (stream != null ? stream : "*")
  137. + ":*";
  138. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  139. if (scanResult.isEmpty()) {
  140. return null;
  141. }
  142. if (scanResult.size() != 1) {
  143. logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
  144. }
  145. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  146. }
  147. @Override
  148. public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
  149. return getInviteInfo(type, deviceId, channelId, null);
  150. }
  151. @Override
  152. public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
  153. return getInviteInfo(type, null, null, stream);
  154. }
  155. @Override
  156. public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  157. String scanKey = VideoManagerConstants.INVITE_PREFIX +
  158. ":" + (type != null ? type : "*") +
  159. ":" + (deviceId != null ? deviceId : "*") +
  160. ":" + (channelId != null ? channelId : "*") +
  161. ":" + (stream != null ? stream : "*") +
  162. ":*";
  163. List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
  164. if (scanResult.size() > 0) {
  165. for (Object keyObj : scanResult) {
  166. String key = (String) keyObj;
  167. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
  168. if (inviteInfo == null) {
  169. continue;
  170. }
  171. redisTemplate.delete(key);
  172. inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
  173. }
  174. }
  175. }
  176. @Override
  177. public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
  178. removeInviteInfo(inviteSessionType, deviceId, channelId, null);
  179. }
  180. @Override
  181. public void removeInviteInfo(InviteInfo inviteInfo) {
  182. removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  183. }
  184. @Override
  185. public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
  186. String key = buildKey(type, deviceId, channelId, stream);
  187. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  188. if (callbacks == null) {
  189. callbacks = new CopyOnWriteArrayList<>();
  190. inviteErrorCallbackMap.put(key, callbacks);
  191. }
  192. callbacks.add(callback);
  193. }
  194. private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  195. String key = type + ":" + deviceId + ":" + channelId;
  196. // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  197. if (stream != null) {
  198. key += (":" + stream);
  199. }
  200. return key;
  201. }
  202. @Override
  203. public void clearInviteInfo(String deviceId) {
  204. removeInviteInfo(null, deviceId, null, null);
  205. }
  206. @Override
  207. public int getStreamInfoCount(String mediaServerId) {
  208. int count = 0;
  209. String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
  210. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  211. if (scanResult.size() == 0) {
  212. return 0;
  213. }else {
  214. for (Object keyObj : scanResult) {
  215. String keyStr = (String) keyObj;
  216. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
  217. if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
  218. count++;
  219. }
  220. }
  221. }
  222. return count;
  223. }
  224. @Override
  225. public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
  226. String key = buildSubStreamKey(type, deviceId, channelId, stream);
  227. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  228. if (callbacks == null) {
  229. return;
  230. }
  231. for (ErrorCallback<Object> callback : callbacks) {
  232. callback.run(code, msg, data);
  233. }
  234. inviteErrorCallbackMap.remove(key);
  235. }
  236. private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  237. String key = type + ":" + ":" + deviceId + ":" + channelId;
  238. // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  239. if (stream != null) {
  240. key += (":" + stream);
  241. }
  242. return key;
  243. }
  244. @Override
  245. public InviteInfo getInviteInfoBySSRC(String ssrc) {
  246. String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
  247. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  248. if (scanResult.size() != 1) {
  249. return null;
  250. }
  251. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  252. }
  253. @Override
  254. public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
  255. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  256. if (inviteInfoInDb == null) {
  257. return null;
  258. }
  259. removeInviteInfo(inviteInfoInDb);
  260. String key = VideoManagerConstants.INVITE_PREFIX +
  261. ":" + inviteInfo.getType() +
  262. ":" + inviteInfo.getDeviceId() +
  263. ":" + inviteInfo.getChannelId() +
  264. ":" + inviteInfo.getStream() +
  265. ":" + ssrc;
  266. if (inviteInfoInDb.getSsrcInfo() != null) {
  267. inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
  268. }
  269. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  270. return inviteInfoInDb;
  271. }
  272. }