Compare commits

...

13 Commits

Author SHA1 Message Date
491061acc7 add 数据库超时重试机制
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-27 10:29:29 +08:00
f93fda8efa add job 执行异常的 邮件报警逻辑
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-20 19:03:02 +08:00
ac6c881763 调整豆瓣小组帖子获取逻辑
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-20 18:51:35 +08:00
43a3fdcded add log type file
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 21:56:31 +08:00
ebcfd97893 fix: TypeError: unsupported operand type(s) for -: 'datetime.datetime' and 'NoneType'
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 21:51:35 +08:00
780d025e18 modify gitea action
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 20:10:20 +08:00
79d86988d1 modify gitea action
Some checks failed
Gitea Actions Demo / host-commands (push) Failing after 0s
2026-02-15 19:53:15 +08:00
4471bbfdc8 modify gitea action
Some checks failed
Gitea Actions Demo / host-commands (push) Failing after 0s
2026-02-15 19:51:31 +08:00
b453e8ccbc modify gitea action
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 19:21:52 +08:00
0f97d71a6b change gitea action
Some checks failed
Gitea Actions Demo / host-commands (push) Has been cancelled
2026-02-15 19:16:42 +08:00
a7d5306acc db: add table article 2026-02-15 15:27:21 +08:00
ddacea9166 container add timezone
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 13s
2026-02-15 13:09:34 +08:00
b267a3276c requirements: add psycopg-binary
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 12s
2026-02-15 12:42:52 +08:00
25 changed files with 296 additions and 43 deletions

12
.env
View File

@ -4,7 +4,8 @@ DEBUG=true
# 日志配置 # 日志配置
LOG_LEVEL=DEBUG LOG_LEVEL=DEBUG
LOG_TYPE=console LOG_TYPE=console,file
LOG_FILE_PATH=logs
# 数据库配置 # 数据库配置
DB_HOST= 47.119.128.161 # 192.168.1.200 DB_HOST= 47.119.128.161 # 192.168.1.200
@ -12,3 +13,12 @@ DB_PORT=19732
DB_USER=postgres DB_USER=postgres
DB_PASS=postgres DB_PASS=postgres
DB_NAME=peter DB_NAME=peter
# 邮件发送配置
SMTP_HOST=smtp.qq.com
SMTP_PORT=587
SMTP_USER=1026090807@qq.com
SMTP_PASSWORD=hqorvminmnuubebf
SMTP_USE_TLS=true
SMTP_FROM=Peter
ERROR_NOTIFICATION_EMAIL=changsongd@126.com

View File

@ -3,28 +3,10 @@ run-name: ${{ gitea.actor }} is testing out Gitea Actions 🚀
on: [push] on: [push]
jobs: jobs:
deploy: # 一个直接在宿主机上执行的 Job
runs-on: ubuntu-latest host-commands:
container: runs-on: mac # 需要有一个标签为 mac 的 runner 运行在宿主机上
image: gitea/runner-images:ubuntu-latest
steps: steps:
- name: clone project code - name: 在宿主机上执行命令
run: git clone ${{ gitea.server_url }}/${{ gitea.repository }} .
- name: List files
run: ls -la
- name: Stop running containers
run: | run: |
docker compose down || true echo "This command runs directly on the host machine"
- name: Remove old image
run: |
IMAGE_NAME=$(basename "$PWD")
echo "Removing old image: $IMAGE_NAME"
docker images | grep "$IMAGE_NAME" && docker rmi -f $(docker images "$IMAGE_NAME" -q) || echo "No old image found."
- name: Build new image
run: |
docker build -t $(basename "$PWD"):latest .
- name: Start containers
run: |
docker compose up -d
- name: Show container status
run: docker ps

1
.gitignore vendored
View File

@ -1 +1,2 @@
__pycache__ __pycache__
logs

View File

