From f796a3833b70727013970101ccd9dd86c6d15ada Mon Sep 17 00:00:00 2001
From: konjacpotato
Date: Mon, 24 Nov 2025 21:45:12 +0800
Subject: [PATCH] commit code

---
 llm/generate_podcast.py | 187 ++++++++++++++++++++++++++++++++++++++++
 llm/prompt.py           | 113 ++++++++++++++++++++++++
 requirements.txt        |   3 +-
 scheduler/jobs.py       | 159 +++++++++++++++++++++++++++++++++-
 4 files changed, 460 insertions(+), 2 deletions(-)
 create mode 100644 llm/generate_podcast.py
 create mode 100644 llm/prompt.py

diff --git a/llm/generate_podcast.py b/llm/generate_podcast.py
new file mode 100644
index 0000000..1e16ae1
--- /dev/null
+++ b/llm/generate_podcast.py
@@ -0,0 +1,187 @@
+import json
+from datetime import datetime, timedelta, timezone
+import re
+from typing import Any, Dict, List, Optional
+
+from openai import OpenAI
+
+from config.settings import settings
+from llm import prompt as prompts
+from utils.logger import logger
+
+
+BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
+MODEL = "deepseek-v3.2-exp"
+
+
+def _make_client() -> OpenAI:
+    return OpenAI(api_key=settings.DASHSCOPE_API_KEY, base_url=BASE_URL)
+
+
+def _call_model(system_prompt: Optional[str], user_prompt: str, stream: bool = False) -> Any:
+    client = _make_client()
+    messages = []
+    if system_prompt:
+        messages.append({"role": "system", "content": system_prompt})
+    messages.append({"role": "user", "content": user_prompt})
+
+    # Non-streaming call for simplicity
+    resp = client.chat.completions.create(model=MODEL, messages=messages, stream=stream)
+    # When stream=False the SDK typically returns a full object; content location may vary.
+    # We'll try common access patterns.
+    try:
+        # OpenAI-compatible: resp.choices[0].message.content
+        return resp.choices[0].message.content
+    except Exception:
+        try:
+            # fallback: resp.choices[0].text
+            return resp.choices[0].text
+        except Exception:
+            # As last resort, return raw resp
+            return resp
+
+
+def _extract_json(text: str) -> str:
+    """Attempt to extract the first JSON object/array from text."""
+    if not isinstance(text, str):
+        raise ValueError("Expected text to be str")
+    # Find first '[' or '{'
+    start_idx = None
+    for i, ch in enumerate(text):
+        if ch in "[{":
+            start_idx = i
+            break
+    if start_idx is None:
+        raise ValueError("No JSON object/array found in text")
+
+    # Try to find a matching closing bracket by scanning and counting
+    stack = []
+    for j in range(start_idx, len(text)):
+        ch = text[j]
+        if ch in "{[":
+            stack.append(ch)
+        elif ch in "]}":
+            if not stack:
+                continue
+            opening = stack.pop()
+            if (opening == "{" and ch != "}") or (opening == "[" and ch != "]"):
+                # mismatched, continue
+                continue
+            if not stack:
+                return text[start_idx : j + 1]
+
+    # Fallback: try regex to capture last '}' or ']' occurrence
+    m = re.search(r"(\{.*\}|\[.*\])", text, re.S)
+    if m:
+        return m.group(1)
+    raise ValueError("Could not extract JSON from model output")
+
+
+def _parse_json_safe(text: str) -> Any:
+    try:
+        return json.loads(text)
+    except Exception:
+        # try to extract JSON substring
+        jtext = _extract_json(text)
+        return json.loads(jtext)
+
+
+def generate_topics(start_time: Optional[str] = None, end_time: Optional[str] = None) -> List[Dict[str, Any]]:
+    """Call prompt_a to get a list of candidate meme topics.
+
+    If start_time/end_time are provided (YYYY-MM-DD), they will be injected into the prompt
+    to limit the timeframe the model should scan.
+
+    If start_time/end_time are not provided, default to the last 7 days:
+        end_time = today (UTC, YYYY-MM-DD)
+        start_time = end_time - 7 days
+
+    Both parameters should be strings in YYYY-MM-DD format when provided.
+    """
+    # compute defaults (UTC)
+    if end_time is None:
+        end_time = datetime.now(timezone.utc).date().isoformat()
+
+    if start_time is None:
+        # Derive the window from end_time itself so a caller-supplied end_time
+        # (with no start_time) works instead of hitting an unbound end_date.
+        end_date = datetime.strptime(end_time, "%Y-%m-%d").date()
+        start_time = (end_date - timedelta(days=7)).isoformat()
+
+    # Substitute the literal placeholder in prompt_a; a prompt without it is sent unchanged.
+    user_prompt = prompts.prompt_a.replace("start_time ~ end_time", f"{start_time} ~ {end_time}")
+
+    logger.debug(f"prompt for generate_topics:\n{user_prompt}")
+
+    content = _call_model(system_prompt=None, user_prompt=user_prompt)
+    logger.debug(f"raw output from generate_topics:\n{content}")
+    if isinstance(content, (dict, list)):
+        return content
+    text = content if isinstance(content, str) else str(content)
+    data = _parse_json_safe(text)
+    if not isinstance(data, list):
+        raise ValueError("prompt_a did not return a JSON array")
+    logger.debug(f"result for generate_topics:\n{data}")
+    return data
+
+
+def generate_bits(meme_name: str, research_text: str, prompt_bit: str = prompts.prompt_b) -> Dict[str, Any]:
+    user_prompt = prompt_bit + f"\n\nmeme_name: {meme_name}\nresearch:\n{research_text}\n"
+    content = _call_model(system_prompt=None, user_prompt=user_prompt)
+    text = content if isinstance(content, str) else str(content)
+    data = _parse_json_safe(text)
+    return data
+
+
+def generate_bit(meme_name: str, research_text: str, prompt_bit: str) -> Dict[str, Any]:
+    """Generate one bit from an explicitly supplied prompt (same pipeline as generate_bits)."""
+    return generate_bits(meme_name, research_text, prompt_bit)
+
+
+def generate_script(meme_name: str, materials_text: str) -> Dict[str, Any]:
+    user_prompt = prompts.prompt_c + f"\n\nmeme_name: {meme_name}\nmaterials:\n{materials_text}\n"
+    content = _call_model(system_prompt=None, user_prompt=user_prompt)
+    text = content if isinstance(content, str) else str(content)
+    data = _parse_json_safe(text)
+    return data
+
+
+def orchestrate_for_first_topic() -> Dict[str, Any]:
+    """High-level orchestration: pick first topic, synthesize research, create bits and final script."""
+    topics = generate_topics()
+    if not topics:
+        raise RuntimeError("No topics returned")
+
+    top = topics[0]
+    meme = top.get("title") or top.get("name") or "未知梗"
+
+    # Build a concise research text from topic fields
+    parts = []
+    if "summary" in top:
+        parts.append(f"简介:{top['summary']}")
+    if "origin" in top:
+        parts.append(f"可能起源:{top['origin']}")
+    if "reach_estimate" in top:
+        parts.append(f"传播估计:{top['reach_estimate']}")
+    if "angles" in top:
+        parts.append("角度:" + "; ".join(top.get("angles", [])))
+
+    research_text = "\n".join(parts)
+
+    bits = generate_bits(meme, research_text)
+
+    # Combine materials: human-crafted research + selected bits
+    materials = research_text + "\n\n" + json.dumps(bits, ensure_ascii=False, indent=2)
+
+    script = generate_script(meme, materials)
+
+    return {"topic": top, "bits": bits, "script": script}
+
+
+if __name__ == "__main__":
+    # quick sanity check when run as script (will call API if keys present)
+    try:
+        out = orchestrate_for_first_topic()
+        print(json.dumps(out, ensure_ascii=False, indent=2))
+    except Exception as e:
+        print(f"Error during orchestration: {e}")
diff --git a/llm/prompt.py b/llm/prompt.py
new file mode 100644
index 0000000..3549295
--- /dev/null
+++ b/llm/prompt.py
@@ -0,0 +1,113 @@
+
+
+prompt_a = """
+你是网络文化研究员。请扫描近一周(start_time ~ end_time)中文互联网的热点,挑选并输出5个适合做播客主题的“梗”。
+
+输出要求(严格返回 JSON 数组,仅输出 JSON,不要额外解释):
+[
+  {
+    "title": "梗名称(不超过6字)",
+    "summary": "一句话简述(≤30字)",
+    "origin": "可能起源平台或事件(1-2项)",
+    "reach_estimate": "传播广度估计(简短量化或描述,如“百万级阅读”/“小范围社群内”)",
+    "angles": ["值得深挖的文化/社会角度(1-3项)"],
+    "debut_time": "首次出现时间(精确到日,格式YYYY-MM-DD)"
+  },
+  ...
+]
+
+每项尽量简明扼要,避免长段落。字段内容中文优先,数值或量级请尽量提供简短量化表述。
+"""
+
+prompt_b = """
+你是脱口秀编剧。输入两个变量:
+- meme_name:要写段子的梗名称(字符串)
+- research:关于该梗的深度研究文本(字符串)
+
+根据以上输入,创作3篇风格不同的脱口秀段子,要求如下并严格返回 JSON 对象(仅输出 JSON):
+{
+  "meme": "梗名称",
+  "bits": [
+    {"style": "观察生活", "text": "…(口语化,适合朗读,含‘铺垫->笑点’结构,1000-1200字)"},
+    {"style": "夸张讽刺", "text": "…(夸张视角,含‘铺垫->笑点’结构,1000-1200字)"},
+    {"style": "角色扮演", "text": "…(以第一人称表演,含‘铺垫->笑点’结构,1000-1200字)"}
+  ]
+}
+
+要求:语言口语化、节奏感强,避免书面化长句;每段保留清晰的‘铺垫-笑点’节奏。不要添加额外说明或元信息。
+"""
+
+prompt_b1 = """
+你是脱口秀编剧。输入两个变量:
+- meme_name:要写段子的梗名称(字符串)
+- research:关于该梗的深度研究文本(字符串)
+
+根据以上输入,创作1篇“观察生活”风格的脱口秀段子,要求如下并严格返回 JSON 对象(仅输出 JSON):
+{
+  "meme": "梗名称",
+  "style": "观察生活",
+  "text": "…(口语化,适合朗读,含‘铺垫->笑点’结构,1000-1200字)"
+}
+
+要求:语言口语化、节奏感强,避免书面化长句;每段保留清晰的‘铺垫-笑点’节奏。不要添加额外说明或元信息。
+"""
+
+prompt_b2 = """
+你是脱口秀编剧。输入两个变量:
+- meme_name:要写段子的梗名称(字符串)
+- research:关于该梗的深度研究文本(字符串)
+
+根据以上输入,创作1篇“夸张讽刺”风格的脱口秀段子,要求如下并严格返回 JSON 对象(仅输出 JSON):
+{
+  "meme": "梗名称",
+  "style": "夸张讽刺",
+  "text": "…(夸张视角,含‘铺垫->笑点’结构,1000-1200字)"
+}
+
+要求:语言口语化、节奏感强,避免书面化长句;每段保留清晰的‘铺垫-笑点’节奏。不要添加额外说明或元信息。
+"""
+
+prompt_b3 = """
+你是脱口秀编剧。输入两个变量:
+- meme_name:要写段子的梗名称(字符串)
+- research:关于该梗的深度研究文本(字符串)
+
+根据以上输入,创作1篇“角色扮演”风格的脱口秀段子,要求如下并严格返回 JSON 对象(仅输出 JSON):
+{
+  "meme": "梗名称",
+  "style": "角色扮演",
+  "text": "…(以第一人称表演,含‘铺垫->笑点’结构,1000-1200字)"
+}
+
+要求:语言口语化、节奏感强,避免书面化长句;每段保留清晰的‘铺垫-笑点’节奏。不要添加额外说明或元信息。
+"""
+
+prompt_c = """
+你是播客编剧。输入两个变量:
+- meme_name:梗名称(字符串)
+- materials:包含“深度研究”与若干脱口秀段子的文本(字符串),已由人工筛选
+
+任务:把 materials 整合成一篇完整的播客文稿,结构严格按照:开场白 -> 梗介绍 -> 起源考据 -> 传播路径 -> 影响分析 -> 脱口秀环节(插入2-3个段子) -> 结束语
+
+输出格式(严格 JSON,对话按顺序列出,角色限定为 host/guest):
+{
+  "title": "节目标题(建议不超12字)",
+  "script": [
+    {"role": "host", "text": "开场白(口语化,20-60字)"},
+    {"role": "host", "text": "梗介绍(简明,30-80字)"},
+    {"role": "guest", "text": "起源考据(40-120字)"},
+    {"role": "host", "text": "传播路径(30-80字)"},
+    {"role": "guest", "text": "影响分析(40-120字)"},
+    {"role": "host", "text": "转入脱口秀环节的台词(15-40字)"},
+    {"role": "guest", "text": "段子A(来自 materials,1000-1200字)"},
+    {"role": "guest", "text": "段子B(来自 materials,1000-1200字)"},
+    {"role": "guest", "text": "段子C(来自 materials,1000-1200字)"},
+    {"role": "host", "text": "结束语(15-40字)"}
+  ]
+}
+
+要求:
+- 语言口语化,避免书面语;角色语气分别为:host(理性、引导)、guest(幽默、即兴)。
+- 在 script 中只保留最终可直接朗读的台词,不要加入编剧说明或括注。每段尽量简洁,便于主播读出。
+- 严格输出 JSON,不要额外解释或多余文本。
+"""
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index cd230d4..1be8ed8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,5 @@ alembic
 apscheduler
 fastapi
 uvicorn
