Files
edward/task/news/reference_message.py
konjacpotato 5267db8a0d
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 15s
import edward
2025-11-12 21:19:26 +08:00

50 lines
2.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import time
from database.database import get_session
from database.tcontentdispatch.curd import get_content_by_title_and_category, create_or_update_content
from database.tcontentdispatch.model import TContentDispatch
from database.tmaterial.crud import update_material_by_id, \
get_materials_for_generate_news
from log.log_manager import log
from task.manager_task import execute_task
def generate_news_task():
with get_session() as db:
# 1. 构建今日新闻文章标题格式今日新闻yyyy-MM-dd
title = ("今日新闻" +
time.strftime("%Y", time.localtime()) + '' +
time.strftime("%m", time.localtime()) + '' +
time.strftime("%d", time.localtime()) + '')
# 2. 从内容分发数据表获取当前标题和分类的文章是否存在
content_dispatch = get_content_by_title_and_category(db, title, "新鲜事")
content = ""
if content_dispatch is not None:
content = content_dispatch.content
# 从最后一条获取并计算开始编号
result = re.findall(r'(?<!\.)\d+\. ', content)
start_num = int(re.findall(r'\d+', result[-1])[-1]) + 1
else:
content_dispatch = TContentDispatch(category="新鲜事", title=title, ai_generate=1)
start_num = 1
# 3. 从新闻素材数据表获取房产类的新闻列表
news_list = get_materials_for_generate_news(db)
# 4. 拼接成文章正文content
for i, news in enumerate(news_list, start=start_num): # Using enumerate to control the index starting from 1
content += f"{i}. {news.ai_summary}\n"
# 5. 把content写入数据库
if content is not None and content != "" and news_list is not None:
content_dispatch.content = content
content_dispatch.is_sent = False
create_or_update_content(db, content_dispatch)
# 6. 把news_list更新入数据库更新字段is_usage为True
for news in news_list:
news.is_usage = True
update_material_by_id(db, news)
log(f"generate_news_task finish, news count {start_num + len(news_list)}")
if __name__ == "__main__":
execute_task(generate_news_task)