Files
arlo/task/manager_task.py
2025-11-18 20:23:04 +08:00

192 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import importlib
import time
from datetime import datetime, timedelta
from functools import partial
from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from database.database import get_session
from database.tscheduler.crud import get_tasks_by_executor
from log.log_manager import log
"""
这是一个特殊的任务,负责管理任务,命名为任务管理者任务。
工作流程:
1 检索数据库任务数据表t_scheduler
2 检查是否已经在任务队列中,如果不在则添加
任务执行时间间隔为{config.scheduler_interval}秒。
"""
def log_task_execution(task_name: str, start_time: float, end_time: float = None):
    """Log the start of a task, or its end together with the elapsed time.

    Args:
        task_name: Name that identifies the task in the log line.
        start_time: Start timestamp in seconds, as accepted by
            ``time.localtime``.
        end_time: End timestamp in seconds. When it is ``None`` only the
            "start" line is written; otherwise only the "end" line is written.
    """
    time_format = "%Y-%m-%d %H:%M:%S"
    started_at = time.strftime(time_format, time.localtime(start_time))

    if end_time is not None:
        # Fall back to the raw value if the end timestamp cannot be formatted.
        try:
            finished_at = time.strftime(time_format, time.localtime(end_time))
        except Exception:
            finished_at = str(end_time)
        duration = end_time - start_time
        log(f"{task_name} end execute at {finished_at}, use time {duration:.2f} seconds")
    else:
        log(f"{task_name} start execute at {started_at}")
def execute_task(task: callable):
    """Run *task* and log its start, its end and how long it took.

    An exception raised by the task is logged and swallowed, so that a failing
    task does not propagate into the caller (the scheduler).

    Args:
        task: Zero-argument callable to run. For a ``functools.partial`` the
            name of the wrapped function is used in the log lines.
    """
    started = time.time()

    # Work out a readable name for the log lines.
    try:
        if isinstance(task, functools.partial):
            label = task.func.__name__
        else:
            label = task.__name__
    except Exception:
        label = getattr(task, '__name__', repr(task))

    log_task_execution(label, started)

    try:
        task()
    except Exception as exc:
        finished = time.time()
        log(f"{label} raised exception: {exc}")
    else:
        finished = time.time()

    # The end line is written for both the success and the failure path.
    log_task_execution(label, started, finished)
