def job_error_listener(event):
    """Log a crash reported by the scheduler for one of its jobs.

    The function is registered for ``EVENT_JOB_ERROR``; ``event`` exposes the
    failing job's ``job_id`` and the raised ``exception``.

    Args:
        event: The scheduler event describing the failed job run.
    """
    if event.exception:
        # exc_info attaches the traceback of the job's exception; the message
        # alone is rarely enough to locate the failing line.
        logger.error(
            f"Job {event.job_id} crashed: {str(event.exception)}",
            exc_info=event.exception,
        )
        # Hook point: mail / DingTalk alerting can be added here.
class Toutiao:
    """Drive a Chromium browser to create an article draft on mp.toutiao.com.

    One instance handles one article: the constructor opens the browser and
    loads the site, ``publish`` fills in the article and always closes the
    browser afterwards.
    """

    # SECURITY: these credentials were committed in plain text. They are kept
    # only as a last-resort fallback so existing deployments keep working.
    # TODO: rotate the account password and delete both constants once
    # TOUTIAO_USERNAME / TOUTIAO_PASSWORD are configured everywhere.
    _LEGACY_USERNAME = '17704081680'
    _LEGACY_PASSWORD = 'G*9dkvm834;.,['

    def __init__(self, article: TContentDispatch, username: str = None, password: str = None):
        """Open the browser and navigate to the Toutiao publishing site.

        Args:
            article: The content row to publish; ``title``, ``content`` and
                ``ai_content`` are read by ``publish``.
            username: Login account. Falls back to the ``TOUTIAO_USERNAME``
                environment variable, then to the legacy constant.
            password: Login password. Falls back to the ``TOUTIAO_PASSWORD``
                environment variable, then to the legacy constant.
        """
        import os  # local import: keeps this block independent of the file header

        self.article = article
        self.username = username or os.environ.get('TOUTIAO_USERNAME') or self._LEGACY_USERNAME
        self.password = password or os.environ.get('TOUTIAO_PASSWORD') or self._LEGACY_PASSWORD
        # After auto_port() the account has to log in again.
        co = ChromiumOptions().auto_port()
        self.browser = Chromium(addr_or_opts=co)
        try:
            self.tab = self.browser.latest_tab
            self.tab.get('https://mp.toutiao.com/')
        except Exception:
            # Do not leave a browser process behind when start-up fails.
            self.browser.quit()
            raise

    def need_login(self):
        """Return True when the ``.login-wrap`` element can be read from the page.

        Waits five seconds first, then looks the element up and logs its HTML.
        ``ElementNotFoundError`` is treated as "no login required".
        """
        self.tab.wait(5)
        try:
            login_dialog = self.tab.ele('.login-wrap')
            log(login_dialog.html)
            return True
        except ElementNotFoundError:
            return False

    def login_with_password(self):
        """Fill in and submit the password login form.

        A missing form element is logged and otherwise ignored, so this method
        never raises ``ElementNotFoundError``.
        """
        try:
            # 1. Switch to the password login form.
            password_login_btn = self.tab.ele('.web-login-other-login-method__list__item__icon web-login-other-login-method__list__item__icon__account_pwd')
            password_login_btn.click()
            # 2. Enter the account.
            username_input_box = self.tab.ele('.web-login-normal-input__input')
            username_input_box.input(self.username)
            # 3. Enter the password.
            password_input_box = self.tab.ele('.web-login-button-input__input')
            password_input_box.input(self.password)
            # 4. Tick the agreement checkbox.
            agree_checkbox = self.tab.ele('.web-login-confirm-info__checkbox')
            agree_checkbox.click()
            # 5. Submit.
            login_btn = self.tab.ele('.web-login-button')
            login_btn.click()
        except ElementNotFoundError:
            log('尝试进行账号登录,但登录界面元素未找到')

    def publish(self):
        """Create a new article from ``self.article`` and close the browser.

        ``ai_content`` is used as the body when it is set, otherwise
        ``content``. The browser is closed in every case.

        Returns:
            bool: True when every step ran without ``ElementNotFoundError`` or
            ``AttributeError``, False otherwise. Earlier versions returned
            None, so callers that ignore the result are unaffected.
        """
        published = False
        try:
            # 1. Log in when required.
            if self.need_login():
                self.login_with_password()
            # 2. Start a new article.
            new_article_btn = self.tab.ele('.byte-menu-inline base_creation_tab').ele('.byte-menu-item')
            new_article_btn.click()
            # 3. Enter the title.
            title_input_box = self.tab.ele('.editor-title autofit-textarea-wrapper').ele('tag:textarea')
            title_input_box.input(self.article.title)
            # 4. Enter the body, preferring the AI-edited text.
            content_input_area = self.tab.ele('.ProseMirror')
            if self.article.ai_content:
                content_input_area.input(self.article.ai_content)
            else:
                content_input_area.input(self.article.content)
            # 5. Wait 10 seconds; the article is auto-saved to the drafts box.
            self.tab.wait(10)
            published = True
        except (ElementNotFoundError, AttributeError) as exc:
            # Include the exception so the failing step can be identified.
            log(f'发布文章出现异常: {exc!r}')
        finally:
            # 6. Always release the browser.
            self.finish()
        return published

    def finish(self):
        """Close the browser."""
        self.browser.quit()
