Explorar o código

Merge branch '648540858:wvp-28181-2.0' into wvp-28181-2.0

mk1990 %!s(int64=3) %!d(string=hai) anos
pai
achega
22e1d92a9d
Modificáronse 26 ficheiros con 543 adicións e 246 borrados
  1. 91 0
      bootstrap.sh
  2. 49 7
      src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java
  3. 1 0
      src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java
  4. 10 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java
  5. 49 2
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java
  6. 24 0
      src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java
  7. 5 1
      src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java
  8. 0 50
      src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java
  9. 0 2
      src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java
  10. 36 26
      src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java
  11. 4 0
      src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java
  12. 7 0
      src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java
  13. 38 31
      src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java
  14. 28 3
      src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java
  15. 1 1
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java
  16. 21 10
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java
  17. 3 3
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java
  18. 2 2
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java
  19. 18 9
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java
  20. 17 35
      src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java
  21. 3 10
      src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java
  22. 23 21
      src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java
  23. 8 5
      src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java
  24. 37 9
      src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java
  25. 39 3
      src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java
  26. 29 16
      web_src/src/components/dialog/SyncChannelProgress.vue

+ 91 - 0
bootstrap.sh

@@ -0,0 +1,91 @@
+#!/bin/bash
+
+######################################################
+# Copyright 2019 Pham Ngoc Hoai
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Repo: https://github.com/tyrion9/spring-boot-startup-script
+#
+######### PARAM ######################################
+
+JAVA_OPT=-Xmx1024m
+JARFILE=`ls -1r *.jar 2>/dev/null | head -n 1`
+PID_FILE=pid.file
+RUNNING=N
+PWD=`pwd`
+
+######### DO NOT MODIFY ########
+
+if [ -f $PID_FILE ]; then
+        PID=`cat $PID_FILE`
+        if [ ! -z "$PID" ] && kill -0 $PID 2>/dev/null; then
+                RUNNING=Y
+        fi
+fi
+
+start()
+{
+        if [ $RUNNING == "Y" ]; then
+                echo "Application already started"
+        else
+                if [ -z "$JARFILE" ]
+                then
+                        echo "ERROR: jar file not found"
+                else
+                        nohup java  $JAVA_OPT -Djava.security.egd=file:/dev/./urandom -jar $PWD/$JARFILE > nohup.out 2>&1  &
+                        echo $! > $PID_FILE
+                        echo "Application $JARFILE starting..."
+                        tail -f nohup.out
+                fi
+        fi
+}
+
+stop()
+{
+        if [ $RUNNING == "Y" ]; then
+                kill -9 $PID
+                rm -f $PID_FILE
+                echo "Application stopped"
+        else
+                echo "Application not running"
+        fi
+}
+
+restart()
+{
+        stop
+        start
+}
+
+case "$1" in
+
+        'start')
+                start
+                ;;
+
+        'stop')
+                stop
+                ;;
+
+        'restart')
+                restart
+                ;;
+
+        *)
+                echo "Usage: $0 {  start | stop | restart  }"
+                exit 1
+                ;;
+esac
+exit 0
+

+ 49 - 7
src/main/java/com/genersoft/iot/vmp/conf/DynamicTask.java

