import datetime
from functools import partial

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_ERROR

from config import config, settings
from task import manager_task
from utils import logger, MailSender


def job_error_listener(event):
    """Log a crashed job and send a notification email about it.

    Registered on the scheduler for ``EVENT_JOB_ERROR``. Failures while
    sending the email are logged and never re-raised, so a broken mail
    setup cannot mask the original job error.

    Args:
        event: The APScheduler job event; ``job_id``, ``exception`` and
            ``traceback`` are read from it.
    """
    if not event.exception:
        return

    logger.error(f"Job {event.job_id} crashed: {event.exception}")
    # The exception message alone rarely pinpoints the failure; also log the
    # formatted traceback carried by the event when there is one.
    if event.traceback:
        logger.error(f"Job {event.job_id} traceback:\n{event.traceback}")

    # Email / DingTalk alerting logic can be added here.
    try:
        mail_sender = MailSender()
        mail_sender.execute(
            to_addrs=settings.ERROR_NOTIFICATION_EMAIL,
            subject=f"Job {event.job_id} crashed",
            body=f"Job {event.job_id} crashed with error: {event.exception}",
        )
    except Exception as e:
        # Best effort: alerting must not raise out of the listener.
        logger.error(f"Failed to send error notification email: {e}")


def main():
    """Configure the blocking scheduler and run it until interrupted."""
    scheduler = BlockingScheduler()

    # Run the task every config.scheduler_interval seconds; the first run is
    # scheduled 10 seconds after program start (used instead of a separate
    # 'date' trigger).
    # NOTE(review): this is a naive local datetime; presumably the scheduler
    # interprets it in its own timezone - confirm if a timezone is configured.
    first_run_time = datetime.datetime.now() + datetime.timedelta(seconds=10)
    scheduler.add_job(
        partial(manager_task, scheduler),
        'interval',
        seconds=config.scheduler_interval,
        jitter=30,  # random jitter so that runs do not pile up at once
        next_run_time=first_run_time,
    )

    # Report job crashes through the error listener.
    scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

    try:
        logger.info("started successfully.")
        scheduler.start()  # blocks until the scheduler stops
    except (KeyboardInterrupt, SystemExit):
        logger.info("Shutting down ...")


if __name__ == '__main__':
    main()