Compare commits
2 Commits
fd2a3171ad
...
ca39d1f891
| Author | SHA1 | Date | |
|---|---|---|---|
| ca39d1f891 | |||
| 52d5bbc216 |
95
arlo.py
95
arlo.py
@ -1,5 +1,16 @@
|
|||||||
|
"""Entrypoint for the scheduler process.
|
||||||
|
|
||||||
|
This module starts the APScheduler scheduler and ensures a graceful
|
||||||
|
shutdown on SIGINT/SIGTERM. It also improves job error logging to
|
||||||
|
include exception tracebacks for easier debugging.
|
||||||
|
"""
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import signal
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from apscheduler.events import EVENT_JOB_ERROR
|
from apscheduler.events import EVENT_JOB_ERROR
|
||||||
from apscheduler.schedulers.blocking import BlockingScheduler
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
||||||
@ -9,28 +20,92 @@ from log.log_manager import log, logger
|
|||||||
from task.manager_task import manager_task
|
from task.manager_task import manager_task
|
||||||
|
|
||||||
|
|
||||||
def job_error_listener(event):
|
def _format_exception(exc: BaseException) -> str:
|
||||||
if event.exception:
|
"""Return a nicely formatted exception with traceback."""
|
||||||
logger.error(f"Job {event.job_id} crashed: {str(event.exception)}")
|
return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
|
||||||
# 可添加邮件/钉钉告警逻辑
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
def job_error_listener(event: Any) -> None:
|
||||||
|
"""Listener for job errors that logs the exception and traceback.
|
||||||
|
|
||||||
|
Uses the APScheduler event object. This is defensive: if an exception
|
||||||
|
is present it logs its string representation and the full traceback to
|
||||||
|
the configured logger. Keep this lightweight to avoid raising inside
|
||||||
|
the listener.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
if getattr(event, "exception", None):
|
||||||
|
exc = event.exception
|
||||||
|
logger.error(f"Job {getattr(event, 'job_id', '<unknown>')} crashed: {exc}")
|
||||||
|
logger.error(_format_exception(exc))
|
||||||
|
# 可添加邮件告警逻辑(如果需要且已配置)
|
||||||
|
except Exception:
|
||||||
|
# We must not let the listener raise — log and continue.
|
||||||
|
logger.exception("Exception in job_error_listener")
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_interval(value: Any) -> int:
|
||||||
|
"""Return a valid positive integer interval in seconds.
|
||||||
|
|
||||||
|
If the provided value is invalid, returns a safe default (300).
|
||||||
|
"""
|
||||||
|
DEFAULT = 300
|
||||||
|
try:
|
||||||
|
iv = int(value)
|
||||||
|
if iv <= 0:
|
||||||
|
raise ValueError("interval must be > 0")
|
||||||
|
return iv
|
||||||
|
except Exception:
|
||||||
|
logger.warning(
|
||||||
|
"Invalid config.scheduler_interval=%r, falling back to %s seconds",
|
||||||
|
value,
|
||||||
|
DEFAULT,
|
||||||
|
)
|
||||||
|
return DEFAULT
|
||||||
|
|
||||||
|
|
||||||
|
def main() -> None:
|
||||||
|
"""Create and start the scheduler with graceful shutdown handling."""
|
||||||
scheduler = BlockingScheduler()
|
scheduler = BlockingScheduler()
|
||||||
# 每隔config.scheduler_interval秒执行一次任务,同时设定第一次执行在程序启动后10秒后执行
|
|
||||||
|
interval_seconds = _validate_interval(getattr(config, "scheduler_interval", None))
|
||||||
|
|
||||||
|
# 每隔 interval_seconds 秒执行一次任务,同时设定第一次执行在程序启动10秒后执行
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
partial(manager_task, scheduler),
|
partial(manager_task, scheduler),
|
||||||
'interval',
|
"interval",
|
||||||
seconds=config.scheduler_interval,
|
seconds=interval_seconds,
|
||||||
jitter=30, # 添加随机抖动避免任务雪崩
|
jitter=30,
|
||||||
next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10) # 替代 date 触发器
|
next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10),
|
||||||
)
|
)
|
||||||
|
|
||||||
# 添加任务错误监听器
|
# 添加任务错误监听器
|
||||||
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
|
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
|
||||||
|
|
||||||
|
# Graceful shutdown handlers
|
||||||
|
def _shutdown(signum, frame):
|
||||||
|
log(f"Received signal {signum}. Shutting down scheduler...")
|
||||||
|
try:
|
||||||
|
scheduler.shutdown(wait=False)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error while shutting down scheduler")
|
||||||
|
# Ensure process exits after scheduler shutdown
|
||||||
|
# sys.exit(0)
|
||||||
|
|
||||||
|
signal.signal(signal.SIGINT, _shutdown)
|
||||||
|
signal.signal(signal.SIGTERM, _shutdown)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
log("started successfully.")
|
log("started successfully.")
|
||||||
scheduler.start() # 阻塞运行
|
scheduler.start() # 阻塞运行
|
||||||
except (KeyboardInterrupt, SystemExit):
|
except (KeyboardInterrupt, SystemExit):
|
||||||
log("Shutting down ...")
|
log("Shutting down ...")
|
||||||
|
if scheduler.state == 1:
|
||||||
|
try:
|
||||||
|
scheduler.shutdown(wait=False)
|
||||||
|
except Exception:
|
||||||
|
logger.exception("Error while shutting down scheduler")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|||||||
@ -17,7 +17,7 @@ class TScheduler(Base):
|
|||||||
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
|
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
|
||||||
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
|
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
|
||||||
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
|
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
|
||||||
handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
|
description: Optional[str] = Column(String(32), nullable=True, comment='任务描述')
|
||||||
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
|
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
|
||||||
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
|
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
|
||||||
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')
|
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
import logging.config
|
import logging.config
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
import config.config
|
from config import config
|
||||||
|
|
||||||
"""
|
"""
|
||||||
Usage:
|
Usage:
|
||||||
@ -66,5 +66,11 @@ logger = logging.getLogger('root')
|
|||||||
logger.info(log_config_message)
|
logger.info(log_config_message)
|
||||||
|
|
||||||
|
|
||||||
def log(message: str):
|
def log(message: str) -> None:
|
||||||
logger.info(f'{config.config.scheduler_name} {message}')
|
"""Helper wrapper to log a message prefixed with the scheduler name.
|
||||||
|
|
||||||
|
Kept small and typed to be a safe, low-risk refactor: it unifies how
|
||||||
|
`config` is imported across the codebase (other modules use
|
||||||
|
`from config import config`).
|
||||||
|
"""
|
||||||
|
logger.info(f'{config.scheduler_name} {message}')
|
||||||
|
|||||||
@ -12,13 +12,13 @@ from database.tscheduler.crud import get_tasks_by_executor
|
|||||||
from log.log_manager import log
|
from log.log_manager import log
|
||||||
|
|
||||||
"""
|
"""
|
||||||
这是一个特殊的任务,负责管理任务,命名为管理者任务。
|
这是一个特殊的任务,负责管理任务,命名为任务管理者任务。
|
||||||
|
|
||||||
工作流程:
|
工作流程:
|
||||||
1 检索数据库任务数据表
|
1 检索数据库任务数据表t_scheduler
|
||||||
2 检查是否已经在任务队列中,如果不在则添加
|
2 检查是否已经在任务队列中,如果不在则添加
|
||||||
|
|
||||||
任务执行时间间隔为600秒。
|
任务执行时间间隔为{config.scheduler_interval}秒。
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -26,20 +26,40 @@ from log.log_manager import log
|
|||||||
def log_task_execution(task_name: str, start_time: float, end_time: float = None):
|
def log_task_execution(task_name: str, start_time: float, end_time: float = None):
|
||||||
"""辅助函数,记录任务的开始和结束日志"""
|
"""辅助函数,记录任务的开始和结束日志"""
|
||||||
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
|
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
|
||||||
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
|
# 如果没有提供 end_time,只记录开始日志
|
||||||
if end_time is None:
|
if end_time is None:
|
||||||
log(f"{task_name} start execute at {start_time_str}")
|
log(f"{task_name} start execute at {start_time_str}")
|
||||||
else:
|
return
|
||||||
elapsed_time = end_time - start_time
|
|
||||||
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
|
# 否则格式化结束时间并记录耗时
|
||||||
|
try:
|
||||||
|
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
|
||||||
|
except Exception:
|
||||||
|
end_time_str = str(end_time)
|
||||||
|
elapsed_time = end_time - start_time
|
||||||
|
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
|
||||||
|
|
||||||
|
|
||||||
def execute_task(task: callable):
|
def execute_task(task: callable):
|
||||||
"""执行任务并记录日志"""
|
"""执行任务并记录日志"""
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__
|
# 获取可读的任务名称
|
||||||
|
try:
|
||||||
|
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__
|
||||||
|
except Exception:
|
||||||
|
task_name = getattr(task, '__name__', repr(task))
|
||||||
|
|
||||||
log_task_execution(task_name, start_time) # 先记录开始时间
|
log_task_execution(task_name, start_time) # 先记录开始时间
|
||||||
task()
|
|
||||||
|
# 捕获任务执行中的异常,防止调度器崩溃
|
||||||
|
try:
|
||||||
|
task()
|
||||||
|
except Exception as e:
|
||||||
|
end_time = time.time()
|
||||||
|
log(f"{task_name} raised exception: {e}")
|
||||||
|
log_task_execution(task_name, start_time, end_time)
|
||||||
|
return
|
||||||
|
|
||||||
end_time = time.time()
|
end_time = time.time()
|
||||||
log_task_execution(task_name, start_time, end_time) # 记录结束时间
|
log_task_execution(task_name, start_time, end_time) # 记录结束时间
|
||||||
|
|
||||||
@ -47,68 +67,89 @@ def execute_task(task: callable):
|
|||||||
def load_tasks(scheduler: BlockingScheduler):
|
def load_tasks(scheduler: BlockingScheduler):
|
||||||
with get_session() as db:
|
with get_session() as db:
|
||||||
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
||||||
for task in tasks:
|
for db_task in tasks:
|
||||||
module_path = task.module_path
|
module_path = db_task.module_path
|
||||||
function_name = task.function_name
|
function_name = db_task.function_name
|
||||||
trigger = task.trigger
|
trigger = db_task.trigger
|
||||||
interval_seconds = task.interval_seconds
|
interval_seconds = db_task.interval_seconds
|
||||||
task_id = task.id
|
task_id = db_task.id
|
||||||
|
|
||||||
# 动态导入模块和函数
|
# 动态导入模块和函数,失败则记录并跳过该任务
|
||||||
module = importlib.import_module(module_path)
|
try:
|
||||||
task_function = partial(execute_task, partial(getattr(module, function_name), task))
|
module = importlib.import_module(module_path)
|
||||||
|
func = getattr(module, function_name)
|
||||||
|
except Exception as e:
|
||||||
|
log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}")
|
||||||
|
continue
|
||||||
|
|
||||||
# 检查任务是否已存在
|
# 将数据库任务对象作为参数传入任务函数
|
||||||
if not scheduler.get_job(str(task_id)):
|
task_callable = partial(func, db_task)
|
||||||
|
task_function = partial(execute_task, task_callable)
|
||||||
|
|
||||||
|
try:
|
||||||
if trigger == "interval":
|
if trigger == "interval":
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
task_function,
|
task_function,
|
||||||
"interval",
|
"interval",
|
||||||
seconds=interval_seconds,
|
seconds=interval_seconds or 0,
|
||||||
id=str(task_id),
|
id=str(task_id),
|
||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
|
log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
|
||||||
|
|
||||||
elif trigger == "cron":
|
elif trigger == "cron":
|
||||||
# 解析 cron 表达式的字段
|
fields = (db_task.cron_expression or "").split()
|
||||||
fields = task.cron_expression.split()
|
|
||||||
# 确保字段长度符合七字段格式
|
|
||||||
if len(fields) != 7:
|
if len(fields) != 7:
|
||||||
raise ValueError("无效的 Quartz cron 表达式")
|
log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}")
|
||||||
# 替换 Quartz 风格的 `?` 为 APScheduler 可接受的 `*`
|
continue
|
||||||
|
# 将 Quartz 的 `?` 替换为 APScheduler 可接受的 `*`
|
||||||
if fields[5] == '?':
|
if fields[5] == '?':
|
||||||
fields[5] = '*' # 替换 `day_of_week` 字段中的 `?`
|
fields[5] = '*'
|
||||||
# 使用 cron 表达式的字段添加任务
|
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
task_function, # 要执行的任务
|
task_function,
|
||||||
'cron', # 使用 cron 触发器
|
'cron',
|
||||||
second=fields[0], # 秒
|
second=fields[0],
|
||||||
minute=fields[1], # 分钟
|
minute=fields[1],
|
||||||
hour=fields[2], # 小时
|
hour=fields[2],
|
||||||
day=fields[3], # 日期
|
day=fields[3],
|
||||||
month=fields[4], # 月份
|
month=fields[4],
|
||||||
day_of_week=fields[5], # 星期
|
day_of_week=fields[5],
|
||||||
year=fields[6], # 年份
|
year=fields[6],
|
||||||
id=str(task_id),
|
id=str(task_id),
|
||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
log(f"Task {task.task_name} added with cron {task.cron_expression}")
|
log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
|
||||||
|
|
||||||
elif trigger == "date":
|
elif trigger == "date":
|
||||||
# 如果task.execution_date为None,则设置为当前时间10秒后
|
run_date = db_task.execution_date
|
||||||
if task.execution_date is None :
|
# 如果 execution_date 为 None 或不是 datetime,尝试解析或回退为现在+10s
|
||||||
task.execution_date = datetime.now() + timedelta(seconds=10)
|
if run_date is None or not isinstance(run_date, datetime):
|
||||||
|
try:
|
||||||
|
# 尝试解析 ISO 格式字符串(如果传入的是字符串)
|
||||||
|
if isinstance(run_date, str):
|
||||||
|
run_date = datetime.fromisoformat(run_date)
|
||||||
|
else:
|
||||||
|
run_date = datetime.now() + timedelta(seconds=10)
|
||||||
|
except Exception:
|
||||||
|
run_date = datetime.now() + timedelta(seconds=10)
|
||||||
|
|
||||||
scheduler.add_job(
|
scheduler.add_job(
|
||||||
task_function,
|
task_function,
|
||||||
"date",
|
"date",
|
||||||
run_date=task.execution_date,
|
run_date=run_date,
|
||||||
id=str(task_id),
|
id=str(task_id),
|
||||||
replace_existing=True
|
replace_existing=True
|
||||||
)
|
)
|
||||||
log(f"Task {task.task_name} added with date {task.execution_date}")
|
log(f"Task {db_task.task_name} added/updated with date {run_date}")
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log(f"Invalid trigger type: {trigger}")
|
log(f"Invalid trigger type for task id={task_id}: {trigger}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
log(f"Failed to schedule task id={task_id}: {e}")
|
||||||
|
|
||||||
|
|
||||||
# 管理者任务
|
# 任务管理者的任务
|
||||||
def manager_task(scheduler: BlockingScheduler):
|
def manager_task(scheduler: BlockingScheduler):
|
||||||
load_tasks(scheduler)
|
load_tasks(scheduler)
|
||||||
|
|||||||
Reference in New Issue
Block a user