Compare commits

...

3 Commits

Author SHA1 Message Date
51d1c403f5 seek: douban group
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 33s
2026-02-15 12:37:48 +08:00
72b117b57c add table source_content 2026-02-14 19:10:34 +08:00
33366f5339 根据数据库同步修改 2026-02-14 17:17:14 +08:00
26 changed files with 619 additions and 18 deletions

14
.env Normal file
View File

@ -0,0 +1,14 @@
ENV=dev
DEBUG=true
# 日志配置
LOG_LEVEL=DEBUG
LOG_TYPE=console
# 数据库配置
DB_HOST= 47.119.128.161 # 192.168.1.200
DB_PORT=19732
DB_USER=postgres
DB_PASS=postgres
DB_NAME=peter

149
alembic.ini Normal file
View File

@ -0,0 +1,149 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts.
# this is typically a path given in POSIX (e.g. forward slashes)
# format, relative to the token %(here)s which refers to the location of this
# ini file
script_location = %(here)s/migrations
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# Or organize into date-based subdirectories (requires recursive_version_locations = true)
# file_template = %%(year)d/%%(month).2d/%%(day).2d_%%(hour).2d%%(minute).2d_%%(second).2d_%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory. for multiple paths, the path separator
# is defined by "path_separator" below.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the tzdata library which can be installed by adding
# `alembic[tz]` to the pip requirements.
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to <script_location>/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "path_separator"
# below.
# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions
# path_separator; This indicates what character is used to split lists of file
# paths, including version_locations and prepend_sys_path within configparser
# files such as alembic.ini.
# The default rendered in new alembic.ini files is "os", which uses os.pathsep
# to provide os-dependent path splitting.
#
# Note that in order to support legacy alembic.ini files, this default does NOT
# take place if path_separator is not present in alembic.ini. If this
# option is omitted entirely, fallback logic is as follows:
#
# 1. Parsing of the version_locations option falls back to using the legacy
# "version_path_separator" key, which if absent then falls back to the legacy
# behavior of splitting on spaces and/or commas.
# 2. Parsing of the prepend_sys_path option falls back to the legacy
# behavior of splitting on spaces, commas, or colons.
#
# Valid values for path_separator are:
#
# path_separator = :
# path_separator = ;
# path_separator = space
# path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# database URL. This is consumed by the user-maintained env.py script only.
# other means of configuring database URLs may be customized within the env.py
# file.
sqlalchemy.url = postgresql+psycopg://postgres:postgres@192.168.1.200:19732/peter
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module
# hooks = ruff
# ruff.type = module
# ruff.module = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Alternatively, use the exec runner to execute a binary found on your PATH
# hooks = ruff
# ruff.type = exec
# ruff.executable = ruff
# ruff.options = check --fix REVISION_SCRIPT_FILENAME
# Logging configuration. This is also consumed by the user-maintained
# env.py script only.
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
config/__init__.py Normal file
View File

@ -0,0 +1 @@
from config.settings import settings

18
config/database.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
from sqlalchemy.orm import sessionmaker, scoped_session

from config.settings import settings

# Assemble the connection URL from its components instead of interpolating
# them into a string: URL.create() escapes the user name and password, so
# credentials containing characters such as "@", ":" or "/" no longer break
# the URL.
_DATABASE_URL = URL.create(
    drivername="postgresql+psycopg",
    username=settings.DB_USER,
    password=settings.DB_PASS,
    host=settings.DB_HOST,
    port=settings.DB_PORT,
    database=settings.DB_NAME,
)

# String form of the URL (password included), kept under its original name
# for any code that imports it.
SQLALCHEMY_SYNC_URL = _DATABASE_URL.render_as_string(hide_password=False)

# Synchronous engine shared by the whole application.
engine = create_engine(
    _DATABASE_URL,
    echo=False,  # may be switched to True during development
    future=True,
)

# Session registry bound to the engine; autoflush and autocommit are off,
# so callers flush/commit explicitly.
SessionLocal = scoped_session(
    sessionmaker(bind=engine, autoflush=False, autocommit=False)
)

22
config/env_loader.py Normal file
View File

