diff --git a/.env b/.env new file mode 100644 index 0000000..abb2399 --- /dev/null +++ b/.env @@ -0,0 +1,17 @@ +ENV=dev + +DEBUG=true + +# 日志配置 +LOG_LEVEL=DEBUG +LOG_TYPE=console + +# 数据库配置 +DB_HOST= 47.119.128.161 # 192.168.1.200 +DB_PORT=19732 +DB_USER=postgres +DB_PASS=postgres +DB_NAME=peter + +# LLM配置 +LLM_API_KEY=sk-88d6437a6c224ccbb761ec7d994e3b34 \ No newline at end of file diff --git a/config/__init__.py b/config/__init__.py new file mode 100644 index 0000000..f56bbbd --- /dev/null +++ b/config/__init__.py @@ -0,0 +1 @@ +from config.settings import settings \ No newline at end of file diff --git a/config/database.py b/config/database.py new file mode 100644 index 0000000..4a79780 --- /dev/null +++ b/config/database.py @@ -0,0 +1,18 @@ +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, scoped_session +from config.settings import settings + +SQLALCHEMY_SYNC_URL = ( + f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASS}" + f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}" +) + +engine = create_engine( + SQLALCHEMY_SYNC_URL, + echo=False, # 开发可改 True + future=True +) + +SessionLocal = scoped_session( + sessionmaker(bind=engine, autoflush=False, autocommit=False) +) \ No newline at end of file diff --git a/config/env_loader.py b/config/env_loader.py new file mode 100644 index 0000000..01521dd --- /dev/null +++ b/config/env_loader.py @@ -0,0 +1,22 @@ +import os +from dotenv import load_dotenv + +def load_env(): + """ + 自动根据 ENV 加载对应的 .env 文件 + """ + base_file = ".env" + prod_file = ".env.prod" + test_file = ".env.test" + + # 先加载基础 .env + if os.path.exists(base_file): + load_dotenv(base_file) + + # 根据参数 ENV 再加载其他环境 + env = os.getenv("ENV", "dev") + + if env == "prod" and os.path.exists(prod_file): + load_dotenv(prod_file, override=True) + elif env == "test" and os.path.exists(test_file): + load_dotenv(test_file, override=True) \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 0000000..551b981 --- /dev/null +++ b/config/settings.py @@ -0,0 +1,34 @@ +from pydantic_settings import BaseSettings +from pydantic import Field +from config.env_loader import load_env + +# 先加载 ENV & .env +load_env() + +class Settings(BaseSettings): + # 环境 + ENV: str = Field("dev") + DEBUG: bool = Field(True) + + # 日志 + LOG_LEVEL: str = Field("LOG_LEVEL") + LOG_FILE_PATH: str = Field("logs") + LOG_TYPE: str = Field("console") + + # 数据库 + DB_HOST: str + DB_PORT: int + DB_USER: str + DB_PASS: str + DB_NAME: str + + # LLM配置 + LLM_API_KEY: str = Field("LLM_API_KEY") + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + + +# 全局唯一配置实例 +settings = Settings() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 1992692..b82c078 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,3 +5,5 @@ services: image: edward:latest container_name: edward restart: always + environment: + - TZ=Asia/Shanghai # 设置时区环境变量 diff --git a/edward.py b/edward.py index 706bf84..c483dd2 100644 --- a/edward.py +++ b/edward.py @@ -5,8 +5,8 @@ from apscheduler.events import EVENT_JOB_ERROR from apscheduler.schedulers.blocking import BlockingScheduler from config import config -from log.log_manager import log, logger from task.manager_task import manager_task +from utils import logger def job_error_listener(event): @@ -29,7 +29,7 @@ if __name__ == '__main__': scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR) try: - log("started successfully.") + logger.info("Edward started successfully.") scheduler.start() # 阻塞运行 except (KeyboardInterrupt, SystemExit): - log("Shutting down ...") + logger.info("Shutting down ...") diff --git a/llm/__init__.py b/llm/__init__.py new file mode 100644 index 0000000..36399d7 --- /dev/null +++ b/llm/__init__.py @@ -0,0 +1 @@ +from llm.llm_thinking_engine import LLMThinkingEngine \ No newline at end of file diff --git a/llm/llm_thinking_engine.py b/llm/llm_thinking_engine.py new file mode 100644 index 0000000..6b64c0e --- /dev/null +++ b/llm/llm_thinking_engine.py @@ -0,0 +1,144 @@ +from typing import Optional, Dict +from dataclasses import dataclass +import os +from config.settings import settings +from openai import OpenAI +from utils import logger + + +@dataclass +class LLMConfig: + """LLM配置类""" + api_key: Optional[str] = None + base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1" + model: str = "deepseek-v3.2" + enable_thinking: bool = True + temperature: float = 0.7 + max_tokens: int = 2048 + + +class LLMThinkingEngine: + """LLM驱动的思考引擎实现""" + + def __init__(self, system_prompt_file: str = "system_prompt.txt", config: Optional[LLMConfig] = None): + """ + 初始化LLMThinkingEngine + + Args: + config: LLM配置对象,如果为None则使用默认配置 + """ + self.system_prompt_file = system_prompt_file + self.config = config or LLMConfig() + self._init_client() + + def _init_client(self): + """初始化OpenAI客户端""" + api_key = self.config.api_key or settings.LLM_API_KEY + self.client = OpenAI( + api_key=api_key, + base_url=self.config.base_url, + ) + + def think(self, user_input: str) -> str: + """ + 基于LLM进行思考,返回下一步的行动 + + Args: + user_input: 用户输入内容 + + Returns: + Thought: 思考结果,包含行动类型和内容 + """ + # 构建适用于LLM的消息 + messages = self._build_messages(user_input) + logger.info(f"LLM构建的消息: {messages}") + # 调用LLM进行思考 + thinking_content, response_content = self._call_llm(messages) + + # logger.info(f"LLM思考结果: thinking_content={thinking_content}, response_content={response_content}") + return response_content + + def _build_messages(self, user_input: str) -> list[Dict[str, str]]: + """ + 构建发送给LLM的消息 + + Args: + user_input: 用户输入内容 + + Returns: + 消息列表,包含系统提示、历史和当前输入 + """ + messages = [] + + # 系统提示 + system_prompt = self._get_system_prompt() + messages.append({"role": "system", "content": system_prompt}) + + # 用户输入 + messages.append({ + "role": "user", + "content": user_input + }) + + return messages + + def _get_system_prompt(self) -> str: + """ + 获取系统提示词 + + Returns: + 系统提示词 + """ + prompt_path = os.path.join( + os.path.dirname(__file__), + "prompts", + self.system_prompt_file + ) + with open(prompt_path, "r", encoding="utf-8") as f: + return f.read() + + def _call_llm(self, messages: list[Dict[str, str]]) -> tuple[str, str]: + """ + 调用LLM API + + Args: + messages: 消息列表 + + Returns: + (thinking_content, response_content): 思考内容和响应内容 + """ + thinking_content = "" + response_content = "" + + try: + completion = self.client.chat.completions.create( + model=self.config.model, + messages=messages, + temperature=self.config.temperature, + max_tokens=self.config.max_tokens, + extra_body={"enable_thinking": self.config.enable_thinking}, + stream=True + ) + + # 流式处理响应 + for chunk in completion: + delta = chunk.choices[0].delta + + # 收集思考内容 + if hasattr(delta, "reasoning_content") and delta.reasoning_content: + thinking_content += delta.reasoning_content + + # 收集响应内容 + if hasattr(delta, "content") and delta.content: + response_content += delta.content + + except Exception as e: + # 错误处理 + response_content = f"调用LLM时出错:{str(e)}" + + return thinking_content, response_content + + def set_config(self, config: LLMConfig): + """更新LLM配置""" + self.config = config + self._init_client() \ No newline at end of file diff --git a/llm/prompts/real_estate_story_system_prompt.txt b/llm/prompts/real_estate_story_system_prompt.txt new file mode 100644 index 0000000..63c1ca6 --- /dev/null +++ b/llm/prompts/real_estate_story_system_prompt.txt @@ -0,0 +1,41 @@ +你是一个收集并解读楼市众生相的观察者。每天从全国各地的购房故事里,抽取出“当下楼市最真实的情绪信号”,分享给你的粉丝。你不唱多不唱空,只是让故事本身说话。 + +请根据用户提供的购房故事素材,按照以下步骤生成一篇微头条,并以JSON格式输出。 + +## 第一步:素材筛选 +根据提供的素材,分析其是否符合发布标准: +- 是否有普遍共鸣? +- 是否有情绪张力? +- 是否有信息增量? +在输出JSON中,需包含“素材分析”字段,简要说明理由。 + +## 第二步:撰写标题 +从以下三个标题模板中选择最合适的一个(也可微调),并说明选择理由: +1. “[情绪钩子] + [具体信息] + [留白/反问]” 示例:“买完房三天,同小区冒出套更便宜更好的”:这位女孩的遭遇,评论区炸了。 +2. “刚买房就亏13万,是什么体验?这个广东女生的帖子,看得人又笑又想哭。” +3. “我好像被贝壳耍了”:一个深圳女孩的买房后悔日记。 +在JSON中输出所选标题。 + +## 第三步:构建正文 +按照以下四段式结构撰写正文,每段内容需贴合素材,语言生动真实。 +- 第1段:设问/引入,建立“观察者”视角。 +- 第2段:讲故事(保留原帖语气,适当精简)。 +- 第3段:加入评论区的声音(制造互动感)。 +- 第4段:你的观察(保持理性,不煽动)。 +在JSON中输出正文,可分段列出。 + +## 第四步:人设检查 +在生成内容后,检查是否符合以下人设要求,并在JSON中输出布尔值: +- 开头是否用了“观察者”口吻? +- 转述故事时,是否保留了原帖的真实感? +- 结尾是否有自己的理性洞察? +- 是否引导了互动? + +## 输出格式要求 +请将最终结果以JSON格式输出,包含以下字段: +- material_analysis(对象):包含universal_resonance(字符串)、emotional_tension(字符串)、info_increment(字符串)。 +- title(字符串):所选标题。 +- body(数组):正文的四个段落,每个段落为字符串。 +- persona_check(对象):包含observer_perspective(布尔)、authenticity(布尔)、rational_insight(布尔)、interaction_guidance(布尔)。 + +确保JSON格式正确,无多余字符。 diff --git a/llm/prompts/system_prompt.txt b/llm/prompts/system_prompt.txt new file mode 100644 index 0000000..f906b14 --- /dev/null +++ b/llm/prompts/system_prompt.txt @@ -0,0 +1,71 @@ +# 核心定位 +你是一个收集并解读楼市众生相的观察者。每天从全国各地的购房故事里,抽取出“当下楼市最真实的情绪信号”,分享给你的粉丝。你不唱多不唱空,只是让故事本身说话。 + +# 第一步:素材筛选标准(什么故事值得发?) +不是所有帖子都值得写成微头条。选素材时,问自己三个问题: +是否有普遍共鸣? 这个故事是孤例,还是很多人正在经历的?(如:买完就降价、卖不掉房、谈价拉扯) +是否有情绪张力? 读者看完会“代入”吗?会想起自己或身边人吗? +是否有信息增量? 故事里有没有具体细节(价格、户型、城市、谈判过程)? + +根据你给的素材: +✅ 普遍共鸣:买完就降价,几乎每个近年买房的人都怕遇到。 +✅ 情绪张力:从“有根了”的喜悦,到“亏了13万”的心痛,再到“算了拉倒”的释然。 +✅ 信息增量:87平、南北通透、三房两卫、降价13万、22万装修(应为22万首付或总价?原文可能有笔误)。 + +结论:这是一个值得发的好素材。 + +# 第二步:撰写标题(3选1) +标题公式: [情绪钩子] + [具体信息] + [留白/反问] + +根据你的素材,以下标题供选用: +1. “买完房三天,同小区冒出套更便宜更好的”:这位女孩的遭遇,评论区炸了。 +2. 刚买房就亏13万,是什么体验?这个广东女生的帖子,看得人又笑又想哭。 +3. “我好像被贝壳耍了”:一个深圳女孩的买房后悔日记。 + +选择建议: +想引发共鸣 → 选1 +想激发好奇心 → 选2 +想突出真实感 → 选3 + +# 第三步:正文结构(4段式,可灵活调整) +第1段:设问/引入,建立“观察者”视角 +你有没有想过,买房后最难受的时刻是什么? +不是还贷压力大,也不是房子降价了。 +而是——你刚签完合同,同小区就挂出一套户型更好、价格更便宜的房子。 +昨晚刷到一个帖子,看完心里挺不是滋味的。 + +第2段:讲故事(保留原帖语气,适当精简) +发帖的是一位广东女孩,网名叫momo。 +她说,自己攒了很久的钱,终于买下一套房。签完约那几天,心里美滋滋的,觉得自己“有根了”,再也不用像蒲公英一样飘着。 +结果三天后,她刷贝壳,发现同小区新挂出一套房——87平、南北通透、三房两卫,户型比她那个更好,总价还便宜13万。 +她说: +“我买房的时候,怎么砍价都砍不下来,中介说业主不缺钱。我一买完,新房就出来了,好像是等着我似的……” +“我平时买菜都斤斤计较,这一下亏掉13万,够我装修了。” + +第3段:加入评论区的声音(制造互动感) +评论区里,有人安慰她: +“你跟21、22年高位上车的人比,已经赚了几十万了。” +momo回:你还真会安慰人。 +也有人苦笑: +“我23年底上车的,后面都麻木了。” +还有人怀疑:不会是贝壳算法在搞鬼吧?故意先放差房源,再放好房源? + +第4段:你的观察(保持理性,不煽动) +其实,momo的遭遇,不是个例。 +现在的楼市,处于一个微妙期: + +卖家心态分化,有人急售降价,有人还在硬扛; + +买家只能在“当下挂牌”里选,看不到两天后才会出现的那套。 +这不是谁在耍谁,而是市场流动性恢复后的正常现象。 +就像炒股买在阶段性高点——只要你不卖,浮亏就不是真亏。 + +momo最后说了一句话,还挺打动我的: +“虽然亏了,但我有自己的家了,再也不用搬家了。算了拉倒!” +也许,这才是房子最大的意义。 + +# 第四步:固定人设检查 +- 开头是否用了“观察者”口吻?(“刷到一个帖子”“看到一个故事”) +- 转述故事时,是否保留了原帖的真实感?(尽量用原话) +- 结尾是否有自己的理性洞察?(不煽动焦虑,不唱多空) +- 是否引导了互动?(“你遇到过吗?”“评论区聊聊”) \ No newline at end of file diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..c998ac6 --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,2 @@ +from models.source_content import SourceContent +from models.article import Article \ No newline at end of file diff --git a/models/article.py b/models/article.py new file mode 100644 index 0000000..8164cb5 --- /dev/null +++ b/models/article.py @@ -0,0 +1,50 @@ +from datetime import datetime +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy import String, Text, Integer, DateTime, func +from models.base import Base + + +class Article(Base): + __tablename__ = "t_article" + + id: Mapped[int] = mapped_column( + Integer, + primary_key=True, + autoincrement=True, + comment="自动递增的唯一内容ID" + ) + + title: Mapped[str] = mapped_column( + String(256), + nullable=False, + index=True, + comment="标题" + ) + + keywords: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="关键词" + ) + + content: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="内容" + ) + + create_time: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=func.now(), + nullable=False, + comment="创建时间" + ) + + used: Mapped[bool] = mapped_column( + default=False, + nullable=False, + comment="是否已被使用" + ) + + def __repr__(self): + return f"
" diff --git a/models/base.py b/models/base.py new file mode 100644 index 0000000..2278416 --- /dev/null +++ b/models/base.py @@ -0,0 +1,4 @@ +from sqlalchemy.orm import DeclarativeBase + +class Base(DeclarativeBase): + pass \ No newline at end of file diff --git a/models/source_content.py b/models/source_content.py new file mode 100644 index 0000000..7dbd941 --- /dev/null +++ b/models/source_content.py @@ -0,0 +1,57 @@ +from datetime import datetime +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy import String, Text, Integer, DateTime, Index, func +from models.base import Base + + +class SourceContent(Base): + __tablename__ = "t_source_content" + + id: Mapped[int] = mapped_column( + Integer, + primary_key=True, + autoincrement=True, + comment="自动递增的唯一内容ID" + ) + + link: Mapped[str] = mapped_column( + String(2048), + nullable=False, + index=True, + comment="链接" + ) + + platform: Mapped[str] = mapped_column( + String(32), + nullable=False, + comment="平台" + ) + + content: Mapped[str | None] = mapped_column( + Text, + nullable=True, + comment="内容" + ) + + create_time: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=func.now(), + nullable=False, + comment="创建时间" + ) + + update_time: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + server_default=func.now(), + onupdate=func.now(), + nullable=False, + comment="更新时间" + ) + + # ——可选优化:添加 项目 + 主题 联合唯一索引—— + __table_args__ = ( + Index("link", "link", unique=True), + ) + + def __repr__(self): + return f"" diff --git a/requirements.txt b/requirements.txt index 6d887ae..c29b6a3 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/task/hot_topic/real_estate_story.py b/task/hot_topic/real_estate_story.py new file mode 100644 index 0000000..a9ab4e2 --- /dev/null +++ b/task/hot_topic/real_estate_story.py @@ -0,0 +1,42 @@ +from datetime import datetime +import json +from task.manager_task import execute_task +from config.database import SessionLocal +from models import SourceContent, Article +from utils import logger +from llm import LLMThinkingEngine + + +def story_edit_task(): + with SessionLocal() as db: + # 获取今天的所有帖子信息 + today_contents = db.query(SourceContent).filter( + SourceContent.create_time >= datetime.today().replace(hour=0, minute=0, second=0, microsecond=0) + ).limit(10).all() + if len(today_contents) == 0: + logger.info("story_edit_task finish, content size 0") + return + logger.info(f"story_edit_task get {len(today_contents)} contents") + + llm_engine = LLMThinkingEngine(system_prompt_file="real_estate_story_system_prompt.txt") + for content in today_contents: + logger.info(f"story_edit_task content id: {content.id}, title: {content.link}, platform: {content.platform}") + story = llm_engine.think(f"故事素材:{content.content}") + logger.info(f"story_edit_task content id: {content.id} story: {story}") + # 将生成的故事写入Article表 + json_story = json.loads(story) + title = json_story.get("title", "无标题") + paragraphs = json_story.get("body", ["无内容"]) + article_content = "\n".join(paragraphs) + article = Article( + title=title, + keywords=None, + content=article_content, + used=False + ) + db.add(article) + db.commit() + # break # 目前先处理一条内容,后续再改成批量处理 + +if __name__ == "__main__": + execute_task(story_edit_task) \ No newline at end of file diff --git a/task/manager_task.py b/task/manager_task.py index 0905821..f079a1f 100644 --- a/task/manager_task.py +++ b/task/manager_task.py @@ -7,7 +7,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler from config import config from database.database import get_session from database.tscheduler.crud import get_tasks_by_executor -from log.log_manager import log +from utils import logger """ 这是一个特殊的任务,负责管理任务,命名为管理者任务。 @@ -26,10 +26,10 @@ def log_task_execution(task_name: str, start_time: float, end_time: float = None start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)) end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)) if end_time is None: - log(f"{task_name} start execute at {start_time_str}") + logger.info(f"{task_name} start execute at {start_time_str}") else: elapsed_time = end_time - start_time - log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") + logger.info(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds") def execute_task(task: callable): @@ -65,7 +65,7 @@ def load_tasks(scheduler: BlockingScheduler): id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with interval {interval_seconds} seconds") + logger.info(f"Task {task.task_name} added with interval {interval_seconds} seconds") elif trigger == "cron": # 解析 cron 表达式的字段 fields = task.cron_expression.split() @@ -89,7 +89,7 @@ def load_tasks(scheduler: BlockingScheduler): id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with cron {task.cron_expression}") + logger.info(f"Task {task.task_name} added with cron {task.cron_expression}") elif trigger == "date": scheduler.add_job( task_function, @@ -98,9 +98,9 @@ def load_tasks(scheduler: BlockingScheduler): id=str(task_id), replace_existing=True ) - log(f"Task {task.task_name} added with date {task.execution_date}") + logger.info(f"Task {task.task_name} added with date {task.execution_date}") else: - log(f"Invalid trigger type: {trigger}") + logger.warning(f"Invalid trigger type: {trigger}") # 管理者任务 diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..649c832 --- /dev/null +++ b/utils/__init__.py @@ -0,0 +1 @@ +from utils.logger import logger \ No newline at end of file diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..138663d --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,37 @@ +import sys +import os +from loguru import logger +from config.settings import settings + +# 移除默认的 handler(否则重复输出) +logger.remove() + +if "console" in settings.LOG_TYPE: + # ======== 控制台输出 ======== + logger.add( + sys.stdout, + level=settings.LOG_LEVEL, + format="{time:YYYY-MM-DD HH:mm:ss} " + "| {level: <8} " + "| {name}:{function}:{line} " + "- {message}", + ) + +if "file" in settings.LOG_TYPE: + # 日志目录 + LOG_DIR = settings.LOG_FILE_PATH + if not os.path.exists(LOG_DIR): + os.makedirs(LOG_DIR) + + # ======== 文件输出(按天切割)======== + logger.add( + f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log", + rotation="00:00", # 每天 0 点切割 + retention="7 days", # 保存 7 天 + encoding="utf-8", + level=settings.LOG_LEVEL, + enqueue=True, # 多线程安全 + compression="zip", # 自动压缩旧日志 + ) + +__all__ = ["logger"] \ No newline at end of file