Browse Source

Merge pull request #771 from mrjackwang/wvp-28181-2.0

修复历史录像下载问题,查询历史录像问题
648540858 2 năm trước cách đây
mục cha
commit
08c2fa45f7

+ 1 - 2
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@@ -122,7 +122,7 @@ public interface ISIPCommander {
 	 */ 
 	 */ 
 	void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
 	void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
 						   String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
 						   String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
-						   SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException;
+						   SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
 
 
 	/**
 	/**
 	 * 视频流停止
 	 * 视频流停止
@@ -221,7 +221,6 @@ public interface ISIPCommander {
 	 *
 	 *
 	 * @param device      视频设备
 	 * @param device      视频设备
 	 * @param channelId      通道id,非通道则是设备本身
 	 * @param channelId      通道id,非通道则是设备本身
-	 * @param frontCmd     上级平台的指令,如果存在则直接下发
 	 * @param enabled     看守位使能:1 = 开启,0 = 关闭
 	 * @param enabled     看守位使能:1 = 开启,0 = 关闭
 	 * @param resetTime   自动归位时间间隔,开启看守位时使用,单位:秒(s)
 	 * @param resetTime   自动归位时间间隔,开启看守位时使用,单位:秒(s)
 	 * @param presetIndex 调用预置位编号,开启看守位时使用,取值范围0~255
 	 * @param presetIndex 调用预置位编号,开启看守位时使用,取值范围0~255

+ 23 - 12
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -29,7 +29,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.context.annotation.DependsOn;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
 import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
 
 
 import javax.sip.InvalidArgumentException;
 import javax.sip.InvalidArgumentException;
 import javax.sip.ResponseEvent;
 import javax.sip.ResponseEvent;
@@ -471,8 +470,9 @@ public class SIPCommander implements ISIPCommander {
      */
      */
     @Override
     @Override
     public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
     public void downloadStreamCmd(MediaServerItem mediaServerItem, SSRCInfo ssrcInfo, Device device, String channelId,
-                                  String startTime, String endTime, int downloadSpeed, InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
-                                  SipSubscribe.Event errorEvent) throws InvalidArgumentException, SipException, ParseException {
+                                  String startTime, String endTime, int downloadSpeed,
+                                  InviteStreamCallback inviteStreamCallback, InviteStreamCallback hookEvent,
+                                  SipSubscribe.Event errorEvent,SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException {
 
 
         logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
         logger.info("{} 分配的ZLM为: {} [{}:{}]", ssrcInfo.getStream(), mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
         String sdpIp;
         String sdpIp;
@@ -541,11 +541,14 @@ public class SIPCommander implements ISIPCommander {
         content.append("a=downloadspeed:" + downloadSpeed + "\r\n");
         content.append("a=downloadspeed:" + downloadSpeed + "\r\n");
 
 
         content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
         content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
-        
+        logger.debug("此时请求下载信令的ssrc===>{}",ssrcInfo.getSsrc());
         HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
         HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, null, mediaServerItem.getId());
         // 添加订阅
         // 添加订阅
+        CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
+        String callId=newCallIdHeader.getCallId();
         subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
         subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject json) -> {
-            hookEvent.call(new InviteStreamInfo(mediaServerItem, json,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream()));
+            logger.debug("sipc 添加订阅===callId {}",callId);
+            hookEvent.call(new InviteStreamInfo(mediaServerItem, json,callId, "rtp", ssrcInfo.getStream()));
             subscribe.removeSubscribe(hookSubscribe);
             subscribe.removeSubscribe(hookSubscribe);
             hookSubscribe.getContent().put("regist", false);
             hookSubscribe.getContent().put("regist", false);
             hookSubscribe.getContent().put("schema", "rtsp");
             hookSubscribe.getContent().put("schema", "rtsp");
@@ -554,7 +557,7 @@ public class SIPCommander implements ISIPCommander {
                     (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> {
                     (MediaServerItem mediaServerItemForEnd, JSONObject jsonForEnd) -> {
                         logger.info("[录像]下载结束, 发送BYE");
                         logger.info("[录像]下载结束, 发送BYE");
                         try {
                         try {
-                            streamByeCmd(device, channelId, ssrcInfo.getStream(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId());
+                            streamByeCmd(device, channelId, ssrcInfo.getStream(),callId);
                         } catch (InvalidArgumentException | ParseException | SipException |
                         } catch (InvalidArgumentException | ParseException | SipException |
                                  SsrcTransactionNotFoundException e) {
                                  SsrcTransactionNotFoundException e) {
                             logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage());
                             logger.error("[录像]下载结束, 发送BYE失败 {}", e.getMessage());
@@ -562,15 +565,24 @@ public class SIPCommander implements ISIPCommander {
                     });
                     });
         });
         });
 
 
-        Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc());
+        Request request = headerProvider.createPlaybackInviteRequest(device, channelId, content.toString(), null, SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc());
         if (inviteStreamCallback != null) {
         if (inviteStreamCallback != null) {
-            inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()).getCallId(), "rtp", ssrcInfo.getStream()));
+            inviteStreamCallback.call(new InviteStreamInfo(mediaServerItem, null,callId, "rtp", ssrcInfo.getStream()));
         }
         }
 
 