import os
from contextlib import contextmanager

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

from log.log_manager import logger

# Declarative base class shared by every model module.
Base = declarative_base()

# SECURITY: the fallback DSN below contains a plaintext password that was
# committed to the repository. Set ARLO_DATABASE_URL in every environment,
# rotate the password, then delete the fallback.
DATABASE_URL = os.environ.get(
    'ARLO_DATABASE_URL',
    'postgresql+psycopg://postgres:K8u3fg0o@47.119.128.161:60001/squirrel',
)
engine = create_engine(
    DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,  # recycle connections before the server side expires them
    connect_args={
        'connect_timeout': 15,
        'keepalives_idle': 60,
        'keepalives_interval': 10,
        'keepalives_count': 5
    }
)
def create_or_update_content(db, content: TContentDispatch):
    """Insert *content*, or update the stored row that has the same id.

    The previous update branch committed without attaching *content* to the
    session, so the values of a freshly built object were never written, and
    the following ``refresh`` was applied to an object the session did not
    manage.

    Args:
        db: The session used for the lookup and the write.
        content: The row to insert, or the new state of an existing row.

    Returns:
        The session-managed instance after commit and refresh. Earlier
        versions returned None, so callers that ignore the result are
        unaffected.
    """
    content_in_db = db.query(TContentDispatch).filter(TContentDispatch.id == content.id).first()

    if content_in_db is None:
        # No row with this id yet: plain insert. The caller's object itself is
        # refreshed so it receives generated values such as the id.
        db.add(content)
        db.commit()
        db.refresh(content)
        return content

    if content_in_db is not content:
        # A different object carries the new state: copy it onto the instance
        # the session already manages.
        content = db.merge(content)
    db.commit()
    db.refresh(content)
    return content
# NOTE(review): @dataclass is applied on top of the declarative mapping;
# confirm that the generated __init__/__eq__ do not interfere with the ORM.
@dataclass
class TContentDispatch(Base):
    """ORM model for table ``t_content_dispatch``: one piece of content to distribute."""

    __tablename__ = 't_content_dispatch'

    id: int = Column(BIGINT, primary_key=True, autoincrement=True, comment='自动递增的唯一任务ID')
    creation_date: datetime = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='记录数据条目的创建时间')
    is_sent: bool = Column(Boolean, default=False, nullable=False, comment='表示数据条目是否已被发送')
    category: str = Column(String(255), nullable=False, comment='类别')
    title: str = Column(String(255), nullable=False, comment='标题')
    cover_image: str = Column(Text, nullable=True, comment='封面')
    poster_image: str = Column(Text, nullable=True, comment='海报')
    opening_text: str = Column(Text, nullable=True, comment='开头语')
    content: str = Column(Text, nullable=False, comment='内容')
    ai_content: str = Column(Text, nullable=True, comment='AI编辑的内容')
    closing_text: str = Column(Text, nullable=True, comment='结束语')
    is_scheduled: bool = Column(Boolean, default=False, nullable=False, comment='是否设置为定时发送')
    # TIMESTAMP column, so the value is a datetime (nullable), not a string.
    schedule_time: datetime = Column(TIMESTAMP, nullable=True, comment='定时发送的具体时间')
    format: str = Column(String(50), nullable=True, comment='数据条目的格式')
    ai_generate: int = Column(Integer, default=0, nullable=False, comment='是否AI生成。0否,1部分参与,2是。')

    def __repr__(self):
        """Return a short debugging representation (the previous one was empty)."""
        return (
            f"<TContentDispatch(id={self.id!r}, category={self.category!r}, "
            f"title={self.title!r}, is_sent={self.is_sent!r})>"
        )

    def get_creation_date_in_localtime(self) -> datetime:
        """Return ``creation_date`` converted to the local time zone."""
        return self.creation_date.astimezone()
