Compare commits

...

7 Commits

Author SHA1 Message Date
39138982ee fix 邮件发送失败问题
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 11s
2026-02-15 17:10:17 +08:00
fa2fb120d9 fix mail send error
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 10s
2026-02-15 16:45:25 +08:00
d8e6b58014 发件人使用人名
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 11s
2026-02-15 16:13:43 +08:00
5c3c429620 task: add send common mail task
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 11s
2026-02-15 15:53:48 +08:00
200550b4f8 支持更新task,当task信息改变时 2025-11-18 20:23:04 +08:00
ca39d1f891 调整
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 30s
2025-11-14 21:58:36 +08:00
52d5bbc216 优化 2025-11-14 21:45:00 +08:00
23 changed files with 554 additions and 68 deletions

14
.env Normal file
View File

@ -0,0 +1,14 @@
ENV=dev
DEBUG=true
# 日志配置
LOG_LEVEL=DEBUG
LOG_TYPE=console
# 数据库配置
DB_HOST= 47.119.128.161 # 192.168.1.200
DB_PORT=19732
DB_USER=postgres
DB_PASS=postgres
DB_NAME=peter

100
arlo.py
View File

@ -1,36 +1,110 @@
"""Entrypoint for the scheduler process.
This module starts the APScheduler scheduler and ensures a graceful
shutdown on SIGINT/SIGTERM. It also improves job error logging to
include exception tracebacks for easier debugging.
"""
import datetime
import signal
import traceback
from functools import partial
from typing import Any
from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from log.log_manager import log, logger
from task.manager_task import manager_task
from utils import logger
def job_error_listener(event):
if event.exception:
logger.error(f"Job {event.job_id} crashed: {str(event.exception)}")
# 可添加邮件/钉钉告警逻辑
def _format_exception(exc: BaseException) -> str:
"""Return a nicely formatted exception with traceback."""
return "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
if __name__ == '__main__':
def job_error_listener(event: Any) -> None:
    """Log the failure carried by a scheduler job event.

    Nothing is logged when *event* has no truthy ``exception`` attribute.
    Otherwise two error records are written: a one-line summary with the
    job id and the formatted traceback. An ``Exception`` raised while
    doing so is caught here and reported through ``logger.exception``.
    """
    try:
        exc = getattr(event, "exception", None)
        if not exc:
            return
        job_id = getattr(event, "job_id", "<unknown>")
        logger.error(f"Job {job_id} crashed: {exc}")
        logger.error(_format_exception(exc))
        # E-mail alerting could be hooked in here (if needed and configured).
    except Exception:
        # The listener must not raise; log and carry on.
        logger.exception("Exception in job_error_listener")
def _validate_interval(value: Any) -> int:
"""Return a valid positive integer interval in seconds.
If the provided value is invalid, returns a safe default (300).
"""
DEFAULT = 300
try:
iv = int(value)
if iv <= 0:
raise ValueError("interval must be > 0")
return iv
except Exception:
logger.warning(
"Invalid config.scheduler_interval=%r, falling back to %s seconds",
value,
DEFAULT,
)
return DEFAULT
def main() -> None:
    """Create the scheduler, register the manager job and run until stopped.

    The manager task is scheduled on an interval taken from
    ``config.scheduler_interval`` (validated by ``_validate_interval``),
    with its first run 10 seconds after start-up. SIGINT and SIGTERM
    trigger a non-blocking scheduler shutdown.
    """
    scheduler = BlockingScheduler()

    interval_seconds = _validate_interval(getattr(config, "scheduler_interval", None))

    # Run the manager task every ``interval_seconds`` seconds; the explicit
    # ``next_run_time`` makes the first run happen 10 seconds after start-up.
    scheduler.add_job(
        partial(manager_task, scheduler),
        "interval",
        seconds=interval_seconds,
        jitter=30,  # random jitter so runs do not all line up
        next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=10),
    )

    # Log job failures together with their traceback.
    scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)

    def _shutdown(signum, frame):
        """Signal handler: ask the scheduler to stop without waiting for jobs."""
        logger.info(f"Received signal {signum}. Shutting down scheduler...")
        try:
            scheduler.shutdown(wait=False)
        except Exception:
            logger.exception("Error while shutting down scheduler")

    signal.signal(signal.SIGINT, _shutdown)
    signal.signal(signal.SIGTERM, _shutdown)

    try:
        logger.info("started successfully.")
        scheduler.start()  # blocks until the scheduler is shut down
    except (KeyboardInterrupt, SystemExit):
        logger.info("Shutting down ...")
        # ``running`` is True for any state other than stopped, so a paused
        # scheduler is shut down as well (the bare ``state == 1`` check
        # only matched the running state).
        if scheduler.running:
            try:
                scheduler.shutdown(wait=False)
            except Exception:
                logger.exception("Error while shutting down scheduler")


