From 52d5bbc2167bf1b067978c1eb68d6d76e1a29fe8 Mon Sep 17 00:00:00 2001 From: konjacpotato Date: Fri, 14 Nov 2025 21:45:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- arlo.py | 95 ++++++++++++++++++++++--- database/tscheduler/model.py | 2 +- task/manager_task.py | 133 +++++++++++++++++++++++------------ 3 files changed, 173 insertions(+), 57 deletions(-) diff --git a/arlo.py b/arlo.py index bd40ff4..ec96373 100644 --- a/arlo.py +++ b/arlo.py @@ -1,5 +1,16 @@ +"""Entrypoint for the scheduler process. + +This module starts the APScheduler scheduler and ensures a graceful +shutdown on SIGINT/SIGTERM. It also improves job error logging to +include exception tracebacks for easier debugging. +""" + import datetime +import signal +import sys +import traceback from functools import partial +from typing import Any from apscheduler.events import EVENT_JOB_ERROR from apscheduler.schedulers.blocking import BlockingScheduler @@ -9,28 +20,92 @@ from log.log_manager import log, logger from task.manager_task import manager_task -def job_error_listener(event): - if event.exception: - logger.error(f"Job {event.job_id} crashed: {str(event.exception)}") - # 可添加邮件/钉钉告警逻辑 +def _format_exception(exc: BaseException) -> str: + """Return a nicely formatted exception with traceback.""" + return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__)) -if __name__ == '__main__': +def job_error_listener(event: Any) -> None: + """Listener for job errors that logs the exception and traceback. + + Uses the APScheduler event object. This is defensive: if an exception + is present it logs its string representation and the full traceback to + the configured logger. Keep this lightweight to avoid raising inside + the listener. + """ + try: + if getattr(event, "exception", None): + exc = event.exception + logger.error(f"Job {getattr(event, 'job_id', '')} crashed: {exc}") + logger.error(_format_exception(exc)) + # 可添加邮件告警逻辑(如果需要且已配置) + except Exception: + # We must not let the listener raise — log and continue. + logger.exception("Exception in job_error_listener") + + +def _validate_interval(value: Any) -> int: + """Return a valid positive integer interval in seconds. + + If the provided value is invalid, returns a safe default (300). + """ + DEFAULT = 300 + try: + iv = int(value) + if iv <= 0: + raise ValueError("interval must be > 0") + return iv + except Exception: + logger.warning( + "Invalid config.scheduler_interval=%r, falling back to %s seconds", + value, + DEFAULT, + ) + return DEFAULT + + +def main() -> None: + """Create and start the scheduler with graceful shutdown handling.""" scheduler = BlockingScheduler() - # 每隔config.scheduler_interval秒执行一次任务,同时设定第一次执行在程序启动后10秒后执行 + + interval_seconds = _validate_interval(getattr(config, "scheduler_interval", None)) + + # 每隔 interval_seconds 秒执行一次任务,同时设定第一次执行在程序启动10秒后执行 scheduler.add_job( partial(manager_task, scheduler), - 'interval', - seconds=config.scheduler_interval, - jitter=30, # 添加随机抖动避免任务雪崩 - next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10) # 替代 date 触发器 + "interval", + seconds=interval_seconds, + jitter=30, + next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10), ) # 添加任务错误监听器 scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR) + # Graceful shutdown handlers + def _shutdown(signum, frame): + log(f"Received signal {signum}. Shutting down scheduler...") + try: + scheduler.shutdown(wait=False) + except Exception: + logger.exception("Error while shutting down scheduler") + # Ensure process exits after scheduler shutdown + # sys.exit(0) + + signal.signal(signal.SIGINT, _shutdown) + signal.signal(signal.SIGTERM, _shutdown) + try: log("started successfully.") scheduler.start() # 阻塞运行 except (KeyboardInterrupt, SystemExit): log("Shutting down ...") + if scheduler.state == 1: + try: + scheduler.shutdown(wait=False) + except Exception: + logger.exception("Error while shutting down scheduler") + + +if __name__ == "__main__": + main() diff --git a/database/tscheduler/model.py b/database/tscheduler/model.py index 7fb8b6c..7e930d1 100644 --- a/database/tscheduler/model.py +++ b/database/tscheduler/model.py @@ -17,7 +17,7 @@ class TScheduler(Base): task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据') active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用') executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者') - handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序') + description: Optional[str] = Column(String(32), nullable=True, comment='任务描述') last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间') next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间') create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间') diff --git a/task/manager_task.py b/task/manager_task.py index 00882db..537da7b 100644 --- a/task/manager_task.py +++ b/task/manager_task.py @@ -12,13 +12,13 @@ from database.tscheduler.crud import get_tasks_by_executor from log.log_manager import log """ -这是一个特殊的任务,负责管理任务,命名为管理者任务。 +这是一个特殊的任务,负责管理任务,命名为任务管理者任务。 工作流程: -1 检索数据库任务数据表 +1 检索数据库任务数据表t_scheduler 2 检查是否已经在任务队列中,如果不在则添加 -任务执行时间间隔为600秒。 +任务执行时间间隔为{config.scheduler_interval}秒。 """ @@ -26,20 +26,40 @@ from log.log_manager import log def log_task_execution(task_name: str, start_time: float, end_time: float = None): """辅助函数,记录任务的开始和结束日志""" start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)) - end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) + # 如果没有提供 end_time,只记录开始日志 if end_time is None: log(f"{task_name} start execute at {start_time_str}") - else: - elapsed_time = end_time - start_time - log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") + return + + # 否则格式化结束时间并记录耗时 + try: + end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) + except Exception: + end_time_str = str(end_time) + elapsed_time = end_time - start_time + log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") def execute_task(task: callable): """执行任务并记录日志""" start_time = time.time() - task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__ + # 获取可读的任务名称 + try: + task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__ + except Exception: + task_name = getattr(task, '__name__', repr(task)) + log_task_execution(task_name, start_time) # 先记录开始时间 - task() + + # 捕获任务执行中的异常,防止调度器崩溃 + try: + task() + except Exception as e: + end_time = time.time() + log(f"{task_name} raised exception: {e}") + log_task_execution(task_name, start_time, end_time) + return + end_time = time.time() log_task_execution(task_name, start_time, end_time) # 记录结束时间 @@ -47,68 +67,89 @@ def execute_task(task: callable): def load_tasks(scheduler: BlockingScheduler): with get_session() as db: tasks = get_tasks_by_executor(db, config.scheduler_name) - for task in tasks: - module_path = task.module_path - function_name = task.function_name - trigger = task.trigger - interval_seconds = task.interval_seconds - task_id = task.id + for db_task in tasks: + module_path = db_task.module_path + function_name = db_task.function_name + trigger = db_task.trigger + interval_seconds = db_task.interval_seconds + task_id = db_task.id - # 动态导入模块和函数 - module = importlib.import_module(module_path) - task_function = partial(execute_task, partial(getattr(module, function_name), task)) + # 动态导入模块和函数,失败则记录并跳过该任务 + try: + module = importlib.import_module(module_path) + func = getattr(module, function_name) + except Exception as e: + log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}") + continue - # 检查任务是否已存在 - if not scheduler.get_job(str(task_id)): + # 将数据库任务对象作为参数传入任务函数 + task_callable = partial(func, db_task) + task_function = partial(execute_task, task_callable) + + try: if trigger == "interval": scheduler.add_job( task_function, "interval", - seconds=interval_seconds, + seconds=interval_seconds or 0, id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with interval {interval_seconds} seconds") + log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds") + elif trigger == "cron": - # 解析 cron 表达式的字段 - fields = task.cron_expression.split() - # 确保字段长度符合七字段格式 + fields = (db_task.cron_expression or "").split() if len(fields) != 7: - raise ValueError("无效的 Quartz cron 表达式") - # 替换 Quartz 风格的 `?` 为 APScheduler 可接受的 `*` + log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}") + continue + # 将 Quartz 的 `?` 替换为 APScheduler 可接受的 `*` if fields[5] == '?': - fields[5] = '*' # 替换 `day_of_week` 字段中的 `?` - # 使用 cron 表达式的字段添加任务 + fields[5] = '*' + scheduler.add_job( - task_function, # 要执行的任务 - 'cron', # 使用 cron 触发器 - second=fields[0], # 秒 - minute=fields[1], # 分钟 - hour=fields[2], # 小时 - day=fields[3], # 日期 - month=fields[4], # 月份 - day_of_week=fields[5], # 星期 - year=fields[6], # 年份 + task_function, + 'cron', + second=fields[0], + minute=fields[1], + hour=fields[2], + day=fields[3], + month=fields[4], + day_of_week=fields[5], + year=fields[6], id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with cron {task.cron_expression}") + log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}") + elif trigger == "date": - # 如果task.execution_date为None,则设置为当前时间10秒后 - if task.execution_date is None : - task.execution_date = datetime.now() + timedelta(seconds=10) + run_date = db_task.execution_date + # 如果 execution_date 为 None 或不是 datetime,尝试解析或回退为现在+10s + if run_date is None or not isinstance(run_date, datetime): + try: + # 尝试解析 ISO 格式字符串(如果传入的是字符串) + if isinstance(run_date, str): + run_date = datetime.fromisoformat(run_date) + else: + run_date = datetime.now() + timedelta(seconds=10) + except Exception: + run_date = datetime.now() + timedelta(seconds=10) + scheduler.add_job( task_function, "date", - run_date=task.execution_date, + run_date=run_date, id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with date {task.execution_date}") + log(f"Task {db_task.task_name} added/updated with date {run_date}") + else: - log(f"Invalid trigger type: {trigger}") + log(f"Invalid trigger type for task id={task_id}: {trigger}") + + except Exception as e: + log(f"Failed to schedule task id={task_id}: {e}") -# 管理者任务 +# 任务管理者的任务 def manager_task(scheduler: BlockingScheduler): load_tasks(scheduler)