648540858 před 1 rokem
rodič
revize
c6dfb63f8f

+ 59 - 0
src/main/java/com/genersoft/iot/vmp/media/bean/ResultForOnPublish.java

@@ -0,0 +1,59 @@
+package com.genersoft.iot.vmp.media.bean;
+
+public class ResultForOnPublish {
+
+    private boolean enable_audio;
+    private boolean enable_mp4;
+    private int mp4_max_second;
+    private String mp4_save_path;
+    private String stream_replace;
+    private Integer modify_stamp;
+
+    public boolean isEnable_audio() {
+        return enable_audio;
+    }
+
+    public void setEnable_audio(boolean enable_audio) {
+        this.enable_audio = enable_audio;
+    }
+
+    public boolean isEnable_mp4() {
+        return enable_mp4;
+    }
+
+    public void setEnable_mp4(boolean enable_mp4) {
+        this.enable_mp4 = enable_mp4;
+    }
+
+    public int getMp4_max_second() {
+        return mp4_max_second;
+    }
+
+    public void setMp4_max_second(int mp4_max_second) {
+        this.mp4_max_second = mp4_max_second;
+    }
+
+    public String getMp4_save_path() {
+        return mp4_save_path;
+    }
+
+    public void setMp4_save_path(String mp4_save_path) {
+        this.mp4_save_path = mp4_save_path;
+    }
+
+    public String getStream_replace() {
+        return stream_replace;
+    }
+
+    public void setStream_replace(String stream_replace) {
+        this.stream_replace = stream_replace;
+    }
+
+    public Integer getModify_stamp() {
+        return modify_stamp;
+    }
+
+    public void setModify_stamp(Integer modify_stamp) {
+        this.modify_stamp = modify_stamp;
+    }
+}

+ 75 - 0
src/main/java/com/genersoft/iot/vmp/media/event/MediaArrivalEvent.java

