Преглед изворни кода

Merge remote-tracking branch 'origin/wvp-28181-2.0' into wvp-28181-2.0

648540858 пре 4 година
родитељ
комит
dbcd050c66

+ 21 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java

@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.gb28181.transmit.cmd;
 
+import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
 import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookSubscribe;
@@ -121,6 +122,26 @@ public interface ISIPCommander {
 	void streamByeCmd(String deviceId, String channelId, SipSubscribe.Event okEvent);
 	void streamByeCmd(String deviceId, String channelId);
 
+	/**
+	 * 回放暂停
+	 */
+	void playPauseCmd(Device device, StreamInfo streamInfo);
+
+	/**
+	 * 回放恢复
+	 */
+	void playResumeCmd(Device device, StreamInfo streamInfo);
+
+	/**
+	 * 回放拖动播放
+	 */
+	void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime);
+
+	/**
+	 * 回放倍速播放
+	 */
+	void playSpeedCmd(Device device, StreamInfo streamInfo, String speed);
+
 	/**
 	 * 语音广播
 	 * 

+ 53 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/SIPRequestHeaderProvider.java

@@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd;
 import java.text.ParseException;
 import java.util.ArrayList;
 
+import javax.sip.Dialog;
 import javax.sip.InvalidArgumentException;
 import javax.sip.PeerUnavailableException;
 import javax.sip.SipFactory;
@@ -11,6 +12,9 @@ import javax.sip.address.SipURI;
 import javax.sip.header.*;
 import javax.sip.message.Request;
 
+import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
+import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -30,6 +34,9 @@ public class SIPRequestHeaderProvider {
 	
 	@Autowired
 	private SipFactory sipFactory;
+
+	@Autowired
+	private VideoStreamSessionManager streamSession;
 	
 	public Request createMessageRequest(Device device, String content, String viaTag, String fromTag, String toTag, CallIdHeader callIdHeader) throws ParseException, InvalidArgumentException, PeerUnavailableException {
 		Request request = null;
@@ -210,4 +217,50 @@ public class SIPRequestHeaderProvider {
 		request.setContent(content, contentTypeHeader);
 		return request;
 	}
+
+	public Request createInfoRequest(Device device, StreamInfo streamInfo, String content)
+			throws PeerUnavailableException, ParseException, InvalidArgumentException {
+		Request request = null;
+		Dialog dialog = streamSession.getDialog(streamInfo.getDeviceID(), streamInfo.getChannelId());
+
+		SipURI requestLine = sipFactory.createAddressFactory().createSipURI(device.getDeviceId(),
+				device.getHostAddress());
+		// via
+		ArrayList<ViaHeader> viaHeaders = new ArrayList<ViaHeader>();
+		ViaHeader viaHeader = sipFactory.createHeaderFactory().createViaHeader(device.getIp(), device.getPort(),
+				device.getTransport(), null);
+		viaHeader.setRPort();
+		viaHeaders.add(viaHeader);
+		// from
+		SipURI fromSipURI = sipFactory.createAddressFactory().createSipURI(sipConfig.getId(),
+				sipConfig.getDomain());
+		Address fromAddress = sipFactory.createAddressFactory().createAddress(fromSipURI);
+		FromHeader fromHeader = sipFactory.createHeaderFactory().createFromHeader(fromAddress, dialog.getLocalTag());
+		// to
+		SipURI toSipURI = sipFactory.createAddressFactory().createSipURI(streamInfo.getChannelId(),
+				sipConfig.getDomain());
+		Address toAddress = sipFactory.createAddressFactory().createAddress(toSipURI);
+		ToHeader toHeader = sipFactory.createHeaderFactory().createToHeader(toAddress, dialog.getRemoteTag());
+
+		// callid
+		CallIdHeader callIdHeader = dialog.getCallId();
+
+		// Forwards
+		MaxForwardsHeader maxForwards = sipFactory.createHeaderFactory().createMaxForwardsHeader(70);
+
+		// ceq
+		CSeqHeader cSeqHeader = sipFactory.createHeaderFactory()
+				.createCSeqHeader(InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()), Request.INFO);
+
+		request = sipFactory.createMessageFactory().createRequest(requestLine, Request.INFO, callIdHeader, cSeqHeader,
+				fromHeader, toHeader, viaHeaders, maxForwards);
+		Address concatAddress = sipFactory.createAddressFactory().createAddress(sipFactory.createAddressFactory()
+				.createSipURI(sipConfig.getId(), sipConfig.getIp() + ":" + sipConfig.getPort()));
+		request.addHeader(sipFactory.createHeaderFactory().createContactHeader(concatAddress));
+
+		ContentTypeHeader contentTypeHeader = sipFactory.createHeaderFactory().createContentTypeHeader("Application",
+				"MANSRTSP");
+		request.setContent(content, contentTypeHeader);
+		return request;
+	}
 }

+ 108 - 0
src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java

@@ -1,6 +1,7 @@
 package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
 
 import com.alibaba.fastjson.JSONObject;
+import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.conf.SipConfig;
 import com.genersoft.iot.vmp.conf.UserSetup;
 import com.genersoft.iot.vmp.gb28181.bean.Device;
@@ -17,6 +18,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.storager.IVideoManagerStorager;
+import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
 import gov.nist.javax.sip.SipProviderImpl;
 import gov.nist.javax.sip.SipStackImpl;
 import gov.nist.javax.sip.message.SIPRequest;
@@ -1543,4 +1545,110 @@ public class SIPCommander implements ISIPCommander {
 		clientTransaction.sendRequest();
 		return clientTransaction;
 	}
+
+	/**
+	 * 回放暂停
+	 */
+	@Override
+	public void playPauseCmd(Device device, StreamInfo streamInfo) {
+		try {
+
+			StringBuffer content = new StringBuffer(200);
+			content.append("PAUSE RTSP/1.0\r\n");
+			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("PauseTime: now\r\n");
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			logger.info(request.toString());
+			ClientTransaction clientTransaction = null;
+			if ("TCP".equals(device.getTransport())) {
+				clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+			} else if ("UDP".equals(device.getTransport())) {
+				clientTransaction = udpSipProvider.getNewClientTransaction(request);
+			}
+			if (clientTransaction != null) {
+				clientTransaction.sendRequest();
+			}
+
+		} catch (SipException | ParseException | InvalidArgumentException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * 回放恢复
+	 */
+	@Override
+	public void playResumeCmd(Device device, StreamInfo streamInfo) {
+		try {
+			StringBuffer content = new StringBuffer(200);
+			content.append("PLAY RTSP/1.0\r\n");
+			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("Range: npt=now-\r\n");
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			logger.info(request.toString());
+			ClientTransaction clientTransaction = null;
+			if ("TCP".equals(device.getTransport())) {
+				clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+			} else if ("UDP".equals(device.getTransport())) {
+				clientTransaction = udpSipProvider.getNewClientTransaction(request);
+			}
+
+			clientTransaction.sendRequest();
+
+		} catch (SipException | ParseException | InvalidArgumentException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * 回放拖动播放
+	 */
+	@Override
+	public void playSeekCmd(Device device, StreamInfo streamInfo, long seekTime) {
+		try {
+			StringBuffer content = new StringBuffer(200);
+			content.append("PLAY RTSP/1.0\r\n");
+			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("Range: npt=" + seekTime + "-\r\n");
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			logger.info(request.toString());
+			ClientTransaction clientTransaction = null;
+			if ("TCP".equals(device.getTransport())) {
+				clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+			} else if ("UDP".equals(device.getTransport())) {
+				clientTransaction = udpSipProvider.getNewClientTransaction(request);
+			}
+
+			clientTransaction.sendRequest();
+
+		} catch (SipException | ParseException | InvalidArgumentException e) {
+			e.printStackTrace();
+		}
+	}
+
+	/**
+	 * 回放倍速播放
+	 */
+	@Override
+	public void playSpeedCmd(Device device, StreamInfo streamInfo, String speed) {
+		try {
+			StringBuffer content = new StringBuffer(200);
+			content.append("PLAY RTSP/1.0\r\n");
+			content.append("CSeq: " + InfoCseqCache.CSEQCACHE.get(streamInfo.getStreamId()) + "\r\n");
+			content.append("Scale: " + speed + ".000000\r\n");
+			Request request = headerProvider.createInfoRequest(device, streamInfo, content.toString());
+			logger.info(request.toString());
+			ClientTransaction clientTransaction = null;
+			if ("TCP".equals(device.getTransport())) {
+				clientTransaction = tcpSipProvider.getNewClientTransaction(request);
+			} else if ("UDP".equals(device.getTransport())) {
+				clientTransaction = udpSipProvider.getNewClientTransaction(request);
+			}
+
+			clientTransaction.sendRequest();
+
+		} catch (SipException | ParseException | InvalidArgumentException e) {
+			e.printStackTrace();
+		}
+	}
 }

+ 95 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/playback/PlaybackController.java

@@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.service.IMediaServerService;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
 import com.genersoft.iot.vmp.service.IPlayService;
+import com.genersoft.iot.vmp.vmanager.gb28181.session.InfoCseqCache;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
@@ -152,4 +153,98 @@ public class PlaybackController {
 			return new ResponseEntity<String>(HttpStatus.INTERNAL_SERVER_ERROR);
 		}
 	}
+
+	@ApiOperation("回放暂停")
+	@ApiImplicitParams({
+			@ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class),
+	})
+	@GetMapping("/pause/{streamId}")
+	public ResponseEntity<String> playPause(@PathVariable String streamId) {
+		logger.info("playPause: "+streamId);
+		JSONObject json = new JSONObject();
+		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
+		if (null == streamInfo) {
+			json.put("msg", "streamId不存在");
+			logger.warn("streamId不存在!");
+			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
+		}
+		setCseq(streamId);
+		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
+		cmder.playPauseCmd(device, streamInfo);
+		json.put("msg", "ok");
+		return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
+	}
+
+	@ApiOperation("回放恢复")
+	@ApiImplicitParams({
+			@ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class),
+	})
+	@GetMapping("/resume/{streamId}")
+	public ResponseEntity<String> playResume(@PathVariable String streamId) {
+		logger.info("playResume: "+streamId);
+		JSONObject json = new JSONObject();
+		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
+		if (null == streamInfo) {
+			json.put("msg", "streamId不存在");
+			logger.warn("streamId不存在!");
+			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
+		}
+		setCseq(streamId);
+		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
+		cmder.playResumeCmd(device, streamInfo);
+		json.put("msg", "ok");
+		return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
+	}
+
+	@ApiOperation("回放拖动播放")
+	@ApiImplicitParams({
+			@ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class),
+			@ApiImplicitParam(name = "seekTime", value = "拖动偏移量,单位s", dataTypeClass = Long.class),
+	})
+	@GetMapping("/seek/{streamId}/{seekTime}")
+	public ResponseEntity<String> playSeek(@PathVariable String streamId, @PathVariable long seekTime) {
+		logger.info("playSeek: "+streamId+", "+seekTime);
+		JSONObject json = new JSONObject();
+		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
+		if (null == streamInfo) {
+			json.put("msg", "streamId不存在");
+			logger.warn("streamId不存在!");
+			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
+		}
+		setCseq(streamId);
+		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
+		cmder.playSeekCmd(device, streamInfo, seekTime);
+		json.put("msg", "ok");
+		return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
+	}
+
+	@ApiOperation("回放倍速播放")
+	@ApiImplicitParams({
+			@ApiImplicitParam(name = "streamId", value = "回放流ID", dataTypeClass = String.class),
+			@ApiImplicitParam(name = "speed", value = "倍速 1、2、4", dataTypeClass = String.class),
+	})
+	@GetMapping("/speed/{streamId}/{speed}")
+	public ResponseEntity<String> playSpeed(@PathVariable String streamId, @PathVariable String speed) {
+		logger.info("playSpeed: "+streamId+", "+speed);
+		JSONObject json = new JSONObject();
+		StreamInfo streamInfo = redisCatchStorage.queryPlaybackByStreamId(streamId);
+		if (null == streamInfo) {
+			json.put("msg", "streamId不存在");
+			logger.warn("streamId不存在!");
+			return new ResponseEntity<String>(json.toString(), HttpStatus.BAD_REQUEST);
+		}
+		setCseq(streamId);
+		Device device = storager.queryVideoDevice(streamInfo.getDeviceID());
+		cmder.playSpeedCmd(device, streamInfo, speed);
+		json.put("msg", "ok");
+		return new ResponseEntity<String>(json.toString(), HttpStatus.OK);
+	}
+
+	public void setCseq(String streamId) {
+		if (InfoCseqCache.CSEQCACHE.containsKey(streamId)) {
+			InfoCseqCache.CSEQCACHE.put(streamId, InfoCseqCache.CSEQCACHE.get(streamId) + 1);
+		} else {
+			InfoCseqCache.CSEQCACHE.put(streamId, 2L);
+		}
+	}
 }

+ 14 - 0
src/main/java/com/genersoft/iot/vmp/vmanager/gb28181/session/InfoCseqCache.java

@@ -0,0 +1,14 @@
+package com.genersoft.iot.vmp.vmanager.gb28181.session;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @ClassName: InfoCseqCache
+ * @Description: INFO类型的Sip中cseq的缓存
+ */
+public class InfoCseqCache {
+
+    public static Map<String, Long> CSEQCACHE = new ConcurrentHashMap<>();
+
+}