InviteStreamServiceImpl.java 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.baomidou.dynamic.datasource.annotation.DS;
  4. import com.genersoft.iot.vmp.common.InviteInfo;
  5. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  6. import com.genersoft.iot.vmp.common.InviteSessionType;
  7. import com.genersoft.iot.vmp.common.VideoManagerConstants;
  8. import com.genersoft.iot.vmp.conf.UserSetting;
  9. import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
  10. import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
  11. import com.genersoft.iot.vmp.service.IInviteStreamService;
  12. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  13. import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
  14. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.context.event.EventListener;
  19. import org.springframework.data.redis.core.RedisTemplate;
  20. import org.springframework.scheduling.annotation.Async;
  21. import org.springframework.stereotype.Service;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.concurrent.ConcurrentHashMap;
  25. import java.util.concurrent.CopyOnWriteArrayList;
  26. import java.util.concurrent.TimeUnit;
  27. @Service
  28. @DS("master")
  29. public class InviteStreamServiceImpl implements IInviteStreamService {
  30. private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
  31. private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
  32. @Autowired
  33. private RedisTemplate<Object, Object> redisTemplate;
  34. @Autowired
  35. private IVideoManagerStorage storage;
  36. @Autowired
  37. private UserSetting userSetting;
  38. /**
  39. * 流到来的处理
  40. */
  41. @Async("taskExecutor")
  42. @org.springframework.context.event.EventListener
  43. public void onApplicationEvent(MediaArrivalEvent event) {
  44. // if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
  45. //
  46. // }
  47. }
  48. /**
  49. * 流离开的处理
  50. */
  51. @Async("taskExecutor")
  52. @EventListener
  53. public void onApplicationEvent(MediaDepartureEvent event) {
  54. if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
  55. InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
  56. if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
  57. removeInviteInfo(inviteInfo);
  58. storage.stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
  59. }
  60. }
  61. }
  62. @Override
  63. public void updateInviteInfo(InviteInfo inviteInfo) {
  64. if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  65. updateInviteInfo(inviteInfo, Long.valueOf(userSetting.getPlayTimeout()) * 2);
  66. }else {
  67. updateInviteInfo(inviteInfo, null);
  68. }
  69. }
  70. @Override
  71. public void updateInviteInfo(InviteInfo inviteInfo, Long time) {
  72. if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
  73. logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
  74. return;
  75. }
  76. InviteInfo inviteInfoForUpdate = null;
  77. if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  78. if (inviteInfo.getDeviceId() == null
  79. || inviteInfo.getChannelId() == null
  80. || inviteInfo.getType() == null
  81. || inviteInfo.getStream() == null
  82. ) {
  83. return;
  84. }
  85. inviteInfoForUpdate = inviteInfo;
  86. } else {
  87. InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  88. inviteInfo.getChannelId(), inviteInfo.getStream());
  89. if (inviteInfoInRedis == null) {
  90. logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
  91. inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  92. return;
  93. }
  94. if (inviteInfo.getStreamInfo() != null) {
  95. inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
  96. }
  97. if (inviteInfo.getSsrcInfo() != null) {
  98. inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
  99. }
  100. if (inviteInfo.getStreamMode() != null) {
  101. inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
  102. }
  103. if (inviteInfo.getReceiveIp() != null) {
  104. inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
  105. }
  106. if (inviteInfo.getReceivePort() != null) {
  107. inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
  108. }
  109. if (inviteInfo.getStatus() != null) {
  110. inviteInfoInRedis.setStatus(inviteInfo.getStatus());
  111. }
  112. inviteInfoForUpdate = inviteInfoInRedis;
  113. }
  114. String key = VideoManagerConstants.INVITE_PREFIX +
  115. ":" + inviteInfoForUpdate.getType() +
  116. ":" + inviteInfoForUpdate.getDeviceId() +
  117. ":" + inviteInfoForUpdate.getChannelId() +
  118. ":" + inviteInfoForUpdate.getStream()+
  119. ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
  120. if (time != null && time > 0) {
  121. redisTemplate.opsForValue().set(key, inviteInfoForUpdate, time, TimeUnit.SECONDS);
  122. }else {
  123. redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
  124. }
  125. }
  126. @Override
  127. public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
  128. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  129. if (inviteInfoInDb == null) {
  130. return null;
  131. }
  132. removeInviteInfo(inviteInfoInDb);
  133. String key = VideoManagerConstants.INVITE_PREFIX +
  134. ":" + inviteInfo.getType() +
  135. ":" + inviteInfo.getDeviceId() +
  136. ":" + inviteInfo.getChannelId() +
  137. ":" + stream +
  138. ":" + inviteInfo.getSsrcInfo().getSsrc();
  139. inviteInfoInDb.setStream(stream);
  140. if (inviteInfoInDb.getSsrcInfo() != null) {
  141. inviteInfoInDb.getSsrcInfo().setStream(stream);
  142. }
  143. if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  144. redisTemplate.opsForValue().set(key, inviteInfoInDb, userSetting.getPlayTimeout() * 2, TimeUnit.SECONDS);
  145. }else {
  146. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  147. }
  148. return inviteInfoInDb;
  149. }
  150. @Override
  151. public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  152. String key = VideoManagerConstants.INVITE_PREFIX +
  153. ":" + (type != null ? type : "*") +
  154. ":" + (deviceId != null ? deviceId : "*") +
  155. ":" + (channelId != null ? channelId : "*") +
  156. ":" + (stream != null ? stream : "*")
  157. + ":*";
  158. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  159. if (scanResult.isEmpty()) {
  160. return null;
  161. }
  162. if (scanResult.size() != 1) {
  163. logger.warn("[获取InviteInfo] 发现 key: {}存在多条", key);
  164. }
  165. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  166. }
  167. @Override
  168. public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
  169. return getInviteInfo(type, deviceId, channelId, null);
  170. }
  171. @Override
  172. public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
  173. return getInviteInfo(type, null, null, stream);
  174. }
  175. @Override
  176. public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  177. String scanKey = VideoManagerConstants.INVITE_PREFIX +
  178. ":" + (type != null ? type : "*") +
  179. ":" + (deviceId != null ? deviceId : "*") +
  180. ":" + (channelId != null ? channelId : "*") +
  181. ":" + (stream != null ? stream : "*") +
  182. ":*";
  183. List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
  184. if (scanResult.size() > 0) {
  185. for (Object keyObj : scanResult) {
  186. String key = (String) keyObj;
  187. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
  188. if (inviteInfo == null) {
  189. continue;
  190. }
  191. redisTemplate.delete(key);
  192. inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
  193. }
  194. }
  195. }
  196. @Override
  197. public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
  198. removeInviteInfo(inviteSessionType, deviceId, channelId, null);
  199. }
  200. @Override
  201. public void removeInviteInfo(InviteInfo inviteInfo) {
  202. removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  203. }
  204. @Override
  205. public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
  206. String key = buildKey(type, deviceId, channelId, stream);
  207. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  208. if (callbacks == null) {
  209. callbacks = new CopyOnWriteArrayList<>();
  210. inviteErrorCallbackMap.put(key, callbacks);
  211. }
  212. callbacks.add(callback);
  213. }
  214. private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  215. String key = type + ":" + deviceId + ":" + channelId;
  216. // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  217. if (stream != null) {
  218. key += (":" + stream);
  219. }
  220. return key;
  221. }
  222. @Override
  223. public void clearInviteInfo(String deviceId) {
  224. removeInviteInfo(null, deviceId, null, null);
  225. }
  226. @Override
  227. public int getStreamInfoCount(String mediaServerId) {
  228. int count = 0;
  229. String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
  230. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  231. if (scanResult.size() == 0) {
  232. return 0;
  233. }else {
  234. for (Object keyObj : scanResult) {
  235. String keyStr = (String) keyObj;
  236. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
  237. if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
  238. if (inviteInfo.getType().equals(InviteSessionType.DOWNLOAD) && inviteInfo.getStreamInfo().getProgress() == 1) {
  239. continue;
  240. }
  241. count++;
  242. }
  243. }
  244. }
  245. return count;
  246. }
  247. @Override
  248. public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
  249. String key = buildSubStreamKey(type, deviceId, channelId, stream);
  250. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  251. if (callbacks == null) {
  252. return;
  253. }
  254. for (ErrorCallback<Object> callback : callbacks) {
  255. callback.run(code, msg, data);
  256. }
  257. inviteErrorCallbackMap.remove(key);
  258. }
  259. private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  260. String key = type + ":" + ":" + deviceId + ":" + channelId;
  261. // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  262. if (stream != null) {
  263. key += (":" + stream);
  264. }
  265. return key;
  266. }
  267. @Override
  268. public InviteInfo getInviteInfoBySSRC(String ssrc) {
  269. String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
  270. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  271. if (scanResult.size() != 1) {
  272. return null;
  273. }
  274. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  275. }
  276. @Override
  277. public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
  278. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  279. if (inviteInfoInDb == null) {
  280. return null;
  281. }
  282. removeInviteInfo(inviteInfoInDb);
  283. String key = VideoManagerConstants.INVITE_PREFIX +
  284. ":" + inviteInfo.getType() +
  285. ":" + inviteInfo.getDeviceId() +
  286. ":" + inviteInfo.getChannelId() +
  287. ":" + inviteInfo.getStream() +
  288. ":" + ssrc;
  289. if (inviteInfoInDb.getSsrcInfo() != null) {
  290. inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
  291. }
  292. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  293. return inviteInfoInDb;
  294. }
  295. }