Procházet zdrojové kódy

直接返回mp4格式文件路径-推流成功版本

xujunwei před 5 měsíci
rodič
revize
5f2567ea2c
1 změnil soubory, kde provedl 192 přidání a 22 odebrání
  1. 192 22
      app.py

+ 192 - 22
app.py

@@ -35,6 +35,17 @@ stream_executor = ProcessPoolExecutor(max_workers=4)
 stream_tasks: Dict[str, Dict] = {}  # 存储任务信息:{task_id: {"future": Future, "start_time": long, "source": str, "stream_url": str}}
 stream_pids: Dict[str, int] = {}  # 记录每个任务的worker进程pid
 
+def create_new_executor():
+    """创建新的进程池执行器"""
+    global stream_executor
+    try:
+        if stream_executor:
+            stream_executor.shutdown(wait=False)
+    except:
+        pass
+    stream_executor = ProcessPoolExecutor(max_workers=4)
+    print("已重新创建进程池执行器")
+
 def cleanup_completed_tasks():
     """清理已完成的任务"""
     completed_tasks = []
@@ -46,6 +57,13 @@ def cleanup_completed_tasks():
         del stream_tasks[task_id]
         if task_id in stream_pids:
             del stream_pids[task_id]
+        # 清理PID文件
+        pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+        try:
+            if os.path.exists(pid_file):
+                os.remove(pid_file)
+        except:
+            pass
         print(f"已清理完成的任务: {task_id}")
 
 def yolov12_inference(image, video, model_id, image_size, conf_threshold):
@@ -417,9 +435,40 @@ def yolov12_stream_worker(params_dict, task_id):
     import signal
     from ultralytics import YOLO
 
+    # 记录当前进程PID
+    current_pid = os.getpid()
+    print(f"任务 {task_id} worker进程启动,PID: {current_pid}")
+    
+    # 将PID写入临时文件,供主进程读取
+    pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+    try:
+        with open(pid_file, 'w') as f:
+            f.write(str(current_pid))
+    except Exception as e:
+        print(f"任务 {task_id} 写入PID文件失败: {e}")
+    
     # 注册SIGTERM信号处理器
     def handle_sigterm(signum, frame):
         print(f"任务 {task_id} 收到终止信号,准备退出")
+        # 清理PID文件
+        try:
+            if os.path.exists(pid_file):
+                os.remove(pid_file)
+        except:
+            pass
+        # 清理ffmpeg进程
+        if 'ffmpeg_process' in locals() and ffmpeg_process:
+            try:
+                ffmpeg_process.terminate()
+                ffmpeg_process.wait(timeout=2)
+            except:
+                pass
+        # 清理视频捕获
+        if 'cap' in locals() and cap:
+            try:
+                cap.release()
+            except:
+                pass
         exit(0)
     signal.signal(signal.SIGTERM, handle_sigterm)
 
@@ -780,16 +829,34 @@ async def yolov12_stream_async(params: StreamParams):
     try:
         # 异步执行推理任务
         loop = asyncio.get_event_loop()
-        future = loop.run_in_executor(
-            stream_executor, partial(yolov12_stream_worker, params.dict(), task_id)
-        )
+        
+        # 确保进程池健康
+        ensure_executor_healthy()
+        
+        # 检查进程池是否可用
+        try:
+            future = loop.run_in_executor(
+                stream_executor, partial(yolov12_stream_worker, params.dict(), task_id)
+            )
+        except Exception as pool_error:
+            logging.warning(f"进程池异常,尝试重新创建: {pool_error}")
+            create_new_executor()
+            future = loop.run_in_executor(
+                stream_executor, partial(yolov12_stream_worker, params.dict(), task_id)
+            )
         
         # 添加任务完成后的清理回调
         def cleanup_task(fut):
             try:
-                # 获取任务结果(如果有异常会抛出)
-                result = fut.result()
-                print(f"任务 {task_id} 已完成,结果: {result}")
+                # 检查任务是否被取消
+                if fut.cancelled():
+                    print(f"任务 {task_id} 已被取消")
+                else:
+                    # 获取任务结果(如果有异常会抛出)
+                    result = fut.result()
+                    print(f"任务 {task_id} 已完成,结果: {result}")
+            except asyncio.CancelledError:
+                print(f"任务 {task_id} 被取消")
             except Exception as e:
                 print(f"任务 {task_id} 执行异常: {e}")
             finally:
@@ -798,6 +865,13 @@ async def yolov12_stream_async(params: StreamParams):
                     del stream_tasks[task_id]
                 if task_id in stream_pids:
                     del stream_pids[task_id]
+                # 清理PID文件
+                pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+                try:
+                    if os.path.exists(pid_file):
+                        os.remove(pid_file)
+                except:
+                    pass
                 print(f"已清理任务: {task_id}")
         
         future.add_done_callback(cleanup_task)
@@ -808,6 +882,23 @@ async def yolov12_stream_async(params: StreamParams):
             "stream_url": params.stream_url
         }
         