@ -0,0 +1,22 @@
import os
from dotenv import load_dotenv
def load_env():
    """
    Load the dotenv file(s) that match the ``ENV`` environment variable.

    The base ``.env`` file is loaded first when it exists. ``ENV`` is then
    read (default ``"dev"``); for ``"prod"`` or ``"test"`` the corresponding
    ``.env.prod`` / ``.env.test`` file is loaded on top with ``override=True``
    when it exists. Any other value loads nothing further. All paths are
    relative to the current working directory.
    """
    default_path = ".env"
    overlay_by_env = {
        "prod": ".env.prod",
        "test": ".env.test",
    }

    # Base file first: ENV is only read after it has been loaded.
    if os.path.exists(default_path):
        load_dotenv(default_path)

    # Then the environment-specific overlay, if one is defined and present.
    overlay_path = overlay_by_env.get(os.getenv("ENV", "dev"))
    if overlay_path is not None and os.path.exists(overlay_path):
        load_dotenv(overlay_path, override=True)

31
config/settings.py Normal file
View File

@ -0,0 +1,31 @@
from pydantic_settings import BaseSettings
from pydantic import Field
from config.env_loader import load_env
# Load ENV and the matching .env file(s) before the settings are instantiated.
load_env()


class Settings(BaseSettings):
    """Application settings read from environment variables and ``.env``.

    Fields with a ``Field(...)`` default are optional; the ``DB_*`` fields
    have no default and must be provided.
    """

    # Environment
    ENV: str = Field("dev")
    DEBUG: bool = Field(True)

    # Logging
    # The default used to be the literal string "LOG_LEVEL", which is not a
    # level name; it is passed as ``level=`` to ``logger.add`` in
    # utils/logger.py whenever the variable is unset.
    LOG_LEVEL: str = Field("INFO")
    LOG_FILE_PATH: str = Field("logs")
    LOG_TYPE: str = Field("console")

    # Database (required)
    DB_HOST: str
    DB_PORT: int
    DB_USER: str
    DB_PASS: str
    DB_NAME: str

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"


# Single shared settings instance for the whole application.
settings = Settings()

View File

@ -17,7 +17,7 @@ class TScheduler(Base):
task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
description: Optional[str] = Column(String(256), nullable=True, comment='任务描述')
last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')

1
migrations/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration.

81
migrations/env.py Normal file
View File

@ -0,0 +1,81 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
from models.base import Base
import models.source_content # 导入模型以注册到 Base.metadata
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# MetaData of the project's declarative Base, handed to Alembic as the
# target for 'autogenerate' support. Model modules must be imported above
# so that their tables are registered on it.
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    Only the ``sqlalchemy.url`` option from the Alembic config is used to
    configure the context; no Engine is created here. Calls to
    context.execute() emit the given string to the script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Builds an Engine from the ``sqlalchemy.``-prefixed options of the main
    config section (with ``NullPool``), opens a connection and runs the
    migrations on it inside a transaction.
    """
    ini_section = config.get_section(config.config_ini_section, {})
    engine = engine_from_config(
        ini_section,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with engine.connect() as conn:
        context.configure(connection=conn, target_metadata=target_metadata)

        with context.begin_transaction():
            context.run_migrations()
# Executed when Alembic loads this module: choose the offline or online
# runner depending on the mode reported by the migration context.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

28
migrations/script.py.mako Normal file
View File

@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,44 @@
"""create table source_content
Revision ID: fc8b7693c66b
Revises:
Create Date: 2026-02-14 19:07:21.557137
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'fc8b7693c66b'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema: create ``t_source_content`` and two indexes on ``link``."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('t_source_content',
    sa.Column('id', sa.Integer(), autoincrement=True, nullable=False, comment='自动递增的唯一内容ID'),
    sa.Column('link', sa.String(length=2048), nullable=False, comment='链接'),
    sa.Column('platform', sa.String(length=32), nullable=False, comment='平台'),
    sa.Column('content', sa.Text(), nullable=True, comment='内容'),
    sa.Column('create_time', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False, comment='创建时间'),
    sa.Column('update_time', sa.DateTime(timezone=True), server_default=sa.text('now()'), nullable=False, comment='更新时间'),
    sa.PrimaryKeyConstraint('id')
    )
    # NOTE(review): both indexes below cover the same single column ``link``
    # (one non-unique, one unique). The non-unique one looks redundant --
    # confirm, and drop it in a follow-up revision rather than editing this one.
    op.create_index(op.f('ix_t_source_content_link'), 't_source_content', ['link'], unique=False)
    op.create_index('link', 't_source_content', ['link'], unique=True)
    # ### end Alembic commands ###
