Files
peter/seek/douban_com/douban_group_seek.py
konjacpotato 491061acc7
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
add 数据库超时重试机制
2026-02-27 10:29:29 +08:00

141 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
from DrissionPage import Chromium, ChromiumOptions
from psycopg import OperationalError
from config.database import SessionLocal
from models.source_content import SourceContent
from utils import logger
class DoubanGroupSeek:
def __init__(self, group_id):
co = ChromiumOptions().set_local_port(9333)
self.browser = Chromium(addr_or_opts=co)
self.group_id = group_id
def fetch_with_retry(self, session, model, link, max_retries=3, base_delay=1):
"""
带重试的数据库查询函数
:param session: SQLAlchemy 会话
:param model: 模型类
:param link: 要查询的链接
:param max_retries: 最大重试次数
:param base_delay: 初始延迟时间(秒)
:return: 查询结果或 None
"""
for attempt in range(1, max_retries + 1):
try:
# 执行查询
result = session.query(model.id).filter(model.link == link).first()
return result # 成功则返回结果
except OperationalError as e:
# 打印错误信息(可选)
logger.error(f"数据库查询失败 (尝试 {attempt}/{max_retries}): {e}")
# 回滚当前会话,避免事务挂起影响后续操作
session.rollback()
if attempt == max_retries:
# 最后一次重试失败,抛出异常或记录错误后返回 None
logger.info(f"已达到最大重试次数,放弃查询链接: {link}")
raise # 或者返回 None根据业务决定
# 计算等待时间:指数退避
sleep_time = base_delay * (2 ** (attempt - 1))
time.sleep(sleep_time)
except Exception as e:
# 如果还想捕获其他数据库错误,可以统一处理
logger.error(f"数据库错误: {e}")
session.rollback()
raise # 非临时性错误,直接抛出
return None # 正常情况下不会执行到这里
def seek(self):
db = SessionLocal()
group_url = f'https://www.douban.com/group/{self.group_id}'
tab = self.browser.new_tab()
tab.get(group_url)
title = tab.title
ele_table = tab.ele('tag:table@class=olt')
ele_trs = ele_table.eles('tag:tr@!class=th')
topics = []
for ele_tr in ele_trs:
topic_title = ele_tr.ele('tag:a').text
topic_url = ele_tr.ele('tag:a').attr('href')
update_time = ele_tr.ele('.time').text
topics.append((topic_title, topic_url, update_time))
# 去掉两个置顶的帖子根据title包含“置顶”来判断因为置顶的帖子一般情况是组规则和公告
topics = [(title, url, update_time) for title, url, update_time in topics if "置顶" not in title]
# 根据更新时间过滤update_time格式为“10-18 12:34”只保留24小时内的帖子
time_str = time.strftime("%m-%d %H:%M", time.localtime(time.time() - 24 * 3600))
topics = [(title, url, update_time) for title, url, update_time in topics if update_time >= time_str]
# 打印要爬取的主题列表
logger.info(f"Found {len(topics)} potential new topics to crawl:")
results = []
for topic_title, topic_url, update_time in topics:
# 检索数据库根据topic_url查询是否已存在
# existing_content = db.query(SourceContent.id).filter(SourceContent.link == topic_url).first()
existing_content = self.fetch_with_retry(db, SourceContent, topic_url)
if existing_content:
# logger.info(f"Topic already exists in database, skipping: {topic_title}:{topic_url}")
continue
logger.info(f"fetch 标题:{topic_title} 链接:{topic_url} 更新时间:{update_time}\n")
tab.get(topic_url)
tab.wait(30) # 等待页面加载完成,时间可根据实际情况调整
try:
title = tab.title
ele_article = tab.ele('.article')
# 获取帖子内容、发布时间、IP地址位置、作者等信息
ele_topic_content = ele_article.ele('#topic-content')
ele_topic_doc = ele_topic_content.ele('.topic-doc')
content = ele_topic_doc.ele('.topic-content').text
post_time = ele_topic_doc.ele('.create-time').text
ip_location = ele_topic_doc.ele('.ip-location').text
author = ele_topic_doc.ele('.from').text
# 获取评论列表
comments = []
# 评论不一定存在,需先判断
try:
ele_comments = ele_article.ele('#comments')
ele_comments_list = ele_comments.eles('tag:li')
for ele_comment in ele_comments_list:
comment_content = ele_comment.ele('.reply-content').text
comment_time = ele_comment.ele('.pubtime').text
comment_author = ele_comment.ele('tag:h4').child().text
comments.append({
"comment_content": comment_content,
"comment_time": comment_time,
"comment_author": comment_author
})
except Exception as e:
logger.warning(f"No comments found for topic {topic_title}:{topic_url}: {str(e)}")
results.append((topic_url, json.dumps({
"title": title,
"content": content,
"post_time": post_time,
"ip_location": ip_location,
"author": author,
"comments": comments
}, ensure_ascii=False)))
except Exception as e:
logger.error(f"Error processing topic {topic_title}:{topic_url}: {str(e)}")
continue
# 存入数据库
for topic_url, data in results:
source_content = SourceContent(
link=topic_url,
platform='douban',
content=data
)
db.add(source_content)
db.commit()
tab.close()