StreamPushServiceImpl.java 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.alibaba.fastjson.TypeReference;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.UserSetup;
  7. import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
  8. import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  9. import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  10. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  11. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  12. import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
  13. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  14. import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
  15. import com.genersoft.iot.vmp.media.zlm.dto.*;
  16. import com.genersoft.iot.vmp.service.IGbStreamService;
  17. import com.genersoft.iot.vmp.service.IMediaServerService;
  18. import com.genersoft.iot.vmp.service.IStreamPushService;
  19. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  20. import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
  21. import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
  22. import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
  23. import com.genersoft.iot.vmp.storager.dao.StreamPushMapper;
  24. import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
  25. import com.github.pagehelper.PageHelper;
  26. import com.github.pagehelper.PageInfo;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.stereotype.Service;
  29. import org.springframework.util.StringUtils;
  30. import java.util.*;
  31. @Service
  32. public class StreamPushServiceImpl implements IStreamPushService {
  33. @Autowired
  34. private GbStreamMapper gbStreamMapper;
  35. @Autowired
  36. private StreamPushMapper streamPushMapper;
  37. @Autowired
  38. private ParentPlatformMapper parentPlatformMapper;
  39. @Autowired
  40. private PlatformGbStreamMapper platformGbStreamMapper;
  41. @Autowired
  42. private IGbStreamService gbStreamService;
  43. @Autowired
  44. private EventPublisher eventPublisher;
  45. @Autowired
  46. private ZLMRESTfulUtils zlmresTfulUtils;
  47. @Autowired
  48. private IRedisCatchStorage redisCatchStorage;
  49. @Autowired
  50. private UserSetup userSetup;
  51. @Autowired
  52. private IMediaServerService mediaServerService;
  53. @Override
  54. public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
  55. if (jsonData == null) return null;
  56. Map<String, StreamPushItem> result = new HashMap<>();
  57. List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {});
  58. for (MediaItem item : mediaItems) {
  59. // 不保存国标推理以及拉流代理的流
  60. if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  61. || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  62. || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  63. String key = item.getApp() + "_" + item.getStream();
  64. StreamPushItem streamPushItem = result.get(key);
  65. if (streamPushItem == null) {
  66. streamPushItem = transform(item);
  67. result.put(key, streamPushItem);
  68. }
  69. }
  70. }
  71. return new ArrayList<>(result.values());
  72. }
  73. @Override
  74. public StreamPushItem transform(MediaItem item) {
  75. StreamPushItem streamPushItem = new StreamPushItem();
  76. streamPushItem.setApp(item.getApp());
  77. streamPushItem.setMediaServerId(item.getMediaServerId());
  78. streamPushItem.setStream(item.getStream());
  79. streamPushItem.setAliveSecond(item.getAliveSecond());
  80. streamPushItem.setCreateStamp(item.getCreateStamp());
  81. streamPushItem.setOriginSock(item.getOriginSock());
  82. streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
  83. streamPushItem.setOriginType(item.getOriginType());
  84. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  85. streamPushItem.setOriginUrl(item.getOriginUrl());
  86. streamPushItem.setCreateStamp(item.getCreateStamp());
  87. streamPushItem.setAliveSecond(item.getAliveSecond());
  88. streamPushItem.setStatus(true);
  89. streamPushItem.setStreamType("push");
  90. streamPushItem.setVhost(item.getVhost());
  91. return streamPushItem;
  92. }
  93. @Override
  94. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  95. PageHelper.startPage(page, count);
  96. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  97. return new PageInfo<>(all);
  98. }
  99. @Override
  100. public List<StreamPushItem> getPushList(String mediaServerId) {
  101. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  102. }
  103. @Override
  104. public boolean saveToGB(GbStream stream) {
  105. stream.setStreamType("push");
  106. stream.setStatus(true);
  107. stream.setCreateStamp(System.currentTimeMillis());
  108. int add = gbStreamMapper.add(stream);
  109. // 查找开启了全部直播流共享的上级平台
  110. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  111. if (parentPlatforms.size() > 0) {
  112. for (ParentPlatform parentPlatform : parentPlatforms) {
  113. stream.setCatalogId(parentPlatform.getCatalogId());
  114. stream.setPlatformId(parentPlatform.getServerGBId());
  115. String streamId = stream.getStream();
  116. StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  117. if (streamProxyItem == null) {
  118. platformGbStreamMapper.add(stream);
  119. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  120. }else {
  121. if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  122. // 此流使用另一个国标Id已经与该平台关联,移除此记录
  123. platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  124. platformGbStreamMapper.add(stream);
  125. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  126. }
  127. }
  128. }
  129. }
  130. return add > 0;
  131. }
  132. @Override
  133. public boolean removeFromGB(GbStream stream) {
  134. // 判断是否需要发送事件
  135. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  136. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  137. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  138. MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  139. JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
  140. if (mediaList == null) {
  141. streamPushMapper.del(stream.getApp(), stream.getStream());
  142. }
  143. return del > 0;
  144. }
  145. @Override
  146. public StreamPushItem getPush(String app, String streamId) {
  147. return streamPushMapper.selectOne(app, streamId);
  148. }
  149. @Override
  150. public boolean stop(String app, String streamId) {
  151. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  152. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  153. int delStream = streamPushMapper.del(app, streamId);
  154. gbStreamMapper.del(app, streamId);
  155. platformGbStreamMapper.delByAppAndStream(app, streamId);
  156. if (delStream > 0) {
  157. MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  158. zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
  159. }
  160. return true;
  161. }
  162. @Override
  163. public void zlmServerOnline(String mediaServerId) {
  164. // 同步zlm推流信息
  165. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  166. if (mediaServerItem == null) {
  167. return;
  168. }
  169. // 数据库记录
  170. List<StreamPushItem> pushList = getPushList(mediaServerId);
  171. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  172. // redis记录
  173. List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  174. Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
  175. if (pushList.size() > 0) {
  176. for (StreamPushItem streamPushItem : pushList) {
  177. if (StringUtils.isEmpty(streamPushItem.getGbId())) {
  178. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  179. }
  180. }
  181. }
  182. if (mediaItems.size() > 0) {
  183. for (MediaItem mediaItem : mediaItems) {
  184. streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
  185. }
  186. }
  187. zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  188. if (mediaList == null) return;
  189. String dataStr = mediaList.getString("data");
  190. Integer code = mediaList.getInteger("code");
  191. List<StreamPushItem> streamPushItems = null;
  192. if (code == 0 ) {
  193. if (dataStr != null) {
  194. streamPushItems = handleJSON(dataStr, mediaServerItem);
  195. }
  196. }
  197. if (streamPushItems != null) {
  198. for (StreamPushItem streamPushItem : streamPushItems) {
  199. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  200. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  201. }
  202. }
  203. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  204. if (offlinePushItems.size() > 0) {
  205. String type = "PUSH";
  206. int runLimit = 300;
  207. if (offlinePushItems.size() > runLimit) {
  208. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  209. int toIndex = i + runLimit;
  210. if (i + runLimit > offlinePushItems.size()) {
  211. toIndex = offlinePushItems.size();
  212. }
  213. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  214. streamPushMapper.delAll(streamPushItemsSub);
  215. }
  216. }else {
  217. streamPushMapper.delAll(offlinePushItems);
  218. }
  219. }
  220. Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
  221. if (offlineMediaItemList.size() > 0) {
  222. String type = "PUSH";
  223. for (MediaItem offlineMediaItem : offlineMediaItemList) {
  224. JSONObject jsonObject = new JSONObject();
  225. jsonObject.put("serverId", userSetup.getServerId());
  226. jsonObject.put("app", offlineMediaItem.getApp());
  227. jsonObject.put("stream", offlineMediaItem.getStream());
  228. jsonObject.put("register", false);
  229. jsonObject.put("mediaServerId", mediaServerId);
  230. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  231. // 移除redis内流的信息
  232. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
  233. }
  234. }
  235. }));
  236. }
  237. @Override
  238. public void zlmServerOffline(String mediaServerId) {
  239. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  240. // 移除没有GBId的推流
  241. streamPushMapper.deleteWithoutGBId(mediaServerId);
  242. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  243. // 其他的流设置未启用
  244. gbStreamMapper.updateStatusByMediaServerId(mediaServerId, false);
  245. // 发送流停止消息
  246. String type = "PUSH";
  247. // 发送redis消息
  248. List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  249. if (streamInfoList.size() > 0) {
  250. for (MediaItem mediaItem : streamInfoList) {
  251. // 移除redis内流的信息
  252. redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
  253. JSONObject jsonObject = new JSONObject();
  254. jsonObject.put("serverId", userSetup.getServerId());
  255. jsonObject.put("app", mediaItem.getApp());
  256. jsonObject.put("stream", mediaItem.getStream());
  257. jsonObject.put("register", false);
  258. jsonObject.put("mediaServerId", mediaServerId);
  259. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  260. }
  261. }
  262. }
  263. @Override
  264. public void clean() {
  265. }
  266. @Override
  267. public boolean saveToRandomGB() {
  268. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  269. long gbId = 100001;
  270. for (StreamPushItem streamPushItem : streamPushItems) {
  271. streamPushItem.setStreamType("push");
  272. streamPushItem.setStatus(true);
  273. streamPushItem.setGbId("34020000004111" + gbId);
  274. streamPushItem.setCreateStamp(System.currentTimeMillis());
  275. gbId ++;
  276. }
  277. int limitCount = 30;
  278. if (streamPushItems.size() > limitCount) {
  279. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  280. int toIndex = i + limitCount;
  281. if (i + limitCount > streamPushItems.size()) {
  282. toIndex = streamPushItems.size();
  283. }
  284. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  285. }
  286. }else {
  287. gbStreamMapper.batchAdd(streamPushItems);
  288. }
  289. return true;
  290. }
  291. @Override
  292. public void batchAdd(List<StreamPushItem> streamPushItems) {
  293. streamPushMapper.addAll(streamPushItems);
  294. gbStreamMapper.batchAdd(streamPushItems);
  295. // 查找开启了全部直播流共享的上级平台
  296. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  297. if (parentPlatforms.size() > 0) {
  298. for (StreamPushItem stream : streamPushItems) {
  299. for (ParentPlatform parentPlatform : parentPlatforms) {
  300. stream.setCatalogId(parentPlatform.getCatalogId());
  301. stream.setPlatformId(parentPlatform.getServerGBId());
  302. String streamId = stream.getStream();
  303. StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  304. if (streamProxyItem == null) {
  305. platformGbStreamMapper.add(stream);
  306. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  307. }else {
  308. if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  309. // 此流使用另一个国标Id已经与该平台关联,移除此记录
  310. platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  311. platformGbStreamMapper.add(stream);
  312. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  313. stream.setGbId(streamProxyItem.getGbId());
  314. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL);
  315. }
  316. }
  317. }
  318. }
  319. }
  320. }
  321. @Override
  322. public boolean batchStop(List<GbStream> gbStreams) {
  323. if (gbStreams == null || gbStreams.size() == 0) {
  324. return false;
  325. }
  326. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  327. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  328. gbStreamMapper.batchDelForGbStream(gbStreams);
  329. platformGbStreamMapper.delByGbStreams(gbStreams);
  330. if (delStream > 0) {
  331. for (GbStream gbStream : gbStreams) {
  332. MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  333. zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  334. }
  335. }
  336. return true;
  337. }
  338. }