@@ -1,6 +1,9 @@
 package com.genersoft.iot.vmp.conf;
 
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
+import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -18,6 +21,8 @@ import java.util.concurrent.ScheduledFuture;
 @Component
 public class DynamicTask {
 
+    private Logger logger = LoggerFactory.getLogger(DynamicTask.class);
+
     @Autowired
     private ThreadPoolTaskScheduler threadPoolTaskScheduler;
 
@@ -26,7 +31,12 @@ public class DynamicTask {
 
     @Bean
     public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
-        return new ThreadPoolTaskScheduler();
+        ThreadPoolTaskScheduler schedulerPool = new ThreadPoolTaskScheduler();
+        schedulerPool.setPoolSize(300);
+        schedulerPool.setWaitForTasksToCompleteOnShutdown(true);
+        schedulerPool.setAwaitTerminationSeconds(10);
+        return schedulerPool;
+
     }
 
     /**
@@ -37,11 +47,24 @@ public class DynamicTask {
      * @return
      */
     public void startCron(String key, Runnable task, int cycleForCatalog) {
-        stop(key);
+        ScheduledFuture future = futureMap.get(key);
+        if (future != null) {
+            if (future.isCancelled()) {
+                logger.info("任务【{}】已存在但是关闭状态!!!", key);
+            } else {
+                logger.info("任务【{}】已存在且已启动!!!", key);
+                return;
+            }
+        }
         // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
-        ScheduledFuture future = threadPoolTaskScheduler.scheduleWithFixedDelay(task, cycleForCatalog * 1000L);
-        futureMap.put(key, future);
-        runnableMap.put(key, task);
+        future = threadPoolTaskScheduler.scheduleAtFixedRate(task, cycleForCatalog * 1000L);
+        if (future != null){
+            futureMap.put(key, future);
+            runnableMap.put(key, task);
+            logger.info("任务【{}】启动成功!!!", key);
+        }else {
+            logger.info("任务【{}】启动失败!!!", key);
+        }
     }
 
     /**
@@ -54,9 +77,25 @@ public class DynamicTask {
     public void startDelay(String key, Runnable task, int delay) {
         stop(key);
         Date starTime = new Date(System.currentTimeMillis() + delay);
+
+        ScheduledFuture future = futureMap.get(key);
+        if (future != null) {
+            if (future.isCancelled()) {
+                logger.info("任务【{}】已存在但是关闭状态!!!", key);
+            } else {
+                logger.info("任务【{}】已存在且已启动!!!", key);
+                return;
+            }
+        }
         // scheduleWithFixedDelay 必须等待上一个任务结束才开始计时period, cycleForCatalog表示执行的间隔
-        ScheduledFuture future = threadPoolTaskScheduler.schedule(task, starTime);
-        futureMap.put(key, future);
+        future = threadPoolTaskScheduler.schedule(task, starTime);
+        if (future != null){
+            futureMap.put(key, future);
+            runnableMap.put(key, task);
+            logger.info("任务【{}】启动成功!!!", key);
+        }else {
+            logger.info("任务【{}】启动失败!!!", key);
+        }
     }
 
     public void stop(String key) {
@@ -78,4 +117,7 @@ public class DynamicTask {
         return futureMap.keySet();
     }
 
+    public Runnable get(String key) {
+        return runnableMap.get(key);
+    }
 }

+ 1 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/SipLayer.java

@@ -48,6 +48,7 @@ public class SipLayer{
 		properties.setProperty("javax.sip.STACK_NAME", "GB28181_SIP");
 		properties.setProperty("javax.sip.IP_ADDRESS", sipConfig.getMonitorIp());
 		properties.setProperty("gov.nist.javax.sip.LOG_MESSAGE_CONTENT", "true");
+		properties.setProperty("gov.nist.javax.sip.DELIVER_UNSOLICITED_NOTIFY", "true"); // 接收所有notify请求,即使没有订阅
 		/**
 		 * sip_server_log.log 和 sip_debug_log.log public static final int TRACE_NONE =
 		 * 0; public static final int TRACE_MESSAGES = 16; public static final int

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/CatalogData.java

@@ -4,6 +4,7 @@ import java.util.Date;
 import java.util.List;
 
 public class CatalogData {
+    private int sn; // 命令序列号
     private int total;
     private List<DeviceChannel> channelList;
     private Date lastTime;
@@ -15,6 +16,15 @@ public class CatalogData {
     }
     private CatalogDataStatus status;
 
+
+    public int getSn() {
+        return sn;
+    }
+
+    public void setSn(int sn) {
+        this.sn = sn;
+    }
+
     public int getTotal() {
         return total;
     }

+ 49 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java

@@ -1,5 +1,12 @@
 package com.genersoft.iot.vmp.gb28181.bean;
 
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.DynamicTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
+import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.ArrayList;
@@ -9,12 +16,32 @@ import java.util.concurrent.ConcurrentHashMap;
 @Component
 public class SubscribeHolder {
 
+    @Autowired
+    private DynamicTask dynamicTask;
+
+    @Autowired
+    private IRedisCatchStorage redisCatchStorage;
+
+    @Autowired
+    private ISIPCommanderForPlatform sipCommanderForPlatform;
+
+    @Autowired
+    private IVideoManagerStorage storager;
+
+    private final String taskOverduePrefix = "subscribe_overdue_";
+
     private static ConcurrentHashMap<String, SubscribeInfo> catalogMap = new ConcurrentHashMap<>();
     private static ConcurrentHashMap<String, SubscribeInfo> mobilePositionMap = new ConcurrentHashMap<>();
 
 
     public void putCatalogSubscribe(String platformId, SubscribeInfo subscribeInfo) {
         catalogMap.put(platformId, subscribeInfo);
+        // 添加订阅到期
+        String taskOverdueKey = taskOverduePrefix +  "catalog_" + platformId;
+        dynamicTask.stop(taskOverdueKey);
+        // 添加任务处理订阅过期
+        dynamicTask.startDelay(taskOverdueKey, () -> removeCatalogSubscribe(subscribeInfo.getId()),
+                subscribeInfo.getExpires() * 1000);
     }
 
     public SubscribeInfo getCatalogSubscribe(String platformId) {
@@ -23,10 +50,24 @@ public class SubscribeHolder {
 
     public void removeCatalogSubscribe(String platformId) {
         catalogMap.remove(platformId);
+        String taskOverdueKey = taskOverduePrefix +  "catalog_" + platformId;
+        // 添加任务处理订阅过期
+        dynamicTask.stop(taskOverdueKey);
     }
 
     public void putMobilePositionSubscribe(String platformId, SubscribeInfo subscribeInfo) {
         mobilePositionMap.put(platformId, subscribeInfo);
+        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX +  "MobilePosition_" + platformId;
+        // 添加任务处理GPS定时推送
+        dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, subscribeInfo.getSn(), key, this), subscribeInfo.getGpsInterval());
+        String taskOverdueKey = taskOverduePrefix +  "MobilePosition_" + platformId;
+        dynamicTask.stop(taskOverdueKey);
+        // 添加任务处理订阅过期
+        dynamicTask.startDelay(taskOverdueKey, () -> {
+                    System.out.println("订阅过期");
+                    removeMobilePositionSubscribe(subscribeInfo.getId());
+                },
+                subscribeInfo.getExpires() * 1000);
     }
 
     public SubscribeInfo getMobilePositionSubscribe(String platformId) {
@@ -35,6 +76,12 @@ public class SubscribeHolder {
 
     public void removeMobilePositionSubscribe(String platformId) {
         mobilePositionMap.remove(platformId);
+        String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX +  "MobilePosition_" + platformId;
+        // 结束任务处理GPS定时推送
+        dynamicTask.stop(key);
+        String taskOverdueKey = taskOverduePrefix +  "MobilePosition_" + platformId;
+        // 添加任务处理订阅过期
+        dynamicTask.stop(taskOverdueKey);
     }
 
     public List<String> getAllCatalogSubscribePlatform() {
@@ -48,7 +95,7 @@ public class SubscribeHolder {
     }
 
     public void removeAllSubscribe(String platformId) {
-        mobilePositionMap.remove(platformId);
-        catalogMap.remove(platformId);
+        removeMobilePositionSubscribe(platformId);
+        removeCatalogSubscribe(platformId);
     }
 }

+ 24 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeInfo.java

@@ -33,6 +33,14 @@ public class SubscribeInfo {
     private ServerTransaction transaction;
     private Dialog dialog;
 
+    /**
+     * 以下为可选字段
+     * @return
+     */
+    private String sn;
+    private int gpsInterval;
+
+
     public String getId() {
         return id;
     }
@@ -88,4 +96,20 @@ public class SubscribeInfo {
     public void setDialog(Dialog dialog) {
         this.dialog = dialog;
     }
+
+    public String getSn() {
+        return sn;
+    }
+
+    public void setSn(String sn) {
+        this.sn = sn;
+    }
+
+    public int getGpsInterval() {
+        return gpsInterval;
+    }
+
+    public void setGpsInterval(int gpsInterval) {
+        this.gpsInterval = gpsInterval;
+    }
 }

+ 5 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/event/online/OnlineEventListener.java

@@ -54,6 +54,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
 	@Autowired
 	private SIPCommander cmder;
 
+
 	private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
 	@Override
@@ -76,7 +77,7 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
 			if (deviceInStore == null) { //第一次上线
 				logger.info("[{}] 首次注册,查询设备信息以及通道信息", device.getDeviceId());
 				cmder.deviceInfoQuery(device);
-				cmder.catalogQuery(device, null);
+				deviceService.sync(device);
 			}
 			break;
 		// 设备主动发送心跳触发的在线事件
@@ -99,7 +100,10 @@ public class OnlineEventListener implements ApplicationListener<OnlineEvent> {
 		storager.updateDevice(device);
 		// 上线添加订阅
 		if (device.getSubscribeCycleForCatalog() > 0) {
+			// 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
 			deviceService.addCatalogSubscribe(device);
+		}
+		if (device.getSubscribeCycleForMobilePosition() > 0) {
 			deviceService.addMobilePositionSubscribe(device);
 		}
 	}

+ 0 - 50
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/SubscribeListenerForPlatform.java

@@ -1,50 +0,0 @@
-package com.genersoft.iot.vmp.gb28181.event.subscribe;
-
-import com.genersoft.iot.vmp.common.VideoManagerConstants;
-import com.genersoft.iot.vmp.conf.DynamicTask;
-import com.genersoft.iot.vmp.conf.RedisKeyExpirationEventMessageListener;
-import com.genersoft.iot.vmp.conf.UserSetting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.data.redis.connection.Message;
-import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.stereotype.Component;
-
-/**    
- * 平台订阅到期事件
- */
-@Component
-public class SubscribeListenerForPlatform extends RedisKeyExpirationEventMessageListener {
-
-    private Logger logger = LoggerFactory.getLogger(SubscribeListenerForPlatform.class);
-
-	@Autowired
-	private UserSetting userSetting;
-
-    @Autowired
-    private DynamicTask dynamicTask;
-
-    public SubscribeListenerForPlatform(RedisMessageListenerContainer listenerContainer, UserSetting userSetting) {
-        super(listenerContainer, userSetting);
-    }
-
-
-    /**
-     * 监听失效的key
-     * @param message
-     * @param pattern
-     */
-    @Override
-    public void onMessage(Message message, byte[] pattern) {
-        //  获取失效的key
-        String expiredKey = message.toString();
-        logger.debug(expiredKey);
-        // 订阅到期
-        String PLATFORM_KEEPLIVEKEY_PREFIX = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() + "_";
-        if (expiredKey.startsWith(PLATFORM_KEEPLIVEKEY_PREFIX)) {
-            // 取消定时任务
-            dynamicTask.stop(expiredKey);
-        }
-    }
-}

+ 0 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java

@@ -61,8 +61,6 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
         if (event.getPlatformId() != null) {
             parentPlatform = storager.queryParentPlatByServerGBId(event.getPlatformId());
             if (parentPlatform != null && !parentPlatform.isStatus())return;
-            String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_Catalog_" + event.getPlatformId();
-//            subscribe = redisCatchStorage.getSubscribe(key);
             subscribe = subscribeHolder.getCatalogSubscribe(event.getPlatformId());
 
             if (subscribe == null) {

+ 36 - 26
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java

@@ -26,28 +26,35 @@ public class CatalogDataCatch {
     @Autowired
     private IVideoManagerStorage storager;
 
-    public void addReady(String key) {
-        CatalogData catalogData = data.get(key);
+    public void addReady(Device device, int sn ) {
+        CatalogData catalogData = data.get(device.getDeviceId());
         if (catalogData == null || catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end)) {
             catalogData = new CatalogData();
             catalogData.setChannelList(new ArrayList<>());
+            catalogData.setDevice(device);
+            catalogData.setSn(sn);
             catalogData.setStatus(CatalogData.CatalogDataStatus.ready);
             catalogData.setLastTime(new Date(System.currentTimeMillis()));
-            data.put(key, catalogData);
+            data.put(device.getDeviceId(), catalogData);
         }
     }
 
-    public void put(String key, int total, Device device, List<DeviceChannel> deviceChannelList) {
-        CatalogData catalogData = data.get(key);
+    public void put(String deviceId, int sn, int total, Device device, List<DeviceChannel> deviceChannelList) {
+        CatalogData catalogData = data.get(deviceId);
         if (catalogData == null) {
             catalogData = new CatalogData();
+            catalogData.setSn(sn);
             catalogData.setTotal(total);
             catalogData.setDevice(device);
             catalogData.setChannelList(new ArrayList<>());
             catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
             catalogData.setLastTime(new Date(System.currentTimeMillis()));
-            data.put(key, catalogData);
+            data.put(deviceId, catalogData);
         }else {
+            // 同一个设备的通道同步请求只考虑一个,其他的直接忽略
+            if (catalogData.getSn() != sn) {
+                return;
+            }
             catalogData.setTotal(total);
             catalogData.setDevice(device);
             catalogData.setStatus(CatalogData.CatalogDataStatus.runIng);
@@ -56,20 +63,20 @@ public class CatalogDataCatch {
         }
     }
 
-    public List<DeviceChannel> get(String key) {
-        CatalogData catalogData = data.get(key);
+    public List<DeviceChannel> get(String deviceId) {
+        CatalogData catalogData = data.get(deviceId);
         if (catalogData == null) return null;
         return catalogData.getChannelList();
     }
 
-    public int getTotal(String key) {
-        CatalogData catalogData = data.get(key);
+    public int getTotal(String deviceId) {
+        CatalogData catalogData = data.get(deviceId);
         if (catalogData == null) return 0;
         return catalogData.getTotal();
     }
 
-    public SyncStatus getSyncStatus(String key) {
-        CatalogData catalogData = data.get(key);
+    public SyncStatus getSyncStatus(String deviceId) {
+        CatalogData catalogData = data.get(deviceId);
         if (catalogData == null) return null;
         SyncStatus syncStatus = new SyncStatus();
         syncStatus.setCurrent(catalogData.getChannelList().size());
@@ -78,10 +85,6 @@ public class CatalogDataCatch {
         return syncStatus;
     }
 
-    public void del(String key) {
-        data.remove(key);
-    }
-
     @Scheduled(fixedRate = 5 * 1000)   //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
     private void timerTask(){
         Set<String> keys = data.keySet();
@@ -92,23 +95,30 @@ public class CatalogDataCatch {
         Calendar calendarBefore30S = Calendar.getInstance();
         calendarBefore30S.setTime(new Date());
         calendarBefore30S.set(Calendar.SECOND, calendarBefore30S.get(Calendar.SECOND) - 30);
-        for (String key : keys) {
-            CatalogData catalogData = data.get(key);
-            if (catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
-                storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
-                String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
+        for (String deviceId : keys) {
+            CatalogData catalogData = data.get(deviceId);
+            if ( catalogData.getLastTime().before(calendarBefore5S.getTime())) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
+                if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
+                    storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
+                    if (catalogData.getTotal() != catalogData.getChannelList().size()) {
+                        String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
+                        catalogData.setErrorMsg(errorMsg);
+                    }
+                }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
+                    String errorMsg = "同步失败,等待回复超时";
+                    catalogData.setErrorMsg(errorMsg);
+                }
                 catalogData.setStatus(CatalogData.CatalogDataStatus.end);
-                catalogData.setErrorMsg(errorMsg);
             }
-            if (catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
-                data.remove(key);
+            if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.end) && catalogData.getLastTime().before(calendarBefore30S.getTime())) { // 超过三十秒,如果标记为end则删除
+                data.remove(deviceId);
             }
         }
     }
 
 
-    public void setChannelSyncEnd(String key, String errorMsg) {
-        CatalogData catalogData = data.get(key);
+    public void setChannelSyncEnd(String deviceId, String errorMsg) {
+        CatalogData catalogData = data.get(deviceId);
         if (catalogData == null)return;
         catalogData.setStatus(CatalogData.CatalogDataStatus.end);
         catalogData.setErrorMsg(errorMsg);

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/task/ISubscribeTask.java

@@ -1,5 +1,9 @@
 package com.genersoft.iot.vmp.gb28181.task;
 
+import javax.sip.DialogState;
+
 public interface ISubscribeTask extends Runnable{
     void stop();
+
+    DialogState getDialogState();
 }

+ 7 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/CatalogSubscribeTask.java

@@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.sip.Dialog;
 import javax.sip.DialogState;
@@ -72,4 +73,10 @@ public class CatalogSubscribeTask implements ISubscribeTask {
             });
         }
     }
+
+    @Override
+    public DialogState getDialogState() {
+        if (dialog == null) return null;
+        return dialog.getState();
+    }
 }

+ 38 - 31
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeHandlerTask.java

@@ -1,16 +1,16 @@
 package com.genersoft.iot.vmp.gb28181.task.impl;
 
-import com.genersoft.iot.vmp.gb28181.bean.GbStream;
-import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
-import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
-import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
+import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
 
-import java.text.SimpleDateFormat;
+import javax.sip.DialogState;
 import java.util.List;
 
 /**
@@ -18,20 +18,21 @@ import java.util.List;
  */
 public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
 
+    private Logger logger = LoggerFactory.getLogger(MobilePositionSubscribeHandlerTask.class);
+
     private IRedisCatchStorage redisCatchStorage;
     private IVideoManagerStorage storager;
     private ISIPCommanderForPlatform sipCommanderForPlatform;
     private SubscribeHolder subscribeHolder;
-    private String platformId;
+    private ParentPlatform platform;
     private String sn;
     private String key;
 
-    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-
     public MobilePositionSubscribeHandlerTask(IRedisCatchStorage redisCatchStorage, ISIPCommanderForPlatform sipCommanderForPlatform, IVideoManagerStorage storager, String platformId, String sn, String key, SubscribeHolder subscribeInfo) {
+        System.out.println("MobilePositionSubscribeHandlerTask 初始化");
         this.redisCatchStorage = redisCatchStorage;
         this.storager = storager;
-        this.platformId = platformId;
+        this.platform = storager.queryParentPlatByServerGBId(platformId);
         this.sn = sn;
         this.key = key;
         this.sipCommanderForPlatform = sipCommanderForPlatform;
@@ -41,30 +42,31 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
     @Override
     public void run() {
 
-        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platformId);
-
+        if (platform == null) return;
+        SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
         if (subscribe != null) {
-            ParentPlatform parentPlatform = storager.queryParentPlatByServerGBId(platformId);
-            if (parentPlatform == null || parentPlatform.isStatus()) {
-                // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
-                List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platformId);
-                if (gbStreams.size() > 0) {
-                    for (GbStream gbStream : gbStreams) {
-                        String gbId = gbStream.getGbId();
-                        GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
-                        if (gpsMsgInfo != null) {
-                            // 发送GPS消息
-                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
-                        }else {
-                            // 没有在redis找到新的消息就使用数据库的消息
-                            gpsMsgInfo = new GPSMsgInfo();
-                            gpsMsgInfo.setId(gbId);
-                            gpsMsgInfo.setLat(gbStream.getLongitude());
-                            gpsMsgInfo.setLng(gbStream.getLongitude());
-                            // 发送GPS消息
-                            sipCommanderForPlatform.sendNotifyMobilePosition(parentPlatform, gpsMsgInfo, subscribe);
-                        }
+
+//            if (!parentPlatform.isStatus()) {
+//                logger.info("发送订阅时发现平台已经离线:{}", platformId);
+//                return;
+//            }
+            // TODO 暂时只处理视频流的回复,后续增加对国标设备的支持
+            List<GbStream> gbStreams = storager.queryGbStreamListInPlatform(platform.getServerGBId());
+            if (gbStreams.size() == 0) {
+                logger.info("发送订阅时发现平台已经没有关联的直播流:{}", platform.getServerGBId());
+                return;
+            }
+            for (GbStream gbStream : gbStreams) {
+                String gbId = gbStream.getGbId();
+                GPSMsgInfo gpsMsgInfo = redisCatchStorage.getGpsMsgInfo(gbId);
+                if (gpsMsgInfo != null) { // 无最新位置不发送
+                    logger.info("无最新位置不发送");
+                    // 经纬度都为0不发送
+                    if (gpsMsgInfo.getLng() == 0 && gpsMsgInfo.getLat() == 0) {
+                        continue;
                     }
+                    // 发送GPS消息
+                    sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, subscribe);
                 }
             }
         }
@@ -74,4 +76,9 @@ public class MobilePositionSubscribeHandlerTask implements ISubscribeTask {
     public void stop() {
 
     }
+
+    @Override
+    public DialogState getDialogState() {
+        return null;
+    }
 }

+ 28 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/task/impl/MobilePositionSubscribeTask.java

@@ -6,10 +6,13 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import org.dom4j.Element;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
 
 import javax.sip.Dialog;
 import javax.sip.DialogState;
 import javax.sip.ResponseEvent;
+import java.util.Timer;
+import java.util.TimerTask;
 
 /**
  * 移动位置订阅的定时更新
@@ -20,6 +23,8 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
     private  ISIPCommander sipCommander;
     private Dialog dialog;
 
+    private Timer timer ;
+
     public MobilePositionSubscribeTask(Device device, ISIPCommander sipCommander) {
         this.device = device;
         this.sipCommander = sipCommander;
@@ -27,10 +32,14 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
 
     @Override
     public void run() {
+        if (timer != null ) {
+            timer.cancel();
+            timer = null;
+        }
         sipCommander.mobilePositionSubscribe(device, dialog, eventResult -> {
-            if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
-                dialog = eventResult.dialog;
-            }
+//            if (eventResult.dialog != null || eventResult.dialog.getState().equals(DialogState.CONFIRMED)) {
+//                dialog = eventResult.dialog;
+//            }
             ResponseEvent event = (ResponseEvent) eventResult.event;
             if (event.getResponse().getRawContent() != null) {
                 // 成功
@@ -43,6 +52,13 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
             dialog = null;
             // 失败
             logger.warn("[移动位置订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
+            timer = new Timer();
+            timer.schedule(new TimerTask() {
+                @Override
+                public void run() {
+                    MobilePositionSubscribeTask.this.run();
+                }
+            }, 2000);
         });
 
     }
@@ -56,6 +72,10 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
          * COMPLETED-> Completed Dialog状态-已完成
          * TERMINATED-> Terminated Dialog状态-终止
          */
+        if (timer != null ) {
+            timer.cancel();
+            timer = null;
+        }
         if (dialog != null && dialog.getState().equals(DialogState.CONFIRMED)) {
             logger.info("取消移动订阅时dialog状态为{}", dialog.getState());
             device.setSubscribeCycleForMobilePosition(0);
@@ -74,4 +94,9 @@ public class MobilePositionSubscribeTask implements ISubscribeTask {
             });
         }
     }
+    @Override
+    public DialogState getDialogState() {
+        if (dialog == null) return null;
+        return dialog.getState();
+    }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@@ -250,7 +250,7 @@ public interface ISIPCommander {
 	 * 
 	 * @param device 视频设备
 	 */
-	boolean catalogQuery(Device device, SipSubscribe.Event errorEvent);
+	boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent);
 	
 	/**
 	 * 查询录像信息

+ 21 - 10
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -1208,14 +1208,14 @@ public class SIPCommander implements ISIPCommander {
 	 * @param device 视频设备
 	 */ 
 	@Override
-	public boolean catalogQuery(Device device, SipSubscribe.Event errorEvent) {
+	public boolean catalogQuery(Device device, int sn, SipSubscribe.Event errorEvent) {
 		try {
 			StringBuffer catalogXml = new StringBuffer(200);
 			String charset = device.getCharset();
 			catalogXml.append("<?xml version=\"1.0\" encoding=\"" + charset + "\"?>\r\n");
 			catalogXml.append("<Query>\r\n");
 			catalogXml.append("<CmdType>Catalog</CmdType>\r\n");
-			catalogXml.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n");
+			catalogXml.append("<SN>" + sn + "</SN>\r\n");
 			catalogXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
 			catalogXml.append("</Query>\r\n");
 			
@@ -1566,17 +1566,28 @@ public class SIPCommander implements ISIPCommander {
 			cmdXml.append("<DeviceID>" + device.getDeviceId() + "</DeviceID>\r\n");
 			cmdXml.append("</Query>\r\n");
 
-			String tm = Long.toString(System.currentTimeMillis());
 
-			CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
-					: udpSipProvider.getNewCallId();
+			Request request;
+			if (dialog != null) {
+				logger.info("发送目录订阅消息时 dialog的状态为: {}", dialog.getState());
+				request = dialog.createRequest(Request.SUBSCRIBE);
+				ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
+				request.setContent(cmdXml.toString(), contentTypeHeader);
+				ExpiresHeader expireHeader = sipFactory.createHeaderFactory().createExpiresHeader(device.getSubscribeCycleForMobilePosition());
+				request.addHeader(expireHeader);
+			}else {
+				String tm = Long.toString(System.currentTimeMillis());
 
-			// 有效时间默认为60秒以上
-			Request request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
-					"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
-					callIdHeader);
-			transmitRequest(device, request, errorEvent, okEvent);
+				CallIdHeader callIdHeader = device.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
+						: udpSipProvider.getNewCallId();
+
+				// 有效时间默认为60秒以上
+				request = headerProvider.createSubscribeRequest(device, cmdXml.toString(), "z9hG4bK-viaPos-" + tm,
+						"fromTagPos" + tm, null, device.getSubscribeCycleForCatalog(), "Catalog" ,
+						callIdHeader);
 
+			}
+			transmitRequest(device, request, errorEvent, okEvent);
 			return true;
 
 		} catch ( NumberFormatException | ParseException | InvalidArgumentException	| SipException e) {

+ 3 - 3
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderFroPlatform.java

@@ -385,7 +385,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
         if (parentPlatform == null) {
             return false;
         }
-
+        logger.info("[发送 移动位置订阅] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
         try {
             String characterSet = parentPlatform.getCharacterSet();
             StringBuffer deviceStatusXml = new StringBuffer(600);
@@ -405,7 +405,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
             CallIdHeader callIdHeader = parentPlatform.getTransport().equals("TCP") ? tcpSipProvider.getNewCallId()
                     : udpSipProvider.getNewCallId();
             callIdHeader.setCallId(subscribeInfo.getCallId());
-            logger.info("[发送Notify-MobilePosition] {}/{}->{},{}", parentPlatform.getServerGBId(), gpsMsgInfo.getId(), gpsMsgInfo.getLng(), gpsMsgInfo.getLat());
+
             sendNotify(parentPlatform, deviceStatusXml.toString(), subscribeInfo, eventResult -> {
                 logger.error("发送NOTIFY通知消息失败。错误:{} {}", eventResult.statusCode, eventResult.msg);
             }, null);
@@ -459,7 +459,7 @@ public class SIPCommanderFroPlatform implements ISIPCommanderForPlatform {
  		// 设置编码, 防止中文乱码
 		messageFactory.setDefaultContentEncodingCharset(characterSet);
         Dialog dialog  = subscribeInfo.getDialog();
-        if (dialog == null) return;
+        if (dialog == null || !dialog.getState().equals(DialogState.CONFIRMED)) return;
         SIPRequest notifyRequest = (SIPRequest)dialog.createRequest(Request.NOTIFY);
         ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application", "MANSCDP+xml");
         notifyRequest.setContent(catalogXmlContent, contentTypeHeader);

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java

@@ -146,7 +146,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 			} else {
 				mobilePosition.setAltitude(0.0);
 			}
-			logger.info("[收到Notify-MobilePosition]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
+			logger.info("[收到 移动位置订阅]:{}/{}->{}.{}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(),
 					mobilePosition.getLongitude(), mobilePosition.getLatitude());
 			mobilePosition.setReportSource("Mobile Position");
 			BaiduPoint bp = GpsUtil.Wgs84ToBd09(String.valueOf(mobilePosition.getLongitude()), String.valueOf(mobilePosition.getLatitude()));
@@ -281,7 +281,7 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements
 					Element eventElement = itemDevice.element("Event");
 					DeviceChannel channel = XmlUtil.channelContentHander(itemDevice);
 					channel.setDeviceId(device.getDeviceId());
-					logger.info("[收到Notify-Catalog]:{}/{}", device.getDeviceId(), channel.getChannelId());
+					logger.info("[收到 目录订阅]:{}/{}", device.getDeviceId(), channel.getChannelId());
 					switch (eventElement.getText().toUpperCase()) {
 						case CatalogEvent.ON: // 上线
 							logger.info("收到来自设备【{}】的通道【{}】上线通知", device.getDeviceId(), channel.getChannelId());

+ 18 - 9
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/SubscribeRequestProcessor.java

@@ -149,8 +149,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 			subscribeInfo.setDialog(dialog);
 		}
 		String sn = XmlUtil.getText(rootElement, "SN");
-		String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_MobilePosition_" + platformId;
-		logger.info("[notify-MobilePosition]: {}", platformId);
+		logger.info("[回复 移动位置订阅]: {}", platformId);
 		StringBuilder resultXml = new StringBuilder(200);
 		resultXml.append("<?xml version=\"1.0\" ?>\r\n")
 				.append("<Response>\r\n")
@@ -161,14 +160,25 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 				.append("</Response>\r\n");
 
 		if (subscribeInfo.getExpires() > 0) {
-			if (subscribeHolder.getMobilePositionSubscribe(platformId) != null) {
-				dynamicTask.stop(key);
-			}
 			String interval = XmlUtil.getText(rootElement, "Interval"); // GPS上报时间间隔
-			dynamicTask.startCron(key, new MobilePositionSubscribeHandlerTask(redisCatchStorage, sipCommanderForPlatform, storager,  platformId, sn, key, subscribeHolder), Integer.parseInt(interval) -1 );
+			if (interval == null) {
+				subscribeInfo.setGpsInterval(5);
+			}else {
+				subscribeInfo.setGpsInterval(Integer.parseInt(interval));
+			}
+
+			subscribeInfo.setSn(sn);
 			subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
+//			if (subscribeHolder.getMobilePositionSubscribe(platformId) == null ) {
+//				subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
+//			}else {
+//				if (subscribeHolder.getMobilePositionSubscribe(platformId).getDialog() != null
+//						&& subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState() != null
+//						&& !subscribeHolder.getMobilePositionSubscribe(platformId).getDialog().getState().equals(DialogState.CONFIRMED)) {
+//					subscribeHolder.putMobilePositionSubscribe(platformId, subscribeInfo);
+//				}
+//			}
 		}else if (subscribeInfo.getExpires() == 0) {
-			dynamicTask.stop(key);
 			subscribeHolder.removeMobilePositionSubscribe(platformId);
 		}
 
@@ -202,8 +212,7 @@ public class SubscribeRequestProcessor extends SIPRequestProcessorParent impleme
 			subscribeInfo.setDialog(dialog);
 		}
 		String sn = XmlUtil.getText(rootElement, "SN");
-		String key = VideoManagerConstants.SIP_SUBSCRIBE_PREFIX + userSetting.getServerId() +  "_Catalog_" + platformId;
-		logger.info("[notify-Catalog]: {}", platformId);
+		logger.info("[回复 目录订阅]: {}/{}", platformId, deviceID);
 		StringBuilder resultXml = new StringBuilder(200);
 		resultXml.append("<?xml version=\"1.0\" ?>\r\n")
 				.append("<Response>\r\n")

+ 17 - 35
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java

@@ -86,23 +86,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
             rootElement = getRootElement(evt, device.getCharset());
             Element deviceListElement = rootElement.element("DeviceList");
             Element sumNumElement = rootElement.element("SumNum");
-            if (sumNumElement == null || deviceListElement == null) {
+            Element snElement = rootElement.element("SN");
+            if (snElement == null || sumNumElement == null || deviceListElement == null) {
                 responseAck(evt, Response.BAD_REQUEST, "xml error");
                 return;
             }
             int sumNum = Integer.parseInt(sumNumElement.getText());
+
             if (sumNum == 0) {
                 // 数据已经完整接收
                 storager.cleanChannelsForDevice(device.getDeviceId());
-                RequestMessage msg = new RequestMessage();
-                msg.setKey(key);
-                WVPResult<Object> result = new WVPResult<>();
-                result.setCode(0);
-                result.setData(device);
-                msg.setData(result);
-                result.setMsg("更新成功,共0条");
-                deferredResultHolder.invokeAllResult(msg);
-                catalogDataCatch.del(key);
+                catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
             }else {
                 Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
                 if (deviceListIterator != null) {
@@ -123,31 +117,22 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
 
                         channelList.add(deviceChannel);
                     }
+                    int sn = Integer.parseInt(snElement.getText());
                     logger.info("收到来自设备【{}】的通道: {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.get(key) == null ? 0 :catalogDataCatch.get(key).size(), sumNum);
-                    catalogDataCatch.put(key, sumNum, device, channelList);
-                    if (catalogDataCatch.get(key).size() == sumNum) {
+                    catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, channelList);
+                    if (catalogDataCatch.get(device.getDeviceId()).size() == sumNum) {
                         // 数据已经完整接收
-                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(key));
-                        RequestMessage msg = new RequestMessage();
-                        msg.setKey(key);
-                        WVPResult<Object> result = new WVPResult<>();
-                        result.setCode(0);
-                        result.setData(device);
-                        if (resetChannelsResult || sumNum ==0) {
-                            result.setMsg("更新成功,共" + sumNum + "条,已更新" + catalogDataCatch.get(key).size() + "条");
+                        boolean resetChannelsResult = storager.resetChannels(device.getDeviceId(), catalogDataCatch.get(device.getDeviceId()));
+                        if (!resetChannelsResult) {
+                            String errorMsg = "接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(device.getDeviceId()).size() + "条";
+                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), errorMsg);
                         }else {
-                            result.setMsg("接收成功,写入失败,共" + sumNum + "条,已接收" + catalogDataCatch.get(key).size() + "条");
+                            catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), null);
                         }
-                        msg.setData(result);
-                        deferredResultHolder.invokeAllResult(msg);
-                        catalogDataCatch.del(key);
                     }
                 }
                 // 回复200 OK
                 responseAck(evt, Response.OK);
-                if (offLineDetector.isOnline(device.getDeviceId())) {
-                    publisher.onlineEventPublish(device, VideoManagerConstants.EVENT_ONLINE_MESSAGE);
-                }
             }
         } catch (DocumentException e) {
             e.printStackTrace();
@@ -231,21 +216,18 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
     }
 
     public SyncStatus getChannelSyncProgress(String deviceId) {
-        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
-        if (catalogDataCatch.get(key) == null) {
+        if (catalogDataCatch.get(deviceId) == null) {
             return null;
         }else {
-            return catalogDataCatch.getSyncStatus(key);
+            return catalogDataCatch.getSyncStatus(deviceId);
         }
     }
 
-    public void setChannelSyncReady(String deviceId) {
-        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
-        catalogDataCatch.addReady(key);
+    public void setChannelSyncReady(Device device, int sn) {
+        catalogDataCatch.addReady(device, sn);
     }
 
     public void setChannelSyncEnd(String deviceId, String errorMsg) {
-        String key = DeferredResultHolder.CALLBACK_CMD_CATALOG + deviceId;
-        catalogDataCatch.setChannelSyncEnd(key, errorMsg);
+        catalogDataCatch.setChannelSyncEnd(deviceId, errorMsg);
     }
 }

+ 3 - 10
src/main/java/com/genersoft/iot/vmp/service/IDeviceService.java

@@ -44,15 +44,8 @@ public interface IDeviceService {
     SyncStatus getChannelSyncStatus(String deviceId);
 
     /**
-     * 设置通道同步状态
-     * @param deviceId 设备ID
-     */
-    void setChannelSyncReady(String deviceId);
-
-    /**
-     * 设置同步结束
-     * @param deviceId 设备ID
-     * @param errorMsg 错误信息
+     * 通道同步
+     * @param device
      */
-    void setChannelSyncEnd(String deviceId, String errorMsg);
+    void sync(Device device);
 }

+ 23 - 21
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -14,6 +14,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.sip.DialogState;
+
 /**
  * 设备业务(目录订阅)
  */
@@ -39,19 +41,17 @@ public class DeviceServiceImpl implements IDeviceService {
         if (device == null || device.getSubscribeCycleForCatalog() < 0) {
             return false;
         }
-        if (dynamicTask.contains(device.getDeviceId() + "catalog")) {
-            // 存在则停止现有的,开启新的
-            dynamicTask.stop(device.getDeviceId() + "catalog");
+        CatalogSubscribeTask task = (CatalogSubscribeTask)dynamicTask.get(device.getDeviceId() + "catalog");
+        if (task != null && task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加
+            return true;
         }
         logger.info("[添加目录订阅] 设备{}", device.getDeviceId());
         // 添加目录订阅
         CatalogSubscribeTask catalogSubscribeTask = new CatalogSubscribeTask(device, sipCommander);
-        catalogSubscribeTask.run();
         // 提前开始刷新订阅
-        int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
+        int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForCatalog(),30);
         // 设置最小值为30
-        subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
-        dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog);
+        dynamicTask.startCron(device.getDeviceId() + "catalog", catalogSubscribeTask, subscribeCycleForCatalog -1);
         return true;
     }
 
@@ -70,18 +70,16 @@ public class DeviceServiceImpl implements IDeviceService {
         if (device == null || device.getSubscribeCycleForMobilePosition() < 0) {
             return false;
         }
-        if (dynamicTask.contains(device.getDeviceId() + "mobile_position")) {
-            // 存在则停止现有的,开启新的
-            dynamicTask.stop(device.getDeviceId() + "mobile_position");
-        }
         logger.info("[添加移动位置订阅] 设备{}", device.getDeviceId());
+        MobilePositionSubscribeTask task = (MobilePositionSubscribeTask)dynamicTask.get(device.getDeviceId() + "mobile_position");
+        if (task != null &&  task.getDialogState() != null && task.getDialogState().equals(DialogState.CONFIRMED)) { // 已存在不需要再次添加
+            return true;
+        }
         // 添加目录订阅
         MobilePositionSubscribeTask mobilePositionSubscribeTask = new MobilePositionSubscribeTask(device, sipCommander);
-        mobilePositionSubscribeTask.run();
         // 提前开始刷新订阅
-        int subscribeCycleForCatalog = device.getSubscribeCycleForCatalog();
         // 设置最小值为30
-        subscribeCycleForCatalog = Math.max(subscribeCycleForCatalog, 30);
+        int subscribeCycleForCatalog = Math.max(device.getSubscribeCycleForMobilePosition(),30);
         dynamicTask.startCron(device.getDeviceId() + "mobile_position" , mobilePositionSubscribeTask, subscribeCycleForCatalog -1 );
         return true;
     }
@@ -102,12 +100,16 @@ public class DeviceServiceImpl implements IDeviceService {
     }
 
     @Override
-    public void setChannelSyncReady(String deviceId) {
-        catalogResponseMessageHandler.setChannelSyncReady(deviceId);
-    }
-
-    @Override
-    public void setChannelSyncEnd(String deviceId, String errorMsg) {
-        catalogResponseMessageHandler.setChannelSyncEnd(deviceId, errorMsg);
+    public void sync(Device device) {
+        if (catalogResponseMessageHandler.getChannelSyncProgress(device.getDeviceId()) != null) {
+            logger.info("开启同步时发现同步已经存在");
+            return;
+        }
+        int sn = (int)((Math.random()*9+1)*100000);
+        catalogResponseMessageHandler.setChannelSyncReady(device, sn);
+        sipCommander.catalogQuery(device, sn, event -> {
+            String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
+            catalogResponseMessageHandler.setChannelSyncEnd(device.getDeviceId(), errorMsg);
+        });
     }
 }

+ 8 - 5
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java

@@ -238,12 +238,15 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 
 	@Override
 	public boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
+		if (deviceChannelList == null) {
+			return false;
+		}
 		TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
 		// 数据去重
 		List<DeviceChannel> channels = new ArrayList<>();
 		StringBuilder stringBuilder = new StringBuilder();
 		Map<String, Integer> subContMap = new HashMap<>();
-		if (deviceChannelList.size() > 1) {
+		if (deviceChannelList != null && deviceChannelList.size() > 1) {
 			// 数据去重
 			Set<String> gbIdSet = new HashSet<>();
 			for (DeviceChannel deviceChannel : deviceChannelList) {
@@ -300,6 +303,7 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 			dataSourceTransactionManager.commit(transactionStatus);     //手动提交
 			return true;
 		}catch (Exception e) {
+			e.printStackTrace();
 			dataSourceTransactionManager.rollback(transactionStatus);
 			return false;
 		}
@@ -415,10 +419,9 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 		TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
 		boolean result = false;
 		try {
-			if (platformChannelMapper.delChannelForDeviceId(deviceId) <0  // 删除与国标平台的关联
-					|| deviceChannelMapper.cleanChannelsByDeviceId(deviceId) < 0 // 删除他的通道
-					|| deviceMapper.del(deviceId) < 0 // 移除设备信息
-			) {
+			platformChannelMapper.delChannelForDeviceId(deviceId);
+			deviceChannelMapper.cleanChannelsByDeviceId(deviceId);
+			if ( deviceMapper.del(deviceId) < 0 ) {
 				//事务回滚
 				dataSourceTransactionManager.rollback(transactionStatus);
 			}

+ 37 - 9
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/device/DeviceQuery.java

@@ -4,8 +4,13 @@ import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
 import com.genersoft.iot.vmp.gb28181.bean.SyncStatus;
 import com.genersoft.iot.vmp.gb28181.event.DeviceOffLineDetector;
+import com.genersoft.iot.vmp.gb28181.task.ISubscribeTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.CatalogSubscribeTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeHandlerTask;
+import com.genersoft.iot.vmp.gb28181.task.impl.MobilePositionSubscribeTask;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
 import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
@@ -29,9 +34,8 @@ import org.springframework.util.StringUtils;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import javax.sip.DialogState;
+import java.util.*;
 
 @Api(tags = "国标设备查询", value = "国标设备查询")
 @SuppressWarnings("rawtypes")
@@ -63,6 +67,9 @@ public class DeviceQuery {
 	@Autowired
 	private DynamicTask dynamicTask;
 
+	@Autowired
+	private SubscribeHolder subscribeHolder;
+
 	/**
 	 * 使用ID查询国标设备
 	 * @param deviceId 国标ID
@@ -165,12 +172,8 @@ public class DeviceQuery {
 			wvpResult.setData(syncStatus);
 			return wvpResult;
 		}
-		SyncStatus syncStatusReady = new SyncStatus();
-		deviceService.setChannelSyncReady(deviceId);
-		cmder.catalogQuery(device, event -> {
-			String errorMsg = String.format("同步通道失败,错误码: %s, %s", event.statusCode, event.msg);
-			deviceService.setChannelSyncEnd(deviceId, errorMsg);
-		});
+		deviceService.sync(device);
+
 		WVPResult<SyncStatus> wvpResult = new WVPResult<>();
 		wvpResult.setCode(0);
 		wvpResult.setMsg("开始同步");
@@ -469,4 +472,29 @@ public class DeviceQuery {
 		}
 		return wvpResult;
 	}
+
+	@GetMapping("/{deviceId}/subscribe_info")
+	@ApiOperation(value = "获取设备的订阅状态", notes = "获取设备的订阅状态")
+	public WVPResult<Map<String, String>> getSubscribeInfo(@PathVariable String deviceId) {
+		Set<String> allKeys = dynamicTask.getAllKeys();
+		Map<String, String> dialogStateMap = new HashMap<>();
+		for (String key : allKeys) {
+			if (key.startsWith(deviceId)) {
+				ISubscribeTask subscribeTask = (ISubscribeTask)dynamicTask.get(key);
+				DialogState dialogState = subscribeTask.getDialogState();
+				if (dialogState == null) {
+					continue;
+				}
+				if (subscribeTask instanceof CatalogSubscribeTask) {
+					dialogStateMap.put("catalog", dialogState.toString());
+				}else if (subscribeTask instanceof MobilePositionSubscribeTask) {
+					dialogStateMap.put("mobilePosition", dialogState.toString());
+				}
+			}
+		}
+		WVPResult<Map<String, String>> wvpResult = new WVPResult<>();
+		wvpResult.setCode(0);
+		wvpResult.setData(dialogStateMap);
+		return wvpResult;
+	}
 }

+ 39 - 3
src/main/java/com/genersoft/iot/vmp/vmanager/server/ServerController.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.genersoft.iot.vmp.VManageBootstrap;
 import com.genersoft.iot.vmp.common.VersionPo;
+import com.genersoft.iot.vmp.conf.DynamicTask;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.VersionInfo;
@@ -27,6 +28,7 @@ import javax.sip.ObjectInUseException;
 import javax.sip.SipProvider;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 @SuppressWarnings("rawtypes")
 @Api(tags = "服务控制")
@@ -42,13 +44,16 @@ public class ServerController {
     private IMediaServerService mediaServerService;
 
     @Autowired
-    VersionInfo versionInfo;
+    private VersionInfo versionInfo;
 
     @Autowired
-    SipConfig sipConfig;
+    private SipConfig sipConfig;
 
     @Autowired
-    UserSetting userSetting;
+    private UserSetting userSetting;
+
+    @Autowired
+    private DynamicTask dynamicTask;
 
     @Value("${server.port}")
     private int serverPort;
@@ -248,4 +253,35 @@ public class ServerController {
         result.setData(jsonObject);
         return result;
     }
+
+//    @ApiOperation("当前进行中的动态任务")
+//    @GetMapping(value = "/dynamicTask")
+//    @ResponseBody
+//    public WVPResult<JSONObject> getDynamicTask(){
+//        WVPResult<JSONObject> result = new WVPResult<>();
+//        result.setCode(0);
+//        result.setMsg("success");
+//
+//        JSONObject jsonObject = new JSONObject();
+//
+//        Set<String> allKeys = dynamicTask.getAllKeys();
+//        jsonObject.put("server.port", serverPort);
+//        if (StringUtils.isEmpty(type)) {
+//            jsonObject.put("sip", JSON.toJSON(sipConfig));
+//            jsonObject.put("base", JSON.toJSON(userSetting));
+//        }else {
+//            switch (type){
+//                case "sip":
+//                    jsonObject.put("sip", sipConfig);
+//                    break;
+//                case "base":
+//                    jsonObject.put("base", userSetting);
+//                    break;
+//                default:
+//                    break;
+//            }
+//        }
+//        result.setData(jsonObject);
+//        return result;
+//    }
 }

+ 29 - 16
web_src/src/components/dialog/SyncChannelProgress.vue

@@ -61,23 +61,36 @@ export default {
           if (!this.syncFlag) {
             this.syncFlag = true;
           }
-          if (res.data.data == null) {
-            this.syncStatus = "success"
-            this.percentage = 100;
-            this.msg = '同步成功';
-          }else if (res.data.data.total == 0){
-            this.msg = `等待同步中`;
-            this.timmer = setTimeout(this.getProgress, 300)
-          }else if (res.data.data.errorMsg !== null ){
-            this.msg = res.data.data.errorMsg;
-            this.syncStatus = "exception"
-          }else {
-            this.total = res.data.data.total;
-            this.current = res.data.data.current;
-            this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
-            this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
-            this.timmer = setTimeout(this.getProgress, 300)
+
+          if (res.data.data != null) {
+            if (res.data.data.total == 0) {
+              if (res.data.data.errorMsg !== null ){
+                this.msg = res.data.data.errorMsg;
+                this.syncStatus = "exception"
+              }else {
+                this.msg = `等待同步中`;
+                this.timmer = setTimeout(this.getProgress, 300)
+              }
+            }else  {
+              if (res.data.data.total == res.data.data.current) {
+                this.syncStatus = "success"
+                this.percentage = 100;
+                this.msg = '同步成功';
+              }else {
+                if (res.data.data.errorMsg !== null ){
+                  this.msg = res.data.data.errorMsg;
+                  this.syncStatus = "exception"
+                }else {
+                  this.total = res.data.data.total;
+                  this.current = res.data.data.current;
+                  this.percentage = Math.floor(Number(res.data.data.current)/Number(res.data.data.total)* 10000)/100;
+                  this.msg = `同步中...[${res.data.data.current}/${res.data.data.total}]`;
+                  this.timmer = setTimeout(this.getProgress, 300)
+                }
+              }
+            }
           }
+
         }else {
           if (this.syncFlag) {
             this.syncStatus = "success"