Просмотр исходного кода

Merge branch '648540858:wvp-28181-2.0' into wvp-28181-2.0

mrjackwang 3 лет назад
Родитель
Сommit
a59095db0c
56 измененных файлов с 1293 добавлено и 924 удалено
  1. 13 1
      pom.xml
  2. 446 337
      sql/mysql.sql
  3. 3 0
      sql/update.sql
  4. 0 1
      src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java
  5. 7 6
      src/main/java/com/genersoft/iot/vmp/conf/security/DefaultUserDetailsServiceImpl.java
  6. 7 1
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
  7. 11 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SyncStatus.java
  8. 5 0
      src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
  9. 3 3
      src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
  10. 18 34
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  11. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java
  12. 9 11
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java
  13. 33 36
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
  14. 1 0
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java
  15. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java
  16. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceInfoQueryMessageHandler.java
  17. 2 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
  18. 3 3
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java
  19. 15 22
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java
  20. 45 41
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java
  21. 29 106
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java
  22. 10 4
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java
  23. 19 21
      src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java
  24. 33 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java
  25. 44 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java
  26. 43 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java
  27. 23 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java
  28. 36 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java
  29. 15 0
      src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java
  30. 2 0
      src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java
  31. 4 0
      src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java
  32. 0 38
      src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java
  33. 20 7
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
  34. 32 33
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java
  35. 2 2
      src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java
  36. 33 34
      src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java
  37. 6 8
      src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java
  38. 48 3
      src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java
  39. 46 23
      src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java
  40. 7 6
      src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java
  41. 32 0
      src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
  42. 0 27
      src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java
  43. 3 0
      src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java
  44. 1 1
      src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java
  45. 5 3
      src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java
  46. 5 5
      src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java
  47. 7 38
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
  48. 1 0
      src/main/java/com/genersoft/iot/vmp/vmanager/bean/WVPResult.java
  49. 14 6
      src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java
  50. 15 1
      src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
  51. 46 8
      src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java
  52. 17 7
      web_src/src/components/PushVideoList.vue
  53. 1 2
      web_src/src/components/channelList.vue
  54. 24 19
      web_src/src/components/dialog/SyncChannelProgress.vue
  55. 3 4
      web_src/src/components/dialog/catalogEdit.vue
  56. 43 18
      web_src/src/components/dialog/addStreamTOGB.vue

+ 13 - 1
pom.xml

@@ -61,6 +61,13 @@
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-data-redis</artifactId>
+			 <exclusions>
+                <!-- 去掉  Lettuce 的依赖,  Spring Boot 优先使用 Lettuce 作为 Redis 客户端 -->
+                <exclusion>
+                    <groupId>io.lettuce</groupId>
+                    <artifactId>lettuce-core</artifactId>
+                </exclusion>
+            </exclusions>
 		</dependency>
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
@@ -75,6 +82,12 @@
 			<groupId>org.mybatis.spring.boot</groupId>
 			<artifactId>mybatis-spring-boot-starter</artifactId>
 			<version>2.1.4</version>
+			<exclusions>
+				<exclusion>
+					<groupId>com.zaxxer</groupId>
+					<artifactId>HikariCP</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 		<dependency>
 			<groupId>org.springframework.boot</groupId>
@@ -84,7 +97,6 @@
 		<dependency>
 			<groupId>redis.clients</groupId>
 			<artifactId>jedis</artifactId>
-			<version>${jedis-version}</version>
 		</dependency>
 
 		<!-- druid数据库连接池 -->

Разница между файлами не показана из-за своего большого размера
+ 446 - 337
sql/mysql.sql


+ 3 - 0
sql/update.sql

@@ -77,5 +77,8 @@ alter table platform_catalog
 alter table platform_catalog
     add businessGroupId varchar(50) default null;
 
+/********************* ADD ***************************/
+alter table stream_push
+    add self int DEFAULT NULL;
 
 

+ 0 - 1
src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java

@@ -1,6 +1,5 @@
 package com.genersoft.iot.vmp.conf;
 
-import io.swagger.models.auth.In;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Component;
 

+ 7 - 6
src/main/java/com/genersoft/iot/vmp/conf/security/DefaultUserDetailsServiceImpl.java

@@ -1,9 +1,7 @@
 package com.genersoft.iot.vmp.conf.security;
 
-import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
-import com.genersoft.iot.vmp.service.IUserService;
-import com.genersoft.iot.vmp.storager.dao.dto.User;
-import com.github.xiaoymin.knife4j.core.util.StrUtil;
+import java.time.LocalDateTime;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -12,7 +10,10 @@ import org.springframework.security.core.userdetails.UserDetailsService;
 import org.springframework.security.core.userdetails.UsernameNotFoundException;
 import org.springframework.stereotype.Component;
 
-import java.time.LocalDateTime;
+import com.alibaba.excel.util.StringUtils;
+import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
+import com.genersoft.iot.vmp.service.IUserService;
+import com.genersoft.iot.vmp.storager.dao.dto.User;
 
 /**
  * 用户登录认证逻辑
@@ -27,7 +28,7 @@ public class DefaultUserDetailsServiceImpl implements UserDetailsService {
 
     @Override
     public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
-        if (StrUtil.isBlank(username)) {
+        if (StringUtils.isBlank(username)) {
             logger.info("登录用户:{} 不存在", username);
             throw new UsernameNotFoundException("登录用户:" + username + " 不存在");
         }

+ 7 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java

@@ -3,8 +3,14 @@ package com.genersoft.iot.vmp.gb28181.bean;
 import java.time.Instant;
 import java.util.List;
 
+/**
+ * @author lin
+ */
 public class CatalogData {
-    private int sn; // 命令序列号
+    /**
+     * 命令序列号
+     */
+    private int sn;
     private int total;
     private List<DeviceChannel> channelList;
     private Instant lastTime;

+ 11 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SyncStatus.java

@@ -2,12 +2,15 @@ package com.genersoft.iot.vmp.gb28181.bean;
 
 /**
  * 摄像机同步状态
+ * @author lin
  */
 public class SyncStatus {
     private int total;
     private int current;
     private String errorMsg;
 
+    private boolean syncIng;
+
     public int getTotal() {
         return total;
     }
@@ -31,4 +34,12 @@ public class SyncStatus {
     public void setErrorMsg(String errorMsg) {
         this.errorMsg = errorMsg;
     }
+
+    public boolean isSyncIng() {
+        return syncIng;
+    }
+
+    public void setSyncIng(boolean syncIng) {
+        this.syncIng = syncIng;
+    }
 }

+ 5 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java

@@ -84,6 +84,11 @@ public class CatalogDataCatch {
         syncStatus.setCurrent(catalogData.getChannelList().size());
         syncStatus.setTotal(catalogData.getTotal());
         syncStatus.setErrorMsg(catalogData.getErrorMsg());
+        if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
+            syncStatus.setSyncIng(false);
+        }else {
+            syncStatus.setSyncIng(true);
+        }
         return syncStatus;
     }
 

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java

@@ -39,9 +39,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
             dynamicTask.stop(taskKey);
         }
         sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
-//            if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
-//                dialog = eventResult.dialog;
-//            }
+            if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
+                dialog = eventResult.dialog;
+            }
             ResponseEvent event = (ResponseEvent) eventResult.event;
             if (event.getResponse().getRawContent() != null) {
                 // 成功

+ 18 - 34
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -10,6 +10,9 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.SIPRequestHeaderProvider;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -348,25 +351,19 @@ public class SIPCommander implements ISIPCommander {
 	@Override
 	public void playStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
 							  ZLMHttpHookSubscribe.Event event, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) {
-		String streamId = ssrcInfo.getStream();
+		String stream = ssrcInfo.getStream();
 		try {
 			if (device == null) {
 				return;
 			}
 			String streamMode = device.getStreamMode().toUpperCase();
 
-			logger.info("{} 分配的ZLM为: {} [{}:{}]", streamId, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
-			// 添加订阅
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", streamId);
-			subscribeKey.put("regist", true);
-			subscribeKey.put("schema", "rtmp");
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			logger.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 				if (event != null) {
 					event.response(mediaServerItemInUse, json);
+					subscribe.removeSubscribe(hookSubscribe);
 				}
 			});
 			//
@@ -440,7 +437,7 @@ public class SIPCommander implements ISIPCommander {
 				errorEvent.response(e);
 			}), e ->{
 				// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
-				streamSession.put(device.getDeviceId(), channelId ,"play", streamId, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
+				streamSession.put(device.getDeviceId(), channelId ,"play", stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), ((ResponseEvent)e.event).getClientTransaction(), VideoStreamSessionManager.SessionType.play);
 				streamSession.put(device.getDeviceId(), channelId ,"play", e.dialog);
 				okEvent.response(e);
 			});
@@ -530,21 +527,14 @@ public class SIPCommander implements ISIPCommander {
 
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
-
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtmp", mediaServerItem.getId());
 			// 添加订阅
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", ssrcInfo.getStream());
-			subscribeKey.put("regist", true);
-			subscribeKey.put("schema", "rtmp");
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey);
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 						if (hookEvent != null) {
 							InviteStreamInfo inviteStreamInfo = new InviteStreamInfo(mediaServerItemInUse, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream());
 							hookEvent.call(inviteStreamInfo);
 						}
+						subscribe.removeSubscribe(hookSubscribe);
 					});
 	        Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, "fromplybck" + tm, null, callIdHeader, ssrcInfo.getSsrc());
 