if __name__ == "__main__":
    main()

1
config/__init__.py Normal file
View File

@ -0,0 +1 @@
from config.settings import settings

18
config/database.py Normal file
View File

@ -0,0 +1,18 @@
from urllib.parse import quote_plus

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

from config.settings import settings

# The user name and password are percent-encoded before being placed in the
# URL: characters such as "@", ":" or "/" in a credential would otherwise be
# parsed as URL delimiters and corrupt the connection string.
SQLALCHEMY_SYNC_URL = (
    f"postgresql+psycopg://{quote_plus(settings.DB_USER)}:{quote_plus(settings.DB_PASS)}"
    f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)

engine = create_engine(
    SQLALCHEMY_SYNC_URL,
    echo=False,  # set to True during development to log SQL
    future=True,
)

# Session registry bound to the engine above.
SessionLocal = scoped_session(
    sessionmaker(bind=engine, autoflush=False, autocommit=False)
)

22
config/env_loader.py Normal file
View File

@ -0,0 +1,22 @@
import os
from dotenv import load_dotenv
def load_env():
    """Load ``.env`` and then the override file selected by ``ENV``.

    ``.env`` is loaded first when it exists. ``ENV`` (default ``"dev"``) is
    read afterwards: ``"prod"`` selects ``.env.prod`` and ``"test"`` selects
    ``.env.test``; the selected file, if present, is loaded with
    ``override=True``. All paths are relative to the working directory.
    """
    if os.path.exists(".env"):
        load_dotenv(".env")

    override_files = {"prod": ".env.prod", "test": ".env.test"}
    override_file = override_files.get(os.getenv("ENV", "dev"))
    if override_file is not None and os.path.exists(override_file):
        load_dotenv(override_file, override=True)

31
config/settings.py Normal file
View File

@ -0,0 +1,31 @@
from pydantic_settings import BaseSettings
from pydantic import Field
from config.env_loader import load_env
# 先加载 ENV & .env
load_env()
class Settings(BaseSettings):
    """Application settings populated from the environment and ``.env``."""

    # Environment
    ENV: str = Field("dev")
    DEBUG: bool = Field(True)

    # Logging
    # The default has to be a real level name: it is handed to the logger as
    # ``level=settings.LOG_LEVEL``, and the previous default (the literal
    # string "LOG_LEVEL") is not a level, so start-up failed whenever the
    # variable was not set.
    LOG_LEVEL: str = Field("INFO")
    LOG_FILE_PATH: str = Field("logs")
    LOG_TYPE: str = Field("console")

    # Database (required: no defaults)
    DB_HOST: str
    DB_PORT: int
    DB_USER: str
    DB_PASS: str
    DB_NAME: str

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"
# 全局唯一配置实例
settings = Settings()

View File

@ -21,7 +21,14 @@ engine = create_engine(
'keepalives_count': 5
}
)
Base.metadata.create_all(engine)
def get_engine():
    """Return the engine created at module level."""
    return engine
# 不在模块导入时自动创建表;提供显式创建函数
def create_tables():
    """Create the tables registered on ``Base.metadata`` using the module engine.

    Call this explicitly; make sure the relevant model modules have been
    imported first so that they are registered on ``Base``.
    """
    metadata = Base.metadata
    metadata.create_all(engine)
@contextmanager
def get_session():

32
database/tcontent/crud.py Normal file
View File

@ -0,0 +1,32 @@
from database.tcontent.model import TContent
def create_content(db, content: TContent):
    """Add *content* through ``db``, commit, refresh it and return it."""
    db.add(content)
    db.commit()
    # Refresh after the commit so the returned object is reloaded from the database.
    db.refresh(content)
    return content
