Files
peter/database/thottopic/crud.py
konjacpotato 8c1a740f0b import peter
2025-11-12 20:42:16 +08:00

91 lines
2.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from database.thottopic.model import THotTopic
def create_hot_topic(db, hot_topic: THotTopic):
db.add(hot_topic)
db.commit()
db.refresh(hot_topic)
return hot_topic
# 插入数据库之前判断数据库中是否已经存在根据news.url 判断
def create_topic_if_url_not_exists(db, hot_topic: THotTopic):
# 检查是否已经存在具有相同 URL 的记录
existing_topic = db.query(THotTopic).filter(THotTopic.url == hot_topic.url).first()
if existing_topic:
# 如果记录已存在,直接返回已有的记录
return existing_topic
# 如果记录不存在,插入新的记录
db.add(hot_topic)
db.commit()
db.refresh(hot_topic)
return hot_topic
def create_topics_if_url_not_exists(db, topics: list[THotTopic]):
inserted_topics = [] # 用于保存实际插入的新闻记录
for topic in topics:
# 检查是否已经存在具有相同 URL 的记录
existing_topic = db.query(THotTopic).filter(THotTopic.url == topic.url).first()
if not existing_topic:
# 如果记录不存在,插入新的记录
db.add(topic)
inserted_topics.append(topic)
# 批量提交所有插入的记录
db.commit()
# 刷新所有新插入的记录
for topic in inserted_topics:
db.refresh(topic)
return inserted_topics
def hot_topic_not_exists(db, url_list: list) -> list:
"""
url如果在数据库中已经存在则去除掉
:param db:
:param url_list:
:return:
"""
hot_topics = db.query(THotTopic).filter(THotTopic.url.in_(url_list)).all()
for hot_topic in hot_topics:
url_list.remove(hot_topic.url)
return url_list
def get_hot_topic_by_id(db, hot_topic_id: int):
return db.query(THotTopic).filter(THotTopic.id == hot_topic_id).first()
def get_hot_topics(db, skip: int = 0, limit: int = 100):
return db.query(THotTopic).offset(skip).limit(limit).all()
# 根据THotTopic.update_time排序获取最新的THotTopic
def get_latest_hot_topic(db):
return db.query(THotTopic).order_by(THotTopic.update_time.desc()).first()
def update_hot_topic(db, hot_topic: THotTopic):
db.merge(hot_topic)
db.commit()
db.refresh(hot_topic)
return hot_topic
# def update_hot_topic(db, hot_topic_id: int, updates: dict):
# db.query(THotTopic).filter(THotTopic.id == hot_topic_id).update(updates)
# db.commit()
# return db.query(THotTopic).filter(THotTopic.id == hot_topic_id).first()
def delete_hot_topic(db, hot_topic_id: int):
hot_topic = db.query(THotTopic).filter(THotTopic.id == hot_topic_id).first()
if hot_topic:
db.delete(hot_topic)
db.commit()
return hot_topic