Files
edward/database/tvideoscript/video_script.py
konjacpotato 5267db8a0d
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 15s
import edward
2025-11-12 21:19:26 +08:00

57 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import Column, String, TIMESTAMP, func
from database.database import Base, get_session
from utils import utils
@dataclass
class VideoScript(Base):
    """ORM model mapped to the ``t_video_script`` table.

    Each row stores a topic (title, description, keywords, link, content)
    together with the video script text associated with it.
    """

    # NOTE(review): @dataclass is stacked on a declarative model here; confirm
    # the generated __init__ interacts with the mapped columns as intended.
    __tablename__ = 't_video_script'

    # Unique identifier (primary key).
    id: str = Column(String, primary_key=True, comment='唯一标识')
    # Title (required).
    title: str = Column(String, nullable=False, comment='标题')
    # Description (optional).
    description: Optional[str] = Column(String, nullable=True, comment='描述')
    # Topic keywords (optional).
    keywords: Optional[str] = Column(String, nullable=True, comment='话题关键词')
    # Topic link (required).
    url: str = Column(String, nullable=False, comment='话题链接')
    # Video script text; the column is nullable, hence Optional.
    script: Optional[str] = Column(String, nullable=True, comment='视频脚本')
    # Topic content; the column is nullable, hence Optional.
    content: Optional[str] = Column(String, nullable=True, comment='话题内容')
    # Creation time; defaults to the database server's now().
    create_time: datetime = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='创建时间')

    def __repr__(self):
        """Return a short debug representation built from the identifying fields."""
        # The model declares `title`, not `topic`; reading `self.topic` raised
        # AttributeError whenever an instance was printed or logged.
        return (
            f"<VideoScript(title={self.title}, url={self.url}, id={self.id}, "
            f"description={self.description}, keywords={self.keywords})>"
        )
def create_video_script(video_script: VideoScript):
    """Insert ``video_script`` into the database and return it.

    If the record has no ``id`` yet, one is derived by passing its ``url``
    to ``utils.get_md5``. The record is then added, committed and refreshed
    inside a session obtained from ``get_session()``.

    :param video_script: the record to store.
    :return: the same ``video_script`` object after the refresh.
    """
    id_missing = video_script.id is None
    if id_missing:
        # Derive the primary key from the URL.
        video_script.id = utils.get_md5(video_script.url)

    with get_session() as session:
        session.add(video_script)
        session.commit()
        # Reload the row so database-populated values are present on the object.
        session.refresh(video_script)
        return video_script
def video_script_not_exists(url_list: list):
    """Drop from ``url_list`` every URL that already has a row in the database.

    The list is filtered in place and the same list object is returned, so
    callers relying on either the mutation or the return value keep working.
    Every occurrence of an already-stored URL is removed, including duplicates
    within ``url_list``.

    :param url_list: candidate URLs; modified in place.
    :return: ``url_list``, now holding only URLs not found in the database.
    """
    if not url_list:
        # Nothing to check, so skip the database round trip entirely.
        return url_list

    with get_session() as db:
        # Only the url column is needed; avoid loading whole rows.
        rows = db.query(VideoScript.url).filter(VideoScript.url.in_(url_list)).all()

    existing_urls = {row[0] for row in rows}
    # Slice assignment keeps the caller's list object while removing all
    # matches; list.remove() only dropped the first occurrence and raised
    # ValueError when the same URL came back from the database twice.
    url_list[:] = [url for url in url_list if url not in existing_urls]
    return url_list
def get_today_video_script(db):
    """Return all video scripts whose creation date equals today's date.

    "Today" is the date part of ``datetime.now()``; it is compared against the
    SQL ``date()`` of the ``create_time`` column.

    :param db: session object used to run the query.
    :return: list of matching ``VideoScript`` rows.
    """
    current_date = datetime.now().date()
    created_on = func.date(VideoScript.create_time)
    todays_scripts = db.query(VideoScript).filter(created_on == current_date)
    return todays_scripts.all()
def update_video_script(db, video_script: VideoScript):
    """Merge ``video_script`` into the session, commit, and return the stored state.

    :param db: session object providing ``merge``, ``commit`` and ``refresh``.
    :param video_script: record carrying the values to write.
    :return: the instance returned by ``db.merge``, refreshed after the commit.
        When ``video_script`` is already attached to ``db`` this is the same
        object that was passed in.
    """
    # merge() hands back the instance that actually lives in the session; the
    # argument itself may stay unattached, and refreshing an unattached
    # instance fails. Work with the merged instance from here on.
    merged = db.merge(video_script)
    db.commit()
    db.refresh(merged)
    return merged