StreamPushServiceImpl.java 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSONObject;
  3. import com.baomidou.dynamic.datasource.annotation.DS;
  4. import com.genersoft.iot.vmp.common.StreamInfo;
  5. import com.genersoft.iot.vmp.conf.MediaConfig;
  6. import com.genersoft.iot.vmp.conf.UserSetting;
  7. import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  8. import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  9. import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
  10. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  11. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  12. import com.genersoft.iot.vmp.media.bean.MediaInfo;
  13. import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
  14. import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
  15. import com.genersoft.iot.vmp.media.service.IMediaServerService;
  16. import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
  17. import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
  18. import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  19. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  20. import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
  21. import com.genersoft.iot.vmp.service.IGbStreamService;
  22. import com.genersoft.iot.vmp.service.IStreamPushService;
  23. import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
  24. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  25. import com.genersoft.iot.vmp.storager.dao.*;
  26. import com.genersoft.iot.vmp.utils.DateUtil;
  27. import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
  28. import com.github.pagehelper.PageHelper;
  29. import com.github.pagehelper.PageInfo;
  30. import org.slf4j.Logger;
  31. import org.slf4j.LoggerFactory;
  32. import org.springframework.beans.factory.annotation.Autowired;
  33. import org.springframework.context.event.EventListener;
  34. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  35. import org.springframework.scheduling.annotation.Async;
  36. import org.springframework.stereotype.Service;
  37. import org.springframework.transaction.TransactionDefinition;
  38. import org.springframework.transaction.TransactionStatus;
  39. import org.springframework.util.ObjectUtils;
  40. import java.util.*;
  41. import java.util.stream.Collectors;
  42. @Service
  43. @DS("master")
  44. public class StreamPushServiceImpl implements IStreamPushService {
  45. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  46. @Autowired
  47. private GbStreamMapper gbStreamMapper;
  48. @Autowired
  49. private StreamPushMapper streamPushMapper;
  50. @Autowired
  51. private StreamProxyMapper streamProxyMapper;
  52. @Autowired
  53. private ParentPlatformMapper parentPlatformMapper;
  54. @Autowired
  55. private PlatformCatalogMapper platformCatalogMapper;
  56. @Autowired
  57. private PlatformGbStreamMapper platformGbStreamMapper;
  58. @Autowired
  59. private IGbStreamService gbStreamService;
  60. @Autowired
  61. private EventPublisher eventPublisher;
  62. @Autowired
  63. private IRedisCatchStorage redisCatchStorage;
  64. @Autowired
  65. private UserSetting userSetting;
  66. @Autowired
  67. private IMediaServerService mediaServerService;
  68. @Autowired
  69. DataSourceTransactionManager dataSourceTransactionManager;
  70. @Autowired
  71. TransactionDefinition transactionDefinition;
  72. @Autowired
  73. private MediaConfig mediaConfig;
  74. /**
  75. * 流到来的处理
  76. */
  77. @Async("taskExecutor")
  78. @EventListener
  79. public void onApplicationEvent(MediaArrivalEvent event) {
  80. MediaInfo mediaInfo = event.getMediaInfo();
  81. if (mediaInfo == null) {
  82. return;
  83. }
  84. if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
  85. && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
  86. && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
  87. return;
  88. }
  89. StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
  90. if (streamAuthorityInfo == null) {
  91. streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
  92. } else {
  93. streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
  94. }
  95. redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
  96. StreamPushItem transform = StreamPushItem.getInstance(event, userSetting.getServerId());
  97. transform.setPushIng(true);
  98. transform.setUpdateTime(DateUtil.getNow());
  99. transform.setPushTime(DateUtil.getNow());
  100. transform.setSelf(true);
  101. StreamPushItem pushInDb = getPush(event.getApp(), event.getStream());
  102. if (pushInDb == null) {
  103. transform.setCreateTime(DateUtil.getNow());
  104. streamPushMapper.add(transform);
  105. }else {
  106. streamPushMapper.update(transform);
  107. gbStreamMapper.updateMediaServer(event.getApp(), event.getStream(), event.getMediaServer().getId());
  108. }
  109. // ChannelOnlineEvent channelOnlineEventLister = getChannelOnlineEventLister(transform.getApp(), transform.getStream());
  110. // if ( channelOnlineEventLister != null) {
  111. // try {
  112. // channelOnlineEventLister.run(transform.getApp(), transform.getStream(), transform.getServerId());;
  113. // } catch (ParseException e) {
  114. // logger.error("addPush: ", e);
  115. // }
  116. // removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
  117. // }
  118. // 冗余数据,自己系统中自用
  119. redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event);
  120. }
  121. /**
  122. * 流离开的处理
  123. */
  124. @Async("taskExecutor")
  125. @EventListener
  126. public void onApplicationEvent(MediaDepartureEvent event) {
  127. }
  128. private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
  129. if (streamInfoList == null || streamInfoList.isEmpty()) {
  130. return null;
  131. }
  132. Map<String, StreamPushItem> result = new HashMap<>();
  133. for (StreamInfo streamInfo : streamInfoList) {
  134. // 不保存国标推理以及拉流代理的流
  135. if (streamInfo.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  136. || streamInfo.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  137. || streamInfo.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  138. String key = streamInfo.getApp() + "_" + streamInfo.getStream();
  139. StreamPushItem streamPushItem = result.get(key);
  140. if (streamPushItem == null) {
  141. streamPushItem = streamPushItem.getInstance(streamInfo);
  142. result.put(key, streamPushItem);
  143. }
  144. }
  145. }
  146. return new ArrayList<>(result.values());
  147. }
  148. @Override
  149. public StreamPushItem transform(OnStreamChangedHookParam item) {
  150. StreamPushItem streamPushItem = new StreamPushItem();
  151. streamPushItem.setApp(item.getApp());
  152. streamPushItem.setMediaServerId(item.getMediaServerId());
  153. streamPushItem.setStream(item.getStream());
  154. streamPushItem.setAliveSecond(item.getAliveSecond());
  155. streamPushItem.setOriginSock(item.getOriginSock());
  156. streamPushItem.setTotalReaderCount(item.getTotalReaderCount() + "");
  157. streamPushItem.setOriginType(item.getOriginType());
  158. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  159. streamPushItem.setOriginUrl(item.getOriginUrl());
  160. streamPushItem.setCreateTime(DateUtil.getNow());
  161. streamPushItem.setAliveSecond(item.getAliveSecond());
  162. streamPushItem.setStatus(true);
  163. streamPushItem.setStreamType("push");
  164. streamPushItem.setVhost(item.getVhost());
  165. streamPushItem.setServerId(item.getSeverId());
  166. return streamPushItem;
  167. }
  168. @Override
  169. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  170. PageHelper.startPage(page, count);
  171. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  172. return new PageInfo<>(all);
  173. }
  174. @Override
  175. public List<StreamPushItem> getPushList(String mediaServerId) {
  176. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  177. }
  178. @Override
  179. public boolean saveToGB(GbStream stream) {
  180. stream.setStreamType("push");
  181. stream.setStatus(true);
  182. stream.setCreateTime(DateUtil.getNow());
  183. stream.setStreamType("push");
  184. stream.setMediaServerId(mediaConfig.getId());
  185. int add = gbStreamMapper.add(stream);
  186. return add > 0;
  187. }
  188. @Override
  189. public boolean removeFromGB(GbStream stream) {
  190. // 判断是否需要发送事件
  191. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  192. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  193. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  194. MediaServer mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  195. List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaInfo, stream.getApp(), stream.getStream(), null);
  196. if (mediaList != null && mediaList.isEmpty()) {
  197. streamPushMapper.del(stream.getApp(), stream.getStream());
  198. }
  199. return del > 0;
  200. }
  201. @Override
  202. public StreamPushItem getPush(String app, String streamId) {
  203. return streamPushMapper.selectOne(app, streamId);
  204. }
  205. @Override
  206. public boolean stop(String app, String streamId) {
  207. logger.info("[推流 ] 停止流: {}/{}", app, streamId);
  208. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  209. if (streamPushItem != null) {
  210. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  211. }
  212. platformGbStreamMapper.delByAppAndStream(app, streamId);
  213. gbStreamMapper.del(app, streamId);
  214. int delStream = streamPushMapper.del(app, streamId);
  215. if (delStream > 0) {
  216. MediaServer mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  217. mediaServerService.closeStreams(mediaServerItem,app, streamId);
  218. }
  219. return true;
  220. }
  221. @Override
  222. public void zlmServerOnline(String mediaServerId) {
  223. // 同步zlm推流信息
  224. MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
  225. if (mediaServerItem == null) {
  226. return;
  227. }
  228. // 数据库记录
  229. List<StreamPushItem> pushList = getPushList(mediaServerId);
  230. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  231. // redis记录
  232. List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  233. Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
  234. if (pushList.size() > 0) {
  235. for (StreamPushItem streamPushItem : pushList) {
  236. if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
  237. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  238. }
  239. }
  240. }
  241. if (onStreamChangedHookParams.size() > 0) {
  242. for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
  243. streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
  244. }
  245. }
  246. // 获取所有推流鉴权信息,清理过期的
  247. List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
  248. Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
  249. for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
  250. streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
  251. }
  252. List<StreamInfo> mediaList = mediaServerService.getMediaList(mediaServerItem, null, null, null);
  253. if (mediaList == null) {
  254. return;
  255. }
  256. List<StreamPushItem> streamPushItems = handleJSON(mediaList);
  257. if (streamPushItems != null) {
  258. for (StreamPushItem streamPushItem : streamPushItems) {
  259. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  260. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  261. streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  262. }
  263. }
  264. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  265. if (offlinePushItems.size() > 0) {
  266. String type = "PUSH";
  267. int runLimit = 300;
  268. if (offlinePushItems.size() > runLimit) {
  269. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  270. int toIndex = i + runLimit;
  271. if (i + runLimit > offlinePushItems.size()) {
  272. toIndex = offlinePushItems.size();
  273. }
  274. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  275. streamPushMapper.delAll(streamPushItemsSub);
  276. }
  277. }else {
  278. streamPushMapper.delAll(offlinePushItems);
  279. }
  280. }
  281. Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
  282. if (offlineOnStreamChangedHookParamList.size() > 0) {
  283. String type = "PUSH";
  284. for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
  285. JSONObject jsonObject = new JSONObject();
  286. jsonObject.put("serverId", userSetting.getServerId());
  287. jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
  288. jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
  289. jsonObject.put("register", false);
  290. jsonObject.put("mediaServerId", mediaServerId);
  291. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  292. // 移除redis内流的信息
  293. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
  294. // 冗余数据,自己系统中自用
  295. redisCatchStorage.removePushListItem(offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream(), mediaServerItem.getId());
  296. }
  297. }
  298. Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
  299. if (streamAuthorityInfos.size() > 0) {
  300. for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
  301. // 移除redis内流的信息
  302. redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
  303. }
  304. }
  305. }
  306. @Override
  307. public void zlmServerOffline(String mediaServerId) {
  308. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  309. // 移除没有GBId的推流
  310. streamPushMapper.deleteWithoutGBId(mediaServerId);
  311. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  312. // 其他的流设置未启用
  313. streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
  314. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  315. // 发送流停止消息
  316. String type = "PUSH";
  317. // 发送redis消息
  318. List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  319. if (streamInfoList.size() > 0) {
  320. for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
  321. // 移除redis内流的信息
  322. redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
  323. JSONObject jsonObject = new JSONObject();
  324. jsonObject.put("serverId", userSetting.getServerId());
  325. jsonObject.put("app", onStreamChangedHookParam.getApp());
  326. jsonObject.put("stream", onStreamChangedHookParam.getStream());
  327. jsonObject.put("register", false);
  328. jsonObject.put("mediaServerId", mediaServerId);
  329. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  330. // 冗余数据,自己系统中自用
  331. redisCatchStorage.removePushListItem(onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream(), mediaServerId);
  332. }
  333. }
  334. }
  335. @Override
  336. public void clean() {
  337. }
  338. @Override
  339. public boolean saveToRandomGB() {
  340. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  341. long gbId = 100001;
  342. for (StreamPushItem streamPushItem : streamPushItems) {
  343. streamPushItem.setStreamType("push");
  344. streamPushItem.setStatus(true);
  345. streamPushItem.setGbId("34020000004111" + gbId);
  346. streamPushItem.setCreateTime(DateUtil.getNow());
  347. gbId ++;
  348. }
  349. int limitCount = 30;
  350. if (streamPushItems.size() > limitCount) {
  351. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  352. int toIndex = i + limitCount;
  353. if (i + limitCount > streamPushItems.size()) {
  354. toIndex = streamPushItems.size();
  355. }
  356. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  357. }
  358. }else {
  359. gbStreamMapper.batchAdd(streamPushItems);
  360. }
  361. return true;
  362. }
  363. @Override
  364. public void batchAdd(List<StreamPushItem> streamPushItems) {
  365. streamPushMapper.addAll(streamPushItems);
  366. gbStreamMapper.batchAdd(streamPushItems);
  367. }
  368. @Override
  369. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  370. // 存储数据到stream_push表
  371. streamPushMapper.addAll(streamPushItems);
  372. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  373. .filter(streamPushItem-> streamPushItem.getGbId() != null)
  374. .collect(Collectors.toList());
  375. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  376. if (streamPushItemForGbStream.size() > 0) {
  377. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  378. }
  379. // 去除没有ID也就是没有存储到数据库的数据
  380. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  381. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  382. .collect(Collectors.toList());
  383. if (streamPushItemsForPlatform.size() > 0) {
  384. // 获取所有平台,平台和目录信息一般不会特别大量。
  385. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  386. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  387. if (parentPlatformList.size() == 0) {
  388. return;
  389. }
  390. for (ParentPlatform platform : parentPlatformList) {
  391. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  392. // 创建根节点
  393. PlatformCatalog platformCatalog = new PlatformCatalog();
  394. platformCatalog.setId(platform.getServerGBId());
  395. catalogMap.put(platform.getServerGBId(), platformCatalog);
  396. // 查询所有节点信息
  397. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  398. if (platformCatalogs.size() > 0) {
  399. for (PlatformCatalog catalog : platformCatalogs) {
  400. catalogMap.put(catalog.getId(), catalog);
  401. }
  402. }
  403. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  404. }
  405. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  406. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  407. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  408. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  409. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  410. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  411. for (String[] platFormInfoArray : platFormInfoList) {
  412. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  413. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  414. if (platFormInfoArray.length > 0) {
  415. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  416. // 不存在这个平台,则忽略导入此关联关系
  417. if (platformInfoMap.get(platFormInfoArray[0]) == null
  418. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  419. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  420. continue;
  421. }
  422. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  423. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  424. if (gbStreamList == null) {
  425. gbStreamList = new ArrayList<>();
  426. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  427. }
  428. // 为发送通知整理数据
  429. streamPushItemForPlatform.setName(streamPushItem.getName());
  430. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  431. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  432. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  433. gbStreamList.add(streamPushItemForPlatform);
  434. }
  435. if (platFormInfoArray.length > 1) {
  436. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  437. }
  438. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  439. }
  440. }
  441. }
  442. if (!streamPushItemListFroPlatform.isEmpty()) {
  443. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  444. // 发送通知
  445. for (String platformId : platformForEvent.keySet()) {
  446. eventPublisher.catalogEventPublishForStream(
  447. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  448. }
  449. }
  450. }
  451. }
  452. @Override
  453. public boolean batchStop(List<GbStream> gbStreams) {
  454. if (gbStreams == null || gbStreams.size() == 0) {
  455. return false;
  456. }
  457. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  458. platformGbStreamMapper.delByGbStreams(gbStreams);
  459. gbStreamMapper.batchDelForGbStream(gbStreams);
  460. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  461. if (delStream > 0) {
  462. for (GbStream gbStream : gbStreams) {
  463. MediaServer mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  464. mediaServerService.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  465. }
  466. }
  467. return true;
  468. }
  469. @Override
  470. public void allStreamOffline() {
  471. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  472. if (onlinePushers.size() == 0) {
  473. return;
  474. }
  475. streamPushMapper.setAllStreamOffline();
  476. // 发送通知
  477. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  478. }
  479. @Override
  480. public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  481. // 更新部分设备离线
  482. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  483. streamPushMapper.offline(offlineStreams);
  484. // 发送通知
  485. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  486. }
  487. @Override
  488. public void online(List<StreamPushItemFromRedis> onlineStreams) {
  489. // 更新部分设备上线streamPushService
  490. List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  491. streamPushMapper.online(onlineStreams);
  492. // 发送通知
  493. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  494. }
  495. @Override
  496. public boolean add(StreamPushItem stream) {
  497. stream.setUpdateTime(DateUtil.getNow());
  498. stream.setCreateTime(DateUtil.getNow());
  499. stream.setServerId(userSetting.getServerId());
  500. stream.setMediaServerId(mediaConfig.getId());
  501. stream.setSelf(true);
  502. stream.setPushIng(true);
  503. // 放在事务内执行
  504. boolean result = false;
  505. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  506. try {
  507. int addStreamResult = streamPushMapper.add(stream);
  508. if (!ObjectUtils.isEmpty(stream.getGbId())) {
  509. stream.setStreamType("push");
  510. gbStreamMapper.add(stream);
  511. }
  512. dataSourceTransactionManager.commit(transactionStatus);
  513. result = true;
  514. }catch (Exception e) {
  515. logger.error("批量移除流与平台的关系时错误", e);
  516. dataSourceTransactionManager.rollback(transactionStatus);
  517. }
  518. return result;
  519. }
  520. @Override
  521. public List<String> getAllAppAndStream() {
  522. return streamPushMapper.getAllAppAndStream();
  523. }
  524. @Override
  525. public ResourceBaseInfo getOverview() {
  526. int total = streamPushMapper.getAllCount();
  527. int online = streamPushMapper.getAllOnline(userSetting.isUsePushingAsStatus());
  528. return new ResourceBaseInfo(total, online);
  529. }
  530. @Override
  531. public Map<String, StreamPushItem> getAllAppAndStreamMap() {
  532. return streamPushMapper.getAllAppAndStreamMap();
  533. }
  534. }