Compare commits

...

16 Commits

Author SHA1 Message Date
491061acc7 add 数据库超时重试机制
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-27 10:29:29 +08:00
f93fda8efa add job 执行异常的 邮件报警逻辑
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-20 19:03:02 +08:00
ac6c881763 调整豆瓣小组帖子获取逻辑
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-20 18:51:35 +08:00
43a3fdcded add log type file
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 21:56:31 +08:00
ebcfd97893 fix: TypeError: unsupported operand type(s) for -: 'datetime.datetime' and 'NoneType'
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 21:51:35 +08:00
780d025e18 modify gitea action
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 20:10:20 +08:00
79d86988d1 modify gitea action
Some checks failed
Gitea Actions Demo / host-commands (push) Failing after 0s
2026-02-15 19:53:15 +08:00
4471bbfdc8 modify gitea action
Some checks failed
Gitea Actions Demo / host-commands (push) Failing after 0s
2026-02-15 19:51:31 +08:00
b453e8ccbc modify gitea action
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
2026-02-15 19:21:52 +08:00
0f97d71a6b change gitea action
Some checks failed
Gitea Actions Demo / host-commands (push) Has been cancelled
2026-02-15 19:16:42 +08:00
a7d5306acc db: add table article 2026-02-15 15:27:21 +08:00
ddacea9166 container add timezone
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 13s
2026-02-15 13:09:34 +08:00
b267a3276c requirements: add psycopg-binary
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 12s
2026-02-15 12:42:52 +08:00
51d1c403f5 seek: douban group
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 33s
2026-02-15 12:37:48 +08:00
72b117b57c add table source_content 2026-02-14 19:10:34 +08:00
33366f5339 根据数据库同步修改 2026-02-14 17:17:14 +08:00
38 changed files with 897 additions and 43 deletions

24
.env Normal file
View File

@ -0,0 +1,24 @@
ENV=dev
DEBUG=true
# 日志配置
LOG_LEVEL=DEBUG
LOG_TYPE=console,file
LOG_FILE_PATH=logs
# 数据库配置
DB_HOST= 47.119.128.161 # 192.168.1.200
DB_PORT=19732
DB_USER=postgres
DB_PASS=postgres
DB_NAME=peter
# 邮件发送配置
SMTP_HOST=smtp.qq.com
SMTP_PORT=587
SMTP_USER=1026090807@qq.com
SMTP_PASSWORD=hqorvminmnuubebf
SMTP_USE_TLS=true
SMTP_FROM=Peter
ERROR_NOTIFICATION_EMAIL=changsongd@126.com

View File

@ -3,28 +3,10 @@ run-name: ${{ gitea.actor }} is testing out Gitea Actions 🚀
on: [push]
jobs:
deploy:
runs-on: ubuntu-latest
container:
image: gitea/runner-images:ubuntu-latest
# 一个直接在宿主机上执行的 Job
host-commands:
runs-on: mac # 需要有一个标签为 mac 的 runner 运行在宿主机上
steps:
- name: clone project code
run: git clone ${{ gitea.server_url }}/${{ gitea.repository }} .
- name: List files
run: ls -la
- name: Stop running containers
- name: 在宿主机上执行命令
run: |
docker compose down || true
- name: Remove old image
run: |
IMAGE_NAME=$(basename "$PWD")
echo "Removing old image: $IMAGE_NAME"
docker images | grep "$IMAGE_NAME" && docker rmi -f $(docker images "$IMAGE_NAME" -q) || echo "No old image found."
- name: Build new image
run: |
docker build -t $(basename "$PWD"):latest .
- name: Start containers
run: |
docker compose up -d
- name: Show container status
run: docker ps
echo "This command runs directly on the host machine"

3
.gitignore vendored
View File

@ -1 +1,2 @@
__pycache__
__pycache__
logs

149
alembic.ini Normal file
View File