-        sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, okEvent -> {
-            ResponseEvent responseEvent = (ResponseEvent) okEvent.event;
+        sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> {
+            ResponseEvent responseEvent = (ResponseEvent) event.event;
             SIPResponse response = (SIPResponse) responseEvent.getResponse();
             SIPResponse response = (SIPResponse) responseEvent.getResponse();
-            streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
+            String contentString =new String(response.getRawContent());
+            int ssrcIndex = contentString.indexOf("y=");
+            String ssrc=ssrcInfo.getSsrc();
+            if (ssrcIndex >= 0) {
+                ssrc = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
+            }
+            logger.debug("接收到的下载响应ssrc====>{}",ssrcInfo.getSsrc());
+            logger.debug("接收到的下载响应ssrc====>{}",ssrc);
+            streamSession.put(device.getDeviceId(), channelId, response.getCallIdHeader().getCallId(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, VideoStreamSessionManager.SessionType.download);
+            okEvent.response(event);
         });
         });
     }
     }
 
 
@@ -802,7 +814,6 @@ public class SIPCommander implements ISIPCommander {
      *
      *
      * @param device      视频设备
      * @param device      视频设备
      * @param channelId      通道id,非通道则是设备本身
      * @param channelId      通道id,非通道则是设备本身
-     * @param frontCmd     上级平台的指令,如果存在则直接下发
      * @param enabled     看守位使能:1 = 开启,0 = 关闭
      * @param enabled     看守位使能:1 = 开启,0 = 关闭
      * @param resetTime   自动归位时间间隔,开启看守位时使用,单位:秒(s)
      * @param resetTime   自动归位时间间隔,开启看守位时使用,单位:秒(s)
      * @param presetIndex 调用预置位编号,开启看守位时使用,取值范围0~255
      * @param presetIndex 调用预置位编号,开启看守位时使用,取值范围0~255

+ 3 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java

@@ -165,7 +165,10 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
         // 对数据进行排序
         // 对数据进行排序
         if(recordInfo!=null && recordInfo.getRecordList()!=null) {
         if(recordInfo!=null && recordInfo.getRecordList()!=null) {
             Collections.sort(recordInfo.getRecordList());
             Collections.sort(recordInfo.getRecordList());
+        }else{
+            recordInfo.setRecordList(new ArrayList<>());
         }
         }
+
         RequestMessage msg = new RequestMessage();
         RequestMessage msg = new RequestMessage();
         msg.setKey(key);
         msg.setKey(key);
         msg.setData(recordInfo);
         msg.setData(recordInfo);

+ 67 - 15
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -635,23 +635,75 @@ public class PlayServiceImpl implements IPlayService {
             hookCallBack.call(downloadResult);
             hookCallBack.call(downloadResult);
             streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
             streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
         };
         };
