task: add real estate story
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 26s

This commit is contained in:
konjacpotato
2026-02-15 15:29:13 +08:00
parent 5267db8a0d
commit 1e2d739d00
20 changed files with 554 additions and 10 deletions

17
.env Normal file
View File

@ -0,0 +1,17 @@
ENV=dev
DEBUG=true
# 日志配置
LOG_LEVEL=DEBUG
LOG_TYPE=console
# 数据库配置
DB_HOST= 47.119.128.161 # 192.168.1.200
DB_PORT=19732
DB_USER=postgres
DB_PASS=postgres
DB_NAME=peter
# LLM配置
LLM_API_KEY=sk-88d6437a6c224ccbb761ec7d994e3b34

1
config/__init__.py Normal file
View File

@ -0,0 +1 @@
from config.settings import settings

18
config/database.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from config.settings import settings
SQLALCHEMY_SYNC_URL = (
f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASS}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)
engine = create_engine(
SQLALCHEMY_SYNC_URL,
echo=False, # 开发可改 True
future=True
)
SessionLocal = scoped_session(
sessionmaker(bind=engine, autoflush=False, autocommit=False)
)

22
config/env_loader.py Normal file
View File

@ -0,0 +1,22 @@
import os
from dotenv import load_dotenv
def load_env():
"""
自动根据 ENV 加载对应的 .env 文件
"""
base_file = ".env"
prod_file = ".env.prod"
test_file = ".env.test"
# 先加载基础 .env
if os.path.exists(base_file):
load_dotenv(base_file)
# 根据参数 ENV 再加载其他环境
env = os.getenv("ENV", "dev")
if env == "prod" and os.path.exists(prod_file):
load_dotenv(prod_file, override=True)
elif env == "test" and os.path.exists(test_file):
load_dotenv(test_file, override=True)

34
config/settings.py Normal file
View File

@ -0,0 +1,34 @@
from pydantic_settings import BaseSettings
from pydantic import Field
from config.env_loader import load_env
# 先加载 ENV & .env
load_env()
class Settings(BaseSettings):
# 环境
ENV: str = Field("dev")
DEBUG: bool = Field(True)
# 日志
LOG_LEVEL: str = Field("LOG_LEVEL")
LOG_FILE_PATH: str = Field("logs")
LOG_TYPE: str = Field("console")
# 数据库
DB_HOST: str
DB_PORT: int
DB_USER: str
DB_PASS: str
DB_NAME: str
# LLM配置
LLM_API_KEY: str = Field("LLM_API_KEY")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# 全局唯一配置实例
settings = Settings()

View File

@ -5,3 +5,5 @@ services:
image: edward:latest
container_name: edward
restart: always
environment:
- TZ=Asia/Shanghai # 设置时区环境变量

View File

@ -5,8 +5,8 @@ from apscheduler.events import EVENT_JOB_ERROR
from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from log.log_manager import log, logger
from task.manager_task import manager_task
from utils import logger
def job_error_listener(event):
@ -29,7 +29,7 @@ if __name__ == '__main__':
scheduler.add_listener(job_error_listener, EVENT_JOB_ERROR)
try:
log("started successfully.")
logger.info("Edward started successfully.")
scheduler.start() # 阻塞运行
except (KeyboardInterrupt, SystemExit):
log("Shutting down ...")
logger.info("Shutting down ...")

1
llm/__init__.py Normal file
View File

@ -0,0 +1 @@
from llm.llm_thinking_engine import LLMThinkingEngine

144
llm/llm_thinking_engine.py Normal file
View File

