Преглед изворни кода

直接返回mp4格式文件路径-推流成功版本

xujunwei пре 5 месеци
родитељ
комит
62dd56d705
1 измењених фајлова са 208 додато и 33 уклоњено
  1. 208 33
      app.py

+ 208 - 33
app.py

@@ -9,6 +9,7 @@ import threading
 import cv2
 import gradio as gr
 import uvicorn
+import asyncio
 from fastapi import FastAPI
 from fastapi import status
 from fastapi.exceptions import RequestValidationError
@@ -21,11 +22,19 @@ import glob
 import subprocess
 import signal
 import time
-from typing import Optional
+from typing import Optional, Dict
+from concurrent.futures import ProcessPoolExecutor, Future
+from functools import partial
+import uuid
 
 # 设置日志格式和级别
 logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(message)s')
 
+# 创建进程池执行器和任务管理
+stream_executor = ProcessPoolExecutor(max_workers=4)
+stream_tasks: Dict[str, Future] = {}
+stream_pids: Dict[str, int] = {}  # 记录每个任务的worker进程pid
+
 def yolov12_inference(image, video, model_id, image_size, conf_threshold):
     model = YOLO(model_id)
     if image:
@@ -380,16 +389,34 @@ class StreamParams(BaseModel):
     device: str = ""
     # 可根据需要补充更多参数
 
-@app_fastapi.post("/yolov12/stream")
-def yolov12_stream(params: StreamParams):
+def yolov12_stream_worker(params_dict, task_id):
     """
-    RESTful POST接口:/yolov12/stream
-    接收视频拉流地址和推流地址,调用YOLO模型推理,使用ffmpeg将推理后的视频推送到推流地址。
-    返回格式:{"code": 0/1, "msg": "success/错误原因", "result": None}
+    同步推理函数,在进程池中执行
+    支持信号终止
     """
-    logging.info("收到/yolov12/stream请求")
-    logging.info(f"请求参数: {params}")
-    
+    import os
+    import cv2
+    import time
+    import subprocess
+    import signal
+    from ultralytics import YOLO
+
+    # 注册SIGTERM信号处理器
+    def handle_sigterm(signum, frame):
+        print(f"任务 {task_id} 收到终止信号,准备退出")
+        exit(0)
+    signal.signal(signal.SIGTERM, handle_sigterm)
+
+    model_path = params_dict['model']
+    source = params_dict['source']
+    stream_url = params_dict['stream_url']
+    fps = params_dict.get('fps', 25)
+    bitrate = params_dict.get('bitrate', '2000k')
+    conf = params_dict.get('conf', 0.25)
+    iou = params_dict.get('iou', 0.7)
+    imgsz = params_dict.get('imgsz', 640)
+    device = params_dict.get('device', '')
+
     # 全局变量用于存储进程引用
     ffmpeg_process = None
     
@@ -403,21 +430,21 @@ def yolov12_stream(params: StreamParams):
             except subprocess.TimeoutExpired:
                 ffmpeg_process.kill()
             except Exception as e:
-                logging.warning(f"清理ffmpeg进程时出错: {e}")
-    
+                print(f"清理ffmpeg进程时出错: {e}")
+
     try:
-        model = YOLO(params.model)
-        cap = cv2.VideoCapture(params.source)
+        model = YOLO(model_path)
+        cap = cv2.VideoCapture(source)
         if not cap.isOpened():
-            return {"code": 1, "msg": f"无法打开视频流: {params.source}", "result": None}
+            return {"code": 1, "msg": f"无法打开视频流: {source}", "result": None}
         
         # 获取视频流信息
-        fps = cap.get(cv2.CAP_PROP_FPS)
+        fps_cap = cap.get(cv2.CAP_PROP_FPS)
         width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
         height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
         
         # 使用实际帧率,如果获取不到则使用参数中的fps