def downgrade() -> None:
    """Downgrade schema: drop both ``link`` indexes, then ``t_source_content``."""
    # ### commands auto generated by Alembic - please adjust! ###
    # Reverse order of upgrade(): indexes first, table last.
    op.drop_index('link', table_name='t_source_content')
    op.drop_index(op.f('ix_t_source_content_link'), table_name='t_source_content')
    op.drop_table('t_source_content')
    # ### end Alembic commands ###

4
models/base.py Normal file
View File

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    """Shared declarative base; model classes subclass it so that their
    tables are collected on ``Base.metadata``."""

57
models/source_content.py Normal file
View File

@ -0,0 +1,57 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, Index, func
from models.base import Base
class SourceContent(Base):
    """ORM model for table ``t_source_content``: one stored piece of content
    identified by its link and the platform it was taken from."""

    __tablename__ = "t_source_content"

    # Auto-incrementing primary key.
    id: Mapped[int] = mapped_column(
        Integer,
        primary_key=True,
        autoincrement=True,
        comment="自动递增的唯一内容ID"
    )
    # Link of the content.
    # NOTE(review): ``index=True`` here creates the non-unique index
    # ``ix_t_source_content_link`` in addition to the unique index declared
    # in ``__table_args__`` (both appear in the initial migration). The
    # non-unique one looks redundant -- confirm before removing, since that
    # needs a new migration.
    link: Mapped[str] = mapped_column(
        String(2048),
        nullable=False,
        index=True,
        comment="链接"
    )
    # Name of the platform the content was taken from.
    platform: Mapped[str] = mapped_column(
        String(32),
        nullable=False,
        comment="平台"
    )
    # Content body; optional.
    content: Mapped[str | None] = mapped_column(
        Text,
        nullable=True,
        comment="内容"
    )
    # Creation timestamp, defaulted by the database to now().
    create_time: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
        comment="创建时间"
    )
    # Last-update timestamp: database default now(), plus ``onupdate`` so
    # SQLAlchemy sets it again on UPDATE statements it issues. The initial
    # migration defines only the server default for this column.
    update_time: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
        comment="更新时间"
    )

    # Unique index (named "link") on the ``link`` column: each link may be
    # stored only once.
    __table_args__ = (
        Index("link", "link", unique=True),
    )

    def __repr__(self) -> str:
        return f"<SourceContent id={self.id} link={self.link!r} platform={self.platform!r}>"

View File

@ -1,13 +1,11 @@
import datetime
from functools import partial
from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from log.log_manager import log, logger
from task.manager_task import manager_task
from apscheduler.events import EVENT_JOB_ERROR
from config import config
from task import manager_task
from utils import logger
def job_error_listener(event):
if event.exception:
@ -31,7 +29,7 @@ if __name__ == '__main__':
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
try:
log(f"started successfully.")
logger.info(f"started successfully.")
scheduler.start() # 阻塞运行
except (KeyboardInterrupt, SystemExit):
log(f"Shutting down ...")
logger.info(f"Shutting down ...")

Binary file not shown.

View File

