Преглед на файлове

直接返回mp4格式文件路径-推流成功版本

xujunwei преди 5 месеца
родител
ревизия
82925e2ad7
променени са 1 файла, в които са добавени 207 реда и са изтрити 64 реда
  1. 207 64
      app.py

+ 207 - 64
app.py

@@ -35,6 +35,28 @@ stream_executor = ProcessPoolExecutor(max_workers=4)
 stream_tasks: Dict[str, Dict] = {}  # 存储任务信息:{task_id: {"future": Future, "start_time": long, "source": str, "stream_url": str}}
 stream_tasks: Dict[str, Dict] = {}  # 存储任务信息:{task_id: {"future": Future, "start_time": long, "source": str, "stream_url": str}}
 stream_pids: Dict[str, int] = {}  # 记录每个任务的worker进程pid
 stream_pids: Dict[str, int] = {}  # 记录每个任务的worker进程pid
 
 
+# 定期清理任务
+def periodic_cleanup():
+    """定期清理已完成的任务"""
+    import threading
+    import time
+    
+    def cleanup_loop():
+        while True:
+            try:
+                cleanup_completed_tasks()
+                time.sleep(5)  # 每5秒清理一次
+            except Exception as e:
+                logging.error(f"定期清理任务时出错: {e}")
+                time.sleep(5)  # 出错后等待5秒再继续
+    
+    cleanup_thread = threading.Thread(target=cleanup_loop, daemon=True)
+    cleanup_thread.start()
+    return cleanup_thread
+
+# 启动定期清理
+cleanup_thread = periodic_cleanup()
+
 def create_new_executor():
 def create_new_executor():
     """创建新的进程池执行器"""
     """创建新的进程池执行器"""
     global stream_executor
     global stream_executor
@@ -50,21 +72,34 @@ def cleanup_completed_tasks():
     """清理已完成的任务"""
     """清理已完成的任务"""
     completed_tasks = []
     completed_tasks = []
     for task_id, task_info in stream_tasks.items():
     for task_id, task_info in stream_tasks.items():
-        if task_info["future"].done():
+        try:
+            if task_info["future"].done():
+                # 尝试获取结果,但设置超时避免卡住
+                try:
+                    task_info["future"].result(timeout=0.05)  # 50ms超时
+                except Exception as e:
+                    logging.warning(f"清理任务 {task_id} 时获取结果出错: {e}")
+                completed_tasks.append(task_id)
+        except Exception as e:
+            logging.warning(f"检查任务 {task_id} 状态时出错: {e}")
+            # 如果检查状态出错,也标记为需要清理
             completed_tasks.append(task_id)
             completed_tasks.append(task_id)
     
     
     for task_id in completed_tasks:
     for task_id in completed_tasks:
-        del stream_tasks[task_id]
-        if task_id in stream_pids:
-            del stream_pids[task_id]
-        # 清理PID文件
-        pid_file = f"/tmp/yolov12_task_{task_id}.pid"
         try:
         try:
-            if os.path.exists(pid_file):
-                os.remove(pid_file)
-        except:
-            pass
-        print(f"已清理完成的任务: {task_id}")
+            del stream_tasks[task_id]
+            if task_id in stream_pids:
+                del stream_pids[task_id]
+            # 清理PID文件
+            pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+            try:
+                if os.path.exists(pid_file):
+                    os.remove(pid_file)
+            except Exception as pid_error:
+                logging.warning(f"清理PID文件 {pid_file} 时出错: {pid_error}")
+            print(f"已清理完成的任务: {task_id}")
+        except Exception as e:
+            logging.error(f"清理任务 {task_id} 时出错: {e}")
 
 
 def yolov12_inference(image, video, model_id, image_size, conf_threshold):
 def yolov12_inference(image, video, model_id, image_size, conf_threshold):
     model = YOLO(model_id)
     model = YOLO(model_id)
@@ -735,6 +770,10 @@ def yolov12_stream_worker(params_dict, task_id):
                         stderr_output = ffmpeg_process.stderr.read().decode() if ffmpeg_process.stderr else "未知错误"
                         stderr_output = ffmpeg_process.stderr.read().decode() if ffmpeg_process.stderr else "未知错误"
                         print(f"任务 {task_id} ffmpeg进程已退出: {stderr_output}")
                         print(f"任务 {task_id} ffmpeg进程已退出: {stderr_output}")
                         
                         
