diff --git a/.env b/.env index e7046d4..d5c246a 100644 --- a/.env +++ b/.env @@ -7,7 +7,7 @@ LOG_LEVEL=DEBUG LOG_TYPE=console # 数据库配置 -DB_HOST=192.168.1.200 +DB_HOST= 47.119.128.161 # 192.168.1.200 DB_PORT=19732 DB_USER=postgres DB_PASS=postgres diff --git a/config/__pycache__/__init__.cpython-312.pyc b/config/__pycache__/__init__.cpython-312.pyc index bbef600..392cd15 100644 Binary files a/config/__pycache__/__init__.cpython-312.pyc and b/config/__pycache__/__init__.cpython-312.pyc differ diff --git a/peter.py b/peter.py index ed57a39..e09ff91 100644 --- a/peter.py +++ b/peter.py @@ -1,13 +1,11 @@ import datetime from functools import partial - from apscheduler.schedulers.blocking import BlockingScheduler - -from config import config -from log.log_manager import log, logger -from task.manager_task import manager_task - from apscheduler.events import EVENT_JOB_ERROR +from config import config +from task import manager_task +from utils import logger + def job_error_listener(event): if event.exception: @@ -31,7 +29,7 @@ if __name__ == '__main__': scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR) try: - log(f"started successfully.") + logger.info(f"started successfully.") scheduler.start() # 阻塞运行 except (KeyboardInterrupt, SystemExit): - log(f"Shutting down ...") + logger.info(f"Shutting down ...") diff --git a/seek/douban_com/douban_group_seek.py b/seek/douban_com/douban_group_seek.py new file mode 100644 index 0000000..780b891 --- /dev/null +++ b/seek/douban_com/douban_group_seek.py @@ -0,0 +1,98 @@ +import json +from time import sleep +from DrissionPage import Chromium, ChromiumOptions +from config.database import SessionLocal +from models.source_content import SourceContent +from utils import logger + + +class DoubanGroupSeek: + def __init__(self, group_id): + co = ChromiumOptions().set_local_port(9333) + self.browser = Chromium(addr_or_opts=co) + self.group_id = group_id + + def seek(self): + db = SessionLocal() + # 获取最近100条数据的URL列表,用于过滤掉已存在的URL,避免重复爬取和存储 + recent_contents = db.query(SourceContent).order_by(SourceContent.id.desc()).limit(100).all() + + group_url = f'https://www.douban.com/group/{self.group_id}' + tab = self.browser.new_tab() + tab.get(group_url) + + title = tab.title + ele_table = tab.ele('tag:table@class=olt') + ele_trs = ele_table.eles('tag:tr@!class=th') + topics = [] + for ele_tr in ele_trs: + topic_title = ele_tr.ele('tag:a').text + topic_url = ele_tr.ele('tag:a').attr('href') + topics.append((topic_title, topic_url)) + + # 过滤掉已存在的URL + existing_urls = set(content.link for content in recent_contents) + topics = [(title, url) for title, url in topics if url not in existing_urls] + + # 打印要爬取的主题列表 + logger.info(f"Found {len(topics)} new topics to crawl:") + for topic_title, topic_url in topics: + logger.info(f"标题:{topic_title} 链接:{topic_url}\n") + + results = [] + for topic_title, topic_url in topics: + logger.info(f"fetch 标题:{topic_title} 链接:{topic_url}\n") + tab.get(topic_url) + tab.wait(30) # 等待页面加载完成,时间可根据实际情况调整 + try: + title = tab.title + ele_article = tab.ele('.article') + + # 获取帖子内容、发布时间、IP地址位置、作者等信息 + ele_topic_content = ele_article.ele('#topic-content') + ele_topic_doc = ele_topic_content.ele('.topic-doc') + content = ele_topic_doc.ele('.topic-content').text + post_time = ele_topic_doc.ele('.create-time').text + ip_location = ele_topic_doc.ele('.ip-location').text + author = ele_topic_doc.ele('.from').text + + # 获取评论列表 + comments = [] + # 评论不一定存在,需先判断 + try: + ele_comments = ele_article.ele('#comments') + ele_comments_list = ele_comments.eles('tag:li') + for ele_comment in ele_comments_list: + comment_content = ele_comment.ele('.reply-content').text + comment_time = ele_comment.ele('.pubtime').text + comment_author = ele_comment.ele('tag:h4').child().text + comments.append({ + "comment_content": comment_content, + "comment_time": comment_time, + "comment_author": comment_author + }) + except Exception as e: + logger.warning(f"No comments found for topic {topic_title}:{topic_url}: {str(e)}") + + results.append((topic_url, json.dumps({ + "title": title, + "content": content, + "post_time": post_time, + "ip_location": ip_location, + "author": author, + "comments": comments + }, ensure_ascii=False))) + except Exception as e: + logger.error(f"Error processing topic {topic_title}:{topic_url}: {str(e)}") + continue + + # 存入数据库 + for topic_url, data in results: + source_content = SourceContent( + link=topic_url, + platform='douban', + content=data + ) + db.add(source_content) + db.commit() + tab.close() \ No newline at end of file diff --git a/task/__init__.py b/task/__init__.py new file mode 100644 index 0000000..5f6b4a9 --- /dev/null +++ b/task/__init__.py @@ -0,0 +1,2 @@ +from task.manager_task import manager_task +from task.hot_topic.douban import spider_task as douban_hot_topic_task \ No newline at end of file diff --git a/task/__pycache__/__init__.cpython-312.pyc b/task/__pycache__/__init__.cpython-312.pyc index 1fdafe3..88bd7a8 100644 Binary files a/task/__pycache__/__init__.cpython-312.pyc and b/task/__pycache__/__init__.cpython-312.pyc differ diff --git a/task/__pycache__/manager_task.cpython-312.pyc b/task/__pycache__/manager_task.cpython-312.pyc index 8b3d96d..09f6d3f 100644 Binary files a/task/__pycache__/manager_task.cpython-312.pyc and b/task/__pycache__/manager_task.cpython-312.pyc differ diff --git a/task/hot_topic/douban.py b/task/hot_topic/douban.py new file mode 100644 index 0000000..33a5b3b --- /dev/null +++ b/task/hot_topic/douban.py @@ -0,0 +1,15 @@ +from task.manager_task import execute_task +from seek.douban_com.douban_group_seek import DoubanGroupSeek +from utils import logger + + +def spider_task(): + logger.info(f"Douban hot topic spider task start execute......") + # 社畜买房共进会 小组 ID 为 677158 + douban_group_seek = DoubanGroupSeek(group_id=677158) + douban_group_seek.seek() + logger.info(f"Douban hot topic spider task end execute......") + + +if __name__ == '__main__': + execute_task(spider_task) \ No newline at end of file diff --git a/task/manager_task.py b/task/manager_task.py index e9fb398..d13a5bb 100644 --- a/task/manager_task.py +++ b/task/manager_task.py @@ -6,7 +6,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler from config import config from database.database import get_session from database.tscheduler.crud import get_tasks_by_executor -from log.log_manager import log +from utils import logger """ 这是一个特殊的任务,负责管理任务,命名为管理者任务。 @@ -24,10 +24,10 @@ def log_task_execution(task_name: str, start_time: float, end_time: float = None start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)) end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) if end_time is None: - log(f"{task_name} start execute at {start_time_str}") + logger.info(f"{task_name} start execute at {start_time_str}") else: elapsed_time = end_time - start_time - log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") + logger.info(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") def execute_task(task: callable): @@ -66,7 +66,7 @@ def load_tasks(scheduler: BlockingScheduler): replace_existing=True, misfire_grace_time=interval_seconds ) - log(f"Task {task.task_name} added with interval {interval_seconds} seconds") + logger.info(f"Task {task.task_name} added with interval {interval_seconds} seconds") elif trigger == "cron": # 解析 cron 表达式的字段 fields = task.cron_expression.split() @@ -90,7 +90,7 @@ def load_tasks(scheduler: BlockingScheduler): id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with cron {task.cron_expression}") + logger.info(f"Task {task.task_name} added with cron {task.cron_expression}") elif trigger == "date": scheduler.add_job( task_function, @@ -99,13 +99,13 @@ def load_tasks(scheduler: BlockingScheduler): id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with date {task.execution_date}") + logger.info(f"Task {task.task_name} added with date {task.execution_date}") else: - log(f"Task Invalid trigger type: {trigger}") + logger.warning(f"Task Invalid trigger type: {trigger}") else: - log(f"Task {task.task_name} already exists......") + logger.info(f"Task {task.task_name} already exists......") run_time = job.next_run_time - job.trigger.start_date - log(f"Task {task.task_name} already exists, run time is {run_time}") + logger.info(f"Task {task.task_name} already exists, run time is {run_time}") # 管理者任务 def manager_task(scheduler: BlockingScheduler): diff --git a/utils/__pycache__/__init__.cpython-312.pyc b/utils/__pycache__/__init__.cpython-312.pyc index e83ec45..28f99f8 100644 Binary files a/utils/__pycache__/__init__.cpython-312.pyc and b/utils/__pycache__/__init__.cpython-312.pyc differ