def get_content_by_id(db, content_id: int):
    """Return the first ``TContent`` row whose id equals *content_id*."""
    matching = db.query(TContent).filter(TContent.id == content_id)
    return matching.first()
def update_content(db, content_id: int, updates: dict):
    """Apply *updates* (attribute name -> value) to the row with *content_id*.

    Returns the refreshed row, or the falsy lookup result unchanged when no
    row matched (in which case nothing is committed).
    """
    row = db.query(TContent).filter(TContent.id == content_id).first()
    if not row:
        return row
    for attribute, new_value in updates.items():
        setattr(row, attribute, new_value)
    db.commit()
    db.refresh(row)
    return row
def delete_content(db, content_id: int):
    """Delete the row with *content_id* and return the deleted object.

    Returns the falsy lookup result unchanged when no row matched (in which
    case nothing is committed).
    """
    row = db.query(TContent).filter(TContent.id == content_id).first()
    if not row:
        return row
    db.delete(row)
    db.commit()
    return row
def drop_table(db):
    """Drop the ``TContent`` table on the bind of ``db``, if it exists."""
    bind = db.get_bind()
    TContent.__table__.drop(bind, checkfirst=True)
def create_table(db):
    """Create the ``TContent`` table on the bind of ``db``, unless it exists."""
    bind = db.get_bind()
    TContent.__table__.create(bind, checkfirst=True)

View File

@ -0,0 +1,15 @@
"""Manual maintenance script: drops the ``TContent`` table.

The destructive statement lives behind a ``__main__`` guard so that merely
importing this module can no longer drop the table.
"""
# NOTE(review): presumably imported only so the model is registered on Base.
import database.tcontent.model
from database.database import get_session, create_tables
from database.tcontent.crud import drop_table, get_content_by_id


def main():
    """Drop the ``TContent`` table if it exists."""
    with get_session() as db:
        drop_table(db)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,12 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, func
from database.database import Base
class TContent(Base):
    """ORM mapping for the ``t_content`` table."""

    __tablename__ = 't_content'

    # Auto-incrementing primary key.
    id = Column(
        Integer,
        primary_key=True,
        autoincrement=True,
        comment='自动递增的唯一内容ID',
    )
    # Project name (indexed).
    project = Column(
        String(64),
        nullable=False,
        comment='项目名称',
        index=True,
    )
    # Subject line.
    subject = Column(
        String(256),
        nullable=False,
        comment='主题',
    )
    # Free-form body; may be NULL.
    content = Column(
        Text,
        nullable=True,
        comment='内容',
    )
    # Set by the database when the row is inserted.
    create_time = Column(
        DateTime,
        server_default=func.now(),
        nullable=False,
        comment='创建时间',
    )
    # Set on insert and refreshed on every update.
    update_time = Column(
        DateTime,
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
        comment='更新时间',
    )

View File

@ -17,7 +17,7 @@ class TScheduler(Base):
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
description: Optional[str] = Column(String(32), nullable=True, comment='任务描述')
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')

View File

@ -5,3 +5,5 @@ services:
image: arlo:latest
container_name: arlo
restart: always
environment:
- TZ=Asia/Shanghai # 设置时区环境变量

View File