-
+        InviteStreamCallback hookEvent = (InviteStreamInfo inviteStreamInfo) -> {
+            logger.info("收到订阅消息: " + inviteStreamInfo.getCallId());
+            dynamicTask.stop(downLoadTimeOutTaskKey);
+            StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
+            streamInfo.setStartTime(startTime);
+            streamInfo.setEndTime(endTime);
+            redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
+            downloadResult.setCode(ErrorCode.SUCCESS.getCode());
+            downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
+            downloadResult.setData(streamInfo);
+            downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
+            downloadResult.setResponse(inviteStreamInfo.getResponse());
+            hookCallBack.call(downloadResult);
+        };
         try {
         try {
             cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
             cmder.downloadStreamCmd(mediaServerItem, ssrcInfo, device, channelId, startTime, endTime, downloadSpeed, infoCallBack,
-                    inviteStreamInfo -> {
-                        logger.info("收到订阅消息: " + inviteStreamInfo.getResponse().toJSONString());
-                        dynamicTask.stop(downLoadTimeOutTaskKey);
-                        StreamInfo streamInfo = onPublishHandler(inviteStreamInfo.getMediaServerItem(), inviteStreamInfo.getResponse(), deviceId, channelId);
-                        streamInfo.setStartTime(startTime);
-                        streamInfo.setEndTime(endTime);
-                        redisCatchStorage.startDownload(streamInfo, inviteStreamInfo.getCallId());
-                        downloadResult.setCode(ErrorCode.SUCCESS.getCode());
-                        downloadResult.setMsg(ErrorCode.SUCCESS.getMsg());
-                        downloadResult.setData(streamInfo);
-                        downloadResult.setMediaServerItem(inviteStreamInfo.getMediaServerItem());
-                        downloadResult.setResponse(inviteStreamInfo.getResponse());
-                        hookCallBack.call(downloadResult);
-                    }, errorEvent);
+                    hookEvent, errorEvent, eventResult ->
+                    {
+                        if (eventResult.type == SipSubscribe.EventResultType.response) {
+                            ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
+                            String contentString = new String(responseEvent.getResponse().getRawContent());
+                            // 获取ssrc
+                            int ssrcIndex = contentString.indexOf("y=");
+                            // 检查是否有y字段
+                            if (ssrcIndex >= 0) {
+                                //ssrc规定长度为10字节,不取余下长度以避免后续还有“f=”字段 TODO 后续对不规范的非10位ssrc兼容
+                                String ssrcInResponse = contentString.substring(ssrcIndex + 2, ssrcIndex + 12);
+                                // 查询到ssrc不一致且开启了ssrc校验则需要针对处理
+                                if (ssrcInfo.getSsrc().equals(ssrcInResponse)) {
+                                    return;
+                                }
+                                logger.info("[回放消息] 收到invite 200, 发现下级自定义了ssrc: {}", ssrcInResponse);
+                                if (!mediaServerItem.isRtpEnable() || device.isSsrcCheck()) {
+                                    logger.info("[回放消息] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
+
+                                    if (!mediaServerItem.getSsrcConfig().checkSsrc(ssrcInResponse)) {
+                                        // ssrc 不可用
+                                        // 释放ssrc
+                                        mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
+                                        streamSession.remove(device.getDeviceId(), channelId, ssrcInfo.getStream());
+                                        eventResult.msg = "下级自定义了ssrc,但是此ssrc不可用";
+                                        eventResult.statusCode = 400;
+                                        errorEvent.response(eventResult);
+                                        return;
+                                    }
+
+                                    // 单端口模式streamId也有变化,需要重新设置监听
+                                    if (!mediaServerItem.isRtpEnable()) {
+                                        // 添加订阅
+                                        HookSubscribeForStreamChange hookSubscribe = HookSubscribeFactory.on_stream_changed("rtp", ssrcInfo.getStream(), true, "rtsp", mediaServerItem.getId());
+                                        subscribe.removeSubscribe(hookSubscribe);
+                                        hookSubscribe.getContent().put("stream", String.format("%08x", Integer.parseInt(ssrcInResponse)).toUpperCase());
+                                        subscribe.addSubscribe(hookSubscribe, (MediaServerItem mediaServerItemInUse, JSONObject response) -> {
+                                            logger.info("[ZLM HOOK] ssrc修正后收到订阅消息: " + response.toJSONString());
+                                            dynamicTask.stop(downLoadTimeOutTaskKey);
+                                            // hook响应
+                                            onPublishHandlerForPlayback(mediaServerItemInUse, response, device.getDeviceId(), channelId, hookCallBack);
+                                            hookEvent.call(new InviteStreamInfo(mediaServerItem, null, eventResult.callId, "rtp", ssrcInfo.getStream()));
+                                        });
+                                    }
+                                    // 关闭rtp server
+                                    mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
+                                    // 重新开启ssrc server
+                                    mediaServerService.openRTPServer(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse, device.isSsrcCheck(), true, ssrcInfo.getPort());
+                                }
+                            }
+                        }
+
+                    });
         } catch (InvalidArgumentException | SipException | ParseException e) {
         } catch (InvalidArgumentException | SipException | ParseException e) {
             logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
             logger.error("[命令发送失败] 录像下载: {}", e.getMessage());
 
 

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/storager/dao/DeviceChannelMapper.java

@@ -316,10 +316,10 @@ public interface DeviceChannelMapper {
             "select * " +
             "select * " +
             "from device_channel " +
             "from device_channel " +
             "where deviceId=#{deviceId}" +
             "where deviceId=#{deviceId}" +
-            " <if test='parentId != null and length != null' > and parentId = #{parentId} or left(channelId, #{parentId.length()}) = #{parentId} and length(channelId)=#{length} </if>" +
+            " <if test='parentId != null and length != null' > and parentId = #{parentId} or left(channelId, LENGTH(#{parentId})) = #{parentId} and length(channelId)=#{length} </if>" +
             " <if test='parentId == null and length != null' > and parentId = #{parentId} or length(channelId)=#{length} </if>" +
             " <if test='parentId == null and length != null' > and parentId = #{parentId} or length(channelId)=#{length} </if>" +
             " <if test='parentId == null and length == null' > and parentId = #{parentId} </if>" +
             " <if test='parentId == null and length == null' > and parentId = #{parentId} </if>" +
-            " <if test='parentId != null and length == null' > and parentId = #{parentId} or left(channelId, #{parentId.length()}) = #{parentId} </if>" +
+            " <if test='parentId != null and length == null' > and parentId = #{parentId} or left(channelId, LENGTH(#{parentId})) = #{parentId} </if>" +
             " </script>"})
             " </script>"})
     List<DeviceChannel> getChannelsWithCivilCodeAndLength(String deviceId, String parentId, Integer length);
     List<DeviceChannel> getChannelsWithCivilCodeAndLength(String deviceId, String parentId, Integer length);
 
 

