All checks were successful
Gitea Actions Demo / deploy (push) Successful in 13s
292 lines
12 KiB
Python
292 lines
12 KiB
Python
import json
|
|
from llm import prompt
|
|
from utils.logger import logger
|
|
import datetime
|
|
from llm.generate_podcast import generate_topics
|
|
import os
|
|
import asyncio
|
|
from models.script import Script
|
|
from config.database import SessionLocal
|
|
|
|
|
|
def job_heartbeat():
    """Scheduled job: log a heartbeat line stamped with the current local time."""
    now = datetime.datetime.now()
    logger.info(f"[heartbeat] {now}")
|
|
|
|
def job_generate_topics():
    """Scheduled job: fetch last week's trending memes and store them.

    Calls ``generate_topics()`` and saves the result as compact JSON
    (``{"topics": [...]}``) in the ``Script`` row identified by the fixed
    project name and today's date (``YYYY-MM-DD``) as subject: an existing
    row is updated, otherwise a new one is inserted. Returns early when no
    topics were generated.

    Errors while saving are rolled back and logged rather than raised, and
    the database session is always closed.
    """
    # 1. Ask the LLM for the list of trending memes.
    topics = generate_topics()
    if not topics:
        logger.warning("No topics generated.")
        return

    # 2. Upsert the Script row; the subject is today's date (YYYY-MM-DD).
    today_str = datetime.datetime.now().strftime("%Y-%m-%d")
    db = SessionLocal()
    try:
        # Serialized inside the try so a non-serializable topic is logged
        # like any other save failure.
        payload = json.dumps({"topics": topics}, ensure_ascii=False, separators=(",", ":"))

        # project + subject identify a unique record.
        script = db.query(Script).filter_by(project="梗文化研究所", subject=today_str).first()
        if script:
            # Record exists: overwrite its content.
            script.content = payload
            db.commit()
            logger.info(f"Updated script for {today_str} with {len(topics)} topics.")
        else:
            # No record yet: create one.
            script = Script(
                project="梗文化研究所",
                subject=today_str,
                content=payload,
            )
            db.add(script)
            db.commit()
            logger.info(f"Saved script for {today_str} with {len(topics)} topics.")
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to save/update script for {today_str}: {e}")
    finally:
        # Release the session on every path; it was previously leaked.
        db.close()
|
|
|
|
|
|
def job_generate_bits():
    """Scheduled job: write stand-up bits for the latest meme and store them.

    Loads the most recently created ``Script`` row of the project, takes the
    first entry of its ``topics`` list, builds a short research text from
    that entry and calls ``generate_bit`` once per prompt template
    (``prompt.prompt_b1`` .. ``prompt.prompt_b3``). The row's content is then
    rewritten as ``{"topics": ..., "bits": [...]}``.

    Returns early when there is no usable row or no topics. Any failure is
    rolled back and logged rather than raised, and the database session is
    always closed.
    """
    db = SessionLocal()
    try:
        # Latest Script record for this project.
        script = (
            db.query(Script)
            .filter_by(project="梗文化研究所")
            .order_by(Script.create_time.desc())
            .first()
        )
        if not script or not script.content:
            logger.warning("No script found for generating bits.")
            return

        data = json.loads(script.content)
        topics = data.get("topics", [])
        if not topics:
            logger.warning("No topics in the latest script.")
            return

        # Only the first meme is processed.
        top = topics[0]
        meme_name = top.get("title") or top.get("name") or "未知梗"

        # Build the research text from whichever fields are present.
        labelled_fields = (
            ("summary", "简介:"),
            ("origin", "可能起源:"),
            ("reach_estimate", "传播估计:"),
        )
        parts = [f"{label}{top[key]}" for key, label in labelled_fields if key in top]
        if "angles" in top:
            parts.append("角度:" + "; ".join(top.get("angles", [])))
        research_text = "\n".join(parts)

        # Function-scope import kept from the original code.
        from llm.generate_podcast import generate_bit

        # One bit per prompt template, in template order.
        bits = []
        for bit_prompt in (prompt.prompt_b1, prompt.prompt_b2, prompt.prompt_b3):
            bit = generate_bit(meme_name, research_text, bit_prompt)
            logger.debug(f"Generated bits for meme '{meme_name}': {bit}")
            bits.append(bit)

        content = {"topics": topics, "bits": bits}
        script.content = json.dumps(content, ensure_ascii=False, separators=(",", ":"))
        db.commit()
        logger.info(f"Saved bits for meme '{meme_name}' with {len(bits)} segments.")
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to generate/save bits: {e}")
    finally:
        # Release the session on every path; it was previously leaked.
        db.close()