@ -1,7 +1,7 @@
import logging.config
import sys
import config.config
from config import config
"""
Usage:
@ -66,5 +66,11 @@ logger = logging.getLogger('root')
logger.info(log_config_message)
def log(message: str):
logger.info(f'{config.config.scheduler_name} {message}')
def log(message: str) -> None:
    """Log *message* at INFO level, prefixed with ``config.scheduler_name``."""
    prefixed = f'{config.scheduler_name} {message}'
    logger.info(prefixed)

View File

@ -15,7 +15,7 @@ class MailManager:
def send_mail(self, subject: str, content: str, receiver_email:str = "changsongd@126.com"):
message = MIMEMultipart()
message['From'] = self.sender_email
message['From'] = f'Arlo <{self.sender_email}>'
message['To'] = receiver_email
message['Subject'] = subject
message.attach(MIMEText(content, 'plain'))

2
models/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from models.source_content import SourceContent
from models.article import Article

50
models/article.py Normal file
View File

@ -0,0 +1,50 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, func
from models.base import Base
class Article(Base):
    """ORM mapping for the ``t_article`` table."""

    __tablename__ = "t_article"

    # Auto-incrementing primary key.
    id: Mapped[int] = mapped_column(
        Integer, primary_key=True, autoincrement=True, comment="自动递增的唯一内容ID"
    )
    # Title (indexed, required).
    title: Mapped[str] = mapped_column(
        String(256), nullable=False, index=True, comment="标题"
    )
    # Keywords; may be NULL.
    keywords: Mapped[str | None] = mapped_column(Text, nullable=True, comment="关键词")
    # Body text; may be NULL.
    content: Mapped[str | None] = mapped_column(Text, nullable=True, comment="内容")
    # Timezone-aware creation timestamp, filled in by the database.
    create_time: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), nullable=False, comment="创建时间"
    )
    # Whether the article has already been used; defaults to False.
    used: Mapped[bool] = mapped_column(default=False, nullable=False, comment="是否已被使用")

    def __repr__(self):
        """Return a short debugging representation (id, title, used)."""
        return f"<Article id={self.id} title={self.title!r} used={self.used!r}>"

4
models/base.py Normal file
View File

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    """Declarative base class for the ORM models in this package."""

57
models/source_content.py Normal file
View File

@ -0,0 +1,57 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, Index, func
from models.base import Base
class SourceContent(Base):
    """ORM mapping for the ``t_source_content`` table."""

    __tablename__ = "t_source_content"

    # Auto-incrementing primary key.
    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="自动递增的唯一内容ID"
    )
    # Link (required). It is indexed by the unique index declared in
    # ``__table_args__``; ``index=True`` is deliberately not set here, because
    # that would create a second, redundant index on the same column.
    link: Mapped[str] = mapped_column(
        String(2048),
        nullable=False,
        comment="链接"
    )
    # Platform name (required).
    platform: Mapped[str] = mapped_column(
        String(32),
        nullable=False,
        comment="平台"
    )
    # Body text; may be NULL.
    content: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="内容"
    )
    # Timezone-aware creation timestamp, filled in by the database.
    create_time: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        comment="创建时间"
    )
    # Set on insert and refreshed on every update.
    update_time: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
        comment="更新时间"
    )

    # Unique index on ``link``: at most one row per link.
    __table_args__ = (
        Index("link", "link", unique=True),
    )

    def __repr__(self):
        """Return a short debugging representation (id, link, platform)."""
        return f"<SourceContent id={self.id} link={self.link!r} platform={self.platform!r}>"

Binary file not shown.

View File

@ -0,0 +1,24 @@
from database.tscheduler.model import TScheduler
from mail.mail_manager import send_mail
from config.database import SessionLocal
from models import Article
from utils import logger
def common_mail_task(scheduler: TScheduler):
    """Mail every article not yet marked as used, then mark it as used.

    Args:
        scheduler: The scheduler record passed in by the caller; it is not
            read by this task.

    An exception raised by ``send_mail`` propagates to the caller. Articles
    mailed before the failure stay marked as used because each one is
    committed right after its mail is sent.
    """
    with SessionLocal() as db:
        # Articles still waiting to be sent.
        articles = db.query(Article).filter(Article.used.is_(False)).all()

        for article in articles:
            subject = article.title
            # ``content`` is a nullable column; fall back to an empty body so
            # that neither the mail nor the log preview fails on None.
            content = article.content or ""
            send_mail(subject, content, receiver_email="changsongd@126.com")
            logger.info(f"send mail success with title {subject}, content {content[:20]}.")

            # Commit per article: if a later send fails, the articles already
            # delivered are not mailed again on the next run.
            article.used = True
            db.commit()
if __name__ == '__main__':
common_mail_task(TScheduler())

View File

@ -12,13 +12,13 @@ from database.tscheduler.crud import get_tasks_by_executor
from log.log_manager import log
"""
这是一个特殊的任务,负责管理任务,命名为管理者任务。
这是一个特殊的任务,负责管理任务,命名为任务管理者任务。
工作流程:
1 检索数据库任务数据表
1 检索数据库任务数据表t_scheduler
2 检查是否已经在任务队列中,如果不在则添加
任务执行时间间隔为600秒。
任务执行时间间隔为{config.scheduler_interval}秒。
"""
@ -26,89 +26,166 @@ from log.log_manager import log
def log_task_execution(task_name: str, start_time: float, end_time: float | None = None):
    """Log the start or the end of a task run.

    Args:
        task_name: Name used in the log line.
        start_time: Start timestamp in seconds, as returned by ``time.time()``.
        end_time: End timestamp in seconds. When omitted, only the start
            line is logged; otherwise the end line with the elapsed time.
    """
    if end_time is None:
        start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
        log(f"{task_name} start execute at {start_time_str}")
        return

    # Fall back to the raw value if the timestamp cannot be formatted.
    try:
        end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
    except Exception:
        end_time_str = str(end_time)

    elapsed_time = end_time - start_time
    log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
def execute_task(task: callable):
    """Run *task* once, logging its start, its end and any exception.

    Args:
        task: Zero-argument callable, possibly a ``functools.partial``.

    An ``Exception`` raised by the task is logged together with its
    traceback and is not re-raised, so a failing task does not propagate
    into the scheduler.
    """
    import traceback

    start_time = time.time()

    # Peel off (possibly nested) partial wrappers to find a readable name.
    target = task
    while isinstance(target, functools.partial):
        target = target.func
    task_name = getattr(target, "__name__", repr(task))

    log_task_execution(task_name, start_time)

    # The task is called exactly once, inside the guard.
    try:
        task()
    except Exception as e:
        log(f"{task_name} raised exception: {e}")
        # Keep the traceback: the exception is swallowed here, so this is
        # the only place it can be recorded.
        log(traceback.format_exc())

    log_task_execution(task_name, start_time, time.time())
# 从数据库加载任务
def _normalized_trigger(db_task) -> str:
    """Return the task's trigger type lower-cased ('' when unset)."""
    return (db_task.trigger or "").lower()


