StreamPushUploadFileHandler.java 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.excel.context.AnalysisContext;
  3. import com.alibaba.excel.event.AnalysisEventListener;
  4. import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  5. import com.genersoft.iot.vmp.service.IStreamPushService;
  6. import com.genersoft.iot.vmp.utils.DateUtil;
  7. import com.genersoft.iot.vmp.vmanager.bean.StreamPushExcelDto;
  8. import com.google.common.collect.BiMap;
  9. import com.google.common.collect.HashBiMap;
  10. import org.springframework.util.ObjectUtils;
  11. import org.springframework.util.StringUtils;
  12. import java.util.*;
  13. public class StreamPushUploadFileHandler extends AnalysisEventListener<StreamPushExcelDto> {
  14. /**
  15. * 错误数据的回调,用于将错误数据发送给页面
  16. */
  17. private ErrorDataHandler errorDataHandler;
  18. /**
  19. * 推流的业务类用于存储数据
  20. */
  21. private IStreamPushService pushService;
  22. /**
  23. * 默认流媒体节点ID
  24. */
  25. private String defaultMediaServerId;
  26. /**
  27. * 用于存储不加过滤的所有数据
  28. */
  29. private List<StreamPushItem> streamPushItems = new ArrayList<>();
  30. /**
  31. * 用于存储更具APP+Stream过滤后的数据,可以直接存入stream_push表与gb_stream表
  32. */
  33. private Map<String,StreamPushItem> streamPushItemForSave = new HashMap<>();
  34. /**
  35. * 用于存储按照APP+Stream为KEY, 平台ID+目录Id 为value的数据,用于存储到gb_stream表后获取app+Stream对应的平台与目录信息,然后存入关联表
  36. */
  37. private Map<String, List<String[]>> streamPushItemsForPlatform = new HashMap<>();
  38. /**
  39. * 用于判断文件是否存在重复的app+Stream+平台ID
  40. */
  41. private Set<String> streamPushStreamSet = new HashSet<>();
  42. /**
  43. * 用于存储APP+Stream->国标ID 的数据结构, 数据一一对应,全局判断APP+Stream->国标ID是否存在不对应
  44. */
  45. private BiMap<String,String> gBMap = HashBiMap.create();
  46. /**
  47. * 记录错误的APP+Stream
  48. */
  49. private List<String> errorStreamList = new ArrayList<>();
  50. /**
  51. * 记录错误的国标ID
  52. */
  53. private List<String> errorGBList = new ArrayList<>();
  54. /**
  55. * 读取数量计数器
  56. */
  57. private int loadedSize = 0;
  58. public StreamPushUploadFileHandler(IStreamPushService pushService, String defaultMediaServerId, ErrorDataHandler errorDataHandler) {
  59. this.pushService = pushService;
  60. this.defaultMediaServerId = defaultMediaServerId;
  61. this.errorDataHandler = errorDataHandler;
  62. }
  63. public interface ErrorDataHandler{
  64. void handle(List<String> streams, List<String> gbId);
  65. }
  66. @Override
  67. public void invoke(StreamPushExcelDto streamPushExcelDto, AnalysisContext analysisContext) {
  68. if (ObjectUtils.isEmpty(streamPushExcelDto.getApp())
  69. || ObjectUtils.isEmpty(streamPushExcelDto.getStream())
  70. || ObjectUtils.isEmpty(streamPushExcelDto.getGbId())) {
  71. return;
  72. }
  73. if (gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()) == null) {
  74. try {
  75. gBMap.put(streamPushExcelDto.getApp() + streamPushExcelDto.getStream(), streamPushExcelDto.getGbId());
  76. }catch (IllegalArgumentException e) {
  77. e.printStackTrace();
  78. errorGBList.add(streamPushExcelDto.getGbId() + "(不同的app+stream使用了相同的国标ID)");
  79. return;
  80. }
  81. }else {
  82. if (!gBMap.get(streamPushExcelDto.getApp() + streamPushExcelDto.getStream()).equals(streamPushExcelDto.getGbId())) {
  83. errorGBList.add(streamPushExcelDto.getGbId() + "(同一组app+stream使用了不同的国标ID)");
  84. return;
  85. }
  86. }
  87. if (streamPushStreamSet.contains(streamPushExcelDto.getApp() + streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId())) {
  88. errorStreamList.add(streamPushExcelDto.getApp() + "/" + streamPushExcelDto.getStream()+ "/" +
  89. streamPushExcelDto.getPlatformId() + "(同一组app+stream添加在了同一个平台下)");
  90. return;
  91. }else {
  92. streamPushStreamSet.add(streamPushExcelDto.getApp()+streamPushExcelDto.getStream() + streamPushExcelDto.getPlatformId());
  93. }
  94. StreamPushItem streamPushItem = new StreamPushItem();
  95. streamPushItem.setApp(streamPushExcelDto.getApp());
  96. streamPushItem.setStream(streamPushExcelDto.getStream());
  97. streamPushItem.setGbId(streamPushExcelDto.getGbId());
  98. streamPushItem.setStatus(streamPushExcelDto.getStatus());
  99. streamPushItem.setStreamType("push");
  100. streamPushItem.setCreateTime(DateUtil.getNow());
  101. streamPushItem.setMediaServerId(defaultMediaServerId);
  102. streamPushItem.setName(streamPushExcelDto.getName());
  103. streamPushItem.setOriginType(2);
  104. streamPushItem.setOriginTypeStr("rtsp_push");
  105. streamPushItem.setTotalReaderCount("0");
  106. streamPushItem.setPlatformId(streamPushExcelDto.getPlatformId());
  107. streamPushItem.setCatalogId(streamPushExcelDto.getCatalogId());
  108. // 存入所有的通道信息
  109. streamPushItems.add(streamPushItem);
  110. streamPushItemForSave.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
  111. if (!ObjectUtils.isEmpty(streamPushExcelDto.getPlatformId())) {
  112. List<String[]> platformList = streamPushItemsForPlatform.get(streamPushItem.getApp() + streamPushItem.getStream());
  113. if (platformList == null) {
  114. platformList = new ArrayList<>();
  115. streamPushItemsForPlatform.put(streamPushItem.getApp() + streamPushItem.getStream(), platformList);
  116. }
  117. String platformId = streamPushExcelDto.getPlatformId();
  118. String catalogId = streamPushExcelDto.getCatalogId();
  119. if (ObjectUtils.isEmpty(streamPushExcelDto.getCatalogId())) {
  120. catalogId = null;
  121. }
  122. String[] platFormInfoArray = new String[]{platformId, catalogId};
  123. platformList.add(platFormInfoArray);
  124. }
  125. loadedSize ++;
  126. if (loadedSize > 1000) {
  127. saveData();
  128. streamPushItems.clear();
  129. streamPushItemForSave.clear();
  130. streamPushItemsForPlatform.clear();
  131. loadedSize = 0;
  132. }
  133. }
  134. @Override
  135. public void doAfterAllAnalysed(AnalysisContext analysisContext) {
  136. // 这里也要保存数据,确保最后遗留的数据也存储到数据库
  137. saveData();
  138. streamPushItems.clear();
  139. streamPushItemForSave.clear();
  140. gBMap.clear();
  141. streamPushStreamSet.clear();
  142. streamPushItemsForPlatform.clear();
  143. errorDataHandler.handle(errorStreamList, errorGBList);
  144. }
  145. private void saveData(){
  146. if (streamPushItemForSave.size() > 0) {
  147. // 向数据库查询是否存在重复的app
  148. pushService.batchAddForUpload(new ArrayList<>(streamPushItemForSave.values()), streamPushItemsForPlatform);
  149. }
  150. }
  151. }