"""Manager task for the scheduler.

This is a special task, called the manager task, that is responsible for
managing the other tasks.

Workflow:
1. Query the task table in the database.
2. Check whether each task is already in the job queue; add it if it is not.

The task execution interval is 600 seconds.
"""

import importlib
import time
from typing import Any, Callable, Dict, Optional

from apscheduler.schedulers.blocking import BlockingScheduler

from config import config
from database.database import get_session
from database.tscheduler.crud import get_tasks_by_executor
from utils import logger

# Timestamp layout used in the start/end execution log lines.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# APScheduler cron keyword for each position of a seven-field Quartz expression.
_CRON_FIELD_NAMES = ("second", "minute", "hour", "day", "month", "day_of_week", "year")

# Quartz allows the "no specific value" marker `?` only in these two fields.
_QUARTZ_PLACEHOLDER_FIELDS = ("day", "day_of_week")


def _format_timestamp(timestamp: float) -> str:
    """Render an epoch timestamp as a local-time string."""
    return time.strftime(_TIME_FORMAT, time.localtime(timestamp))


def log_task_execution(task_name: str, start_time: float, end_time: Optional[float] = None) -> None:
    """Log the start or the end of a task execution.

    Args:
        task_name: Name written at the beginning of the log line.
        start_time: Epoch seconds at which the task started.
        end_time: Epoch seconds at which the task finished. When omitted,
            a "start" line is logged; otherwise an "end" line including the
            elapsed time is logged.
    """
    if end_time is None:
        logger.info(f"{task_name} start execute at {_format_timestamp(start_time)}")
        return

    elapsed_time = end_time - start_time
    logger.info(
        f"{task_name} end execute at {_format_timestamp(end_time)}, use time {elapsed_time:.2f} seconds"
    )


def execute_task(task: Callable[[], Any]) -> None:
    """Run ``task`` with no arguments, logging its start and end times.

    The end line is only logged when ``task`` returns normally; an exception
    raised by ``task`` propagates to the caller unchanged.
    """
    start_time = time.time()
    log_task_execution(task.__name__, start_time)  # log the start first
    task()
    end_time = time.time()
    log_task_execution(task.__name__, start_time, end_time)  # then log the end


def _quartz_cron_to_kwargs(cron_expression: str) -> Dict[str, str]:
    """Convert a seven-field Quartz cron expression to APScheduler cron kwargs.

    Args:
        cron_expression: Whitespace-separated fields in the order
            second, minute, hour, day, month, day_of_week, year.

    Returns:
        A mapping from APScheduler cron keyword to field expression.

    Raises:
        ValueError: If the expression does not have exactly seven fields.
    """
    fields = cron_expression.split()
    if len(fields) != len(_CRON_FIELD_NAMES):
        raise ValueError("无效的 Quartz cron 表达式")

    cron_kwargs = dict(zip(_CRON_FIELD_NAMES, fields))
    # APScheduler does not understand Quartz's `?`; `*` is the closest
    # equivalent. Quartz permits `?` in either day-of-month or day-of-week,
    # so both positions have to be translated.
    for name in _QUARTZ_PLACEHOLDER_FIELDS:
        if cron_kwargs[name] == "?":
            cron_kwargs[name] = "*"
    # NOTE(review): numeric day-of-week values are passed through unchanged;
    # Quartz and APScheduler number weekdays differently - confirm that stored
    # expressions use names (MON, TUE, ...) or already follow APScheduler.
    return cron_kwargs


def _log_existing_job(task_name: str, job: Any) -> None:
    """Log that ``task_name`` is already scheduled, with timing details."""
    logger.info(f"Task {task_name} already exists......")

    # Not every job can supply both values: a trigger may have no
    # `start_date` attribute or have it set to None, and a job may have no
    # next run time. Fall back to a plain message instead of raising.
    next_run_time = getattr(job, "next_run_time", None)
    start_date = getattr(job.trigger, "start_date", None)
    if next_run_time is None or start_date is None:
        logger.info(f"Task {task_name} already exists, next run time is {next_run_time}")
        return

    run_time = next_run_time - start_date
    logger.info(f"Task {task_name} already exists, run time is {run_time}")


def load_tasks(scheduler: BlockingScheduler) -> None:
    """Load this executor's tasks from the database into ``scheduler``.

    Each task returned by ``get_tasks_by_executor`` for
    ``config.scheduler_name`` is registered under the job id ``str(task.id)``
    unless a job with that id already exists. The target callable is resolved
    dynamically from ``task.module_path`` and ``task.function_name``.

    Supported ``task.trigger`` values are ``"interval"``, ``"cron"`` and
    ``"date"``; any other value is logged as a warning and skipped.

    Raises:
        ValueError: If a cron task's expression does not have seven fields.
    """
    with get_session() as db:
        tasks = get_tasks_by_executor(db, config.scheduler_name)
        for task in tasks:
            job_id = str(task.id)

            # Skip tasks that are already scheduled before doing any import work.
            job = scheduler.get_job(job_id)
            if job is not None:
                _log_existing_job(task.task_name, job)
                continue

            # Dynamically import the module and look up the task function.
            module = importlib.import_module(task.module_path)
            task_function = getattr(module, task.function_name)

            trigger = task.trigger
            if trigger == "interval":
                interval_seconds = task.interval_seconds
                scheduler.add_job(
                    task_function,
                    "interval",
                    seconds=interval_seconds,
                    id=job_id,
                    replace_existing=True,
                    # Tolerate a late start of up to one full interval.
                    misfire_grace_time=interval_seconds,
                )
                logger.info(f"Task {task.task_name} added with interval {interval_seconds} seconds")
            elif trigger == "cron":
                scheduler.add_job(
                    task_function,
                    "cron",
                    id=job_id,
                    replace_existing=True,
                    **_quartz_cron_to_kwargs(task.cron_expression),
                )
                logger.info(f"Task {task.task_name} added with cron {task.cron_expression}")
            elif trigger == "date":
                # NOTE(review): the run date is read by subscripting
                # (`task["run_date_and_time"]`) while the log line reads
                # `task.execution_date`; every other field uses attribute
                # access. Confirm which field the task model really exposes.
                scheduler.add_job(
                    task_function,
                    "date",
                    run_date=task["run_date_and_time"],
                    id=job_id,
                    replace_existing=True,
                )
                logger.info(f"Task {task.task_name} added with date {task.execution_date}")
            else:
                logger.warning(f"Task Invalid trigger type: {trigger}")


def manager_task(scheduler: BlockingScheduler) -> None:
    """Manager task entry point: synchronise ``scheduler`` with the database."""
    load_tasks(scheduler)