import functools import importlib import time from datetime import datetime, timedelta from functools import partial from apscheduler.schedulers.blocking import BlockingScheduler from config import config from database.database import get_session from database.tscheduler.crud import get_tasks_by_executor from log.log_manager import log """ 这是一个特殊的任务,负责管理任务,命名为任务管理者任务。 工作流程: 1 检索数据库任务数据表t_scheduler 2 检查是否已经在任务队列中,如果不在则添加 任务执行时间间隔为{config.scheduler_interval}秒。 """ def log_task_execution(task_name: str, start_time: float, end_time: float = None): """辅助函数,记录任务的开始和结束日志""" start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)) # 如果没有提供 end_time,只记录开始日志 if end_time is None: log(f"{task_name} start execute at {start_time_str}") return # 否则格式化结束时间并记录耗时 try: end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) except Exception: end_time_str = str(end_time) elapsed_time = end_time - start_time log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") def execute_task(task: callable): """执行任务并记录日志""" start_time = time.time() # 获取可读的任务名称 try: task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__ except Exception: task_name = getattr(task, '__name__', repr(task)) log_task_execution(task_name, start_time) # 先记录开始时间 # 捕获任务执行中的异常,防止调度器崩溃 try: task() except Exception as e: end_time = time.time() log(f"{task_name} raised exception: {e}") log_task_execution(task_name, start_time, end_time) return end_time = time.time() log_task_execution(task_name, start_time, end_time) # 记录结束时间 # 从数据库加载任务 def load_tasks(scheduler: BlockingScheduler): def build_db_signature(db_task) -> str: t = (db_task.trigger or "").lower() if t == "interval": sec = int(db_task.interval_seconds or 0) return f"{db_task.task_name}#interval:seconds={sec}" if t == "cron": expr = (db_task.cron_expression or "").strip() parts = expr.split() # 规范化:确保 7 个字段,替换 Quartz 的 '?' 为 '*' if len(parts) == 7: if parts[5] == '?': parts[5] = '*' expr = " ".join(parts) return f"{db_task.task_name}#cron:{expr}" if t == "date": run_date = db_task.execution_date if isinstance(run_date, datetime): return f"{db_task.task_name}#date:{run_date.isoformat()}" try: return f"{db_task.task_name}#date:{str(run_date)}" except Exception: return f"{db_task.task_name}#date:invalid" return f"{db_task.task_name}#unknown:{t}" with get_session() as db: tasks = get_tasks_by_executor(db, config.scheduler_name) for db_task in tasks: module_path = db_task.module_path function_name = db_task.function_name trigger = db_task.trigger interval_seconds = db_task.interval_seconds task_id = db_task.id # 动态导入模块和函数,失败则记录并跳过该任务 try: module = importlib.import_module(module_path) func = getattr(module, function_name) except Exception as e: log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}") continue # 将数据库任务对象作为参数传入任务函数 task_callable = partial(func, db_task) task_function = partial(execute_task, task_callable) # 计算数据库中当前任务的签名 signature = build_db_signature(db_task) job = scheduler.get_job(str(task_id)) # 如果任务已经存在并且签名相同,则跳过更新 if job and getattr(job, "name", None) == signature: log(f"Task {db_task.task_name} (id={task_id}) already scheduled and unchanged, skipping.") continue try: if trigger == "interval": scheduler.add_job( task_function, "interval", seconds=interval_seconds or 0, id=str(task_id), replace_existing=True, name=signature ) log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds") elif trigger == "cron": fields = (db_task.cron_expression or "").split() if len(fields) != 7: log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}") continue # 将 Quartz 的 `?` 替换为 APScheduler 可接受的 `*` if fields[5] == '?': fields[5] = '*' scheduler.add_job( task_function, 'cron', second=fields[0], minute=fields[1], hour=fields[2], day=fields[3], month=fields[4], day_of_week=fields[5], year=fields[6], id=str(task_id), replace_existing=True, name=signature ) log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}") elif trigger == "date": run_date = db_task.execution_date # 如果 execution_date 为 None 或不是 datetime,尝试解析或回退为现在+10s if run_date is None or not isinstance(run_date, datetime): try: # 尝试解析 ISO 格式字符串(如果传入的是字符串) if isinstance(run_date, str): run_date = datetime.fromisoformat(run_date) else: run_date = datetime.now() + timedelta(seconds=10) except Exception: run_date = datetime.now() + timedelta(seconds=10) scheduler.add_job( task_function, "date", run_date=run_date, id=str(task_id), replace_existing=True, name=signature ) log(f"Task {db_task.task_name} added/updated with date {run_date}") else: log(f"Invalid trigger type for task id={task_id}: {trigger}") except Exception as e: log(f"Failed to schedule task id={task_id}: {e}") # 任务管理者的任务 def manager_task(scheduler: BlockingScheduler): load_tasks(scheduler)