@@ -643,21 +633,15 @@ public class SIPCommander implements ISIPCommander {
 			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
 					: udpSipProvider.getNewCallId();
 
+			HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
 			// 添加订阅
-			JSONObject subscribeKey = new JSONObject();
-			subscribeKey.put("app", "rtp");
-			subscribeKey.put("stream", ssrcInfo.getStream());
-			subscribeKey.put("regist", true);
-			subscribeKey.put("mediaServerId", mediaServerItem.getId());
-			logger.debug("录像回放添加订阅,订阅内容:" + subscribeKey.toString());
-			subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-					(MediaServerItem mediaServerItemInUse, JSONObject json)->{
+			subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
 						hookEvent.call(new InviteStreamInfo(mediaServerItem, json, callIdHeader.getCallId(), "rtp", ssrcInfo.getStream()));
-						subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey);
-						subscribeKey.put("regist", false);
-						subscribeKey.put("schema", "rtmp");
+						subscribe.removeSubscribe(hookSubscribe);
+						hookSubscribe.getContent().put("regist", false);
+						hookSubscribe.getContent().put("schema", "rtmp");
 						// 添加流注销的订阅,注销了后向设备发送bye
-						subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
+						subscribe.addSubscribe(hookSubscribe,
 								(MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd)->{
 									ClientTransaction transaction = streamSession.getTransaction(device.getDeviceId(), channelId, ssrcInfo.getStream(), callIdHeader.getCallId());
 									if (transaction != null) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java

@@ -126,7 +126,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
 					SsrcTransaction ssrcTransactionForPlay = streamSession.getSsrcTransaction(device.getDeviceId(), channelId, "play", null);
 					if (ssrcTransactionForPlay != null){
 						SIPDialog dialogForPlay = (SIPDialog) SerializeUtils.deSerialize(ssrcTransactionForPlay.getDialog());
-						if (dialogForPlay.getCallId().equals(callIdHeader.getCallId())){
+						if (dialogForPlay.getCallId().getCallId().equals(callIdHeader.getCallId())){
 							// 释放ssrc
 							MediaServerItem mediaServerItem = mediaServerService.getOne(ssrcTransactionForPlay.getMediaServerId());
 							if (mediaServerItem != null) {

+ 9 - 11
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -419,18 +419,16 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                         }
                     }
                 } else if (gbStream != null) {
-                    if (streamPushItem.isStatus()) {
-                        // 在线状态
+                    if (streamPushItem != null && streamPushItem.isPushIng()) {
+                        // 推流状态
                         pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                 mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                     } else {
-                        // 不在线 拉起
+                        // 未推流 拉起
                         notifyStreamOnline(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                 mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);
                     }
-
                 }
-
             }
 
         } catch (SipException | InvalidArgumentException | ParseException e) {
@@ -453,7 +451,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             int port, Boolean tcpActive, boolean mediaTransmissionTCP,
                             String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
         // 推流
-        if (streamPushItem.getServerId().equals(userSetting.getServerId())) {
+        if (streamPushItem.isSelf()) {
             Boolean streamReady = zlmrtpServerFactory.isStreamReady(mediaServerItem, gbStream.getApp(), gbStream.getStream());
             if (streamReady) {
                 // 自平台内容
@@ -502,7 +500,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                                     String channelId, String addressStr, String ssrc, String requesterId) throws InvalidArgumentException, ParseException, SipException {
         if ("proxy".equals(gbStream.getStreamType())) {
             // TODO 控制启用以使设备上线
-            logger.info("[ app={}, stream={} ]通道离线,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
+            logger.info("[ app={}, stream={} ]通道未推流,启用流后开始推流", gbStream.getApp(), gbStream.getStream());
             responseAck(evt, Response.BAD_REQUEST, "channel [" + gbStream.getGbId() + "] offline");
         } else if ("push".equals(gbStream.getStreamType())) {
             if (!platform.isStartOfflinePush()) {
@@ -510,7 +508,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                 return;
             }
             // 发送redis消息以使设备上线
-            logger.info("[ app={}, stream={} ]通道离线,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
+            logger.info("[ app={}, stream={} ]通道未推流,发送redis信息控制设备开始推流", gbStream.getApp(), gbStream.getStream());
 
             MessageForPushChannel messageForPushChannel = MessageForPushChannel.getInstance(1,
                     gbStream.getApp(), gbStream.getStream(), gbStream.getGbId(), gbStream.getPlatformId(),
@@ -520,7 +518,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             dynamicTask.startDelay(callIdHeader.getCallId(), () -> {
                 logger.info("[ app={}, stream={} ] 等待设备开始推流超时", gbStream.getApp(), gbStream.getStream());
                 try {
-                    mediaListManager.removedChannelOnlineEventLister(gbStream.getGbId());
+                    mediaListManager.removedChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream());
                     responseAck(evt, Response.REQUEST_TIMEOUT); // 超时
                 } catch (SipException e) {
                     e.printStackTrace();
@@ -535,7 +533,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
             Boolean finalTcpActive = tcpActive;
 
             // 添加在本机上线的通知
-            mediaListManager.addChannelOnlineEventLister(gbStream.getGbId(), (app, stream, serverId) -> {
+            mediaListManager.addChannelOnlineEventLister(gbStream.getApp(), gbStream.getStream(), (app, stream, serverId) -> {
                 dynamicTask.stop(callIdHeader.getCallId());
                 if (serverId.equals(userSetting.getServerId())) {
                     SendRtpItem sendRtpItem = zlmrtpServerFactory.createSendRtpItem(mediaServerItem, addressStr, finalPort, ssrc, requesterId,
@@ -623,7 +621,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                             // 离线
                             // 查询是否在本机上线了
                             StreamPushItem currentStreamPushItem = streamPushService.getPush(streamPushItem.getApp(), streamPushItem.getStream());
-                            if (currentStreamPushItem.isStatus()) {
+                            if (currentStreamPushItem.isPushIng()) {
                                 // 在线状态
                                 pushStream(evt, gbStream, streamPushItem, platform, callIdHeader, mediaServerItem, port, tcpActive,
                                         mediaTransmissionTCP, channelId, addressStr, ssrc, requesterId);

+ 33 - 36
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -92,39 +92,36 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 	@Override
 	public void process(RequestEvent evt) {
 		try {
-
 			taskQueue.offer(new HandlerCatchData(evt, null, null));
 			responseAck(evt, Response.OK);
 			if (!taskQueueHandlerRun) {
 				taskQueueHandlerRun = true;
 				taskExecutor.execute(()-> {
-							while (!taskQueue.isEmpty()) {
-								try {
-									HandlerCatchData take = taskQueue.poll();
-									Element rootElement = getRootElement(take.getEvt());
-									String cmd = XmlUtil.getText(rootElement, "CmdType");
-
-									if (CmdType.CATALOG.equals(cmd)) {
-										logger.info("接收到Catalog通知");
-										processNotifyCatalogList(take.getEvt());
-									} else if (CmdType.ALARM.equals(cmd)) {
-										logger.info("接收到Alarm通知");
-										processNotifyAlarm(take.getEvt());
-									} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
-										logger.info("接收到MobilePosition通知");
-										processNotifyMobilePosition(take.getEvt());
-									} else {
-										logger.info("接收到消息:" + cmd);
-									}
-								} catch (DocumentException e) {
-									throw new RuntimeException(e);
-								}
+					while (!taskQueue.isEmpty()) {
+						try {
+							HandlerCatchData take = taskQueue.poll();
+							Element rootElement = getRootElement(take.getEvt());
+							String cmd = XmlUtil.getText(rootElement, "CmdType");
+
+							if (CmdType.CATALOG.equals(cmd)) {
+								logger.info("接收到Catalog通知");
+								processNotifyCatalogList(take.getEvt());
+							} else if (CmdType.ALARM.equals(cmd)) {
+								logger.info("接收到Alarm通知");
+								processNotifyAlarm(take.getEvt());
+							} else if (CmdType.MOBILE_POSITION.equals(cmd)) {
+								logger.info("接收到MobilePosition通知");
+								processNotifyMobilePosition(take.getEvt());
+							} else {
+								logger.info("接收到消息:" + cmd);
 							}
-						taskQueueHandlerRun = false;
-						});
+						} catch (DocumentException e) {
+							throw new RuntimeException(e);
+						}
+					}
+				taskQueueHandlerRun = false;
+				});
 			}
-
-
 		} catch (SipException | InvalidArgumentException | ParseException e) {
 			e.printStackTrace();
 		}
@@ -174,7 +171,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+			logger.info("[收到移动位置订阅通知]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
 					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 
@@ -318,7 +315,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 
 			Device device = redisCatchStorage.getDevice(deviceId);
 			if (device == null || device.getOnline() == 0) {
-				logger.warn("[收到 目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
+				logger.warn("[收到目录订阅]:{}, 但是设备已经离线", (device != null ? device.getDeviceId():"" ));
 				return;
 			}
 			Element rootElement = getRootElement(evt, device.getCharset());
@@ -339,28 +336,28 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 					Element eventElement = itemDevice.element("Event");
 					String event;
 					if (eventElement == null) {
-						logger.warn("[收到 目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
+						logger.warn("[收到目录订阅]:{}, 但是Event为空, 设为默认值 ADD", (device != null ? device.getDeviceId():"" ));
 						event = CatalogEvent.ADD;
 					}else {
 						event = eventElement.getText().toUpperCase();
 					}
 					DeviceChannel channel = XmlUtil.channelContentHander(itemDevice, device, event);
 					channel.setDeviceId(device.getDeviceId());
-					logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
+					logger.info("[收到目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
 					switch (event) {
 						case CatalogEvent.ON:
 							// 上线
-							logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());
+							logger.info("[收到通道上线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							storager.deviceChannelOnline(deviceId, channel.getChannelId());
 							break;
 						case CatalogEvent.OFF :
 							// 离线
-							logger.info("收到来自设备【{}】的通道【{}】离线通知", device.getDeviceId(), channel.getChannelId());
+							logger.info("[收到通道离线通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							storager.deviceChannelOffline(deviceId, channel.getChannelId());
 							break;
 						case CatalogEvent.VLOST:
 							// 视频丢失
-							logger.info("收到来自设备【{}】的通道【{}】视频丢失通知", device.getDeviceId(), channel.getChannelId());
+							logger.info("[收到通道视频丢失通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							storager.deviceChannelOffline(deviceId, channel.getChannelId());
 							break;
 						case CatalogEvent.DEFECT:
@@ -368,17 +365,17 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 							break;
 						case CatalogEvent.ADD:
 							// 增加
-							logger.info("收到来自设备【{}】的增加通道【{}】通知", device.getDeviceId(), channel.getChannelId());
+							logger.info("[收到增加通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							deviceChannelService.updateChannel(deviceId, channel);
 							break;
 						case CatalogEvent.DEL:
 							// 删除
-							logger.info("收到来自设备【{}】的删除通道【{}】通知", device.getDeviceId(), channel.getChannelId());
+							logger.info("[收到删除通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							storager.delChannel(deviceId, channel.getChannelId());
 							break;
 						case CatalogEvent.UPDATE:
 							// 更新
-							logger.info("收到来自设备【{}】的更新通道【{}】通知", device.getDeviceId(), channel.getChannelId());
+							logger.info("[收到更新通道通知] 来自设备: {}, 通道 {}", device.getDeviceId(), channel.getChannelId());
 							deviceChannelService.updateChannel(deviceId, channel);
 							break;
 						default:

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java

@@ -143,6 +143,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
                 device.setGeoCoordSys("WGS84");
                 device.setTreeType("CivilCode");
                 device.setDeviceId(deviceId);
+                device.setOnline(0);
             }
             device.setIp(received);
             device.setPort(rPort);

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java

@@ -69,7 +69,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
 
     @Override
     public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
-        logger.info("收到来自设备[{}]的报警通知", device.getDeviceId());
+        logger.info("[收到报警通知]设备:{}", device.getDeviceId());
         // 回复200 OK
         try {
             responseAck(evt, Response.OK);

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/query/cmd/DeviceInfoQueryMessageHandler.java

@@ -44,7 +44,7 @@ public class DeviceInfoQueryMessageHandler extends SIPRequestProcessorParent imp
 
     @Override
     public void handForPlatform(RequestEvent evt, ParentPlatform parentPlatform, Element rootElement) {
-        logger.info("接收到DeviceInfo查询消息");
+        logger.info("[DeviceInfo查询]消息");
         FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
         try {
             // 回复200 OK

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java

@@ -111,6 +111,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
                         int sumNum = Integer.parseInt(sumNumElement.getText());
 
                         if (sumNum == 0) {
+                            logger.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId());
                             // 数据已经完整接收
                             storager.cleanChannelsForDevice(take.getDevice().getDeviceId());
                             catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), null);
@@ -132,7 +133,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
                                 }
                                 int sn = Integer.parseInt(snElement.getText());
                                 catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), channelList);
-                                logger.info("收到来自设备【{}】的通道: {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
+                                logger.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.get(take.getDevice().getDeviceId()) == null ? 0 :catalogDataCatch.get(take.getDevice().getDeviceId()).size(), sumNum);
                                 if (catalogDataCatch.get(take.getDevice().getDeviceId()).size() == sumNum) {
                                     // 数据已经完整接收
                                     boolean resetChannelsResult = storager.resetChannels(take.getDevice().getDeviceId(), catalogDataCatch.get(take.getDevice().getDeviceId()));

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/response/impl/RegisterResponseProcessor.java

@@ -26,7 +26,7 @@ import javax.sip.message.Response;
 @Component
 public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
 
-	private Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
+	private final Logger logger = LoggerFactory.getLogger(RegisterResponseProcessor.class);
 	private final String method = "REGISTER";
 
 	@Autowired
@@ -69,11 +69,11 @@ public class RegisterResponseProcessor extends SIPResponseProcessorAbstract {
 
 		ParentPlatformCatch parentPlatformCatch = redisCatchStorage.queryPlatformCatchInfo(platformGBId);
 		if (parentPlatformCatch == null) {
-			logger.warn(String.format("收到 %s 的注册/注销%S请求, 但是平台缓存信息未查询到!!!", platformGBId, response.getStatusCode()));
+			logger.warn(String.format("[收到注册/注销%S请求]平台:%s,但是平台缓存信息未查询到!!!", response.getStatusCode(),platformGBId));
 			return;
 		}
 		String action = parentPlatformCatch.getParentPlatform().getExpires().equals("0") ? "注销" : "注册";
-		logger.info(String.format("收到 %s %s的%S响应", platformGBId, action, response.getStatusCode() ));
+		logger.info(String.format("[%s %S响应]%s ", action, response.getStatusCode(), platformGBId ));
 		ParentPlatform parentPlatform = parentPlatformCatch.getParentPlatform();
 		if (parentPlatform == null) {
 			logger.warn(String.format("收到 %s %s的%S请求, 但是平台信息未查询到!!!", platformGBId, action, response.getStatusCode()));

+ 15 - 22
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -102,12 +102,13 @@ public class ZLMHttpHookListener {
 			logger.debug("[ ZLM HOOK ] on_server_keepalive API调用,参数:" + json.toString());
 		}
 		String mediaServerId = json.getString("mediaServerId");
-		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_keepalive);
+		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_keepalive);
 		if (subscribes != null  && subscribes.size() > 0) {
 			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
 				subscribe.response(null, json);
 			}
 		}
+		mediaServerService.updateMediaServerKeepalive(mediaServerId, json.getJSONObject("data"));
 
 		JSONObject ret = new JSONObject();
 		ret.put("code", 0);
@@ -167,7 +168,7 @@ public class ZLMHttpHookListener {
 			logger.debug("[ ZLM HOOK ]on_play API调用,参数:" + JSON.toJSONString(param));
 		}
 		String mediaServerId = param.getMediaServerId();
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_play, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_play, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
@@ -240,6 +241,8 @@ public class ZLMHttpHookListener {
 			if (mediaInfo != null) {
 				assistRESTfulUtils.addStreamCallInfo(mediaInfo, param.getApp(), param.getStream(), callId, null);
 			}
+		}else {
+			zlmMediaListManager.sendStreamEvent(param.getApp(),param.getStream(), param.getMediaServerId());
 		}
 
 		ret.put("code", 0);
@@ -250,7 +253,7 @@ public class ZLMHttpHookListener {
 		}
 
 
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_publish, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
 		if (subscribe != null) {
 			if (mediaInfo != null) {
 				subscribe.response(mediaInfo, json);
@@ -374,7 +377,7 @@ public class ZLMHttpHookListener {
 			logger.debug("[ ZLM HOOK ]on_shell_login API调用,参数:" + json.toString());
 		}
 		String mediaServerId = json.getString("mediaServerId");
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_shell_login, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_shell_login, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
@@ -400,7 +403,7 @@ public class ZLMHttpHookListener {
 		logger.info("[ ZLM HOOK ]on_stream_changed API调用,参数:" + JSONObject.toJSONString(item));
 		String mediaServerId = item.getMediaServerId();
 		JSONObject json = (JSONObject) JSON.toJSON(item);
-		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.getSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, json);
+		ZLMHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_stream_changed, json);
 		if (subscribe != null ) {
 			MediaServerItem mediaInfo = mediaServerService.getOne(mediaServerId);
 			if (mediaInfo != null) {
@@ -461,7 +464,6 @@ public class ZLMHttpHookListener {
 							StreamInfo streamInfoByAppAndStream = mediaService.getStreamInfoByAppAndStream(mediaServerItem,
 									app, stream, tracks, streamAuthorityInfo.getCallId());
 							item.setStreamInfo(streamInfoByAppAndStream);
-							item.setSeverId(userSetting.getServerId());
 							redisCatchStorage.addStream(mediaServerItem, type, app, stream, item);
 							if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
 									|| item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
@@ -469,20 +471,6 @@ public class ZLMHttpHookListener {
 								item.setSeverId(userSetting.getServerId());
 								zlmMediaListManager.addPush(item);
 							}
-
-//							List<GbStream> gbStreams = new ArrayList<>();
-//							if (streamPushItem == null || streamPushItem.getGbId() == null) {
-//								GbStream gbStream = storager.getGbStream(app, streamId);
-//								gbStreams.add(gbStream);
-//							}else {
-//								if (streamPushItem.getGbId() != null) {
-//									gbStreams.add(streamPushItem);
-//								}
-//							}
-//							if (gbStreams.size() > 0) {
-//								eventPublisher.catalogEventPublishForStream(null, gbStreams, CatalogEvent.ON);
-//							}
-
 						}else {
 							// 兼容流注销时类型从redis记录获取
 							MediaItem mediaItem = redisCatchStorage.getStreamInfo(app, stream, mediaServerId);
@@ -626,16 +614,21 @@ public class ZLMHttpHookListener {
 		}
 		String remoteAddr = request.getRemoteAddr();
 		jsonObject.put("ip", remoteAddr);
-		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(ZLMHttpHookSubscribe.HookType.on_server_started);
+		List<ZLMHttpHookSubscribe.Event> subscribes = this.subscribe.getSubscribes(HookType.on_server_started);
 		if (subscribes != null  && subscribes.size() > 0) {
 			for (ZLMHttpHookSubscribe.Event subscribe : subscribes) {
 				subscribe.response(null, jsonObject);
 			}
 		}
+
+		ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(jsonObject, ZLMServerConfig.class);
+		if (zlmServerConfig !=null ) {
+			mediaServerService.zlmServerOnline(zlmServerConfig);
+		}
 		JSONObject ret = new JSONObject();
 		ret.put("code", 0);
 		ret.put("msg", "success");
-		return new ResponseEntity<String>(ret.toString(),HttpStatus.OK);
+		return new ResponseEntity<>(ret.toString(),HttpStatus.OK);
 	}
 
 	private Map<String, String> urlParamToMap(String params) {

+ 45 - 41
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookSubscribe.java

@@ -1,12 +1,16 @@
 package com.genersoft.iot.vmp.media.zlm;
 
 import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
+import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @description:针对 ZLMediaServer的hook事件订阅
@@ -16,51 +20,39 @@ import java.util.concurrent.ConcurrentHashMap;
 @Component
 public class ZLMHttpHookSubscribe {
 
-    public enum HookType{
-        on_flow_report,
-        on_http_access,
-        on_play,
-        on_publish,
-        on_record_mp4,
-        on_rtsp_auth,
-        on_rtsp_realm,
-        on_shell_login,
-        on_stream_changed,
-        on_stream_none_reader,
-        on_stream_not_found,
-        on_server_started,
-        on_server_keepalive
-    }
-
     @FunctionalInterface
     public interface Event{
         void response(MediaServerItem mediaServerItem, JSONObject response);
     }
 
-    private Map<HookType, Map<JSONObject, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
+    private Map<HookType, Map<IHookSubscribe, ZLMHttpHookSubscribe.Event>> allSubscribes = new ConcurrentHashMap<>();
 
-    public void addSubscribe(HookType type, JSONObject hookResponse, ZLMHttpHookSubscribe.Event event) {
-        allSubscribes.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(hookResponse, event);
+    public void addSubscribe(IHookSubscribe hookSubscribe, ZLMHttpHookSubscribe.Event event) {
+        if (hookSubscribe.getExpires() == null) {
+            // 默认5分钟过期
+            Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.MINUTES.toSeconds(5));
+            hookSubscribe.setExpires(expiresInstant);
+        }
+        allSubscribes.computeIfAbsent(hookSubscribe.getHookType(), k -> new ConcurrentHashMap<>()).put(hookSubscribe, event);
     }
 
-    public ZLMHttpHookSubscribe.Event getSubscribe(HookType type, JSONObject hookResponse) {
+    public ZLMHttpHookSubscribe.Event sendNotify(HookType type, JSONObject hookResponse) {
         ZLMHttpHookSubscribe.Event event= null;
-        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
         if (eventMap == null) {
             return null;
         }
-        for (JSONObject key : eventMap.keySet()) {
+        for (IHookSubscribe key : eventMap.keySet()) {
             Boolean result = null;
-            for (String s : key.keySet()) {
+            for (String s : key.getContent().keySet()) {
                 if (result == null) {
-                    result = key.getString(s).equals(hookResponse.getString(s));
+                    result = key.getContent().getString(s).equals(hookResponse.getString(s));
                 }else {
-                    if (key.getString(s) == null) {
+                    if (key.getContent().getString(s) == null) {
                         continue;
                     }
-                    result = result && key.getString(s).equals(hookResponse.getString(s));
+                    result = result && key.getContent().getString(s).equals(hookResponse.getString(s));
                 }
-
             }
             if (null != result && result) {
                 event = eventMap.get(key);
@@ -69,26 +61,30 @@ public class ZLMHttpHookSubscribe {
         return event;
     }
 
-    public void removeSubscribe(HookType type, JSONObject hookResponse) {
-        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+    public void removeSubscribe(IHookSubscribe hookSubscribe) {
+        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(hookSubscribe.getHookType());
         if (eventMap == null) {
             return;
         }
 
-        Set<Map.Entry<JSONObject, Event>> entries = eventMap.entrySet();
+        Set<Map.Entry<IHookSubscribe, Event>> entries = eventMap.entrySet();
         if (entries.size() > 0) {
-            List<Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
-            for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entries) {
-                JSONObject key = entry.getKey();
+            List<Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event>> entriesToRemove = new ArrayList<>();
+            for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entries) {
+                JSONObject content = entry.getKey().getContent();
+                if (content == null || content.size() == 0) {
+                    entriesToRemove.add(entry);
+                    continue;
+                }
                 Boolean result = null;
-                for (String s : key.keySet()) {
+                for (String s : content.keySet()) {
                     if (result == null) {
-                        result = key.getString(s).equals(hookResponse.getString(s));
+                        result = content.getString(s).equals(hookSubscribe.getContent().getString(s));
                     }else {
-                        if (key.getString(s) == null) {
+                        if (content.getString(s) == null) {
                             continue;
                         }
-                        result = result && key.getString(s).equals(hookResponse.getString(s));
+                        result = result && content.getString(s).equals(hookSubscribe.getContent().getString(s));
                     }
                 }
                 if (null != result && result){
@@ -97,7 +93,7 @@ public class ZLMHttpHookSubscribe {
             }
 
             if (!CollectionUtils.isEmpty(entriesToRemove)) {
-                for (Map.Entry<JSONObject, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
+                for (Map.Entry<IHookSubscribe, ZLMHttpHookSubscribe.Event> entry : entriesToRemove) {
                     entries.remove(entry);
                 }
             }
@@ -111,17 +107,25 @@ public class ZLMHttpHookSubscribe {
      * @return
      */
     public List<ZLMHttpHookSubscribe.Event> getSubscribes(HookType type) {
-        // ZLMHttpHookSubscribe.Event event= null;
-        Map<JSONObject, Event> eventMap = allSubscribes.get(type);
+        Map<IHookSubscribe, Event> eventMap = allSubscribes.get(type);
         if (eventMap == null) {
             return null;
         }
         List<ZLMHttpHookSubscribe.Event> result = new ArrayList<>();
-        for (JSONObject key : eventMap.keySet()) {
+        for (IHookSubscribe key : eventMap.keySet()) {
             result.add(eventMap.get(key));
         }
         return result;
     }
 
+    public List<IHookSubscribe> getAll(){
+        ArrayList<IHookSubscribe> result = new ArrayList<>();
+        Collection<Map<IHookSubscribe, Event>> values = allSubscribes.values();
+        for (Map<IHookSubscribe, Event> value : values) {
+            result.addAll(value.keySet());
+        }
+        return result;
+    }
+
 
 }

+ 29 - 106
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaListManager.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.service.IStreamPushService;
 import com.genersoft.iot.vmp.service.bean.ThirdPartyGB;
@@ -63,99 +64,48 @@ public class ZLMMediaListManager {
     @Autowired
     private UserSetting userSetting;
 
-    private Map<String, ChannelOnlineEvent> channelOnlineEvents = new ConcurrentHashMap<>();
-
-
-    public void updateMediaList(MediaServerItem mediaServerItem) {
-        storager.clearMediaList();
-
-        // 使用异步的当时更新媒体流列表
-        zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
-            if (mediaList == null) {
-                return;
-            }
-            String dataStr = mediaList.getString("data");
-
-            Integer code = mediaList.getInteger("code");
-            Map<String, StreamPushItem> result = new HashMap<>();
-            List<StreamPushItem> streamPushItems = null;
-            // 获取所有的国标关联
-//            List<GbStream> gbStreams = gbStreamMapper.selectAllByMediaServerId(mediaServerItem.getId());
-            if (code == 0 ) {
-                if (dataStr != null) {
-                    streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem);
-                }
-            }else {
-                logger.warn("更新视频流失败,错误code: " + code);
-            }
-
-            if (streamPushItems != null) {
-                storager.updateMediaList(streamPushItems);
-                for (StreamPushItem streamPushItem : streamPushItems) {
-                    JSONObject jsonObject = new JSONObject();
-                    jsonObject.put("app", streamPushItem.getApp());
-                    jsonObject.put("stream", streamPushItem.getStream());
-                    jsonObject.put("mediaServerId", mediaServerItem.getId());
-                    subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_play,jsonObject,
-                            (MediaServerItem mediaServerItemInuse, JSONObject response)->{
-                                updateMedia(mediaServerItem, response.getString("app"), response.getString("stream"));
-                            }
-                    );
-                }
-            }
-        }));
+    @Autowired
+    private ZLMRTPServerFactory zlmrtpServerFactory;
 
-    }
+    @Autowired
+    private IMediaServerService mediaServerService;
 
-    public void addMedia(MediaServerItem mediaServerItem, String app, String streamId) {
-        //使用异步更新推流
-        updateMedia(mediaServerItem, app, streamId);
-    }
+    private Map<String, ChannelOnlineEvent> channelOnPublishEvents = new ConcurrentHashMap<>();
 
     public StreamPushItem addPush(MediaItem mediaItem) {
         // 查找此直播流是否存在redis预设gbId
         StreamPushItem transform = streamPushService.transform(mediaItem);
         StreamPushItem pushInDb = streamPushService.getPush(mediaItem.getApp(), mediaItem.getStream());
-        transform.setPushIng(true);
+        transform.setPushIng(mediaItem.isRegist());
         transform.setUpdateTime(DateUtil.getNow());
         transform.setPushTime(DateUtil.getNow());
+        transform.setSelf(userSetting.getServerId().equals(mediaItem.getSeverId()));
         if (pushInDb == null) {
             transform.setCreateTime(DateUtil.getNow());
             streamPushMapper.add(transform);
         }else {
             streamPushMapper.update(transform);
         }
+        if (transform != null) {
+            if (getChannelOnlineEventLister(transform.getApp(), transform.getStream()) != null)  {
+                getChannelOnlineEventLister(transform.getApp(), transform.getStream()).run(transform.getApp(), transform.getStream(), transform.getServerId());
+                removedChannelOnlineEventLister(transform.getApp(), transform.getStream());
+            }
+        }
         return transform;
     }
 
-
-    public void updateMedia(MediaServerItem mediaServerItem, String app, String streamId) {
-        //使用异步更新推流
-        zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId, "rtmp", json->{
-
-            if (json == null) {
-                return;
-            }
-            String dataStr = json.getString("data");
-
-            Integer code = json.getInteger("code");
-            Map<String, StreamPushItem> result = new HashMap<>();
-            List<StreamPushItem> streamPushItems = null;
-            if (code == 0 ) {
-                if (dataStr != null) {
-                    streamPushItems = streamPushService.handleJSON(dataStr, mediaServerItem);
-                }
-            }else {
-                logger.warn("更新视频流失败,错误code: " + code);
+    public void sendStreamEvent(String app, String stream, String mediaServerId) {
+        MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+        // 查看推流状态
+        if (zlmrtpServerFactory.isStreamReady(mediaServerItem, app, stream)) {
+            if (getChannelOnlineEventLister(app, stream) != null)  {
+                getChannelOnlineEventLister(app, stream).run(app, stream, mediaServerId);
+                removedChannelOnlineEventLister(app, stream);
             }
-
-            if (streamPushItems != null && streamPushItems.size() == 1) {
-                storager.updateMedia(streamPushItems.get(0));
-            }
-        });
+        }
     }
 
-
     public int removeMedia(String app, String streamId) {
         // 查找是否关联了国标, 关联了不删除, 置为离线
         GbStream gbStream = gbStreamMapper.selectOne(app, streamId);
@@ -163,48 +113,21 @@ public class ZLMMediaListManager {
         if (gbStream == null) {
             result = storager.removeMedia(app, streamId);
         }else {
-            // TODO 暂不设置为离线
             result =storager.mediaOffline(app, streamId);
         }
         return result;
     }
 
-    public void addChannelOnlineEventLister(String key, ChannelOnlineEvent callback) {
-        this.channelOnlineEvents.put(key,callback);
+    public void addChannelOnlineEventLister(String app, String stream, ChannelOnlineEvent callback) {
+        this.channelOnPublishEvents.put(app + "_" + stream, callback);
     }
 
-    public void removedChannelOnlineEventLister(String key) {
-        this.channelOnlineEvents.remove(key);
+    public void removedChannelOnlineEventLister(String app, String stream) {
+        this.channelOnPublishEvents.remove(app + "_" + stream);
     }
 
+    public ChannelOnlineEvent getChannelOnlineEventLister(String app, String stream) {
+        return this.channelOnPublishEvents.get(app + "_" + stream);
+    }
 
-
-//    public void clearAllSessions() {
-//        logger.info("清空所有国标相关的session");
-//        JSONObject allSessionJSON = zlmresTfulUtils.getAllSession();
-//        ZLMServerConfig mediaInfo = redisCatchStorage.getMediaInfo();
-//        HashSet<String> allLocalPorts = new HashSet();
-//        if (allSessionJSON.getInteger("code") == 0) {
-//            JSONArray data = allSessionJSON.getJSONArray("data");
-//            if (data.size() > 0) {
-//                for (int i = 0; i < data.size(); i++) {
-//                    JSONObject sessionJOSN = data.getJSONObject(i);
-//                    Integer local_port = sessionJOSN.getInteger("local_port");
-//                    if (!local_port.equals(Integer.valueOf(mediaInfo.getHttpPort())) &&
-//                        !local_port.equals(Integer.valueOf(mediaInfo.getHttpSSLport())) &&
-//                        !local_port.equals(Integer.valueOf(mediaInfo.getRtmpPort())) &&
-//                        !local_port.equals(Integer.valueOf(mediaInfo.getRtspPort())) &&
-//                        !local_port.equals(Integer.valueOf(mediaInfo.getRtspSSlport())) &&
-//                        !local_port.equals(Integer.valueOf(mediaInfo.getHookOnFlowReport()))){
-//                        allLocalPorts.add(sessionJOSN.getInteger("local_port") + "");
-//                     }
-//                }
-//            }
-//        }
-//        if (allLocalPorts.size() > 0) {
-//            List<String> result = new ArrayList<>(allLocalPorts);
-//            String localPortSStr = String.join(",", result);
-//            zlmresTfulUtils.kickSessions(localPortSStr);
-//        }
-//    }
 }

+ 10 - 4
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRTPServerFactory.java

@@ -87,7 +87,7 @@ public class ZLMRTPServerFactory {
         return result;
     }
 
-    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc) {
+    public int createRTPServer(MediaServerItem mediaServerItem, String streamId, int ssrc, Integer port) {
         int result = -1;
         // 查询此rtp server 是否已经存在
         JSONObject rtpInfo = zlmresTfulUtils.getRtpInfo(mediaServerItem, streamId);
@@ -105,7 +105,11 @@ public class ZLMRTPServerFactory {
         param.put("enable_tcp", 1);
         param.put("stream_id", streamId);
         // 推流端口设置0则使用随机端口
-        param.put("port", 0);
+        if (port == null) {
+            param.put("port", 0);
+        }else {
+            param.put("port", port);
+        }
         param.put("ssrc", ssrc);
         JSONObject openRtpServerResultJson = zlmresTfulUtils.openRtpServer(mediaServerItem, param);
 
@@ -273,8 +277,10 @@ public class ZLMRTPServerFactory {
      * 查询待转推的流是否就绪
      */
     public Boolean isStreamReady(MediaServerItem mediaServerItem, String app, String streamId) {
-        JSONObject mediaInfo = zlmresTfulUtils.getMediaInfo(mediaServerItem, app, "rtmp", streamId);
-        return (mediaInfo.getInteger("code") == 0 && mediaInfo.getBoolean("online"));
+        JSONObject mediaInfo = zlmresTfulUtils.getMediaList(mediaServerItem, app, streamId);
+        return (mediaInfo.getInteger("code") == 0
+                && mediaInfo.getJSONArray("data") != null
+                && mediaInfo.getJSONArray("data").size() > 0);
     }
 
     /**

+ 19 - 21
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java

@@ -6,22 +6,22 @@ import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.MediaConfig;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForServerStarted;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.CommandLineRunner;
 import org.springframework.core.annotation.Order;
 import org.springframework.scheduling.annotation.Async;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
-import org.springframework.util.StringUtils;
 
+import java.time.Instant;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 @Component
 @Order(value=1)
@@ -37,18 +37,12 @@ public class ZLMRunner implements CommandLineRunner {
     @Autowired
     private ZLMHttpHookSubscribe hookSubscribe;
 
-    @Autowired
-    private IStreamProxyService streamProxyService;
-
     @Autowired
     private EventPublisher publisher;
 
     @Autowired
     private IMediaServerService mediaServerService;
 
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
     @Autowired
     private MediaConfig mediaConfig;
 
@@ -67,26 +61,24 @@ public class ZLMRunner implements CommandLineRunner {
             mediaServerService.updateToDatabase(mediaSerItem);
         }
         mediaServerService.syncCatchFromDatabase();
+        HookSubscribeForServerStarted hookSubscribeForServerStarted = HookSubscribeFactory.on_server_started();
+//        Instant expiresInstant = Instant.now().plusSeconds(TimeUnit.SECONDS.toSeconds(60));
+//        hookSubscribeForStreamChange.setExpires(expiresInstant);
         // 订阅 zlm启动事件, 新的zlm也会从这里进入系统
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_started,new JSONObject(),
+        hookSubscribe.addSubscribe(hookSubscribeForServerStarted,
                 (MediaServerItem mediaServerItem, JSONObject response)->{
             ZLMServerConfig zlmServerConfig = JSONObject.toJavaObject(response, ZLMServerConfig.class);
             if (zlmServerConfig !=null ) {
                 if (startGetMedia != null) {
                     startGetMedia.remove(zlmServerConfig.getGeneralMediaServerId());
+                    if (startGetMedia.size() == 0) {
+                        hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+                    }
                 }
-                mediaServerService.zlmServerOnline(zlmServerConfig);
             }
         });
 
-        // 订阅 zlm保活事件, 当zlm离线时做业务的处理
-        hookSubscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_server_keepalive,new JSONObject(),
-                (MediaServerItem mediaServerItem, JSONObject response)->{
-                    String mediaServerId = response.getString("mediaServerId");
-                    if (mediaServerId !=null ) {
-                        mediaServerService.updateMediaServerKeepalive(mediaServerId, response.getJSONObject("data"));
-                    }
-                });
+
 
         // 获取zlm信息
         logger.info("[zlm] 等待默认zlm中...");
@@ -125,6 +117,9 @@ public class ZLMRunner implements CommandLineRunner {
             zlmServerConfigFirst.setIp(mediaServerItem.getIp());
             zlmServerConfigFirst.setHttpPort(mediaServerItem.getHttpPort());
             startGetMedia.remove(mediaServerItem.getId());
+            if (startGetMedia.size() == 0) {
+                hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+            }
             mediaServerService.zlmServerOnline(zlmServerConfigFirst);
         }else {
             logger.info("[ {} ]-[ {}:{} ]主动连接失败, 清理相关资源, 开始尝试重试连接",
@@ -139,6 +134,9 @@ public class ZLMRunner implements CommandLineRunner {
                 zlmServerConfig.setIp(mediaServerItem.getIp());
                 zlmServerConfig.setHttpPort(mediaServerItem.getHttpPort());
                 startGetMedia.remove(mediaServerItem.getId());
+                if (startGetMedia.size() == 0) {
+                    hookSubscribe.removeSubscribe(HookSubscribeFactory.on_server_started());
+                }
                 mediaServerService.zlmServerOnline(zlmServerConfig);
             }
         }, 2000);

+ 33 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeFactory.java

@@ -0,0 +1,33 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+
+import com.alibaba.fastjson.JSONObject;
+
+/**
+ * hook 订阅工厂
+ * @author lin
+ */
+public class HookSubscribeFactory {
+
+    public static HookSubscribeForStreamChange on_stream_changed(String app, String stream, boolean regist, String scheam, String mediaServerId) {
+        HookSubscribeForStreamChange hookSubscribe = new HookSubscribeForStreamChange();
+        JSONObject subscribeKey = new com.alibaba.fastjson.JSONObject();
+        subscribeKey.put("app", app);
+        subscribeKey.put("stream", stream);
+        subscribeKey.put("regist", regist);
+        if (scheam != null) {
+            subscribeKey.put("schema", scheam);
+        }
+        subscribeKey.put("mediaServerId", mediaServerId);
+        hookSubscribe.setContent(subscribeKey);
+
+        return hookSubscribe;
+    }
+
+    public static HookSubscribeForServerStarted on_server_started() {
+        HookSubscribeForServerStarted hookSubscribe = new HookSubscribeForServerStarted();
+        hookSubscribe.setContent(new JSONObject());
+
+        return hookSubscribe;
+    }
+}

+ 44 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForServerStarted.java

@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.time.Instant;
+
+/**
+ * hook订阅-流变化
+ * @author lin
+ */
+public class HookSubscribeForServerStarted implements IHookSubscribe{
+
+    private HookType hookType = HookType.on_server_started;
+
+    private JSONObject content;
+
+    @JSONField(format="yyyy-MM-dd HH:mm:ss")
+    private Instant expires;
+
+    @Override
+    public HookType getHookType() {
+        return hookType;
+    }
+
+    @Override
+    public JSONObject getContent() {
+        return content;
+    }
+
+    public void setContent(JSONObject content) {
+        this.content = content;
+    }
+
+    @Override
+    public Instant getExpires() {
+        return expires;
+    }
+
+    @Override
+    public void setExpires(Instant expires) {
+        this.expires = expires;
+    }
+}

+ 43 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookSubscribeForStreamChange.java

@@ -0,0 +1,43 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.annotation.JSONField;
+
+import java.time.Instant;
+
+/**
+ * hook订阅-流变化
+ * @author lin
+ */
+public class HookSubscribeForStreamChange implements IHookSubscribe{
+
+    private HookType hookType = HookType.on_stream_changed;
+
+    private JSONObject content;
+
+    private Instant expires;
+
+    @Override
+    public HookType getHookType() {
+        return hookType;
+    }
+
+    @Override
+    public JSONObject getContent() {
+        return content;
+    }
+
+    public void setContent(JSONObject content) {
+        this.content = content;
+    }
+
+    @Override
+    public Instant getExpires() {
+        return expires;
+    }
+
+    @Override
+    public void setExpires(Instant expires) {
+        this.expires = expires;
+    }
+}

+ 23 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/HookType.java

@@ -0,0 +1,23 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+/**
+ * hook类型
+ * @author lin
+ */
+
+public enum HookType {
+
+    on_flow_report,
+    on_http_access,
+    on_play,
+    on_publish,
+    on_record_mp4,
+    on_rtsp_auth,
+    on_rtsp_realm,
+    on_shell_login,
+    on_stream_changed,
+    on_stream_none_reader,
+    on_stream_not_found,
+    on_server_started,
+    on_server_keepalive
+}

+ 36 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/IHookSubscribe.java

@@ -0,0 +1,36 @@
+package com.genersoft.iot.vmp.media.zlm.dto;
+
+import com.alibaba.fastjson.JSONObject;
+
+import java.time.Instant;
+
+/**
+ * zlm hook事件的参数
+ * @author lin
+ */
+public interface IHookSubscribe {
+
+    /**
+     * 获取hook类型
+     * @return hook类型
+     */
+    HookType getHookType();
+
+    /**
+     * 获取hook的具体内容
+     * @return hook的具体内容
+     */
+    JSONObject getContent();
+
+    /**
+     * 设置过期时间
+     * @param instant 过期时间
+     */
+    void setExpires(Instant instant);
+
+    /**
+     * 获取过期时间
+     * @return 过期时间
+     */
+    Instant getExpires();
+}

+ 15 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamPushItem.java

@@ -108,6 +108,13 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
      */
     private boolean pushIng;
 
+    /**
+     * 是否自己平台的推流
+     */
+    private boolean self;
+
+
+
     public String getVhost() {
         return vhost;
     }
@@ -290,5 +297,13 @@ public class StreamPushItem extends GbStream implements Comparable<StreamPushIte
     public void setPushIng(boolean pushIng) {
         this.pushIng = pushIng;
     }
+
+    public boolean isSelf() {
+        return self;
+    }
+
+    public void setSelf(boolean self) {
+        this.self = self;
+    }
 }
 

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/service/IMediaServerService.java

@@ -48,6 +48,8 @@ public interface IMediaServerService {
 
     SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback);
 
+    SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback, Integer port);
+
     void closeRTPServer(String deviceId, String channelId, String ssrc);
 
     void clearRTPServer(MediaServerItem mediaServerItem);

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/service/IStreamPushService.java

@@ -96,4 +96,8 @@ public interface IStreamPushService {
      */
     void online(List<StreamPushItemFromRedis> onlineStreams);
 
+    /**
+     * 增加推流
+     */
+    boolean add(StreamPushItem stream);
 }

+ 0 - 38
src/main/java/com/genersoft/iot/vmp/service/StreamGPSSubscribeTask.java

@@ -1,38 +0,0 @@
-package com.genersoft.iot.vmp.service;
-
-import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
-import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.scheduling.annotation.Scheduled;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-
-/**
- * 定时查找redis中的GPS推送消息,并保存到对应的流中
- */
-@Component
-public class StreamGPSSubscribeTask {
-
-    @Autowired
-    private IRedisCatchStorage redisCatchStorage;
-
-    @Autowired
-    private IVideoManagerStorage storager;
-
-
-    @Scheduled(fixedRate = 30 * 1000)   //每30秒执行一次
-    public void execute(){
-        List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
-        if (gpsMsgInfo.size() > 0) {
-            storager.updateStreamGPS(gpsMsgInfo);
-            for (GPSMsgInfo msgInfo : gpsMsgInfo) {
-                msgInfo.setStored(true);
-                redisCatchStorage.updateGpsMsgInfo(msgInfo);
-            }
-        }
-
-    }
-}

+ 20 - 7
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -86,10 +86,10 @@ public class DeviceServiceImpl implements IDeviceService {
             redisCatchStorage.clearCatchByDeviceId(device.getDeviceId());
         }
         device.setUpdateTime(now);
-        device.setOnline(1);
 
-        // 第一次上线
+        // 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询
         if (device.getCreateTime() == null) {
+            device.setOnline(1);
             device.setCreateTime(now);
             logger.info("[设备上线,首次注册]: {},查询设备信息以及通道信息", device.getDeviceId());
             deviceMapper.add(device);
@@ -97,8 +97,19 @@ public class DeviceServiceImpl implements IDeviceService {
             commander.deviceInfoQuery(device);
             sync(device);
         }else {
-            deviceMapper.update(device);
-            redisCatchStorage.updateDevice(device);
+            if(device.getOnline() == 0){
+                device.setOnline(1);
+                device.setCreateTime(now);
+                logger.info("[设备上线,离线状态下重新注册]: {},查询设备信息以及通道信息", device.getDeviceId());
+                deviceMapper.update(device);
+                redisCatchStorage.updateDevice(device);
+                commander.deviceInfoQuery(device);
+                sync(device);
+            }else {
+                deviceMapper.update(device);
+                redisCatchStorage.updateDevice(device);
+            }
+
         }
 
         // 上线添加订阅
@@ -125,6 +136,8 @@ public class DeviceServiceImpl implements IDeviceService {
         device.setOnline(0);
         redisCatchStorage.updateDevice(device);
         deviceMapper.update(device);
+        //进行通道离线
+        deviceChannelMapper.offlineByDeviceId(deviceId);
         // 离线释放所有ssrc
         List<SsrcTransaction> ssrcTransactions = streamSession.getSsrcTransactionForAll(deviceId, null, null, null);
         if (ssrcTransactions != null && ssrcTransactions.size() > 0) {
@@ -147,7 +160,7 @@ public class DeviceServiceImpl implements IDeviceService {
         logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
         // 添加目录订阅
         CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander, dynamicTask);
-        // 提前开始刷新订阅
+        // 刷新订阅
         int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
         // 设置最小值为30
         dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, (subscribeCycleForCatalog -1) * 1000);
@@ -182,8 +195,8 @@ public class DeviceServiceImpl implements IDeviceService {
         MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander, dynamicTask);
         // 设置最小值为30
         int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
-        // 提前开始刷新订阅
-        dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, (subscribeCycleForCatalog -1 ) * 1000);
+        // 刷新订阅
+        dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, (subscribeCycleForCatalog) * 1000);
         return true;
     }
 

+ 32 - 33
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -1,12 +1,29 @@
 package com.genersoft.iot.vmp.service.impl;
 
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.util.StringUtils;
+
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.session.SsrcConfig;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
@@ -15,30 +32,16 @@ import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerConfig;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
-import com.genersoft.iot.vmp.service.IStreamProxyService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
-import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.utils.redis.JedisUtil;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
-import okhttp3.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.jdbc.datasource.DataSourceTransactionManager;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.TransactionDefinition;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.util.StringUtils;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.time.LocalDateTime;
-import java.util.*;
-import java.util.stream.Collectors;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.Response;
 
 /**
  * 媒体服务器节点管理
@@ -81,12 +84,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
     @Autowired
     private RedisUtil redisUtil;
 
-    @Autowired
-    private IVideoManagerStorage storager;
-
-    @Autowired
-    private IStreamProxyService streamProxyService;
-
     @Autowired
     private EventPublisher publisher;
 
@@ -124,7 +121,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
     }
 
     @Override
-    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback) {
+    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port) {
         if (mediaServerItem == null || mediaServerItem.getId() == null) {
             return null;
         }
@@ -152,13 +149,18 @@ public class MediaServerServiceImpl implements IMediaServerService {
             }
             int rtpServerPort = mediaServerItem.getRtpProxyPort();
             if (mediaServerItem.isRtpEnable()) {
-                rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0);
+                rtpServerPort = zlmrtpServerFactory.createRTPServer(mediaServerItem, streamId, ssrcCheck?Integer.parseInt(ssrc):0, port);
             }
             redisUtil.set(key, mediaServerItem);
             return new SSRCInfo(rtpServerPort, ssrc, streamId);
         }
     }
 
+    @Override
+    public SSRCInfo openRTPServer(MediaServerItem mediaServerItem, String streamId, String ssrc, boolean ssrcCheck, boolean isPlayback) {
+        return openRTPServer(mediaServerItem, streamId, ssrc, ssrcCheck, isPlayback, null);
+    }
+
     @Override
     public void closeRTPServer(String deviceId, String channelId, String stream) {
         String mediaServerId = streamSession.getMediaServerId(deviceId, channelId, stream);
@@ -355,14 +357,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
      */
     @Override
     public void zlmServerOnline(ZLMServerConfig zlmServerConfig) {
-        logger.info("[ZLM] 正在连接 : {} -> {}:{}",
-                zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
 
         MediaServerItem serverItem = mediaServerMapper.queryOne(zlmServerConfig.getGeneralMediaServerId());
         if (serverItem == null) {
             logger.warn("[未注册的zlm] 拒接接入:{}来自{}:{}", zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(),zlmServerConfig.getHttpPort() );
             logger.warn("请检查ZLM的<general.mediaServerId>配置是否与WVP的<media.id>一致");
             return;
+        }else {
+            logger.info("[ZLM] 正在连接 : {} -> {}:{}",
+                    zlmServerConfig.getGeneralMediaServerId(), zlmServerConfig.getIp(), zlmServerConfig.getHttpPort());
         }
         serverItem.setHookAliveInterval(zlmServerConfig.getHookAliveInterval());
         if (serverItem.getHttpPort() == 0) {
@@ -599,9 +602,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
         boolean result = false;
         OkHttpClient client = new OkHttpClient();
         String url = String.format("http://%s:%s/index/api/record",  ip, port);
-
-        FormBody.Builder builder = new FormBody.Builder();
-
         Request request = new Request.Builder()
                 .get()
                 .url(url)
@@ -633,7 +633,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
         MediaServerItem mediaServerItem = getOne(mediaServerId);
         if (mediaServerItem == null) {
             // zlm连接重试
-
             logger.warn("[更新ZLM 保活信息]失败,未找到流媒体信息");
             return;
         }
@@ -652,7 +651,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
             mediaServerItemMap.put(mediaServerItem.getId(), mediaServerItem);
         }
         for (MediaServerItem mediaServerItem : allInCatch) {
-            if (mediaServerItemMap.get(mediaServerItem) == null) {
+            if (!mediaServerItemMap.containsKey(mediaServerItem.getId())) {
                 delete(mediaServerItem.getId());
             }
         }

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -67,9 +67,9 @@ public class MediaServiceImpl implements IMediaService {
                 JSONObject mediaJSON = JSON.parseObject(JSON.toJSONString(data.get(0)), JSONObject.class);
                 JSONArray tracks = mediaJSON.getJSONArray("tracks");
                 if (authority) {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, streamAuthorityInfo.getCallId());
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,streamAuthorityInfo.getCallId());
                 }else {
-                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, null);
+                    streamInfo = getStreamInfoByAppAndStream(mediaInfo, app, stream, tracks, addr,null);
                 }
 
             }

+ 33 - 34
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -1,18 +1,43 @@
 package com.genersoft.iot.vmp.service.impl;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+import javax.sip.ResponseEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Service;
+import org.springframework.web.context.request.async.DeferredResult;
+
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.UserSetting;
-import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.bean.Device;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamCallback;
+import com.genersoft.iot.vmp.gb28181.bean.InviteStreamInfo;
+import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
+import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
+import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderFroPlatform;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.media.zlm.AssistRESTfulUtils;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -27,23 +52,11 @@ import com.genersoft.iot.vmp.service.bean.PlayBackResult;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import com.genersoft.iot.vmp.vmanager.gb28181.play.bean.PlayResult;
-import gov.nist.javax.sip.stack.SIPDialog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Service;
-import org.springframework.util.ResourceUtils;
-import org.springframework.web.context.request.async.DeferredResult;
 
-import javax.sip.ResponseEvent;
-import java.io.FileNotFoundException;
-import java.math.BigDecimal;
-import java.math.RoundingMode;
-import java.util.*;
+import gov.nist.javax.sip.stack.SIPDialog;
 
 @SuppressWarnings(value = {"rawtypes", "unchecked"})
 @Service
@@ -296,16 +309,10 @@ public class PlayServiceImpl implements IPlayService {
                     // 单端口模式streamId也有变化,需要重新设置监听
                     if (!mediaServerItem.isRtpEnable()) {
                         // 添加订阅
-                        JSONObject subscribeKey = new JSONObject();
-                        subscribeKey.put("app", "rtp");
-                        subscribeKey.put("stream", stream);
-                        subscribeKey.put("regist", true);
-                        subscribeKey.put("schema", "rtmp");
-                        subscribeKey.put("mediaServerId", mediaServerItem.getId());
-                        subscribe.removeSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed,subscribeKey);
-                        subscribeKey.put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
-                        subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-                                (MediaServerItem mediaServerItemInUse, JSONObject response)->{
+                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", stream, true, "rtmp", mediaServerItem.getId());
+                        subscribe.removeSubscribe(hookSubscribe);
+                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response)->{
                                     logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
                                     dynamicTask.stop(timeOutTaskKey);
                                     // hook响应
@@ -316,7 +323,7 @@ public class PlayServiceImpl implements IPlayService {
                     // 关闭rtp server
                     mediaServerService.closeRTPServer(device.getDeviceId(), channelId, finalSsrcInfo.getStream());
                     // 重新开启ssrc server
-                    mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false);
+                    mediaServerService.openRTPServer(mediaServerItem, finalSsrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), false, finalSsrcInfo.getPort());
 
                 }
             }
@@ -531,14 +538,6 @@ public class PlayServiceImpl implements IPlayService {
                     StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
                     streamInfo.setStartTime(startTime);
                     streamInfo.setEndTime(endTime);
-                    if (streamInfo == null) {
-                        logger.warn("录像下载API调用失败!");
-                        wvpResult.setCode(-1);
-                        wvpResult.setMsg("录像下载API调用失败");
-                        downloadResult.setCode(-1);
-                        hookCallBack.call(downloadResult);
-                        return ;
-                    }
                     redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
                     wvpResult.setCode(0);
                     wvpResult.setMsg("success");

+ 6 - 8
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGbPlayMsgListener.java

@@ -8,6 +8,9 @@ import com.genersoft.iot.vmp.gb28181.bean.SendRtpItem;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMRTPServerFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
+import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.*;
@@ -270,14 +273,9 @@ public class RedisGbPlayMsgListener implements MessageListener {
             }, userSetting.getPlatformPlayTimeout());
 
             // 添加订阅
-            JSONObject subscribeKey = new JSONObject();
-            subscribeKey.put("app", content.getApp());
-            subscribeKey.put("stream", content.getStream());
-            subscribeKey.put("regist", true);
-            subscribeKey.put("schema", "rtmp");
-            subscribeKey.put("mediaServerId", mediaServerItem.getId());
-            subscribe.addSubscribe(ZLMHttpHookSubscribe.HookType.on_stream_changed, subscribeKey,
-                    (MediaServerItem mediaServerItemInUse, JSONObject json)->{
+            HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed(content.getApp(), content.getStream(), true, "rtmp", mediaServerItem.getId());
+
+            subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json)->{
                         dynamicTask.stop(taskKey);
                         responseSendItem(mediaServerItem, content, toId, serial);
                     });

+ 48 - 3
src/main/java/com/genersoft/iot/vmp/service/impl/RedisGpsMsgListener.java

@@ -1,16 +1,24 @@
 package com.genersoft.iot.vmp.service.impl;
 
 import com.alibaba.fastjson.JSON;
+import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
 /**
  * 接收来自redis的GPS更新通知
  * @author lin
@@ -20,13 +28,50 @@ public class RedisGpsMsgListener implements MessageListener {
 
     private final static Logger logger = LoggerFactory.getLogger(RedisGpsMsgListener.class);
 
+    private boolean taskQueueHandlerRun = false;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+    @Autowired
+    private IVideoManagerStorage storager;
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
+    @Autowired
+    private ThreadPoolTaskExecutor taskExecutor;
+
+
     @Override
     public void onMessage(@NotNull Message message, byte[] bytes) {
-        // TODO 加消息队列
-        GPSMsgInfo gpsMsgInfo = JSON.parseObject(message.getBody(), GPSMsgInfo.class);
-        redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+        taskQueue.offer(message);
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    GPSMsgInfo gpsMsgInfo = JSON.parseObject(msg.getBody(), GPSMsgInfo.class);
+                    // 只是放入redis缓存起来
+                    redisCatchStorage.updateGpsMsgInfo(gpsMsgInfo);
+                }
+                taskQueueHandlerRun = false;
+            });
+        }
+    }
+
+    /**
+     * 定时将经纬度更新到数据库
+     */
+    @Scheduled(fixedRate = 2 * 1000)   //每2秒执行一次
+    public void execute(){
+        List<GPSMsgInfo> gpsMsgInfo = redisCatchStorage.getAllGpsMsgInfo();
+        if (gpsMsgInfo.size() > 0) {
+            storager.updateStreamGPS(gpsMsgInfo);
+            for (GPSMsgInfo msgInfo : gpsMsgInfo) {
+                msgInfo.setStored(true);
+                redisCatchStorage.updateGpsMsgInfo(msgInfo);
+            }
+        }
     }
 }

+ 46 - 23
src/main/java/com/genersoft/iot/vmp/service/impl/RedisPushStreamStatusMsgListener.java

@@ -14,6 +14,7 @@ import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaItem;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.service.bean.PushStreamStatusChangeFromRedisDto;
 import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -21,14 +22,17 @@ import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.data.redis.connection.Message;
 import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 
 /**
@@ -40,6 +44,8 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
 
     private final static Logger logger = LoggerFactory.getLogger(RedisPushStreamStatusMsgListener.class);
 
+    private boolean taskQueueHandlerRun = false;
+
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
@@ -47,34 +53,51 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
     private IStreamPushService streamPushService;
 
     @Autowired
-    private EventPublisher eventPublisher;
+    private DynamicTask dynamicTask;
 
-    @Autowired
-    private UserSetting userSetting;
 
+
+    private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
+
+    @Qualifier("taskExecutor")
     @Autowired
-    private DynamicTask dynamicTask;
+    private ThreadPoolTaskExecutor taskExecutor;
 
     @Override
     public void onMessage(Message message, byte[] bytes) {
-        PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(message.getBody(), PushStreamStatusChangeFromRedisDto.class);
-        if (statusChangeFromPushStream == null) {
-            logger.warn("[REDIS 消息]推流设备状态变化消息解析失败");
-            return;
-        }
-        if (statusChangeFromPushStream.isSetAllOffline()) {
-            // 所有设备离线
-            streamPushService.allStreamOffline();
-        }
-        if (statusChangeFromPushStream.getOfflineStreams() != null
-                && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
-            // 更新部分设备离线
-            streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
-        }
-        if (statusChangeFromPushStream.getOnlineStreams() != null &&
-                statusChangeFromPushStream.getOnlineStreams().size() > 0) {
-            // 更新部分设备上线
-            streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
+        // TODO 增加队列
+        logger.warn("[REDIS消息-推流设备状态变化]: {}", new String(message.getBody()));
+        taskQueue.offer(message);
+
+        if (!taskQueueHandlerRun) {
+            taskQueueHandlerRun = true;
+            taskExecutor.execute(() -> {
+                while (!taskQueue.isEmpty()) {
+                    Message msg = taskQueue.poll();
+                    PushStreamStatusChangeFromRedisDto statusChangeFromPushStream = JSON.parseObject(msg.getBody(), PushStreamStatusChangeFromRedisDto.class);
+                    if (statusChangeFromPushStream == null) {
+                        logger.warn("[REDIS消息]推流设备状态变化消息解析失败");
+                        return;
+                    }
+                    // 取消定时任务
+                    dynamicTask.stop(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED);
+                    if (statusChangeFromPushStream.isSetAllOffline()) {
+                        // 所有设备离线
+                        streamPushService.allStreamOffline();
+                    }
+                    if (statusChangeFromPushStream.getOfflineStreams() != null
+                            && statusChangeFromPushStream.getOfflineStreams().size() > 0) {
+                        // 更新部分设备离线
+                        streamPushService.offline(statusChangeFromPushStream.getOfflineStreams());
+                    }
+                    if (statusChangeFromPushStream.getOnlineStreams() != null &&
+                            statusChangeFromPushStream.getOnlineStreams().size() > 0) {
+                        // 更新部分设备上线
+                        streamPushService.online(statusChangeFromPushStream.getOnlineStreams());
+                    }
+                }
+                taskQueueHandlerRun = false;
+            });
         }
     }
 
@@ -83,7 +106,7 @@ public class RedisPushStreamStatusMsgListener implements MessageListener, Applic
         //  启动时设置所有推流通道离线,发起查询请求
         redisCatchStorage.sendStreamPushRequestedMsgForStatus();
         dynamicTask.startDelay(VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED, ()->{
-            logger.info("[REDIS 消息]未收到redis回复推流设备状态,执行推流设备离线");
+            logger.info("[REDIS消息]未收到redis回复推流设备状态,执行推流设备离线");
             // 五秒收不到请求就设置通道离线,然后通知上级离线
             streamPushService.allStreamOffline();
         }, 5000);

+ 7 - 6
src/main/java/com/genersoft/iot/vmp/service/impl/RedisStreamMsgListener.java

@@ -46,7 +46,7 @@ public class RedisStreamMsgListener implements MessageListener {
 
         JSONObject steamMsgJson = JSON.parseObject(message.getBody(), JSONObject.class);
         if (steamMsgJson == null) {
-            logger.warn("[REDIS的ALARM通知]消息解析失败");
+            logger.warn("[收到redis 流变化]消息解析失败");
             return;
         }
         String serverId = steamMsgJson.getString("serverId");
@@ -55,7 +55,7 @@ public class RedisStreamMsgListener implements MessageListener {
             // 自己发送的消息忽略即可
             return;
         }
-        logger.info("[REDIS通知] 流变化: {}", new String(message.getBody()));
+        logger.info("[收到redis 流变化]: {}", new String(message.getBody()));
         String app = steamMsgJson.getString("app");
         String stream = steamMsgJson.getString("stream");
         boolean register = steamMsgJson.getBoolean("register");
@@ -72,9 +72,10 @@ public class RedisStreamMsgListener implements MessageListener {
         mediaItem.setOriginType(0);
         mediaItem.setOriginTypeStr("0");
         mediaItem.setOriginTypeStr("unknown");
-
-        zlmMediaListManager.addPush(mediaItem);
-
-
+        if (register) {
+            zlmMediaListManager.addPush(mediaItem);
+        }else {
+            zlmMediaListManager.removeMedia(app, stream);
+        }
     }
 }

+ 32 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -22,7 +22,10 @@ import com.github.pagehelper.PageInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
 import org.springframework.util.StringUtils;
 
 import java.util.*;
@@ -69,6 +72,12 @@ public class StreamPushServiceImpl implements IStreamPushService {
     @Autowired
     private IMediaServerService mediaServerService;
 
+    @Autowired
+    DataSourceTransactionManager dataSourceTransactionManager;
+
+    @Autowired
+    TransactionDefinition transactionDefinition;
+
     @Override
     public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
         if (jsonData == null) {
@@ -463,4 +472,27 @@ public class StreamPushServiceImpl implements IStreamPushService {
         // 发送通知
         eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
     }
+
+    @Override
+    public boolean add(StreamPushItem stream) {
+        stream.setUpdateTime(DateUtil.getNow());
+        stream.setCreateTime(DateUtil.getNow());
+        stream.setServerId(userSetting.getServerId());
+
+        // 放在事务内执行
+        boolean result = false;
+        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
+        try {
+            int addStreamResult = streamPushMapper.add(stream);
+            if (!StringUtils.isEmpty(stream.getGbId())) {
+                gbStreamMapper.add(stream);
+            }
+            dataSourceTransactionManager.commit(transactionStatus);
+            result = true;
+        }catch (Exception e) {
+            logger.error("批量移除流与平台的关系时错误", e);
+            dataSourceTransactionManager.rollback(transactionStatus);
+        }
+        return result;
+    }
 }

+ 0 - 27
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java

@@ -299,18 +299,6 @@ public interface IVideoManagerStorage {
 	 */
 	List<DeviceChannel> queryGbStreamListInPlatform(String platformId);
 
-	/**
-	 * 批量更新推流列表
-	 * @param streamPushItems
-	 */
-	void updateMediaList(List<StreamPushItem> streamPushItems);
-
-	/**
-	 * 更新单个推流
-	 * @param streamPushItem
-	 */
-	void updateMedia(StreamPushItem streamPushItem);
-
 	/**
 	 * 移除单个推流
 	 * @param app
@@ -318,21 +306,6 @@ public interface IVideoManagerStorage {
 	 */
 	int removeMedia(String app, String stream);
 
-
-	/**
-	 * 获取但个推流
-	 * @param app
-	 * @param stream
-	 * @return
-	 */
-	StreamPushItem getMedia(String app, String stream);
-
-
-	/**
-	 * 清空推流列表
-	 */
-	void clearMediaList();
-
 	/**
 	 * 设置流离线
 	 */

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java

@@ -140,6 +140,9 @@ public interface DeviceChannelMapper {
     @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"})
     void offline(String deviceId,  String channelId);
 
+    @Update(value = {"UPDATE device_channel SET status=0 WHERE deviceId=#{deviceId}"})
+    void offlineByDeviceId(String deviceId);
+
     @Update(value = {"UPDATE device_channel SET status=1 WHERE deviceId=#{deviceId} AND channelId=#{channelId}"})
     void online(String deviceId,  String channelId);
 

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/storager/dao/PlatformCatalogMapper.java

@@ -50,7 +50,7 @@ public interface PlatformCatalogMapper {
     @Select("SELECT pc.* FROM  platform_catalog pc WHERE  pc.id = #{id}")
     PlatformCatalog selectParentCatalog(String id);
 
-    @Select("SELECT pc.id as channelId, pc.name, pc.civilCode, pc.businessGroupId,'0' as parental, pc.parentId  " +
+    @Select("SELECT pc.id as channelId, pc.name, pc.civilCode, pc.businessGroupId,'1' as parental, pc.parentId  " +
             " FROM platform_catalog pc WHERE pc.platformId=#{platformId}")
     List<DeviceChannel> queryCatalogInPlatform(String platformId);
 }

+ 5 - 3
src/main/java/com/genersoft/iot/vmp/storager/dao/StreamPushMapper.java

@@ -15,9 +15,10 @@ import java.util.List;
 public interface StreamPushMapper {
 
     @Insert("INSERT INTO stream_push (app, stream, totalReaderCount, originType, originTypeStr, " +
-            "pushTime, aliveSecond, mediaServerId, serverId, updateTime, createTime, pushIng) VALUES" +
+            "pushTime, aliveSecond, mediaServerId, serverId, updateTime, createTime, pushIng, self) VALUES" +
             "('${app}', '${stream}', '${totalReaderCount}', '${originType}', '${originTypeStr}', " +
-            "'${pushTime}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' , '${updateTime}' , '${createTime}', ${pushIng} )")
+            "'${pushTime}', '${aliveSecond}', '${mediaServerId}' , '${serverId}' , '${updateTime}' , '${createTime}', " +
+            "${pushIng}, ${self} )")
     int add(StreamPushItem streamPushItem);
 
 
@@ -31,6 +32,7 @@ public interface StreamPushMapper {
             "<if test=\"pushTime != null\">, pushTime='${pushTime}'</if>" +
             "<if test=\"aliveSecond != null\">, aliveSecond='${aliveSecond}'</if>" +
             "<if test=\"pushIng != null\">, pushIng=${pushIng}</if>" +
+            "<if test=\"self != null\">, self=${self}</if>" +
             "WHERE app=#{app} AND stream=#{stream}"+
             " </script>"})
     int update(StreamPushItem streamPushItem);
@@ -119,7 +121,7 @@ public interface StreamPushMapper {
     @Update("UPDATE stream_push " +
             "SET pushIng=${pushIng} " +
             "WHERE app=#{app} AND stream=#{stream}")
-    int updatePushStatus(String app, String stream, boolean status);
+    int updatePushStatus(String app, String stream, boolean pushIng);
 
     @Update("UPDATE stream_push " +
             "SET status=#{status} " +

+ 5 - 5
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -479,7 +479,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendStreamChangeMsg(String type, JSONObject jsonObject) {
         String key = VideoManagerConstants.WVP_MSG_STREAM_CHANGE_PREFIX + type;
-        logger.debug("[redis 流变化事件] {}: {}", key, jsonObject.toString());
+        logger.info("[redis 流变化事件] {}: {}", key, jsonObject.toString());
         redis.convertAndSend(key, jsonObject);
     }
 
@@ -688,21 +688,21 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendMobilePositionMsg(JSONObject jsonObject) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_MOBILE_POSITION;
-        logger.info("[redis 移动位置订阅通知] {}: {}", key, jsonObject.toString());
+        logger.info("[redis发送通知]移动位置 {}: {}", key, jsonObject.toString());
         redis.convertAndSend(key, jsonObject);
     }
 
     @Override
     public void sendStreamPushRequestedMsg(MessageForPushChannel msg) {
         String key = VideoManagerConstants.VM_MSG_STREAM_PUSH_REQUESTED;
-        logger.info("[redis 推流被请求通知] {}: {}/{}", key, msg.getApp(), msg.getStream());
+        logger.info("[redis发送通知]推流被请求 {}: {}/{}", key, msg.getApp(), msg.getStream());
         redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
     }
 
     @Override
     public void sendAlarmMsg(AlarmChannelMessage msg) {
         String key = VideoManagerConstants.VM_MSG_SUBSCRIBE_ALARM;
-        logger.info("[redis 报警通知] {}: {}", key, JSON.toJSON(msg));
+        logger.info("[redis发送通知] 报警{}: {}", key, JSON.toJSON(msg));
         redis.convertAndSend(key, (JSONObject)JSON.toJSON(msg));
     }
 
@@ -715,7 +715,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     public void sendStreamPushRequestedMsgForStatus() {
         String key = VideoManagerConstants.VM_MSG_GET_ALL_ONLINE_REQUESTED;
-        logger.info("[redis 通知]获取所有推流设备的状态");
+        logger.info("[redis通知]获取所有推流设备的状态");
         JSONObject jsonObject = new JSONObject();
         jsonObject.put(key, key);
         redis.convertAndSend(key, jsonObject);

+ 7 - 38
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java

@@ -635,47 +635,11 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 		return streamProxyMapper.selectOne(app, stream);
 	}
 
-	@Override
-	public void updateMediaList(List<StreamPushItem> streamPushItems) {
-		if (streamPushItems == null || streamPushItems.size() == 0) {
-			return;
-		}
-		logger.info("updateMediaList:  " + streamPushItems.size());
-		streamPushMapper.addAll(streamPushItems);
-		// TODO 待优化
-		for (int i = 0; i < streamPushItems.size(); i++) {
-			int onlineResult = mediaOnline(streamPushItems.get(i).getApp(), streamPushItems.get(i).getStream());
-			if (onlineResult > 0) {
-				// 发送上线通知
-				eventPublisher.catalogEventPublishForStream(null, streamPushItems.get(i), CatalogEvent.ON);
-			}
-		}
-	}
-
-
-
-	@Override
-	public void updateMedia(StreamPushItem streamPushItem) {
-		streamPushMapper.del(streamPushItem.getApp(), streamPushItem.getStream());
-		streamPushMapper.add(streamPushItem);
-		mediaOffline(streamPushItem.getApp(), streamPushItem.getStream());
-	}
-
 	@Override
 	public int removeMedia(String app, String stream) {
 		return streamPushMapper.del(app, stream);
 	}
 
-	@Override
-	public StreamPushItem getMedia(String app, String stream) {
-		return streamPushMapper.selectOne(app, stream);
-	}
-
-	@Override
-	public void clearMediaList() {
-		streamPushMapper.clear();
-	}
-
 	@Override
 	public int mediaOffline(String app, String stream) {
 		GbStream gbStream = gbStreamMapper.selectOne(app, stream);
@@ -683,7 +647,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 		if ("proxy".equals(gbStream.getStreamType())) {
 			result = streamProxyMapper.updateStatus(app, stream, false);
 		}else {
-			result = streamPushMapper.updateStatus(app, stream, false);
+			result = streamPushMapper.updatePushStatus(app, stream, false);
 		}
 		return result;
 	}
@@ -695,7 +659,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 		if ("proxy".equals(gbStream.getStreamType())) {
 			result = streamProxyMapper.updateStatus(app, stream, true);
 		}else {
-			result = streamPushMapper.updateStatus(app, stream, true);
+			result = streamPushMapper.updatePushStatus(app, stream, true);
 		}
 		return result;
 	}
@@ -741,6 +705,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 			if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) {
 				// 第一层节点
 				platformCatalog.setBusinessGroupId(platformCatalog.getId());
+				platformCatalog.setParentId(platform.getDeviceGBId());
 			}else {
 				// 获取顶层的
 				PlatformCatalog topCatalog = getTopCatalog(platformCatalog.getParentId(), platformCatalog.getPlatformId());
@@ -749,6 +714,10 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 		}
 		if (platform.getTreeType().equals(TreeType.CIVIL_CODE)) {
 			platformCatalog.setCivilCode(platformCatalog.getId());
+			if (platformCatalog.getPlatformId().equals(platformCatalog.getParentId())) {
+				// 第一层节点
+				platformCatalog.setParentId(platform.getDeviceGBId());
+			}
 		}
 
 		int result = catalogMapper.add(platformCatalog);

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/bean/WVPResult.java

@@ -12,6 +12,7 @@ public class WVPResult<T> {
         this.data = data;
     }
 
+
     private int code;
     private String msg;
     private T data;

+ 14 - 6
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/platform/PlatformController.java

@@ -8,6 +8,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
 import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
+import com.genersoft.iot.vmp.gb28181.bean.TreeType;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.service.IPlatformChannelService;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@@ -463,13 +464,20 @@ public class PlatformController {
         if (logger.isDebugEnabled()) {
             logger.debug("查询目录,platformId: {}, parentId: {}", platformId, parentId);
         }
+        ParentPlatform platform = storager.queryParentPlatByServerGBId(platformId);
+        if (platform == null) {
+            return new ResponseEntity<>(new WVPResult<>(400, "平台未找到", null), HttpStatus.OK);
+        }
+        if (platformId.equals(parentId)) {
+            parentId = platform.getDeviceGBId();
+        }
         List<PlatformCatalog> platformCatalogList = storager.getChildrenCatalogByPlatform(platformId, parentId);
-        // 查询下属的国标通道
-//        List<PlatformCatalog> catalogsForChannel = storager.queryChannelInParentPlatformAndCatalog(platformId, parentId);
-        // 查询下属的直播流通道
-//        List<PlatformCatalog> catalogsForStream = storager.queryStreamInParentPlatformAndCatalog(platformId, parentId);
-//        platformCatalogList.addAll(catalogsForChannel);
-//        platformCatalogList.addAll(catalogsForStream);
+//        if (platform.getTreeType().equals(TreeType.BUSINESS_GROUP)) {
+//            platformCatalogList = storager.getChildrenCatalogByPlatform(platformId, parentId);
+//        }else {
+//
+//        }
+
         WVPResult<List<PlatformCatalog>> result = new WVPResult<>();
         result.setCode(0);
         result.setMsg("success");

+ 15 - 1
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java

@@ -8,6 +8,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.VersionInfo;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.IHookSubscribe;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServerItem;
 import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.utils.SpringBeanFactory;
@@ -38,7 +40,7 @@ import java.util.Set;
 public class ServerController {
 
     @Autowired
-    private ConfigurableApplicationContext context;
+    private ZLMHttpHookSubscribe zlmHttpHookSubscribe;
 
     @Autowired
     private IMediaServerService mediaServerService;
@@ -254,6 +256,18 @@ public class ServerController {
         return result;
     }
 
+    @ApiOperation("获取当前所有hook")
+    @GetMapping(value = "/hooks")
+    @ResponseBody
+    public WVPResult<List<IHookSubscribe>> getHooks(){
+        WVPResult<List<IHookSubscribe>> result = new WVPResult<>();
+        result.setCode(0);
+        result.setMsg("success");
+        List<IHookSubscribe> all = zlmHttpHookSubscribe.getAll();
+        result.setData(all);
+        return result;
+    }
+
 //    @ApiOperation("当前进行中的动态任务")
 //    @GetMapping(value = "/dynamicTask")
 //    @ResponseBody

+ 46 - 8
src/main/java/com/genersoft/iot/vmp/vmanager/streamPush/StreamPushController.java

@@ -4,6 +4,7 @@ import com.alibaba.excel.EasyExcel;
 import com.alibaba.excel.ExcelReader;
 import com.alibaba.excel.read.metadata.ReadSheet;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.security.SecurityUtils;
 import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
 import com.genersoft.iot.vmp.gb28181.bean.GbStream;
@@ -63,6 +64,9 @@ public class StreamPushController {
     @Autowired
     private IMediaService mediaService;
 
+    @Autowired
+    private UserSetting userSetting;
+
     @ApiOperation("推流列表查询")
     @ApiImplicitParams({
             @ApiImplicitParam(name="page", value = "当前页", required = true, dataTypeClass = Integer.class),
@@ -260,29 +264,63 @@ public class StreamPushController {
     })
     @GetMapping(value = "/getPlayUrl")
     @ResponseBody
-    public WVPResult<StreamInfo> getPlayUrl(HttpServletRequest request, @RequestParam String app,
-                                                             @RequestParam String stream,
-                                                             @RequestParam(required = false) String mediaServerId){
+    public WVPResult<StreamInfo> getPlayUrl(@RequestParam String app,@RequestParam String stream,
+                                            @RequestParam(required = false) String mediaServerId){
         boolean authority = false;
         // 是否登陆用户, 登陆用户返回完整信息
         LoginUser userInfo = SecurityUtils.getUserInfo();
         if (userInfo!= null) {
             authority = true;
         }
-
-        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
-
         WVPResult<StreamInfo> result = new WVPResult<>();
+        StreamPushItem push = streamPushService.getPush(app, stream);
+        if (push != null && !push.isSelf()) {
+            result.setCode(-1);
+            result.setMsg("来自其他平台的推流信息");
+            return result;
+        }
+        StreamInfo streamInfo = mediaService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
         if (streamInfo != null){
             result.setCode(0);
-            result.setMsg("scccess");
+            result.setMsg("success");
             result.setData(streamInfo);
         }else {
             result.setCode(-1);
-            result.setMsg("fail");
+            result.setMsg("获取播放地址失败");
         }
+
         return result;
     }
 
+    /**
+     * 获取推流播放地址
+     * @param stream 推流信息
+     * @return
+     */
+    @ApiOperation("获取推流播放地址")
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "stream", value = "推流信息", dataTypeClass = StreamPushItem.class),
+    })
+    @PostMapping(value = "/add")
+    @ResponseBody
+    public WVPResult<StreamInfo> add(@RequestBody StreamPushItem stream){
+        if (StringUtils.isEmpty(stream.getGbId())) {
+
+            return new WVPResult<>(400, "国标ID不可为空", null);
+        }
+        if (StringUtils.isEmpty(stream.getApp()) && StringUtils.isEmpty(stream.getStream())) {
+            return new WVPResult<>(400, "app或stream不可为空", null);
+        }
+        stream.setStatus(false);
+        stream.setPushIng(false);
+        stream.setAliveSecond(0L);
+        stream.setTotalReaderCount("0");
+        boolean result = streamPushService.add(stream);
 
+        if (result) {
+            return new WVPResult<>(0, "success", null);
+        }else {
+            return new WVPResult<>(-1, "fail", null);
+        }
+    }
 }

+ 17 - 7
web_src/src/components/PushVideoList.vue

@@ -34,6 +34,8 @@
         <el-button icon="el-icon-delete" size="mini" style="margin-right: 1rem;"
                    :disabled="multipleSelection.length === 0" type="danger" @click="batchDel">批量移除
         </el-button>
+        <el-button icon="el-icon-plus" size="mini" style="margin-right: 1rem;" type="primary" @click="addStream">添加通道
+        </el-button>
         <el-button icon="el-icon-refresh-right" circle size="mini" @click="refresh()"></el-button>
       </div>
     </div>
@@ -67,14 +69,14 @@
       </el-table-column>
       <el-table-column label="本平台推流"  min-width="100">
         <template slot-scope="scope">
-          {{scope.row.pushIng && !!!scope.row.serverId ? '是' : '否' }}
+          {{scope.row.pushIng && !!scope.row.self ? '是' : '否' }}
         </template>
       </el-table-column>
 
       <el-table-column label="操作" min-width="360"  fixed="right">
         <template slot-scope="scope">
           <el-button size="medium" icon="el-icon-video-play"
-                     v-if="(scope.row.status == false && scope.row.gbId == null) || scope.row.status"
+                     v-if="scope.row.pushIng === true"
                      @click="playPush(scope.row)" type="text">播放
           </el-button>
           <el-divider direction="vertical"></el-divider>
@@ -108,7 +110,7 @@
 <script>
 import streamProxyEdit from './dialog/StreamProxyEdit.vue'
 import devicePlayer from './dialog/devicePlayer.vue'
-import addStreamTOGB from './dialog/addStreamTOGB.vue'
+import addStreamTOGB from './dialog/pushStreamEdit.vue'
 import uiHeader from '../layout/UiHeader.vue'
 import importChannel from './dialog/importChannel.vue'
 import MediaServer from './service/MediaServer'
@@ -200,10 +202,15 @@ export default {
         }
       }).then(function (res) {
         that.getListLoading = false;
-        that.$refs.devicePlayer.openDialog("streamPlay", null, null, {
-          streamInfo: res.data.data,
-          hasAudio: true
-        });
+        if (res.data.code === 0 ) {
+          that.$refs.devicePlayer.openDialog("streamPlay", null, null, {
+            streamInfo: res.data.data,
+            hasAudio: true
+          });
+        }else {
+          that.$message.error(res.data.msg);
+        }
+
       }).catch(function (error) {
         console.error(error);
         that.getListLoading = false;
@@ -252,6 +259,9 @@ export default {
 
       })
     },
+    addStream: function (){
+      this.$refs.addStreamTOGB.openDialog(null, this.initData);
+    },
     batchDel: function () {
       this.$confirm(`确定删除选中的${this.multipleSelection.length}个通道?`, '提示', {
         confirmButtonText: '确定',

+ 1 - 2
web_src/src/components/channelList.vue

@@ -124,7 +124,6 @@
 import devicePlayer from './dialog/devicePlayer.vue'
 import uiHeader from '../layout/UiHeader.vue'
 import moment from "moment";
-import DviceService from "./service/DeviceService";
 import DeviceService from "./service/DeviceService";
 import DeviceTree from "./common/DeviceTree";
 
@@ -318,7 +317,7 @@ export default {
     changeSubchannel(itemData) {
       this.beforeUrl = this.$router.currentRoute.path;
 
-      var url = `/${this.$router.currentRoute.name}/${this.$router.currentRoute.params.deviceId}/${itemData.channelId}/${this.$router.currentRoute.params.count}/1`
+      var url = `/${this.$router.currentRoute.name}/${this.$router.currentRoute.params.deviceId}/${itemData.channelId}`
       this.$router.push(url).then(() => {
         this.searchSrt = "";
         this.channelType = "";

+ 24 - 19
web_src/src/components/dialog/SyncChannelProgress.vue

@@ -63,34 +63,39 @@ export default {
           }
 
           if (res.data.data != null) {
-            if (res.data.data.total == 0) {
-              if (res.data.data.errorMsg !== null ){
-                this.msg = res.data.data.errorMsg;
-                this.syncStatus = "exception"
-              }else {
-                this.msg = `等待同步中`;
-                this.timmer = setTimeout(this.getProgress, 300)
-              }
-            }else  {
-              if (res.data.data.total == res.data.data.current) {
-                this.syncStatus = "success"
-                this.percentage = 100;
-                this.msg = '同步成功';
-              }else {
+            if (res.data.syncIng) {
+              if (res.data.data.total == 0) {
                 if (res.data.data.errorMsg !== null ){
                   this.msg = res.data.data.errorMsg;
                   this.syncStatus = "exception"
                 }else {
-                  this.total = res.data.data.total;
-                  this.current = res.data.data.current;
-                  this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
-                  this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
+                  this.msg = `等待同步中`;
                   this.timmer = setTimeout(this.getProgress, 300)
                 }
+              }else  {
+                if (res.data.data.total == res.data.data.current) {
+                  this.syncStatus = "success"
+                  this.percentage = 100;
+                  this.msg = '同步成功';
+                }else {
+                  if (res.data.data.errorMsg !== null ){
+                    this.msg = res.data.data.errorMsg;
+                    this.syncStatus = "exception"
+                  }else {
+                    this.total = res.data.data.total;
+                    this.current = res.data.data.current;
+                    this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
+                    this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
+                    this.timmer = setTimeout(this.getProgress, 300)
+                  }
+                }
               }
+            }else {
+              this.syncStatus = "success"
+              this.percentage = 100;
+              this.msg = '同步成功';
             }
           }
-
         }else {
           if (this.syncFlag) {
             this.syncStatus = "success"

+ 3 - 4
web_src/src/components/dialog/catalogEdit.vue

@@ -70,12 +70,11 @@ export default {
         console.log(catalogType)
         // 216 为虚拟组织 215 为业务分组;目录第一级必须为业务分组, 业务分组下为虚拟组织,虚拟组织下可以有其他虚拟组织
         if (this.level === 1 && catalogType !== "215") {
-          return callback(new Error('业务分组模式下第一层目录的编号10到13位必须为215'));
+          return callback(new Error('业务分组模式下第一层目录的编号11到13位必须为215'));
         }
         if (this.level > 1 && catalogType !== "216") {
-          return callback(new Error('业务分组模式下第一层以下目录的编号10到13位必须为216'));
+          return callback(new Error('业务分组模式下第一层以下目录的编号11到13位必须为216'));
         }
-
       }
       callback();
     }
@@ -94,7 +93,7 @@ export default {
       },
       rules: {
         name: [{ required: true, message: "请输入名称", trigger: "blur" }],
-        id: [{ trigger: "blur",validator: checkId  }]
+        id: [{ required: true, trigger: "blur",validator: checkId  }]
       },
     };
   },

+ 43 - 18
web_src/src/components/dialog/addStreamTOGB.vue

@@ -15,10 +15,10 @@
                 <el-input v-model="proxyParam.name" clearable></el-input>
               </el-form-item>
               <el-form-item label="流应用名" prop="app">
-                <el-input v-model="proxyParam.app" clearable :disabled="true"></el-input>
+                <el-input v-model="proxyParam.app" clearable :disabled="edit"></el-input>
               </el-form-item>
               <el-form-item label="流ID" prop="stream">
-                <el-input v-model="proxyParam.stream" clearable :disabled="true"></el-input>
+                <el-input v-model="proxyParam.stream" clearable :disabled="edit"></el-input>
               </el-form-item>
               <el-form-item label="国标编码" prop="gbId">
                 <el-input v-model="proxyParam.gbId" placeholder="设置国标编码可推送到国标" clearable></el-input>
@@ -28,7 +28,6 @@
                   <el-button type="primary" @click="onSubmit">保存</el-button>
                   <el-button @click="close">取消</el-button>
                 </div>
-                
               </el-form-item>
             </el-form>
       </div>
@@ -38,7 +37,7 @@
 
 <script>
 export default {
-  name: "streamProxyEdit",
+  name: "pushStreamEdit",
   props: {},
   computed: {},
   created() {},
@@ -63,13 +62,13 @@ export default {
       listChangeCallback: null,
       showDialog: false,
       isLoging: false,
+      edit: false,
       proxyParam: {
           name: null,
           app: null,
           stream: null,
           gbId: null,
       },
-      
       rules: {
         name: [{ required: true, message: "请输入名称", trigger: "blur" }],
         app: [{ required: true, message: "请输入应用名", trigger: "blur" }],
@@ -84,30 +83,53 @@ export default {
       this.listChangeCallback = callback;
       if (proxyParam != null) {
         this.proxyParam = proxyParam;
-      } 
+        this.edit = true
+      }
     },
     onSubmit: function () {
       console.log("onSubmit");
-      var that = this;
-      that.$axios({
-        method:"post",
-        url:`/api/push/save_to_gb`, 
-        data: that.proxyParam
-      }).then(function (res) {
+      if (this.edit) {
+        this.$axios({
+          method:"post",
+          url:`/api/push/save_to_gb`,
+          data: this.proxyParam
+        }).then( (res) => {
           if (res.data == "success") {
-            that.$message({
+            this.$message({
               showClose: true,
               message: "保存成功",
               type: "success",
             });
-            that.showDialog = false;
-            if (that.listChangeCallback != null) {
-              that.listChangeCallback();
+            this.showDialog = false;
+            if (this.listChangeCallback != null) {
+              this.listChangeCallback();
             }
           }
-      }).catch(function (error) {
+        }).catch((error)=> {
           console.log(error);
-      });
+        });
+      }else {
+        this.$axios({
+          method:"post",
+          url:`/api/push/add`,
+          data: this.proxyParam
+        }).then( (res) => {
+          if (res.data.code === 0) {
+            this.$message({
+              showClose: true,
+              message: "保存成功",
+              type: "success",
+            });
+            this.showDialog = false;
+            if (this.listChangeCallback != null) {
+              this.listChangeCallback();
+            }
+          }
+        }).catch((error)=> {
+          console.log(error);
+        });
+      }
+
     },
     close: function () {
       console.log("关闭加入GB");
@@ -131,6 +153,9 @@ export default {
       if (this.platform.enable && this.platform.expires == "0") {
         this.platform.expires = "300";
       }
+    },
+    handleNodeClick: function (node){
+
     }
   },
 };