113 lines
4.6 KiB
Python
113 lines
4.6 KiB
Python
import importlib
|
|
import time
|
|
|
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
|
|
|
from config import config
|
|
from database.database import get_session
|
|
from database.tscheduler.crud import get_tasks_by_executor
|
|
from log.log_manager import log
|
|
|
|
"""
|
|
这是一个特殊的任务,负责管理任务,命名为管理者任务。
|
|
|
|
工作流程:
|
|
1 检索数据库任务数据表
|
|
2 检查是否已经在任务队列中,如果不在则添加
|
|
|
|
任务执行时间间隔为600秒。
|
|
|
|
"""
|
|
|
|
def log_task_execution(task_name: str, start_time: float, end_time: float = None):
|
|
"""辅助函数,记录任务的开始和结束日志"""
|
|
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
|
|
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
|
|
if end_time is None:
|
|
log(f"{task_name} start execute at {start_time_str}")
|
|
else:
|
|
elapsed_time = end_time - start_time
|
|
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
|
|
|
|
|
|
def execute_task(task: callable):
|
|
"""执行任务并记录日志"""
|
|
start_time = time.time()
|
|
log_task_execution(task.__name__, start_time) # 先记录开始时间
|
|
task()
|
|
end_time = time.time()
|
|
log_task_execution(task.__name__, start_time, end_time) # 记录结束时间
|
|
|
|
# 从数据库加载任务
|
|
def load_tasks(scheduler: BlockingScheduler):
|
|
with get_session() as db:
|
|
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
|
|
|
for task in tasks:
|
|
module_path = task.module_path
|
|
function_name = task.function_name
|
|
trigger = task.trigger
|
|
interval_seconds = task.interval_seconds
|
|
task_id = task.id
|
|
|
|
# 动态导入模块和函数
|
|
module = importlib.import_module(module_path)
|
|
task_function = getattr(module, function_name)
|
|
|
|
job = scheduler.get_job(str(task_id))
|
|
# 检查任务是否已存在
|
|
if not job:
|
|
if trigger == "interval":
|
|
scheduler.add_job(
|
|
task_function,
|
|
"interval",
|
|
seconds=interval_seconds,
|
|
id=str(task_id),
|
|
replace_existing=True,
|
|
misfire_grace_time=interval_seconds
|
|
)
|
|
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
|
|
elif trigger == "cron":
|
|
# 解析 cron 表达式的字段
|
|
fields = task.cron_expression.split()
|
|
# 确保字段长度符合七字段格式
|
|
if len(fields) != 7:
|
|
raise ValueError("无效的 Quartz cron 表达式")
|
|
# 替换 Quartz 风格的 `?` 为 APScheduler 可接受的 `*`
|
|
if fields[5] == '?':
|
|
fields[5] = '*' # 替换 `day_of_week` 字段中的 `?`
|
|
# 使用 cron 表达式的字段添加任务
|
|
scheduler.add_job(
|
|
task_function, # 要执行的任务
|
|
'cron', # 使用 cron 触发器
|
|
second=fields[0], # 秒
|
|
minute=fields[1], # 分钟
|
|
hour=fields[2], # 小时
|
|
day=fields[3], # 日期
|
|
month=fields[4], # 月份
|
|
day_of_week=fields[5], # 星期
|
|
year=fields[6], # 年份
|
|
id=str(task_id),
|
|
replace_existing=True
|
|
)
|
|
log(f"Task {task.task_name} added with cron {task.cron_expression}")
|
|
elif trigger == "date":
|
|
scheduler.add_job(
|
|
task_function,
|
|
"date",
|
|
run_date=task["run_date_and_time"],
|
|
id=str(task_id),
|
|
replace_existing=True
|
|
)
|
|
log(f"Task {task.task_name} added with date {task.execution_date}")
|
|
else:
|
|
log(f"Task Invalid trigger type: {trigger}")
|
|
else:
|
|
log(f"Task {task.task_name} already exists......")
|
|
run_time = job.next_run_time - job.trigger.start_date
|
|
log(f"Task {task.task_name} already exists, run time is {run_time}")
|
|
|
|
# 管理者任务
|
|
def manager_task(scheduler: BlockingScheduler):
|
|
load_tasks(scheduler)
|