def _build_db_signature(db_task) -> str:
    """Return a string describing the task's name and trigger settings.

    The signature is stored as the job ``name`` so that a later run can tell
    whether the trigger settings of an already scheduled task changed.
    """
    trigger = _normalized_trigger(db_task)
    if trigger == "interval":
        seconds = int(db_task.interval_seconds or 0)
        return f"{db_task.task_name}#interval:seconds={seconds}"
    if trigger == "cron":
        expr = (db_task.cron_expression or "").strip()
        parts = expr.split()
        # Normalise 7-field expressions: a Quartz '?' in the sixth field becomes '*'.
        if len(parts) == 7:
            if parts[5] == '?':
                parts[5] = '*'
            expr = " ".join(parts)
        return f"{db_task.task_name}#cron:{expr}"
    if trigger == "date":
        run_date = db_task.execution_date
        if isinstance(run_date, datetime):
            return f"{db_task.task_name}#date:{run_date.isoformat()}"
        try:
            return f"{db_task.task_name}#date:{str(run_date)}"
        except Exception:
            return f"{db_task.task_name}#date:invalid"
    return f"{db_task.task_name}#unknown:{trigger}"


def _resolve_run_date(run_date):
    """Return a datetime for a 'date' trigger, defaulting to now + 10 seconds."""
    if isinstance(run_date, datetime):
        return run_date
    if isinstance(run_date, str):
        try:
            return datetime.fromisoformat(run_date)
        except ValueError:
            pass
    return datetime.now() + timedelta(seconds=10)