@ -0,0 +1,149 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# Or organize into date-based subdirectories (requires recursive_version_locations = true)
# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the tzdata library which can be installed by adding
# `alembic[tz]` to the pip requirements.
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
sqlalchemy.url = postgresql+psycopg://postgres:postgres@192.168.1.200:19732/peter
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
config/__init__.py Normal file
View File

@ -0,0 +1 @@
from config.settings import settings

18
config/database.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from config.settings import settings
SQLALCHEMY_SYNC_URL = (
f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASS}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)
engine = create_engine(
SQLALCHEMY_SYNC_URL,
echo=False, # 开发可改 True
future=True
)
SessionLocal = scoped_session(
sessionmaker(bind=engine, autoflush=False, autocommit=False)
)

22
config/env_loader.py Normal file
View File

@ -0,0 +1,22 @@
import os
from dotenv import load_dotenv
def load_env():
"""
自动根据 ENV 加载对应的 .env 文件
"""
base_file = ".env"
prod_file = ".env.prod"
test_file = ".env.test"
# 先加载基础 .env
if os.path.exists(base_file):
load_dotenv(base_file)
# 根据参数 ENV 再加载其他环境
env = os.getenv("ENV", "dev")
if env == "prod" and os.path.exists(prod_file):
load_dotenv(prod_file, override=True)
elif env == "test" and os.path.exists(test_file):
load_dotenv(test_file, override=True)

40
config/settings.py Normal file
View File

@ -0,0 +1,40 @@
from pydantic_settings import BaseSettings
from pydantic import Field
from config.env_loader import load_env
# 先加载 ENV & .env
load_env()
class Settings(BaseSettings):
# 环境
ENV: str = Field("dev")
DEBUG: bool = Field(True)
# 日志
LOG_LEVEL: str = Field("LOG_LEVEL")
LOG_FILE_PATH: str = Field("logs")
LOG_TYPE: str = Field("console")
# 数据库
DB_HOST: str
DB_PORT: int
DB_USER: str
DB_PASS: str
DB_NAME: str
# 邮件发送配置
SMTP_HOST: str = Field("SMTP_HOST")
SMTP_PORT: int = Field(587, env="SMTP_PORT")
SMTP_USER: str = Field("SMTP_USER")
SMTP_PASSWORD: str = Field("SMTP_PASSWORD")
SMTP_USE_TLS: bool = Field(True, env="SMTP_USE_TLS")
SMTP_FROM: str = Field("SMTP_FROM")
ERROR_NOTIFICATION_EMAIL: str = Field("ERROR_NOTIFICATION_EMAIL")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# 全局唯一配置实例
settings = Settings()

View File

@ -17,7 +17,7 @@ class TScheduler(Base):
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
description: Optional[str] = Column(String(256), nullable=True, comment='任务描述')
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')

View File

@ -5,3 +5,5 @@ services:
image: peter:latest
container_name: peter
restart: always
environment:
- TZ=Asia/Shanghai # 设置时区环境变量

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration.

82
migrations/env.py Normal file
View File

@ -0,0 +1,82 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from models.base import Base
import models.source_content # 导入模型以注册到 Base.metadata
import models.article # 导入模型以注册到 Base.metadata
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection, target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

28
migrations/script.py.mako Normal file
View File

@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,42 @@
"""add table article
Revision ID: 7de2da2e824b
Revises: dfad2ce9d3b7
Create Date: 2026-02-15 15:11:54.662014
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '7de2da2e824b'
down_revision: Union[str, Sequence[str], None] = 'dfad2ce9d3b7'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('t_article',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False, comment='自动递增的唯一内容ID'),
sa.Column('title', sa.String(length=256), nullable=False, comment='标题'),
sa.Column('keywords', sa.Text(), nullable=True, comment='关键词'),
sa.Column('content', sa.Text(), nullable=True, comment='内容'),
sa.Column('create_time', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False, comment='创建时间'),
sa.Column('used', sa.Boolean(), nullable=False, comment='是否已被使用'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_t_article_title'), 't_article', ['title'], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_t_article_title'), table_name='t_article')
op.drop_table('t_article')
# ### end Alembic commands ###

View File

@ -0,0 +1,32 @@
"""add table article
Revision ID: dfad2ce9d3b7
Revises: fc8b7693c66b
Create Date: 2026-02-15 15:09:51.788382
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'dfad2ce9d3b7'
down_revision: Union[str, Sequence[str], None] = 'fc8b7693c66b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
pass
# ### end Alembic commands ###