|
|
|
|
|
|
def job_generate_script():
    """Scheduled job: generate the full show script for the latest meme.

    Loads the most recently created ``Script`` row of the project and
    requires both ``topics`` and ``bits`` in its JSON content. The research
    text of the first topic plus the JSON-encoded bits are passed to
    ``generate_script``; the result is stored back under the ``script`` key
    next to the existing topics and bits.

    Returns early when the row, topics or bits are missing. Any failure is
    rolled back and logged rather than raised, and the database session is
    always closed.
    """
    logger.debug("Starting job_generate_script")
    db = SessionLocal()
    try:
        # Latest Script record for this project.
        script = (
            db.query(Script)
            .filter_by(project="梗文化研究所")
            .order_by(Script.create_time.desc())
            .first()
        )
        if not script or not script.content:
            logger.warning("No script found for generating full script.")
            return

        data = json.loads(script.content)
        topics = data.get("topics", [])
        bits = data.get("bits", [])
        if not topics:
            logger.warning("No topics in the latest script.")
            return
        if not bits:
            logger.warning("No bits in the latest script.")
            return

        # Only the first meme is processed.
        top = topics[0]
        meme_name = top.get("title") or top.get("name") or "未知梗"
        logger.debug(f"Generating full script for meme '{meme_name}'")

        # Build the material text from whichever fields are present.
        labelled_fields = (
            ("summary", "简介:"),
            ("origin", "可能起源:"),
            ("reach_estimate", "传播估计:"),
        )
        parts = [f"{label}{top[key]}" for key, label in labelled_fields if key in top]
        if "angles" in top:
            parts.append("角度:" + "; ".join(top.get("angles", [])))
        research_text = "\n".join(parts)
        materials_text = (
            research_text
            + "\n\n"
            + json.dumps(bits, ensure_ascii=False, separators=(",", ":"))
        )

        # Function-scope import kept from the original code.
        from llm.generate_podcast import generate_script

        full_script = generate_script(meme_name, materials_text)
        content = {"topics": topics, "bits": bits, "script": full_script}
        script.content = json.dumps(content, ensure_ascii=False, separators=(",", ":"))
        db.commit()
        logger.info(f"Saved full script for meme '{meme_name}'.")
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to generate/save full script: {e}")
    finally:
        # Release the session on every path; it was previously leaked.
        db.close()