@ -22,6 +22,15 @@ class Settings(BaseSettings):
DB_PASS: str DB_PASS: str
DB_NAME: str DB_NAME: str
# 邮件发送配置
SMTP_HOST: str = Field("SMTP_HOST")
SMTP_PORT: int = Field(587, env="SMTP_PORT")
SMTP_USER: str = Field("SMTP_USER")
SMTP_PASSWORD: str = Field("SMTP_PASSWORD")
SMTP_USE_TLS: bool = Field(True, env="SMTP_USE_TLS")
SMTP_FROM: str = Field("SMTP_FROM")
ERROR_NOTIFICATION_EMAIL: str = Field("ERROR_NOTIFICATION_EMAIL")
class Config: class Config:
env_file = ".env" env_file = ".env"
env_file_encoding = "utf-8" env_file_encoding = "utf-8"

View File

@ -5,3 +5,5 @@ services:
image: peter:latest image: peter:latest
container_name: peter container_name: peter
restart: always restart: always
environment:
- TZ=Asia/Shanghai # 设置时区环境变量

View File

@ -7,6 +7,7 @@ from alembic import context
from models.base import Base from models.base import Base
import models.source_content # 导入模型以注册到 Base.metadata import models.source_content # 导入模型以注册到 Base.metadata
import models.article # 导入模型以注册到 Base.metadata
# this is the Alembic Config object, which provides # this is the Alembic Config object, which provides
# access to the values within the .ini file in use. # access to the values within the .ini file in use.

View File

@ -0,0 +1,42 @@
"""add table article
Revision ID: 7de2da2e824b
Revises: dfad2ce9d3b7
Create Date: 2026-02-15 15:11:54.662014
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '7de2da2e824b'
down_revision: Union[str, Sequence[str], None] = 'dfad2ce9d3b7'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('t_article',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False, comment='自动递增的唯一内容ID'),
sa.Column('title', sa.String(length=256), nullable=False, comment='标题'),
sa.Column('keywords', sa.Text(), nullable=True, comment='关键词'),
sa.Column('content', sa.Text(), nullable=True, comment='内容'),
sa.Column('create_time', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False, comment='创建时间'),
sa.Column('used', sa.Boolean(), nullable=False, comment='是否已被使用'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_t_article_title'), 't_article', ['title'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_t_article_title'), table_name='t_article')
op.drop_table('t_article')
# ### end Alembic commands ###

View File

@ -0,0 +1,32 @@
"""add table article
Revision ID: dfad2ce9d3b7
Revises: fc8b7693c66b
Create Date: 2026-02-15 15:09:51.788382
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'dfad2ce9d3b7'
down_revision: Union[str, Sequence[str], None] = 'fc8b7693c66b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

2
models/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from models.source_content import SourceContent
from models.article import Article

50
models/article.py Normal file
View File

@ -0,0 +1,50 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, func
from models.base import Base
class Article(Base):
__tablename__ = "t_article"
id: Mapped[int] = mapped_column(
Integer,
primary_key=True,
autoincrement=True,
comment="自动递增的唯一内容ID"
)
title: Mapped[str] = mapped_column(
String(256),
nullable=False,
index=True,
comment="标题"
)
keywords: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="关键词"
)
content: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="内容"
)
create_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
comment="创建时间"
)
used: Mapped[bool] = mapped_column(
default=False,
nullable=False,
comment="是否已被使用"
)
def __repr__(self):
return f"<Article id={self.id} title={self.title!r} used={self.used!r}>"

View File

@ -2,15 +2,24 @@
from functools import partial from functools import partial
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_ERROR from apscheduler.events import EVENT_JOB_ERROR
from config import config from config import config, settings
from task import manager_task from task import manager_task
from utils import logger from utils import logger, MailSender
def job_error_listener(event): def job_error_listener(event):
if event.exception: if event.exception:
logger.error(f"Job {event.job_id} crashed: {str(event.exception)}") logger.error(f"Job {event.job_id} crashed: {str(event.exception)}")
# 可添加邮件/钉钉告警逻辑 # 可添加邮件/钉钉告警逻辑
try:
mail_sender = MailSender()
mail_sender.execute(
to_addrs=settings.ERROR_NOTIFICATION_EMAIL,
subject=f"Job {event.job_id} crashed",
body=f"Job {event.job_id} crashed with error: {str(event.exception)}"
)
except Exception as e:
logger.error(f"Failed to send error notification email: {e}")
if __name__ == '__main__': if __name__ == '__main__':