-gunicorn
\ No newline at end of file
+gunicorn
+openai
\ No newline at end of file
diff --git a/scheduler/jobs.py b/scheduler/jobs.py
index 96af507..49cb10e 100644
--- a/scheduler/jobs.py
+++ b/scheduler/jobs.py
@@ -1,6 +1,163 @@
+import json
+from llm import prompt
 from utils.logger import logger
 import datetime
+from llm.generate_podcast import generate_topics
+from models.script import Script
+from config.database import SessionLocal
 
 
 def job_heartbeat():
-    logger.info(f"[heartbeat] {datetime.datetime.now()}")
\ No newline at end of file
+    logger.info(f"[heartbeat] {datetime.datetime.now()}")
+
+def job_generate_topics():
+    """Scheduled job: fetch last week's trending memes and save them to the database."""
+
+    # 1. Ask the LLM for the list of trending memes
+    topics = generate_topics()
+    content = {"topics": topics}
+    if not topics:
+        logger.warning("No topics generated.")
+        return
+
+    # 2. Build the Script record
+    # subject is the current date, formatted YYYY-MM-DD
+    today_str = datetime.datetime.now().strftime("%Y-%m-%d")
+    db = SessionLocal()
+    try:
+        # Look up the unique project+subject record, if any
+        script = db.query(Script).filter_by(project="梗文化研究所", subject=today_str).first()
+        if script:
+            # Exists: update its content
+            script.content = json.dumps(content, ensure_ascii=False, indent=2)
+            db.commit()
+            logger.info(f"Updated script for {today_str} with {len(topics)} topics.")
+        else:
+            # Missing: create it
+            script = Script(
+                project="梗文化研究所",
+                subject=today_str,
+                content=json.dumps(content, ensure_ascii=False, indent=2)
+            )
+            db.add(script)
+            db.commit()
+            logger.info(f"Saved script for {today_str} with {len(topics)} topics.")
+    except Exception as e:
+        db.rollback()
+        logger.error(f"Failed to save/update script for {today_str}: {e}")
+    finally:
+        # Release the session on every path so scheduled runs do not leak it.
+        db.close()
+
+
+def job_generate_bits():
+    """Scheduled job: generate stand-up bits for the latest meme and save them to the database."""
+    db = SessionLocal()
+    try:
+        # Fetch the most recent Script record
+        script = db.query(Script).filter_by(project="梗文化研究所").order_by(Script.create_time.desc()).first()
+        if not script or not script.content:
+            logger.warning("No script found for generating bits.")
+            return
+
+        data = json.loads(script.content)
+        topics = data.get("topics", [])
+        if not topics:
+            logger.warning("No topics in the latest script.")
+            return
+
+        # Only the first meme is processed
+        top = topics[0]
+        meme_name = top.get("title") or top.get("name") or "未知梗"
+
+        # Build the research text
+        parts = []
+        if "summary" in top:
+            parts.append(f"简介:{top['summary']}")
+        if "origin" in top:
+            parts.append(f"可能起源:{top['origin']}")
+        if "reach_estimate" in top:
+            parts.append(f"传播估计:{top['reach_estimate']}")
+        if "angles" in top:
+            parts.append("角度:" + "; ".join(top.get("angles", [])))
+
+        research_text = "\n".join(parts)
+
+        bits = []
+
+        # Ask the LLM for one bit per style prompt
+        from llm.generate_podcast import generate_bit
+        for bit_prompt in (prompt.prompt_b1, prompt.prompt_b2, prompt.prompt_b3):
+            bit = generate_bit(meme_name, research_text, bit_prompt)
+            logger.debug(f"Generated bits for meme '{meme_name}': {bit}")
+            bits.append(bit)
+        content = {"topics": topics, "bits": bits}
+        script.content = json.dumps(content, ensure_ascii=False, indent=2)
+        db.commit()
+        logger.info(f"Saved bits for meme '{meme_name}' with {len(bits)} segments.")
+    except Exception as e:
+        db.rollback()
+        logger.error(f"Failed to generate/save bits: {e}")
+    finally:
+        # Release the session on every path, including the early returns above.
+        db.close()
+
+
+def job_generate_script():
+    """Scheduled job: generate the full show script for the latest meme and save it to the database."""
+    logger.debug("Starting job_generate_script")
+    db = SessionLocal()
+    try:
+        # Fetch the most recent Script record
+        script = db.query(Script).filter_by(project="梗文化研究所").order_by(Script.create_time.desc()).first()
+        if not script or not script.content:
+            logger.warning("No script found for generating full script.")
+            return
+
+        data = json.loads(script.content)
+        topics = data.get("topics", [])
+        bits = data.get("bits", [])
+        if not topics:
+            logger.warning("No topics in the latest script.")
+            return
+        if not bits:
+            logger.warning("No bits in the latest script.")
+            return
+
+        # Only the first meme is processed
+        top = topics[0]
+        meme_name = top.get("title") or top.get("name") or "未知梗"
+        logger.debug(f"Generating full script for meme '{meme_name}'")
+        # Build the materials text
+        parts = []
+        if "summary" in top:
+            parts.append(f"简介:{top['summary']}")
+        if "origin" in top:
+            parts.append(f"可能起源:{top['origin']}")
+        if "reach_estimate" in top:
+            parts.append(f"传播估计:{top['reach_estimate']}")
+        if "angles" in top:
+            parts.append("角度:" + "; ".join(top.get("angles", [])))
+
+        research_text = "\n".join(parts)
+        materials_text = research_text + "\n\n" + json.dumps(bits, ensure_ascii=False, indent=2)
+
+        # Ask the LLM for the full script
+        from llm.generate_podcast import generate_script
+        full_script = generate_script(meme_name, materials_text)
+        content = {"topics": topics, "bits": bits, "script": full_script}
+        script.content = json.dumps(content, ensure_ascii=False, indent=2)
+        db.commit()
+        logger.info(f"Saved full script for meme '{meme_name}'.")
+    except Exception as e:
+        db.rollback()
+        logger.error(f"Failed to generate/save full script: {e}")
+    finally:
+        # Release the session on every path, including the early returns above.
+        db.close()
+
+# For manual testing
+if __name__ == "__main__":
+    # job_generate_topics()
+    # job_generate_bits()
+    job_generate_script()
\ No newline at end of file