This commit is contained in:
konjacpotato
2025-11-14 21:45:00 +08:00
parent fd2a3171ad
commit 52d5bbc216
3 changed files with 173 additions and 57 deletions

95
arlo.py
View File

@ -1,5 +1,16 @@
"""Entrypoint for the scheduler process.
This module starts the APScheduler scheduler and ensures a graceful
shutdown on SIGINT/SIGTERM. It also improves job error logging to
include exception tracebacks for easier debugging.
"""
import datetime import datetime
import signal
import sys
import traceback
from functools import partial from functools import partial
from typing import Any
from apscheduler.events import EVENT_JOB_ERROR from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.blocking import BlockingScheduler
@ -9,28 +20,92 @@ from log.log_manager import log, logger
from task.manager_task import manager_task from task.manager_task import manager_task
def job_error_listener(event): def _format_exception(exc: BaseException) -> str:
if event.exception: """Return a nicely formatted exception with traceback."""
logger.error(f"Job {event.job_id} crashed: {str(event.exception)}") return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
# 可添加邮件/钉钉告警逻辑
if __name__ == '__main__': def job_error_listener(event: Any) -> None:
"""Listener for job errors that logs the exception and traceback.
Uses the APScheduler event object. This is defensive: if an exception
is present it logs its string representation and the full traceback to
the configured logger. Keep this lightweight to avoid raising inside
the listener.
"""
try:
if getattr(event, "exception", None):
exc = event.exception
logger.error(f"Job {getattr(event, 'job_id', '<unknown>')} crashed: {exc}")
logger.error(_format_exception(exc))
# 可添加邮件告警逻辑(如果需要且已配置)
except Exception:
# We must not let the listener raise — log and continue.
logger.exception("Exception in job_error_listener")
def _validate_interval(value: Any) -> int:
"""Return a valid positive integer interval in seconds.
If the provided value is invalid, returns a safe default (300).
"""
DEFAULT = 300
try:
iv = int(value)
if iv <= 0:
raise ValueError("interval must be > 0")
return iv
except Exception:
logger.warning(
"Invalid config.scheduler_interval=%r, falling back to %s seconds",
value,
DEFAULT,
)
return DEFAULT
def main() -> None:
"""Create and start the scheduler with graceful shutdown handling."""
scheduler = BlockingScheduler() scheduler = BlockingScheduler()
# 每隔config.scheduler_interval秒执行一次任务同时设定第一次执行在程序启动后10秒后执行
interval_seconds = _validate_interval(getattr(config, "scheduler_interval", None))
# 每隔 interval_seconds 秒执行一次任务同时设定第一次执行在程序启动10秒后执行
scheduler.add_job( scheduler.add_job(
partial(manager_task, scheduler), partial(manager_task, scheduler),
'interval', "interval",
seconds=config.scheduler_interval, seconds=interval_seconds,
jitter=30, # 添加随机抖动避免任务雪崩 jitter=30,
next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10) # 替代 date 触发器 next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10),
) )
# 添加任务错误监听器 # 添加任务错误监听器
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR) scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
# Graceful shutdown handlers
def _shutdown(signum, frame):
log(f"Received signal {signum}. Shutting down scheduler...")
try:
scheduler.shutdown(wait=False)
except Exception:
logger.exception("Error while shutting down scheduler")
# Ensure process exits after scheduler shutdown
# sys.exit(0)
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
try: try:
log("started successfully.") log("started successfully.")
scheduler.start() # 阻塞运行 scheduler.start() # 阻塞运行
except (KeyboardInterrupt, SystemExit): except (KeyboardInterrupt, SystemExit):
log("Shutting down ...") log("Shutting down ...")
if scheduler.state == 1:
try:
scheduler.shutdown(wait=False)
except Exception:
logger.exception("Error while shutting down scheduler")
if __name__ == "__main__":
main()

View File

@ -17,7 +17,7 @@ class TScheduler(Base):
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据') task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用') active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者') executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序') description: Optional[str] = Column(String(32), nullable=True, comment='任务描述')
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间') last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间') next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间') create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')

View File