@ -0,0 +1,144 @@
from typing import Optional, Dict
from dataclasses import dataclass
import os
from config.settings import settings
from openai import OpenAI
from utils import logger
@dataclass
class LLMConfig:
"""LLM配置类"""
api_key: Optional[str] = None
base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
model: str = "deepseek-v3.2"
enable_thinking: bool = True
temperature: float = 0.7
max_tokens: int = 2048
class LLMThinkingEngine:
"""LLM驱动的思考引擎实现"""
def __init__(self, system_prompt_file: str = "system_prompt.txt", config: Optional[LLMConfig] = None):
"""
初始化LLMThinkingEngine
Args:
config: LLM配置对象如果为None则使用默认配置
"""
self.system_prompt_file = system_prompt_file
self.config = config or LLMConfig()
self._init_client()
def _init_client(self):
"""初始化OpenAI客户端"""
api_key = self.config.api_key or settings.LLM_API_KEY
self.client = OpenAI(
api_key=api_key,
base_url=self.config.base_url,
)
def think(self, user_input: str) -> str:
"""
基于LLM进行思考返回下一步的行动
Args:
user_input: 用户输入内容
Returns:
Thought: 思考结果,包含行动类型和内容
"""
# 构建适用于LLM的消息
messages = self._build_messages(user_input)
logger.info(f"LLM构建的消息: {messages}")
# 调用LLM进行思考
thinking_content, response_content = self._call_llm(messages)
# logger.info(f"LLM思考结果: thinking_content={thinking_content}, response_content={response_content}")
return response_content
def _build_messages(self, user_input: str) -> list[Dict[str, str]]:
"""
构建发送给LLM的消息
Args:
user_input: 用户输入内容
Returns:
消息列表,包含系统提示、历史和当前输入
"""
messages = []
# 系统提示
system_prompt = self._get_system_prompt()
messages.append({"role": "system", "content": system_prompt})
# 用户输入
messages.append({
"role": "user",
"content": user_input
})
return messages
def _get_system_prompt(self) -> str:
"""
获取系统提示词
Returns:
系统提示词
"""
prompt_path = os.path.join(
os.path.dirname(__file__),
"prompts",
self.system_prompt_file
)
with open(prompt_path, "r", encoding="utf-8") as f:
return f.read()
def _call_llm(self, messages: list[Dict[str, str]]) -> tuple[str, str]:
"""
调用LLM API
Args:
messages: 消息列表
Returns:
(thinking_content, response_content): 思考内容和响应内容
"""
thinking_content = ""
response_content = ""
try:
completion = self.client.chat.completions.create(
model=self.config.model,
messages=messages,
temperature=self.config.temperature,
max_tokens=self.config.max_tokens,
extra_body={"enable_thinking": self.config.enable_thinking},
stream=True
)
# 流式处理响应
for chunk in completion:
delta = chunk.choices[0].delta
# 收集思考内容
if hasattr(delta, "reasoning_content") and delta.reasoning_content:
thinking_content += delta.reasoning_content
# 收集响应内容
if hasattr(delta, "content") and delta.content:
response_content += delta.content
except Exception as e:
# 错误处理
response_content = f"调用LLM时出错{str(e)}"
return thinking_content, response_content
def set_config(self, config: LLMConfig):
"""更新LLM配置"""
self.config = config
self._init_client()

View File

@ -0,0 +1,41 @@
你是一个收集并解读楼市众生相的观察者。每天从全国各地的购房故事里,抽取出“当下楼市最真实的情绪信号”,分享给你的粉丝。你不唱多不唱空,只是让故事本身说话。
请根据用户提供的购房故事素材按照以下步骤生成一篇微头条并以JSON格式输出。
## 第一步:素材筛选
根据提供的素材,分析其是否符合发布标准:
- 是否有普遍共鸣?
- 是否有情绪张力?
- 是否有信息增量?
在输出JSON中需包含“素材分析”字段简要说明理由。
## 第二步:撰写标题
从以下三个标题模板中选择最合适的一个(也可微调),并说明选择理由:
1. “[情绪钩子] + [具体信息] + [留白/反问]” 示例:“买完房三天,同小区冒出套更便宜更好的”:这位女孩的遭遇,评论区炸了。
2. “刚买房就亏13万是什么体验这个广东女生的帖子看得人又笑又想哭。”
3. “我好像被贝壳耍了”:一个深圳女孩的买房后悔日记。
在JSON中输出所选标题。
## 第三步:构建正文
按照以下四段式结构撰写正文,每段内容需贴合素材,语言生动真实。
- 第1段设问/引入,建立“观察者”视角。
- 第2段讲故事保留原帖语气适当精简
- 第3段加入评论区的声音制造互动感
- 第4段你的观察保持理性不煽动
在JSON中输出正文可分段列出。
## 第四步:人设检查
在生成内容后检查是否符合以下人设要求并在JSON中输出布尔值
- 开头是否用了“观察者”口吻?
- 转述故事时,是否保留了原帖的真实感?
- 结尾是否有自己的理性洞察?
- 是否引导了互动?
## 输出格式要求
请将最终结果以JSON格式输出包含以下字段
- material_analysis对象包含universal_resonance字符串、emotional_tension字符串、info_increment字符串
- title字符串所选标题。
- body数组正文的四个段落每个段落为字符串。
- persona_check对象包含observer_perspective布尔、authenticity布尔、rational_insight布尔、interaction_guidance布尔
确保JSON格式正确无多余字符。

View File