|
|
|
|
|
|
def job_synthesize_podcast_audio():
    """Scheduled job: turn the latest full script into podcast audio on disk.

    Loads the most recently created ``Script`` row of the project and reads
    the ``script`` section of its JSON content. Every item with non-empty
    text is synthesized via ``TTSService.synthesize`` using a voice chosen
    from the item's ``role``. Each segment is written as a WAV file under
    ``output/segments`` (old files there are removed first). The segments
    are then concatenated with pydub into a single WAV under
    ``output/podcasts``; if pydub is unavailable or merging fails, only the
    per-segment files remain.

    A segment that fails to synthesize is logged and skipped. Any other
    failure is logged rather than raised, and the database session is
    always closed.
    """
    logger.info("Starting job_synthesize_podcast_audio")
    db = SessionLocal()
    try:
        script = (
            db.query(Script)
            .filter_by(project="梗文化研究所")
            .order_by(Script.create_time.desc())
            .first()
        )
        if not script or not script.content:
            logger.warning("No script found for synthesizing podcast audio.")
            return

        data = json.loads(script.content)
        full_script = data.get("script") or {}
        if not full_script:
            logger.warning("No 'script' section found in latest Script record.")
            return

        title = full_script.get("title") or f"podcast_{script.id}"
        script_items = full_script.get("script", [])
        if not script_items:
            logger.warning("Empty script items, nothing to synthesize.")
            return

        # Function-scope imports kept from the original code.
        from config.settings import settings
        from tts.service import TTSService

        # Role -> voice mapping; roles not listed here use the default voice.
        default_voice = settings.TTS_VOICE or "yanglan"
        role_voice_map = {
            "host": default_voice,
            "guest_a": "zhisheng",
            "guest_b": "trump",
            "guest_c": "tangseng",
        }

        # Synthesize each item separately; entries are (index, role, audio).
        segments = []
        for idx, item in enumerate(script_items):
            role = (item.get("role") or "").lower()
            # `or ""` guards against an explicit null text, which would
            # otherwise abort the whole job with AttributeError.
            text = (item.get("text") or "").strip()
            if not text:
                continue

            voice = role_voice_map.get(role, default_voice)

            try:
                logger.debug(f"Synthesizing segment {idx} role={role} voice={voice} text='{text[:30]}...'")
                # NOTE(review): asyncio.run() raises RuntimeError when called
                # from a thread that already runs an event loop -- confirm
                # the scheduler invokes this job synchronously.
                seg_audio = asyncio.run(
                    TTSService.synthesize(text=text, voice=voice, language=settings.TTS_LANGUAGE)
                )
                segments.append((idx, role or "segment", seg_audio))
                logger.debug(f"Synthesized segment {idx} role={role} size={seg_audio.getbuffer().nbytes}")
            except Exception as e:
                logger.error(f"Failed to synthesize segment {idx} (role={role}): {e}")

        if not segments:
            logger.warning("No audio segments synthesized; aborting podcast save.")
            return

        # Write every segment to its own file, after clearing stale files.
        segment_out_dir = os.path.join("output", "segments")
        os.makedirs(segment_out_dir, exist_ok=True)
        for stale_name in os.listdir(segment_out_dir):
            stale_path = os.path.join(segment_out_dir, stale_name)
            # Only plain files are removed; os.remove() fails on directories.
            if os.path.isfile(stale_path):
                os.remove(stale_path)

        # Title and role originate from generated JSON, so strip characters
        # that are unsafe in a file name before using them in a path.
        safe_title = _safe_filename_part("_".join(title.split()))
        segment_paths = []
        # `segments` is already in ascending index order (built via enumerate).
        for idx, role, seg in segments:
            seg_filename = (
                f"{safe_title}_{script.subject}_{script.id}_seg{idx}_{_safe_filename_part(role)}.wav"
            )
            seg_path = os.path.join(segment_out_dir, seg_filename)
            with open(seg_path, "wb") as fw:
                fw.write(seg.getvalue())
            segment_paths.append(seg_path)

        logger.info(f"Saved {len(segment_paths)} segment files to {segment_out_dir}")

        # Merge the written segments with pydub (ffmpeg) when available;
        # otherwise the per-segment files are the final output.
        out_dir = os.path.join("output", "podcasts")
        os.makedirs(out_dir, exist_ok=True)
        final_filename = f"{safe_title}_{script.subject}_{script.id}.wav"
        final_path = os.path.join(out_dir, final_filename)

        try:
            from pydub import AudioSegment

            combined = None
            for seg_path in segment_paths:
                audio_seg = AudioSegment.from_file(seg_path, format="wav")
                combined = audio_seg if combined is None else combined + audio_seg

            if combined is not None:
                combined.export(final_path, format="wav")
                logger.info(f"Saved combined podcast audio to {final_path}")
        except Exception as e:
            logger.warning(f"pydub/ffmpeg not available or merge failed: {e}; falling back to per-segment files")

    except Exception as e:
        logger.error(f"Failed to synthesize/save podcast audio: {e}")
    finally:
        db.close()


def _safe_filename_part(value):
    """Return ``value`` as text with file-name-unsafe characters replaced by ``_``.

    Path separators, the characters ``: * ? " < > |`` and non-printable
    characters are replaced; everything else (including CJK text) is kept.
    """
    unsafe = set('\\/:*?"<>|')
    return "".join(
        "_" if ch in unsafe or not ch.isprintable() else ch
        for ch in str(value)
    )
|
|
|
|
# For manual testing:
# if __name__ == "__main__":
#     # Topic planning and background research
#     job_generate_topics()
#
#     # Stand-up bit writing
#     job_generate_bits()
#
#     # Full podcast script generation
#     job_generate_script()
#
#     # Podcast audio synthesis
#     job_synthesize_podcast_audio()