@ -12,13 +12,13 @@ from database.tscheduler.crud import get_tasks_by_executor
from log.log_manager import log from log.log_manager import log
""" """
这是一个特殊的任务,负责管理任务,命名为管理者任务。 这是一个特殊的任务,负责管理任务,命名为任务管理者任务。
工作流程: 工作流程:
1 检索数据库任务数据表 1 检索数据库任务数据表t_scheduler
2 检查是否已经在任务队列中,如果不在则添加 2 检查是否已经在任务队列中,如果不在则添加
任务执行时间间隔为600秒。 任务执行时间间隔为{config.scheduler_interval}秒。
""" """
@ -26,20 +26,40 @@ from log.log_manager import log
def log_task_execution(task_name: str, start_time: float, end_time: float = None): def log_task_execution(task_name: str, start_time: float, end_time: float = None):
"""辅助函数,记录任务的开始和结束日志""" """辅助函数,记录任务的开始和结束日志"""
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)) start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) # 如果没有提供 end_time只记录开始日志
if end_time is None: if end_time is None:
log(f"{task_name} start execute at {start_time_str}") log(f"{task_name} start execute at {start_time_str}")
else: return
elapsed_time = end_time - start_time
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") # 否则格式化结束时间并记录耗时
try:
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
except Exception:
end_time_str = str(end_time)
elapsed_time = end_time - start_time
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
def execute_task(task: callable): def execute_task(task: callable):
"""执行任务并记录日志""" """执行任务并记录日志"""
start_time = time.time() start_time = time.time()
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__ # 获取可读的任务名称
try:
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__
except Exception:
task_name = getattr(task, '__name__', repr(task))
log_task_execution(task_name, start_time) # 先记录开始时间 log_task_execution(task_name, start_time) # 先记录开始时间
task()
# 捕获任务执行中的异常,防止调度器崩溃
try:
task()
except Exception as e:
end_time = time.time()
log(f"{task_name} raised exception: {e}")
log_task_execution(task_name, start_time, end_time)
return
end_time = time.time() end_time = time.time()
log_task_execution(task_name, start_time, end_time) # 记录结束时间 log_task_execution(task_name, start_time, end_time) # 记录结束时间
@ -47,68 +67,89 @@ def execute_task(task: callable):
def load_tasks(scheduler: BlockingScheduler): def load_tasks(scheduler: BlockingScheduler):
with get_session() as db: with get_session() as db:
tasks = get_tasks_by_executor(db, config.scheduler_name) tasks = get_tasks_by_executor(db, config.scheduler_name)
for task in tasks: for db_task in tasks:
module_path = task.module_path module_path = db_task.module_path
function_name = task.function_name function_name = db_task.function_name
trigger = task.trigger trigger = db_task.trigger
interval_seconds = task.interval_seconds interval_seconds = db_task.interval_seconds
task_id = task.id task_id = db_task.id
# 动态导入模块和函数 # 动态导入模块和函数,失败则记录并跳过该任务
module = importlib.import_module(module_path) try:
task_function = partial(execute_task, partial(getattr(module, function_name), task)) module = importlib.import_module(module_path)
func = getattr(module, function_name)
except Exception as e:
log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}")
continue
# 检查任务是否已存在 # 将数据库任务对象作为参数传入任务函数
if not scheduler.get_job(str(task_id)): task_callable = partial(func, db_task)
task_function = partial(execute_task, task_callable)
try:
if trigger == "interval": if trigger == "interval":
scheduler.add_job( scheduler.add_job(
task_function, task_function,
"interval", "interval",
seconds=interval_seconds, seconds=interval_seconds or 0,
id=str(task_id), id=str(task_id),
replace_existing=True replace_existing=True
) )
log(f"Task {task.task_name} added with interval {interval_seconds} seconds") log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
elif trigger == "cron": elif trigger == "cron":
# 解析 cron 表达式的字段 fields = (db_task.cron_expression or "").split()
fields = task.cron_expression.split()
# 确保字段长度符合七字段格式
if len(fields) != 7: if len(fields) != 7:
raise ValueError("无效的 Quartz cron 表达式") log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}")
# 替换 Quartz 风格的 `?` 为 APScheduler 可接受的 `*` continue
# 将 Quartz 的 `?` 替换为 APScheduler 可接受的 `*`
if fields[5] == '?': if fields[5] == '?':
fields[5] = '*' # 替换 `day_of_week` 字段中的 `?` fields[5] = '*'
# 使用 cron 表达式的字段添加任务
scheduler.add_job( scheduler.add_job(
task_function, # 要执行的任务 task_function,
'cron', # 使用 cron 触发器 'cron',
second=fields[0], # 秒 second=fields[0],
minute=fields[1], # 分钟 minute=fields[1],
hour=fields[2], # 小时 hour=fields[2],
day=fields[3], # 日期 day=fields[3],
month=fields[4], # 月份 month=fields[4],
day_of_week=fields[5], # 星期 day_of_week=fields[5],
year=fields[6], # 年份 year=fields[6],
id=str(task_id), id=str(task_id),
replace_existing=True replace_existing=True
) )
log(f"Task {task.task_name} added with cron {task.cron_expression}") log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
elif trigger == "date": elif trigger == "date":
# 如果task.execution_date为None则设置为当前时间10秒后 run_date = db_task.execution_date
if task.execution_date is None : # 如果 execution_date 为 None 或不是 datetime尝试解析或回退为现在+10s
task.execution_date = datetime.now() + timedelta(seconds=10) if run_date is None or not isinstance(run_date, datetime):
try:
# 尝试解析 ISO 格式字符串(如果传入的是字符串)
if isinstance(run_date, str):
run_date = datetime.fromisoformat(run_date)
else:
run_date = datetime.now() + timedelta(seconds=10)
except Exception:
run_date = datetime.now() + timedelta(seconds=10)
scheduler.add_job( scheduler.add_job(
task_function, task_function,
"date", "date",
run_date=task.execution_date, run_date=run_date,
id=str(task_id), id=str(task_id),
replace_existing=True replace_existing=True
) )
log(f"Task {task.task_name} added with date {task.execution_date}") log(f"Task {db_task.task_name} added/updated with date {run_date}")
else: else:
log(f"Invalid trigger type: {trigger}") log(f"Invalid trigger type for task id={task_id}: {trigger}")
except Exception as e:
log(f"Failed to schedule task id={task_id}: {e}")
# 管理者任务 # 任务管理者任务
def manager_task(scheduler: BlockingScheduler): def manager_task(scheduler: BlockingScheduler):
load_tasks(scheduler) load_tasks(scheduler)