StreamPushServiceImpl.java 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson2.JSON;
  3. import com.alibaba.fastjson2.JSONArray;
  4. import com.alibaba.fastjson2.JSONObject;
  5. import com.alibaba.fastjson2.TypeReference;
  6. import com.genersoft.iot.vmp.conf.MediaConfig;
  7. import com.genersoft.iot.vmp.conf.UserSetting;
  8. import com.genersoft.iot.vmp.gb28181.bean.*;
  9. import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
  10. import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
  11. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  12. import com.genersoft.iot.vmp.media.zlm.dto.*;
  13. import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
  14. import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
  15. import com.genersoft.iot.vmp.service.IGbStreamService;
  16. import com.genersoft.iot.vmp.service.IMediaServerService;
  17. import com.genersoft.iot.vmp.service.IStreamPushService;
  18. import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
  19. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  20. import com.genersoft.iot.vmp.storager.dao.*;
  21. import com.genersoft.iot.vmp.utils.DateUtil;
  22. import com.genersoft.iot.vmp.vmanager.bean.ResourceBaceInfo;
  23. import com.github.pagehelper.PageHelper;
  24. import com.github.pagehelper.PageInfo;
  25. import org.slf4j.Logger;
  26. import org.slf4j.LoggerFactory;
  27. import org.springframework.beans.factory.annotation.Autowired;
  28. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
  29. import org.springframework.stereotype.Service;
  30. import org.springframework.transaction.TransactionDefinition;
  31. import org.springframework.transaction.TransactionStatus;
  32. import org.springframework.util.ObjectUtils;
  33. import java.util.*;
  34. import java.util.stream.Collectors;
  35. @Service
  36. public class StreamPushServiceImpl implements IStreamPushService {
  37. private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
  38. @Autowired
  39. private GbStreamMapper gbStreamMapper;
  40. @Autowired
  41. private StreamPushMapper streamPushMapper;
  42. @Autowired
  43. private StreamProxyMapper streamProxyMapper;
  44. @Autowired
  45. private ParentPlatformMapper parentPlatformMapper;
  46. @Autowired
  47. private PlatformCatalogMapper platformCatalogMapper;
  48. @Autowired
  49. private PlatformGbStreamMapper platformGbStreamMapper;
  50. @Autowired
  51. private IGbStreamService gbStreamService;
  52. @Autowired
  53. private EventPublisher eventPublisher;
  54. @Autowired
  55. private ZLMRESTfulUtils zlmresTfulUtils;
  56. @Autowired
  57. private IRedisCatchStorage redisCatchStorage;
  58. @Autowired
  59. private UserSetting userSetting;
  60. @Autowired
  61. private IMediaServerService mediaServerService;
  62. @Autowired
  63. DataSourceTransactionManager dataSourceTransactionManager;
  64. @Autowired
  65. TransactionDefinition transactionDefinition;
  66. @Autowired
  67. private MediaConfig mediaConfig;
  68. @Override
  69. public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
  70. if (jsonData == null) {
  71. return null;
  72. }
  73. Map<String, StreamPushItem> result = new HashMap<>();
  74. List<OnStreamChangedHookParam> onStreamChangedHookParams = JSON.parseObject(jsonData, new TypeReference<List<OnStreamChangedHookParam>>() {});
  75. for (OnStreamChangedHookParam item : onStreamChangedHookParams) {
  76. // 不保存国标推理以及拉流代理的流
  77. if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
  78. || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
  79. || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
  80. String key = item.getApp() + "_" + item.getStream();
  81. StreamPushItem streamPushItem = result.get(key);
  82. if (streamPushItem == null) {
  83. streamPushItem = transform(item);
  84. result.put(key, streamPushItem);
  85. }
  86. }
  87. }
  88. return new ArrayList<>(result.values());
  89. }
  90. @Override
  91. public StreamPushItem transform(OnStreamChangedHookParam item) {
  92. StreamPushItem streamPushItem = new StreamPushItem();
  93. streamPushItem.setApp(item.getApp());
  94. streamPushItem.setMediaServerId(item.getMediaServerId());
  95. streamPushItem.setStream(item.getStream());
  96. streamPushItem.setAliveSecond(item.getAliveSecond());
  97. streamPushItem.setOriginSock(item.getOriginSock());
  98. streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
  99. streamPushItem.setOriginType(item.getOriginType());
  100. streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
  101. streamPushItem.setOriginUrl(item.getOriginUrl());
  102. streamPushItem.setCreateTime(DateUtil.getNow());
  103. streamPushItem.setAliveSecond(item.getAliveSecond());
  104. streamPushItem.setStatus(true);
  105. streamPushItem.setStreamType("push");
  106. streamPushItem.setVhost(item.getVhost());
  107. streamPushItem.setServerId(item.getSeverId());
  108. return streamPushItem;
  109. }
  110. @Override
  111. public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
  112. PageHelper.startPage(page, count);
  113. List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
  114. return new PageInfo<>(all);
  115. }
  116. @Override
  117. public List<StreamPushItem> getPushList(String mediaServerId) {
  118. return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  119. }
  120. @Override
  121. public boolean saveToGB(GbStream stream) {
  122. stream.setStreamType("push");
  123. stream.setStatus(true);
  124. stream.setCreateTime(DateUtil.getNow());
  125. stream.setStreamType("push");
  126. stream.setMediaServerId(mediaConfig.getId());
  127. int add = gbStreamMapper.add(stream);
  128. return add > 0;
  129. }
  130. @Override
  131. public boolean removeFromGB(GbStream stream) {
  132. // 判断是否需要发送事件
  133. gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
  134. platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
  135. int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
  136. MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
  137. JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
  138. if (mediaList != null) {
  139. if (mediaList.getInteger("code") == 0) {
  140. JSONArray data = mediaList.getJSONArray("data");
  141. if (data == null) {
  142. streamPushMapper.del(stream.getApp(), stream.getStream());
  143. }
  144. }
  145. }
  146. return del > 0;
  147. }
  148. @Override
  149. public StreamPushItem getPush(String app, String streamId) {
  150. return streamPushMapper.selectOne(app, streamId);
  151. }
  152. @Override
  153. public boolean stop(String app, String streamId) {
  154. StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
  155. if (streamPushItem != null) {
  156. gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
  157. }
  158. platformGbStreamMapper.delByAppAndStream(app, streamId);
  159. gbStreamMapper.del(app, streamId);
  160. int delStream = streamPushMapper.del(app, streamId);
  161. if (delStream > 0) {
  162. MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
  163. zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
  164. }
  165. return true;
  166. }
  167. @Override
  168. public void zlmServerOnline(String mediaServerId) {
  169. // 同步zlm推流信息
  170. MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
  171. if (mediaServerItem == null) {
  172. return;
  173. }
  174. // 数据库记录
  175. List<StreamPushItem> pushList = getPushList(mediaServerId);
  176. Map<String, StreamPushItem> pushItemMap = new HashMap<>();
  177. // redis记录
  178. List<OnStreamChangedHookParam> onStreamChangedHookParams = redisCatchStorage.getStreams(mediaServerId, "PUSH");
  179. Map<String, OnStreamChangedHookParam> streamInfoPushItemMap = new HashMap<>();
  180. if (pushList.size() > 0) {
  181. for (StreamPushItem streamPushItem : pushList) {
  182. if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
  183. pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  184. }
  185. }
  186. }
  187. if (onStreamChangedHookParams.size() > 0) {
  188. for (OnStreamChangedHookParam onStreamChangedHookParam : onStreamChangedHookParams) {
  189. streamInfoPushItemMap.put(onStreamChangedHookParam.getApp() + onStreamChangedHookParam.getStream(), onStreamChangedHookParam);
  190. }
  191. }
  192. // 获取所有推流鉴权信息,清理过期的
  193. List<StreamAuthorityInfo> allStreamAuthorityInfo = redisCatchStorage.getAllStreamAuthorityInfo();
  194. Map<String, StreamAuthorityInfo> streamAuthorityInfoInfoMap = new HashMap<>();
  195. for (StreamAuthorityInfo streamAuthorityInfo : allStreamAuthorityInfo) {
  196. streamAuthorityInfoInfoMap.put(streamAuthorityInfo.getApp() + streamAuthorityInfo.getStream(), streamAuthorityInfo);
  197. }
  198. zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
  199. if (mediaList == null) {
  200. return;
  201. }
  202. String dataStr = mediaList.getString("data");
  203. Integer code = mediaList.getInteger("code");
  204. List<StreamPushItem> streamPushItems = null;
  205. if (code == 0 ) {
  206. if (dataStr != null) {
  207. streamPushItems = handleJSON(dataStr, mediaServerItem);
  208. }
  209. }
  210. if (streamPushItems != null) {
  211. for (StreamPushItem streamPushItem : streamPushItems) {
  212. pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  213. streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  214. streamAuthorityInfoInfoMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
  215. }
  216. }
  217. List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
  218. if (offlinePushItems.size() > 0) {
  219. String type = "PUSH";
  220. int runLimit = 300;
  221. if (offlinePushItems.size() > runLimit) {
  222. for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
  223. int toIndex = i + runLimit;
  224. if (i + runLimit > offlinePushItems.size()) {
  225. toIndex = offlinePushItems.size();
  226. }
  227. List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
  228. streamPushMapper.delAll(streamPushItemsSub);
  229. }
  230. }else {
  231. streamPushMapper.delAll(offlinePushItems);
  232. }
  233. }
  234. Collection<OnStreamChangedHookParam> offlineOnStreamChangedHookParamList = streamInfoPushItemMap.values();
  235. if (offlineOnStreamChangedHookParamList.size() > 0) {
  236. String type = "PUSH";
  237. for (OnStreamChangedHookParam offlineOnStreamChangedHookParam : offlineOnStreamChangedHookParamList) {
  238. JSONObject jsonObject = new JSONObject();
  239. jsonObject.put("serverId", userSetting.getServerId());
  240. jsonObject.put("app", offlineOnStreamChangedHookParam.getApp());
  241. jsonObject.put("stream", offlineOnStreamChangedHookParam.getStream());
  242. jsonObject.put("register", false);
  243. jsonObject.put("mediaServerId", mediaServerId);
  244. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  245. // 移除redis内流的信息
  246. redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineOnStreamChangedHookParam.getApp(), offlineOnStreamChangedHookParam.getStream());
  247. }
  248. }
  249. Collection<StreamAuthorityInfo> streamAuthorityInfos = streamAuthorityInfoInfoMap.values();
  250. if (streamAuthorityInfos.size() > 0) {
  251. for (StreamAuthorityInfo streamAuthorityInfo : streamAuthorityInfos) {
  252. // 移除redis内流的信息
  253. redisCatchStorage.removeStreamAuthorityInfo(streamAuthorityInfo.getApp(), streamAuthorityInfo.getStream());
  254. }
  255. }
  256. }));
  257. }
  258. @Override
  259. public void zlmServerOffline(String mediaServerId) {
  260. List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
  261. // 移除没有GBId的推流
  262. streamPushMapper.deleteWithoutGBId(mediaServerId);
  263. gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
  264. // 其他的流设置未启用
  265. streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
  266. streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
  267. // 发送流停止消息
  268. String type = "PUSH";
  269. // 发送redis消息
  270. List<OnStreamChangedHookParam> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
  271. if (streamInfoList.size() > 0) {
  272. for (OnStreamChangedHookParam onStreamChangedHookParam : streamInfoList) {
  273. // 移除redis内流的信息
  274. redisCatchStorage.removeStream(mediaServerId, type, onStreamChangedHookParam.getApp(), onStreamChangedHookParam.getStream());
  275. JSONObject jsonObject = new JSONObject();
  276. jsonObject.put("serverId", userSetting.getServerId());
  277. jsonObject.put("app", onStreamChangedHookParam.getApp());
  278. jsonObject.put("stream", onStreamChangedHookParam.getStream());
  279. jsonObject.put("register", false);
  280. jsonObject.put("mediaServerId", mediaServerId);
  281. redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
  282. }
  283. }
  284. }
  285. @Override
  286. public void clean() {
  287. }
  288. @Override
  289. public boolean saveToRandomGB() {
  290. List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
  291. long gbId = 100001;
  292. for (StreamPushItem streamPushItem : streamPushItems) {
  293. streamPushItem.setStreamType("push");
  294. streamPushItem.setStatus(true);
  295. streamPushItem.setGbId("34020000004111" + gbId);
  296. streamPushItem.setCreateTime(DateUtil.getNow());
  297. gbId ++;
  298. }
  299. int limitCount = 30;
  300. if (streamPushItems.size() > limitCount) {
  301. for (int i = 0; i < streamPushItems.size(); i += limitCount) {
  302. int toIndex = i + limitCount;
  303. if (i + limitCount > streamPushItems.size()) {
  304. toIndex = streamPushItems.size();
  305. }
  306. gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
  307. }
  308. }else {
  309. gbStreamMapper.batchAdd(streamPushItems);
  310. }
  311. return true;
  312. }
  313. @Override
  314. public void batchAdd(List<StreamPushItem> streamPushItems) {
  315. streamPushMapper.addAll(streamPushItems);
  316. gbStreamMapper.batchAdd(streamPushItems);
  317. }
  318. @Override
  319. public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
  320. // 存储数据到stream_push表
  321. streamPushMapper.addAll(streamPushItems);
  322. List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
  323. .filter(streamPushItem-> streamPushItem.getId() != null)
  324. .collect(Collectors.toList());
  325. // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
  326. if (streamPushItemForGbStream.size() > 0) {
  327. gbStreamMapper.batchAdd(streamPushItemForGbStream);
  328. }
  329. // 去除没有ID也就是没有存储到数据库的数据
  330. List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
  331. .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
  332. .collect(Collectors.toList());
  333. if (streamPushItemsForPlatform.size() > 0) {
  334. // 获取所有平台,平台和目录信息一般不会特别大量。
  335. List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
  336. Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
  337. if (parentPlatformList.size() == 0) {
  338. return;
  339. }
  340. for (ParentPlatform platform : parentPlatformList) {
  341. Map<String, PlatformCatalog> catalogMap = new HashMap<>();
  342. // 创建根节点
  343. PlatformCatalog platformCatalog = new PlatformCatalog();
  344. platformCatalog.setId(platform.getServerGBId());
  345. catalogMap.put(platform.getServerGBId(), platformCatalog);
  346. // 查询所有节点信息
  347. List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
  348. if (platformCatalogs.size() > 0) {
  349. for (PlatformCatalog catalog : platformCatalogs) {
  350. catalogMap.put(catalog.getId(), catalog);
  351. }
  352. }
  353. platformInfoMap.put(platform.getServerGBId(), catalogMap);
  354. }
  355. List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
  356. Map<String, List<GbStream>> platformForEvent = new HashMap<>();
  357. // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
  358. for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
  359. List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
  360. if (platFormInfoList != null && platFormInfoList.size() > 0) {
  361. for (String[] platFormInfoArray : platFormInfoList) {
  362. StreamPushItem streamPushItemForPlatform = new StreamPushItem();
  363. streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
  364. if (platFormInfoArray.length > 0) {
  365. // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
  366. // 不存在这个平台,则忽略导入此关联关系
  367. if (platformInfoMap.get(platFormInfoArray[0]) == null
  368. || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
  369. logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
  370. continue;
  371. }
  372. streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
  373. List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
  374. if (gbStreamList == null) {
  375. gbStreamList = new ArrayList<>();
  376. platformForEvent.put(platFormInfoArray[0], gbStreamList);
  377. }
  378. // 为发送通知整理数据
  379. streamPushItemForPlatform.setName(streamPushItem.getName());
  380. streamPushItemForPlatform.setApp(streamPushItem.getApp());
  381. streamPushItemForPlatform.setStream(streamPushItem.getStream());
  382. streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
  383. gbStreamList.add(streamPushItemForPlatform);
  384. }
  385. if (platFormInfoArray.length > 1) {
  386. streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
  387. }
  388. streamPushItemListFroPlatform.add(streamPushItemForPlatform);
  389. }
  390. }
  391. }
  392. if (streamPushItemListFroPlatform.size() > 0) {
  393. platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
  394. // 发送通知
  395. for (String platformId : platformForEvent.keySet()) {
  396. eventPublisher.catalogEventPublishForStream(
  397. platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
  398. }
  399. }
  400. }
  401. }
  402. @Override
  403. public boolean batchStop(List<GbStream> gbStreams) {
  404. if (gbStreams == null || gbStreams.size() == 0) {
  405. return false;
  406. }
  407. gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
  408. platformGbStreamMapper.delByGbStreams(gbStreams);
  409. gbStreamMapper.batchDelForGbStream(gbStreams);
  410. int delStream = streamPushMapper.delAllForGbStream(gbStreams);
  411. if (delStream > 0) {
  412. for (GbStream gbStream : gbStreams) {
  413. MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
  414. zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
  415. }
  416. }
  417. return true;
  418. }
  419. @Override
  420. public void allStreamOffline() {
  421. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
  422. if (onlinePushers.size() == 0) {
  423. return;
  424. }
  425. streamPushMapper.setAllStreamOffline();
  426. // 发送通知
  427. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  428. }
  429. @Override
  430. public void offline(List<StreamPushItemFromRedis> offlineStreams) {
  431. // 更新部分设备离线
  432. List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
  433. streamPushMapper.offline(offlineStreams);
  434. // 发送通知
  435. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
  436. }
  437. @Override
  438. public void online(List<StreamPushItemFromRedis> onlineStreams) {
  439. // 更新部分设备上线streamPushService
  440. List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
  441. streamPushMapper.online(onlineStreams);
  442. // 发送通知
  443. eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
  444. }
  445. @Override
  446. public boolean add(StreamPushItem stream) {
  447. stream.setUpdateTime(DateUtil.getNow());
  448. stream.setCreateTime(DateUtil.getNow());
  449. stream.setServerId(userSetting.getServerId());
  450. // 放在事务内执行
  451. boolean result = false;
  452. TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
  453. try {
  454. int addStreamResult = streamPushMapper.add(stream);
  455. if (!ObjectUtils.isEmpty(stream.getGbId())) {
  456. stream.setStreamType("push");
  457. gbStreamMapper.add(stream);
  458. }
  459. dataSourceTransactionManager.commit(transactionStatus);
  460. result = true;
  461. }catch (Exception e) {
  462. logger.error("批量移除流与平台的关系时错误", e);
  463. dataSourceTransactionManager.rollback(transactionStatus);
  464. }
  465. return result;
  466. }
  467. @Override
  468. public List<String> getAllAppAndStream() {
  469. return streamPushMapper.getAllAppAndStream();
  470. }
  471. @Override
  472. public ResourceBaceInfo getOverview() {
  473. return streamPushMapper.getOverview(userSetting.isUsePushingAsStatus());
  474. }
  475. }