def _schedule_db_task(scheduler, db_task, task_function, signature) -> None:
    """Add or replace the scheduler job that belongs to one database task."""
    task_id = db_task.id
    trigger = _normalized_trigger(db_task)
    # Keyword arguments shared by every trigger type.
    job_kwargs = {"id": str(task_id), "replace_existing": True, "name": signature}

    if trigger == "interval":
        interval_seconds = int(db_task.interval_seconds or 0)
        if interval_seconds <= 0:
            # NOTE(review): APScheduler appears to coerce a zero-length
            # interval to one second; skip instead of running every second.
            log(f"Invalid interval for task id={task_id}: {db_task.interval_seconds}")
            return
        scheduler.add_job(task_function, "interval", seconds=interval_seconds, **job_kwargs)
        log(f"Task {db_task.task_name} added/updated with interval {interval_seconds} seconds")
    elif trigger == "cron":
        fields = (db_task.cron_expression or "").split()
        if len(fields) != 7:
            log(f"Invalid cron expression for task id={task_id}: {db_task.cron_expression}")
            return
        # Replace the Quartz '?' with '*', which the cron trigger accepts.
        if fields[5] == '?':
            fields[5] = '*'
        scheduler.add_job(
            task_function,
            'cron',
            second=fields[0],
            minute=fields[1],
            hour=fields[2],
            day=fields[3],
            month=fields[4],
            day_of_week=fields[5],
            year=fields[6],
            **job_kwargs,
        )
        log(f"Task {db_task.task_name} added/updated with cron {db_task.cron_expression}")
    elif trigger == "date":
        run_date = _resolve_run_date(db_task.execution_date)
        scheduler.add_job(task_function, "date", run_date=run_date, **job_kwargs)
        log(f"Task {db_task.task_name} added/updated with date {run_date}")
    else:
        log(f"Invalid trigger type for task id={task_id}: {db_task.trigger}")


def load_tasks(scheduler: BlockingScheduler):
    """Schedule the database tasks of this executor on *scheduler*.

    For every task returned by ``get_tasks_by_executor`` for
    ``config.scheduler_name`` the target function is imported and wrapped in
    ``execute_task``. A task whose job already exists with an unchanged
    signature is skipped; otherwise the job is added or replaced. Import
    and scheduling failures are logged and do not stop the remaining tasks.
    """
    with get_session() as db:
        tasks = get_tasks_by_executor(db, config.scheduler_name)

        for db_task in tasks:
            task_id = db_task.id
            module_path = db_task.module_path
            function_name = db_task.function_name

            # Import the target dynamically; on failure log and skip the task.
            try:
                module = importlib.import_module(module_path)
                func = getattr(module, function_name)
            except Exception as e:
                log(f"Failed to import {module_path}.{function_name} for task id={task_id}: {e}")
                continue

            # The database task object is passed to the task function.
            task_function = partial(execute_task, partial(func, db_task))

            try:
                # Computed inside the guard so that one malformed row cannot
                # abort the whole loop.
                signature = _build_db_signature(db_task)
                job = scheduler.get_job(str(task_id))
                if job and getattr(job, "name", None) == signature:
                    log(f"Task {db_task.task_name} (id={task_id}) already scheduled and unchanged, skipping.")
                    continue
                _schedule_db_task(scheduler, db_task, task_function, signature)
            except Exception as e:
                log(f"Failed to schedule task id={task_id}: {e}")
# 管理者任务
# 任务管理者任务
def manager_task(scheduler: BlockingScheduler):
    """Task-manager job: (re)load the database tasks into *scheduler*."""
    load_tasks(scheduler)

1
utils/__init__.py Normal file
View File

@ -0,0 +1 @@
from utils.logger import logger

37
utils/logger.py Normal file
View File

@ -0,0 +1,37 @@
import sys
import os
from loguru import logger
from config.settings import settings
# Remove the default handler, otherwise every record is emitted twice.
logger.remove()

if "console" in settings.LOG_TYPE:
    # ======== Console output ========
    logger.add(
        sys.stdout,
        level=settings.LOG_LEVEL,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> "
               "| <level>{level: <8}</level> "
               "| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
               "- <level>{message}</level>",
    )

if "file" in settings.LOG_TYPE:
    # Log directory; ``exist_ok=True`` avoids the check-then-create race
    # when the directory appears between the test and the creation.
    LOG_DIR = settings.LOG_FILE_PATH
    os.makedirs(LOG_DIR, exist_ok=True)

    # ======== File output (one file per day) ========
    logger.add(
        f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log",
        rotation="00:00",  # rotate at midnight
        retention="7 days",  # keep seven days of logs
        encoding="utf-8",
        level=settings.LOG_LEVEL,
        enqueue=True,  # safe for use from multiple threads
        compression="zip",  # compress rotated files
    )

__all__ = ["logger"]