MediaServerServiceImpl.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONArray;
  4. import com.alibaba.fastjson.JSONObject;
  5. import com.genersoft.iot.vmp.common.StreamInfo;
  6. import com.genersoft.iot.vmp.conf.MediaConfig;
  7. import com.genersoft.iot.vmp.conf.ProxyServletConfig;
  8. import com.genersoft.iot.vmp.gb28181.bean.Device;
  9. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  10. import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
  11. import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
  12. import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
  13. import com.genersoft.iot.vmp.media.zlm.dto.IMediaServerItem;
  14. import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
  15. import com.genersoft.iot.vmp.media.zlm.dto.ZLMRunInfo;
  16. import com.genersoft.iot.vmp.service.IMediaServerService;
  17. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  18. import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
  19. import org.mitre.dsmiley.httpproxy.ProxyServlet;
  20. import org.slf4j.Logger;
  21. import org.slf4j.LoggerFactory;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.beans.factory.annotation.Value;
  24. import org.springframework.stereotype.Service;
  25. import java.text.SimpleDateFormat;
  26. import java.util.*;
  27. /**
  28. * 媒体服务器节点管理
  29. */
  30. @Service
  31. public class MediaServerServiceImpl implements IMediaServerService {
  32. private final static Logger logger = LoggerFactory.getLogger(MediaServerServiceImpl.class);
  33. private Map<String, IMediaServerItem> zlmServers = new HashMap<>(); // 所有数据库的zlm的缓存
  34. private Map<String, Integer> zlmServerStatus = new LinkedHashMap<>(); // 所有上线的zlm的缓存以及负载
  35. @Value("${sip.ip}")
  36. private String sipIp;
  37. @Value("${server.ssl.enabled:false}")
  38. private boolean sslEnabled;
  39. @Value("${server.port}")
  40. private String serverPort;
  41. @Autowired
  42. private MediaConfig mediaConfig;
  43. @Autowired
  44. private ZLMRESTfulUtils zlmresTfulUtils;
  45. @Autowired
  46. private MediaServerMapper mediaServerMapper;
  47. @Autowired
  48. private IRedisCatchStorage redisCatchStorage;
  49. @Autowired
  50. private VideoStreamSessionManager streamSession;
  51. @Autowired
  52. private ZLMRTPServerFactory zlmrtpServerFactory;
  53. private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  54. /**
  55. * 初始化
  56. */
  57. @Override
  58. public void init() {
  59. zlmServers.clear();
  60. zlmServerStatus.clear();
  61. List<MediaServerItem> mediaServerItemList = mediaServerMapper.queryAll();
  62. for (IMediaServerItem mediaServerItem : mediaServerItemList) {
  63. zlmServers.put(mediaServerItem.getId(), mediaServerItem);
  64. }
  65. }
  66. @Override
  67. public void closeRTPServer(Device device, String channelId) {
  68. StreamInfo streamInfo = redisCatchStorage.queryPlayByDevice(device.getDeviceId(), channelId);
  69. IMediaServerItem mediaServerItem = null;
  70. if (streamInfo != null) {
  71. mediaServerItem = this.getOne (streamInfo.getMediaServerId());
  72. }
  73. String streamId = String.format("gb_play_%s_%s", device.getDeviceId(), channelId);
  74. zlmrtpServerFactory.closeRTPServer(mediaServerItem, streamId);
  75. streamSession.remove(device.getDeviceId(), channelId);
  76. }
  77. @Override
  78. public void update(MediaConfig mediaConfig) {
  79. }
  80. @Override
  81. public List<IMediaServerItem> getAll() {
  82. if (zlmServers.size() == 0) {
  83. init();
  84. }
  85. List<IMediaServerItem> result = new ArrayList<>();
  86. for (String id : zlmServers.keySet()) {
  87. IMediaServerItem mediaServerItem = zlmServers.get(id);
  88. mediaServerItem.setCount(zlmServerStatus.get(id) == null ? 0 : zlmServerStatus.get(id));
  89. result.add(mediaServerItem);
  90. }
  91. return result;
  92. // return mediaServerMapper.queryAll();
  93. }
  94. /**
  95. * 获取单个zlm服务器
  96. * @param mediaServerId 服务id
  97. * @return MediaServerItem
  98. */
  99. @Override
  100. public IMediaServerItem getOne(String mediaServerId) {
  101. if (mediaServerId ==null) return null;
  102. IMediaServerItem mediaServerItem = zlmServers.get(mediaServerId);
  103. if (mediaServerItem != null) {
  104. mediaServerItem.setCount(zlmServerStatus.get(mediaServerId) == null ? 0 : zlmServerStatus.get(mediaServerId));
  105. return mediaServerItem;
  106. }else {
  107. IMediaServerItem item = mediaServerMapper.queryOne(mediaServerId);
  108. if (item != null) {
  109. zlmServers.put(item.getId(), item);
  110. }
  111. return item;
  112. }
  113. }
  114. @Override
  115. public IMediaServerItem getOneByHostAndPort(String host, int port) {
  116. return mediaServerMapper.queryOneByHostAndPort(host, port);
  117. }
  118. /**
  119. * 处理zlm上线
  120. * @param zlmServerConfig zlm上线携带的参数
  121. */
  122. @Override
  123. public void handLeZLMServerConfig(ZLMServerConfig zlmServerConfig) {
  124. logger.info("[ {} ]-[ {}:{} ]已连接",
  125. zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
  126. IMediaServerItem serverItem = getOne(zlmServerConfig.getGeneralMediaServerId());
  127. String now = this.format.format(new Date(System.currentTimeMillis()));
  128. if (serverItem != null) {
  129. serverItem.setSecret(zlmServerConfig.getApiSecret());
  130. serverItem.setIp(zlmServerConfig.getIp());
  131. // 如果是配置文件中的zlm。 也就是默认zlm。 一切以配置文件内容为准
  132. // docker部署不会使用zlm配置的端口号;
  133. // 直接编译部署的使用配置文件的端口号,如果zlm修改配改了配置,wvp自动修改
  134. if (serverItem.getId().equals(mediaConfig.getId())
  135. || (serverItem.getIp().equals(mediaConfig.getIp()) && serverItem.getHttpPort() == mediaConfig.getHttpPort())) {
  136. // 配置文件的zlm
  137. mediaConfig.setId(zlmServerConfig.getGeneralMediaServerId());
  138. mediaConfig.setUpdateTime(now);
  139. if (mediaConfig.getHttpPort() == 0) mediaConfig.setHttpPort(zlmServerConfig.getHttpPort());
  140. if (mediaConfig.getHttpSSlPort() == 0) mediaConfig.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
  141. if (mediaConfig.getRtmpPort() == 0) mediaConfig.setRtmpPort(zlmServerConfig.getRtmpPort());
  142. if (mediaConfig.getRtmpSSlPort() == 0) mediaConfig.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
  143. if (mediaConfig.getRtspPort() == 0) mediaConfig.setRtspPort(zlmServerConfig.getRtspPort());
  144. if (mediaConfig.getRtspSSLPort() == 0) mediaConfig.setRtspSSLPort(zlmServerConfig.getRtspSSlport());
  145. if (mediaConfig.getRtpProxyPort() == 0) mediaConfig.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
  146. mediaServerMapper.update(mediaConfig);
  147. serverItem = mediaConfig.getMediaSerItem();
  148. setZLMConfig(mediaConfig);
  149. }else {
  150. if (!serverItem.isDocker()) {
  151. serverItem.setHttpPort(zlmServerConfig.getHttpPort());
  152. serverItem.setHttpSSlPort(zlmServerConfig.getHttpSSLport());
  153. serverItem.setRtmpPort(zlmServerConfig.getRtmpPort());
  154. serverItem.setRtmpSSlPort(zlmServerConfig.getRtmpSslPort());
  155. serverItem.setRtpProxyPort(zlmServerConfig.getRtpProxyPort());
  156. serverItem.setRtspPort(zlmServerConfig.getRtspPort());
  157. }
  158. serverItem.setUpdateTime(now);
  159. mediaServerMapper.update(serverItem);
  160. setZLMConfig(serverItem);
  161. }
  162. }else {
  163. if (zlmServerConfig.getGeneralMediaServerId().equals(mediaConfig.getId())
  164. || (zlmServerConfig.getIp().equals(mediaConfig.getIp()) && zlmServerConfig.getHttpPort() == mediaConfig.getHttpPort())) {
  165. mediaConfig.setId(zlmServerConfig.getGeneralMediaServerId());
  166. mediaConfig.setCreateTime(now);
  167. mediaConfig.setUpdateTime(now);
  168. serverItem = mediaConfig.getMediaSerItem();
  169. mediaServerMapper.add(mediaConfig);
  170. }else {
  171. // 一个新的zlm接入wvp
  172. serverItem = new MediaServerItem(zlmServerConfig, sipIp);
  173. serverItem.setCreateTime(now);
  174. serverItem.setUpdateTime(now);
  175. mediaServerMapper.add(serverItem);
  176. }
  177. }
  178. // 更新缓存
  179. if (zlmServerStatus.get(serverItem.getId()) == null) {
  180. zlmServers.put(serverItem.getId(), serverItem);
  181. zlmServerStatus.put(serverItem.getId(),0);
  182. }
  183. // 查询服务流数量
  184. IMediaServerItem finalServerItem = serverItem;
  185. zlmresTfulUtils.getMediaList(serverItem, null, null, "rtmp",(mediaList ->{
  186. Integer code = mediaList.getInteger("code");
  187. if (code == 0) {
  188. JSONArray data = mediaList.getJSONArray("data");
  189. if (data != null) {
  190. zlmServerStatus.put(finalServerItem.getId(),data.size());
  191. }else {
  192. zlmServerStatus.put(finalServerItem.getId(),0);
  193. }
  194. }
  195. }));
  196. }
  197. /**
  198. * 更新缓存
  199. * @param mediaServerItem zlm服务
  200. * @param count 在线数
  201. * @param online 在线状态
  202. */
  203. @Override
  204. public void updateServerCatch(IMediaServerItem mediaServerItem, Integer count, Boolean online) {
  205. if (mediaServerItem != null) {
  206. zlmServers.put(mediaServerItem.getId(), mediaServerItem);
  207. Collection<Integer> values = zlmServerStatus.values();
  208. if (online != null && count != null) {
  209. zlmServerStatus.put(mediaServerItem.getId(), count);
  210. }
  211. }
  212. }
  213. @Override
  214. public void addCount(String mediaServerId) {
  215. if (zlmServerStatus.get(mediaServerId) != null) {
  216. zlmServerStatus.put(mediaServerId, zlmServerStatus.get(mediaServerId) + 1);
  217. }
  218. }
  219. @Override
  220. public void removeCount(String mediaServerId) {
  221. if (zlmServerStatus.get(mediaServerId) != null) {
  222. zlmServerStatus.put(mediaServerId, zlmServerStatus.get(mediaServerId) - 1);
  223. }
  224. }
  225. /**
  226. * 获取负载最低的节点
  227. * @return MediaServerItem
  228. */
  229. @Override
  230. public IMediaServerItem getMediaServerForMinimumLoad() {
  231. int mediaCount = -1;
  232. String key = null;
  233. System.out.println(JSON.toJSONString(zlmServerStatus));
  234. if (zlmServerStatus.size() == 1) {
  235. Map.Entry entry = zlmServerStatus.entrySet().iterator().next();
  236. key= (String) entry.getKey();
  237. }else {
  238. for (String id : zlmServerStatus.keySet()) {
  239. if (key == null) {
  240. key = id;
  241. mediaCount = zlmServerStatus.get(id);
  242. }
  243. if (zlmServerStatus.get(id) == 0) {
  244. key = id;
  245. break;
  246. }else if (mediaCount >= zlmServerStatus.get(id)){
  247. mediaCount = zlmServerStatus.get(id);
  248. key = id;
  249. }
  250. }
  251. }
  252. if (key == null) {
  253. logger.info("获取负载最低的节点时无在线节点");
  254. return null;
  255. }else{
  256. return zlmServers.get(key);
  257. }
  258. }
  259. /**
  260. * 对zlm服务器进行基础配置
  261. * @param mediaServerItem 服务ID
  262. */
  263. @Override
  264. public void setZLMConfig(IMediaServerItem mediaServerItem) {
  265. logger.info("[ {} ]-[ {}:{} ]设置zlm",
  266. mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
  267. String protocol = sslEnabled ? "https" : "http";
  268. String hookPrex = String.format("%s://%s:%s/index/hook", protocol, mediaServerItem.getHookIp(), serverPort);
  269. String recordHookPrex = null;
  270. if (mediaServerItem.getRecordAssistPort() != 0) {
  271. recordHookPrex = String.format("http://127.0.0.1:%s/api/record", mediaServerItem.getRecordAssistPort());
  272. }
  273. Map<String, Object> param = new HashMap<>();
  274. param.put("api.secret",mediaServerItem.getSecret()); // -profile:v Baseline
  275. param.put("ffmpeg.cmd","%s -fflags nobuffer -i %s -c:a aac -strict -2 -ar 44100 -ab 48k -c:v libx264 -f flv %s");
  276. param.put("hook.enable","1");
  277. param.put("hook.on_flow_report","");
  278. param.put("hook.on_play",String.format("%s/on_play", hookPrex));
  279. param.put("hook.on_http_access","");
  280. param.put("hook.on_publish", String.format("%s/on_publish", hookPrex));
  281. param.put("hook.on_record_mp4",recordHookPrex != null? String.format("%s/on_record_mp4", recordHookPrex): "");
  282. param.put("hook.on_record_ts","");
  283. param.put("hook.on_rtsp_auth","");
  284. param.put("hook.on_rtsp_realm","");
  285. param.put("hook.on_server_started",String.format("%s/on_server_started", hookPrex));
  286. param.put("hook.on_shell_login",String.format("%s/on_shell_login", hookPrex));
  287. param.put("hook.on_stream_changed",String.format("%s/on_stream_changed", hookPrex));
  288. param.put("hook.on_stream_none_reader",String.format("%s/on_stream_none_reader", hookPrex));
  289. param.put("hook.on_stream_not_found",String.format("%s/on_stream_not_found", hookPrex));
  290. param.put("hook.timeoutSec","20");
  291. param.put("general.streamNoneReaderDelayMS",mediaServerItem.getStreamNoneReaderDelayMS());
  292. JSONObject responseJSON = zlmresTfulUtils.setServerConfig(mediaServerItem, param);
  293. if (responseJSON != null && responseJSON.getInteger("code") == 0) {
  294. logger.info("[ {} ]-[ {}:{} ]设置zlm成功",
  295. mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
  296. }else {
  297. logger.info("[ {} ]-[ {}:{} ]设置zlm失败" + responseJSON.getString("msg"),
  298. mediaServerItem.getId(), mediaServerItem.getIp(), mediaServerItem.getHttpPort());
  299. }
  300. }
  301. }