DeviceServiceImpl.java 12 KB


  1. package com.genersoft.iot.vmp.service.impl;
  2. import com.genersoft.iot.vmp.conf.DynamicTask;
  3. import com.genersoft.iot.vmp.gb28181.bean.Device;
  4. import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
  5. import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
  6. import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
  7. import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
  8. import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
  9. import com.genersoft.iot.vmp.service.IDeviceService;
  10. import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
  11. import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
  12. import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
  13. import com.genersoft.iot.vmp.service.IMediaServerService;
  14. import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
  15. import com.genersoft.iot.vmp.storager.dao.DeviceMapper;
  16. import com.genersoft.iot.vmp.utils.DateUtil;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.stereotype.Service;
  21. import org.springframework.util.StringUtils;
  22. import java.time.Instant;
  23. import java.util.List;
  24. import java.util.concurrent.TimeUnit;
  25. /**
  26. * 设备业务(目录订阅)
  27. */
  28. @Service
  29. public class DeviceServiceImpl implements IDeviceService {
  30. private final static Logger logger = LoggerFactory.getLogger(DeviceServiceImpl.class);
  31. private final String registerExpireTaskKeyPrefix = "device-register-expire-";
  32. @Autowired
  33. private DynamicTask dynamicTask;
  34. @Autowired
  35. private ISIPCommander sipCommander;
  36. @Autowired
  37. private CatalogResponseMessageHandler catalogResponseMessageHandler;
  38. @Autowired
  39. private IRedisCatchStorage redisCatchStorage;
  40. @Autowired
  41. private DeviceMapper deviceMapper;
  42. @Autowired
  43. private ISIPCommander commander;
  44. @Autowired
  45. private VideoStreamSessionManager streamSession;
  46. @Autowired
  47. private IMediaServerService mediaServerService;
  48. @Override
  49. public void online(Device device) {
  50. logger.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
  51. Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
  52. Device deviceInDb = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
  53. String now = DateUtil.getNow();
  54. if (deviceInRedis != null && deviceInDb == null) {
  55. // redis 存在脏数据
  56. redisCatchStorage.clearCatchByDeviceId(device.getDeviceId());
  57. }
  58. device.setUpdateTime(now);
  59. device.setOnline(1);
  60. // 第一次上线
  61. if (device.getCreateTime() == null) {
  62. device.setCreateTime(now);
  63. logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
  64. deviceMapper.add(device);
  65. redisCatchStorage.updateDevice(device);
  66. commander.deviceInfoQuery(device);
  67. sync(device);
  68. }else {
  69. deviceMapper.update(device);
  70. redisCatchStorage.updateDevice(device);
  71. }
  72. // 上线添加订阅
  73. if (device.getSubscribeCycleForCatalog() > 0) {
  74. // 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
  75. addCatalogSubscribe(device);
  76. }
  77. if (device.getSubscribeCycleForMobilePosition() > 0) {
  78. addMobilePositionSubscribe(device);
  79. }
  80. // 刷新过期任务
  81. String registerExpireTaskKey = registerExpireTaskKeyPrefix + device.getDeviceId();
  82. dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId()), device.getExpires() * 1000);
  83. }
  84. @Override
  85. public void offline(String deviceId) {
  86. Device device = deviceMapper.getDeviceByDeviceId(deviceId);
  87. if (device == null) {
  88. return;
  89. }
  90. String registerExpireTaskKey = registerExpireTaskKeyPrefix + deviceId;
  91. dynamicTask.stop(registerExpireTaskKey);
  92. device.setOnline(0);
  93. redisCatchStorage.updateDevice(device);
  94. deviceMapper.update(device);
  95. // 离线释放所有ssrc
  96. List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
  97. if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
  98. for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
  99. mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
  100. mediaServerService.closeRTPServer(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  101. streamSession.remove(deviceId, ssrcTransaction.getChannelId(), ssrcTransaction.getStream());
  102. }
  103. }
  104. // 移除订阅
  105. removeCatalogSubscribe(device);
  106. removeMobilePositionSubscribe(device);
  107. }
  108. @Override
  109. public boolean addCatalogSubscribe(Device device) {
  110. if (device == null || device.getSubscribeCycleForCatalog() < 0) {
  111. return false;
  112. }
  113. logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
  114. // 添加目录订阅
  115. CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask);
  116. // 提前开始刷新订阅
  117. int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
  118. // 设置最小值为30
  119. dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
  120. return true;
  121. }
  122. @Override
  123. public boolean removeCatalogSubscribe(Device device) {
  124. if (device == null || device.getSubscribeCycleForCatalog() < 0) {
  125. return false;
  126. }
  127. logger.info("[移除目录订阅]: {}", device.getDeviceId());
  128. String taskKey = device.getDeviceId() + "catalog";
  129. if (device.getOnline() == 1) {
  130. Runnable runnable = dynamicTask.get(taskKey);
  131. if (runnable instanceof ISubscribeTask) {
  132. ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  133. subscribeTask.stop();
  134. }
  135. }
  136. dynamicTask.stop(taskKey);
  137. return true;
  138. }
  139. @Override
  140. public boolean addMobilePositionSubscribe(Device device) {
  141. if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
  142. return false;
  143. }
  144. logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
  145. // 添加目录订阅
  146. MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask);
  147. // 设置最小值为30
  148. int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
  149. // 提前开始刷新订阅
  150. dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, (subscribeCycleForCatalog -1 ) * 1000);
  151. return true;
  152. }
  153. @Override
  154. public boolean removeMobilePositionSubscribe(Device device) {
  155. if (device == null || device.getSubscribeCycleForCatalog() < 0) {
  156. return false;
  157. }
  158. logger.info("[移除移动位置订阅]: {}", device.getDeviceId());
  159. String taskKey = device.getDeviceId() + "mobile_position";
  160. if (device.getOnline() == 1) {
  161. Runnable runnable = dynamicTask.get(taskKey);
  162. if (runnable instanceof ISubscribeTask) {
  163. ISubscribeTask subscribeTask = (ISubscribeTask) runnable;
  164. subscribeTask.stop();
  165. }
  166. }
  167. dynamicTask.stop(taskKey);
  168. return true;
  169. }
  170. @Override
  171. public SyncStatus getChannelSyncStatus(String deviceId) {
  172. return catalogResponseMessageHandler.getChannelSyncProgress(deviceId);
  173. }
  174. @Override
  175. public Boolean isSyncRunning(String deviceId) {
  176. return catalogResponseMessageHandler.isSyncRunning(deviceId);
  177. }
  178. @Override
  179. public void sync(Device device) {
  180. if (catalogResponseMessageHandler.isSyncRunning(device.getDeviceId())) {
  181. logger.info("开启同步时发现同步已经存在");
  182. return;
  183. }
  184. int sn = (int)((Math.random()*9+1)*100000);
  185. catalogResponseMessageHandler.setChannelSyncReady(device, sn);
  186. sipCommander.catalogQuery(device, sn, event -> {
  187. String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
  188. catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
  189. });
  190. }
  191. @Override
  192. public Device queryDevice(String deviceId) {
  193. return deviceMapper.getDeviceByDeviceId(deviceId);
  194. }
  195. @Override
  196. public List<Device> getAllOnlineDevice() {
  197. return deviceMapper.getOnlineDevices();
  198. }
  199. @Override
  200. public boolean expire(Device device) {
  201. Instant registerTimeDate = Instant.from(DateUtil.formatter.parse(device.getRegisterTime()));
  202. Instant expireInstant = registerTimeDate.plusMillis(TimeUnit.SECONDS.toMillis(device.getExpires()));
  203. return expireInstant.isBefore(Instant.now());
  204. }
  205. @Override
  206. public void checkDeviceStatus(Device device) {
  207. if (device == null || device.getOnline() == 0) {
  208. return;
  209. }
  210. sipCommander.deviceStatusQuery(device, null);
  211. }
  212. @Override
  213. public Device getDeviceByHostAndPort(String host, int port) {
  214. return deviceMapper.getDeviceByHostAndPort(host, port);
  215. }
  216. @Override
  217. public void updateDevice(Device device) {
  218. Device deviceInStore = deviceMapper.getDeviceByDeviceId(device.getDeviceId());
  219. if (deviceInStore == null) {
  220. logger.warn("更新设备时未找到设备信息");
  221. return;
  222. }
  223. if (!StringUtils.isEmpty(device.getName())) {
  224. deviceInStore.setName(device.getName());
  225. }
  226. if (!StringUtils.isEmpty(device.getCharset())) {
  227. deviceInStore.setCharset(device.getCharset());
  228. }
  229. if (!StringUtils.isEmpty(device.getMediaServerId())) {
  230. deviceInStore.setMediaServerId(device.getMediaServerId());
  231. }
  232. // 目录订阅相关的信息
  233. if (device.getSubscribeCycleForCatalog() > 0) {
  234. if (deviceInStore.getSubscribeCycleForCatalog() == 0 || deviceInStore.getSubscribeCycleForCatalog() != device.getSubscribeCycleForCatalog()) {
  235. deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
  236. // 开启订阅
  237. addCatalogSubscribe(deviceInStore);
  238. }
  239. }else if (device.getSubscribeCycleForCatalog() == 0) {
  240. if (deviceInStore.getSubscribeCycleForCatalog() != 0) {
  241. deviceInStore.setSubscribeCycleForCatalog(device.getSubscribeCycleForCatalog());
  242. // 取消订阅
  243. removeCatalogSubscribe(deviceInStore);
  244. }
  245. }
  246. // 移动位置订阅相关的信息
  247. if (device.getSubscribeCycleForMobilePosition() > 0) {
  248. if (deviceInStore.getSubscribeCycleForMobilePosition() == 0 || deviceInStore.getSubscribeCycleForMobilePosition() != device.getSubscribeCycleForMobilePosition()) {
  249. deviceInStore.setMobilePositionSubmissionInterval(device.getMobilePositionSubmissionInterval());
  250. deviceInStore.setSubscribeCycleForMobilePosition(device.getSubscribeCycleForMobilePosition());
  251. // 开启订阅
  252. addMobilePositionSubscribe(deviceInStore);
  253. }
  254. }else if (device.getSubscribeCycleForMobilePosition() == 0) {
  255. if (deviceInStore.getSubscribeCycleForMobilePosition() != 0) {
  256. // 取消订阅
  257. removeMobilePositionSubscribe(deviceInStore);
  258. }
  259. }
  260. String now = DateUtil.getNow();
  261. device.setUpdateTime(now);
  262. device.setCharset(device.getCharset().toUpperCase());
  263. device.setUpdateTime(DateUtil.getNow());
  264. if (deviceMapper.update(device) > 0) {
  265. redisCatchStorage.updateDevice(device);
  266. }
  267. }
  268. }