@ -0,0 +1,98 @@
import json
from time import sleep
from DrissionPage import Chromium, ChromiumOptions
from config.database import SessionLocal
from models.source_content import SourceContent
from utils import logger
class DoubanGroupSeek:
    """Crawl the topic list of one Douban group and persist unseen topics.

    Every new topic is opened in the browser, parsed into a JSON document
    (title, content, post time, IP location, author, comments) and stored as
    a ``SourceContent`` row with ``platform='douban'``.
    """

    # Seconds passed to ``tab.wait()`` after opening each topic page.
    # NOTE(review): the original comment describes this as waiting for the
    # page to finish loading and says it may be tuned -- confirm whether it
    # also serves as a crawl delay before lowering it.
    PAGE_WAIT_SECONDS = 30

    def __init__(self, group_id):
        """Create the Chromium browser object configured for local port 9333.

        Args:
            group_id: ID of the Douban group; used to build the group URL.
        """
        co = ChromiumOptions().set_local_port(9333)
        self.browser = Chromium(addr_or_opts=co)
        self.group_id = group_id

    def seek(self):
        """Crawl the group page and store every topic not yet in the database.

        Topic pages that fail to parse are logged and skipped. All collected
        rows are committed together at the end. The browser tab and the
        database session are released even when an exception is raised.
        """
        db = SessionLocal()
        try:
            tab = self.browser.new_tab()
            try:
                topics = self._select_new_topics(db, self._list_topics(tab))

                # Log the list of topics that are about to be crawled.
                logger.info(f"Found {len(topics)} new topics to crawl:")
                for topic_title, topic_url in topics:
                    logger.info(f"标题:{topic_title} 链接:{topic_url}\n")

                results = []
                for topic_title, topic_url in topics:
                    logger.info(f"fetch 标题:{topic_title} 链接:{topic_url}\n")
                    tab.get(topic_url)
                    tab.wait(self.PAGE_WAIT_SECONDS)
                    try:
                        payload = self._parse_topic(tab, topic_title, topic_url)
                    except Exception as e:
                        # One broken page must not abort the whole crawl.
                        logger.error(f"Error processing topic {topic_title}:{topic_url}: {str(e)}")
                        continue
                    results.append((topic_url, payload))

                self._store(db, results)
            finally:
                tab.close()
        finally:
            db.close()

    def _list_topics(self, tab):
        """Open the group page and return ``(title, url)`` for each topic row."""
        group_url = f'https://www.douban.com/group/{self.group_id}'
        tab.get(group_url)
        ele_table = tab.ele('tag:table@class=olt')
        ele_trs = ele_table.eles('tag:tr@!class=th')
        topics = []
        for ele_tr in ele_trs:
            ele_link = ele_tr.ele('tag:a')
            topics.append((ele_link.text, ele_link.attr('href')))
        return topics

    @staticmethod
    def _select_new_topics(db, topics):
        """Return the topics whose URL is neither repeated nor already stored.

        ``t_source_content.link`` carries a unique index, so a URL that is
        already in the table (or listed twice on the page) would make the
        final commit fail and lose the whole batch. The candidate URLs are
        therefore looked up directly instead of being compared against only
        the most recent rows.
        """
        unique_topics = []
        seen_urls = set()
        for topic_title, topic_url in topics:
            if topic_url in seen_urls:
                continue
            seen_urls.add(topic_url)
            unique_topics.append((topic_title, topic_url))
        if not unique_topics:
            return []

        rows = (
            db.query(SourceContent.link)
            .filter(SourceContent.link.in_(list(seen_urls)))
            .all()
        )
        existing_urls = {row[0] for row in rows}
        return [
            (topic_title, topic_url)
            for topic_title, topic_url in unique_topics
            if topic_url not in existing_urls
        ]

    def _parse_topic(self, tab, topic_title, topic_url):
        """Parse the topic page currently shown in ``tab`` into a JSON string.

        ``topic_title`` and ``topic_url`` are only used for log messages.
        """
        title = tab.title
        ele_article = tab.ele('.article')
        # Post body, creation time, IP location and author.
        ele_topic_content = ele_article.ele('#topic-content')
        ele_topic_doc = ele_topic_content.ele('.topic-doc')
        content = ele_topic_doc.ele('.topic-content').text
        post_time = ele_topic_doc.ele('.create-time').text
        ip_location = ele_topic_doc.ele('.ip-location').text
        author = ele_topic_doc.ele('.from').text
        comments = self._parse_comments(ele_article, topic_title, topic_url)
        return json.dumps({
            "title": title,
            "content": content,
            "post_time": post_time,
            "ip_location": ip_location,
            "author": author,
            "comments": comments
        }, ensure_ascii=False)

    @staticmethod
    def _parse_comments(ele_article, topic_title, topic_url):
        """Collect the comments below a topic.

        A topic may have no comments, so any lookup failure is logged as a
        warning and the comments gathered so far are returned.
        """
        comments = []
        try:
            ele_comments = ele_article.ele('#comments')
            for ele_comment in ele_comments.eles('tag:li'):
                comment_content = ele_comment.ele('.reply-content').text
                comment_time = ele_comment.ele('.pubtime').text
                comment_author = ele_comment.ele('tag:h4').child().text
                comments.append({
                    "comment_content": comment_content,
                    "comment_time": comment_time,
                    "comment_author": comment_author
                })
        except Exception as e:
            logger.warning(f"No comments found for topic {topic_title}:{topic_url}: {str(e)}")
        return comments

    @staticmethod
    def _store(db, results):
        """Insert one ``SourceContent`` row per result and commit.

        The session is rolled back before the error is re-raised when the
        commit fails.
        """
        for topic_url, data in results:
            db.add(SourceContent(
                link=topic_url,
                platform='douban',
                content=data
            ))
        try:
            db.commit()
        except Exception:
            db.rollback()
            raise

