支持更新task,当task信息改变时
This commit is contained in:
@ -65,6 +65,30 @@ def execute_task(task: callable):
|
|||||||
|
|
||||||
# 从数据库加载任务
|
# 从数据库加载任务
|
||||||
def load_tasks(scheduler: BlockingScheduler):
|
def load_tasks(scheduler: BlockingScheduler):
|
||||||
|
def build_db_signature(db_task) -> str:
|
||||||
|
t = (db_task.trigger or "").lower()
|
||||||
|
if t == "interval":
|
||||||
|
sec = int(db_task.interval_seconds or 0)
|
||||||
|
return f"{db_task.task_name}#interval:seconds={sec}"
|
||||||
|
if t == "cron":
|
||||||
|
expr = (db_task.cron_expression or "").strip()
|
||||||
|
parts = expr.split()
|
||||||
|
# 规范化:确保 7 个字段,替换 Quartz 的 '?' 为 '*'
|
||||||
|
if len(parts) == 7:
|
||||||
|
if parts[5] == '?':
|
||||||
|
parts[5] = '*'
|
||||||
|
expr = " ".join(parts)
|
||||||
|
return f"{db_task.task_name}#cron:{expr}"
|
||||||
|
if t == "date":
|
||||||
|
run_date = db_task.execution_date
|
||||||
|
if isinstance(run_date, datetime):
|
||||||
|
return f"{db_task.task_name}#date:{run_date.isoformat()}"
|
||||||
|
try:
|
||||||
|
return f"{db_task.task_name}#date:{str(run_date)}"
|
||||||
|
except Exception:
|
||||||
|
return f"{db_task.task_name}#date:invalid"
|
||||||
|
return f"{db_task.task_name}#unknown:{t}"
|
||||||
|
|
||||||
with get_session() as db:
|
with get_session() as db:
|
||||||
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
||||||
for db_task in tasks:
|
for db_task in tasks:
|
||||||
@ -86,6 +110,15 @@ def load_tasks(scheduler: BlockingScheduler):
|
|||||||
task_callable = partial(func, db_task)
|
task_callable = partial(func, db_task)
|
||||||
task_function = partial(execute_task, task_callable)
|
task_function = partial(execute_task, task_callable)
|
||||||
|
|
||||||
|
# 计算数据库中当前任务的签名
|
||||||
|
signature = build_db_signature(db_task)
|
||||||
|
|
||||||
|
job = scheduler.get_job(str(task_id))
|
||||||
|
# 如果任务已经存在并且签名相同,则跳过更新
|
||||||
|
if job and getattr(job, "name", None) == signature:
|
||||||
|
log(f"Task {db_task.task_name} (id={task_id}) already scheduled and unchanged, skipping.")
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if trigger == "interval":
|
if trigger == "interval":
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
@ -93,7 +126,8 @@ def load_tasks(scheduler: BlockingScheduler):
|
|||||||
"interval",
|
"interval",
|
||||||
seconds=interval_seconds or 0,
|
seconds=interval_seconds or 0,
|
||||||
id=str(task_id),
|
id=str(task_id),
|
||||||
replace_existing=True
|
replace_existing=True,
|
||||||
|
name=signature
|
||||||
)
|
)
|
||||||
log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
|
log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
|
||||||
|
|
||||||
@ -117,7 +151,8 @@ def load_tasks(scheduler: BlockingScheduler):
|
|||||||
day_of_week=fields[5],
|
day_of_week=fields[5],
|
||||||
year=fields[6],
|
year=fields[6],
|
||||||
id=str(task_id),
|
id=str(task_id),
|
||||||
replace_existing=True
|
replace_existing=True,
|
||||||
|
name=signature
|
||||||
)
|
)
|
||||||
log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
|
log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
|
||||||
|
|
||||||
@ -139,7 +174,8 @@ def load_tasks(scheduler: BlockingScheduler):
|
|||||||
"date",
|
"date",
|
||||||
run_date=run_date,
|
run_date=run_date,
|
||||||
id=str(task_id),
|
id=str(task_id),
|
||||||
replace_existing=True
|
replace_existing=True,
|
||||||
|
name=signature
|
||||||
)
|
)
|
||||||
log(f"Task {db_task.task_name} added/updated with date {run_date}")
|
log(f"Task {db_task.task_name} added/updated with date {run_date}")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user