import json
import re
from datetime import datetime

from task.manager_task import execute_task
from config.database import SessionLocal
from models import SourceContent, Article
from utils import logger
from llm import LLMThinkingEngine

# Maximum number of today's posts fetched from the database.
_MAX_CONTENTS = 50
# Posts whose body is shorter than this are filtered out (unless none remain).
_MIN_CONTENT_LENGTH = 200
# Cost guard: upper bound on the total body length sent to the selection LLM.
_MAX_TOTAL_LENGTH = 10000
# How many posts the selection LLM picks when more than this many remain.
_MAX_SELECTED = 2

# (system prompt file, user-prompt prefix) for each story variant. Every
# selected post is turned into one Article per entry, in this order.
_STORY_PROMPTS = (
    ("wechat_official_account_system_prompt.txt", "【素材内容】\n"),
    ("real_estate_story_system_prompt.txt", "故事素材:"),
    ("real_estate_story_short_system_prompt.txt", "故事素材:"),
)

# Separators accepted between indices in the selection LLM's reply:
# ASCII comma, full-width comma, ideographic enumeration comma, whitespace.
_SELECTION_SEPARATORS = re.compile(r"[,，、\s]+")


def _extract_body(item):
    """Return the ``content`` field of ``item.content`` parsed as JSON.

    Returns "" when the raw content is empty, is not valid JSON, is not a
    JSON object, or its ``content`` field is missing, null or not a string.
    """
    if not item.content:
        return ""
    try:
        data = json.loads(item.content)
    except (ValueError, TypeError):  # JSONDecodeError is a ValueError
        return ""
    if not isinstance(data, dict):
        return ""
    body = data.get("content") or ""
    return body if isinstance(body, str) else ""


def _content_length(item):
    """Return the character length of the post body (0 when unparseable)."""
    return len(_extract_body(item))


def _cap_total_length(items, limit):
    """Keep the longest posts whose combined body length stays within ``limit``.

    ``items`` is expected to be sorted by body length, longest first. Each
    post is kept if it still fits in the remaining budget; a post that would
    overflow it (including one longer than the whole budget) is skipped, so
    one oversized post no longer empties the entire batch.
    """
    kept = []
    total = 0
    for item in items:
        length = _content_length(item)
        if total + length <= limit:
            kept.append(item)
            total += length
    return kept


def _parse_selection(selection_result, count, max_selected=_MAX_SELECTED):
    """Parse the selection LLM's reply into distinct 0-based indices.

    The reply is expected to contain 1-based indices such as "1,3" (full-width
    commas, "、" and whitespace are accepted too). Out-of-range, duplicate and
    non-numeric tokens are ignored; at most ``max_selected`` indices are kept.
    """
    selected = []
    for token in _SELECTION_SEPARATORS.split(str(selection_result or "")):
        token = token.strip(".。")
        # isdecimal() (unlike isdigit()) guarantees int() will accept it.
        if not token.isdecimal():
            continue
        idx = int(token) - 1
        if 0 <= idx < count and idx not in selected:
            selected.append(idx)
            if len(selected) >= max_selected:
                break
    return selected


def _extract_json_object(text):
    """Return the JSON object embedded in an LLM reply, or None.

    The LLM sometimes wraps its JSON in extra prose, so the span from the
    first "{" to the last "}" is parsed. Returns None when there is no such
    span, it is not valid JSON, or it is not a JSON object.
    """
    if not isinstance(text, str):
        return None
    start = text.find("{")
    end = text.rfind("}")
    if start == -1 or end < start:
        return None
    try:
        data = json.loads(text[start:end + 1])
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None


def _normalize_body(body):
    """Turn a story ``body`` (string or list of paragraphs) into article text."""
    if isinstance(body, str):
        return body
    if isinstance(body, list):
        return "\n".join(str(paragraph) for paragraph in body if paragraph is not None)
    return "无内容"