View File

@ -0,0 +1,44 @@
"""create table source_content
Revision ID: fc8b7693c66b
Revises:
Create Date: 2026-02-14 19:07:21.557137
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'fc8b7693c66b'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('t_source_content',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False, comment='自动递增的唯一内容ID'),
sa.Column('link', sa.String(length=2048), nullable=False, comment='链接'),
sa.Column('platform', sa.String(length=32), nullable=False, comment='平台'),
sa.Column('content', sa.Text(), nullable=True, comment='内容'),
sa.Column('create_time', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False, comment='创建时间'),
sa.Column('update_time', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False, comment='更新时间'),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_t_source_content_link'), 't_source_content', ['link'], unique=False)
op.create_index('link', 't_source_content', ['link'], unique=True)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index('link', table_name='t_source_content')
op.drop_index(op.f('ix_t_source_content_link'), table_name='t_source_content')
op.drop_table('t_source_content')
# ### end Alembic commands ###

2
models/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from models.source_content import SourceContent
from models.article import Article

50
models/article.py Normal file
View File

@ -0,0 +1,50 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, func
from models.base import Base
class Article(Base):
__tablename__ = "t_article"
id: Mapped[int] = mapped_column(
Integer,
primary_key=True,
autoincrement=True,
comment="自动递增的唯一内容ID"
)
title: Mapped[str] = mapped_column(
String(256),
nullable=False,
index=True,
comment="标题"
)
keywords: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="关键词"
)
content: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="内容"
)
create_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
comment="创建时间"
)
used: Mapped[bool] = mapped_column(
default=False,
nullable=False,
comment="是否已被使用"
)
def __repr__(self):
return f"<Article id={self.id} title={self.title!r} used={self.used!r}>"

4
models/base.py Normal file
View File

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass

57
models/source_content.py Normal file
View File

@ -0,0 +1,57 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, Index, func
from models.base import Base
class SourceContent(Base):
__tablename__ = "t_source_content"
id: Mapped[int] = mapped_column(
Integer,
primary_key=True,
autoincrement=True,
comment="自动递增的唯一内容ID"
)
link: Mapped[str] = mapped_column(
String(2048),
nullable=False,
index=True,
comment="链接"
)
platform: Mapped[str] = mapped_column(
String(32),
nullable=False,
comment="平台"
)
content: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="内容"
)
create_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
comment="创建时间"
)
update_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
comment="更新时间"
)
# ——可选优化:添加 项目 + 主题 联合唯一索引——
__table_args__ = (
Index("link", "link", unique=True),
)
def __repr__(self):
return f"<SourceContent id={self.id} link={self.link!r} platform={self.platform!r}>"

View File

@ -1,18 +1,25 @@
import datetime
from functools import partial
from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from log.log_manager import log, logger
from task.manager_task import manager_task
from apscheduler.events import EVENT_JOB_ERROR
from config import config, settings
from task import manager_task
from utils import logger, MailSender
def job_error_listener(event):
if event.exception:
logger.error(f"Job {event.job_id} crashed: {str(event.exception)}")
# 可添加邮件/钉钉告警逻辑
try:
mail_sender = MailSender()
mail_sender.execute(
to_addrs=settings.ERROR_NOTIFICATION_EMAIL,
subject=f"Job {event.job_id} crashed",
body=f"Job {event.job_id} crashed with error: {str(event.exception)}"
)
except Exception as e:
logger.error(f"Failed to send error notification email: {e}")
if __name__ == '__main__':
@ -31,7 +38,7 @@ if __name__ == '__main__':
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
try:
log(f"started successfully.")
logger.info(f"started successfully.")
scheduler.start() # 阻塞运行
except (KeyboardInterrupt, SystemExit):
log(f"Shutting down ...")
logger.info(f"Shutting down ...")

Binary file not shown.

View File

@ -0,0 +1,141 @@
import json
import time
from DrissionPage import Chromium, ChromiumOptions
from psycopg import OperationalError
from config.database import SessionLocal
from models.source_content import SourceContent
from utils import logger
class DoubanGroupSeek:
def __init__(self, group_id):
co = ChromiumOptions().set_local_port(9333)
self.browser = Chromium(addr_or_opts=co)
self.group_id = group_id
def fetch_with_retry(self, session, model, link, max_retries=3, base_delay=1):
"""
带重试的数据库查询函数
:param session: SQLAlchemy 会话
:param model: 模型类
:param link: 要查询的链接
:param max_retries: 最大重试次数
:param base_delay: 初始延迟时间(秒)
:return: 查询结果或 None
"""
for attempt in range(1, max_retries + 1):
try:
# 执行查询
result = session.query(model.id).filter(model.link == link).first()
return result # 成功则返回结果
except OperationalError as e:
# 打印错误信息(可选)
logger.error(f"数据库查询失败 (尝试 {attempt}/{max_retries}): {e}")
# 回滚当前会话,避免事务挂起影响后续操作
session.rollback()
if attempt == max_retries:
# 最后一次重试失败,抛出异常或记录错误后返回 None
logger.info(f"已达到最大重试次数,放弃查询链接: {link}")
raise # 或者返回 None根据业务决定
# 计算等待时间:指数退避
sleep_time = base_delay * (2 ** (attempt - 1))
time.sleep(sleep_time)
except Exception as e:
# 如果还想捕获其他数据库错误,可以统一处理
logger.error(f"数据库错误: {e}")
session.rollback()
raise # 非临时性错误,直接抛出
return None # 正常情况下不会执行到这里
def seek(self):
db = SessionLocal()
group_url = f'https://www.douban.com/group/{self.group_id}'
tab = self.browser.new_tab()
tab.get(group_url)
title = tab.title
ele_table = tab.ele('tag:table@class=olt')
ele_trs = ele_table.eles('tag:tr@!class=th')
topics = []
for ele_tr in ele_trs:
topic_title = ele_tr.ele('tag:a').text
topic_url = ele_tr.ele('tag:a').attr('href')
update_time = ele_tr.ele('.time').text
topics.append((topic_title, topic_url, update_time))
# 去掉两个置顶的帖子根据title包含“置顶”来判断因为置顶的帖子一般情况是组规则和公告
topics = [(title, url, update_time) for title, url, update_time in topics if "置顶" not in title]
# 根据更新时间过滤update_time格式为“10-18 12:34”只保留24小时内的帖子
time_str = time.strftime("%m-%d %H:%M", time.localtime(time.time() - 24 * 3600))
topics = [(title, url, update_time) for title, url, update_time in topics if update_time >= time_str]
# 打印要爬取的主题列表
logger.info(f"Found {len(topics)} potential new topics to crawl:")
results = []
for topic_title, topic_url, update_time in topics:
# 检索数据库根据topic_url查询是否已存在
# existing_content = db.query(SourceContent.id).filter(SourceContent.link == topic_url).first()
existing_content = self.fetch_with_retry(db, SourceContent, topic_url)
if existing_content:
# logger.info(f"Topic already exists in database, skipping: {topic_title}:{topic_url}")
continue
logger.info(f"fetch 标题:{topic_title} 链接:{topic_url} 更新时间:{update_time}\n")
tab.get(topic_url)
tab.wait(30) # 等待页面加载完成,时间可根据实际情况调整
try:
title = tab.title
ele_article = tab.ele('.article')
# 获取帖子内容、发布时间、IP地址位置、作者等信息
ele_topic_content = ele_article.ele('#topic-content')
ele_topic_doc = ele_topic_content.ele('.topic-doc')
content = ele_topic_doc.ele('.topic-content').text
post_time = ele_topic_doc.ele('.create-time').text
ip_location = ele_topic_doc.ele('.ip-location').text
author = ele_topic_doc.ele('.from').text
# 获取评论列表
comments = []
# 评论不一定存在,需先判断
try:
ele_comments = ele_article.ele('#comments')
ele_comments_list = ele_comments.eles('tag:li')
for ele_comment in ele_comments_list:
comment_content = ele_comment.ele('.reply-content').text
comment_time = ele_comment.ele('.pubtime').text
comment_author = ele_comment.ele('tag:h4').child().text
comments.append({
"comment_content": comment_content,
"comment_time": comment_time,
"comment_author": comment_author
})
except Exception as e:
logger.warning(f"No comments found for topic {topic_title}:{topic_url}: {str(e)}")
results.append((topic_url, json.dumps({
"title": title,
"content": content,
"post_time": post_time,
"ip_location": ip_location,
"author": author,
"comments": comments
}, ensure_ascii=False)))
except Exception as e:
logger.error(f"Error processing topic {topic_title}:{topic_url}: {str(e)}")
continue
# 存入数据库
for topic_url, data in results:
source_content = SourceContent(
link=topic_url,
platform='douban',
content=data
)
db.add(source_content)
db.commit()
tab.close()

2
task/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from task.manager_task import manager_task
from task.hot_topic.douban import spider_task as douban_hot_topic_task

15
task/hot_topic/douban.py Normal file
View File

@ -0,0 +1,15 @@
from task.manager_task import execute_task
from seek.douban_com.douban_group_seek import DoubanGroupSeek
from utils import logger
def spider_task():
logger.info(f"Douban hot topic spider task start execute......")
# 社畜买房共进会 小组 ID 为 677158
douban_group_seek = DoubanGroupSeek(group_id=677158)
douban_group_seek.seek()
logger.info(f"Douban hot topic spider task end execute......")
if __name__ == '__main__':
execute_task(spider_task)

View File

@ -6,7 +6,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from database.database import get_session
from database.tscheduler.crud import get_tasks_by_executor
from log.log_manager import log
from utils import logger
"""
这是一个特殊的任务,负责管理任务,命名为管理者任务。
@ -24,10 +24,10 @@ def log_task_execution(task_name: str, start_time: float, end_time: float = None
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
if end_time is None:
log(f"{task_name} start execute at {start_time_str}")
logger.info(f"{task_name} start execute at {start_time_str}")
else:
elapsed_time = end_time - start_time
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
logger.info(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
def execute_task(task: callable):
@ -66,7 +66,7 @@ def load_tasks(scheduler: BlockingScheduler):
replace_existing=True,
misfire_grace_time=interval_seconds
)
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
logger.info(f"Task {task.task_name} added with interval {interval_seconds} seconds")
elif trigger == "cron":
# 解析 cron 表达式的字段
fields = task.cron_expression.split()
@ -90,7 +90,7 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with cron {task.cron_expression}")
logger.info(f"Task {task.task_name} added with cron {task.cron_expression}")
elif trigger == "date":
scheduler.add_job(
task_function,
@ -99,13 +99,11 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with date {task.execution_date}")
logger.info(f"Task {task.task_name} added with date {task.execution_date}")
else:
log(f"Task Invalid trigger type: {trigger}")
logger.warning(f"Task Invalid trigger type: {trigger}")
else:
log(f"Task {task.task_name} already exists......")
run_time = job.next_run_time - job.trigger.start_date
log(f"Task {task.task_name} already exists, run time is {run_time}")
logger.info(f"Task {task.task_name} already exists......")
# 管理者任务
def manager_task(scheduler: BlockingScheduler):

2
utils/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from utils.logger import logger
from utils.mail_sender import MailSender

37
utils/logger.py Normal file
View File

@ -0,0 +1,37 @@
import sys
import os
from loguru import logger
from config.settings import settings
# 移除默认的 handler否则重复输出
logger.remove()
if "console" in settings.LOG_TYPE:
# ======== 控制台输出 ========
logger.add(
sys.stdout,
level=settings.LOG_LEVEL,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> "
"| <level>{level: <8}</level> "
"| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
"- <level>{message}</level>",
)
if "file" in settings.LOG_TYPE:
# 日志目录
LOG_DIR = settings.LOG_FILE_PATH
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
# ======== 文件输出(按天切割)========
logger.add(
f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log",
rotation="00:00", # 每天 0 点切割
retention="7 days", # 保存 7 天
encoding="utf-8",
level=settings.LOG_LEVEL,
enqueue=True, # 多线程安全
compression="zip", # 自动压缩旧日志
)
__all__ = ["logger"]

71
utils/mail_sender.py Normal file
View File

@ -0,0 +1,71 @@
import smtplib
from email.mime.text import MIMEText
from email.header import Header
from email.utils import formataddr
from utils import logger
from config import settings
class MailSender:
"""邮件发送工具从环境变量读取SMTP配置"""
def execute(self, to_addrs, subject, body, from_addr=None, body_type='plain'):
"""
发送邮件
:param to_addrs: 收件人,可以是单个邮箱字符串或邮箱列表
:param subject: 邮件主题
:param body: 邮件正文
:param from_addr: 发件人地址若为None则使用 SMTP_USER
:param body_type: 正文类型,'plain''html'
:return: True 表示发送成功
:raises ValueError: 配置缺失或发送失败时抛出异常
"""
# ----- 1. 获取SMTP配置 -----
smtp_host = settings.SMTP_HOST
smtp_port = settings.SMTP_PORT
username = settings.SMTP_USER
password = settings.SMTP_PASSWORD
use_tls = settings.SMTP_USE_TLS
from_name = settings.SMTP_FROM # 发件人显示名称,可选
# 必要配置校验
if not all([smtp_host, smtp_port, username, password]):
missing = []
if not smtp_host: missing.append(self.ENV_SMTP_HOST)
if not smtp_port: missing.append(self.ENV_SMTP_PORT)
if not username: missing.append(self.ENV_SMTP_USER)
if not password: missing.append(self.ENV_SMTP_PASSWORD)
raise ValueError(f"缺少必要的SMTP环境变量: {', '.join(missing)}")
# ----- 2. 构建邮件对象 -----
# 处理收件人格式
if isinstance(to_addrs, str):
to_addrs = [to_addrs]
to_str = ','.join(to_addrs)
# 确定发件人地址
if from_addr is None:
from_addr = username
# 创建邮件正文
msg = MIMEText(body, body_type, 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
# 发件人:可包含显示名称
if from_name:
msg['From'] = formataddr((Header(from_name, 'utf-8').encode(), from_addr))
else:
msg['From'] = from_addr
msg['To'] = to_str
# ----- 3. 发送邮件 -----
try:
logger.info(f"准备发送邮件,收件人: {to_str}, 主题: {subject}")
with smtplib.SMTP(smtp_host, int(smtp_port)) as server:
if use_tls:
server.starttls()
server.login(username, password)
server.send_message(msg)
logger.info(f"邮件发送成功,收件人: {to_str}")
return True
except Exception as e:
logger.error(f"邮件发送失败: {e}")
raise ValueError(f"邮件发送失败: {e}")