StreamPushServiceImpl.java 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.alibaba.fastjson.TypeReference;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.gb28181.bean.*;
  8. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  9. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  10. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  11. import com.genersoft.iot.vmp.media.zlm.dto.*;
  12. import com.genersoft.iot.vmp.service.IGbStreamService;
  13. import com.genersoft.iot.vmp.service.IMediaServerService;
  14. import com.genersoft.iot.vmp.service.IStreamPushService;
  15. import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
  16. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  17. import com.genersoft.iot.vmp.storager.dao.*;
  18. import com.genersoft.iot.vmp.utils.DateUtil;
  19. import com.github.pagehelper.PageHelper;
  20. import com.github.pagehelper.PageInfo;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. import org.springframework.beans.factory.annotation.Autowired;
  24. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  25. import org.springframework.stereotype.Service;
  26. import org.springframework.transaction.TransactionDefinition;
  27. import org.springframework.transaction.TransactionStatus;
  28. import org.springframework.util.StringUtils;
  29. import java.util.*;
  30. import java.util.stream.Collectors;
  31. @Service
  32. public class StreamPushServiceImpl implements IStreamPushService {
  33. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  34. @Autowired
  35. private GbStreamMapper gbStreamMapper;
  36. @Autowired
  37. private StreamPushMapper streamPushMapper;
  38. @Autowired
  39. private StreamProxyMapper streamProxyMapper;
  40. @Autowired
  41. private ParentPlatformMapper parentPlatformMapper;
  42. @Autowired
  43. private PlatformCatalogMapper platformCatalogMapper;
  44. @Autowired
  45. private PlatformGbStreamMapper platformGbStreamMapper;
  46. @Autowired
  47. private IGbStreamService gbStreamService;
  48. @Autowired
  49. private EventPublisher eventPublisher;
  50. @Autowired
  51. private ZLMRESTfulUtils zlmresTfulUtils;
  52. @Autowired
  53. private IRedisCatchStorage redisCatchStorage;
  54. @Autowired
  55. private UserSetting userSetting;
  56. @Autowired
  57. private IMediaServerService mediaServerService;
  58. @Autowired
  59. DataSourceTransactionManager dataSourceTransactionManager;
  60. @Autowired
  61. TransactionDefinition transactionDefinition;
  62. @Override
  63. public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
  64. if (jsonData == null) {
  65. return null;
  66. }
  67. Map<String, StreamPushItem> result = new HashMap<>();
  68. List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {});
  69. for (MediaItem item : mediaItems) {
  70. // 不保存国标推理以及拉流代理的流
  71. if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  72. || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  73. || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  74. String key = item.getApp() + "_" + item.getStream();
  75. StreamPushItem streamPushItem = result.get(key);
  76. if (streamPushItem == null) {
  77. streamPushItem = transform(item);
  78. result.put(key, streamPushItem);
  79. }
  80. }
  81. }
  82. return new ArrayList<>(result.values());
  83. }
  84. @Override
  85. public StreamPushItem transform(MediaItem item) {
  86. StreamPushItem streamPushItem = new StreamPushItem();
  87. streamPushItem.setApp(item.getApp());
  88. streamPushItem.setMediaServerId(item.getMediaServerId());
  89. streamPushItem.setStream(item.getStream());
  90. streamPushItem.setAliveSecond(item.getAliveSecond());
  91. streamPushItem.setOriginSock(item.getOriginSock());
  92. streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
  93. streamPushItem.setOriginType(item.getOriginType());
  94. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  95. streamPushItem.setOriginUrl(item.getOriginUrl());
  96. streamPushItem.setCreateTime(DateUtil.getNow());
  97. streamPushItem.setAliveSecond(item.getAliveSecond());
  98. streamPushItem.setStatus(true);
  99. streamPushItem.setStreamType("push");
  100. streamPushItem.setVhost(item.getVhost());
  101. streamPushItem.setServerId(item.getSeverId());
  102. return streamPushItem;
  103. }
  104. @Override
  105. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  106. PageHelper.startPage(page, count);
  107. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  108. return new PageInfo<>(all);
  109. }
  110. @Override
  111. public List<StreamPushItem> getPushList(String mediaServerId) {
  112. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  113. }
  114. @Override
  115. public boolean saveToGB(GbStream stream) {
  116. stream.setStreamType("push");
  117. stream.setStatus(true);
  118. stream.setCreateTime(DateUtil.getNow());
  119. int add = gbStreamMapper.add(stream);
  120. return add > 0;
  121. }
  122. @Override
  123. public boolean removeFromGB(GbStream stream) {
  124. // 判断是否需要发送事件
  125. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  126. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  127. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  128. MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  129. JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
  130. if (mediaList != null) {
  131. if (mediaList.getInteger("code") == 0) {
  132. JSONArray data = mediaList.getJSONArray("data");
  133. if (data == null) {
  134. streamPushMapper.del(stream.getApp(), stream.getStream());
  135. }
  136. }
  137. }
  138. return del > 0;
  139. }
  140. @Override
  141. public StreamPushItem getPush(String app, String streamId) {
  142. return streamPushMapper.selectOne(app, streamId);
  143. }
  144. @Override
  145. public boolean stop(String app, String streamId) {
  146. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  147. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  148. platformGbStreamMapper.delByAppAndStream(app, streamId);
  149. gbStreamMapper.del(app, streamId);
  150. int delStream = streamPushMapper.del(app, streamId);
  151. if (delStream > 0) {
  152. MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  153. zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
  154. }
  155. return true;
  156. }
  157. @Override
  158. public void zlmServerOnline(String mediaServerId) {
  159. // 同步zlm推流信息
  160. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  161. if (mediaServerItem == null) {
  162. return;
  163. }
  164. // 数据库记录
  165. List<StreamPushItem> pushList = getPushList(mediaServerId);
  166. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  167. // redis记录
  168. List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  169. Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
  170. if (pushList.size() > 0) {
  171. for (StreamPushItem streamPushItem : pushList) {
  172. if (StringUtils.isEmpty(streamPushItem.getGbId())) {
  173. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  174. }
  175. }
  176. }
  177. if (mediaItems.size() > 0) {
  178. for (MediaItem mediaItem : mediaItems) {
  179. streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
  180. }
  181. }
  182. zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  183. if (mediaList == null) {
  184. return;
  185. }
  186. String dataStr = mediaList.getString("data");
  187. Integer code = mediaList.getInteger("code");
  188. List<StreamPushItem> streamPushItems = null;
  189. if (code == 0 ) {
  190. if (dataStr != null) {
  191. streamPushItems = handleJSON(dataStr, mediaServerItem);
  192. }
  193. }
  194. if (streamPushItems != null) {
  195. for (StreamPushItem streamPushItem : streamPushItems) {
  196. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  197. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  198. }
  199. }
  200. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  201. if (offlinePushItems.size() > 0) {
  202. String type = "PUSH";
  203. int runLimit = 300;
  204. if (offlinePushItems.size() > runLimit) {
  205. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  206. int toIndex = i + runLimit;
  207. if (i + runLimit > offlinePushItems.size()) {
  208. toIndex = offlinePushItems.size();
  209. }
  210. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  211. streamPushMapper.delAll(streamPushItemsSub);
  212. }
  213. }else {
  214. streamPushMapper.delAll(offlinePushItems);
  215. }
  216. }
  217. Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
  218. if (offlineMediaItemList.size() > 0) {
  219. String type = "PUSH";
  220. for (MediaItem offlineMediaItem : offlineMediaItemList) {
  221. JSONObject jsonObject = new JSONObject();
  222. jsonObject.put("serverId", userSetting.getServerId());
  223. jsonObject.put("app", offlineMediaItem.getApp());
  224. jsonObject.put("stream", offlineMediaItem.getStream());
  225. jsonObject.put("register", false);
  226. jsonObject.put("mediaServerId", mediaServerId);
  227. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  228. // 移除redis内流的信息
  229. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
  230. }
  231. }
  232. }));
  233. }
  234. @Override
  235. public void zlmServerOffline(String mediaServerId) {
  236. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  237. // 移除没有GBId的推流
  238. streamPushMapper.deleteWithoutGBId(mediaServerId);
  239. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  240. // 其他的流设置未启用
  241. streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
  242. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  243. // 发送流停止消息
  244. String type = "PUSH";
  245. // 发送redis消息
  246. List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  247. if (streamInfoList.size() > 0) {
  248. for (MediaItem mediaItem : streamInfoList) {
  249. // 移除redis内流的信息
  250. redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
  251. JSONObject jsonObject = new JSONObject();
  252. jsonObject.put("serverId", userSetting.getServerId());
  253. jsonObject.put("app", mediaItem.getApp());
  254. jsonObject.put("stream", mediaItem.getStream());
  255. jsonObject.put("register", false);
  256. jsonObject.put("mediaServerId", mediaServerId);
  257. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  258. }
  259. }
  260. }
  261. @Override
  262. public void clean() {
  263. }
  264. @Override
  265. public boolean saveToRandomGB() {
  266. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  267. long gbId = 100001;
  268. for (StreamPushItem streamPushItem : streamPushItems) {
  269. streamPushItem.setStreamType("push");
  270. streamPushItem.setStatus(true);
  271. streamPushItem.setGbId("34020000004111" + gbId);
  272. streamPushItem.setCreateTime(DateUtil.getNow());
  273. gbId ++;
  274. }
  275. int limitCount = 30;
  276. if (streamPushItems.size() > limitCount) {
  277. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  278. int toIndex = i + limitCount;
  279. if (i + limitCount > streamPushItems.size()) {
  280. toIndex = streamPushItems.size();
  281. }
  282. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  283. }
  284. }else {
  285. gbStreamMapper.batchAdd(streamPushItems);
  286. }
  287. return true;
  288. }
  289. @Override
  290. public void batchAdd(List<StreamPushItem> streamPushItems) {
  291. streamPushMapper.addAll(streamPushItems);
  292. gbStreamMapper.batchAdd(streamPushItems);
  293. }
  294. @Override
  295. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  296. // 存储数据到stream_push表
  297. streamPushMapper.addAll(streamPushItems);
  298. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  299. .filter(streamPushItem-> streamPushItem.getId() != null)
  300. .collect(Collectors.toList());
  301. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  302. if (streamPushItemForGbStream.size() > 0) {
  303. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  304. }
  305. // 去除没有ID也就是没有存储到数据库的数据
  306. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  307. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  308. .collect(Collectors.toList());
  309. if (streamPushItemsForPlatform.size() > 0) {
  310. // 获取所有平台,平台和目录信息一般不会特别大量。
  311. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  312. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  313. if (parentPlatformList.size() == 0) {
  314. return;
  315. }
  316. for (ParentPlatform platform : parentPlatformList) {
  317. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  318. // 创建根节点
  319. PlatformCatalog platformCatalog = new PlatformCatalog();
  320. platformCatalog.setId(platform.getServerGBId());
  321. catalogMap.put(platform.getServerGBId(), platformCatalog);
  322. // 查询所有节点信息
  323. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  324. if (platformCatalogs.size() > 0) {
  325. for (PlatformCatalog catalog : platformCatalogs) {
  326. catalogMap.put(catalog.getId(), catalog);
  327. }
  328. }
  329. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  330. }
  331. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  332. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  333. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  334. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  335. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  336. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  337. for (String[] platFormInfoArray : platFormInfoList) {
  338. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  339. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  340. if (platFormInfoArray.length > 0) {
  341. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  342. // 不存在这个平台,则忽略导入此关联关系
  343. if (platformInfoMap.get(platFormInfoArray[0]) == null
  344. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  345. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  346. continue;
  347. }
  348. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  349. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  350. if (gbStreamList == null) {
  351. gbStreamList = new ArrayList<>();
  352. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  353. }
  354. // 为发送通知整理数据
  355. streamPushItemForPlatform.setName(streamPushItem.getName());
  356. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  357. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  358. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  359. gbStreamList.add(streamPushItemForPlatform);
  360. }
  361. if (platFormInfoArray.length > 1) {
  362. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  363. }
  364. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  365. }
  366. }
  367. }
  368. if (streamPushItemListFroPlatform.size() > 0) {
  369. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  370. // 发送通知
  371. for (String platformId : platformForEvent.keySet()) {
  372. eventPublisher.catalogEventPublishForStream(
  373. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  374. }
  375. }
  376. }
  377. }
  378. @Override
  379. public boolean batchStop(List<GbStream> gbStreams) {
  380. if (gbStreams == null || gbStreams.size() == 0) {
  381. return false;
  382. }
  383. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  384. platformGbStreamMapper.delByGbStreams(gbStreams);
  385. gbStreamMapper.batchDelForGbStream(gbStreams);
  386. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  387. if (delStream > 0) {
  388. for (GbStream gbStream : gbStreams) {
  389. MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  390. zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  391. }
  392. }
  393. return true;
  394. }
  395. @Override
  396. public void allStreamOffline() {
  397. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  398. if (onlinePushers.size() == 0) {
  399. return;
  400. }
  401. streamPushMapper.setAllStreamOffline();
  402. // 发送通知
  403. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  404. }
  405. @Override
  406. public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  407. // 更新部分设备离线
  408. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  409. streamPushMapper.offline(offlineStreams);
  410. // 发送通知
  411. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  412. }
  413. @Override
  414. public void online(List<StreamPushItemFromRedis> onlineStreams) {
  415. // 更新部分设备上线streamPushService
  416. List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  417. streamPushMapper.online(onlineStreams);
  418. // 发送通知
  419. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  420. }
  421. @Override
  422. public boolean add(StreamPushItem stream) {
  423. stream.setUpdateTime(DateUtil.getNow());
  424. stream.setCreateTime(DateUtil.getNow());
  425. stream.setServerId(userSetting.getServerId());
  426. // 放在事务内执行
  427. boolean result = false;
  428. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  429. try {
  430. int addStreamResult = streamPushMapper.add(stream);
  431. if (!StringUtils.isEmpty(stream.getGbId())) {
  432. gbStreamMapper.add(stream);
  433. }
  434. dataSourceTransactionManager.commit(transactionStatus);
  435. result = true;
  436. }catch (Exception e) {
  437. logger.error("批量移除流与平台的关系时错误", e);
  438. dataSourceTransactionManager.rollback(transactionStatus);
  439. }
  440. return result;
  441. }
  442. }