@@ -0,0 +1,75 @@
+package com.genersoft.iot.vmp.media.event;
+
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * 流到来事件
+ */
+public class MediaArrivalEvent extends ApplicationEvent {
+    public MediaArrivalEvent(Object source) {
+        super(source);
+    }
+
+    public static MediaArrivalEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){
+        MediaArrivalEvent mediaArrivalEvent = new MediaArrivalEvent(source);
+        mediaArrivalEvent.setMediaInfo(MediaInfo.getInstance(hookParam));
+        mediaArrivalEvent.setApp(hookParam.getApp());
+        mediaArrivalEvent.setStream(hookParam.getStream());
+        mediaArrivalEvent.setMediaServer(mediaServer);
+        mediaArrivalEvent.setSchema(hookParam.getSchema());
+        return mediaArrivalEvent;
+    }
+
+    private MediaInfo mediaInfo;
+
+    private String app;
+
+    private String stream;
+
+    private MediaServer mediaServer;
+
+    private String schema;
+
+    public MediaInfo getMediaInfo() {
+        return mediaInfo;
+    }
+
+    public void setMediaInfo(MediaInfo mediaInfo) {
+        this.mediaInfo = mediaInfo;
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public MediaServer getMediaServer() {
+        return mediaServer;
+    }
+
+    public void setMediaServer(MediaServer mediaServer) {
+        this.mediaServer = mediaServer;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+}

+ 65 - 0
src/main/java/com/genersoft/iot/vmp/media/event/MediaDepartureEvent.java

@@ -0,0 +1,65 @@
+package com.genersoft.iot.vmp.media.event;
+
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener;
+import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * 流离开事件
+ */
+public class MediaDepartureEvent extends ApplicationEvent {
+    public MediaDepartureEvent(Object source) {
+        super(source);
+    }
+
+    private String app;
+
+    private String stream;
+
+    private MediaServer mediaServer;
+
+    private String schema;
+
+    public static MediaDepartureEvent getInstance(Object source, OnStreamChangedHookParam hookParam, MediaServer mediaServer){
+        MediaDepartureEvent mediaDepartureEven = new MediaDepartureEvent(source);
+        mediaDepartureEven.setApp(hookParam.getApp());
+        mediaDepartureEven.setStream(hookParam.getStream());
+        mediaDepartureEven.setSchema(hookParam.getSchema());
+        mediaDepartureEven.setMediaServer(mediaServer);
+        return mediaDepartureEven;
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public MediaServer getMediaServer() {
+        return mediaServer;
+    }
+
+    public void setMediaServer(MediaServer mediaServer) {
+        this.mediaServer = mediaServer;
+    }
+
+    public String getSchema() {
+        return schema;
+    }
+
+    public void setSchema(String schema) {
+        this.schema = schema;
+    }
+}

+ 31 - 0
src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java

@@ -8,11 +8,15 @@ import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent;
 import com.genersoft.iot.vmp.media.event.MediaServerDeleteEvent;
 import com.genersoft.iot.vmp.media.service.IMediaNodeServerService;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
 import com.genersoft.iot.vmp.service.bean.SSRCInfo;
@@ -30,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.event.EventListener;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.util.ObjectUtils;
 
@@ -71,6 +77,31 @@ public class MediaServerServiceImpl implements IMediaServerService {
     private ApplicationEventPublisher applicationEventPublisher;
 
 
+    /**
+     * 流到来的处理
+     */
+    @Async("taskExecutor")
+    @org.springframework.context.event.EventListener
+    public void onApplicationEvent(MediaArrivalEvent event) {
+        if ("rtsp".equals(event.getSchema())) {
+            logger.info("流变化:注册 app->{}, stream->{}", event.getApp(), event.getStream());
+            addCount(event.getSeverId());
+        }
+    }
+
+    /**
+     * 流离开的处理
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        if ("rtsp".equals(event.getSchema())) {
+            logger.info("流变化:注销, app->{}, stream->{}", event.getApp(), event.getStream());
+            removeCount(event.getSeverId());
+        }
+    }
+
+
     /**
      * 初始化
      */

+ 21 - 117
src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java

@@ -19,6 +19,9 @@ import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommander;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.*;
@@ -190,50 +193,7 @@ public class ZLMHttpHookListener {
         if (mediaServer == null) {
             return new HookResultForOnPublish(200, "success");
         }
-        // 推流鉴权的处理
-        if (!"rtp".equals(param.getApp())) {
-            StreamProxyItem stream = streamProxyService.getStreamProxyByAppAndStream(param.getApp(), param.getStream());
-            if (stream != null) {
-                HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
-                result.setEnable_audio(stream.isEnableAudio());
-                result.setEnable_mp4(stream.isEnableMp4());
-                return result;
-            }
-            if (userSetting.getPushAuthority()) {
-                // 对于推流进行鉴权
-                Map<String, String> paramMap = urlParamToMap(param.getParams());
-                // 推流鉴权
-                if (param.getParams() == null) {
-                    logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
-                    return new HookResultForOnPublish(401, "Unauthorized");
-                }
-
-                String sign = paramMap.get("sign");
-                if (sign == null) {
-                    logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
-                    return new HookResultForOnPublish(401, "Unauthorized");
-                }
-                // 推流自定义播放鉴权码
-                String callId = paramMap.get("callId");
-                // 鉴权配置
-                boolean hasAuthority = userService.checkPushAuthority(callId, sign);
-                if (!hasAuthority) {
-                    logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
-                    return new HookResultForOnPublish(401, "Unauthorized");
-                }
-                StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
-                streamAuthorityInfo.setCallId(callId);
-                streamAuthorityInfo.setSign(sign);
-                // 鉴权通过
-                redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), param.getStream(), streamAuthorityInfo);
-            }
-        } else {
-            zlmMediaListManager.sendStreamEvent(param.getApp(), param.getStream(), param.getMediaServerId());
-        }
-
 
-        HookResultForOnPublish result = HookResultForOnPublish.SUCCESS();
-        result.setEnable_audio(true);
         taskExecutor.execute(() -> {
             ZlmHttpHookSubscribe.Event subscribe = this.subscribe.sendNotify(HookType.on_publish, json);
             if (subscribe != null) {
@@ -241,81 +201,16 @@ public class ZLMHttpHookListener {
             }
         });
 
-        // 是否录像
-        if ("rtp".equals(param.getApp())) {
-            result.setEnable_mp4(userSetting.getRecordSip());
-        } else {
-            result.setEnable_mp4(userSetting.isRecordPushLive());
+        ResultForOnPublish resultForOnPublish = mediaService.authenticatePublish(mediaServer, param.getApp(), param.getStream(), param.getParams());
+        if (resultForOnPublish != null) {
+            HookResultForOnPublish successResult = HookResultForOnPublish.getInstance(resultForOnPublish);
+            logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, successResult);
+            return successResult;
+        }else {
+            HookResultForOnPublish fail = HookResultForOnPublish.Fail();
+            logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, fail);
+            return fail;
         }
-        // 国标流
-        if ("rtp".equals(param.getApp())) {
-
-            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, param.getStream());
-
-            // 单端口模式下修改流 ID
-            if (!mediaServer.isRtpEnable() && inviteInfo == null) {
-                String ssrc = String.format("%010d", Long.parseLong(param.getStream(), 16));
-                inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
-                if (inviteInfo != null) {
-                    result.setStream_replace(inviteInfo.getStream());
-                    logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", param.getStream(), inviteInfo.getStream());
-                }
-            }
-
-            // 设置音频信息及录制信息
-            List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, param.getStream());
-            if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
-
-                // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
-                StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(param);
-                streamAuthorityInfo.setApp(param.getApp());
-                streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
-                streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
-
-                redisCatchStorage.updateStreamAuthorityInfo(param.getApp(), ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
-
-                String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
-                String channelId = ssrcTransactionForAll.get(0).getChannelId();
-                DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
-                if (deviceChannel != null) {
-                    result.setEnable_audio(deviceChannel.isHasAudio());
-                }
-                // 如果是录像下载就设置视频间隔十秒
-                if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
-                    // 获取录像的总时长,然后设置为这个视频的时长
-                    InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, param.getStream());
-                    if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
-                        String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
-                        String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
-                        long difference = DateUtil.getDifference(startTime, endTime) / 1000;
-                        result.setMp4_max_second((int) difference);
-                        result.setEnable_mp4(true);
-                        // 设置为2保证得到的mp4的时长是正常的
-                        result.setModify_stamp(2);
-                    }
-                }
-                // 如果是talk对讲,则默认获取声音
-                if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
-                    result.setEnable_audio(true);
-                }
-            }
-        } else if (param.getApp().equals("broadcast")) {
-            result.setEnable_audio(true);
-        } else if (param.getApp().equals("talk")) {
-            result.setEnable_audio(true);
-        }
-        if (param.getApp().equalsIgnoreCase("rtp")) {
-            String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + param.getStream();
-            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
-
-            String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + param.getStream();
-            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
-            if (otherRtpSendInfo != null || otherPsSendInfo != null) {
-                result.setEnable_mp4(true);
-            }
-        }
-        logger.info("[ZLM HOOK]推流鉴权 响应:{}->{}->>>>{}", param.getMediaServerId(), param, result);
-        return result;
     }
 
 
