Просмотр исходного кода

优化目录更新,更新失败时只更新收到的内容不重置所有通道和自动拉流

648540858 3 лет назад
Родитель
Сommit
c19ad94c3e

+ 10 - 4
src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataCatch.java

@@ -109,12 +109,18 @@ public class CatalogDataCatch {
 
         for (String deviceId : keys) {
             CatalogData catalogData = data.get(deviceId);
-            if ( catalogData.getLastTime().isBefore(instantBefore5S)) { // 超过五秒收不到消息任务超时, 只更新这一部分数据
+            if ( catalogData.getLastTime().isBefore(instantBefore5S)) {
+                // 超过五秒收不到消息任务超时, 只更新这一部分数据, 收到数据与声明的总数一致,则重置通道数据,数据不全则只对收到的数据做更新操作
                 if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.runIng)) {
-                    storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
+                    if (catalogData.getTotal() == catalogData.getChannelList().size()) {
+                        storager.resetChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
+                    }else {
+                        storager.updateChannels(catalogData.getDevice().getDeviceId(), catalogData.getChannelList());
+                    }
+                    String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
+                    catalogData.setErrorMsg(errorMsg);
                     if (catalogData.getTotal() != catalogData.getChannelList().size()) {
-                        String errorMsg = "更新成功,共" + catalogData.getTotal() + "条,已更新" + catalogData.getChannelList().size() + "条";
-                        catalogData.setErrorMsg(errorMsg);
+
                     }
                 }else if (catalogData.getStatus().equals(CatalogData.CatalogDataStatus.ready)) {
                     String errorMsg = "同步失败,等待回复超时";

+ 3 - 62
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java

@@ -3,7 +3,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request;
 import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.SIPSender;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
-import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.message.SIPRequest;
 import gov.nist.javax.sip.message.SIPResponse;
 import org.apache.commons.lang3.ArrayUtils;
@@ -14,19 +13,17 @@ import org.dom4j.io.SAXReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
 
 import javax.sip.*;
 import javax.sip.address.Address;
-import javax.sip.address.AddressFactory;
 import javax.sip.address.SipURI;
-import javax.sip.header.*;
+import javax.sip.header.ContentTypeHeader;
+import javax.sip.header.ExpiresHeader;
+import javax.sip.header.HeaderFactory;
 import javax.sip.message.MessageFactory;
 import javax.sip.message.Request;
 import javax.sip.message.Response;
 import java.io.ByteArrayInputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -44,15 +41,6 @@ public abstract class SIPRequestProcessorParent {
 	@Autowired
 	private SIPSender sipSender;
 
-	public AddressFactory getAddressFactory() {
-		try {
-			return SipFactory.getInstance().createAddressFactory();
-		} catch (PeerUnavailableException e) {
-			e.printStackTrace();
-		}
-		return null;
-	}
-
 	public HeaderFactory getHeaderFactory() {
 		try {
 			return SipFactory.getInstance().createHeaderFactory();
@@ -93,53 +81,6 @@ public abstract class SIPRequestProcessorParent {
 		return responseAck(sipRequest, statusCode, msg, null);
 	}
 
-//	public SIPResponse responseAck(ServerTransaction serverTransaction, int statusCode, String msg, ResponseAckExtraParam responseAckExtraParam) throws SipException, InvalidArgumentException, ParseException {
-//		if (serverTransaction == null) {
-//			logger.warn("[回复消息] ServerTransaction 为null");
-//			return null;
-//		}
-//		ToHeader toHeader = (ToHeader) serverTransaction.getRequest().getHeader(ToHeader.NAME);
-//		if (toHeader.getTag() == null) {
-//			toHeader.setTag(SipUtils.getNewTag());
-//		}
-//		SIPResponse response = (SIPResponse)getMessageFactory().createResponse(statusCode, serverTransaction.getRequest());
-//		if (msg != null) {
-//			response.setReasonPhrase(msg);
-//		}
-//		if (responseAckExtraParam != null) {
-//			if (responseAckExtraParam.sipURI != null && serverTransaction.getRequest().getMethod().equals(Request.INVITE)) {
-//				logger.debug("responseSdpAck SipURI: {}:{}", responseAckExtraParam.sipURI.getHost(), responseAckExtraParam.sipURI.getPort());
-//				Address concatAddress = SipFactory.getInstance().createAddressFactory().createAddress(
-//						SipFactory.getInstance().createAddressFactory().createSipURI(responseAckExtraParam.sipURI.getUser(),  responseAckExtraParam.sipURI.getHost()+":"+responseAckExtraParam.sipURI.getPort()
-//						));
-//				response.addHeader(SipFactory.getInstance().createHeaderFactory().createContactHeader(concatAddress));
-//			}
-//			if (responseAckExtraParam.contentTypeHeader != null) {
-//				response.setContent(responseAckExtraParam.content, responseAckExtraParam.contentTypeHeader);
-//			}
-//
-//			if (serverTransaction.getRequest().getMethod().equals(Request.SUBSCRIBE)) {
-//				if (responseAckExtraParam.expires == -1) {
-//					logger.error("[参数不全] 2xx的SUBSCRIBE回复,必须设置Expires header");
-//				}else {
-//					ExpiresHeader expiresHeader = SipFactory.getInstance().createHeaderFactory().createExpiresHeader(responseAckExtraParam.expires);
-//					response.addHeader(expiresHeader);
-//				}
-//			}
-//		}else {
-//			if (serverTransaction.getRequest().getMethod().equals(Request.SUBSCRIBE)) {
-//				logger.error("[参数不全] 2xx的SUBSCRIBE回复,必须设置Expires header");
-//			}
-//		}
-//		serverTransaction.sendResponse(response);
-//		if (statusCode >= 200 && !"NOTIFY".equalsIgnoreCase(serverTransaction.getRequest().getMethod())) {
-//			if (serverTransaction.getDialog() != null) {
-//				serverTransaction.getDialog().delete();
-//			}
-//		}
-//		return response;
-//	}
-
 	public SIPResponse responseAck(SIPRequest sipRequest, int statusCode, String msg, ResponseAckExtraParam responseAckExtraParam) throws SipException, InvalidArgumentException, ParseException {
 		if (sipRequest.getToHeader().getTag() == null) {
 			sipRequest.getToHeader().setTag(SipUtils.getNewTag());

+ 1 - 1
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java

@@ -228,7 +228,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
                     }
                     return;
                 } else {
-                    logger.info("通道不存在,返回404");
+                    logger.info("通道不存在,返回404: {}", channelId);
                     try {
                         // 通道不存在,发404,资源不存在
                         responseAck(request, Response.NOT_FOUND);

Разница между файлами не показана из-за своего большого размера
+ 631 - 619
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java


+ 36 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java

@@ -0,0 +1,36 @@
+package com.genersoft.iot.vmp.media.zlm.dto.hook;
+
+public class HookResult {
+
+    private int code;
+    private String msg;
+
+
+    public HookResult() {
+    }
+
+    public HookResult(int code, String msg) {
+        this.code = code;
+        this.msg = msg;
+    }
+
+    public static HookResult SUCCESS(){
+        return new HookResult(0, "success");
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+}

+ 44 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java

@@ -0,0 +1,44 @@
+package com.genersoft.iot.vmp.media.zlm.dto.hook;
+
+public class HookResultForOnPublish extends HookResult{
+
+    private boolean enable_audio;
+    private boolean enable_mp4;
+    private int mp4_max_second;
+
+    public HookResultForOnPublish() {
+    }
+
+    public static HookResultForOnPublish SUCCESS(){
+        return new HookResultForOnPublish(0, "success");
+    }
+
+    public HookResultForOnPublish(int code, String msg) {
+        setCode(code);
+        setMsg(msg);
+    }
+
+    public boolean isEnable_audio() {
+        return enable_audio;
+    }
+
+    public void setEnable_audio(boolean enable_audio) {
+        this.enable_audio = enable_audio;
+    }
+
+    public boolean isEnable_mp4() {
+        return enable_mp4;
+    }
+
+    public void setEnable_mp4(boolean enable_mp4) {
+        this.enable_mp4 = enable_mp4;
+    }
+
+    public int getMp4_max_second() {
+        return mp4_max_second;
+    }
+
+    public void setMp4_max_second(int mp4_max_second) {
+        this.mp4_max_second = mp4_max_second;
+    }
+}

+ 4 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/GbStreamServiceImpl.java

@@ -152,6 +152,10 @@ public class GbStreamServiceImpl implements IGbStreamService {
 
     @Override
     public void sendCatalogMsg(GbStream gbStream, String type) {
+        if (gbStream == null || type == null) {
+            logger.warn("[发送目录订阅]类型:流信息或类型为NULL");
+            return;
+        }
         List<GbStream> gbStreams = new ArrayList<>();
         if (gbStream.getGbId() != null) {
             gbStreams.add(gbStream);

+ 3 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -184,7 +184,9 @@ public class StreamPushServiceImpl implements IStreamPushService {
     @Override
     public boolean stop(String app, String streamId) {
         StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
-        gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
+        if (streamPushItem != null) {
+            gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
+        }
 
         platformGbStreamMapper.delByAppAndStream(app, streamId);
         gbStreamMapper.del(app, streamId);

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/storager/IVideoManagerStorage.java

@@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.storager;
 
 import com.genersoft.iot.vmp.gb28181.bean.*;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
-import com.genersoft.iot.vmp.media.zlm.dto.StreamPushItem;
 import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
 import com.genersoft.iot.vmp.storager.dao.dto.ChannelSourceInfo;
 import com.genersoft.iot.vmp.vmanager.gb28181.platform.bean.ChannelReduce;
@@ -324,6 +323,8 @@ public interface IVideoManagerStorage {
 	 */
 	boolean resetChannels(String deviceId, List<DeviceChannel> deviceChannelList);
 
+	boolean updateChannels(String deviceId, List<DeviceChannel> deviceChannelList);
+
 	/**
 	 * 获取目录信息
 	 * @param platformId

+ 113 - 0
src/main/java/com/genersoft/iot/vmp/storager/impl/VideoManagerStorageImpl.java

@@ -194,6 +194,119 @@ public class VideoManagerStorageImpl implements IVideoManagerStorage {
 
 	}
 
+
+	@Override
+	public boolean updateChannels(String deviceId, List<DeviceChannel> deviceChannelList) {
+		if (CollectionUtils.isEmpty(deviceChannelList)) {
+			return false;
+		}
+		List<DeviceChannel> allChannels = deviceChannelMapper.queryAllChannels(deviceId);
+		Map<String,DeviceChannel> allChannelMap = new ConcurrentHashMap<>();
+		if (allChannels.size() > 0) {
+			for (DeviceChannel deviceChannel : allChannels) {
+				allChannelMap.put(deviceChannel.getChannelId(), deviceChannel);
+			}
+		}
+		List<DeviceChannel> addChannels = new ArrayList<>();
+		List<DeviceChannel> updateChannels = new ArrayList<>();
+
+
+		TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
+		// 数据去重
+		StringBuilder stringBuilder = new StringBuilder();
+		Map<String, Integer> subContMap = new HashMap<>();
+		if (deviceChannelList.size() > 0) {
+			// 数据去重
+			Set<String> gbIdSet = new HashSet<>();
+			for (DeviceChannel deviceChannel : deviceChannelList) {
+				if (!gbIdSet.contains(deviceChannel.getChannelId())) {
+					gbIdSet.add(deviceChannel.getChannelId());
+					if (allChannelMap.containsKey(deviceChannel.getChannelId())) {
+						deviceChannel.setStreamId(allChannelMap.get(deviceChannel.getChannelId()).getStreamId());
+						deviceChannel.setHasAudio(allChannelMap.get(deviceChannel.getChannelId()).isHasAudio());
+						updateChannels.add(deviceChannel);
+					}else {
+						addChannels.add(deviceChannel);
+					}
+					if (!ObjectUtils.isEmpty(deviceChannel.getParentId())) {
+						if (subContMap.get(deviceChannel.getParentId()) == null) {
+							subContMap.put(deviceChannel.getParentId(), 1);
+						}else {
+							Integer count = subContMap.get(deviceChannel.getParentId());
+							subContMap.put(deviceChannel.getParentId(), count++);
+						}
+					}
+				}else {
+					stringBuilder.append(deviceChannel.getChannelId()).append(",");
+				}
+			}
+			if (addChannels.size() > 0) {
+				for (DeviceChannel channel : addChannels) {
+					if (subContMap.get(channel.getChannelId()) != null){
+						channel.setSubCount(subContMap.get(channel.getChannelId()));
+					}
+				}
+			}
+			if (updateChannels.size() > 0) {
+				for (DeviceChannel channel : updateChannels) {
+					if (subContMap.get(channel.getChannelId()) != null){
+						channel.setSubCount(subContMap.get(channel.getChannelId()));
+					}
+				}
+			}
+
+		}
+		if (stringBuilder.length() > 0) {
+			logger.info("[目录查询]收到的数据存在重复: {}" , stringBuilder);
+		}
+		if(CollectionUtils.isEmpty(updateChannels) && CollectionUtils.isEmpty(addChannels) ){
+			logger.info("通道更新,数据为空={}" , deviceChannelList);
+			return false;
+		}
+		try {
+			int limitCount = 300;
+			boolean result = false;
+			if (addChannels.size() > 0) {
+				if (addChannels.size() > limitCount) {
+					for (int i = 0; i < addChannels.size(); i += limitCount) {
+						int toIndex = i + limitCount;
+						if (i + limitCount > addChannels.size()) {
+							toIndex = addChannels.size();
+						}
+						result = result || deviceChannelMapper.batchAdd(addChannels.subList(i, toIndex)) < 0;
+					}
+				}else {
+					result = result || deviceChannelMapper.batchAdd(addChannels) < 0;
+				}
+			}
+			if (updateChannels.size() > 0) {
+				if (updateChannels.size() > limitCount) {
+					for (int i = 0; i < updateChannels.size(); i += limitCount) {
+						int toIndex = i + limitCount;
+						if (i + limitCount > updateChannels.size()) {
+							toIndex = updateChannels.size();
+						}
+						result = result || deviceChannelMapper.batchUpdate(updateChannels.subList(i, toIndex)) < 0;
+					}
+				}else {
+					result = result || deviceChannelMapper.batchUpdate(updateChannels) < 0;
+				}
+			}
+			if (result) {
+				//事务回滚
+				dataSourceTransactionManager.rollback(transactionStatus);
+			}else {
+				//手动提交
+				dataSourceTransactionManager.commit(transactionStatus);
+			}
+			return true;
+		}catch (Exception e) {
+			e.printStackTrace();
+			dataSourceTransactionManager.rollback(transactionStatus);
+			return false;
+		}
+	}
+
 	@Override
 	public void deviceChannelOnline(String deviceId, String channelId) {
 		deviceChannelMapper.online(deviceId, channelId);

+ 1 - 5
src/main/java/com/genersoft/iot/vmp/vmanager/user/UserController.java

@@ -11,17 +11,13 @@ import com.genersoft.iot.vmp.utils.DateUtil;
 import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
 import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
 import com.github.pagehelper.PageInfo;
-
 import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import io.swagger.v3.oas.annotations.tags.Tag;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
 import org.springframework.security.authentication.AuthenticationManager;
 import org.springframework.util.DigestUtils;
 import org.springframework.util.ObjectUtils;
-import org.springframework.util.StringUtils;
 import org.springframework.web.bind.annotation.*;
 
 import javax.security.sasl.AuthenticationException;
@@ -90,7 +86,7 @@ public class UserController {
 
 
     @PostMapping("/add")
-    @Operation(summary = "停止视频回放")
+    @Operation(summary = "添加用户")
     @Parameter(name = "username", description = "用户名", required = true)
     @Parameter(name = "password", description = "密码(未md5加密的密码)", required = true)
     @Parameter(name = "roleId", description = "角色ID", required = true)