+ 7 - 5
src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java

@@ -177,12 +177,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
     @Override
     @Override
     public boolean startDownload(StreamInfo stream, String callId) {
     public boolean startDownload(StreamInfo stream, String callId) {
         boolean result;
         boolean result;
+        String key=String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
+                userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId);
         if (stream.getProgress() == 1) {
         if (stream.getProgress() == 1) {
-            result = RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
-                    userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream);
+            logger.debug("添加下载缓存==已完成下载=》{}",key);
+            result = RedisUtil.set(key, stream);
         }else {
         }else {
-            result = RedisUtil.set(String.format("%S_%s_%s_%s_%s_%s_%s", VideoManagerConstants.DOWNLOAD_PREFIX,
-                    userSetting.getServerId(), stream.getMediaServerId(), stream.getDeviceID(), stream.getChannelId(), stream.getStream(), callId), stream, 60*60);
+            logger.debug("添加下载缓存==未完成下载=》{}",key);
+            result = RedisUtil.set(key, stream, 60*60);
         }
         }
         return result;
         return result;
     }
     }
@@ -617,7 +619,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
                 stream,
                 stream,
                 callId
                 callId
         );
         );
-        List<Object> streamInfoScan = RedisUtil.scan(key);
+        List<Object> streamInfoScan = RedisUtil.scan2(key);
         if (streamInfoScan.size() > 0) {
         if (streamInfoScan.size() > 0) {
             return (StreamInfo) RedisUtil.get((String) streamInfoScan.get(0));
             return (StreamInfo) RedisUtil.get((String) streamInfoScan.get(0));
         }else {
         }else {

+ 7 - 1
src/main/java/com/genersoft/iot/vmp/utils/redis/RedisUtil.java

@@ -881,7 +881,13 @@ public class RedisUtil {
 
 
         return new ArrayList<>(resultKeys);
         return new ArrayList<>(resultKeys);
     }
     }
-
+    public static List<Object> scan2(String query) {
+        if (redisTemplate == null) {
+            redisTemplate = SpringBeanFactory.getBean("redisTemplate");
+        }
+        Set<String> keys = redisTemplate.keys(query);
+        return new ArrayList<>(keys);
+    }
     //    ============================== 消息发送与订阅 ==============================
     //    ============================== 消息发送与订阅 ==============================
     public static void convertAndSend(String channel, JSONObject msg) {
     public static void convertAndSend(String channel, JSONObject msg) {
         if (redisTemplate == null) {
         if (redisTemplate == null) {