+        # 等待worker进程启动并读取PID
+        pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+        max_wait = 50  # 最多等待5秒
+        for _ in range(max_wait):
+            if os.path.exists(pid_file):
+                try:
+                    with open(pid_file, 'r') as f:
+                        worker_pid = int(f.read().strip())
+                        stream_pids[task_id] = worker_pid
+                        logging.info(f"任务 {task_id} worker进程PID: {worker_pid}")
+                        break
+                except Exception as e:
+                    logging.warning(f"读取PID文件失败: {e}")
+            await asyncio.sleep(0.1)
+        else:
+            logging.warning(f"任务 {task_id} 无法获取worker进程PID")
+        
         logging.info(f"任务 {task_id} 已提交到进程池")
         
         return {
@@ -844,15 +935,43 @@ async def cancel_stream_task(task_id: str):
         return {"code": 1, "msg": "任务已完成,无法取消", "result": None}
     
     try:
-        # 尝试取消任务
+        # 尝试取消Future
         cancelled = task_info["future"].cancel()
+        
+        # 获取worker进程的PID并终止进程
+        worker_pid = stream_pids.get(task_id)
+        if worker_pid:
+            try:
+                logging.info(f"终止worker进程: {worker_pid}")
+                # 使用SIGTERM优雅终止,而不是强制杀死
+                os.kill(worker_pid, signal.SIGTERM)
+                
+                # 等待进程结束,最多等待3秒
+                import time
+                for _ in range(30):  # 30次 * 0.1秒 = 3秒
+                    try:
+                        os.kill(worker_pid, 0)  # 检查进程是否还存在
+                        time.sleep(0.1)
+                    except OSError:
+                        # 进程已经结束
+                        break
+                else:
+                    # 如果进程还在运行,记录警告但不强制杀死
+                    logging.warning(f"进程 {worker_pid} 未能在3秒内优雅退出,但已发送取消信号")
+                    
+            except OSError as e:
+                logging.warning(f"终止进程时出错: {e}")
+        
         if cancelled:
             logging.info(f"任务 {task_id} 已取消")
-            # 取消后立即清理
-            if task_id in stream_tasks:
-                del stream_tasks[task_id]
-            if task_id in stream_pids:
-                del stream_pids[task_id]
+            # 取消后不立即清理,让回调函数处理
+            # 只清理PID文件,因为进程可能还在运行
+            pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+            try:
+                if os.path.exists(pid_file):
+                    os.remove(pid_file)
+            except:
+                pass
             return {"code": 0, "msg": "任务已取消", "result": None}
         else:
             return {"code": 1, "msg": "任务无法取消(可能正在运行)", "result": None}
@@ -879,14 +998,42 @@ async def get_stream_status(task_id: str):
     try:
         if task_info["future"].done():
             try:
-                result = task_info["future"].result()
+                # 检查是否被取消
+                if task_info["future"].cancelled():
+                    run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
+                    return {
+                        "code": 0, 
+                        "msg": "已取消", 
+                        "result": {
+                            "status": "cancelled",
+                            "start_time": task_info["start_time"],
+                            "run_time": round(run_time, 2),
+                            "source": task_info["source"],
+                            "stream_url": task_info["stream_url"]
+                        }
+                    }
+                else:
+                    result = task_info["future"].result()
+                    run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
+                    return {
+                        "code": 0, 
+                        "msg": "已完成", 
+                        "result": {
+                            "status": "completed", 
+                            "result": result,
+                            "start_time": task_info["start_time"],
+                            "run_time": round(run_time, 2),
+                            "source": task_info["source"],
+                            "stream_url": task_info["stream_url"]
+                        }
+                    }
+            except asyncio.CancelledError:
                 run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
                 return {
                     "code": 0, 
-                    "msg": "已完成", 
+                    "msg": "已取消", 
                     "result": {
-                        "status": "completed", 
-                        "result": result,
+                        "status": "cancelled",
                         "start_time": task_info["start_time"],
                         "run_time": round(run_time, 2),
                         "source": task_info["source"],
@@ -953,14 +1100,16 @@ async def list_stream_tasks():
         tasks_info = []
         for task_id, task_info in stream_tasks.items():
             if task_info["future"].done():
-                if task_info["future"].cancelled():
-                    status = "cancelled"
-                else:
-                    try:
+                try:
+                    if task_info["future"].cancelled():
+                        status = "cancelled"
+                    else:
                         task_info["future"].result()  # 检查是否有异常
                         status = "completed"
-                    except:
-                        status = "failed"
+                except asyncio.CancelledError:
+                    status = "cancelled"
+                except:
+                    status = "failed"
             else:
                 status = "running"
             
@@ -999,6 +1148,27 @@ async def validation_exception_handler(request, exc):
         }
     )
 
+def check_executor_health():
+    """检查进程池执行器状态"""
+    global stream_executor
+    try:
+        # 尝试提交一个简单的测试任务
+        test_future = stream_executor.submit(lambda: 1)
+        test_future.result(timeout=1)
+        test_future.cancel()
+        return True
+    except Exception as e:
+        logging.warning(f"进程池健康检查失败: {e}")
+        return False
+
+def ensure_executor_healthy():
+    """确保进程池执行器健康"""
+    if not check_executor_health():
+        logging.warning("进程池不健康,重新创建")
+        create_new_executor()
+        return False
+    return True
+
 if __name__ == "__main__":
     threading.Thread(target=start_gradio, daemon=True).start()
     uvicorn.run(app_fastapi, host="0.0.0.0", port=8000)