|
|
@@ -32,14 +32,14 @@ logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s - %(
|
|
|
|
|
|
# 创建进程池执行器和任务管理
|
|
|
stream_executor = ProcessPoolExecutor(max_workers=4)
|
|
|
-stream_tasks: Dict[str, Future] = {}
|
|
|
+stream_tasks: Dict[str, Dict] = {} # 存储任务信息:{task_id: {"future": Future, "start_time": long, "source": str, "stream_url": str}}
|
|
|
stream_pids: Dict[str, int] = {} # 记录每个任务的worker进程pid
|
|
|
|
|
|
def cleanup_completed_tasks():
|
|
|
"""清理已完成的任务"""
|
|
|
completed_tasks = []
|
|
|
- for task_id, future in stream_tasks.items():
|
|
|
- if future.done():
|
|
|
+ for task_id, task_info in stream_tasks.items():
|
|
|
+ if task_info["future"].done():
|
|
|
completed_tasks.append(task_id)
|
|
|
|
|
|
for task_id in completed_tasks:
|
|
|
@@ -801,7 +801,12 @@ async def yolov12_stream_async(params: StreamParams):
|
|
|
print(f"已清理任务: {task_id}")
|
|
|
|
|
|
future.add_done_callback(cleanup_task)
|
|
|
- stream_tasks[task_id] = future
|
|
|
+ stream_tasks[task_id] = {
|
|
|
+ "future": future,
|
|
|
+ "start_time": int(time.time() * 1000), # 毫秒级时间戳,long类型
|
|
|
+ "source": params.source,
|
|
|
+ "stream_url": params.stream_url
|
|
|
+ }
|
|
|
|
|
|
logging.info(f"任务 {task_id} 已提交到进程池")
|
|
|
|
|
|
@@ -831,16 +836,16 @@ async def cancel_stream_task(task_id: str):
|
|
|
# 先清理已完成的任务
|
|
|
cleanup_completed_tasks()
|
|
|
|
|
|
- future = stream_tasks.get(task_id)
|
|
|
- if not future:
|
|
|
+ task_info = stream_tasks.get(task_id)
|
|
|
+ if not task_info:
|
|
|
return {"code": 1, "msg": "任务不存在", "result": None}
|
|
|
|
|
|
- if future.done():
|
|
|
+ if task_info["future"].done():
|
|
|
return {"code": 1, "msg": "任务已完成,无法取消", "result": None}
|
|
|
|
|
|
try:
|
|
|
# 尝试取消任务
|
|
|
- cancelled = future.cancel()
|
|
|
+ cancelled = task_info["future"].cancel()
|
|
|
if cancelled:
|
|
|
logging.info(f"任务 {task_id} 已取消")
|
|
|
# 取消后立即清理
|
|
|
@@ -867,36 +872,66 @@ async def get_stream_status(task_id: str):
|
|
|
# 先清理已完成的任务
|
|
|
cleanup_completed_tasks()
|
|
|
|
|
|
- future = stream_tasks.get(task_id)
|
|
|
- if not future:
|
|
|
+ task_info = stream_tasks.get(task_id)
|
|
|
+ if not task_info:
|
|
|
return {"code": 1, "msg": "任务不存在", "result": None}
|
|
|
|
|
|
try:
|
|
|
- if future.done():
|
|
|
+ if task_info["future"].done():
|
|
|
try:
|
|
|
- result = future.result()
|
|
|
+ result = task_info["future"].result()
|
|
|
+ run_time = (time.time() * 1000 - task_info["start_time"]) / 1000 # 转换为秒
|
|
|
return {
|
|
|
"code": 0,
|
|
|
"msg": "已完成",
|
|
|
- "result": {"status": "completed", "result": result}
|
|
|
+ "result": {
|
|
|
+ "status": "completed",
|
|
|
+ "result": result,
|
|
|
+ "start_time": task_info["start_time"],
|
|
|
+ "run_time": round(run_time, 2),
|
|
|
+ "source": task_info["source"],
|
|
|
+ "stream_url": task_info["stream_url"]
|
|
|
+ }
|
|
|
}
|
|
|
except Exception as e:
|
|
|
+ run_time = (time.time() * 1000 - task_info["start_time"]) / 1000 # 转换为秒
|
|
|
return {
|
|
|
"code": 1,
|
|
|
"msg": f"任务异常: {e}",
|
|
|
- "result": {"status": "failed", "error": str(e)}
|
|
|
+ "result": {
|
|
|
+ "status": "failed",
|
|
|
+ "error": str(e),
|
|
|
+ "start_time": task_info["start_time"],
|
|
|
+ "run_time": round(run_time, 2),
|
|
|
+ "source": task_info["source"],
|
|
|
+ "stream_url": task_info["stream_url"]
|
|
|
+ }
|
|
|
}
|
|
|
- elif future.cancelled():
|
|
|
+ elif task_info["future"].cancelled():
|
|
|
+ run_time = (time.time() * 1000 - task_info["start_time"]) / 1000 # 转换为秒
|
|
|
return {
|
|
|
"code": 0,
|
|
|
"msg": "已取消",
|
|
|
- "result": {"status": "cancelled"}
|
|
|
+ "result": {
|
|
|
+ "status": "cancelled",
|
|
|
+ "start_time": task_info["start_time"],
|
|
|
+ "run_time": round(run_time, 2),
|
|
|
+ "source": task_info["source"],
|
|
|
+ "stream_url": task_info["stream_url"]
|
|
|
+ }
|
|
|
}
|
|
|
else:
|
|
|
+ run_time = (time.time() * 1000 - task_info["start_time"]) / 1000 # 转换为秒
|
|
|
return {
|
|
|
"code": 0,
|
|
|
"msg": "运行中",
|
|
|
- "result": {"status": "running"}
|
|
|
+ "result": {
|
|
|
+ "status": "running",
|
|
|
+ "start_time": task_info["start_time"],
|
|
|
+ "run_time": round(run_time, 2),
|
|
|
+ "source": task_info["source"],
|
|
|
+ "stream_url": task_info["stream_url"]
|
|
|
+ }
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logging.error(f"查询任务状态时发生异常: {e}")
|
|
|
@@ -916,22 +951,29 @@ async def list_stream_tasks():
|
|
|
|
|
|
try:
|
|
|
tasks_info = []
|
|
|
- for task_id, future in stream_tasks.items():
|
|
|
- if future.done():
|
|
|
- if future.cancelled():
|
|
|
+ for task_id, task_info in stream_tasks.items():
|
|
|
+ if task_info["future"].done():
|
|
|
+ if task_info["future"].cancelled():
|
|
|
status = "cancelled"
|
|
|
else:
|
|
|
try:
|
|
|
- future.result() # 检查是否有异常
|
|
|
+ task_info["future"].result() # 检查是否有异常
|
|
|
status = "completed"
|
|
|
except:
|
|
|
status = "failed"
|
|
|
else:
|
|
|
status = "running"
|
|
|
|
|
|
+ # 计算运行时长
|
|
|
+ run_time = (time.time() * 1000 - task_info["start_time"]) / 1000 # 转换为秒
|
|
|
+
|
|
|
tasks_info.append({
|
|
|
"task_id": task_id,
|
|
|
- "status": status
|
|
|
+ "status": status,
|
|
|
+ "start_time": task_info["start_time"],
|
|
|
+ "run_time": round(run_time, 2), # 运行时长(秒)
|
|
|
+ "source": task_info["source"], # 播流地址
|
|
|
+ "stream_url": task_info["stream_url"] # 推流地址
|
|
|
})
|
|
|
|
|
|
return {
|