@ -0,0 +1,71 @@
# 核心定位
你是一个收集并解读楼市众生相的观察者。每天从全国各地的购房故事里,抽取出“当下楼市最真实的情绪信号”,分享给你的粉丝。你不唱多不唱空,只是让故事本身说话。
# 第一步:素材筛选标准(什么故事值得发?)
不是所有帖子都值得写成微头条。选素材时,问自己三个问题:
是否有普遍共鸣? 这个故事是孤例,还是很多人正在经历的?(如:买完就降价、卖不掉房、谈价拉扯)
是否有情绪张力? 读者看完会“代入”吗?会想起自己或身边人吗?
是否有信息增量? 故事里有没有具体细节(价格、户型、城市、谈判过程)?
根据你给的素材:
✅ 普遍共鸣:买完就降价,几乎每个近年买房的人都怕遇到。
✅ 情绪张力从“有根了”的喜悦到“亏了13万”的心痛再到“算了拉倒”的释然。
✅ 信息增量87平、南北通透、三房两卫、降价13万、22万装修应为22万首付或总价原文可能有笔误
结论:这是一个值得发的好素材。
# 第二步撰写标题3选1
标题公式: [情绪钩子] + [具体信息] + [留白/反问]
根据你的素材,以下标题供选用:
1. “买完房三天,同小区冒出套更便宜更好的”:这位女孩的遭遇,评论区炸了。
2. 刚买房就亏13万是什么体验这个广东女生的帖子看得人又笑又想哭。
3. “我好像被贝壳耍了”:一个深圳女孩的买房后悔日记。
选择建议:
想引发共鸣 → 选1
想激发好奇心 → 选2
想突出真实感 → 选3
# 第三步正文结构4段式可灵活调整
第1段设问/引入,建立“观察者”视角
你有没有想过,买房后最难受的时刻是什么?
不是还贷压力大,也不是房子降价了。
而是——你刚签完合同,同小区就挂出一套户型更好、价格更便宜的房子。
昨晚刷到一个帖子,看完心里挺不是滋味的。
第2段讲故事保留原帖语气适当精简
发帖的是一位广东女孩网名叫momo。
她说,自己攒了很久的钱,终于买下一套房。签完约那几天,心里美滋滋的,觉得自己“有根了”,再也不用像蒲公英一样飘着。
结果三天后她刷贝壳发现同小区新挂出一套房——87平、南北通透、三房两卫户型比她那个更好总价还便宜13万。
她说:
“我买房的时候,怎么砍价都砍不下来,中介说业主不缺钱。我一买完,新房就出来了,好像是等着我似的……”
“我平时买菜都斤斤计较这一下亏掉13万够我装修了。”
第3段加入评论区的声音制造互动感
评论区里,有人安慰她:
“你跟21、22年高位上车的人比已经赚了几十万了。”
momo回你还真会安慰人。
也有人苦笑:
“我23年底上车的后面都麻木了。”
还有人怀疑:不会是贝壳算法在搞鬼吧?故意先放差房源,再放好房源?
第4段你的观察保持理性不煽动
其实momo的遭遇不是个例。
现在的楼市,处于一个微妙期:
卖家心态分化,有人急售降价,有人还在硬扛;
买家只能在“当下挂牌”里选,看不到两天后才会出现的那套。
这不是谁在耍谁,而是市场流动性恢复后的正常现象。
就像炒股买在阶段性高点——只要你不卖,浮亏就不是真亏。
momo最后说了一句话还挺打动我的
“虽然亏了,但我有自己的家了,再也不用搬家了。算了拉倒!”
也许,这才是房子最大的意义。
# 第四步:固定人设检查
- 开头是否用了“观察者”口吻?(“刷到一个帖子”“看到一个故事”)
- 转述故事时,是否保留了原帖的真实感?(尽量用原话)
- 结尾是否有自己的理性洞察?(不煽动焦虑,不唱多空)
- 是否引导了互动?(“你遇到过吗?”“评论区聊聊”)

2
models/__init__.py Normal file
View File

@ -0,0 +1,2 @@
from models.source_content import SourceContent
from models.article import Article

50
models/article.py Normal file
View File

@ -0,0 +1,50 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, func
from models.base import Base
class Article(Base):
__tablename__ = "t_article"
id: Mapped[int] = mapped_column(
Integer,
primary_key=True,
autoincrement=True,
comment="自动递增的唯一内容ID"
)
title: Mapped[str] = mapped_column(
String(256),
nullable=False,
index=True,
comment="标题"
)
keywords: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="关键词"
)
content: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="内容"
)
create_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
comment="创建时间"
)
used: Mapped[bool] = mapped_column(
default=False,
nullable=False,
comment="是否已被使用"
)
def __repr__(self):
return f"<Article id={self.id} title={self.title!r} used={self.used!r}>"

4
models/base.py Normal file
View File

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass

57
models/source_content.py Normal file
View File

