import datetime
from functools import partial

from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.blocking import BlockingScheduler

from config import config
from log.log_manager import log, logger
from task.manager_task import manager_task


def job_error_listener(event):
    """Log a job failure reported by the scheduler.

    Registered for ``EVENT_JOB_ERROR``. Logs the job id together with the
    exception (including its type) and, when the event carries one, the
    formatted traceback, so a crash can be diagnosed from the log alone.

    Args:
        event: The event object passed in by APScheduler; ``job_id`` and
            ``exception`` are read from it, and ``traceback`` if present.
    """
    exc = event.exception
    # Explicit None check: an exception type may define __len__/__bool__
    # and evaluate as falsy, which would silently drop the report.
    if exc is None:
        return

    # repr() keeps the exception type and stays informative even when the
    # exception was raised without a message (str() would be empty then).
    message = f"Job {event.job_id} crashed: {exc!r}"
    formatted_tb = getattr(event, "traceback", None)
    if formatted_tb:
        message = f"{message}\n{formatted_tb}"
    logger.error(message)
    # Email / DingTalk alerting could be hooked in here.


def main():
    """Configure the blocking scheduler and run it until interrupted."""
    scheduler = BlockingScheduler()

    # Run the task every config.scheduler_interval seconds, with the first
    # run scheduled 10 seconds after program start.
    first_run = datetime.datetime.now() + datetime.timedelta(seconds=10)
    scheduler.add_job(
        partial(manager_task, scheduler),
        'interval',
        seconds=config.scheduler_interval,
        jitter=30,  # random jitter so runs do not pile up at the same instant
        next_run_time=first_run,  # used instead of a separate 'date' trigger
    )

    # Register the job-error listener.
    scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

    try:
        log("started successfully.")
        scheduler.start()  # blocks until the scheduler is stopped
    except (KeyboardInterrupt, SystemExit):
        log("Shutting down ...")


if __name__ == '__main__':
    main()