-        output_fps = fps if fps > 0 else params.fps
+        output_fps = fps_cap if fps_cap > 0 else fps
         
         # 构建ffmpeg命令
         ffmpeg_cmd = [
@@ -430,16 +457,16 @@ def yolov12_stream(params: StreamParams):
             '-c:v', 'libx264',
             '-preset', 'ultrafast',
             '-tune', 'zerolatency',
-            '-b:v', params.bitrate,
-            '-maxrate', params.bitrate,
+            '-b:v', bitrate,
+            '-maxrate', bitrate,
             '-bufsize', '4000k',
-            '-g', str(output_fps * 2),  # GOP大小
-            '-f', 'flv' if params.stream_url.startswith('rtmp') else 'mpegts',
+            '-g', str(int(output_fps) * 2),  # GOP大小
+            '-f', 'flv' if stream_url.startswith('rtmp') else 'mpegts',
             '-y',  # 覆盖输出文件
-            params.stream_url
+            stream_url
         ]
         
-        logging.info(f"启动ffmpeg命令: {' '.join(ffmpeg_cmd)}")
+        print(f"任务 {task_id} 启动ffmpeg命令: {' '.join(ffmpeg_cmd)}")
         
         # 启动ffmpeg进程
         ffmpeg_process = subprocess.Popen(
@@ -471,19 +498,19 @@ def yolov12_stream(params: StreamParams):
                 try:
                     predict_kwargs = {
                         'source': frame,
-                        'imgsz': params.imgsz,
-                        'conf': params.conf,
-                        'device': params.device
+                        'imgsz': imgsz,
+                        'conf': conf,
+                        'device': device
                     }
                     
                     # 只有当iou不为None时才添加到参数中
-                    if params.iou is not None:
-                        predict_kwargs['iou'] = params.iou
+                    if iou is not None:
+                        predict_kwargs['iou'] = iou
                     
                     results = model.predict(**predict_kwargs)
                     annotated_frame = results[0].plot()
                 except Exception as predict_error:
-                    logging.error(f"YOLO推理出错: {predict_error}")
+                    print(f"任务 {task_id} YOLO推理出错: {predict_error}")
                     # 如果推理失败,使用原始帧
                     annotated_frame = frame
                 
@@ -497,14 +524,14 @@ def yolov12_stream(params: StreamParams):
                     if frame_count % 100 == 0:
                         elapsed_time = time.time() - start_time
                         current_fps = frame_count / elapsed_time
-                        logging.info(f"已处理 {frame_count} 帧,当前FPS: {current_fps:.2f}")
+                        print(f"任务 {task_id} 已处理 {frame_count} 帧,当前FPS: {current_fps:.2f}")
                         
                 except IOError as e:
-                    logging.error(f"写入ffmpeg时出错: {e}")
+                    print(f"任务 {task_id} 写入ffmpeg时出错: {e}")
                     break
                     
         except KeyboardInterrupt:
-            logging.info("收到中断信号,停止处理")
+            print(f"任务 {task_id} 收到中断信号,停止处理")
         finally:
             # 清理资源
             cap.release()
@@ -513,14 +540,162 @@ def yolov12_stream(params: StreamParams):
         elapsed_time = time.time() - start_time
         avg_fps = frame_count / elapsed_time if elapsed_time > 0 else 0
         
-        logging.info(f"推理并推流完成,共处理帧数: {frame_count},平均FPS: {avg_fps:.2f}")
+        print(f"任务 {task_id} 推理并推流完成,共处理帧数: {frame_count},平均FPS: {avg_fps:.2f}")
         return {"code": 0, "msg": "success", "result": {"frames_processed": frame_count, "avg_fps": avg_fps}}
         
     except Exception as e:
-        logging.error(f"/yolov12/stream 发生异常: {e}")
+        print(f"任务 {task_id} 发生异常: {e}")
         cleanup_process()
         return {"code": 1, "msg": str(e), "result": None}
 
+@app_fastapi.post("/yolov12/stream")
+async def yolov12_stream_async(params: StreamParams):
+    """
+    RESTful POST接口:/yolov12/stream
+    接收视频拉流地址和推流地址,调用YOLO模型推理,使用ffmpeg将推理后的视频推送到推流地址。
+    支持并发推理和任务取消。
+    返回格式:{"code": 0/1, "msg": "success/错误原因", "result": {"task_id": "任务ID"}}
+    """
+    logging.info("收到/yolov12/stream请求")
+    logging.info(f"请求参数: {params}")
+    
+    # 生成唯一任务ID
+    task_id = str(uuid.uuid4())
+    
+    try:
+        # 异步执行推理任务
+        loop = asyncio.get_event_loop()
+        future = loop.run_in_executor(
+            stream_executor, partial(yolov12_stream_worker, params.dict(), task_id)
+        )
+        stream_tasks[task_id] = future
+        
+        logging.info(f"任务 {task_id} 已提交到进程池")
+        
+        return {
+            "code": 0, 
+            "msg": "任务已提交", 
+            "result": task_id
+        }
+        
+    except Exception as e:
+        logging.error(f"提交任务时发生异常: {e}")
+        return {
+            "code": 1,
+            "msg": str(e),
+            "result": None
+                 }
+
+@app_fastapi.post("/yolov12/stream/cancel")
+async def cancel_stream_task(task_id: str):
+    """
+    RESTful POST接口:/yolov12/stream/cancel
+    取消指定的推理任务
+    返回格式:{"code": 0/1, "msg": "success/错误原因", "result": None}
+    """
+    logging.info(f"收到取消任务请求: {task_id}")
+    
+    future = stream_tasks.get(task_id)
+    if not future:
+        return {"code": 1, "msg": "任务不存在", "result": None}
+    
+    if future.done():
+        return {"code": 1, "msg": "任务已完成,无法取消", "result": None}
+    
+    try:
+        # 尝试取消任务
+        cancelled = future.cancel()
+        if cancelled:
+            logging.info(f"任务 {task_id} 已取消")
+            return {"code": 0, "msg": "任务已取消", "result": None}
+        else:
+            return {"code": 1, "msg": "任务无法取消(可能正在运行)", "result": None}
+    except Exception as e:
+        logging.error(f"取消任务时发生异常: {e}")
+        return {"code": 1, "msg": str(e), "result": None}
+
+@app_fastapi.get("/yolov12/stream/status")
+async def get_stream_status(task_id: str):
+    """
+    RESTful GET接口:/yolov12/stream/status
+    查询指定任务的状态
+    返回格式:{"code": 0/1, "msg": "success/错误原因", "result": {"status": "状态", "result": "结果"}}
+    """
+    logging.info(f"收到查询任务状态请求: {task_id}")
+    
+    future = stream_tasks.get(task_id)
+    if not future:
+        return {"code": 1, "msg": "任务不存在", "result": None}
+    
+    try:
+        if future.done():
+            try:
+                result = future.result()
+                return {
+                    "code": 0, 
+                    "msg": "已完成", 
+                    "result": {"status": "completed", "result": result}
+                }
+            except Exception as e:
+                return {
+                    "code": 1, 
+                    "msg": f"任务异常: {e}", 
+                    "result": {"status": "failed", "error": str(e)}
+                }
+        elif future.cancelled():
+            return {
+                "code": 0, 
+                "msg": "已取消", 
+                "result": {"status": "cancelled"}
+            }
+        else:
+            return {
+                "code": 0, 
+                "msg": "运行中", 
+                "result": {"status": "running"}
+            }
+    except Exception as e:
+        logging.error(f"查询任务状态时发生异常: {e}")
+        return {"code": 1, "msg": str(e), "result": None}
+
+@app_fastapi.get("/yolov12/stream/list")
+async def list_stream_tasks():
+    """
+    RESTful GET接口:/yolov12/stream/list
+    列出所有任务的状态
+    返回格式:{"code": 0, "msg": "success", "result": {"tasks": [{"task_id": "ID", "status": "状态"}]}}
+    """
+    logging.info("收到查询所有任务请求")
+    
+    try:
+        tasks_info = []
+        for task_id, future in stream_tasks.items():
+            if future.done():
+                if future.cancelled():
+                    status = "cancelled"
+                else:
+                    try:
+                        future.result()  # 检查是否有异常
+                        status = "completed"
+                    except:
+                        status = "failed"
+            else:
+                status = "running"
+            
+            tasks_info.append({
+                "task_id": task_id,
+                "status": status
+            })
+        
+        return {
+            "code": 0,
+            "msg": "success",
+            "result": tasks_info
+        }
+    except Exception as e:
+        logging.error(f"查询所有任务时发生异常: {e}")
+        return {"code": 1, "msg": str(e), "result": None}
+
 # 全局异常处理器:参数校验失败时统一返回格式
 @app_fastapi.exception_handler(RequestValidationError)
 async def validation_exception_handler(request, exc):