@ -0,0 +1,57 @@
from datetime import datetime
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Text, Integer, DateTime, Index, func
from models.base import Base
class SourceContent(Base):
__tablename__ = "t_source_content"
id: Mapped[int] = mapped_column(
Integer,
primary_key=True,
autoincrement=True,
comment="自动递增的唯一内容ID"
)
link: Mapped[str] = mapped_column(
String(2048),
nullable=False,
index=True,
comment="链接"
)
platform: Mapped[str] = mapped_column(
String(32),
nullable=False,
comment="平台"
)
content: Mapped[str | None] = mapped_column(
Text,
nullable=True,
comment="内容"
)
create_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
comment="创建时间"
)
update_time: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
comment="更新时间"
)
# ——可选优化:添加 项目 + 主题 联合唯一索引——
__table_args__ = (
Index("link", "link", unique=True),
)
def __repr__(self):
return f"<SourceContent id={self.id} link={self.link!r} platform={self.platform!r}>"

Binary file not shown.

View File

@ -0,0 +1,42 @@
from datetime import datetime
import json
from task.manager_task import execute_task
from config.database import SessionLocal
from models import SourceContent, Article
from utils import logger
from llm import LLMThinkingEngine
def story_edit_task():
with SessionLocal() as db:
# 获取今天的所有帖子信息
today_contents = db.query(SourceContent).filter(
SourceContent.create_time >= datetime.today().replace(hour=0, minute=0, second=0, microsecond=0)
).limit(10).all()
if len(today_contents) == 0:
logger.info("story_edit_task finish, content size 0")
return
logger.info(f"story_edit_task get {len(today_contents)} contents")
llm_engine = LLMThinkingEngine(system_prompt_file="real_estate_story_system_prompt.txt")
for content in today_contents:
logger.info(f"story_edit_task content id: {content.id}, title: {content.link}, platform: {content.platform}")
story = llm_engine.think(f"故事素材:{content.content}")
logger.info(f"story_edit_task content id: {content.id} story: {story}")
# 将生成的故事写入Article表
json_story = json.loads(story)
title = json_story.get("title", "无标题")
paragraphs = json_story.get("body", ["无内容"])
article_content = "\n".join(paragraphs)
article = Article(
title=title,
keywords=None,
content=article_content,
used=False
)
db.add(article)
db.commit()
# break # 目前先处理一条内容,后续再改成批量处理
if __name__ == "__main__":
execute_task(story_edit_task)

View File

@ -7,7 +7,7 @@ from apscheduler.schedulers.blocking import BlockingScheduler
from config import config
from database.database import get_session
from database.tscheduler.crud import get_tasks_by_executor
from log.log_manager import log
from utils import logger
"""
这是一个特殊的任务,负责管理任务,命名为管理者任务。
@ -26,10 +26,10 @@ def log_task_execution(task_name: str, start_time: float, end_time: float = None
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
if end_time is None:
log(f"{task_name} start execute at {start_time_str}")
logger.info(f"{task_name} start execute at {start_time_str}")
else:
elapsed_time = end_time - start_time
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
logger.info(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
def execute_task(task: callable):
@ -65,7 +65,7 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
logger.info(f"Task {task.task_name} added with interval {interval_seconds} seconds")
elif trigger == "cron":
# 解析 cron 表达式的字段
fields = task.cron_expression.split()
@ -89,7 +89,7 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with cron {task.cron_expression}")
logger.info(f"Task {task.task_name} added with cron {task.cron_expression}")
elif trigger == "date":
scheduler.add_job(
task_function,
@ -98,9 +98,9 @@ def load_tasks(scheduler: BlockingScheduler):
id=str(task_id),
replace_existing=True
)
log(f"Task {task.task_name} added with date {task.execution_date}")
logger.info(f"Task {task.task_name} added with date {task.execution_date}")
else:
log(f"Invalid trigger type: {trigger}")
logger.warning(f"Invalid trigger type: {trigger}")
# 管理者任务

1
utils/__init__.py Normal file
View File

@ -0,0 +1 @@
from utils.logger import logger

37
utils/logger.py Normal file
View File

@ -0,0 +1,37 @@
import sys
import os
from loguru import logger
from config.settings import settings
# 移除默认的 handler否则重复输出
logger.remove()
if "console" in settings.LOG_TYPE:
# ======== 控制台输出 ========
logger.add(
sys.stdout,
level=settings.LOG_LEVEL,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> "
"| <level>{level: <8}</level> "
"| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
"- <level>{message}</level>",
)
if "file" in settings.LOG_TYPE:
# 日志目录
LOG_DIR = settings.LOG_FILE_PATH
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
# ======== 文件输出(按天切割)========
logger.add(
f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log",
rotation="00:00", # 每天 0 点切割
retention="7 days", # 保存 7 天
encoding="utf-8",
level=settings.LOG_LEVEL,
enqueue=True, # 多线程安全
compression="zip", # 自动压缩旧日志
)
__all__ = ["logger"]