Преглед изворни кода

优化info消息的cseq计数

648540858 пре 4 година
родитељ
комит
0c10e8d9d3

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java

@@ -56,6 +56,8 @@ public class VideoManagerConstants {
 
 	public static final String MEDIA_TRANSACTION_USED_PREFIX = "VMP_MEDIA_TRANSACTION_";
 
+	public static final String SIP_CSEQ_PREFIX = "VMP_SIP_CSEQ_";
+
 	//************************** redis 消息*********************************
 	public static final String WVP_MSG_STREAM_CHANGE_PREFIX = "WVP_MSG_STREAM_CHANGE_";
 

+ 7 - 1
src/main/java/com/genersoft/iot/vmp/conf/runner/SipDeviceRunner.java

@@ -1,5 +1,7 @@
 package com.genersoft.iot.vmp.conf.runner;
 
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -23,6 +25,9 @@ public class SipDeviceRunner implements CommandLineRunner {
     @Autowired
     private IRedisCatchStorage redisCatchStorage;
 
+    @Autowired
+    private UserSetup userSetup;
+
     @Override
     public void run(String... args) throws Exception {
         // 读取redis没有心跳信息的则设置为离线,等收到下次心跳设置为在线
@@ -32,7 +37,8 @@ public class SipDeviceRunner implements CommandLineRunner {
         for (String deviceId : onlineForAll) {
             storager.online(deviceId);
         }
-
+        // 重置cseq计数
+        redisCatchStorage.resetAllCSEQ();
         // TODO 查询在线设备那些开启了订阅,为设备开启定时的目录订阅
     }
 }

+ 10 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java

@@ -14,7 +14,7 @@ import javax.sip.message.Request;
 
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
-import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -35,6 +35,9 @@ public class SIPRequestHeaderProvider {
 	@Autowired
 	private SipFactory sipFactory;
 
+	@Autowired
+	private IRedisCatchStorage redisCatchStorage;
+
 	@Autowired
 	private VideoStreamSessionManager streamSession;
 	
@@ -195,6 +198,7 @@ public class SIPRequestHeaderProvider {
 
 		// Forwards
 		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+
 		// ceq
 		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory().createCSeqHeader(1L, Request.SUBSCRIBE);
 
@@ -218,7 +222,7 @@ public class SIPRequestHeaderProvider {
 		return request;
 	}
 
-	public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
+	public Request createInfoRequest(Device device, StreamInfo streamInfo, String content, Long cseq)
 			throws PeerUnavailableException, ParseException, InvalidArgumentException {
 		Request request = null;
 		Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId());
@@ -247,10 +251,12 @@ public class SIPRequestHeaderProvider {
 
 		// Forwards
 		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
-
+		if (cseq == null) {
+			cseq = redisCatchStorage.getCSEQ(Request.INFO);
+		}
 		// ceq
 		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
-				.createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO);
+				.createCSeqHeader(cseq, Request.INFO);
 
 		request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
 				fromHeader, toHeader, viaHeaders, maxForwards);

+ 12 - 10
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -18,7 +18,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
-import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -1553,12 +1552,12 @@ public class SIPCommander implements ISIPCommander {
 	@Override
 	public void playPauseCmd(Device device, StreamInfo streamInfo) {
 		try {
-
+			Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
 			StringBuffer content = new StringBuffer(200);
 			content.append("PAUSE RTSP/1.0\r\n");
-			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("CSeq: " + cseq + "\r\n");
 			content.append("PauseTime: now\r\n");
-			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
 			logger.info(request.toString());
 			ClientTransaction clientTransaction = null;
 			if ("TCP".equals(device.getTransport())) {
@@ -1581,11 +1580,12 @@ public class SIPCommander implements ISIPCommander {
 	@Override
 	public void playResumeCmd(Device device, StreamInfo streamInfo) {
 		try {
+			Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
 			StringBuffer content = new StringBuffer(200);
 			content.append("PLAY RTSP/1.0\r\n");
-			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("CSeq: " + cseq + "\r\n");
 			content.append("Range: npt=now-\r\n");
-			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
 			logger.info(request.toString());
 			ClientTransaction clientTransaction = null;
 			if ("TCP".equals(device.getTransport())) {
@@ -1607,12 +1607,13 @@ public class SIPCommander implements ISIPCommander {
 	@Override
 	public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
 		try {
+			Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
 			StringBuffer content = new StringBuffer(200);
 			content.append("PLAY RTSP/1.0\r\n");
-			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("CSeq: " + cseq + "\r\n");
 			content.append("Range: npt=" + Math.abs(seekTime) + "-\r\n");
 
-			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
 			logger.info(request.toString());
 			ClientTransaction clientTransaction = null;
 			if ("TCP".equals(device.getTransport())) {
@@ -1634,11 +1635,12 @@ public class SIPCommander implements ISIPCommander {
 	@Override
 	public void playSpeedCmd(Device device, StreamInfo streamInfo, Double speed) {
 		try {
+			Long cseq = redisCatchStorage.getCSEQ(Request.INFO);
 			StringBuffer content = new StringBuffer(200);
 			content.append("PLAY RTSP/1.0\r\n");
-			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("CSeq: " + cseq + "\r\n");
 			content.append("Scale: " + String.format("%.1f",speed) + "\r\n");
-			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString(), cseq);
 			logger.info(request.toString());
 			ClientTransaction clientTransaction = null;
 			if ("TCP".equals(device.getTransport())) {

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRunner.java

@@ -89,7 +89,7 @@ public class ZLMRunner implements CommandLineRunner {
                 });
 
         // 获取zlm信息
-        logger.info("等待默认zlm接入...");
+        logger.info("[zlm接入]等待默认zlm中...");
 
         // 获取所有的zlm, 并开启主动连接
         List<MediaServerItem> all = mediaServerService.getAllFromDatabase();

+ 18 - 14
src/main/java/com/genersoft/iot/vmp/service/bean/CatalogSubscribeTask.java

@@ -25,24 +25,28 @@ public class CatalogSubscribeTask implements Runnable{
         sipCommander.catalogSubscribe(device, eventResult -> {
             ResponseEvent event = (ResponseEvent) eventResult.event;
             Element rootElement = null;
-            try {
-                rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
-            } catch (DocumentException e) {
-                e.printStackTrace();
-            }
-            Element resultElement = rootElement.element("Result");
-            String result = resultElement.getText();
-            if (result.toUpperCase().equals("OK")){
-                // 成功
-                logger.info("目录订阅成功: {}", device.getDeviceId());
+            if (event.getResponse().getRawContent() != null) {
+                try {
+                    rootElement = XmlUtil.getRootElement(event.getResponse().getRawContent(), "gb2312");
+                } catch (DocumentException e) {
+                    e.printStackTrace();
+                }
+                Element resultElement = rootElement.element("Result");
+                String result = resultElement.getText();
+                if (result.toUpperCase().equals("OK")){
+                    // 成功
+                    logger.info("[目录订阅]成功: {}", device.getDeviceId());
+                }else {
+                    // 失败
+                    logger.info("[目录订阅]失败: {}-{}", device.getDeviceId(), result);
+                }
             }else {
-                // 失败
-                logger.info("目录订阅失败: {}-{}", device.getDeviceId(), result);
+                // 成功
+                logger.info("[目录订阅]成功: {}", device.getDeviceId());
             }
-
         },eventResult -> {
             // 失败
-            logger.warn("目录订阅失败: {}-信令发送失败", device.getDeviceId());
+            logger.warn("[目录订阅]失败,信令发送失败: {}-{} ", device.getDeviceId(), eventResult.msg);
         });
     }
 }

+ 2 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/DeviceServiceImpl.java

@@ -51,6 +51,8 @@ public class DeviceServiceImpl implements IDeviceService {
         dynamicTask.stopCron(device.getDeviceId());
         device.setSubscribeCycleForCatalog(0);
         sipCommander.catalogSubscribe(device, null, null);
+        // 清空cseq计数
+
         return true;
     }
 }

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServerServiceImpl.java

@@ -83,7 +83,7 @@ public class MediaServerServiceImpl implements IMediaServerService, CommandLineR
      */
     @Override
     public void run(String... args) throws Exception {
-        logger.info("Media Server 缓存初始化");
+        logger.info("[缓存初始化] Media Server ");
         List<MediaServerItem> mediaServerItemList = mediaServerMapper.queryAll();
         for (MediaServerItem mediaServerItem : mediaServerItemList) {
             if (StringUtils.isEmpty(mediaServerItem.getId())) {

+ 10 - 0
src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java

@@ -14,6 +14,14 @@ import java.util.Map;
 
 public interface IRedisCatchStorage {
 
+    /**
+     * 计数器。为cseq进行计数
+     *
+     * @param method sip 方法
+     * @return
+     */
+    Long getCSEQ(String method);
+
     /**
      * 开始播放时将流存入
      *
@@ -181,4 +189,6 @@ public interface IRedisCatchStorage {
      * 获取Device
      */
     Device getDevice(String deviceId);
+
+    void resetAllCSEQ();
 }

+ 22 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -36,6 +36,28 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
 
     private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
+    @Override
+    public Long getCSEQ(String method) {
+        String key = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_" +  method;
+
+        long result =  redis.incr(key, 1L);
+        if (result > Integer.MAX_VALUE) {
+            redis.set(key, 1);
+            result = 1;
+        }
+        return result;
+    }
+
+    @Override
+    public void resetAllCSEQ() {
+        String scanKey = VideoManagerConstants.SIP_CSEQ_PREFIX  + userSetup.getServerId() + "_*";
+        List<Object> keys = redis.scan(scanKey);
+        for (int i = 0; i < keys.size(); i++) {
+            String key = (String) keys.get(i);
+            redis.set(key, 1);
+        }
+    }
+
     /**
      * 开始播放时将流存入redis
      *

+ 0 - 13
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java

@@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.service.IPlayService;
-import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
@@ -31,7 +30,6 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
 import org.springframework.web.context.request.async.DeferredResult;
 
-import javax.sip.message.Response;
 import java.util.UUID;
 
 @Api(tags = "视频回放")
@@ -168,7 +166,6 @@ public class PlaybackController {
 			logger.warn("streamId不存在!");
 			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
 		}
-		setCseq(streamId);
 		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
 		cmder.playPauseCmd(device, streamInfo);
 		json.put("msg", "ok");
@@ -189,7 +186,6 @@ public class PlaybackController {
 			logger.warn("streamId不存在!");
 			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
 		}
-		setCseq(streamId);
 		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
 		cmder.playResumeCmd(device, streamInfo);
 		json.put("msg", "ok");
@@ -211,7 +207,6 @@ public class PlaybackController {
 			logger.warn("streamId不存在!");
 			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
 		}
-		setCseq(streamId);
 		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
 		cmder.playSeekCmd(device, streamInfo, seekTime);
 		json.put("msg", "ok");
@@ -238,18 +233,10 @@ public class PlaybackController {
 			logger.warn("不支持的speed: " + speed);
 			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
 		}
-		setCseq(streamId);
 		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
 		cmder.playSpeedCmd(device, streamInfo, speed);
 		json.put("msg", "ok");
 		return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
 	}
 
-	public void setCseq(String streamId) {
-		if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) {
-			InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1);
-		} else {
-			InfoCseqCache.CSEQCACHE.put(streamId, 2L);
-		}
-	}
 }

+ 0 - 14
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java

@@ -1,14 +0,0 @@
-package com.genersoft.iot.vmp.vmanager.gb28181.session;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * @ClassName: InfoCseqCache
- * @Description: INFO类型的Sip中cseq的缓存
- */
-public class InfoCseqCache {
-
-    public static Map<String, Long> CSEQCACHE = new ConcurrentHashMap<>();
-
-}