@dataclass
class THealthKnowledge(Base):
    """ORM model for table ``t_health_knowledge``: one health-knowledge entry."""

    __tablename__ = 't_health_knowledge'

    id: int = Column(BIGINT, primary_key=True, autoincrement=True, comment='序号')
    subject: Optional[str] = Column(String, nullable=True, comment='主题')
    knowledge: Optional[str] = Column(String, nullable=True, comment='知识内容')
    keywords: Optional[str] = Column(String, nullable=True, comment='关键字')
    url: Optional[str] = Column(String, nullable=True, comment='链接')
    is_used: bool = Column(Boolean, default=False, nullable=False, comment='表示数据条目是否已被使用')
    create_time: datetime = Column(TIMESTAMP(timezone=True), server_default=func.now(), nullable=False, comment='创建时间')

    def __repr__(self):
        """Return a short debugging representation (the previous one was empty)."""
        return (
            f"<THealthKnowledge(id={self.id!r}, subject={self.subject!r}, "
            f"is_used={self.is_used!r}, create_time={self.create_time!r})>"
        )
def mark_data_as_used(db, data):
    """Flag *data* as used and commit the change through *db*."""
    setattr(data, "is_used", True)
    db.commit()
def _utcnow_naive() -> datetime:
    """Return the current UTC time as a naive datetime.

    Drop-in replacement for ``datetime.utcnow``, which is deprecated since
    Python 3.12; the value stays naive to match the plain ``DateTime`` columns.
    """
    from datetime import timezone  # local import: the file header only imports ``datetime``

    return datetime.now(timezone.utc).replace(tzinfo=None)


@dataclass
class TScheduler(Base):
    """ORM model for table ``t_scheduler``: one schedulable task definition."""

    __tablename__ = 't_scheduler'

    id: int = Column(Integer, primary_key=True, autoincrement=True, comment='自动递增的唯一任务ID')
    task_name: str = Column(String(64), nullable=False, comment='任务名称')
    # One of interval / cron / date; selects which of the next three columns applies.
    trigger: str = Column(String(10), nullable=False, comment='调度方式,interval、cron、date')
    interval_seconds: Optional[int] = Column(Integer, nullable=True, comment='固定时间间隔(秒),用于 interval 类型')
    cron_expression: Optional[str] = Column(String(255), nullable=True, comment='CRON 表达式,用于 cron 类型')
    execution_date: Optional[datetime] = Column(DateTime, nullable=True, comment='执行时间,用于 date 类型')
    task_payload: Optional[str] = Column(Text, nullable=True, comment='任务相关的参数或数据')
    active: Optional[bool] = Column(Boolean, default=False, nullable=True, comment='任务状态,是否启用')
    executor: Optional[str] = Column(String(32), nullable=True, comment='任务执行者')
    handler: Optional[str] = Column(String(32), nullable=True, comment='任务执行程序')
    last_run: Optional[datetime] = Column(DateTime, nullable=True, comment='上一次执行时间')
    next_run: Optional[datetime] = Column(DateTime, nullable=True, comment='下一次执行时间')
    create_time: datetime = Column(DateTime, default=_utcnow_naive, nullable=True, comment='创建时间')
    # onupdate refreshes the timestamp on every ORM UPDATE; before, the column
    # kept its insert-time value forever.
    update_time: datetime = Column(DateTime, default=_utcnow_naive, onupdate=_utcnow_naive, nullable=True, comment='更新时间')
    module_path: Optional[str] = Column(String(255), nullable=True, comment='任务逻辑所在模块名称')
    function_name: Optional[str] = Column(String(32), nullable=True, comment='任务逻辑的函数名称')