+                        # 检查是否是partial file错误
+                        if "partial file" in stderr_output.lower():
+                            print(f"任务 {task_id} 检测到partial file错误,这通常表示视频文件写入不完整")
+                        
                         # 尝试重启ffmpeg进程
                         # 尝试重启ffmpeg进程
                         print(f"任务 {task_id} 尝试重启ffmpeg进程...")
                         print(f"任务 {task_id} 尝试重启ffmpeg进程...")
                         cleanup_process()
                         cleanup_process()
@@ -762,8 +801,21 @@ def yolov12_stream_worker(params_dict, task_id):
                         if ffmpeg_process.stderr:
                         if ffmpeg_process.stderr:
                             stderr_output = ffmpeg_process.stderr.read().decode()
                             stderr_output = ffmpeg_process.stderr.read().decode()
                             print(f"任务 {task_id} ffmpeg错误输出: {stderr_output}")
                             print(f"任务 {task_id} ffmpeg错误输出: {stderr_output}")
-                    except:
-                        pass
+                            
+                            # 检查是否是partial file错误
+                            if "partial file" in stderr_output.lower():
+                                print(f"任务 {task_id} 检测到partial file错误,这通常表示视频文件写入不完整")
+                                # 对于partial file错误,尝试重启ffmpeg进程
+                                print(f"任务 {task_id} 尝试重启ffmpeg进程...")
+                                cleanup_process()
+                                if start_ffmpeg():
+                                    print(f"任务 {task_id} ffmpeg进程重启成功,继续处理")
+                                    continue
+                                else:
+                                    print(f"任务 {task_id} ffmpeg进程重启失败,停止处理")
+                                    break
+                    except Exception as stderr_error:
+                        print(f"任务 {task_id} 读取ffmpeg错误输出时出错: {stderr_error}")
                     
                     
                     # 尝试重启ffmpeg进程
                     # 尝试重启ffmpeg进程
                     print(f"任务 {task_id} 尝试重启ffmpeg进程...")
                     print(f"任务 {task_id} 尝试重启ffmpeg进程...")
@@ -800,6 +852,8 @@ def yolov12_stream_worker(params_dict, task_id):
             error_msg += " - 推流服务器拒绝连接,请检查推流地址是否正确"
             error_msg += " - 推流服务器拒绝连接,请检查推流地址是否正确"
         elif "Permission denied" in error_msg:
         elif "Permission denied" in error_msg:
             error_msg += " - 权限不足,请检查推流地址的访问权限"
             error_msg += " - 权限不足,请检查推流地址的访问权限"
+        elif "partial file" in error_msg.lower():
+            error_msg += " - 视频文件写入不完整,可能原因:1) 磁盘空间不足 2) 网络中断 3) 推流服务器异常 4) ffmpeg进程被意外终止。建议:1) 检查磁盘空间 2) 检查网络连接 3) 重启推流服务器 4) 使用save_local=true先保存到本地文件测试"
         
         
         # 记录详细的调试信息
         # 记录详细的调试信息
         print(f"任务 {task_id} 调试信息:")
         print(f"任务 {task_id} 调试信息:")
@@ -809,6 +863,7 @@ def yolov12_stream_worker(params_dict, task_id):
         print(f"  - 视频尺寸: {width}x{height}")
         print(f"  - 视频尺寸: {width}x{height}")
         print(f"  - 帧率: {output_fps}")
         print(f"  - 帧率: {output_fps}")
         print(f"  - 码率: {bitrate}")
         print(f"  - 码率: {bitrate}")
+        print(f"  - 处理帧数: {frame_count}")
         
         
         return {"code": 1, "msg": error_msg, "result": None}
         return {"code": 1, "msg": error_msg, "result": None}
 
 
@@ -852,27 +907,33 @@ async def yolov12_stream_async(params: StreamParams):
                 if fut.cancelled():
                 if fut.cancelled():
                     print(f"任务 {task_id} 已被取消")
                     print(f"任务 {task_id} 已被取消")
                 else:
                 else:
-                    # 获取任务结果(如果有异常会抛出)
-                    result = fut.result()
-                    print(f"任务 {task_id} 已完成,结果: {result}")
+                    # 获取任务结果(如果有异常会抛出),设置超时避免卡住
+                    try:
+                        result = fut.result(timeout=0.1)  # 100ms超时
+                        print(f"任务 {task_id} 已完成,结果: {result}")
+                    except Exception as result_error:
+                        print(f"任务 {task_id} 获取结果时出错: {result_error}")
             except asyncio.CancelledError:
             except asyncio.CancelledError:
                 print(f"任务 {task_id} 被取消")
                 print(f"任务 {task_id} 被取消")
             except Exception as e:
             except Exception as e:
                 print(f"任务 {task_id} 执行异常: {e}")
                 print(f"任务 {task_id} 执行异常: {e}")
             finally:
             finally:
                 # 清理任务
                 # 清理任务
-                if task_id in stream_tasks:
-                    del stream_tasks[task_id]
-                if task_id in stream_pids:
-                    del stream_pids[task_id]
-                # 清理PID文件
-                pid_file = f"/tmp/yolov12_task_{task_id}.pid"
                 try:
                 try:
-                    if os.path.exists(pid_file):
-                        os.remove(pid_file)
-                except:
-                    pass
-                print(f"已清理任务: {task_id}")
+                    if task_id in stream_tasks:
+                        del stream_tasks[task_id]
+                    if task_id in stream_pids:
+                        del stream_pids[task_id]
+                    # 清理PID文件
+                    pid_file = f"/tmp/yolov12_task_{task_id}.pid"
+                    try:
+                        if os.path.exists(pid_file):
+                            os.remove(pid_file)
+                    except Exception as pid_error:
+                        print(f"清理PID文件 {pid_file} 时出错: {pid_error}")
+                    print(f"已清理任务: {task_id}")
+                except Exception as cleanup_error:
+                    print(f"清理任务 {task_id} 时出错: {cleanup_error}")
         
         
         future.add_done_callback(cleanup_task)
         future.add_done_callback(cleanup_task)
         stream_tasks[task_id] = {
         stream_tasks[task_id] = {
@@ -1013,20 +1074,36 @@ async def get_stream_status(task_id: str):
                         }
                         }
                     }
                     }
                 else:
                 else:
-                    result = task_info["future"].result()
-                    run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
-                    return {
-                        "code": 0, 
-                        "msg": "已完成", 
-                        "result": {
-                            "status": "completed", 
-                            "result": result,
-                            "start_time": task_info["start_time"],
-                            "run_time": round(run_time, 2),
-                            "source": task_info["source"],
-                            "stream_url": task_info["stream_url"]
+                    try:
+                        result = task_info["future"].result(timeout=0.1)  # 100ms超时
+                        run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
+                        return {
+                            "code": 0, 
+                            "msg": "已完成", 
+                            "result": {
+                                "status": "completed", 
+                                "result": result,
+                                "start_time": task_info["start_time"],
+                                "run_time": round(run_time, 2),
+                                "source": task_info["source"],
+                                "stream_url": task_info["stream_url"]
+                            }
+                        }
+                    except Exception as result_error:
+                        logging.warning(f"任务 {task_id} 获取结果时出错: {result_error}")
+                        run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
+                        return {
+                            "code": 1, 
+                            "msg": f"任务异常: {result_error}", 
+                            "result": {
+                                "status": "failed", 
+                                "error": str(result_error),
+                                "start_time": task_info["start_time"],
+                                "run_time": round(run_time, 2),
+                                "source": task_info["source"],
+                                "stream_url": task_info["stream_url"]
+                            }
                         }
                         }
-                    }
             except asyncio.CancelledError:
             except asyncio.CancelledError:
                 run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
                 run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
                 return {
                 return {
@@ -1099,31 +1176,42 @@ async def list_stream_tasks():
     try:
     try:
         tasks_info = []
         tasks_info = []
         for task_id, task_info in stream_tasks.items():
         for task_id, task_info in stream_tasks.items():
-            if task_info["future"].done():
-                try:
-                    if task_info["future"].cancelled():
+            try:
+                if task_info["future"].done():
+                    try:
+                        if task_info["future"].cancelled():
+                            status = "cancelled"
+                        else:
+                            # 添加超时机制,避免卡住
+                            try:
+                                task_info["future"].result(timeout=0.1)  # 100ms超时
+                                status = "completed"
+                            except Exception as result_error:
+                                logging.warning(f"任务 {task_id} 获取结果时出错: {result_error}")
+                                status = "failed"
+                    except asyncio.CancelledError:
                         status = "cancelled"
                         status = "cancelled"
-                    else:
-                        task_info["future"].result()  # 检查是否有异常
-                        status = "completed"
-                except asyncio.CancelledError:
-                    status = "cancelled"
-                except:
-                    status = "failed"
-            else:
-                status = "running"
-            
-            # 计算运行时长
-            run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
-            
-            tasks_info.append({
-                "task_id": task_id,
-                "status": status,
-                "start_time": task_info["start_time"],
-                "run_time": round(run_time, 2),  # 运行时长(秒)
-                "source": task_info["source"],  # 播流地址
-                "stream_url": task_info["stream_url"]  # 推流地址
-            })
+                    except Exception as e:
+                        logging.warning(f"任务 {task_id} 状态检查时出错: {e}")
+                        status = "failed"
+                else:
+                    status = "running"
+                
+                # 计算运行时长
+                run_time = (time.time() * 1000 - task_info["start_time"]) / 1000  # 转换为秒
+                
+                tasks_info.append({
+                    "task_id": task_id,
+                    "status": status,
+                    "start_time": task_info["start_time"],
+                    "run_time": round(run_time, 2),  # 运行时长(秒)
+                    "source": task_info["source"],  # 播流地址
+                    "stream_url": task_info["stream_url"]  # 推流地址
+                })
+            except Exception as task_error:
+                logging.error(f"处理任务 {task_id} 时出错: {task_error}")
+                # 即使单个任务出错,也继续处理其他任务
+                continue
         
         
         return {
         return {
             "code": 0,
             "code": 0,
@@ -1134,6 +1222,61 @@ async def list_stream_tasks():
         logging.error(f"查询所有任务时发生异常: {e}")
         logging.error(f"查询所有任务时发生异常: {e}")
         return {"code": 1, "msg": str(e), "result": None}
         return {"code": 1, "msg": str(e), "result": None}
 
 
+@app_fastapi.get("/yolov12/health")
+async def health_check():
+    """
+    RESTful GET接口:/yolov12/health
+    健康检查接口,返回系统状态信息
+    返回格式:{"code": 0, "msg": "success", "result": {"status": "状态", "tasks_count": 任务数量, "executor_healthy": 进程池状态}}
+    """
+    try:
+        # 清理已完成的任务
+        cleanup_completed_tasks()
+        
+        # 检查进程池状态
+        executor_healthy = check_executor_health()
+        
+        # 统计任务数量
+        running_tasks = 0
+        completed_tasks = 0
+        failed_tasks = 0
+        
+        for task_info in stream_tasks.values():
+            try:
+                if task_info["future"].done():
+                    try:
+                        if task_info["future"].cancelled():
+                            completed_tasks += 1
+                        else:
+                            task_info["future"].result(timeout=0.05)
+                            completed_tasks += 1
+                    except:
+                        failed_tasks += 1
+                else:
+                    running_tasks += 1
+            except:
+                failed_tasks += 1
+        
+        return {
+            "code": 0,
+            "msg": "success",
+            "result": {
+                "status": "healthy" if executor_healthy else "unhealthy",
+                "tasks_count": len(stream_tasks),
+                "running_tasks": running_tasks,
+                "completed_tasks": completed_tasks,
+                "failed_tasks": failed_tasks,
+                "executor_healthy": executor_healthy
+            }
+        }
+    except Exception as e:
+        logging.error(f"健康检查时发生异常: {e}")
+        return {
+            "code": 1,
+            "msg": str(e),
+            "result": None
+        }
+
 # 全局异常处理器:参数校验失败时统一返回格式
 # 全局异常处理器:参数校验失败时统一返回格式
 @app_fastapi.exception_handler(RequestValidationError)
 @app_fastapi.exception_handler(RequestValidationError)
 async def validation_exception_handler(request, exc):
 async def validation_exception_handler(request, exc):