StreamProxyServiceImpl.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.genersoft.iot.vmp.common.StreamInfo;
  4. import com.genersoft.iot.vmp.gb28181.bean.GbStream;
  5. import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
  6. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  7. import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
  8. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  9. import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
  10. import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
  11. import com.genersoft.iot.vmp.service.IGbStreamService;
  12. import com.genersoft.iot.vmp.service.IMediaServerService;
  13. import com.genersoft.iot.vmp.service.IMediaService;
  14. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  15. import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
  16. import com.genersoft.iot.vmp.storager.dao.GbStreamMapper;
  17. import com.genersoft.iot.vmp.storager.dao.ParentPlatformMapper;
  18. import com.genersoft.iot.vmp.storager.dao.PlatformGbStreamMapper;
  19. import com.genersoft.iot.vmp.storager.dao.StreamProxyMapper;
  20. import com.genersoft.iot.vmp.service.IStreamProxyService;
  21. import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
  22. import com.github.pagehelper.PageInfo;
  23. import org.slf4j.Logger;
  24. import org.slf4j.LoggerFactory;
  25. import org.springframework.beans.factory.annotation.Autowired;
  26. import org.springframework.stereotype.Service;
  27. import org.springframework.util.StringUtils;
  28. import java.util.ArrayList;
  29. import java.util.List;
  30. /**
  31. * 视频代理业务
  32. */
  33. @Service
  34. public class StreamProxyServiceImpl implements IStreamProxyService {
  35. private final static Logger logger = LoggerFactory.getLogger(StreamProxyServiceImpl.class);
  36. @Autowired
  37. private IVideoManagerStorager videoManagerStorager;
  38. @Autowired
  39. private IMediaService mediaService;
  40. @Autowired
  41. private ZLMRESTfulUtils zlmresTfulUtils;;
  42. @Autowired
  43. private StreamProxyMapper streamProxyMapper;
  44. @Autowired
  45. private GbStreamMapper gbStreamMapper;
  46. @Autowired
  47. private PlatformGbStreamMapper platformGbStreamMapper;
  48. @Autowired
  49. private ParentPlatformMapper parentPlatformMapper;
  50. @Autowired
  51. private IGbStreamService gbStreamService;
  52. @Autowired
  53. private IMediaServerService mediaServerService;
  54. @Override
  55. public WVPResult<StreamInfo> save(StreamProxyItem param) {
  56. MediaServerItem mediaInfo;
  57. WVPResult<StreamInfo> wvpResult = new WVPResult<>();
  58. wvpResult.setCode(0);
  59. if ("auto".equals(param.getMediaServerId())){
  60. mediaInfo = mediaServerService.getMediaServerForMinimumLoad();
  61. }else {
  62. mediaInfo = mediaServerService.getOne(param.getMediaServerId());
  63. }
  64. if (mediaInfo == null) {
  65. logger.warn("保存代理未找到在线的ZLM...");
  66. wvpResult.setMsg("保存失败");
  67. return wvpResult;
  68. }
  69. String dstUrl = String.format("rtmp://%s:%s/%s/%s", "127.0.0.1", mediaInfo.getRtmpPort(), param.getApp(),
  70. param.getStream() );
  71. param.setDst_url(dstUrl);
  72. StringBuffer result = new StringBuffer();
  73. boolean streamLive = false;
  74. param.setMediaServerId(mediaInfo.getId());
  75. boolean saveResult;
  76. // 更新
  77. if (videoManagerStorager.queryStreamProxy(param.getApp(), param.getStream()) != null) {
  78. saveResult = videoManagerStorager.updateStreamProxy(param);
  79. }else { // 新增
  80. saveResult = videoManagerStorager.addStreamProxy(param);
  81. }
  82. if (saveResult) {
  83. result.append("保存成功");
  84. if (param.isEnable()) {
  85. JSONObject jsonObject = addStreamProxyToZlm(param);
  86. if (jsonObject == null) {
  87. streamLive = false;
  88. result.append(", 但是启用失败,请检查流地址是否可用");
  89. param.setEnable(false);
  90. videoManagerStorager.updateStreamProxy(param);
  91. }else {
  92. Integer code = jsonObject.getInteger("code");
  93. if (code == 0) {
  94. streamLive = true;
  95. StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStream(
  96. mediaInfo, param.getApp(), param.getStream(), null);
  97. wvpResult.setData(streamInfo);
  98. }else {
  99. result.append(", 但是启用失败,请检查流地址是否可用");
  100. param.setEnable(false);
  101. videoManagerStorager.updateStreamProxy(param);
  102. }
  103. }
  104. }
  105. }else {
  106. result.append("保存失败");
  107. }
  108. if ( !StringUtils.isEmpty(param.getPlatformGbId()) && streamLive) {
  109. List<GbStream> gbStreams = new ArrayList<>();
  110. gbStreams.add(param);
  111. if (gbStreamService.addPlatformInfo(gbStreams, param.getPlatformGbId())){
  112. result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]成功");
  113. }else {
  114. result.append(", 关联国标平台[ " + param.getPlatformGbId() + " ]失败");
  115. }
  116. }
  117. // 查找开启了全部直播流共享的上级平台
  118. List<ParentPlatform> parentPlatforms = parentPlatformMapper.selectAllAhareAllLiveStream();
  119. if (parentPlatforms.size() > 0) {
  120. for (ParentPlatform parentPlatform : parentPlatforms) {
  121. param.setPlatformId(parentPlatform.getServerGBId());
  122. String stream = param.getStream();
  123. StreamProxyItem streamProxyItems = platformGbStreamMapper.selectOne(param.getApp(), stream, parentPlatform.getServerGBId());
  124. if (streamProxyItems == null) {
  125. platformGbStreamMapper.add(param);
  126. }
  127. }
  128. }
  129. wvpResult.setMsg(result.toString());
  130. return wvpResult;
  131. }
  132. @Override
  133. public JSONObject addStreamProxyToZlm(StreamProxyItem param) {
  134. JSONObject result = null;
  135. MediaServerItem mediaServerItem = null;
  136. if (param.getMediaServerId() == null) {
  137. logger.warn("添加代理时MediaServerId 为null");
  138. return null;
  139. }else {
  140. mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
  141. }
  142. if ("default".equals(param.getType())){
  143. result = zlmresTfulUtils.addStreamProxy(mediaServerItem, param.getApp(), param.getStream(), param.getUrl(),
  144. param.isEnable_hls(), param.isEnable_mp4(), param.getRtp_type());
  145. }else if ("ffmpeg".equals(param.getType())) {
  146. result = zlmresTfulUtils.addFFmpegSource(mediaServerItem, param.getSrc_url(), param.getDst_url(),
  147. param.getTimeout_ms() + "", param.isEnable_hls(), param.isEnable_mp4(),
  148. param.getFfmpeg_cmd_key());
  149. }
  150. return result;
  151. }
  152. @Override
  153. public JSONObject removeStreamProxyFromZlm(StreamProxyItem param) {
  154. if (param ==null) return null;
  155. MediaServerItem mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
  156. JSONObject result = zlmresTfulUtils.closeStreams(mediaServerItem, param.getApp(), param.getStream());
  157. return result;
  158. }
  159. @Override
  160. public PageInfo<StreamProxyItem> getAll(Integer page, Integer count) {
  161. return videoManagerStorager.queryStreamProxyList(page, count);
  162. }
  163. @Override
  164. public void del(String app, String stream) {
  165. StreamProxyItem streamProxyItem = videoManagerStorager.queryStreamProxy(app, stream);
  166. if (streamProxyItem != null) {
  167. videoManagerStorager.deleteStreamProxy(app, stream);
  168. JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyItem);
  169. if (jsonObject != null && jsonObject.getInteger("code") == 0) {
  170. // 如果关联了国标那么移除关联
  171. gbStreamMapper.del(app, stream);
  172. platformGbStreamMapper.delByAppAndStream(app, stream);
  173. // TODO 如果关联的推流, 那么状态设置为离线
  174. }
  175. }
  176. }
  177. @Override
  178. public boolean start(String app, String stream) {
  179. boolean result = false;
  180. StreamProxyItem streamProxy = videoManagerStorager.queryStreamProxy(app, stream);
  181. if (!streamProxy.isEnable() && streamProxy != null) {
  182. JSONObject jsonObject = addStreamProxyToZlm(streamProxy);
  183. if (jsonObject == null) return false;
  184. if (jsonObject.getInteger("code") == 0) {
  185. result = true;
  186. streamProxy.setEnable(true);
  187. videoManagerStorager.updateStreamProxy(streamProxy);
  188. }
  189. }
  190. return result;
  191. }
  192. @Override
  193. public boolean stop(String app, String stream) {
  194. boolean result = false;
  195. StreamProxyItem streamProxyDto = videoManagerStorager.queryStreamProxy(app, stream);
  196. if (streamProxyDto != null && streamProxyDto.isEnable()) {
  197. JSONObject jsonObject = removeStreamProxyFromZlm(streamProxyDto);
  198. if (jsonObject.getInteger("code") == 0) {
  199. streamProxyDto.setEnable(false);
  200. result = videoManagerStorager.updateStreamProxy(streamProxyDto);
  201. }
  202. }
  203. return result;
  204. }
  205. @Override
  206. public JSONObject getFFmpegCMDs(MediaServerItem mediaServerItem) {
  207. JSONObject result = new JSONObject();
  208. JSONObject mediaServerConfigResuly = zlmresTfulUtils.getMediaServerConfig(mediaServerItem);
  209. if (mediaServerConfigResuly != null && mediaServerConfigResuly.getInteger("code") == 0
  210. && mediaServerConfigResuly.getJSONArray("data").size() > 0){
  211. JSONObject mediaServerConfig = mediaServerConfigResuly.getJSONArray("data").getJSONObject(0);
  212. for (String key : mediaServerConfig.keySet()) {
  213. if (key.startsWith("ffmpeg.cmd")){
  214. result.put(key, mediaServerConfig.getString(key));
  215. }
  216. }
  217. }
  218. return result;
  219. }
  220. @Override
  221. public StreamProxyItem getStreamProxyByAppAndStream(String app, String streamId) {
  222. return videoManagerStorager.getStreamProxyByAppAndStream(app, streamId);
  223. }
  224. }