"""Logging bootstrap.

Usage:
1. In code::

       from log.log_manager import logger
       logger.info("Starting Jarvas")

2. At start-up, optionally select a logging config file::

       python demo.py --logconfig=log_prod.config

``log_prod.config`` in this directory is a reference configuration.
"""
import logging.config
import sys

import config.config

# Default logging configuration for development: INFO and above to stdout.
LOG_DEV_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "loggers": {
        "root": {
            "level": "INFO",
            "handlers": ["consoleHandler"]
        }
    },
    "handlers": {
        "consoleHandler": {
            "class": "logging.StreamHandler",
            "level": "INFO",
            "formatter": "verbose",
            "stream": sys.stdout
        }
    },
    "formatters": {
        "verbose": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
            "datefmt": "%Y-%m-%d %H:%M:%S"
        }
    }
}

log_config_message = ""
# Command-line arguments of the running process.
args = sys.argv

# Only the exact ``--logconfig=<path>`` form is recognised. The previous
# substring match also picked up unrelated arguments, and unpacking
# ``split('=')`` raised ValueError for ``--logconfig`` without a value or for
# paths that contain ``=``.
logconfig_param = next((arg for arg in args if arg.startswith('--logconfig=')), None)
logconfig_value = None
if logconfig_param:
    # partition() splits on the first '=' only, so the path is kept intact.
    _, _, logconfig_value = logconfig_param.partition('=')
    log_config_message = f"--logconfig value: {logconfig_value}"
else:
    log_config_message = "没有找到 --logconfig 参数,使用默认log配置"

if logconfig_value:
    # Use the logging config file given on the command line.
    logging.config.fileConfig(logconfig_value)
else:
    # Fall back to the built-in development configuration.
    logging.config.dictConfig(LOG_DEV_CONFIG)

logger = logging.getLogger('root')
# Report which configuration was selected.
logger.info(log_config_message)


def log(message: str):
    """Log *message* at INFO level, prefixed with the configured scheduler name."""
    logger.info(f'{config.config.scheduler_name} {message}')
class MailManager:
    """Thin wrapper around one authenticated SMTP connection to smtp.qq.com.

    Can be used as a context manager; the connection is closed on exit.
    """

    # SECURITY: the default account and authorization code below are committed
    # in plain text. They are kept only because they are part of the existing
    # signature. TODO: rotate the code and have callers pass credentials from
    # configuration instead.
    def __init__(self, email: str = "1026090807@qq.com", password: str = "hqorvminmnuubebf",
                 timeout: float = 30):
        """Connect, upgrade to TLS and log in.

        Args:
            email: Sender account, also used as the ``From`` address.
            password: Password / authorization code for the account.
            timeout: Socket timeout in seconds, so a stalled server cannot
                block the caller indefinitely.

        Raises:
            smtplib.SMTPException, OSError: When connecting, STARTTLS or login
                fails. The socket is closed before the error propagates.
        """
        self.sender_email = email
        self.server = smtplib.SMTP('smtp.qq.com', 587, timeout=timeout)
        try:
            self.server.starttls()  # secure the connection
            self.server.login(email, password)
        except (smtplib.SMTPException, OSError):
            # Do not leak the half-initialised connection.
            self.server.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()
        return False

    def send_mail(self, subject: str, content: str, receiver_email:str = "changsongd@126.com"):
        """Send a plain-text mail; failures while sending are logged, not raised.

        Returns:
            bool: True when the message was handed to the server, False when
            sending raised. Earlier versions returned None.
        """
        message = MIMEMultipart()
        message['From'] = self.sender_email
        message['To'] = receiver_email
        message['Subject'] = subject
        message.attach(MIMEText(content, 'plain'))
        text = message.as_string()
        try:
            self.server.sendmail(self.sender_email, receiver_email, text)
        except Exception as e:
            log(f"send mail failed with error {e}. subject: {subject}")
            return False
        return True

    def finish(self):
        """Close the SMTP session."""
        try:
            self.server.quit()
        except smtplib.SMTPServerDisconnected:
            # The connection is already gone; just release the socket.
            self.server.close()