def _select_with_llm(contents):
    """Ask the selection LLM which posts suit story writing best."""
    llm_engine = LLMThinkingEngine(system_prompt_file="real_estate_story_selection_system_prompt.txt")
    content_list_str = "\n".join(
        f"{idx + 1}. {_extract_body(content)}" for idx, content in enumerate(contents)
    )
    logger.info(f"story_edit_task LLM selection content list: {content_list_str}")
    selection_result = llm_engine.think(content_list_str)
    logger.info(f"story_edit_task LLM selection result: {selection_result}")
    selected = [contents[idx] for idx in _parse_selection(selection_result, len(contents))]
    logger.info(f"story_edit_task after LLM selection content size {len(selected)}")
    return selected


def _generate_articles(db, contents, system_prompt_file, prompt_prefix):
    """Rewrite each post into a story with one system prompt and store it as an Article."""
    llm_engine = LLMThinkingEngine(system_prompt_file=system_prompt_file)
    for content in contents:
        logger.info(f"story_edit_task content id: {content.id}, link: {content.link}, platform: {content.platform}")
        story = llm_engine.think(f"{prompt_prefix}{content.content}")
        logger.info(f"story_edit_task content id: {content.id} story: {story}")
        json_story = _extract_json_object(story)
        if json_story is None:
            # Skip this post instead of letting one bad reply abort the task.
            logger.warning(f"story_edit_task content id: {content.id} llm生成的结果不是有效的json格式,无法解析故事内容")
            continue
        article = Article(
            title=json_story.get("title") or "无标题",
            keywords=None,
            content=_normalize_body(json_story.get("body")),
            used=False,
        )
        db.add(article)
        db.commit()


def story_edit_task():
    """Turn today's most substantial posts into Article rows via LLM prompts.

    Steps:
      1. Fetch up to ``_MAX_CONTENTS`` SourceContent rows created since
         midnight today (naive local time, via ``datetime.today()``).
      2. Sort them by body length (longest first) and keep those of at least
         ``_MIN_CONTENT_LENGTH`` chars; if none qualify, keep only the longest.
      3. Cap the total body length at ``_MAX_TOTAL_LENGTH`` (LLM cost guard).
      4. If more than ``_MAX_SELECTED`` remain, let an LLM choose among them.
      5. For every prompt in ``_STORY_PROMPTS``, generate and commit one Article
         per remaining post.
    """
    with SessionLocal() as db:
        today_start = datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
        today_contents = (
            db.query(SourceContent)
            .filter(SourceContent.create_time >= today_start)
            .order_by(SourceContent.create_time.desc())
            .limit(_MAX_CONTENTS)
            .all()
        )
        if not today_contents:
            logger.info("story_edit_task finish, content size 0")
            return
        logger.info(f"story_edit_task get {len(today_contents)} contents")

        today_contents.sort(key=_content_length, reverse=True)
        to_processed_contents = [
            content for content in today_contents if _content_length(content) >= _MIN_CONTENT_LENGTH
        ]
        logger.info(f"story_edit_task after filter content size {len(to_processed_contents)}")
        if not to_processed_contents:
            # Nothing is long enough: fall back to the single longest post.
            to_processed_contents = [today_contents[0]]

        to_processed_contents = _cap_total_length(to_processed_contents, _MAX_TOTAL_LENGTH)
        if not to_processed_contents:
            logger.warning("story_edit_task finish, every content exceeds the total length limit")
            return

        if len(to_processed_contents) > _MAX_SELECTED:
            to_processed_contents = _select_with_llm(to_processed_contents)
            if not to_processed_contents:
                logger.warning("story_edit_task finish, LLM selection result could not be parsed")
                return

        for system_prompt_file, prompt_prefix in _STORY_PROMPTS:
            _generate_articles(db, to_processed_contents, system_prompt_file, prompt_prefix)


if __name__ == "__main__":
    execute_task(story_edit_task)