import peter

This commit is contained in:
konjacpotato
2025-11-12 20:42:16 +08:00
commit 8c1a740f0b
147 changed files with 2763 additions and 0 deletions

3
Readme.md Normal file
View File

@ -0,0 +1,3 @@
# Peter
电影里面的蜘蛛侠叫Peter Parker.

0
config/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

4
config/config.py Normal file
View File

@ -0,0 +1,4 @@
# Scheduler name; log.log_manager.log() prefixes every message with it.
scheduler_name = 'peter'
# Scheduler interval in seconds; peter.py passes it as the interval of the manager_task job.
scheduler_interval = 3600

5
database/Readme.md Normal file
View File

@ -0,0 +1,5 @@
```
t_top_topic table DDL
上面是top_topic数据表的DDL根据DDL信息在database模块下按照项目结构创建model.py和crud.py
```

0
database/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

37
database/database.py Normal file
View File

@ -0,0 +1,37 @@
import os
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

from log.log_manager import logger

# Declarative base class shared by every model module of the project.
Base = declarative_base()

# SECURITY: the connection URL (including the password) used to be available
# only as a literal in source control. It can now be supplied through the
# DATABASE_URL environment variable; the literal remains solely as a
# backward-compatible default and should be rotated and removed.
DATABASE_URL = os.environ.get(
    'DATABASE_URL',
    'postgresql+psycopg://postgres:K8u3fg0o@47.119.128.161:60001/squirrel',
)

engine = create_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,  # recycle connections before the database side expires them
    connect_args={
        'connect_timeout': 15,
        'keepalives_idle': 60,
        'keepalives_interval': 10,
        'keepalives_count': 5
    }
)

# NOTE(review): create_all only creates tables for models already registered
# on Base.metadata. The model modules import Base from this module, so
# presumably none are registered yet when this line runs - confirm.
Base.metadata.create_all(engine)
# Build the session factory once; the previous code constructed a brand-new
# sessionmaker for every single session.
_session_factory = sessionmaker(bind=engine)


@contextmanager
def get_session():
    """Yield a database session that is committed, rolled back and closed automatically.

    The session is committed when the ``with`` body finishes normally.
    If the body raises, the transaction is rolled back, the error is logged
    and the exception is re-raised. The session is closed in every case.
    """
    session = _session_factory()
    try:
        yield session
        session.commit()  # commit the successful transaction
    except Exception as e:
        session.rollback()  # undo everything on failure
        logger.error(f"Database operation failed: {str(e)}")
        raise  # let the caller see the original exception
    finally:
        session.close()  # always release the session

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,77 @@
from database.thotcontent.model import THotContent
from log.log_manager import logger
def create_hot_content(db, hot_content: THotContent):
    """Insert ``hot_content``, commit, and return it refreshed from the database.

    :param db: database session used for the insert
    :param hot_content: unsaved THotContent instance
    :return: the same instance after commit and refresh
    """
    db.add(hot_content)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(hot_content)
    return hot_content
def create_content_if_url_not_exists(db, hot_content: THotContent):
    """Insert ``hot_content`` unless a row with the same URL is already stored.

    :param db: database session
    :param hot_content: candidate THotContent instance
    :return: the already-stored row when the URL exists, otherwise the newly
        inserted and refreshed ``hot_content``
    """
    duplicate = (
        db.query(THotContent)
        .filter(THotContent.url == hot_content.url)
        .first()
    )
    if not duplicate:
        # No stored row shares this URL: persist the new one.
        db.add(hot_content)
        db.commit()
        db.refresh(hot_content)
        return hot_content
    # A row with this URL exists: hand it back unchanged.
    return duplicate
def create_contents_top3_if_url_not_exists(db, contents: list[THotContent]):
    """Store the three most-upvoted items of ``contents`` whose URL is not stored yet.

    Items are ranked by ``content_upvote_count``; a missing count ranks as 0.
    The caller's list is not modified.

    :param db: database session
    :param contents: collected THotContent candidates
    :return: the instances that were actually inserted, refreshed after commit
    """
    logger.info(f"采集到内容数量:{len(contents)},存入数据库前三")
    # content_upvote_count is a nullable column, so None must not reach the
    # comparison (it would raise TypeError). sorted() also avoids reordering
    # the list owned by the caller.
    top_contents = sorted(
        contents,
        key=lambda item: item.content_upvote_count or 0,
        reverse=True,
    )[:3]
    inserted_contents = []  # rows actually added in this call
    for content in top_contents:
        already_stored = db.query(THotContent).filter(THotContent.url == content.url).first()
        if not already_stored:
            db.add(content)
            inserted_contents.append(content)
    # One commit for the whole batch.
    db.commit()
    # Reload every new row so database-generated values are available.
    for content in inserted_contents:
        db.refresh(content)
    return inserted_contents
def get_hot_content_by_id(db, hot_content_id: int):
    """Return the first THotContent row whose id equals ``hot_content_id``."""
    by_id = db.query(THotContent).filter(THotContent.id == hot_content_id)
    return by_id.first()
def get_hot_content_by_topic_id(db, topic_id: int):
    """Return every THotContent row whose ``topic_id`` equals ``topic_id``."""
    by_topic = db.query(THotContent).filter(THotContent.topic_id == topic_id)
    return by_topic.all()
