Parcourir la source

直接返回mp4格式文件路径-推流成功版本

xujunwei il y a 5 mois
Parent
commit
df4e36ab20
1 fichiers modifiés avec 228 ajouts et 26 suppressions
  1. 228 26
      app.py

+ 228 - 26
app.py

@@ -376,17 +376,20 @@ class StreamParams(BaseModel):
     stream_url: 推流地址(如rtmp推流地址)
     fps: 输出帧率
     bitrate: 输出码率
+    save_local: 是否保存到本地文件(用于调试)
     其他参数同 predict
     """
     model: str = "yolov12m.pt"
     source: str = None
     stream_url: str = None
     fps: int = 25
-    bitrate: str = "2000k"
+    bitrate: str = "6000k"  # 提高默认码率
     conf: float = 0.25
     iou: Optional[float] = 0.7
     imgsz: int = 640
     device: str = ""
+    save_local: bool = False  # 是否保存到本地文件
+    skip_connectivity_test: bool = True  # 是否跳过连通性测试
     # 可根据需要补充更多参数
 
 def yolov12_stream_worker(params_dict, task_id):
@@ -419,7 +422,9 @@ def yolov12_stream_worker(params_dict, task_id):
 
     # 全局变量用于存储进程引用
     ffmpeg_process = None
-    
+    max_retries = 3
+    retry_count = 0
+
     def cleanup_process():
         """清理ffmpeg进程"""
         nonlocal ffmpeg_process
@@ -432,7 +437,126 @@ def yolov12_stream_worker(params_dict, task_id):
             except Exception as e:
                 print(f"清理ffmpeg进程时出错: {e}")
 
+    def start_ffmpeg():
+        """启动ffmpeg进程"""
+        nonlocal ffmpeg_process, retry_count, ffmpeg_cmd
+        try:
+            print(f"任务 {task_id} 启动ffmpeg命令: {' '.join(ffmpeg_cmd)}")
+            ffmpeg_process = subprocess.Popen(
+                ffmpeg_cmd,
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE,
+                bufsize=0
+            )
+
+            # 等待ffmpeg启动
+            time.sleep(1)
+
+            if ffmpeg_process.poll() is not None:
+                # ffmpeg进程异常退出
+                stderr_output = ffmpeg_process.stderr.read().decode() if ffmpeg_process.stderr else "未知错误"
+                raise Exception(f"ffmpeg启动失败: {stderr_output}")
+
+            retry_count = 0  # 重置重试计数
+            return True
+
+        except Exception as e:
+            print(f"任务 {task_id} 启动ffmpeg失败: {e}")
+            retry_count += 1
+
+            # 如果是第一次失败且当前使用的是简单配置,尝试切换到高级配置
+            if retry_count == 1 and ffmpeg_cmd == simple_ffmpeg_cmd:
+                print(f"任务 {task_id} 简单配置失败,尝试使用高级配置")
+                ffmpeg_cmd = advanced_ffmpeg_cmd
+                return start_ffmpeg()
+
+            if retry_count < max_retries:
+                print(f"任务 {task_id} 尝试重试 ({retry_count}/{max_retries})")
+                time.sleep(2)  # 等待2秒后重试
+                return start_ffmpeg()
+            else:
+                print(f"任务 {task_id} ffmpeg启动失败,已达到最大重试次数")
+                return False
+
     try:
+        # 测试推流地址连通性
+        def test_stream_url():
+            """测试推流地址是否可达"""
+            try:
+                print(f"任务 {task_id} 开始测试推流地址: {stream_url}")
+
+                # 解析RTMP URL
+                if stream_url.startswith('rtmp://'):
+                    # 提取服务器地址和端口
+                    url_parts = stream_url.replace('rtmp://', '').split('/')
+                    server_part = url_parts[0]
+                    server_host = server_part.split(':')[0] if ':' in server_part else server_part
+                    server_port = server_part.split(':')[1] if ':' in server_part else '1935'
+
+                    print(f"任务 {task_id} 解析的服务器信息: {server_host}:{server_port}")
+
+                    # 先测试网络连通性
+                    import socket
+                    try:
+                        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                        sock.settimeout(5)
+                        result = sock.connect_ex((server_host, int(server_port)))
+                        sock.close()
+
+                        if result != 0:
+                            print(f"任务 {task_id} 网络连通性测试失败: {server_host}:{server_port}")
+                            return False
+                        else:
+                            print(f"任务 {task_id} 网络连通性测试成功: {server_host}:{server_port}")
+                    except Exception as net_error:
+                        print(f"任务 {task_id} 网络测试异常: {net_error}")
+                        return False
+
+                # 使用ffprobe测试推流地址
+                test_cmd = [
+                    'ffprobe',
+                    '-v', 'error',
+                    '-print_format', 'json',
+                    '-show_format',
+                    '-timeout', '5000000',  # 5秒超时
+                    stream_url
+                ]
+
+                print(f"任务 {task_id} 执行ffprobe命令: {' '.join(test_cmd)}")
+                result = subprocess.run(test_cmd, capture_output=True, text=True, timeout=10)
+
+                if result.returncode == 0:
+                    print(f"任务 {task_id} ffprobe测试成功")
+                    return True
+                else:
+                    print(f"任务 {task_id} ffprobe测试失败,返回码: {result.returncode}")
+                    print(f"任务 {task_id} ffprobe错误输出: {result.stderr}")
+                    return False
+
+            except subprocess.TimeoutExpired:
+                print(f"任务 {task_id} ffprobe测试超时")
+                return False
+            except Exception as e:
+                print(f"任务 {task_id} 推流地址测试异常: {e}")
+                return False
+
+        # 如果是RTMP推流且不是保存到本地,先测试连通性
+        if (stream_url.startswith('rtmp://') and
+            not params_dict.get('save_local', False) and
+            not params_dict.get('skip_connectivity_test', False)):
+
+            print(f"任务 {task_id} 测试推流地址连通性...")
+            if not test_stream_url():
+                print(f"任务 {task_id} 推流地址不可达: {stream_url}")
+                return {
+                    "code": 1,
+                    "msg": f"推流地址不可达: {stream_url}。可能的原因:1. 推流服务器未启动或不可访问 2. 网络连接问题 3. 防火墙阻止连接 4. 推流地址格式错误。建议解决方案:1. 检查推流服务器状态 2. 验证网络连接 3. 使用skip_connectivity_test=true跳过连通性测试 4. 使用save_local=true先保存到本地文件测试",
+                    "result": None
+                }
+        elif params_dict.get('skip_connectivity_test', False):
+            print(f"任务 {task_id} 跳过连通性测试,直接开始推流")
+        
         model = YOLO(model_path)
         cap = cv2.VideoCapture(source)
         if not cap.isOpened():
@@ -446,8 +570,37 @@ def yolov12_stream_worker(params_dict, task_id):
         # 使用实际帧率,如果获取不到则使用参数中的fps
         output_fps = fps_cap if fps_cap > 0 else fps
         
-        # 构建ffmpeg命令
-        ffmpeg_cmd = [
+        # 构建ffmpeg命令 - 高质量编码,增加错误处理
+        # 首先尝试简单配置,如果失败再使用复杂配置
+        
+        # 确定输出格式和文件
+        if params_dict.get('save_local', False):
+            # 保存到本地文件
+            output_file = f"output_{task_id}.mp4"
+            output_format = 'mp4'
+        else:
+            # 推流
+            output_file = stream_url
+            output_format = 'flv' if stream_url.startswith('rtmp') else 'mpegts'
+        
+        simple_ffmpeg_cmd = [
+            'ffmpeg',
+            '-f', 'rawvideo',
+            '-pix_fmt', 'bgr24',
+            '-s', f'{width}x{height}',
+            '-r', str(output_fps),
+            '-i', '-',  # 从stdin读取
+            '-c:v', 'libx264',
+            '-preset', 'ultrafast',  # 使用最快的预设
+            '-tune', 'zerolatency',
+            '-b:v', bitrate,
+            '-loglevel', 'error',  # 只显示错误信息
+            '-f', output_format,
+            '-y',  # 覆盖输出文件
+            output_file
+        ]
+        
+        advanced_ffmpeg_cmd = [
             'ffmpeg',
             '-f', 'rawvideo',
             '-pix_fmt', 'bgr24',
@@ -455,35 +608,34 @@ def yolov12_stream_worker(params_dict, task_id):
             '-r', str(output_fps),
             '-i', '-',  # 从stdin读取
             '-c:v', 'libx264',
-            '-preset', 'ultrafast',
+            '-preset', 'medium',  # 改为medium,平衡质量和速度
             '-tune', 'zerolatency',
+            '-profile:v', 'high',  # 使用high profile
+            '-level', '4.1',  # 设置编码级别
             '-b:v', bitrate,
             '-maxrate', bitrate,
-            '-bufsize', '4000k',
+            '-bufsize', '8000k',  # 增加缓冲区大小
             '-g', str(int(output_fps) * 2),  # GOP大小
-            '-f', 'flv' if stream_url.startswith('rtmp') else 'mpegts',
+            '-keyint_min', str(int(output_fps)),  # 最小关键帧间隔
+            '-sc_threshold', '0',  # 禁用场景切换检测
+            '-bf', '3',  # B帧数量
+            '-refs', '6',  # 参考帧数量
+            '-x264opts', 'no-scenecut=1:nal-hrd=cbr:force-cfr=1',  # x264特定选项
+            '-color_primaries', 'bt709',  # 色彩空间
+            '-color_trc', 'bt709',  # 色彩传输特性
+            '-colorspace', 'bt709',  # 色彩空间
+            '-loglevel', 'error',  # 只显示错误信息
+            '-f', output_format,
             '-y',  # 覆盖输出文件
-            stream_url
+            output_file
         ]
         
-        print(f"任务 {task_id} 启动ffmpeg命令: {' '.join(ffmpeg_cmd)}")
+        # 默认使用简单配置
+        ffmpeg_cmd = simple_ffmpeg_cmd
         
         # 启动ffmpeg进程
-        ffmpeg_process = subprocess.Popen(
-            ffmpeg_cmd,
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-            bufsize=0
-        )
-        
-        # 等待ffmpeg启动
-        time.sleep(1)
-        
-        if ffmpeg_process.poll() is not None:
-            # ffmpeg进程异常退出
-            stderr_output = ffmpeg_process.stderr.read().decode() if ffmpeg_process.stderr else "未知错误"
-            return {"code": 1, "msg": f"ffmpeg启动失败: {stderr_output}", "result": None}
+        if not start_ffmpeg():
+            return {"code": 1, "msg": "ffmpeg启动失败,已达到最大重试次数", "result": None}
         
         frame_count = 0
         start_time = time.time()
@@ -516,6 +668,21 @@ def yolov12_stream_worker(params_dict, task_id):
                 
                 # 将处理后的帧写入ffmpeg
                 try:
+                    # 检查ffmpeg进程是否还在运行
+                    if ffmpeg_process.poll() is not None:
+                        stderr_output = ffmpeg_process.stderr.read().decode() if ffmpeg_process.stderr else "未知错误"
+                        print(f"任务 {task_id} ffmpeg进程已退出: {stderr_output}")
+                        
+                        # 尝试重启ffmpeg进程
+                        print(f"任务 {task_id} 尝试重启ffmpeg进程...")
+                        cleanup_process()
+                        if start_ffmpeg():
+                            print(f"任务 {task_id} ffmpeg进程重启成功,继续处理")
+                            continue
+                        else:
+                            print(f"任务 {task_id} ffmpeg进程重启失败,停止处理")
+                            break
+                    
                     ffmpeg_process.stdin.write(annotated_frame.tobytes())
                     ffmpeg_process.stdin.flush()
                     frame_count += 1
@@ -528,7 +695,23 @@ def yolov12_stream_worker(params_dict, task_id):
                         
                 except IOError as e:
                     print(f"任务 {task_id} 写入ffmpeg时出错: {e}")
-                    break
+                    # 尝试获取ffmpeg的错误输出
+                    try:
+                        if ffmpeg_process.stderr:
+                            stderr_output = ffmpeg_process.stderr.read().decode()
+                            print(f"任务 {task_id} ffmpeg错误输出: {stderr_output}")
+                    except:
+                        pass
+                    
+                    # 尝试重启ffmpeg进程
+                    print(f"任务 {task_id} 尝试重启ffmpeg进程...")
+                    cleanup_process()
+                    if start_ffmpeg():
+                        print(f"任务 {task_id} ffmpeg进程重启成功,继续处理")
+                        continue
+                    else:
+                        print(f"任务 {task_id} ffmpeg进程重启失败,停止处理")
+                        break
                     
         except KeyboardInterrupt:
             print(f"任务 {task_id} 收到中断信号,停止处理")
@@ -546,7 +729,26 @@ def yolov12_stream_worker(params_dict, task_id):
     except Exception as e:
         print(f"任务 {task_id} 发生异常: {e}")
         cleanup_process()
-        return {"code": 1, "msg": str(e), "result": None}
+        
+        # 提供更详细的错误信息
+        error_msg = str(e)
+        if "Broken pipe" in error_msg:
+            error_msg += " - 这通常表示推流地址不可达或网络连接问题,建议:1) 检查推流服务器是否运行 2) 检查网络连接 3) 使用save_local=true先保存到本地文件测试"
+        elif "Connection refused" in error_msg:
+            error_msg += " - 推流服务器拒绝连接,请检查推流地址是否正确"
+        elif "Permission denied" in error_msg:
+            error_msg += " - 权限不足,请检查推流地址的访问权限"
+        
+        # 记录详细的调试信息
+        print(f"任务 {task_id} 调试信息:")
+        print(f"  - 输入源: {source}")
+        print(f"  - 推流地址: {stream_url}")
+        print(f"  - 保存到本地: {params_dict.get('save_local', False)}")
+        print(f"  - 视频尺寸: {width}x{height}")
+        print(f"  - 帧率: {output_fps}")
+        print(f"  - 码率: {bitrate}")
+        
+        return {"code": 1, "msg": error_msg, "result": None}
 
 @app_fastapi.post("/yolov12/stream")
 async def yolov12_stream_async(params: StreamParams):