Binary file not shown.

View File

@ -1,6 +1,7 @@
import json import json
from time import sleep import time
from DrissionPage import Chromium, ChromiumOptions from DrissionPage import Chromium, ChromiumOptions
from psycopg import OperationalError
from config.database import SessionLocal from config.database import SessionLocal
from models.source_content import SourceContent from models.source_content import SourceContent
from utils import logger from utils import logger
@ -12,10 +13,42 @@ class DoubanGroupSeek:
self.browser = Chromium(addr_or_opts=co) self.browser = Chromium(addr_or_opts=co)
self.group_id = group_id self.group_id = group_id
def fetch_with_retry(self, session, model, link, max_retries=3, base_delay=1):
"""
带重试的数据库查询函数
:param session: SQLAlchemy 会话
:param model: 模型类
:param link: 要查询的链接
:param max_retries: 最大重试次数
:param base_delay: 初始延迟时间(秒)
:return: 查询结果或 None
"""
for attempt in range(1, max_retries + 1):
try:
# 执行查询
result = session.query(model.id).filter(model.link == link).first()
return result # 成功则返回结果
except OperationalError as e:
# 打印错误信息(可选)
logger.error(f"数据库查询失败 (尝试 {attempt}/{max_retries}): {e}")
# 回滚当前会话,避免事务挂起影响后续操作
session.rollback()
if attempt == max_retries:
# 最后一次重试失败,抛出异常或记录错误后返回 None
logger.info(f"已达到最大重试次数,放弃查询链接: {link}")
raise # 或者返回 None根据业务决定
# 计算等待时间:指数退避
sleep_time = base_delay * (2 ** (attempt - 1))
time.sleep(sleep_time)
except Exception as e:
# 如果还想捕获其他数据库错误,可以统一处理
logger.error(f"数据库错误: {e}")
session.rollback()
raise # 非临时性错误,直接抛出
return None # 正常情况下不会执行到这里
def seek(self): def seek(self):
db = SessionLocal() db = SessionLocal()
# 获取最近100条数据的URL列表用于过滤掉已存在的URL避免重复爬取和存储
recent_contents = db.query(SourceContent).order_by(SourceContent.id.desc()).limit(100).all()
group_url = f'https://www.douban.com/group/{self.group_id}' group_url = f'https://www.douban.com/group/{self.group_id}'
tab = self.browser.new_tab() tab = self.browser.new_tab()
@ -28,20 +61,30 @@ class DoubanGroupSeek:
for ele_tr in ele_trs: for ele_tr in ele_trs:
topic_title = ele_tr.ele('tag:a').text topic_title = ele_tr.ele('tag:a').text
topic_url = ele_tr.ele('tag:a').attr('href') topic_url = ele_tr.ele('tag:a').attr('href')
topics.append((topic_title, topic_url)) update_time = ele_tr.ele('.time').text
topics.append((topic_title, topic_url, update_time))
# 过滤掉已存在的URL # 去掉两个置顶的帖子根据title包含“置顶”来判断因为置顶的帖子一般情况是组规则和公告
existing_urls = set(content.link for content in recent_contents) topics = [(title, url, update_time) for title, url, update_time in topics if "置顶" not in title]
topics = [(title, url) for title, url in topics if url not in existing_urls]
# 根据更新时间过滤update_time格式为“10-18 12:34”只保留24小时内的帖子
time_str = time.strftime("%m-%d %H:%M", time.localtime(time.time() - 24 * 3600))
topics = [(title, url, update_time) for title, url, update_time in topics if update_time >= time_str]
# 打印要爬取的主题列表 # 打印要爬取的主题列表
logger.info(f"Found {len(topics)} new topics to crawl:") logger.info(f"Found {len(topics)} potential new topics to crawl:")
for topic_title, topic_url in topics:
logger.info(f"标题:{topic_title} 链接:{topic_url}\n")
results = [] results = []
for topic_title, topic_url in topics: for topic_title, topic_url, update_time in topics:
logger.info(f"fetch 标题:{topic_title} 链接:{topic_url}\n") # 检索数据库根据topic_url查询是否已存在
# existing_content = db.query(SourceContent.id).filter(SourceContent.link == topic_url).first()
existing_content = self.fetch_with_retry(db, SourceContent, topic_url)
if existing_content:
# logger.info(f"Topic already exists in database, skipping: {topic_title}:{topic_url}")
continue
logger.info(f"fetch 标题:{topic_title} 链接:{topic_url} 更新时间:{update_time}\n")
tab.get(topic_url) tab.get(topic_url)
tab.wait(30) # 等待页面加载完成,时间可根据实际情况调整 tab.wait(30) # 等待页面加载完成,时间可根据实际情况调整
try: try:

View File

@ -104,8 +104,6 @@ def load_tasks(scheduler: BlockingScheduler):
logger.warning(f"Task Invalid trigger type: {trigger}") logger.warning(f"Task Invalid trigger type: {trigger}")
else: else:
logger.info(f"Task {task.task_name} already exists......") logger.info(f"Task {task.task_name} already exists......")
run_time = job.next_run_time - job.trigger.start_date
logger.info(f"Task {task.task_name} already exists, run time is {run_time}")
# 管理者任务 # 管理者任务
def manager_task(scheduler: BlockingScheduler): def manager_task(scheduler: BlockingScheduler):

View File

@ -1 +1,2 @@
from utils.logger import logger from utils.logger import logger
from utils.mail_sender import MailSender

71
utils/mail_sender.py Normal file
View File

@ -0,0 +1,71 @@
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
from utils import logger
from config import settings
class MailSender:
"""邮件发送工具从环境变量读取SMTP配置"""
def execute(self, to_addrs, subject, body, from_addr=None, body_type='plain'):
"""
发送邮件
:param to_addrs: 收件人,可以是单个邮箱字符串或邮箱列表
:param subject: 邮件主题
:param body: 邮件正文
:param from_addr: 发件人地址若为None则使用 SMTP_USER
:param body_type: 正文类型,'plain''html'
:return: True 表示发送成功
:raises ValueError: 配置缺失或发送失败时抛出异常
"""
# ----- 1. 获取SMTP配置 -----
smtp_host = settings.SMTP_HOST
smtp_port = settings.SMTP_PORT
username = settings.SMTP_USER
password = settings.SMTP_PASSWORD
use_tls = settings.SMTP_USE_TLS
from_name = settings.SMTP_FROM # 发件人显示名称,可选
# 必要配置校验
if not all([smtp_host, smtp_port, username, password]):
missing = []
if not smtp_host: missing.append(self.ENV_SMTP_HOST)
if not smtp_port: missing.append(self.ENV_SMTP_PORT)
if not username: missing.append(self.ENV_SMTP_USER)
if not password: missing.append(self.ENV_SMTP_PASSWORD)
raise ValueError(f"缺少必要的SMTP环境变量: {', '.join(missing)}")
# ----- 2. 构建邮件对象 -----
# 处理收件人格式
if isinstance(to_addrs, str):
to_addrs = [to_addrs]
to_str = ','.join(to_addrs)
# 确定发件人地址
if from_addr is None:
from_addr = username
# 创建邮件正文
msg = MIMEText(body, body_type, 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
# 发件人:可包含显示名称
if from_name:
msg['From'] = formataddr((Header(from_name, 'utf-8').encode(), from_addr))
else:
msg['From'] = from_addr
msg['To'] = to_str
# ----- 3. 发送邮件 -----
try:
logger.info(f"准备发送邮件,收件人: {to_str}, 主题: {subject}")
with smtplib.SMTP(smtp_host, int(smtp_port)) as server:
if use_tls:
server.starttls()
server.login(username, password)
server.send_message(msg)
logger.info(f"邮件发送成功,收件人: {to_str}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
raise ValueError(f"邮件发送失败: {e}")