Compare commits
7 Commits
fd2a3171ad
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 39138982ee | |||
| fa2fb120d9 | |||
| d8e6b58014 | |||
| 5c3c429620 | |||
| 200550b4f8 | |||
| ca39d1f891 | |||
| 52d5bbc216 |
14
.env
Normal file
14
.env
Normal file
@ -0,0 +1,14 @@
|
||||
ENV=dev
|
||||
|
||||
DEBUG=true
|
||||
|
||||
# 日志配置
|
||||
LOG_LEVEL=DEBUG
|
||||
LOG_TYPE=console
|
||||
|
||||
# 数据库配置
|
||||
DB_HOST= 47.119.128.161 # 192.168.1.200
|
||||
DB_PORT=19732
|
||||
DB_USER=postgres
|
||||
DB_PASS=postgres
|
||||
DB_NAME=peter
|
||||
100
arlo.py
100
arlo.py
@ -1,36 +1,110 @@
|
||||
"""Entrypoint for the scheduler process.
|
||||
|
||||
This module starts the APScheduler scheduler and ensures a graceful
|
||||
shutdown on SIGINT/SIGTERM. It also improves job error logging to
|
||||
include exception tracebacks for easier debugging.
|
||||
"""
|
||||
|
||||
import datetime
|
||||
import signal
|
||||
import traceback
|
||||
from functools import partial
|
||||
from typing import Any
|
||||
|
||||
from apscheduler.events import EVENT_JOB_ERROR
|
||||
from apscheduler.schedulers.blocking import BlockingScheduler
|
||||
|
||||
from config import config
|
||||
from log.log_manager import log, logger
|
||||
from task.manager_task import manager_task
|
||||
from utils import logger
|
||||
|
||||
|
||||
def job_error_listener(event):
|
||||
if event.exception:
|
||||
logger.error(f"Job {event.job_id} crashed: {str(event.exception)}")
|
||||
# 可添加邮件/钉钉告警逻辑
|
||||
def _format_exception(exc: BaseException) -> str:
|
||||
"""Return a nicely formatted exception with traceback."""
|
||||
return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
def job_error_listener(event: Any) -> None:
|
||||
"""Listener for job errors that logs the exception and traceback.
|
||||
|
||||
Uses the APScheduler event object. This is defensive: if an exception
|
||||
is present it logs its string representation and the full traceback to
|
||||
the configured logger. Keep this lightweight to avoid raising inside
|
||||
the listener.
|
||||
"""
|
||||
try:
|
||||
if getattr(event, "exception", None):
|
||||
exc = event.exception
|
||||
logger.error(f"Job {getattr(event, 'job_id', '<unknown>')} crashed: {exc}")
|
||||
logger.error(_format_exception(exc))
|
||||
# 可添加邮件告警逻辑(如果需要且已配置)
|
||||
except Exception:
|
||||
# We must not let the listener raise — log and continue.
|
||||
logger.exception("Exception in job_error_listener")
|
||||
|
||||
|
||||
def _validate_interval(value: Any) -> int:
|
||||
"""Return a valid positive integer interval in seconds.
|
||||
|
||||
If the provided value is invalid, returns a safe default (300).
|
||||
"""
|
||||
DEFAULT = 300
|
||||
try:
|
||||
iv = int(value)
|
||||
if iv <= 0:
|
||||
raise ValueError("interval must be > 0")
|
||||
return iv
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Invalid config.scheduler_interval=%r, falling back to %s seconds",
|
||||
value,
|
||||
DEFAULT,
|
||||
)
|
||||
return DEFAULT
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Create and start the scheduler with graceful shutdown handling."""
|
||||
scheduler = BlockingScheduler()
|
||||
# 每隔config.scheduler_interval秒执行一次任务,同时设定第一次执行在程序启动后10秒后执行
|
||||
|
||||
interval_seconds = _validate_interval(getattr(config, "scheduler_interval", None))
|
||||
|
||||
# 每隔 interval_seconds 秒执行一次任务,同时设定第一次执行在程序启动10秒后执行
|
||||
scheduler.add_job(
|
||||
partial(manager_task, scheduler),
|
||||
'interval',
|
||||
seconds=config.scheduler_interval,
|
||||
jitter=30, # 添加随机抖动避免任务雪崩
|
||||
next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10) # 替代 date 触发器
|
||||
"interval",
|
||||
seconds=interval_seconds,
|
||||
jitter=30,
|
||||
next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10),
|
||||
)
|
||||
|
||||
# 添加任务错误监听器
|
||||
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
|
||||
|
||||
# Graceful shutdown handlers
|
||||
def _shutdown(signum, frame):
|
||||
logger.info(f"Received signal {signum}. Shutting down scheduler...")
|
||||
try:
|
||||
scheduler.shutdown(wait=False)
|
||||
except Exception:
|
||||
logger.exception("Error while shutting down scheduler")
|
||||
# Ensure process exits after scheduler shutdown
|
||||
# sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGINT, _shutdown)
|
||||
signal.signal(signal.SIGTERM, _shutdown)
|
||||
|
||||
try:
|
||||
log("started successfully.")
|
||||
logger.info("started successfully.")
|
||||
scheduler.start() # 阻塞运行
|
||||
except (KeyboardInterrupt, SystemExit):
|
||||
log("Shutting down ...")
|
||||
logger.info("Shutting down ...")
|
||||
if scheduler.state == 1:
|
||||
try:
|
||||
scheduler.shutdown(wait=False)
|
||||
except Exception:
|
||||
logger.exception("Error while shutting down scheduler")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
1
config/__init__.py
Normal file
1
config/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from config.settings import settings
|
||||
18
config/database.py
Normal file
18
config/database.py
Normal file
@ -0,0 +1,18 @@
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import sessionmaker, scoped_session
|
||||
from config.settings import settings
|
||||
|
||||
SQLALCHEMY_SYNC_URL = (
|
||||
f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASS}"
|
||||
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
|
||||
)
|
||||
|
||||
engine = create_engine(
|
||||
SQLALCHEMY_SYNC_URL,
|
||||
echo=False, # 开发可改 True
|
||||
future=True
|
||||
)
|
||||
|
||||
SessionLocal = scoped_session(
|
||||
sessionmaker(bind=engine, autoflush=False, autocommit=False)
|
||||
)
|
||||
22
config/env_loader.py
Normal file
22
config/env_loader.py
Normal file
@ -0,0 +1,22 @@
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
def load_env():
|
||||
"""
|
||||
自动根据 ENV 加载对应的 .env 文件
|
||||
"""
|
||||
base_file = ".env"
|
||||
prod_file = ".env.prod"
|
||||
test_file = ".env.test"
|
||||
|
||||
# 先加载基础 .env
|
||||
if os.path.exists(base_file):
|
||||
load_dotenv(base_file)
|
||||
|
||||
# 根据参数 ENV 再加载其他环境
|
||||
env = os.getenv("ENV", "dev")
|
||||
|
||||
if env == "prod" and os.path.exists(prod_file):
|
||||
load_dotenv(prod_file, override=True)
|
||||
elif env == "test" and os.path.exists(test_file):
|
||||
load_dotenv(test_file, override=True)
|
||||
31
config/settings.py
Normal file
31
config/settings.py
Normal file
@ -0,0 +1,31 @@
|
||||
from pydantic_settings import BaseSettings
|
||||
from pydantic import Field
|
||||
from config.env_loader import load_env
|
||||
|
||||
# 先加载 ENV & .env
|
||||
load_env()
|
||||
|
||||
class Settings(BaseSettings):
|
||||
# 环境
|
||||
ENV: str = Field("dev")
|
||||
DEBUG: bool = Field(True)
|
||||
|
||||
# 日志
|
||||
LOG_LEVEL: str = Field("LOG_LEVEL")
|
||||
LOG_FILE_PATH: str = Field("logs")
|
||||
LOG_TYPE: str = Field("console")
|
||||
|
||||
# 数据库
|
||||
DB_HOST: str
|
||||
DB_PORT: int
|
||||
DB_USER: str
|
||||
DB_PASS: str
|
||||
DB_NAME: str
|
||||
|
||||
class Config:
|
||||
env_file = ".env"
|
||||
env_file_encoding = "utf-8"
|
||||
|
||||
|
||||
# 全局唯一配置实例
|
||||
settings = Settings()
|
||||
@ -21,7 +21,14 @@ engine = create_engine(
|
||||
'keepalives_count': 5
|
||||
}
|
||||
)
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
def get_engine():
|
||||
return engine
|
||||
|
||||
# 不在模块导入时自动创建表;提供显式创建函数
|
||||
def create_tables():
|
||||
# 确保相关 model 模块已被导入并注册到 Base
|
||||
Base.metadata.create_all(engine)
|
||||
|
||||
@contextmanager
|
||||
def get_session():
|
||||
|
||||
32
database/tcontent/crud.py
Normal file
32
database/tcontent/crud.py
Normal file
@ -0,0 +1,32 @@
|
||||
from database.tcontent.model import TContent
|
||||
|
||||
def create_content(db, content: TContent):
|
||||
db.add(content)
|
||||
db.commit()
|
||||
db.refresh(content)
|
||||
return content
|
||||
|
||||
def get_content_by_id(db, content_id: int):
|
||||
return db.query(TContent).filter(TContent.id == content_id).first()
|
||||
|
||||
def update_content(db, content_id: int, updates: dict):
|
||||
content = db.query(TContent).filter(TContent.id == content_id).first()
|
||||
if content:
|
||||
for key, value in updates.items():
|
||||
setattr(content, key, value)
|
||||
db.commit()
|
||||
db.refresh(content)
|
||||
return content
|
||||
|
||||
def delete_content(db, content_id: int):
|
||||
content = db.query(TContent).filter(TContent.id == content_id).first()
|
||||
if content:
|
||||
db.delete(content)
|
||||
db.commit()
|
||||
return content
|
||||
|
||||
def drop_table(db):
|
||||
TContent.__table__.drop(db.get_bind(), checkfirst=True)
|
||||
|
||||
def create_table(db):
|
||||
TContent.__table__.create(db.get_bind(), checkfirst=True)
|
||||
15
database/tcontent/debug.py
Normal file
15
database/tcontent/debug.py
Normal file
@ -0,0 +1,15 @@
|
||||
import database.tcontent.model
|
||||
from database.database import get_session, create_tables
|
||||
from database.tcontent.crud import drop_table, get_content_by_id
|
||||
|
||||
|
||||
|
||||
# create_tables()
|
||||
|
||||
# with get_session() as db:
|
||||
# task = get_content_by_id(db, 1)
|
||||
# print(task)
|
||||
# print(task.id)
|
||||
|
||||
with get_session() as db:
|
||||
drop_table(db)
|
||||
12
database/tcontent/model.py
Normal file
12
database/tcontent/model.py
Normal file
@ -0,0 +1,12 @@
|
||||
from sqlalchemy import Column, Integer, String, Text, DateTime, func
|
||||
from database.database import Base
|
||||
|
||||
class TContent(Base):
|
||||
__tablename__ = 't_content'
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True, comment='自动递增的唯一内容ID')
|
||||
project = Column(String(64), nullable=False, comment='项目名称', index=True)
|
||||
subject = Column(String(256), nullable=False, comment='主题')
|
||||
content = Column(Text, nullable=True, comment='内容')
|
||||
create_time = Column(DateTime, server_default=func.now(), nullable=False, comment='创建时间')
|
||||
update_time = Column(DateTime, server_default=func.now(), onupdate=func.now(), nullable=False, comment='更新时间')
|
||||
@ -17,7 +17,7 @@ class TScheduler(Base):
|
||||
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
|
||||
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
|
||||
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
|
||||
handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
|
||||
description: Optional[str] = Column(String(32), nullable=True, comment='任务描述')
|
||||
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
|
||||
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
|
||||
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')
|
||||
|
||||
@ -5,3 +5,5 @@ services:
|
||||
image: arlo:latest
|
||||
container_name: arlo
|
||||
restart: always
|
||||
environment:
|
||||
- TZ=Asia/Shanghai # 设置时区环境变量
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import logging.config
|
||||
import sys
|
||||
|
||||
import config.config
|
||||
from config import config
|
||||
|
||||
"""
|
||||
Usage:
|
||||
@ -66,5 +66,11 @@ logger = logging.getLogger('root')
|
||||
logger.info(log_config_message)
|
||||
|
||||
|
||||
def log(message: str):
|
||||
logger.info(f'{config.config.scheduler_name} {message}')
|
||||
def log(message: str) -> None:
|
||||
"""Helper wrapper to log a message prefixed with the scheduler name.
|
||||
|
||||
Kept small and typed to be a safe, low-risk refactor: it unifies how
|
||||
`config` is imported across the codebase (other modules use
|
||||
`from config import config`).
|
||||
"""
|
||||
logger.info(f'{config.scheduler_name} {message}')
|
||||
|
||||
@ -15,7 +15,7 @@ class MailManager:
|
||||
|
||||
def send_mail(self, subject: str, content: str, receiver_email:str = "changsongd@126.com"):
|
||||
message = MIMEMultipart()
|
||||
message['From'] = self.sender_email
|
||||
message['From'] = f'Arlo <{self.sender_email}>'
|
||||
message['To'] = receiver_email
|
||||
message['Subject'] = subject
|
||||
message.attach(MIMEText(content, 'plain'))
|
||||
|
||||
2
models/__init__.py
Normal file
2
models/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from models.source_content import SourceContent
|
||||
from models.article import Article
|
||||
50
models/article.py
Normal file
50
models/article.py
Normal file
@ -0,0 +1,50 @@
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from sqlalchemy import String, Text, Integer, DateTime, func
|
||||
from models.base import Base
|
||||
|
||||
|
||||
class Article(Base):
|
||||
__tablename__ = "t_article"
|
||||
|
||||
id: Mapped[int] = mapped_column(
|
||||
Integer,
|
||||
primary_key=True,
|
||||
autoincrement=True,
|
||||
comment="自动递增的唯一内容ID"
|
||||
)
|
||||
|
||||
title: Mapped[str] = mapped_column(
|
||||
String(256),
|
||||
nullable=False,
|
||||
index=True,
|
||||
comment="标题"
|
||||
)
|
||||
|
||||
keywords: Mapped[str | None] = mapped_column(
|
||||
Text,
|
||||
nullable=True,
|
||||
comment="关键词"
|
||||
)
|
||||
|
||||
content: Mapped[str | None] = mapped_column(
|
||||
Text,
|
||||
nullable=True,
|
||||
comment="内容"
|
||||
)
|
||||
|
||||
create_time: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
nullable=False,
|
||||
comment="创建时间"
|
||||
)
|
||||
|
||||
used: Mapped[bool] = mapped_column(
|
||||
default=False,
|
||||
nullable=False,
|
||||
comment="是否已被使用"
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<Article id={self.id} title={self.title!r} used={self.used!r}>"
|
||||
4
models/base.py
Normal file
4
models/base.py
Normal file
@ -0,0 +1,4 @@
|
||||
from sqlalchemy.orm import DeclarativeBase
|
||||
|
||||
class Base(DeclarativeBase):
|
||||
pass
|
||||
57
models/source_content.py
Normal file
57
models/source_content.py
Normal file
@ -0,0 +1,57 @@
|
||||
from datetime import datetime
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
from sqlalchemy import String, Text, Integer, DateTime, Index, func
|
||||
from models.base import Base
|
||||
|
||||
|
||||
class SourceContent(Base):
|
||||
__tablename__ = "t_source_content"
|
||||
|
||||
id: Mapped[int] = mapped_column(
|
||||
Integer,
|
||||
primary_key=True,
|
||||
autoincrement=True,
|
||||
comment="自动递增的唯一内容ID"
|
||||
)
|
||||
|
||||
link: Mapped[str] = mapped_column(
|
||||
String(2048),
|
||||
nullable=False,
|
||||
index=True,
|
||||
comment="链接"
|
||||
)
|
||||
|
||||
platform: Mapped[str] = mapped_column(
|
||||
String(32),
|
||||
nullable=False,
|
||||
comment="平台"
|
||||
)
|
||||
|
||||
content: Mapped[str | None] = mapped_column(
|
||||
Text,
|
||||
nullable=True,
|
||||
comment="内容"
|
||||
)
|
||||
|
||||
create_time: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
nullable=False,
|
||||
comment="创建时间"
|
||||
)
|
||||
|
||||
update_time: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True),
|
||||
server_default=func.now(),
|
||||
onupdate=func.now(),
|
||||
nullable=False,
|
||||
comment="更新时间"
|
||||
)
|
||||
|
||||
# ——可选优化:添加 项目 + 主题 联合唯一索引——
|
||||
__table_args__ = (
|
||||
Index("link", "link", unique=True),
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f"<SourceContent id={self.id} link={self.link!r} platform={self.platform!r}>"
|
||||
BIN
requirements.txt
BIN
requirements.txt
Binary file not shown.
24
task/common_mail/common_mail_task.py
Normal file
24
task/common_mail/common_mail_task.py
Normal file
@ -0,0 +1,24 @@
|
||||
from database.tscheduler.model import TScheduler
|
||||
from mail.mail_manager import send_mail
|
||||
from config.database import SessionLocal
|
||||
from models import Article
|
||||
from utils import logger
|
||||
|
||||
|
||||
def common_mail_task(scheduler: TScheduler):
|
||||
with SessionLocal() as db:
|
||||
# 获取需要发送的内容列表
|
||||
articles = db.query(Article).filter(Article.used == False).all()
|
||||
# 发送邮件
|
||||
for article in articles:
|
||||
subject = article.title
|
||||
content = article.content
|
||||
send_mail(subject, content, receiver_email="changsongd@126.com")
|
||||
logger.info(f"send mail success with title {subject}, content {content[:20]}.")
|
||||
# 更新数据库
|
||||
for article in articles:
|
||||
article.used = True
|
||||
db.commit()
|
||||
|
||||
if __name__ == '__main__':
|
||||
common_mail_task(TScheduler())
|
||||
@ -12,13 +12,13 @@ from database.tscheduler.crud import get_tasks_by_executor
|
||||
from log.log_manager import log
|
||||
|
||||
"""
|
||||
这是一个特殊的任务,负责管理任务,命名为管理者任务。
|
||||
这是一个特殊的任务,负责管理任务,命名为任务管理者任务。
|
||||
|
||||
工作流程:
|
||||
1 检索数据库任务数据表
|
||||
1 检索数据库任务数据表t_scheduler
|
||||
2 检查是否已经在任务队列中,如果不在则添加
|
||||
|
||||
任务执行时间间隔为600秒。
|
||||
任务执行时间间隔为{config.scheduler_interval}秒。
|
||||
|
||||
"""
|
||||
|
||||
@ -26,89 +26,166 @@ from log.log_manager import log
|
||||
def log_task_execution(task_name: str, start_time: float, end_time: float = None):
|
||||
"""辅助函数,记录任务的开始和结束日志"""
|
||||
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
|
||||
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
|
||||
# 如果没有提供 end_time,只记录开始日志
|
||||
if end_time is None:
|
||||
log(f"{task_name} start execute at {start_time_str}")
|
||||
else:
|
||||
elapsed_time = end_time - start_time
|
||||
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
|
||||
return
|
||||
|
||||
# 否则格式化结束时间并记录耗时
|
||||
try:
|
||||
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
|
||||
except Exception:
|
||||
end_time_str = str(end_time)
|
||||
elapsed_time = end_time - start_time
|
||||
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
|
||||
|
||||
|
||||
def execute_task(task: callable):
|
||||
"""执行任务并记录日志"""
|
||||
start_time = time.time()
|
||||
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__
|
||||
# 获取可读的任务名称
|
||||
try:
|
||||
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__
|
||||
except Exception:
|
||||
task_name = getattr(task, '__name__', repr(task))
|
||||
|
||||
log_task_execution(task_name, start_time) # 先记录开始时间
|
||||
task()
|
||||
|
||||
# 捕获任务执行中的异常,防止调度器崩溃
|
||||
try:
|
||||
task()
|
||||
except Exception as e:
|
||||
end_time = time.time()
|
||||
log(f"{task_name} raised exception: {e}")
|
||||
log_task_execution(task_name, start_time, end_time)
|
||||
return
|
||||
|
||||
end_time = time.time()
|
||||
log_task_execution(task_name, start_time, end_time) # 记录结束时间
|
||||
|
||||
# 从数据库加载任务
|
||||
def load_tasks(scheduler: BlockingScheduler):
|
||||
def build_db_signature(db_task) -> str:
|
||||
t = (db_task.trigger or "").lower()
|
||||
if t == "interval":
|
||||
sec = int(db_task.interval_seconds or 0)
|
||||
return f"{db_task.task_name}#interval:seconds={sec}"
|
||||
if t == "cron":
|
||||
expr = (db_task.cron_expression or "").strip()
|
||||
parts = expr.split()
|
||||
# 规范化:确保 7 个字段,替换 Quartz 的 '?' 为 '*'
|
||||
if len(parts) == 7:
|
||||
if parts[5] == '?':
|
||||
parts[5] = '*'
|
||||
expr = " ".join(parts)
|
||||
return f"{db_task.task_name}#cron:{expr}"
|
||||
if t == "date":
|
||||
run_date = db_task.execution_date
|
||||
if isinstance(run_date, datetime):
|
||||
return f"{db_task.task_name}#date:{run_date.isoformat()}"
|
||||
try:
|
||||
return f"{db_task.task_name}#date:{str(run_date)}"
|
||||
except Exception:
|
||||
return f"{db_task.task_name}#date:invalid"
|
||||
return f"{db_task.task_name}#unknown:{t}"
|
||||
|
||||
with get_session() as db:
|
||||
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
||||
for task in tasks:
|
||||
module_path = task.module_path
|
||||
function_name = task.function_name
|
||||
trigger = task.trigger
|
||||
interval_seconds = task.interval_seconds
|
||||
task_id = task.id
|
||||
for db_task in tasks:
|
||||
module_path = db_task.module_path
|
||||
function_name = db_task.function_name
|
||||
trigger = db_task.trigger
|
||||
interval_seconds = db_task.interval_seconds
|
||||
task_id = db_task.id
|
||||
|
||||
# 动态导入模块和函数
|
||||
module = importlib.import_module(module_path)
|
||||
task_function = partial(execute_task, partial(getattr(module, function_name), task))
|
||||
# 动态导入模块和函数,失败则记录并跳过该任务
|
||||
try:
|
||||
module = importlib.import_module(module_path)
|
||||
func = getattr(module, function_name)
|
||||
except Exception as e:
|
||||
log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}")
|
||||
continue
|
||||
|
||||
# 检查任务是否已存在
|
||||
if not scheduler.get_job(str(task_id)):
|
||||
# 将数据库任务对象作为参数传入任务函数
|
||||
task_callable = partial(func, db_task)
|
||||
task_function = partial(execute_task, task_callable)
|
||||
|
||||
# 计算数据库中当前任务的签名
|
||||
signature = build_db_signature(db_task)
|
||||
|
||||
job = scheduler.get_job(str(task_id))
|
||||
# 如果任务已经存在并且签名相同,则跳过更新
|
||||
if job and getattr(job, "name", None) == signature:
|
||||
log(f"Task {db_task.task_name} (id={task_id}) already scheduled and unchanged, skipping.")
|
||||
continue
|
||||
|
||||
try:
|
||||
if trigger == "interval":
|
||||
scheduler.add_job(
|
||||
task_function,
|
||||
"interval",
|
||||
seconds=interval_seconds,
|
||||
seconds=interval_seconds or 0,
|
||||
id=str(task_id),
|
||||
replace_existing=True
|
||||
replace_existing=True,
|
||||
name=signature
|
||||
)
|
||||
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
|
||||
log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
|
||||
|
||||
elif trigger == "cron":
|
||||
# 解析 cron 表达式的字段
|
||||
fields = task.cron_expression.split()
|
||||
# 确保字段长度符合七字段格式
|
||||
fields = (db_task.cron_expression or "").split()
|
||||
if len(fields) != 7:
|
||||
raise ValueError("无效的 Quartz cron 表达式")
|
||||
# 替换 Quartz 风格的 `?` 为 APScheduler 可接受的 `*`
|
||||
log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}")
|
||||
continue
|
||||
# 将 Quartz 的 `?` 替换为 APScheduler 可接受的 `*`
|
||||
if fields[5] == '?':
|
||||
fields[5] = '*' # 替换 `day_of_week` 字段中的 `?`
|
||||
# 使用 cron 表达式的字段添加任务
|
||||
fields[5] = '*'
|
||||
|
||||
scheduler.add_job(
|
||||
task_function, # 要执行的任务
|
||||
'cron', # 使用 cron 触发器
|
||||
second=fields[0], # 秒
|
||||
minute=fields[1], # 分钟
|
||||
hour=fields[2], # 小时
|
||||
day=fields[3], # 日期
|
||||
month=fields[4], # 月份
|
||||
day_of_week=fields[5], # 星期
|
||||
year=fields[6], # 年份
|
||||
task_function,
|
||||
'cron',
|
||||
second=fields[0],
|
||||
minute=fields[1],
|
||||
hour=fields[2],
|
||||
day=fields[3],
|
||||
month=fields[4],
|
||||
day_of_week=fields[5],
|
||||
year=fields[6],
|
||||
id=str(task_id),
|
||||
replace_existing=True
|
||||
replace_existing=True,
|
||||
name=signature
|
||||
)
|
||||
log(f"Task {task.task_name} added with cron {task.cron_expression}")
|
||||
log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
|
||||
|
||||
elif trigger == "date":
|
||||
# 如果task.execution_date为None,则设置为当前时间10秒后
|
||||
if task.execution_date is None :
|
||||
task.execution_date = datetime.now() + timedelta(seconds=10)
|
||||
run_date = db_task.execution_date
|
||||
# 如果 execution_date 为 None 或不是 datetime,尝试解析或回退为现在+10s
|
||||
if run_date is None or not isinstance(run_date, datetime):
|
||||
try:
|
||||
# 尝试解析 ISO 格式字符串(如果传入的是字符串)
|
||||
if isinstance(run_date, str):
|
||||
run_date = datetime.fromisoformat(run_date)
|
||||
else:
|
||||
run_date = datetime.now() + timedelta(seconds=10)
|
||||
except Exception:
|
||||
run_date = datetime.now() + timedelta(seconds=10)
|
||||
|
||||
scheduler.add_job(
|
||||
task_function,
|
||||
"date",
|
||||
run_date=task.execution_date,
|
||||
run_date=run_date,
|
||||
id=str(task_id),
|
||||
replace_existing=True
|
||||
replace_existing=True,
|
||||
name=signature
|
||||
)
|
||||
log(f"Task {task.task_name} added with date {task.execution_date}")
|
||||
log(f"Task {db_task.task_name} added/updated with date {run_date}")
|
||||
|
||||
else:
|
||||
log(f"Invalid trigger type: {trigger}")
|
||||
log(f"Invalid trigger type for task id={task_id}: {trigger}")
|
||||
|
||||
except Exception as e:
|
||||
log(f"Failed to schedule task id={task_id}: {e}")
|
||||
|
||||
|
||||
# 管理者任务
|
||||
# 任务管理者的任务
|
||||
def manager_task(scheduler: BlockingScheduler):
|
||||
load_tasks(scheduler)
|
||||
|
||||
1
utils/__init__.py
Normal file
1
utils/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from utils.logger import logger
|
||||
37
utils/logger.py
Normal file
37
utils/logger.py
Normal file
@ -0,0 +1,37 @@
|
||||
import sys
|
||||
import os
|
||||
from loguru import logger
|
||||
from config.settings import settings
|
||||
|
||||
# 移除默认的 handler(否则重复输出)
|
||||
logger.remove()
|
||||
|
||||
if "console" in settings.LOG_TYPE:
|
||||
# ======== 控制台输出 ========
|
||||
logger.add(
|
||||
sys.stdout,
|
||||
level=settings.LOG_LEVEL,
|
||||
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> "
|
||||
"| <level>{level: <8}</level> "
|
||||
"| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
|
||||
"- <level>{message}</level>",
|
||||
)
|
||||
|
||||
if "file" in settings.LOG_TYPE:
|
||||
# 日志目录
|
||||
LOG_DIR = settings.LOG_FILE_PATH
|
||||
if not os.path.exists(LOG_DIR):
|
||||
os.makedirs(LOG_DIR)
|
||||
|
||||
# ======== 文件输出(按天切割)========
|
||||
logger.add(
|
||||
f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log",
|
||||
rotation="00:00", # 每天 0 点切割
|
||||
retention="7 days", # 保存 7 天
|
||||
encoding="utf-8",
|
||||
level=settings.LOG_LEVEL,
|
||||
enqueue=True, # 多线程安全
|
||||
compression="zip", # 自动压缩旧日志
|
||||
)
|
||||
|
||||
__all__ = ["logger"]
|
||||
Reference in New Issue
Block a user