StreamPushServiceImpl.java 23 KB


  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.alibaba.fastjson.TypeReference;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.gb28181.bean.*;
  8. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  9. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  10. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  11. import com.genersoft.iot.vmp.media.zlm.dto.*;
  12. import com.genersoft.iot.vmp.service.IGbStreamService;
  13. import com.genersoft.iot.vmp.service.IMediaServerService;
  14. import com.genersoft.iot.vmp.service.IStreamPushService;
  15. import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
  16. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  17. import com.genersoft.iot.vmp.storager.dao.*;
  18. import com.genersoft.iot.vmp.utils.DateUtil;
  19. import com.github.pagehelper.PageHelper;
  20. import com.github.pagehelper.PageInfo;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.stereotype.Service;
  25. import org.springframework.util.StringUtils;
  26. import java.util.*;
  27. import java.util.stream.Collectors;
  28. @Service
  29. public class StreamPushServiceImpl implements IStreamPushService {
  30. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  31. @Autowired
  32. private GbStreamMapper gbStreamMapper;
  33. @Autowired
  34. private StreamPushMapper streamPushMapper;
  35. @Autowired
  36. private StreamProxyMapper streamProxyMapper;
  37. @Autowired
  38. private ParentPlatformMapper parentPlatformMapper;
  39. @Autowired
  40. private PlatformCatalogMapper platformCatalogMapper;
  41. @Autowired
  42. private PlatformGbStreamMapper platformGbStreamMapper;
  43. @Autowired
  44. private IGbStreamService gbStreamService;
  45. @Autowired
  46. private EventPublisher eventPublisher;
  47. @Autowired
  48. private ZLMRESTfulUtils zlmresTfulUtils;
  49. @Autowired
  50. private IRedisCatchStorage redisCatchStorage;
  51. @Autowired
  52. private UserSetting userSetting;
  53. @Autowired
  54. private IMediaServerService mediaServerService;
  55. @Override
  56. public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
  57. if (jsonData == null) {
  58. return null;
  59. }
  60. Map<String, StreamPushItem> result = new HashMap<>();
  61. List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {});
  62. for (MediaItem item : mediaItems) {
  63. // 不保存国标推理以及拉流代理的流
  64. if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  65. || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  66. || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  67. String key = item.getApp() + "_" + item.getStream();
  68. StreamPushItem streamPushItem = result.get(key);
  69. if (streamPushItem == null) {
  70. streamPushItem = transform(item);
  71. result.put(key, streamPushItem);
  72. }
  73. }
  74. }
  75. return new ArrayList<>(result.values());
  76. }
  77. @Override
  78. public StreamPushItem transform(MediaItem item) {
  79. StreamPushItem streamPushItem = new StreamPushItem();
  80. streamPushItem.setApp(item.getApp());
  81. streamPushItem.setMediaServerId(item.getMediaServerId());
  82. streamPushItem.setStream(item.getStream());
  83. streamPushItem.setAliveSecond(item.getAliveSecond());
  84. streamPushItem.setOriginSock(item.getOriginSock());
  85. streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
  86. streamPushItem.setOriginType(item.getOriginType());
  87. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  88. streamPushItem.setOriginUrl(item.getOriginUrl());
  89. streamPushItem.setCreateTime(DateUtil.getNow());
  90. streamPushItem.setAliveSecond(item.getAliveSecond());
  91. streamPushItem.setStatus(true);
  92. streamPushItem.setStreamType("push");
  93. streamPushItem.setVhost(item.getVhost());
  94. streamPushItem.setServerId(item.getSeverId());
  95. return streamPushItem;
  96. }
  97. @Override
  98. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  99. PageHelper.startPage(page, count);
  100. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  101. return new PageInfo<>(all);
  102. }
  103. @Override
  104. public List<StreamPushItem> getPushList(String mediaServerId) {
  105. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  106. }
  107. @Override
  108. public boolean saveToGB(GbStream stream) {
  109. stream.setStreamType("push");
  110. stream.setStatus(true);
  111. stream.setCreateTime(DateUtil.getNow());
  112. int add = gbStreamMapper.add(stream);
  113. // 查找开启了全部直播流共享的上级平台
  114. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  115. if (parentPlatforms.size() > 0) {
  116. for (ParentPlatform parentPlatform : parentPlatforms) {
  117. stream.setCatalogId(parentPlatform.getCatalogId());
  118. stream.setPlatformId(parentPlatform.getServerGBId());
  119. String streamId = stream.getStream();
  120. StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  121. if (streamProxyItem == null) {
  122. platformGbStreamMapper.add(stream);
  123. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  124. }else {
  125. if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  126. // 此流使用另一个国标Id已经与该平台关联,移除此记录
  127. platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  128. platformGbStreamMapper.add(stream);
  129. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  130. }
  131. }
  132. }
  133. }
  134. return add > 0;
  135. }
  136. @Override
  137. public boolean removeFromGB(GbStream stream) {
  138. // 判断是否需要发送事件
  139. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  140. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  141. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  142. MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  143. JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
  144. if (mediaList != null) {
  145. if (mediaList.getInteger("code") == 0) {
  146. JSONArray data = mediaList.getJSONArray("data");
  147. if (data == null) {
  148. streamPushMapper.del(stream.getApp(), stream.getStream());
  149. }
  150. }
  151. }
  152. return del > 0;
  153. }
  154. @Override
  155. public StreamPushItem getPush(String app, String streamId) {
  156. return streamPushMapper.selectOne(app, streamId);
  157. }
  158. @Override
  159. public boolean stop(String app, String streamId) {
  160. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  161. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  162. platformGbStreamMapper.delByAppAndStream(app, streamId);
  163. gbStreamMapper.del(app, streamId);
  164. int delStream = streamPushMapper.del(app, streamId);
  165. if (delStream > 0) {
  166. MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  167. zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
  168. }
  169. return true;
  170. }
  171. @Override
  172. public void zlmServerOnline(String mediaServerId) {
  173. // 同步zlm推流信息
  174. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  175. if (mediaServerItem == null) {
  176. return;
  177. }
  178. // 数据库记录
  179. List<StreamPushItem> pushList = getPushList(mediaServerId);
  180. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  181. // redis记录
  182. List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  183. Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
  184. if (pushList.size() > 0) {
  185. for (StreamPushItem streamPushItem : pushList) {
  186. if (StringUtils.isEmpty(streamPushItem.getGbId())) {
  187. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  188. }
  189. }
  190. }
  191. if (mediaItems.size() > 0) {
  192. for (MediaItem mediaItem : mediaItems) {
  193. streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
  194. }
  195. }
  196. zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  197. if (mediaList == null) {
  198. return;
  199. }
  200. String dataStr = mediaList.getString("data");
  201. Integer code = mediaList.getInteger("code");
  202. List<StreamPushItem> streamPushItems = null;
  203. if (code == 0 ) {
  204. if (dataStr != null) {
  205. streamPushItems = handleJSON(dataStr, mediaServerItem);
  206. }
  207. }
  208. if (streamPushItems != null) {
  209. for (StreamPushItem streamPushItem : streamPushItems) {
  210. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  211. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  212. }
  213. }
  214. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  215. if (offlinePushItems.size() > 0) {
  216. String type = "PUSH";
  217. int runLimit = 300;
  218. if (offlinePushItems.size() > runLimit) {
  219. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  220. int toIndex = i + runLimit;
  221. if (i + runLimit > offlinePushItems.size()) {
  222. toIndex = offlinePushItems.size();
  223. }
  224. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  225. streamPushMapper.delAll(streamPushItemsSub);
  226. }
  227. }else {
  228. streamPushMapper.delAll(offlinePushItems);
  229. }
  230. }
  231. Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
  232. if (offlineMediaItemList.size() > 0) {
  233. String type = "PUSH";
  234. for (MediaItem offlineMediaItem : offlineMediaItemList) {
  235. JSONObject jsonObject = new JSONObject();
  236. jsonObject.put("serverId", userSetting.getServerId());
  237. jsonObject.put("app", offlineMediaItem.getApp());
  238. jsonObject.put("stream", offlineMediaItem.getStream());
  239. jsonObject.put("register", false);
  240. jsonObject.put("mediaServerId", mediaServerId);
  241. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  242. // 移除redis内流的信息
  243. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
  244. }
  245. }
  246. }));
  247. }
  248. @Override
  249. public void zlmServerOffline(String mediaServerId) {
  250. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  251. // 移除没有GBId的推流
  252. streamPushMapper.deleteWithoutGBId(mediaServerId);
  253. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  254. // 其他的流设置未启用
  255. streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
  256. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  257. // 发送流停止消息
  258. String type = "PUSH";
  259. // 发送redis消息
  260. List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  261. if (streamInfoList.size() > 0) {
  262. for (MediaItem mediaItem : streamInfoList) {
  263. // 移除redis内流的信息
  264. redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
  265. JSONObject jsonObject = new JSONObject();
  266. jsonObject.put("serverId", userSetting.getServerId());
  267. jsonObject.put("app", mediaItem.getApp());
  268. jsonObject.put("stream", mediaItem.getStream());
  269. jsonObject.put("register", false);
  270. jsonObject.put("mediaServerId", mediaServerId);
  271. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  272. }
  273. }
  274. }
  275. @Override
  276. public void clean() {
  277. }
  278. @Override
  279. public boolean saveToRandomGB() {
  280. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  281. long gbId = 100001;
  282. for (StreamPushItem streamPushItem : streamPushItems) {
  283. streamPushItem.setStreamType("push");
  284. streamPushItem.setStatus(true);
  285. streamPushItem.setGbId("34020000004111" + gbId);
  286. streamPushItem.setCreateTime(DateUtil.getNow());
  287. gbId ++;
  288. }
  289. int limitCount = 30;
  290. if (streamPushItems.size() > limitCount) {
  291. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  292. int toIndex = i + limitCount;
  293. if (i + limitCount > streamPushItems.size()) {
  294. toIndex = streamPushItems.size();
  295. }
  296. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  297. }
  298. }else {
  299. gbStreamMapper.batchAdd(streamPushItems);
  300. }
  301. return true;
  302. }
  303. @Override
  304. public void batchAdd(List<StreamPushItem> streamPushItems) {
  305. streamPushMapper.addAll(streamPushItems);
  306. gbStreamMapper.batchAdd(streamPushItems);
  307. // 查找开启了全部直播流共享的上级平台
  308. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  309. if (parentPlatforms.size() > 0) {
  310. for (StreamPushItem stream : streamPushItems) {
  311. for (ParentPlatform parentPlatform : parentPlatforms) {
  312. stream.setCatalogId(parentPlatform.getCatalogId());
  313. stream.setPlatformId(parentPlatform.getServerGBId());
  314. String streamId = stream.getStream();
  315. StreamProxyItem streamProxyItem = platformGbStreamMapper.selectOne(stream.getApp(), streamId, parentPlatform.getServerGBId());
  316. if (streamProxyItem == null) {
  317. platformGbStreamMapper.add(stream);
  318. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  319. }else {
  320. if (!streamProxyItem.getGbId().equals(stream.getGbId())) {
  321. // 此流使用另一个国标Id已经与该平台关联,移除此记录
  322. platformGbStreamMapper.delByAppAndStreamAndPlatform(stream.getApp(), streamId, parentPlatform.getServerGBId());
  323. platformGbStreamMapper.add(stream);
  324. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.ADD);
  325. stream.setGbId(streamProxyItem.getGbId());
  326. eventPublisher.catalogEventPublishForStream(parentPlatform.getServerGBId(), stream, CatalogEvent.DEL);
  327. }
  328. }
  329. }
  330. }
  331. }
  332. }
  333. @Override
  334. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  335. // 存储数据到stream_push表
  336. streamPushMapper.addAll(streamPushItems);
  337. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  338. .filter(streamPushItem-> streamPushItem.getId() != null)
  339. .collect(Collectors.toList());
  340. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  341. if (streamPushItemForGbStream.size() > 0) {
  342. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  343. }
  344. // 去除没有ID也就是没有存储到数据库的数据
  345. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  346. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  347. .collect(Collectors.toList());
  348. if (streamPushItemsForPlatform.size() > 0) {
  349. // 获取所有平台,平台和目录信息一般不会特别大量。
  350. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  351. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  352. if (parentPlatformList.size() == 0) {
  353. return;
  354. }
  355. for (ParentPlatform platform : parentPlatformList) {
  356. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  357. // 创建根节点
  358. PlatformCatalog platformCatalog = new PlatformCatalog();
  359. platformCatalog.setId(platform.getServerGBId());
  360. catalogMap.put(platform.getServerGBId(), platformCatalog);
  361. // 查询所有节点信息
  362. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  363. if (platformCatalogs.size() > 0) {
  364. for (PlatformCatalog catalog : platformCatalogs) {
  365. catalogMap.put(catalog.getId(), catalog);
  366. }
  367. }
  368. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  369. }
  370. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  371. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  372. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  373. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  374. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  375. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  376. for (String[] platFormInfoArray : platFormInfoList) {
  377. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  378. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  379. if (platFormInfoArray.length > 0) {
  380. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  381. // 不存在这个平台,则忽略导入此关联关系
  382. if (platformInfoMap.get(platFormInfoArray[0]) == null
  383. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  384. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  385. continue;
  386. }
  387. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  388. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  389. if (gbStreamList == null) {
  390. gbStreamList = new ArrayList<>();
  391. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  392. }
  393. // 为发送通知整理数据
  394. streamPushItemForPlatform.setName(streamPushItem.getName());
  395. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  396. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  397. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  398. gbStreamList.add(streamPushItemForPlatform);
  399. }
  400. if (platFormInfoArray.length > 1) {
  401. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  402. }
  403. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  404. }
  405. }
  406. }
  407. if (streamPushItemListFroPlatform.size() > 0) {
  408. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  409. // 发送通知
  410. for (String platformId : platformForEvent.keySet()) {
  411. eventPublisher.catalogEventPublishForStream(
  412. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  413. }
  414. }
  415. }
  416. }
  417. @Override
  418. public boolean batchStop(List<GbStream> gbStreams) {
  419. if (gbStreams == null || gbStreams.size() == 0) {
  420. return false;
  421. }
  422. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  423. platformGbStreamMapper.delByGbStreams(gbStreams);
  424. gbStreamMapper.batchDelForGbStream(gbStreams);
  425. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  426. if (delStream > 0) {
  427. for (GbStream gbStream : gbStreams) {
  428. MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  429. zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  430. }
  431. }
  432. return true;
  433. }
  434. @Override
  435. public void allStreamOffline() {
  436. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  437. if (onlinePushers.size() == 0) {
  438. return;
  439. }
  440. streamPushMapper.setAllStreamOffline();
  441. // 发送通知
  442. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  443. }
  444. @Override
  445. public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  446. // 更新部分设备离线
  447. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  448. streamPushMapper.offline(offlineStreams);
  449. // 发送通知
  450. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  451. }
  452. @Override
  453. public void online(List<StreamPushItemFromRedis> onlineStreams) {
  454. // 更新部分设备上线streamPushService
  455. List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  456. streamPushMapper.online(onlineStreams);
  457. // 发送通知
  458. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  459. }
  460. }