def get_hot_contents(db, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` THotContent rows after skipping the first ``skip``.

    No ordering is applied to the query.
    """
    page = db.query(THotContent).offset(skip).limit(limit)
    return page.all()
def update_hot_content(db, hot_content_id: int, updates: dict):
    """Set the attributes named in ``updates`` on the row with the given id.

    :param db: database session
    :param hot_content_id: id of the row to change
    :param updates: mapping of attribute name to new value
    :return: the refreshed row, or the empty lookup result when no row matches
        (nothing is committed in that case)
    """
    target = db.query(THotContent).filter(THotContent.id == hot_content_id).first()
    if not target:
        return target
    for attr_name in updates:
        setattr(target, attr_name, updates[attr_name])
    db.commit()
    db.refresh(target)
    return target
def delete_hot_content(db, hot_content_id: int):
    """Delete the THotContent row with the given id, if it exists.

    :param db: database session
    :param hot_content_id: id of the row to delete
    :return: the deleted row, or the empty lookup result when no row matches.
        (Previously nothing was returned; the other ``delete_*`` helpers of
        the project all return the looked-up row, so this one now does too.)
    """
    hot_content = db.query(THotContent).filter(THotContent.id == hot_content_id).first()
    if hot_content:
        db.delete(hot_content)
        db.commit()
    return hot_content

View File

@ -0,0 +1,23 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, String, BIGINT, TIMESTAMP, func
from sqlalchemy.dialects.postgresql import BIGINT
from database.database import Base
@dataclass
class THotContent(Base):
    """ORM model mapped to table ``t_hot_content``: one content item linked to a topic by ``topic_id``."""
    __tablename__ = 't_hot_content'
    id: int = Column(BIGINT, primary_key=True, autoincrement=True, comment='序号')
    topic_id: int = Column(BIGINT, nullable=False, comment='关联话题ID')
    url: Optional[str] = Column(String, nullable=True, comment='内容链接')
    content: Optional[str] = Column(String, nullable=True, comment='内容详情')
    # Nullable: code that sorts or compares on this value must expect None.
    content_upvote_count: Optional[int] = Column(BIGINT, nullable=True, comment='内容点赞数量')
    content_comment_count: Optional[int] = Column(Integer, nullable=True, comment='内容评论数量')
    # Filled in by the database (server_default=func.now()).
    create_time: datetime = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='创建时间')
    def __repr__(self):
        return f"<THotContent(id={self.id}, topic_id={self.topic_id}, url={self.url}, content_upvote_count={self.content_upvote_count})>"

View File

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,90 @@
from database.thottopic.model import THotTopic
def create_hot_topic(db, hot_topic: THotTopic):
    """Insert ``hot_topic``, commit, and return it refreshed from the database.

    :param db: database session
    :param hot_topic: unsaved THotTopic instance
    :return: the same instance after commit and refresh
    """
    db.add(hot_topic)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(hot_topic)
    return hot_topic
def create_topic_if_url_not_exists(db, hot_topic: THotTopic):
    """Insert ``hot_topic`` unless a row with the same URL is already stored.

    :param db: database session
    :param hot_topic: candidate THotTopic instance
    :return: the already-stored row when the URL exists, otherwise the newly
        inserted and refreshed ``hot_topic``
    """
    duplicate = (
        db.query(THotTopic)
        .filter(THotTopic.url == hot_topic.url)
        .first()
    )
    if not duplicate:
        # URL not seen before: persist the new topic.
        db.add(hot_topic)
        db.commit()
        db.refresh(hot_topic)
        return hot_topic
    # URL already stored: return the existing row untouched.
    return duplicate
def create_topics_if_url_not_exists(db, topics: list[THotTopic]):
    """Insert every topic of ``topics`` whose URL is not stored yet.

    :param db: database session
    :param topics: candidate THotTopic instances
    :return: the instances that were actually inserted, refreshed after commit
    """
    inserted_topics = []
    for candidate in topics:
        url_taken = db.query(THotTopic).filter(THotTopic.url == candidate.url).first()
        if url_taken:
            continue
        db.add(candidate)
        inserted_topics.append(candidate)
    # Single commit for the whole batch.
    db.commit()
    # Reload each new row so database-generated values are available.
    for stored in inserted_topics:
        db.refresh(stored)
    return inserted_topics
def hot_topic_not_exists(db, url_list: list) -> list:
    """
    Remove from ``url_list`` every URL that is already stored as a hot topic.

    The list is filtered in place and the same list object is returned, as
    before. Unlike the former ``list.remove`` loop this neither raises
    ValueError when several stored rows share one URL nor leaves a stored URL
    behind when it appears more than once in ``url_list``.

    :param db: database session
    :param url_list: candidate URLs
    :return: ``url_list`` reduced to the URLs not present in the database
    """
    if not url_list:
        # Nothing to check, so skip the query entirely.
        return url_list
    stored_rows = db.query(THotTopic).filter(THotTopic.url.in_(url_list)).all()
    stored_urls = {row.url for row in stored_rows}
    url_list[:] = [url for url in url_list if url not in stored_urls]
    return url_list
def get_hot_topic_by_id(db, hot_topic_id: int):
    """Return the first THotTopic row whose id equals ``hot_topic_id``."""
    by_id = db.query(THotTopic).filter(THotTopic.id == hot_topic_id)
    return by_id.first()
def get_hot_topics(db, skip: int = 0, limit: int = 100):
    """Return up to ``limit`` THotTopic rows after skipping the first ``skip``.

    No ordering is applied to the query.
    """
    page = db.query(THotTopic).offset(skip).limit(limit)
    return page.all()
def get_latest_hot_topic(db):
    """Return the THotTopic row with the most recent ``update_time``."""
    newest_first = db.query(THotTopic).order_by(THotTopic.update_time.desc())
    return newest_first.first()
def update_hot_topic(db, hot_topic: THotTopic):
    """Merge the state of ``hot_topic`` into the session, commit, and return the stored row.

    ``Session.merge`` copies the state onto the session's own instance and
    returns that instance; the object passed in is not attached to the
    session by the call. Refreshing the passed object therefore failed for
    detached or transient input, so the merged instance is refreshed and
    returned instead. When ``hot_topic`` already belongs to ``db``, merge
    returns that very object and the behavior is unchanged.

    :param db: database session
    :param hot_topic: THotTopic carrying the new state
    :return: the session-bound instance after commit and refresh
    """
    merged = db.merge(hot_topic)
    db.commit()
    db.refresh(merged)
    return merged
# def update_hot_topic(db, hot_topic_id: int, updates: dict):
# db.query(THotTopic).filter(THotTopic.id == hot_topic_id).update(updates)
# db.commit()
# return db.query(THotTopic).filter(THotTopic.id == hot_topic_id).first()
def delete_hot_topic(db, hot_topic_id: int):
    """Delete the THotTopic row with the given id, if it exists.

    :return: the deleted row, or the empty lookup result when no row matches
    """
    doomed = db.query(THotTopic).filter(THotTopic.id == hot_topic_id).first()
    if not doomed:
        return doomed
    db.delete(doomed)
    db.commit()
    return doomed

View File

@ -0,0 +1,34 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, String, Integer, TIMESTAMP, func
from sqlalchemy.dialects.postgresql import BIGINT
from database.database import Base
@dataclass
class THotTopic(Base):
    """ORM model mapped to table ``t_hot_topic``: a topic with its counters, top-content figures and AI script."""
    __tablename__ = 't_hot_topic'
    id: int = Column(BIGINT, primary_key=True, autoincrement=True, comment='序号')
    topic: str = Column(String, nullable=False, comment='话题')
    topic_description: Optional[str] = Column(String, nullable=True, comment='话题描述')
    url: Optional[str] = Column(String, nullable=True, comment='话题链接')
    source: Optional[str] = Column(String, nullable=True, comment='话题来源')
    keywords: Optional[str] = Column(String, nullable=True, comment='话题关键词')
    content_count: int = Column(Integer, default=0, nullable=False, comment='话题内容数量')
    comment_count: int = Column(Integer, default=0, nullable=False, comment='话题评论数量')
    follower_count: int = Column(Integer, default=0, nullable=False, comment='话题关注者数量')
    date_created: Optional[datetime] = Column(TIMESTAMP(timezone=True), nullable=True, comment='话题创建时间')
    date_modified: Optional[datetime] = Column(TIMESTAMP(timezone=True), nullable=True, comment='话题修改时间')
    top_content_url: Optional[str] = Column(String, nullable=True, comment='热内内容链接')
    top_content_upvote_count: Optional[int] = Column(BIGINT, nullable=True, comment='热门内容点赞数量')
    top_content_comment_count: Optional[int] = Column(Integer, nullable=True, comment='热门内容评论数量')
    create_time: datetime = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='创建时间')
    # NOTE(review): only a server default is declared here (no onupdate), so
    # this column does not change by itself when a row is updated - confirm
    # that is intended, since get_latest_hot_topic orders by it.
    update_time: Optional[datetime] = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='更新时间')
    ai_script: Optional[str] = Column(String, nullable=True, comment='内容脚本')
    def __repr__(self):
        return f"<THotTopic(topic={self.topic}, url={self.url}, id={self.id}, source={self.source}, content_count={self.content_count})>"

View File

View File

@ -0,0 +1,31 @@
from database.tinformationsource.model import TInformationSource
def create_information_source(db, information_source: TInformationSource):
    """Insert ``information_source``, commit, and return it refreshed from the database.

    :param db: database session
    :param information_source: unsaved TInformationSource instance
    :return: the same instance after commit and refresh
    """
    db.add(information_source)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(information_source)
    return information_source
def get_information_source_by_id(db, information_source_id: int):
    """Return the first TInformationSource row whose id equals ``information_source_id``."""
    by_id = db.query(TInformationSource).filter(TInformationSource.id == information_source_id)
    return by_id.first()
def get_active_information_sources(db) -> list:
    """Return every TInformationSource row whose ``active`` flag is true."""
    # The '== True' comparison builds the SQL filter expression; it is not a Python truth test.
    enabled = db.query(TInformationSource).filter(TInformationSource.active == True)
    return enabled.all()
def update_information_source(db, information_source_id: int, updates: dict):
    """Set the attributes named in ``updates`` on the information source with the given id.

    :param db: database session
    :param information_source_id: id of the row to change
    :param updates: mapping of attribute name to new value
    :return: the refreshed row, or the empty lookup result when no row matches
        (nothing is committed in that case)
    """
    source_row = db.query(TInformationSource).filter(TInformationSource.id == information_source_id).first()
    if not source_row:
        return source_row
    for attr_name in updates:
        setattr(source_row, attr_name, updates[attr_name])
    db.commit()
    db.refresh(source_row)
    return source_row
def delete_update_information(db, information_source_id: int):
    """Delete the TInformationSource row with the given id, if it exists.

    :return: the deleted row, or the empty lookup result when no row matches
    """
    doomed = db.query(TInformationSource).filter(TInformationSource.id == information_source_id).first()
    if not doomed:
        return doomed
    db.delete(doomed)
    db.commit()
    return doomed

View File

@ -0,0 +1,34 @@
from dataclasses import dataclass
from sqlalchemy import Column, String, Boolean, TIMESTAMP, func, INT
from sqlalchemy.dialects.postgresql import BIGINT
from database.database import Base
@dataclass
class TInformationSource(Base):
    """ORM model mapped to table ``t_information_source``: a site/feed that can be crawled.

    ``module`` and ``method`` name the task logic for the source, ``active``
    marks it as enabled and ``is_static`` tells static sites from dynamic ones.
    """
    __tablename__ = 't_information_source'
    id: int = Column(BIGINT, primary_key=True, autoincrement=True, comment='编号')
    title: str = Column(String, nullable=False, comment='标题')
    description: str = Column(String, nullable=True, comment='描述')
    keywords: str = Column(String, nullable=True, comment='关键字')
    url: str = Column(String, nullable=True, comment='网站链接')
    rss: str = Column(String, nullable=True, comment='RSS链接')
    api: str = Column(String, nullable=True, comment='API')
    primary_category: str = Column(String, nullable=True, comment='一级类别')
    secondary_category: str = Column(String, nullable=True, comment='二级类别')
    tertiary_category: str = Column(String, nullable=True, comment='三级类别')
    label: str = Column(String, nullable=True, comment='标签')
    lang: str = Column(String, nullable=False, default='zh', comment='语言')
    priority: int = Column(INT, nullable=False, default=100, comment='优先级')
    active: bool = Column(Boolean, default=False, nullable=False, comment='是否启用false未启用true启用')
    module: str = Column(String, nullable=True, comment='任务逻辑所在模块名称')
    method: str = Column(String, nullable=True, comment='任务逻辑的函数名称')
    create_time: str = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='创建时间')
    update_time: str = Column(TIMESTAMP(timezone=True), server_default=func.now(), onupdate=func.now(), nullable=False, comment='更新时间')
    is_static: bool = Column(Boolean, default=True, nullable=False, comment='是否是静态网站false动态true静态')

    def __repr__(self):
        # The model has no 'category' attribute (the former repr raised
        # AttributeError); report the primary category instead.
        return f"<TInformationSource(id={self.id}, title={self.title}, primary_category={self.primary_category}, active={self.active})>"

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

87
database/tnews/crud.py Normal file
View File

@ -0,0 +1,87 @@
from database.tnews.model import TNews
def create_news(db, news: TNews):
    """Insert ``news``, commit, and return it refreshed from the database.

    :param db: database session
    :param news: unsaved TNews instance
    :return: the same instance after commit and refresh
    """
    db.add(news)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(news)
    return news
def create_news_if_url_not_exists(db, news: TNews):
    """Insert ``news`` unless a row with the same URL is already stored.

    :param db: database session
    :param news: candidate TNews instance
    :return: the already-stored row when the URL exists, otherwise the newly
        inserted and refreshed ``news``
    """
    duplicate = (
        db.query(TNews)
        .filter(TNews.url == news.url)
        .first()
    )
    if not duplicate:
        # URL not seen before: persist the new item.
        db.add(news)
        db.commit()
        db.refresh(news)
        return news
    # URL already stored: return the existing row untouched.
    return duplicate
def create_news_list_if_url_not_exists(db, news_list: list[TNews]):
    """Insert every item of ``news_list`` whose URL is not stored yet.

    :param db: database session
    :param news_list: candidate TNews instances
    :return: the instances that were actually inserted, refreshed after commit
    """
    inserted_news = []
    for candidate in news_list:
        url_taken = db.query(TNews).filter(TNews.url == candidate.url).first()
        if url_taken:
            continue
        db.add(candidate)
        inserted_news.append(candidate)
    # Single commit for the whole batch.
    db.commit()
    # Reload each new row so database-generated values are available.
    for stored in inserted_news:
        db.refresh(stored)
    return inserted_news
def get_news_by_id(db, news_id: int):
    """Return the first TNews row whose id equals ``news_id``."""
    by_id = db.query(TNews).filter(TNews.id == news_id)
    return by_id.first()
def get_news_need_content(db):
    """Return every TNews row whose ``content`` column is NULL."""
    # '== None' builds the SQL NULL test; it is not a Python identity check.
    missing_body = db.query(TNews).filter(TNews.content == None)
    return missing_body.all()
def get_news_need_summary(db):
    """Return every TNews row whose ``ai_summary`` is NULL."""
    # NOTE(review): the TNews model in database/tnews/model.py declares no
    # 'ai_summary' attribute, so this expression looks like it raises
    # AttributeError as the model stands - confirm and add the column.
    missing_summary = db.query(TNews).filter(TNews.ai_summary == None)
    return missing_summary.all()
def get_news_for_generate_reference_message(db, news_type: str) -> list[TNews]:
    """Return unused, summarised news of one type, newest occurrence date first.

    :param db: database session
    :param news_type: value compared against ``TNews.type``
    :return: rows with a non-NULL ``ai_summary`` and ``is_usage`` false
    """
    # NOTE(review): the TNews model in database/tnews/model.py declares
    # neither 'type' nor 'ai_summary'; this looks like it raises
    # AttributeError as the model stands - confirm against the schema.
    candidates = db.query(TNews).filter(
        TNews.type == news_type,
        TNews.ai_summary != None,
        TNews.is_usage == False,
    )
    newest_first = candidates.order_by(TNews.occurrence_date.desc())
    return newest_first.all()
def update_news_by_id(db, news: TNews):
    """Merge the state of ``news`` into the session and commit.

    :param db: database session
    :param news: TNews carrying the new state
    :return: nothing
    """
    db.merge(news)
    db.commit()
def update_news(db, news_id: int, updates: dict):
    """Set the attributes named in ``updates`` on the news row with the given id.

    :param db: database session
    :param news_id: id of the row to change
    :param updates: mapping of attribute name to new value
    :return: the refreshed row, or the empty lookup result when no row matches
        (nothing is committed in that case)
    """
    target = db.query(TNews).filter(TNews.id == news_id).first()
    if not target:
        return target
    for attr_name in updates:
        setattr(target, attr_name, updates[attr_name])
    db.commit()
    db.refresh(target)
    return target
def delete_news(db, news_id: int):
    """Delete the TNews row with the given id, if it exists.

    :return: the deleted row, or the empty lookup result when no row matches
    """
    doomed = db.query(TNews).filter(TNews.id == news_id).first()
    if not doomed:
        return doomed
    db.delete(doomed)
    db.commit()
    return doomed

25
database/tnews/model.py Normal file
View File

@ -0,0 +1,25 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, String, Boolean, DateTime, BigInteger, text, INT
from database.database import Base
@dataclass
class TNews(Base):
    """ORM model mapped to table ``t_news``: one collected news item."""
    # NOTE(review): database/tnews/crud.py filters on TNews.type and
    # TNews.ai_summary, neither of which is declared below - confirm whether
    # those columns are missing here.
    __tablename__ = 't_news'
    id: int = Column(BigInteger, primary_key=True, autoincrement=True, comment='编号')
    title: Optional[str] = Column(String, nullable=True, comment='标题')
    summary: Optional[str] = Column(String, nullable=True, comment='摘要')
    url: Optional[str] = Column(String, nullable=True, comment='链接')
    content: Optional[str] = Column(String, nullable=True, comment='内容/正文')
    occurrence_date: Optional[datetime] = Column(DateTime(timezone=True), nullable=True, comment='发布日期')
    source: Optional[str] = Column(String, nullable=True, comment='来源')
    primary_category: str = Column(String, nullable=True, comment='一级类别')
    secondary_category: str = Column(String, nullable=True, comment='二级类别')
    tertiary_category: str = Column(String, nullable=True, comment='三级类别')
    label: str = Column(String, nullable=True, comment='标签')
    lang: str = Column(String, nullable=False, default='zh', comment='语言')
    # Defaults to false both in Python (default=) and in the database (server_default=).
    is_usage: bool = Column(Boolean, nullable=False, default=False, server_default=text('false'), comment='是否已用')
    create_time: datetime = Column(DateTime(timezone=True), nullable=False, server_default=text('now()'), comment='创建日期')

View File

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,35 @@
from database.tscheduler.model import TScheduler
def create_task(db, task: TScheduler):
    """Insert ``task``, commit, and return it refreshed from the database.

    :param db: database session
    :param task: unsaved TScheduler instance
    :return: the same instance after commit and refresh
    """
    db.add(task)
    db.commit()
    # Reload the row so database-generated values are present on the object.
    db.refresh(task)
    return task
def get_task_by_id(db, task_id: int):
    """Return the first TScheduler row whose id equals ``task_id``."""
    by_id = db.query(TScheduler).filter(TScheduler.id == task_id)
    return by_id.first()
def get_active_tasks(db):
    """Return every TScheduler row whose ``active`` flag is true."""
    # The '== True' comparison builds the SQL filter expression; it is not a Python truth test.
    enabled = db.query(TScheduler).filter(TScheduler.active == True)
    return enabled.all()
def get_tasks_by_executor(db, executor: str):
    """Return the active TScheduler rows assigned to ``executor``."""
    owned_and_enabled = db.query(TScheduler).filter(
        TScheduler.executor == executor,
        TScheduler.active == True,
    )
    return owned_and_enabled.all()
def update_task(db, task_id: int, updates: dict):
    """Set the attributes named in ``updates`` on the task with the given id.

    :param db: database session
    :param task_id: id of the row to change
    :param updates: mapping of attribute name to new value
    :return: the refreshed row, or the empty lookup result when no row matches
        (nothing is committed in that case)
    """
    target = db.query(TScheduler).filter(TScheduler.id == task_id).first()
    if not target:
        return target
    for attr_name in updates:
        setattr(target, attr_name, updates[attr_name])
    db.commit()
    db.refresh(target)
    return target
def delete_task(db, task_id: int):
    """Delete the TScheduler row with the given id, if it exists.

    :return: the deleted row, or the empty lookup result when no row matches
    """
    doomed = db.query(TScheduler).filter(TScheduler.id == task_id).first()
    if not doomed:
        return doomed
    db.delete(doomed)
    db.commit()
    return doomed

View File

@ -0,0 +1,26 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, Integer, String, Boolean, Text, DateTime
from database.database import Base
@dataclass
class TScheduler(Base):
    """ORM model mapped to table ``t_scheduler``: one scheduled task and its trigger settings."""
    __tablename__ = 't_scheduler'
    id: int = Column(Integer, primary_key=True, autoincrement=True, comment='自动递增的唯一任务ID')
    task_name: str = Column(String(64), nullable=False, comment='任务名称')
    # Trigger kind; the column comment lists interval, cron and date.
    trigger: str = Column(String(10), nullable=False, comment='调度方式interval、cron、date')
    interval_seconds: Optional[int] = Column(Integer, nullable=True, comment='固定时间间隔(秒),用于 interval 类型')
    cron_expression: Optional[str] = Column(String(255), nullable=True, comment='CRON 表达式,用于 cron 类型')
    execution_date: Optional[datetime] = Column(DateTime, nullable=True, comment='执行时间,用于 date 类型')
    task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
    active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
    executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
    handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
    last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
    next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
    # Both timestamps default to datetime.utcnow (naive UTC) at insert time.
    # NOTE(review): update_time declares no onupdate, so it is not touched
    # automatically on later updates - confirm that is intended.
    create_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='创建时间')
    update_time: datetime = Column(DateTime, default=datetime.utcnow, nullable=True, comment='更新时间')
    module_path: Optional[str] = Column(String(255), nullable=True, comment='任务逻辑所在模块名称')
    function_name: Optional[str] = Column(String(256), nullable=True, comment='任务逻辑的函数名称')

View File

View File

@ -0,0 +1,47 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, String, TIMESTAMP, func
from database.database import Base, get_session
from utils import utils
@dataclass
class VideoScript(Base):
    """ORM model mapped to table ``t_video_script``: a generated video script for a topic URL.

    ``id`` is a string primary key with no database default; the caller (see
    create_video_script) has to supply it.
    """
    __tablename__ = 't_video_script'
    id: str = Column(String, primary_key=True, comment='唯一标识')
    title: str = Column(String, nullable=False, comment='标题')
    description: Optional[str] = Column(String, nullable=True, comment='描述')
    keywords: Optional[str] = Column(String, nullable=True, comment='话题关键词')
    url: str = Column(String, nullable=False, comment='话题链接')
    script: str = Column(String, nullable=True, comment='视频脚本')
    content: str = Column(String, nullable=True, comment='话题内容')
    create_time: datetime = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='创建时间')

    def __repr__(self):
        # The former repr was copied from THotTopic and read self.topic, which
        # this model does not have (AttributeError); use title and the real
        # class name instead.
        return f"<VideoScript(title={self.title}, url={self.url}, id={self.id}, description={self.description}, keywords={self.keywords})>"
def create_video_script(video_script: VideoScript):
    """Persist ``video_script`` in its own session and return it with loaded attributes.

    When ``id`` is None it is derived from the URL via ``utils.get_md5``.

    :param video_script: unsaved VideoScript instance
    :return: the same instance, detached from the session but fully loaded
    """
    if video_script.id is None:
        video_script.id = utils.get_md5(video_script.url)
    with get_session() as db:
        db.add(video_script)
        db.commit()
        db.refresh(video_script)
        # get_session() commits again and closes the session when the block
        # exits. With the default expire_on_commit that final commit would
        # expire this object right before it becomes detached, so reading
        # any attribute of the returned instance would fail. Expunging keeps
        # the freshly loaded state on the object.
        db.expunge(video_script)
    return video_script
def video_script_not_exists(url_list: list):
    """
    Remove from ``url_list`` every URL that already has a stored video script.

    The list is filtered in place and the same list object is returned, as
    before. Unlike the former ``list.remove`` loop this neither raises
    ValueError when several stored rows share one URL nor leaves a stored URL
    behind when it appears more than once in ``url_list``.

    :param url_list: candidate URLs
    :return: ``url_list`` reduced to the URLs not present in the database
    """
    if not url_list:
        # Nothing to check, so no session is opened.
        return url_list
    with get_session() as db:
        stored_rows = db.query(VideoScript).filter(VideoScript.url.in_(url_list)).all()
        # Collect plain strings while the session is still open.
        stored_urls = {row.url for row in stored_rows}
    url_list[:] = [url for url in url_list if url not in stored_urls]
    return url_list

0
log/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

70
log/log_manager.py Normal file
View File

@ -0,0 +1,70 @@
import logging.config
import sys
import config.config
"""
Usage:
1 code
from log.log_manager import logger
logger.info("Starting Jarvas")
2 app start
python demo.py --logconfig=log_prod.config
当前目录下的log_prod.config是一份参考配置
"""
# default logging config for development
LOG_DEV_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"loggers": {
"root": {
"level": "INFO",
"handlers": ["consoleHandler"]
}
},
"handlers": {
"consoleHandler": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "verbose",
"stream": sys.stdout
}
},
"formatters": {
"verbose": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S"
}
}
}
log_config_message = ""
# Command-line arguments of the running process.
args = sys.argv
# First argument that mentions '--logconfig', if any.
logconfig_param = next((arg for arg in args if '--logconfig' in arg), None)
logconfig_value = None
if logconfig_param:
    # Take everything after the first '='. partition() copes with a missing
    # '=' (empty value, so the default config is used) and with '=' inside
    # the path; the former two-way unpacking of split('=') raised ValueError
    # in both cases.
    _, _, logconfig_value = logconfig_param.partition('=')
    log_config_message = f"--logconfig value: {logconfig_value}"
else:
    log_config_message = "没有找到 --logconfig 参数使用默认log配置"
if logconfig_value:
    # Use the logging config file given on the command line.
    logging.config.fileConfig(logconfig_value)
else:
    # Fall back to the built-in development config.
    logging.config.dictConfig(LOG_DEV_CONFIG)
logger = logging.getLogger('root')
# Record which logging configuration is in effect.
logger.info(log_config_message)
def log(message: str):
    """Log ``message`` at INFO level, prefixed with the configured scheduler name."""
    prefix = config.config.scheduler_name
    logger.info(f'{prefix} {message}')

22
log/log_prod.config Normal file
View File

@ -0,0 +1,22 @@
[loggers]
keys=root
[handlers]
keys=fileHandler
[formatters]
keys=verbose
[logger_root]
level=INFO
handlers=fileHandler
[handler_fileHandler]
class=FileHandler
level=INFO
formatter=verbose
args=('app.log', 'a')
[formatter_verbose]
format=%(asctime)s - %(name)s - %(levelname)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

37
peter.py Normal file
View File

@ -0,0 +1,37 @@
import datetime
from functools import partial
from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from log.log_manager import log, logger
from task.manager_task import manager_task
from apscheduler.events import EVENT_JOB_ERROR
def job_error_listener(event):
    """Scheduler listener: log the job id and exception carried by ``event``.

    Events without an exception are ignored.
    """
    if not event.exception:
        return
    logger.error(f"Job {event.job_id} crashed: {str(event.exception)}")
    # Extension point: e-mail / DingTalk alerting could be added here.
if __name__ == '__main__':
    scheduler = BlockingScheduler()
    # The first run happens 10 seconds after start-up; giving the interval job
    # an explicit next_run_time replaces a separate 'date' trigger.
    first_run_at = datetime.datetime.now() + datetime.timedelta(seconds=10)
    # Run manager_task every config.scheduler_interval seconds.
    scheduler.add_job(
        partial(manager_task, scheduler),
        'interval',
        seconds=config.scheduler_interval,
        jitter=30,  # random jitter so runs do not pile up at the same instant
        next_run_time=first_run_at,
    )
    # Report crashed jobs through job_error_listener.
    scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
    try:
        log("started successfully.")
        scheduler.start()  # blocks until the scheduler stops
    except (KeyboardInterrupt, SystemExit):
        log("Shutting down ...")

BIN
requirements.txt Normal file

Binary file not shown.

0
seek/163_com/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

45
seek/163_com/content.py Normal file
View File

@ -0,0 +1,45 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-body extractor that reads the ``.post_body`` element of a page."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the ``.post_body`` element.

        :return: the element text, or ``'not found element'`` when the lookup
            raises ElementNotFoundError
        """
        try:
            return self.session.s_ele('.post_body').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Extract the article text for one news item.

    The argument is handed to ArticleContent, whose constructor takes a
    TNews; the parameter keeps its historical name for existing callers.

    :param information_source: the news item to fetch content for
    :return: whatever ``ArticleContent.get_content`` returns (article text or
        its not-found placeholder)
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Run the extractor's finish() hook even when extraction raises;
        # previously an exception skipped it.
        article_content.finish()
def content_task(news: TNews):
    """Run the content seek task for ``news`` and log its start and end.

    :param news: the news item to process
    """
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        # Run finish() even when the task raises; previously an exception skipped it.
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    logger.info('This module is not for direct call!')
    # Manual run against one fixed article URL.
    sample_news = TNews()
    sample_news.is_static = True
    sample_news.url = 'https://www.163.com/dy/article/JKC1V4E70519DDQ2.html'
    extracted_text = get_content(sample_news)
    logger.info(extracted_text)
    logger.info('Done.')

59
seek/163_com/house.py Normal file
View File

@ -0,0 +1,59 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
class House(SeekBase):
    """News-list scraper that reads the rows under the ``.news-first`` element."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one TNews per listed row with title, link and source.

        Rows that fail to parse are logged and skipped. Summary and
        occurrence date are not extracted here.

        :return: list of unsaved TNews objects
        """
        collected = []
        rows = self.session.s_ele('.news-first').s_eles('.data_row news_article clearfix2 ')
        for row in rows:
            try:
                item = TNews()
                item.title = row.s_ele('.news_title').s_ele('tag:a').text
                item.url = row.s_ele('tag:a').link
                item.source = self.information_source.title
                collected.append(item)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return collected
def get_news(information_source: TInformationSource) -> list:
    """Scrape the news list of ``information_source`` without running the full seek task.

    :param information_source: the source to scrape
    :return: the list produced by ``House.get_news``
    """
    instance = House(information_source)
    try:
        return instance.get_news()
    finally:
        # Run finish() even when scraping raises; previously an exception skipped it.
        instance.finish()
def news_task(information_source: TInformationSource):
    """Run the seek task for ``information_source`` and log its start and end.

    :param information_source: the source to process
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    instance = House(information_source)
    try:
        instance.do_seek_task()
    finally:
        # Run finish() even when the task raises; previously an exception skipped it.
        instance.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    logger.info('This module is not for direct call!')
    # Manual run against one fixed source.
    sample_source = TInformationSource()
    sample_source.is_static = True
    sample_source.url = 'https://sz.house.163.com/'
    sample_source.title = '房产_网易'
    news_task(sample_source)
    # To inspect the scraped items without running the task, call
    # get_news(sample_source) and print each element instead.
    logger.info('Done.')

0
seek/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,46 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-body extractor that reads the element selected by ``.^info-content``."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the element selected by ``.^info-content``.

        :return: the element text, or ``'not found element'`` when the lookup
            raises ElementNotFoundError
        """
        try:
            return self.session.s_ele('.^info-content').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Extract the article text for one news item.

    The argument is handed to ArticleContent, whose constructor takes a
    TNews; the parameter keeps its historical name for existing callers.

    :param information_source: the news item to fetch content for
    :return: whatever ``ArticleContent.get_content`` returns (article text or
        its not-found placeholder)
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Run the extractor's finish() hook even when extraction raises;
        # previously an exception skipped it.
        article_content.finish()
def content_task(news: TNews):
    """Run the content seek task for ``news`` and log its start and end.

    :param news: the news item to process
    """
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        # Run finish() even when the task raises; previously an exception skipped it.
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    logger.info('This module is not for direct call!')
    # Manual run against one fixed article URL.
    sample_news = TNews()
    sample_news.is_static = True
    sample_news.url = 'https://sz.news.anjuke.com/louping-965203-pan528488.html'
    extracted_text = get_content(sample_news)
    logger.info(extracted_text)
    logger.info('Done.')

62
seek/anjuke_com/house.py Normal file
View File

@ -0,0 +1,62 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
from utils.time_utils import process_time
class House(SeekBase):
    """News-list scraper that reads the items under the ``.main-list`` element."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one TNews per listed item with title, link, summary, date and source.

        Items that fail to parse are logged and skipped.

        :return: list of unsaved TNews objects
        """
        news_result = []
        # Was a bare print() that dumped the whole page to stdout on every
        # run; keep the dump available at DEBUG level only.
        logger.debug('page html: %s', self.session.html)
        _news_list = self.session.s_ele('.main-list').s_eles('.m-list-item clearfix')
        for _news in _news_list:
            try:
                rs_news = TNews()
                right_column = _news.s_ele('.item-col-right')
                rs_news.title = right_column.s_ele('tag:h3').text
                rs_news.url = right_column.s_ele('tag:a').link
                # The second <a> in the column carries the summary text.
                rs_news.summary = right_column.s_eles('tag:a')[1].text
                rs_news.occurrence_date = process_time(right_column.s_ele('.info__time').text)
                rs_news.source = self.information_source.title
                news_result.append(rs_news)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return news_result
def get_news(information_source: TInformationSource) -> list:
    """Scrape the news list of ``information_source`` without running the full seek task.

    :param information_source: the source to scrape
    :return: the list produced by ``House.get_news``
    """
    instance = House(information_source)
    try:
        return instance.get_news()
    finally:
        # Run finish() even when scraping raises; previously an exception skipped it.
        instance.finish()
def news_task(information_source: TInformationSource):
    """Run the seek task for ``information_source`` and log its start and end.

    :param information_source: the source to process
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    instance = House(information_source)
    try:
        instance.do_seek_task()
    finally:
        # Run finish() even when the task raises; previously an exception skipped it.
        instance.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    logger.info('This module is not for direct call!')
    # Manual run against one fixed source.
    sample_source = TInformationSource()
    sample_source.is_static = True
    sample_source.url = 'https://sz.news.anjuke.com/hot/'
    sample_source.title = '房产_安居客'
    news_task(sample_source)
    # To inspect the scraped items without running the task, call
    # get_news(sample_source) and print each element instead.
    logger.info('Done.')

0
seek/cnn_com/__init__.py Normal file
View File

58
seek/cnn_com/content.py Normal file
View File

@ -0,0 +1,58 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article extractor that reads ``#detailContent`` and the ``.header-time left`` date header."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the ``#detailContent`` element.

        :return: the element text, or ``'not found element'`` when the lookup
            raises ElementNotFoundError
        """
        try:
            content_ = self.session.s_ele('#detailContent').text
        except ElementNotFoundError:
            content_ = 'not found element'
        return content_

    def get_occurrence_date(self):
        """Assemble the publication date string from the page header.

        :return: ``'<year>/<day> <time>'`` built from the header's ``.year``,
            ``.day`` and ``.time`` elements, or None when a lookup raises
            ElementNotFoundError
        """
        try:
            header_time = self.session.s_ele('.header-time left')
            year = header_time.s_ele('.year').text  # e.g. 2023
            day = header_time.s_ele('.day').text  # e.g. 12/27
            clock = header_time.s_ele('.time').text  # e.g. 08:05:11
            occurrence_date_ = f'{year}/{day} {clock}'
            # Was a bare print(); keep the value visible at DEBUG level only.
            logger.debug('occurrence date: %s', occurrence_date_)
        except ElementNotFoundError:
            occurrence_date_ = None
        return occurrence_date_
def get_content(information_source: TNews) -> str:
    """Extract the article text for one news item.

    The argument is handed to ArticleContent, whose constructor takes a
    TNews; the parameter keeps its historical name for existing callers.

    :param information_source: the news item to fetch content for
    :return: whatever ``ArticleContent.get_content`` returns (article text or
        its not-found placeholder)
    """
    article_content = ArticleContent(information_source)
    try:
        result = article_content.get_content()
        # The occurrence date is computed as before, but its value is not returned.
        article_content.get_occurrence_date()
        return result
    finally:
        # Run the extractor's finish() hook even when extraction raises;
        # previously an exception skipped it.
        article_content.finish()
def content_task(news: TNews):
    """Run the content seek task for ``news`` and log its start and end.

    :param news: the news item to process
    """
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        # Run finish() even when the task raises; previously an exception skipped it.
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    logger.info('This module is not for direct call!')
    # Manual run against one fixed article URL.
    sample_news = TNews()
    sample_news.is_static = True
    sample_news.url = 'https://www.news.cn/politics/leaders/20241227/90e76f85ad4a43ba94802b07c5736e00/c.html'
    extracted_text = get_content(sample_news)
    logger.info(extracted_text)
    logger.info('Done.')

62
seek/cnn_com/edition.py Normal file
View File

@ -0,0 +1,62 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
class Edition(SeekBase):
    """News-list scraper that collects every link inside the ``.zone zone--t-light zone-2-observer`` element."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one TNews per ``<a>`` element of the zone, with title, link and source.

        Links that fail to parse are logged and skipped. Summary and
        occurrence date are not extracted here.

        :return: list of unsaved TNews objects
        """
        news_result = []
        _news_list = self.tab.s_ele('.zone zone--t-light zone-2-observer').s_eles('tag:a')
        for _news in _news_list:
            # Was a bare print() of every link's markup; keep it at DEBUG level only.
            logger.debug('link html: %s', _news.html)
            try:
                rs_news = TNews()
                rs_news.title = _news.text
                rs_news.url = _news.link
                rs_news.source = self.information_source.title
                news_result.append(rs_news)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return news_result
def get_news(information_source: TInformationSource) -> list:
    """Scrape the source once and return the parsed items.

    Args:
        information_source: Source record handed to ``Edition``.

    Returns:
        The list produced by ``Edition.get_news``.
    """
    instance = Edition(information_source)
    try:
        return instance.get_news()
    finally:
        # Release the tab/session even when parsing raises.
        instance.finish()
def news_task(information_source: TInformationSource):
    """Run one ``do_seek_task()`` pass for the source, logging start and end.

    The scraper is released in a ``finally`` block so a failing pass does not
    leave its tab/session open. The end message is only logged on success;
    failures propagate to the caller.

    Args:
        information_source: Source record handed to ``Edition``.
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    instance = Edition(information_source)
    try:
        instance.do_seek_task()
    finally:
        instance.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual run: scrape once and print each item.
    # news_task() is intentionally not invoked here.
    logger.info('This module is not for direct call!')
    demo_source = TInformationSource()
    demo_source.title = 'edition_CNN'
    demo_source.url = 'https://edition.cnn.com/'
    demo_source.is_static = False
    for scraped in get_news(demo_source):
        print(scraped)
    logger.info('Done.')

50
seek/content_base.py Normal file
View File

@ -0,0 +1,50 @@
from abc import ABC, abstractmethod
from DrissionPage import Chromium, SessionPage, ChromiumOptions
from database.database import get_session
from database.tnews.crud import update_news_by_id
from database.tnews.model import TNews
from log.log_manager import log
class ContentBase(ABC):
    """Base class for per-site article-content extractors.

    The constructor loads ``news.url`` through a ``SessionPage`` when
    ``news.is_static`` is truthy, otherwise through a new Chromium tab.
    Subclasses query ``self.session`` or ``self.tab`` in ``get_content``.
    """

    def __init__(self, news: TNews):
        """Open the page for ``news``.

        Args:
            news: Record providing ``is_static`` and ``url``.
        """
        self.news = news
        self.session = None
        self.browser = None
        # Must exist on both paths: finish() reads self.tab unconditionally,
        # and would raise AttributeError for static pages otherwise.
        self.tab = None
        if news.is_static:
            self.session = SessionPage()
            self.session.get(news.url)
        else:
            co = ChromiumOptions()
            self.browser = Chromium(addr_or_opts=co)
            self.tab = self.browser.new_tab()
            self.tab.get(news.url)

    @abstractmethod
    def get_content(self):
        """Return the article body for ``self.news``; implemented per site."""
        pass

    def get_occurrence_date(self):
        """Return the article's occurrence date; the default is ``None``."""
        return None

    def do_seek_task(self):
        """Fetch the article content and persist the updated news record.

        ``occurrence_date`` is only looked up when the record does not
        already carry one.
        """
        self.news.content = self.get_content()
        if self.news.occurrence_date is None:
            self.news.occurrence_date = self.get_occurrence_date()
        with get_session() as db:
            update_news_by_id(db, self.news)
        # Logged after the session block so it is only emitted once the
        # block has exited without raising.
        log(f'successful fetch {self.news.title} news content into the database.')

    def finish(self):
        """Close the tab and/or session opened by the constructor."""
        if self.tab:
            self.tab.close()
        # Only the tab is closed; browser.quit() is not called here.
        if self.session:
            self.session.close()

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

46
seek/fang_com/content.py Normal file
View File

@ -0,0 +1,46 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-page extractor that reads from the static ``self.session`` page."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the element matched by ``'.^news-text'``.

        Returns:
            The element text, or the literal ``'not found element'`` when the
            lookup raises ``ElementNotFoundError``.
        """
        try:
            return self.session.s_ele('.^news-text').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Fetch the article text for one news item and always release the page.

    Args:
        information_source: The news record to load. The parameter keeps its
            historical name, but the value is handed straight to
            ``ArticleContent``, which expects a ``TNews``.

    Returns:
        Whatever ``ArticleContent.get_content`` produced.
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Release the page even when extraction raises.
        article_content.finish()
def content_task(news: TNews):
    """Fetch and persist the content of ``news``, logging start and end.

    The page is released in a ``finally`` block so a failing
    ``do_seek_task()`` does not leave it open. The end message is only
    logged when the task succeeds; failures propagate to the caller.

    Args:
        news: The news record to process.
    """
    # NOTE: the log text says "news_task"; kept unchanged for log consumers.
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual check against one fixed article URL.
    logger.info('This module is not for direct call!')
    sample_news = TNews()
    sample_news.url = 'https://sz.news.fang.com/open/51863596.html'
    sample_news.is_static = True
    logger.info(get_content(sample_news))
    logger.info('Done.')

64
seek/fang_com/house.py Normal file
View File

@ -0,0 +1,64 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
from utils.time_utils import process_time
class House(SeekBase):
    """Listing scraper that reads ``<li>`` rows from the static ``self.session`` page."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one ``TNews`` per ``<li>`` under the ``.news-list`` element.

        A row whose lookup raises ``ElementNotFoundError`` is dropped; the
        error is logged unless the row contains an ``.item`` element, which
        the original author marks as video content. Any other exception is
        logged and the row is dropped as well.

        Returns:
            The list of items that were built successfully.
        """
        collected = []
        rows = self.session.s_ele('.news-list').s_eles('tag:li')
        for row in rows:
            try:
                item = TNews()
                text_box = row.s_ele('.txt')
                anchor = text_box.s_ele('tag:a')
                item.title = anchor.text
                item.url = anchor.link
                item.summary = text_box.s_ele('tag:p').text
                # The second <span> holds the time text.
                item.occurrence_date = process_time(text_box.s_eles('tag:span')[1].text)
                item.source = self.information_source.title
                collected.append(item)
            except ElementNotFoundError as e:
                # Video entries are skipped silently.
                if not row.s_ele('.item'):
                    logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return collected
def get_news(information_source: TInformationSource) -> list:
    """Scrape the source once and return the parsed items.

    Args:
        information_source: Source record handed to ``House``.

    Returns:
        The list produced by ``House.get_news``.
    """
    instance = House(information_source)
    try:
        return instance.get_news()
    finally:
        # Release the tab/session even when parsing raises.
        instance.finish()
def news_task(information_source: TInformationSource):
    """Run one ``do_seek_task()`` pass for the source, logging start and end.

    The scraper is released in a ``finally`` block so a failing pass does not
    leave its tab/session open. The end message is only logged on success;
    failures propagate to the caller.

    Args:
        information_source: Source record handed to ``House``.
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    instance = House(information_source)
    try:
        instance.do_seek_task()
    finally:
        instance.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual run of the full task for one fixed source.
    # To only preview the items, iterate get_news(demo_source) and print them.
    logger.info('This module is not for direct call!')
    demo_source = TInformationSource()
    demo_source.title = '房产_房天下'
    demo_source.url = 'https://sz.news.fang.com/'
    demo_source.is_static = True
    news_task(demo_source)
    logger.info('Done.')

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

46
seek/focus_cn/content.py Normal file
View File

@ -0,0 +1,46 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-page extractor that always loads the page as static."""

    def __init__(self, news: TNews):
        # Force the static path before the base constructor picks a loader.
        news.is_static = True
        super().__init__(news)

    def get_content(self):
        """Return the text of the ``.article`` element.

        Returns:
            The element text, or the literal ``'not found element'`` when the
            lookup raises ``ElementNotFoundError``.
        """
        try:
            return self.session.s_ele('.article').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Fetch the article text for one news item and always release the page.

    Args:
        information_source: The news record to load. The parameter keeps its
            historical name, but the value is handed straight to
            ``ArticleContent``, which expects a ``TNews``.

    Returns:
        Whatever ``ArticleContent.get_content`` produced.
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Release the page even when extraction raises.
        article_content.finish()
def content_task(news: TNews):
    """Fetch and persist the content of ``news``, logging start and end.

    The page is released in a ``finally`` block so a failing
    ``do_seek_task()`` does not leave it open. The end message is only
    logged when the task succeeds; failures propagate to the caller.

    Args:
        news: The news record to process.
    """
    # NOTE: the log text says "news_task"; kept unchanged for log consumers.
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual check against one fixed article URL.
    logger.info('This module is not for direct call!')
    sample_news = TNews()
    sample_news.url = 'https://www.focus.cn/a/842171870_124752'
    sample_news.is_static = True
    logger.info(get_content(sample_news))
    logger.info('Done.')

62
seek/focus_cn/house.py Normal file
View File

@ -0,0 +1,62 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
from utils.time_utils import process_time
class House(SeekBase):
    """Listing scraper that reads feed entries from the page in ``self.tab``."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one ``TNews`` per ``.FeedList`` element under ``.cbd-recommend``.

        Waits for a ``.FeedList`` element to be displayed before reading. The
        stored url is the link with everything from the first ``?`` removed.
        A failure on one entry is logged and that entry is left out.

        Returns:
            The list of items that were built successfully.
        """
        collected = []
        self.tab.wait.ele_displayed('.FeedList')
        feed_entries = self.tab.s_ele('.cbd-recommend').s_eles('.FeedList')
        for entry in feed_entries:
            try:
                item = TNews()
                item.title = entry.s_ele('.item-text-content-title').text
                raw_link = entry.s_ele('tag:a').link
                item.url = raw_link.split('?')[0]
                item.summary = entry.s_ele('.item-text-content-description').text
                time_text = entry.s_ele('.extra-info-item').text
                item.occurrence_date = process_time(time_text)
                item.source = self.information_source.title
                collected.append(item)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return collected
def get_news(information_source: TInformationSource) -> list:
    """Scrape the source once and return the parsed items.

    Args:
        information_source: Source record handed to ``House``.

    Returns:
        The list produced by ``House.get_news``.
    """
    instance = House(information_source)
    try:
        return instance.get_news()
    finally:
        # Release the tab/session even when parsing raises.
        instance.finish()
def news_task(information_source: TInformationSource):
    """Run one ``do_seek_task()`` pass for the source, logging start and end.

    The scraper is released in a ``finally`` block so a failing pass does not
    leave its tab/session open. The end message is only logged on success;
    failures propagate to the caller.

    Args:
        information_source: Source record handed to ``House``.
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    instance = House(information_source)
    try:
        instance.do_seek_task()
    finally:
        instance.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual run of the full task for one fixed source.
    # To only preview the items, iterate get_news(demo_source) and print them.
    logger.info('This module is not for direct call!')
    demo_source = TInformationSource()
    demo_source.title = '房产_搜狐焦点'
    demo_source.url = 'https://sz.focus.cn/zixun/'
    demo_source.is_static = False
    news_task(demo_source)
    logger.info('Done.')

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

45
seek/leju_com/content.py Normal file
View File

@ -0,0 +1,45 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-page extractor that reads from the static ``self.session`` page."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the element matched by ``'.^sf_news_contend'``.

        Returns:
            The element text, or the literal ``'not found element'`` when the
            lookup raises ``ElementNotFoundError``.
        """
        try:
            return self.session.s_ele('.^sf_news_contend').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Fetch the article text for one news item and always release the page.

    Args:
        information_source: The news record to load. The parameter keeps its
            historical name, but the value is handed straight to
            ``ArticleContent``, which expects a ``TNews``.

    Returns:
        Whatever ``ArticleContent.get_content`` produced.
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Release the page even when extraction raises.
        article_content.finish()
def content_task(news: TNews):
    """Fetch and persist the content of ``news``, logging start and end.

    The page is released in a ``finally`` block so a failing
    ``do_seek_task()`` does not leave it open. The end message is only
    logged when the task succeeds; failures propagate to the caller.

    Args:
        news: The news record to process.
    """
    # NOTE: the log text says "news_task"; kept unchanged for log consumers.
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual check against one fixed article URL.
    logger.info('This module is not for direct call!')
    sample_news = TNews()
    sample_news.url = 'https://sz.leju.com/news/2024-12-18/18427272536617796292963.shtml'
    sample_news.is_static = True
    logger.info(get_content(sample_news))
    logger.info('Done.')

60
seek/leju_com/house.py Normal file
View File

@ -0,0 +1,60 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
from utils.time_utils import process_time
class House(SeekBase):
    """Listing scraper that reads ``<li>`` rows from the static ``self.session`` page."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one ``TNews`` per ``<li>`` under the ``.sf_listPage`` element.

        A failure on one row is logged and that row is left out of the result.

        Returns:
            The list of items that were built successfully.
        """
        collected = []
        rows = self.session.s_ele('.sf_listPage').s_eles('tag:li')
        for row in rows:
            try:
                item = TNews()
                anchor = row.s_ele('tag:a')
                item.title = anchor.text
                item.url = anchor.link
                item.summary = row.s_ele('tag:p').text
                time_text = row.s_ele('.tag').text
                item.occurrence_date = process_time(time_text)
                item.source = self.information_source.title
                collected.append(item)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return collected
def get_news(information_source: TInformationSource) -> list:
    """Scrape the source once and return the parsed items.

    Args:
        information_source: Source record handed to ``House``.

    Returns:
        The list produced by ``House.get_news``.
    """
    instance = House(information_source)
    try:
        return instance.get_news()
    finally:
        # Release the tab/session even when parsing raises.
        instance.finish()
def news_task(information_source: TInformationSource):
    """Run one ``do_seek_task()`` pass for the source, logging start and end.

    The scraper is released in a ``finally`` block so a failing pass does not
    leave its tab/session open. The end message is only logged on success;
    failures propagate to the caller.

    Args:
        information_source: Source record handed to ``House``.
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    instance = House(information_source)
    try:
        instance.do_seek_task()
    finally:
        instance.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual run: scrape once and print each item.
    # news_task() is intentionally not invoked here.
    logger.info('This module is not for direct call!')
    demo_source = TInformationSource()
    demo_source.title = '房产_新浪乐居'
    demo_source.url = 'https://sz.leju.com/news/'
    demo_source.is_static = True
    for scraped in get_news(demo_source):
        print(scraped)
    logger.info('Done.')

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

45
seek/mittr_com/content.py Normal file
View File

@ -0,0 +1,45 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-page extractor that reads from the browser tab in ``self.tab``."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the ``.content`` element.

        Returns:
            The element text, or the literal ``'not found element'`` when the
            lookup raises ``ElementNotFoundError``.
        """
        try:
            return self.tab.s_ele('.content').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Fetch the article text for one news item and always release the page.

    Args:
        information_source: The news record to load. The parameter keeps its
            historical name, but the value is handed straight to
            ``ArticleContent``, which expects a ``TNews``.

    Returns:
        Whatever ``ArticleContent.get_content`` produced.
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Release the page even when extraction raises.
        article_content.finish()
def content_task(news: TNews):
    """Fetch and persist the content of ``news``, logging start and end.

    The page is released in a ``finally`` block so a failing
    ``do_seek_task()`` does not leave it open. The end message is only
    logged when the task succeeds; failures propagate to the caller.

    Args:
        news: The news record to process.
    """
    # NOTE: the log text says "news_task"; kept unchanged for log consumers.
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual check against one fixed article URL (browser path).
    logger.info('This module is not for direct call!')
    sample_news = TNews()
    sample_news.url = 'https://www.mittrchina.com/news/detail/14218'
    sample_news.is_static = False
    logger.info(get_content(sample_news))
    logger.info('Done.')

63
seek/mittr_com/mit_t_r.py Normal file
View File

@ -0,0 +1,63 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
from utils.time_utils import process_time
class MittrChinaCom(SeekBase):
    """Listing scraper that reads entries from the page loaded in ``self.tab``."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one ``TNews`` per ``.last-item`` under ``.lastest-list``.

        Waits for a ``.last-item`` element to be displayed before reading.
        The time text is read from a ``.time`` element under the entry's
        parent. A failure on one entry is logged and that entry is left out.

        Returns:
            The list of items that were built successfully.
        """
        collected = []
        self.tab.wait.ele_displayed('.last-item')
        entries = self.tab.s_ele('.lastest-list').s_eles('.last-item')
        for entry in entries:
            try:
                item = TNews()
                item.title = entry.s_ele('tag:a').text
                item.url = entry.s_ele('tag:a').link
                time_text = entry.parent().s_ele('.time').text
                item.occurrence_date = process_time(time_text)
                item.source = self.information_source.title
                collected.append(item)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return collected
def get_news(information_source: TInformationSource) -> list:
    """Scrape the source once and return the parsed items.

    Args:
        information_source: Source record handed to ``MittrChinaCom``.

    Returns:
        The list produced by ``MittrChinaCom.get_news``.
    """
    mittr = MittrChinaCom(information_source)
    try:
        return mittr.get_news()
    finally:
        # Release the tab/session even when parsing raises.
        mittr.finish()
def news_task(information_source: TInformationSource):
    """Run one ``do_seek_task()`` pass for the source, logging start and end.

    The scraper is released in a ``finally`` block so a failing pass does not
    leave its tab/session open. The end message is only logged on success;
    failures propagate to the caller.

    Args:
        information_source: Source record handed to ``MittrChinaCom``.
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    mittr = MittrChinaCom(information_source)
    try:
        mittr.do_seek_task()
    finally:
        mittr.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual run of the full task for one fixed source.
    # To only preview the items, iterate get_news(demo_source) and print them.
    logger.info('This module is not for direct call!')
    demo_source = TInformationSource()
    demo_source.title = '科技_麻省理工科技评论'
    demo_source.url = 'https://www.mittrchina.com/'
    demo_source.is_static = False
    news_task(demo_source)
    logger.info('Done.')

View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

62
seek/ofweek_com/ai.py Normal file
View File

@ -0,0 +1,62 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.seek_base import SeekBase
from utils.time_utils import process_time
class OfweekComAi(SeekBase):
    """Listing scraper that reads headline blocks from the static ``self.session`` page."""

    def __init__(self, information_source: TInformationSource):
        super().__init__(information_source)

    def get_news(self):
        """Build one ``TNews`` per ``'.^top-title'`` match in the left column.

        The time text is taken from the fifth ``<span>`` under the headline's
        parent. A failure on one headline (including a missing span) is
        logged and that headline is left out of the result.

        Returns:
            The list of items that were built successfully.
        """
        collected = []
        column = self.session.s_ele('.main-cont-left w640')
        for headline in column.s_eles('.^top-title'):
            try:
                item = TNews()
                item.title = headline.s_ele('tag:a').text
                item.url = headline.s_ele('tag:a').link
                time_text = headline.parent().s_eles('tag:span')[4].text
                item.occurrence_date = process_time(time_text)
                item.source = self.information_source.title
                collected.append(item)
            except ElementNotFoundError as e:
                logger.error(f"ElementNotFoundError: {e} - Failed to find element in news item.")
            except Exception as e:
                logger.error(f'Unexpected error occurred: {e}')
        return collected
def get_news(information_source: TInformationSource) -> list:
    """Scrape the source once and return the parsed items.

    Args:
        information_source: Source record handed to ``OfweekComAi``.

    Returns:
        The list produced by ``OfweekComAi.get_news``.
    """
    ofweek_com_ai = OfweekComAi(information_source)
    try:
        return ofweek_com_ai.get_news()
    finally:
        # Release the tab/session even when parsing raises.
        ofweek_com_ai.finish()
def news_task(information_source: TInformationSource):
    """Run one ``do_seek_task()`` pass for the source, logging start and end.

    The scraper is released in a ``finally`` block so a failing pass does not
    leave its tab/session open. The end message is only logged on success;
    failures propagate to the caller.

    Args:
        information_source: Source record handed to ``OfweekComAi``.
    """
    logger.info(f'{information_source.title} news_task start execute at {datetime.datetime.now()}')
    ofweek_com_ai = OfweekComAi(information_source)
    try:
        ofweek_com_ai.do_seek_task()
    finally:
        ofweek_com_ai.finish()
    logger.info(f'{information_source.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual run of the full task for one fixed source.
    # To only preview the items, iterate get_news(demo_source) and print them.
    logger.info('This module is not for direct call!')
    demo_source = TInformationSource()
    demo_source.title = '人工智能_维科网'
    demo_source.url = 'https://www.ofweek.com/ai/'
    demo_source.is_static = True
    news_task(demo_source)
    logger.info('Done.')

View File

@ -0,0 +1,46 @@
import datetime
from DrissionPage.errors import ElementNotFoundError
from database.tinformationsource.model import TInformationSource
from database.tnews.model import TNews
from log.log_manager import logger
from seek.content_base import ContentBase
class ArticleContent(ContentBase):
    """Article-page extractor that reads from the static ``self.session`` page."""

    def __init__(self, news: TNews):
        super().__init__(news)

    def get_content(self):
        """Return the text of the ``.artical-content`` element.

        Returns:
            The element text, or the literal ``'not found element'`` when the
            lookup raises ``ElementNotFoundError``.
        """
        try:
            return self.session.s_ele('.artical-content').text
        except ElementNotFoundError:
            return 'not found element'
def get_content(information_source: TNews) -> str:
    """Fetch the article text for one news item and always release the page.

    Args:
        information_source: The news record to load. The parameter keeps its
            historical name, but the value is handed straight to
            ``ArticleContent``, which expects a ``TNews``.

    Returns:
        Whatever ``ArticleContent.get_content`` produced.
    """
    article_content = ArticleContent(information_source)
    try:
        return article_content.get_content()
    finally:
        # Release the page even when extraction raises.
        article_content.finish()
def content_task(news: TNews):
    """Fetch and persist the content of ``news``, logging start and end.

    The page is released in a ``finally`` block so a failing
    ``do_seek_task()`` does not leave it open. The end message is only
    logged when the task succeeds; failures propagate to the caller.

    Args:
        news: The news record to process.
    """
    # NOTE: the log text says "news_task"; kept unchanged for log consumers.
    logger.info(f'{news.title} news_task start execute at {datetime.datetime.now()}')
    article_content = ArticleContent(news)
    try:
        article_content.do_seek_task()
    finally:
        article_content.finish()
    logger.info(f'{news.title} news_task end execute at {datetime.datetime.now()}')
if __name__ == '__main__':
    # Ad-hoc manual check against one fixed article URL.
    logger.info('This module is not for direct call!')
    sample_news = TNews()
    sample_news.url = 'https://www.ofweek.com/ai/2024-12/ART-201721-8120-30654143.html'
    sample_news.is_static = True
    logger.info(get_content(sample_news))
    logger.info('Done.')

57
seek/seek_base.py Normal file
View File

@ -0,0 +1,57 @@
from abc import ABC, abstractmethod
from DrissionPage import Chromium, SessionPage, ChromiumOptions
from database.database import get_session
from database.tinformationsource.model import TInformationSource
from database.tnews.crud import create_news_list_if_url_not_exists
from log.log_manager import log
class SeekBase(ABC):
    """Base class for per-site news-list scrapers.

    The constructor loads ``information_source.url`` through a
    ``SessionPage`` when ``information_source.is_static`` is truthy,
    otherwise through a new Chromium tab. Subclasses query ``self.session``
    or ``self.tab`` in ``get_news``.
    """

    # Attributes copied from the information source onto any scraped item
    # that left them as None.
    _INHERITED_FIELDS = (
        'primary_category',
        'secondary_category',
        'tertiary_category',
        'label',
        'lang',
    )

    def __init__(self, information_source: TInformationSource):
        """Open the page for ``information_source``.

        Args:
            information_source: Record providing ``is_static`` and ``url``.
        """
        self.information_source = information_source
        self.session = None
        self.browser = None
        self.tab = None
        if information_source.is_static:
            self.session = SessionPage()
            self.session.get(information_source.url)
        else:
            # Hand the options object to Chromium explicitly; it used to be
            # built and then discarded.
            co = ChromiumOptions()
            self.browser = Chromium(addr_or_opts=co)
            self.tab = self.browser.new_tab()
            self.tab.get(information_source.url)

    @abstractmethod
    def get_news(self):
        """Return the list of news items scraped from the source; implemented per site."""
        pass

    def do_seek_task(self):
        """Scrape the source, fill in missing fields and store the items.

        Every field in ``_INHERITED_FIELDS`` that a scraped item left as
        ``None`` is copied from the information source. The list is then
        handed to ``create_news_list_if_url_not_exists``.

        Returns:
            The value returned by ``create_news_list_if_url_not_exists``.
        """
        news_list = self.get_news()
        for news in news_list:
            for field in self._INHERITED_FIELDS:
                if getattr(news, field) is None:
                    setattr(news, field, getattr(self.information_source, field))
        with get_session() as db:
            inserted_news = create_news_list_if_url_not_exists(db, news_list)
        # Logged after the session block so it is only emitted once the
        # block has exited without raising.
        log(f'Inserted {len(inserted_news)} {self.information_source.title} news items into the database.')
        return inserted_news

    def finish(self):
        """Close the tab and/or session opened by the constructor."""
        if self.tab:
            self.tab.close()
        # Only the tab is closed; browser.quit() is not called here.
        if self.session:
            self.session.close()

Some files were not shown because too many files have changed in this diff Show More