2
task/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from task.manager_task import manager_task
from task.hot_topic.douban import spider_task as douban_hot_topic_task

15
task/hot_topic/douban.py Normal file
View File

@ -0,0 +1,15 @@
from task.manager_task import execute_task
from seek.douban_com.douban_group_seek import DoubanGroupSeek
from utils import logger
def spider_task(group_id: int = 677158):
    """Crawl a Douban group and store its new topics.

    Args:
        group_id: ID of the Douban group to crawl. Defaults to 677158, the
            "社畜买房共进会" group this task was written for, so existing
            callers that pass no argument behave as before.
    """
    logger.info("Douban hot topic spider task start execute......")
    douban_group_seek = DoubanGroupSeek(group_id=group_id)
    douban_group_seek.seek()
    logger.info("Douban hot topic spider task end execute......")


if __name__ == '__main__':
    # Running the module directly executes the task once via execute_task.
    execute_task(spider_task)

View File

@ -6,7 +6,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from database.database import get_session
from database.tscheduler.crud import get_tasks_by_executor
from log.log_manager import log
from utils import logger
"""
这是一个特殊的任务,负责管理任务,命名为管理者任务。
@ -24,10 +24,10 @@ def log_task_execution(task_name: str, start_time: float, end_time: float = None
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
if end_time is None:
log(f"{task_name} start execute at {start_time_str}")
logger.info(f"{task_name} start execute at {start_time_str}")
else:
elapsed_time = end_time - start_time
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
logger.info(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
def execute_task(task: callable):
@ -66,7 +66,7 @@ def load_tasks(scheduler: BlockingScheduler):
replace_existing=True,
misfire_grace_time=interval_seconds
)
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
logger.info(f"Task {task.task_name} added with interval {interval_seconds} seconds")
elif trigger == "cron":
# 解析 cron 表达式的字段
fields = task.cron_expression.split()
@ -90,7 +90,7 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with cron {task.cron_expression}")
logger.info(f"Task {task.task_name} added with cron {task.cron_expression}")
elif trigger == "date":
scheduler.add_job(
task_function,
@ -99,13 +99,13 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with date {task.execution_date}")
logger.info(f"Task {task.task_name} added with date {task.execution_date}")
else:
log(f"Task Invalid trigger type: {trigger}")
logger.warning(f"Task Invalid trigger type: {trigger}")
else:
log(f"Task {task.task_name} already exists......")
logger.info(f"Task {task.task_name} already exists......")
run_time = job.next_run_time - job.trigger.start_date
log(f"Task {task.task_name} already exists, run time is {run_time}")
logger.info(f"Task {task.task_name} already exists, run time is {run_time}")
# 管理者任务
def manager_task(scheduler: BlockingScheduler):

1
utils/__init__.py Normal file
View File

@ -0,0 +1 @@
from utils.logger import logger

37
utils/logger.py Normal file
View File

@ -0,0 +1,37 @@
import sys
import os
from loguru import logger
from config.settings import settings
# Remove the default handler, otherwise output is duplicated.
logger.remove()

if "console" in settings.LOG_TYPE:
    # ======== Console output ========
    logger.add(
        sys.stdout,
        level=settings.LOG_LEVEL,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> "
               "| <level>{level: <8}</level> "
               "| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
               "- <level>{message}</level>",
    )

if "file" in settings.LOG_TYPE:
    # Log directory; exist_ok avoids the check-then-create race when several
    # processes start at the same time.
    LOG_DIR = settings.LOG_FILE_PATH
    os.makedirs(LOG_DIR, exist_ok=True)

    # ======== File output (one file per day) ========
    logger.add(
        f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log",
        rotation="00:00",  # rotate every day at 00:00
        retention="7 days",  # keep 7 days of logs
        encoding="utf-8",
        level=settings.LOG_LEVEL,
        enqueue=True,  # thread-safe writes
        compression="zip",  # compress rotated logs
    )

__all__ = ["logger"]