Files
peter/seek/douban_com/douban_group_seek.py
konjacpotato ac6c881763
All checks were successful
Gitea Actions Demo / host-commands (push) Successful in 0s
调整豆瓣小组帖子获取逻辑
2026-02-20 18:51:35 +08:00

105 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
from DrissionPage import Chromium, ChromiumOptions
from config.database import SessionLocal
from models.source_content import SourceContent
from utils import logger
class DoubanGroupSeek:
    """Crawl recently updated topics of one Douban group and store them.

    A Chromium instance is driven through DrissionPage on local port 9333.
    ``seek()`` reads the group's topic list, keeps topics that are not
    pinned and were updated within the last 24 hours, skips links already
    present in ``SourceContent``, scrapes each remaining topic page and
    stores the result as a JSON string.
    """

    # Topics whose last update is older than this are not crawled.
    MAX_TOPIC_AGE_SECONDS = 24 * 3600
    # The list shows times without a year. A time that would lie further in
    # the future than this (when read as "this year") is taken to belong to
    # the previous year. The slack absorbs clock / timezone differences
    # between this machine and the site.
    FUTURE_TOLERANCE_SECONDS = 24 * 3600
    # Fixed pause after opening a topic page; adjust to the real load time.
    PAGE_WAIT_SECONDS = 30

    def __init__(self, group_id):
        """Attach to a browser on port 9333 and remember the group id.

        Args:
            group_id: Identifier used to build
                ``https://www.douban.com/group/{group_id}``.
        """
        co = ChromiumOptions().set_local_port(9333)
        self.browser = Chromium(addr_or_opts=co)
        self.group_id = group_id

    def seek(self):
        """Crawl new topics of the group and persist them.

        Topics whose link already exists in ``SourceContent`` are skipped.
        A failure while loading or parsing one topic is logged and the crawl
        continues with the next one. All new rows are committed together at
        the end. The browser tab and the database session are always
        released, even when an error propagates.

        Returns:
            None.
        """
        db = SessionLocal()
        tab = None
        try:
            tab = self.browser.new_tab()
            tab.get(f'https://www.douban.com/group/{self.group_id}')

            # Drop pinned topics (title contains "置顶"): those are normally
            # the group rules and announcements. Then keep only topics that
            # were updated within the allowed age window.
            topics = [
                topic
                for topic in self._list_topics(tab)
                if "置顶" not in topic[0] and self._is_recent(topic[2])
            ]
            logger.info(f"Found {len(topics)} potential new topics to crawl:")

            results = []
            for topic_title, topic_url, update_time in topics:
                # Skip topics that are already stored (matched by link).
                existing_content = (
                    db.query(SourceContent.id)
                    .filter(SourceContent.link == topic_url)
                    .first()
                )
                if existing_content:
                    continue

                logger.info(f"fetch 标题:{topic_title} 链接:{topic_url} 更新时间:{update_time}\n")
                try:
                    # Navigation is inside the try so that one broken page
                    # cannot abort the run and lose the results so far.
                    tab.get(topic_url)
                    tab.wait(self.PAGE_WAIT_SECONDS)
                    payload = self._read_topic(tab, topic_title, topic_url)
                    results.append(
                        (topic_url, json.dumps(payload, ensure_ascii=False))
                    )
                except Exception as e:
                    logger.error(f"Error processing topic {topic_title}:{topic_url}: {str(e)}")
                    continue

            # Persist everything that was scraped in one transaction.
            for topic_url, data in results:
                db.add(SourceContent(
                    link=topic_url,
                    platform='douban',
                    content=data,
                ))
            db.commit()
        finally:
            try:
                if tab is not None:
                    tab.close()
            finally:
                # NOTE(review): assumes SessionLocal() yields an
                # SQLAlchemy-style session exposing close() - confirm.
                db.close()

    @staticmethod
    def _list_topics(tab):
        """Return ``(title, url, update_time)`` for each topic-list row."""
        table = tab.ele('tag:table@class=olt')
        topics = []
        # Rows with class "th" are the table header and are excluded.
        for row in table.eles('tag:tr@!class=th'):
            link = row.ele('tag:a')
            topics.append((link.text, link.attr('href'), row.ele('.time').text))
        return topics

    @staticmethod
    def _is_recent(update_time, now=None, max_age_seconds=MAX_TOPIC_AGE_SECONDS):
        """Tell whether a listed update time lies within the age window.

        The value is parsed into a real timestamp instead of being compared
        as a string, so the check stays correct across a year change and for
        values that carry a year.

        Args:
            update_time: Text from the list, normally ``"MM-DD HH:MM"``
                (local time, no year). A date-only ``"YYYY-MM-DD"`` value is
                also accepted, in case the site renders older topics that way.
            now: Reference time as seconds since the epoch; defaults to
                ``time.time()``.
            max_age_seconds: Size of the window, in seconds.

        Returns:
            True when the topic was updated at or after ``now -
            max_age_seconds``; False otherwise, including when the text
            cannot be parsed.
        """
        text = (update_time or "").strip()
        current = time.time() if now is None else now
        cutoff = current - max_age_seconds

        # Date-only value: the update may have happened at any minute of
        # that day, so compare the end of the day against the cutoff.
        try:
            day_start = time.mktime(time.strptime(text, "%Y-%m-%d"))
        except (ValueError, OverflowError):
            pass
        else:
            return day_start + 24 * 3600 > cutoff

        # Year-less value: try the current year first and fall back to the
        # previous one when that would put the update in the future (e.g.
        # "12-31 23:00" seen shortly after New Year) or is not a valid date
        # (e.g. "02-29" in a non-leap year).
        this_year = time.localtime(current).tm_year
        for year in (this_year, this_year - 1):
            try:
                stamp = time.mktime(
                    time.strptime(f"{year}-{text}", "%Y-%m-%d %H:%M")
                )
            except (ValueError, OverflowError):
                continue
            if stamp <= current + DoubanGroupSeek.FUTURE_TOLERANCE_SECONDS:
                return stamp >= cutoff
        return False

    @staticmethod
    def _read_topic(tab, topic_title, topic_url):
        """Scrape the topic page currently loaded in ``tab`` into a dict.

        Collects the page title, body text, creation time, IP location,
        author and the list of comments. ``topic_title`` and ``topic_url``
        are only used for log messages.
        """
        title = tab.title
        article = tab.ele('.article')
        topic_doc = article.ele('#topic-content').ele('.topic-doc')
        return {
            "title": title,
            "content": topic_doc.ele('.topic-content').text,
            "post_time": topic_doc.ele('.create-time').text,
            "ip_location": topic_doc.ele('.ip-location').text,
            "author": topic_doc.ele('.from').text,
            "comments": DoubanGroupSeek._read_comments(
                article, topic_title, topic_url
            ),
        }

    @staticmethod
    def _read_comments(article, topic_title, topic_url):
        """Collect the comments below a topic on a best-effort basis.

        A topic may have no comments section at all, so any error is logged
        as a warning and the comments gathered up to that point are returned.
        """
        comments = []
        try:
            for item in article.ele('#comments').eles('tag:li'):
                comment_content = item.ele('.reply-content').text
                comment_time = item.ele('.pubtime').text
                comment_author = item.ele('tag:h4').child().text
                comments.append({
                    "comment_content": comment_content,
                    "comment_time": comment_time,
                    "comment_author": comment_author,
                })
        except Exception as e:
            logger.warning(f"No comments found for topic {topic_title}:{topic_url}: {str(e)}")
        return comments