def send_mail(subject: str, content: str, receiver_email:str = None):
    """Send one mail over a fresh connection and always close it afterwards.

    Args:
        subject: Mail subject.
        content: Plain-text body.
        receiver_email: Recipient; ``MailManager.send_mail``'s default
            recipient is used when None.
    """
    with MailManager() as mail_manager:
        if receiver_email is None:
            mail_manager.send_mail(subject, content)
        else:
            mail_manager.send_mail(subject, content, receiver_email)
def article_publish():
    """Publish the newest '新鲜事' article of the last 24 hours to Toutiao.

    Does nothing when the lookup returns no article. A log line with the
    article's title, id and local creation time is written after ``publish``
    returns.
    """
    with get_session() as db:
        article = get_recent_24_hours_content_to_dispatch(db, '新鲜事')
        if not article:
            return
        # Hand the article to the Toutiao channel, then record what was sent.
        Toutiao(article).publish()
        log(f'publish article {article.title} to toutiao success with article id: {article.id} and article time: {article.get_creation_date_in_localtime()}')
def _quartz_to_cron_kwargs(cron_expression: str) -> dict:
    """Translate a Quartz cron expression into keyword arguments for a ``cron`` job.

    Quartz field order is: second minute hour day-of-month month day-of-week
    [year]; the year field is optional.

    Raises:
        ValueError: When the expression has neither six nor seven fields.
    """
    fields = cron_expression.split()
    if len(fields) == 6:
        fields.append('*')  # the year field is optional in Quartz
    if len(fields) != 7:
        raise ValueError("无效的 Quartz cron 表达式")
    # Quartz allows `?` ("no specific value") in day-of-month and in
    # day-of-week; the scheduler expects `*` in both places.
    for index in (3, 5):
        if fields[index] == '?':
            fields[index] = '*'
    # NOTE(review): numeric day-of-week values are passed through unchanged;
    # Quartz and APScheduler number weekdays differently, so confirm before
    # relying on numeric values.
    names = ('second', 'minute', 'hour', 'day', 'month', 'day_of_week', 'year')
    return dict(zip(names, fields))


def _schedule_task(scheduler: BlockingScheduler, task, job_id: str):
    """Import the task's handler and register it on *scheduler* under *job_id*."""
    module = importlib.import_module(task.module_path)
    # NOTE(review): `task` is handed to the job and used after the loading
    # session has been committed and closed; confirm that handlers can still
    # read its attributes at run time.
    task_function = partial(execute_task, partial(getattr(module, task.function_name), task))
    trigger = task.trigger

    if trigger == "interval":
        scheduler.add_job(
            task_function,
            "interval",
            seconds=task.interval_seconds,
            id=job_id,
            replace_existing=True
        )
        log(f"Task {task.task_name} added with interval {task.interval_seconds} seconds")
    elif trigger == "cron":
        scheduler.add_job(
            task_function,
            'cron',
            id=job_id,
            replace_existing=True,
            **_quartz_to_cron_kwargs(task.cron_expression)
        )
        log(f"Task {task.task_name} added with cron {task.cron_expression}")
    elif trigger == "date":
        # Without an execution date the task runs 10 seconds from now. The
        # value is written onto the row, so it is committed when the session
        # that loaded the task exits.
        if task.execution_date is None:
            task.execution_date = datetime.now() + timedelta(seconds=10)
        scheduler.add_job(
            task_function,
            "date",
            run_date=task.execution_date,
            id=job_id,
            replace_existing=True
        )
        log(f"Task {task.task_name} added with date {task.execution_date}")
    else:
        log(f"Invalid trigger type: {trigger}")