@@ -326,11 +221,20 @@ public class ZLMHttpHookListener {
     @PostMapping(value = "/on_stream_changed", produces = "application/json;charset=UTF-8")
     public HookResult onStreamChanged(@RequestBody OnStreamChangedHookParam param) {
 
+        MediaServer mediaServer = mediaServerService.getOne(param.getMediaServerId());
+
         if (param.isRegist()) {
             logger.info("[ZLM HOOK] 流注册, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+            MediaArrivalEvent mediaArrivalEvent = MediaArrivalEvent.getInstance(this, param, mediaServer);
+            applicationEventPublisher.publishEvent(mediaArrivalEvent);
         } else {
             logger.info("[ZLM HOOK] 流注销, {}->{}->{}/{}", param.getMediaServerId(), param.getSchema(), param.getApp(), param.getStream());
+            MediaDepartureEvent mediaArrivalEvent = MediaDepartureEvent.getInstance(this, param, mediaServer);
+            applicationEventPublisher.publishEvent(mediaArrivalEvent);
         }
+        return HookResult.SUCCESS();
+
+
 
         JSONObject json = (JSONObject) JSON.toJSON(param);
         taskExecutor.execute(() -> {

+ 13 - 10
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/StreamAuthorityInfo.java

@@ -1,5 +1,6 @@
 package com.genersoft.iot.vmp.media.zlm.dto;
 
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnPublishHookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
 
@@ -97,21 +98,23 @@ public class StreamAuthorityInfo {
         this.sign = sign;
     }
 
-    public static StreamAuthorityInfo getInstanceByHook(OnPublishHookParam hookParam) {
+    public static StreamAuthorityInfo getInstanceByHook(String app, String stream, String id) {
         StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo();
-        streamAuthorityInfo.setApp(hookParam.getApp());
-        streamAuthorityInfo.setStream(hookParam.getStream());
-        streamAuthorityInfo.setId(hookParam.getId());
+        streamAuthorityInfo.setApp(app);
+        streamAuthorityInfo.setStream(stream);
+        streamAuthorityInfo.setId(id);
         return streamAuthorityInfo;
     }
 
-    public static StreamAuthorityInfo getInstanceByHook(OnStreamChangedHookParam onStreamChangedHookParam) {
+    public static StreamAuthorityInfo getInstanceByHook(MediaArrivalEvent event) {
         StreamAuthorityInfo streamAuthorityInfo = new StreamAuthorityInfo();
-        streamAuthorityInfo.setApp(onStreamChangedHookParam.getApp());
-        streamAuthorityInfo.setStream(onStreamChangedHookParam.getStream());
-        streamAuthorityInfo.setId(onStreamChangedHookParam.getMediaServerId());
-        streamAuthorityInfo.setOriginType(onStreamChangedHookParam.getOriginType());
-        streamAuthorityInfo.setOriginTypeStr(onStreamChangedHookParam.getOriginTypeStr());
+        streamAuthorityInfo.setApp(event.getApp());
+        streamAuthorityInfo.setStream(event.getStream());
+        streamAuthorityInfo.setId(event.getSeverId());
+        if (event.getMediaInfo() != null) {
+            streamAuthorityInfo.setOriginType(event.getMediaInfo().getOriginType());
+        }
+
         return streamAuthorityInfo;
     }
 }

+ 2 - 2
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResult.java

@@ -18,8 +18,8 @@ public class HookResult {
         return new HookResult(0, "success");
     }
 
-    public static HookResult Fail(){
-        return new HookResult(-1, "fail");
+    public static HookResultForOnPublish Fail(){
+        return new HookResultForOnPublish(-1, "fail");
     }
 
     public int getCode() {

+ 13 - 0
src/main/java/com/genersoft/iot/vmp/media/zlm/dto/hook/HookResultForOnPublish.java

@@ -1,5 +1,7 @@
 package com.genersoft.iot.vmp.media.zlm.dto.hook;
 
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
+
 public class HookResultForOnPublish extends HookResult{
 
     private boolean enable_audio;
@@ -16,6 +18,17 @@ public class HookResultForOnPublish extends HookResult{
         return new HookResultForOnPublish(0, "success");
     }
 
+    public static HookResultForOnPublish getInstance(ResultForOnPublish resultForOnPublish){
+        HookResultForOnPublish successResult = new HookResultForOnPublish(0, "success");
+        successResult.setEnable_audio(resultForOnPublish.isEnable_audio());
+        successResult.setEnable_mp4(resultForOnPublish.isEnable_mp4());
+        successResult.setModify_stamp(resultForOnPublish.getModify_stamp());
+        successResult.setStream_replace(resultForOnPublish.getStream_replace());
+        successResult.setMp4_max_second(resultForOnPublish.getMp4_max_second());
+        successResult.setMp4_save_path(resultForOnPublish.getMp4_save_path());
+        return successResult;
+    }
+
     public HookResultForOnPublish(int code, String msg) {
         setCode(code);
         setMsg(msg);

+ 2 - 1
src/main/java/com/genersoft/iot/vmp/service/IMediaService.java

@@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service;
 
 import com.genersoft.iot.vmp.common.StreamInfo;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
 
 /**
@@ -47,5 +48,5 @@ public interface IMediaService {
      */
     boolean authenticatePlay(String app, String stream, String callId);
 
-    boolean authenticatePublish(String app, String stream, String callId, String sign);
+    ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params);
 }

+ 34 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/InviteStreamServiceImpl.java

@@ -6,13 +6,18 @@ import com.genersoft.iot.vmp.common.InviteInfo;
 import com.genersoft.iot.vmp.common.InviteSessionStatus;
 import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.common.VideoManagerConstants;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
 import com.genersoft.iot.vmp.service.IInviteStreamService;
 import com.genersoft.iot.vmp.service.bean.ErrorCallback;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
 import com.genersoft.iot.vmp.utils.redis.RedisUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
@@ -31,6 +36,35 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
     @Autowired
     private RedisTemplate<Object, Object> redisTemplate;
 
+    @Autowired
+    private IVideoManagerStorage storage;
+
+    /**
+     * 流到来的处理
+     */
+    @Async("taskExecutor")
+    @org.springframework.context.event.EventListener
+    public void onApplicationEvent(MediaArrivalEvent event) {
+        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+
+        }
+    }
+
+    /**
+     * 流离开的处理
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
+            InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
+            if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
+                removeInviteInfo(inviteInfo);
+                stopPlay(inviteInfo.getDeviceId(), inviteInfo.getChannelId());
+            }
+        }
+    }
+
     @Override
     public void updateInviteInfo(InviteInfo inviteInfo) {
         if (inviteInfo == null || (inviteInfo.getDeviceId() == null || inviteInfo.getChannelId() == null)) {

+ 189 - 1
src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java

@@ -1,22 +1,43 @@
 package com.genersoft.iot.vmp.service.impl;
 
+import com.genersoft.iot.vmp.common.InviteInfo;
+import com.genersoft.iot.vmp.common.InviteSessionType;
 import com.genersoft.iot.vmp.common.StreamInfo;
+import com.genersoft.iot.vmp.common.VideoManagerConstants;
 import com.genersoft.iot.vmp.conf.MediaConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.conf.exception.ControllerException;
+import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
+import com.genersoft.iot.vmp.gb28181.bean.SsrcTransaction;
+import com.genersoft.iot.vmp.gb28181.session.VideoStreamSessionManager;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.ZLMHttpHookListener;
+import com.genersoft.iot.vmp.media.zlm.ZLMMediaListManager;
+import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
+import com.genersoft.iot.vmp.media.zlm.dto.HookType;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
+import com.genersoft.iot.vmp.media.zlm.dto.StreamProxyItem;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResultForOnPublish;
-import com.genersoft.iot.vmp.service.IMediaService;
+import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.IVideoManagerStorage;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
+import com.genersoft.iot.vmp.vmanager.bean.OtherPsSendInfo;
+import com.genersoft.iot.vmp.vmanager.bean.OtherRtpSendInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Service;
 import org.springframework.util.ObjectUtils;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 @Service
 public class MediaServiceImpl implements IMediaService {
@@ -32,6 +53,31 @@ public class MediaServiceImpl implements IMediaService {
     @Autowired
     private MediaConfig mediaConfig;
 
+    @Autowired
+    private IStreamProxyService streamProxyService;
+
+    @Autowired
+    private UserSetting userSetting;
+
+    @Autowired
+    private RedisTemplate<Object, Object> redisTemplate;
+
+    @Autowired
+    private IUserService userService;
+
+    @Autowired
+    private IInviteStreamService inviteStreamService;
+
+    @Autowired
+    private VideoStreamSessionManager sessionManager;
+
+    @Autowired
+    private IVideoManagerStorage storager;
+
+    @Autowired
+    private ZLMMediaListManager zlmMediaListManager;
+
+
 
     @Override
     public StreamInfo getStreamInfoByAppAndStream(MediaServer mediaServerItem, String app, String stream, MediaInfo mediaInfo, String callId) {
@@ -103,4 +149,146 @@ public class MediaServiceImpl implements IMediaService {
         StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
         return (streamAuthorityInfo != null && streamAuthorityInfo.getCallId() != null && !streamAuthorityInfo.getCallId().equals(callId));
     }
+
+    @Override
+    public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
+        // 推流鉴权的处理
+        if (!"rtp".equals(app)) {
+            StreamProxyItem streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
+            if (streamProxyItem != null) {
+                ResultForOnPublish result = new ResultForOnPublish();
+                result.setEnable_audio(streamProxyItem.isEnableAudio());
+                result.setEnable_mp4(streamProxyItem.isEnableMp4());
+                return result;
+            }
+            if (userSetting.getPushAuthority()) {
+                // 对于推流进行鉴权
+                Map<String, String> paramMap = urlParamToMap(params);
+                // 推流鉴权
+                if (params == null) {
+                    logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
+                    throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
+                }
+
+                String sign = paramMap.get("sign");
+                if (sign == null) {
+                    logger.info("推流鉴权失败: 缺少必要参数:sign=md5(user表的pushKey)");
+                    throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
+                }
+                // 推流自定义播放鉴权码
+                String callId = paramMap.get("callId");
+                // 鉴权配置
+                boolean hasAuthority = userService.checkPushAuthority(callId, sign);
+                if (!hasAuthority) {
+                    logger.info("推流鉴权失败: sign 无权限: callId={}. sign={}", callId, sign);
+                    throw new ControllerException(ErrorCode.ERROR401.getCode(), "Unauthorized");
+                }
+                StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
+                streamAuthorityInfo.setCallId(callId);
+                streamAuthorityInfo.setSign(sign);
+                // 鉴权通过
+                redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
+            }
+        } else {
+            zlmMediaListManager.sendStreamEvent(app, stream, mediaServer.getId());
+        }
+
+
+        ResultForOnPublish result = new ResultForOnPublish();
+        result.setEnable_audio(true);
+
+
+        // 是否录像
+        if ("rtp".equals(app)) {
+            result.setEnable_mp4(userSetting.getRecordSip());
+        } else {
+            result.setEnable_mp4(userSetting.isRecordPushLive());
+        }
+        // 国标流
+        if ("rtp".equals(app)) {
+
+            InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
+
+            // 单端口模式下修改流 ID
+            if (!mediaServer.isRtpEnable() && inviteInfo == null) {
+                String ssrc = String.format("%010d", Long.parseLong(stream, 16));
+                inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
+                if (inviteInfo != null) {
+                    result.setStream_replace(inviteInfo.getStream());
+                    logger.info("[ZLM HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
+                }
+            }
+
+            // 设置音频信息及录制信息
+            List<SsrcTransaction> ssrcTransactionForAll = sessionManager.getSsrcTransactionForAll(null, null, null, stream);
+            if (ssrcTransactionForAll != null && ssrcTransactionForAll.size() == 1) {
+
+                // 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
+                StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
+                streamAuthorityInfo.setApp(app);
+                streamAuthorityInfo.setStream(ssrcTransactionForAll.get(0).getStream());
+                streamAuthorityInfo.setCallId(ssrcTransactionForAll.get(0).getSipTransactionInfo().getCallId());
+
+                redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransactionForAll.get(0).getStream(), streamAuthorityInfo);
+
+                String deviceId = ssrcTransactionForAll.get(0).getDeviceId();
+                String channelId = ssrcTransactionForAll.get(0).getChannelId();
+                DeviceChannel deviceChannel = storager.queryChannel(deviceId, channelId);
+                if (deviceChannel != null) {
+                    result.setEnable_audio(deviceChannel.isHasAudio());
+                }
+                // 如果是录像下载就设置视频间隔十秒
+                if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.DOWNLOAD) {
+                    // 获取录像的总时长,然后设置为这个视频的时长
+                    InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, deviceId, channelId, stream);
+                    if (inviteInfoForDownload != null && inviteInfoForDownload.getStreamInfo() != null) {
+                        String startTime = inviteInfoForDownload.getStreamInfo().getStartTime();
+                        String endTime = inviteInfoForDownload.getStreamInfo().getEndTime();
+                        long difference = DateUtil.getDifference(startTime, endTime) / 1000;
+                        result.setMp4_max_second((int) difference);
+                        result.setEnable_mp4(true);
+                        // 设置为2保证得到的mp4的时长是正常的
+                        result.setModify_stamp(2);
+                    }
+                }
+                // 如果是talk对讲,则默认获取声音
+                if (ssrcTransactionForAll.get(0).getType() == InviteSessionType.TALK) {
+                    result.setEnable_audio(true);
+                }
+            }
+        } else if (app.equals("broadcast")) {
+            result.setEnable_audio(true);
+        } else if (app.equals("talk")) {
+            result.setEnable_audio(true);
+        }
+        if (app.equalsIgnoreCase("rtp")) {
+            String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
+            OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
+
+            String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream;
+            OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
+            if (otherRtpSendInfo != null || otherPsSendInfo != null) {
+                result.setEnable_mp4(true);
+            }
+        }
+        return result;
+    }
+
+    private Map<String, String> urlParamToMap(String params) {
+        HashMap<String, String> map = new HashMap<>();
+        if (ObjectUtils.isEmpty(params)) {
+            return map;
+        }
+        String[] paramsArray = params.split("&");
+        if (paramsArray.length == 0) {
+            return map;
+        }
+        for (String param : paramsArray) {
+            String[] paramArray = param.split("=");
+            if (paramArray.length == 2) {
+                map.put(paramArray[0], paramArray[1]);
+            }
+        }
+        return map;
+    }
 }

+ 77 - 4
src/main/java/com/genersoft/iot/vmp/service/impl/PlayServiceImpl.java

@@ -17,17 +17,17 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
 import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
 import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.SendRtpPortManager;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeFactory;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForRecordMp4;
-import com.genersoft.iot.vmp.media.zlm.dto.HookSubscribeForStreamChange;
-import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.HookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnRecordMp4HookParam;
 import com.genersoft.iot.vmp.media.zlm.dto.hook.OnStreamChangedHookParam;
+import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
 import com.genersoft.iot.vmp.service.*;
 import com.genersoft.iot.vmp.service.bean.*;
 import com.genersoft.iot.vmp.service.redisMsg.RedisGbPlayMsgListener;
@@ -43,6 +43,8 @@ import gov.nist.javax.sip.message.SIPResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.util.ObjectUtils;
 
@@ -121,6 +123,77 @@ public class PlayServiceImpl implements IPlayService {
     @Autowired
     private SSRCFactory ssrcFactory;
 
+    /**
+     * 流到来的处理
+     */
+    @Async("taskExecutor")
+    @org.springframework.context.event.EventListener
+    public void onApplicationEvent(MediaArrivalEvent event) {
+        if ("broadcast".equals(event.getApp())) {
+            if (event.getStream().indexOf("_") > 0) {
+                String[] streamArray = event.getStream().split("_");
+                if (streamArray.length == 2) {
+                    String deviceId = streamArray[0];
+                    String channelId = streamArray[1];
+                    Device device = deviceService.getDevice(deviceId);
+                    if (device == null) {
+                        logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
+                        return;
+                    }
+                    if ("broadcast".equals(event.getApp())) {
+                        if (audioBroadcastManager.exit(deviceId, channelId)) {
+                            stopAudioBroadcast(deviceId, channelId);
+                        }
+                        // 开启语音对讲通道
+                        try {
+                            audioBroadcastCmd(device, channelId, event.getMediaServer(),
+                                    event.getApp(), event.getStream(), 60, false, (msg) -> {
+                                        logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
+                                    });
+                        } catch (InvalidArgumentException | ParseException | SipException e) {
+                            logger.error("[命令发送失败] 语音对讲: {}", e.getMessage());
+                        }
+                    }else if ("talk".equals(event.getApp())) {
+                        // 开启语音对讲通道
+                        talkCmd(device, channelId, event.getMediaServer(), event.getStream(), (msg) -> {
+                            logger.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
+                        });
+                    }
+                }
+            }
+        }
+
+
+    }
+
+    /**
+     * 流离开的处理
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
+            if (event.getStream().indexOf("_") > 0) {
+                String[] streamArray = event.getStream().split("_");
+                if (streamArray.length == 2) {
+                    String deviceId = streamArray[0];
+                    String channelId = streamArray[1];
+                    Device device = deviceService.getDevice(deviceId);
+                    if (device == null) {
+                        logger.info("[语音对讲/喊话] 未找到设备:{}", deviceId);
+                        return;
+                    }
+                    if ("broadcast".equals(event.getApp())) {
+                        stopAudioBroadcast(deviceId, channelId);
+                    }else if ("talk".equals(event.getApp())) {
+                        stopTalk(device, channelId, false);
+                    }
+
+                }
+            }
+        }
+    }
+
 
     @Override
     public SSRCInfo play(MediaServer mediaServerItem, String deviceId, String channelId, String ssrc, ErrorCallback<Object> callback) {

+ 26 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamProxyServiceImpl.java

@@ -9,6 +9,8 @@ import com.genersoft.iot.vmp.conf.UserSetting;
 import com.genersoft.iot.vmp.conf.exception.ControllerException;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
 import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.ZLMServerFactory;
 import com.genersoft.iot.vmp.media.zlm.ZlmHttpHookSubscribe;
@@ -33,7 +35,9 @@ import com.github.pagehelper.PageInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
@@ -102,6 +106,28 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
     @Autowired
     TransactionDefinition transactionDefinition;
 
+    /**
+     * 流到来的处理
+     */
+    @Async("taskExecutor")
+    @org.springframework.context.event.EventListener
+    public void onApplicationEvent(MediaArrivalEvent event) {
+        if ("rtsp".equals(event.getSchema())) {
+            updateStatus(true, event.getApp(), event.getStream());
+        }
+    }
+
+    /**
+     * 流离开的处理
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+        if ("rtsp".equals(event.getSchema())) {
+            updateStatus(true, event.getApp(), event.getStream());
+        }
+    }
+
 
     @Override
     public void save(StreamProxyItem param, GeneralCallback<StreamInfo> callback) {

+ 42 - 0
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java

@@ -10,6 +10,10 @@ import com.genersoft.iot.vmp.gb28181.bean.ParentPlatform;
 import com.genersoft.iot.vmp.gb28181.bean.PlatformCatalog;
 import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
 import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.media.bean.MediaInfo;
+import com.genersoft.iot.vmp.media.event.MediaArrivalEvent;
+import com.genersoft.iot.vmp.media.event.MediaDepartureEvent;
+import com.genersoft.iot.vmp.media.event.MediaServerChangeEvent;
 import com.genersoft.iot.vmp.media.service.IMediaServerService;
 import com.genersoft.iot.vmp.media.zlm.dto.MediaServer;
 import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
@@ -28,7 +32,9 @@ import com.github.pagehelper.PageInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
 import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.TransactionDefinition;
 import org.springframework.transaction.TransactionStatus;
@@ -85,6 +91,42 @@ public class StreamPushServiceImpl implements IStreamPushService {
     @Autowired
     private MediaConfig mediaConfig;
 
+    /**
+     * 流到来的处理
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaArrivalEvent event) {
+        MediaInfo mediaInfo = event.getMediaInfo();
+        if (mediaInfo == null) {
+            return;
+        }
+        if (mediaInfo.getOriginType() != OriginType.RTMP_PUSH.ordinal()
+                && mediaInfo.getOriginType() != OriginType.RTSP_PUSH.ordinal()
+                && mediaInfo.getOriginType() != OriginType.RTC_PUSH.ordinal()) {
+            return;
+        }
+
+        StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(event.getApp(), event.getStream());
+        if (streamAuthorityInfo == null) {
+            streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(event);
+        } else {
+            streamAuthorityInfo.setOriginType(mediaInfo.getOriginType());
+        }
+        redisCatchStorage.updateStreamAuthorityInfo(event.getApp(), event.getStream(), streamAuthorityInfo);
+
+
+    }
+
+    /**
+     * 流离开的处理
+     */
+    @Async("taskExecutor")
+    @EventListener
+    public void onApplicationEvent(MediaDepartureEvent event) {
+
+    }
+
 
     private List<StreamPushItem> handleJSON(List<StreamInfo> streamInfoList) {
         if (streamInfoList == null || streamInfoList.isEmpty()) {