Przeglądaj źródła

直接返回mp4格式文件路径-推流成功版本

xujunwei 5 miesięcy temu
rodzic
commit
98472648b9
1 zmienionych plików z 45 dodań i 0 usunięć
  1. 45 0
      app.py

+ 45 - 0
app.py

@@ -35,6 +35,19 @@ stream_executor = ProcessPoolExecutor(max_workers=4)
 stream_tasks: Dict[str, Future] = {}
 stream_pids: Dict[str, int] = {}  # 记录每个任务的worker进程pid
 
+def cleanup_completed_tasks():
+    """清理已完成的任务"""
+    completed_tasks = []
+    for task_id, future in stream_tasks.items():
+        if future.done():
+            completed_tasks.append(task_id)
+    
+    for task_id in completed_tasks:
+        del stream_tasks[task_id]
+        if task_id in stream_pids:
+            del stream_pids[task_id]
+        print(f"已清理完成的任务: {task_id}")
+
 def yolov12_inference(image, video, model_id, image_size, conf_threshold):
     model = YOLO(model_id)
     if image:
@@ -770,6 +783,24 @@ async def yolov12_stream_async(params: StreamParams):
         future = loop.run_in_executor(
             stream_executor, partial(yolov12_stream_worker, params.dict(), task_id)
         )
+        
+        # 添加任务完成后的清理回调
+        def cleanup_task(fut):
+            try:
+                # 获取任务结果(如果有异常会抛出)
+                result = fut.result()
+                print(f"任务 {task_id} 已完成,结果: {result}")
+            except Exception as e:
+                print(f"任务 {task_id} 执行异常: {e}")
+            finally:
+                # 清理任务
+                if task_id in stream_tasks:
+                    del stream_tasks[task_id]
+                if task_id in stream_pids:
+                    del stream_pids[task_id]
+                print(f"已清理任务: {task_id}")
+        
+        future.add_done_callback(cleanup_task)
         stream_tasks[task_id] = future
         
         logging.info(f"任务 {task_id} 已提交到进程池")
@@ -797,6 +828,9 @@ async def cancel_stream_task(task_id: str):
     """
     logging.info(f"收到取消任务请求: {task_id}")
     
+    # 先清理已完成的任务
+    cleanup_completed_tasks()
+    
     future = stream_tasks.get(task_id)
     if not future:
         return {"code": 1, "msg": "任务不存在", "result": None}
@@ -809,6 +843,11 @@ async def cancel_stream_task(task_id: str):
         cancelled = future.cancel()
         if cancelled:
             logging.info(f"任务 {task_id} 已取消")
+            # 取消后立即清理
+            if task_id in stream_tasks:
+                del stream_tasks[task_id]
+            if task_id in stream_pids:
+                del stream_pids[task_id]
             return {"code": 0, "msg": "任务已取消", "result": None}
         else:
             return {"code": 1, "msg": "任务无法取消(可能正在运行)", "result": None}
@@ -825,6 +864,9 @@ async def get_stream_status(task_id: str):
     """
     logging.info(f"收到查询任务状态请求: {task_id}")
     
+    # 先清理已完成的任务
+    cleanup_completed_tasks()
+    
     future = stream_tasks.get(task_id)
     if not future:
         return {"code": 1, "msg": "任务不存在", "result": None}
@@ -869,6 +911,9 @@ async def list_stream_tasks():
     """
     logging.info("收到查询所有任务请求")
     
+    # 先清理已完成的任务
+    cleanup_completed_tasks()
+
     try:
         tasks_info = []
         for task_id, future in stream_tasks.items():