# Load tasks from the database.
def load_tasks(scheduler: BlockingScheduler):
    """Register every active task of this executor that is not scheduled yet.

    Tasks are read with ``get_tasks_by_executor`` for ``config.scheduler_name``
    and use the task id as job id. A task that cannot be scheduled (missing
    module or function, invalid cron expression, bad trigger arguments) is
    logged and skipped, so one bad row no longer blocks the remaining tasks.

    Args:
        scheduler: The scheduler that receives the jobs.
    """
    with get_session() as db:
        tasks = get_tasks_by_executor(db, config.scheduler_name)
        for task in tasks:
            job_id = str(task.id)
            # Already scheduled: skip before doing any import work.
            if scheduler.get_job(job_id):
                continue
            try:
                _schedule_task(scheduler, task, job_id)
            except (ImportError, AttributeError, TypeError, ValueError) as exc:
                log(f"Task {task.task_name} (id={job_id}) could not be scheduled: {exc!r}")
b/task/queue/__pycache__/__init__.cpython-312.pyc differ diff --git a/task/queue/__pycache__/task_queue.cpython-312.pyc b/task/queue/__pycache__/task_queue.cpython-312.pyc new file mode 100644 index 0000000..30f0fbe Binary files /dev/null and b/task/queue/__pycache__/task_queue.cpython-312.pyc differ diff --git a/task/queue/task_queue.py b/task/queue/task_queue.py new file mode 100644 index 0000000..18b1c26 --- /dev/null +++ b/task/queue/task_queue.py @@ -0,0 +1,29 @@ +import importlib +from functools import partial + +from database.database import get_session +from database.tscheduler.model import TScheduler +from database.ttaskqueue.curd import get_tasks_to_finish, finish_task +from log.log_manager import log +from task.manager_task import execute_task + + +def start_task_queue(scheduler: TScheduler): + with get_session() as db: + tasks = get_tasks_to_finish(db) + if len(tasks) > 0: + log(f'start task queue with task size {len(tasks)}') + for task in tasks: + # 1. 动态构建任务函数 + module = importlib.import_module(task.module_path) + task_function = partial(execute_task, getattr(module, task.function_name)) + # 2. 执行任务 + task_function() + # 3. 标记任务完成 + finish_task(db, task.id) + # 4. 
打印日志 + log(f"{task} finish") + + +if __name__ == '__main__': + start_task_queue(TScheduler()) diff --git a/task/reference_message/__init__.py b/task/reference_message/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/task/reference_message/__pycache__/__init__.cpython-312.pyc b/task/reference_message/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..3f4a07d Binary files /dev/null and b/task/reference_message/__pycache__/__init__.cpython-312.pyc differ diff --git a/task/reference_message/__pycache__/reference_message.cpython-312.pyc b/task/reference_message/__pycache__/reference_message.cpython-312.pyc new file mode 100644 index 0000000..6705fac Binary files /dev/null and b/task/reference_message/__pycache__/reference_message.cpython-312.pyc differ diff --git a/task/reference_message/reference_message.py b/task/reference_message/reference_message.py new file mode 100644 index 0000000..33f9faa --- /dev/null +++ b/task/reference_message/reference_message.py @@ -0,0 +1,27 @@ +from database.database import get_session +from database.tcontentdispatch.curd import get_contents_to_dispatch, finish_contents_to_dispatch +from database.tscheduler.model import TScheduler +from log.log_manager import log +from mail.mail_manager import send_mail + + +def send_reference_message_mail_task(scheduler: TScheduler): + with get_session() as db: + # 获取需要发送的内容列表 + dispatch_contents = get_contents_to_dispatch(db) + # 发送邮件 + ids = [] + for dispatch_content in dispatch_contents: + subject = dispatch_content.title + content = dispatch_content.content + send_mail(subject, dispatch_content.content) + ids.append(dispatch_content.id) + log(f"send mail success with title {subject}, content {content[:20]}.") + if dispatch_content.ai_content: + send_mail(subject + '[AI]', dispatch_content.ai_content) + log(f"send ai content mail success with title {subject}, content {dispatch_content.ai_content[:20]}.") + # 更新数据库 + finish_contents_to_dispatch(db, ids) + +if 
__name__ == '__main__': + send_reference_message_mail_task(TScheduler()) \ No newline at end of file