diff --git a/task/manager_task.py b/task/manager_task.py index 537da7b..08917e6 100644 --- a/task/manager_task.py +++ b/task/manager_task.py @@ -65,6 +65,30 @@ def execute_task(task: callable): # 从数据库加载任务 def load_tasks(scheduler: BlockingScheduler): + def build_db_signature(db_task) -> str: + t = (db_task.trigger or "").lower() + if t == "interval": + sec = int(db_task.interval_seconds or 0) + return f"{db_task.task_name}#interval:seconds={sec}" + if t == "cron": + expr = (db_task.cron_expression or "").strip() + parts = expr.split() + # 规范化:确保 7 个字段,替换 Quartz 的 '?' 为 '*' + if len(parts) == 7: + if parts[5] == '?': + parts[5] = '*' + expr = " ".join(parts) + return f"{db_task.task_name}#cron:{expr}" + if t == "date": + run_date = db_task.execution_date + if isinstance(run_date, datetime): + return f"{db_task.task_name}#date:{run_date.isoformat()}" + try: + return f"{db_task.task_name}#date:{str(run_date)}" + except Exception: + return f"{db_task.task_name}#date:invalid" + return f"{db_task.task_name}#unknown:{t}" + with get_session() as db: tasks = get_tasks_by_executor(db, config.scheduler_name) for db_task in tasks: @@ -86,6 +110,15 @@ def load_tasks(scheduler: BlockingScheduler): task_callable = partial(func, db_task) task_function = partial(execute_task, task_callable) + # 计算数据库中当前任务的签名 + signature = build_db_signature(db_task) + + job = scheduler.get_job(str(task_id)) + # 如果任务已经存在并且签名相同,则跳过更新 + if job and getattr(job, "name", None) == signature: + log(f"Task {db_task.task_name} (id={task_id}) already scheduled and unchanged, skipping.") + continue + try: if trigger == "interval": scheduler.add_job( @@ -93,7 +126,8 @@ def load_tasks(scheduler: BlockingScheduler): "interval", seconds=interval_seconds or 0, id=str(task_id), - replace_existing=True + replace_existing=True, + name=signature ) log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds") @@ -117,7 +151,8 @@ def load_tasks(scheduler: BlockingScheduler): day_of_week=fields[5], year=fields[6], id=str(task_id), - replace_existing=True + replace_existing=True, + name=signature ) log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}") @@ -139,7 +174,8 @@ def load_tasks(scheduler: BlockingScheduler): "date", run_date=run_date, id=str(task_id), - replace_existing=True + replace_existing=True, + name=signature ) log(f"Task {db_task.task_name} added/updated with date {run_date}")