# 从数据库加载任务
def load_tasks(scheduler: BlockingScheduler):
    """Register the database-defined tasks of this executor with *scheduler*.

    Reads the tasks returned by ``get_tasks_by_executor`` for
    ``config.scheduler_name``. For every task the target function is imported
    from ``module_path`` / ``function_name``, wrapped in ``execute_task`` and
    added as a job whose id is ``str(task.id)`` and whose ``name`` is a
    signature of the task's trigger settings. A task whose existing job
    already carries the same signature is skipped; otherwise the job is added
    or replaced.

    A problem with a single task (import failure, invalid trigger settings,
    an error raised by the scheduler) is logged and does not prevent the
    remaining tasks from being processed.

    Args:
        scheduler: Scheduler that receives the jobs.
    """
    with get_session() as db:
        tasks = get_tasks_by_executor(db, config.scheduler_name)
        # The rows are processed while the session is still open.
        for db_task in tasks:
            task_id = db_task.id
            module_path = db_task.module_path
            function_name = db_task.function_name

            # Import the target dynamically; a broken task is logged and skipped.
            try:
                module = importlib.import_module(module_path)
                func = getattr(module, function_name)
            except Exception as e:
                log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}")
                continue

            # The task function receives its own database row as argument and
            # is wrapped by execute_task for start/end logging.
            task_function = partial(execute_task, partial(func, db_task))

            try:
                # Normalise the trigger the same way the signature does, so
                # that "Interval" / "CRON" are scheduled instead of rejected.
                trigger = (db_task.trigger or "").lower()
                signature = _build_db_signature(db_task)

                # An existing job with an identical signature is up to date.
                job = scheduler.get_job(str(task_id))
                if job and getattr(job, "name", None) == signature:
                    log(f"Task {db_task.task_name} (id={task_id}) already scheduled and unchanged, skipping.")
                    continue

                job_options = {
                    "id": str(task_id),
                    "replace_existing": True,
                    "name": signature,
                }

                if trigger == "interval":
                    interval_seconds = db_task.interval_seconds
                    # A missing or zero interval would be turned into a
                    # one-second interval by APScheduler; reject it instead.
                    if not interval_seconds or interval_seconds <= 0:
                        log(f"Invalid interval for task id={task_id}: {interval_seconds}")
                        continue
                    scheduler.add_job(
                        task_function,
                        "interval",
                        seconds=interval_seconds,
                        **job_options,
                    )
                    log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
                elif trigger == "cron":
                    fields = _normalize_cron_fields(db_task.cron_expression)
                    if fields is None:
                        log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}")
                        continue
                    # NOTE(review): numeric day-of-week values are passed
                    # through unchanged; Quartz and APScheduler number the
                    # weekdays differently - confirm which convention the
                    # stored expressions use.
                    second, minute, hour, day, month, day_of_week, year = fields
                    scheduler.add_job(
                        task_function,
                        "cron",
                        second=second,
                        minute=minute,
                        hour=hour,
                        day=day,
                        month=month,
                        day_of_week=day_of_week,
                        year=year,
                        **job_options,
                    )
                    log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
                elif trigger == "date":
                    # NOTE(review): when execution_date is missing or
                    # unparsable the fallback is recomputed on every call, so
                    # a job that is no longer in the scheduler is scheduled
                    # again by the next call - confirm this is intended.
                    run_date = _resolve_run_date(db_task.execution_date)
                    scheduler.add_job(
                        task_function,
                        "date",
                        run_date=run_date,
                        **job_options,
                    )
                    log(f"Task {db_task.task_name} added/updated with date {run_date}")
                else:
                    log(f"Invalid trigger type for task id={task_id}: {db_task.trigger}")
            except Exception as e:
                log(f"Failed to schedule task id={task_id}: {e}")


def _normalize_cron_fields(expression):
    """Split a 7-field cron expression and map Quartz ``?`` to ``*``.

    Returns the list of fields (second, minute, hour, day, month,
    day_of_week, year), or ``None`` when the expression does not consist of
    exactly seven whitespace-separated fields.
    """
    fields = (expression or "").split()
    if len(fields) != 7:
        return None
    # Quartz allows "?" in the day-of-month and the day-of-week field.
    for index in (3, 5):
        if fields[index] == '?':
            fields[index] = '*'
    return fields


def _build_db_signature(db_task) -> str:
    """Return a string that changes whenever the task's trigger settings change."""
    trigger = (db_task.trigger or "").lower()
    task_name = db_task.task_name

    if trigger == "interval":
        seconds = int(db_task.interval_seconds or 0)
        return f"{task_name}#interval:seconds={seconds}"

    if trigger == "cron":
        fields = _normalize_cron_fields(db_task.cron_expression)
        if fields is not None:
            expression = " ".join(fields)
        else:
            # Not a 7-field expression: keep the text as stored.
            expression = (db_task.cron_expression or "").strip()
        return f"{task_name}#cron:{expression}"

    if trigger == "date":
        run_date = db_task.execution_date
        if isinstance(run_date, datetime):
            return f"{task_name}#date:{run_date.isoformat()}"
        try:
            return f"{task_name}#date:{str(run_date)}"
        except Exception:
            return f"{task_name}#date:invalid"

    return f"{task_name}#unknown:{trigger}"


def _resolve_run_date(value):
    """Return *value* as a datetime, falling back to ten seconds from now."""
    if isinstance(value, datetime):
        return value
    if isinstance(value, str):
        # Accept ISO formatted strings.
        try:
            return datetime.fromisoformat(value)
        except ValueError:
            pass
    return datetime.now() + timedelta(seconds=10)
# 任务管理者的任务
def manager_task(scheduler: BlockingScheduler):
    """Entry point of the task manager task: load the tasks into *scheduler*.

    Args:
        scheduler: Scheduler that is handed to ``load_tasks``.
    """
    load_tasks(scheduler)