InviteStreamServiceImpl.java 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.genersoft.iot.vmp.common.InviteInfo;
  4. import com.genersoft.iot.vmp.common.InviteSessionStatus;
  5. import com.genersoft.iot.vmp.common.InviteSessionType;
  6. import com.genersoft.iot.vmp.common.VideoManagerConstants;
  7. import com.genersoft.iot.vmp.service.IInviteStreamService;
  8. import com.genersoft.iot.vmp.service.bean.ErrorCallback;
  9. import com.genersoft.iot.vmp.utils.redis.RedisUtil;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. import org.springframework.beans.factory.annotation.Autowired;
  13. import org.springframework.data.redis.core.RedisTemplate;
  14. import org.springframework.stereotype.Service;
  15. import java.util.List;
  16. import java.util.Map;
  17. import java.util.concurrent.ConcurrentHashMap;
  18. import java.util.concurrent.CopyOnWriteArrayList;
  19. @Service
  20. public class InviteStreamServiceImpl implements IInviteStreamService {
  21. private final Logger logger = LoggerFactory.getLogger(InviteStreamServiceImpl.class);
  22. private final Map<String, List<ErrorCallback<Object>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
  23. @Autowired
  24. private RedisTemplate<Object, Object> redisTemplate;
  25. @Override
  26. public void updateInviteInfo(InviteInfo inviteInfo) {
  27. if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {
  28. logger.warn("[更新Invite信息],参数不全: {}", JSON.toJSON(inviteInfo));
  29. return;
  30. }
  31. InviteInfo inviteInfoForUpdate = null;
  32. if (InviteSessionStatus.ready == inviteInfo.getStatus()) {
  33. if (inviteInfo.getDeviceId() == null
  34. || inviteInfo.getChannelId() == null
  35. || inviteInfo.getType() == null
  36. || inviteInfo.getStream() == null
  37. ) {
  38. return;
  39. }
  40. inviteInfoForUpdate = inviteInfo;
  41. } else {
  42. InviteInfo inviteInfoInRedis = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(),
  43. inviteInfo.getChannelId(), inviteInfo.getStream());
  44. if (inviteInfoInRedis == null) {
  45. logger.warn("[更新Invite信息],未从缓存中读取到Invite信息: deviceId: {}, channel: {}, stream: {}",
  46. inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  47. return;
  48. }
  49. if (inviteInfo.getStreamInfo() != null) {
  50. inviteInfoInRedis.setStreamInfo(inviteInfo.getStreamInfo());
  51. }
  52. if (inviteInfo.getSsrcInfo() != null) {
  53. inviteInfoInRedis.setSsrcInfo(inviteInfo.getSsrcInfo());
  54. }
  55. if (inviteInfo.getStreamMode() != null) {
  56. inviteInfoInRedis.setStreamMode(inviteInfo.getStreamMode());
  57. }
  58. if (inviteInfo.getReceiveIp() != null) {
  59. inviteInfoInRedis.setReceiveIp(inviteInfo.getReceiveIp());
  60. }
  61. if (inviteInfo.getReceivePort() != null) {
  62. inviteInfoInRedis.setReceivePort(inviteInfo.getReceivePort());
  63. }
  64. if (inviteInfo.getStatus() != null) {
  65. inviteInfoInRedis.setStatus(inviteInfo.getStatus());
  66. }
  67. inviteInfoForUpdate = inviteInfoInRedis;
  68. }
  69. String key = VideoManagerConstants.INVITE_PREFIX +
  70. ":" + inviteInfoForUpdate.getType() +
  71. ":" + inviteInfoForUpdate.getDeviceId() +
  72. ":" + inviteInfoForUpdate.getChannelId() +
  73. ":" + inviteInfoForUpdate.getStream()+
  74. ":" + inviteInfoForUpdate.getSsrcInfo().getSsrc();
  75. redisTemplate.opsForValue().set(key, inviteInfoForUpdate);
  76. }
  77. @Override
  78. public InviteInfo updateInviteInfoForStream(InviteInfo inviteInfo, String stream) {
  79. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  80. if (inviteInfoInDb == null) {
  81. return null;
  82. }
  83. removeInviteInfo(inviteInfoInDb);
  84. String key = VideoManagerConstants.INVITE_PREFIX +
  85. ":" + inviteInfo.getType() +
  86. ":" + inviteInfo.getDeviceId() +
  87. ":" + inviteInfo.getChannelId() +
  88. ":" + stream +
  89. ":" + inviteInfo.getSsrcInfo().getSsrc();
  90. inviteInfoInDb.setStream(stream);
  91. if (inviteInfoInDb.getSsrcInfo() != null) {
  92. inviteInfoInDb.getSsrcInfo().setStream(stream);
  93. }
  94. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  95. return inviteInfoInDb;
  96. }
  97. @Override
  98. public InviteInfo getInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  99. String key = VideoManagerConstants.INVITE_PREFIX +
  100. ":" + (type != null ? type : "*") +
  101. ":" + (deviceId != null ? deviceId : "*") +
  102. ":" + (channelId != null ? channelId : "*") +
  103. ":" + (stream != null ? stream : "*")
  104. + ":*";
  105. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  106. if (scanResult.size() != 1) {
  107. return null;
  108. }
  109. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  110. }
  111. @Override
  112. public InviteInfo getInviteInfoByDeviceAndChannel(InviteSessionType type, String deviceId, String channelId) {
  113. return getInviteInfo(type, deviceId, channelId, null);
  114. }
  115. @Override
  116. public InviteInfo getInviteInfoByStream(InviteSessionType type, String stream) {
  117. return getInviteInfo(type, null, null, stream);
  118. }
  119. @Override
  120. public void removeInviteInfo(InviteSessionType type, String deviceId, String channelId, String stream) {
  121. String scanKey = VideoManagerConstants.INVITE_PREFIX +
  122. ":" + (type != null ? type : "*") +
  123. ":" + (deviceId != null ? deviceId : "*") +
  124. ":" + (channelId != null ? channelId : "*") +
  125. ":" + (stream != null ? stream : "*") +
  126. ":*";
  127. List<Object> scanResult = RedisUtil.scan(redisTemplate, scanKey);
  128. if (scanResult.size() > 0) {
  129. for (Object keyObj : scanResult) {
  130. String key = (String) keyObj;
  131. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(key);
  132. if (inviteInfo == null) {
  133. continue;
  134. }
  135. redisTemplate.delete(key);
  136. inviteErrorCallbackMap.remove(buildKey(type, deviceId, channelId, inviteInfo.getStream()));
  137. }
  138. }
  139. }
  140. @Override
  141. public void removeInviteInfoByDeviceAndChannel(InviteSessionType inviteSessionType, String deviceId, String channelId) {
  142. removeInviteInfo(inviteSessionType, deviceId, channelId, null);
  143. }
  144. @Override
  145. public void removeInviteInfo(InviteInfo inviteInfo) {
  146. removeInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  147. }
  148. @Override
  149. public void once(InviteSessionType type, String deviceId, String channelId, String stream, ErrorCallback<Object> callback) {
  150. String key = buildKey(type, deviceId, channelId, stream);
  151. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  152. if (callbacks == null) {
  153. callbacks = new CopyOnWriteArrayList<>();
  154. inviteErrorCallbackMap.put(key, callbacks);
  155. }
  156. callbacks.add(callback);
  157. }
  158. private String buildKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  159. String key = type + ":" + deviceId + ":" + channelId;
  160. // 如果ssrc未null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  161. if (stream != null) {
  162. key += (":" + stream);
  163. }
  164. return key;
  165. }
  166. @Override
  167. public void clearInviteInfo(String deviceId) {
  168. removeInviteInfo(null, deviceId, null, null);
  169. }
  170. @Override
  171. public int getStreamInfoCount(String mediaServerId) {
  172. int count = 0;
  173. String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:*";
  174. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  175. if (scanResult.size() == 0) {
  176. return 0;
  177. }else {
  178. for (Object keyObj : scanResult) {
  179. String keyStr = (String) keyObj;
  180. InviteInfo inviteInfo = (InviteInfo) redisTemplate.opsForValue().get(keyStr);
  181. if (inviteInfo != null && inviteInfo.getStreamInfo() != null && inviteInfo.getStreamInfo().getMediaServerId().equals(mediaServerId)) {
  182. count++;
  183. }
  184. }
  185. }
  186. return count;
  187. }
  188. @Override
  189. public void call(InviteSessionType type, String deviceId, String channelId, String stream, int code, String msg, Object data) {
  190. String key = buildSubStreamKey(type, deviceId, channelId, stream);
  191. List<ErrorCallback<Object>> callbacks = inviteErrorCallbackMap.get(key);
  192. if (callbacks == null) {
  193. return;
  194. }
  195. for (ErrorCallback<Object> callback : callbacks) {
  196. callback.run(code, msg, data);
  197. }
  198. inviteErrorCallbackMap.remove(key);
  199. }
  200. private String buildSubStreamKey(InviteSessionType type, String deviceId, String channelId, String stream) {
  201. String key = type + ":" + ":" + deviceId + ":" + channelId;
  202. // 如果ssrc为null那么可以实现一个通道只能一次操作,ssrc不为null则可以支持一个通道多次invite
  203. if (stream != null) {
  204. key += (":" + stream);
  205. }
  206. return key;
  207. }
  208. @Override
  209. public InviteInfo getInviteInfoBySSRC(String ssrc) {
  210. String key = VideoManagerConstants.INVITE_PREFIX + ":*:*:*:*:" + ssrc;
  211. List<Object> scanResult = RedisUtil.scan(redisTemplate, key);
  212. if (scanResult.size() != 1) {
  213. return null;
  214. }
  215. return (InviteInfo) redisTemplate.opsForValue().get(scanResult.get(0));
  216. }
  217. @Override
  218. public InviteInfo updateInviteInfoForSSRC(InviteInfo inviteInfo, String ssrc) {
  219. InviteInfo inviteInfoInDb = getInviteInfo(inviteInfo.getType(), inviteInfo.getDeviceId(), inviteInfo.getChannelId(), inviteInfo.getStream());
  220. if (inviteInfoInDb == null) {
  221. return null;
  222. }
  223. removeInviteInfo(inviteInfoInDb);
  224. String key = VideoManagerConstants.INVITE_PREFIX +
  225. ":" + inviteInfo.getType() +
  226. ":" + inviteInfo.getDeviceId() +
  227. ":" + inviteInfo.getChannelId() +
  228. ":" + inviteInfo.getStream() +
  229. ":" + inviteInfo.getSsrcInfo().getSsrc();
  230. if (inviteInfoInDb.getSsrcInfo() != null) {
  231. inviteInfoInDb.getSsrcInfo().setSsrc(ssrc);
  232. }
  233. redisTemplate.opsForValue().set(key, inviteInfoInDb);
  234. return inviteInfoInDb;
  235. }
  236. }