import arlo
This commit is contained in:
0
task/__init__.py
Normal file
0
task/__init__.py
Normal file
BIN
task/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
task/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
task/__pycache__/manager_task.cpython-312.pyc
Normal file
BIN
task/__pycache__/manager_task.cpython-312.pyc
Normal file
Binary file not shown.
0
task/article_publish/__init__.py
Normal file
0
task/article_publish/__init__.py
Normal file
BIN
task/article_publish/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
task/article_publish/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
task/article_publish/__pycache__/article_publish.cpython-312.pyc
Normal file
BIN
task/article_publish/__pycache__/article_publish.cpython-312.pyc
Normal file
Binary file not shown.
38
task/article_publish/article_publish.py
Normal file
38
task/article_publish/article_publish.py
Normal file
@ -0,0 +1,38 @@
|
||||
from channel.toutiao.toutiao import Toutiao
|
||||
from database.database import get_session
|
||||
from database.tcontentdispatch.curd import get_recent_24_hours_content_to_dispatch
|
||||
from database.tscheduler.model import TScheduler
|
||||
from database.ttaskqueue.curd import create_task
|
||||
from database.ttaskqueue.model import TTaskQueue
|
||||
from log.log_manager import log
|
||||
from task.manager_task import execute_task
|
||||
|
||||
|
||||
def article_publish():
|
||||
# 1. 从数据库获取文章
|
||||
with get_session() as db:
|
||||
# 2. 获取所有未发布的文章
|
||||
article = get_recent_24_hours_content_to_dispatch(db, '新鲜事')
|
||||
if article:
|
||||
# 3. toutiao发布文章
|
||||
toutiao = Toutiao(article)
|
||||
toutiao.publish()
|
||||
# 4. 打印日志
|
||||
log(f'publish article {article.title} to toutiao success with article id: {article.id} and article time: {article.get_creation_date_in_localtime()}')
|
||||
|
||||
|
||||
def article_publish_task():
|
||||
execute_task(article_publish)
|
||||
|
||||
def article_publish_use_task_queue(scheduler: TScheduler):
|
||||
with get_session() as db:
|
||||
task = TTaskQueue()
|
||||
task.task_name = 'toutiao_article_publish'
|
||||
task.module_path = scheduler.module_path
|
||||
task.function_name = scheduler.handler
|
||||
task.scheduler = scheduler.task_name
|
||||
create_task(db, task)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
article_publish_task()
|
||||
0
task/health_knowledge/__init__.py
Normal file
0
task/health_knowledge/__init__.py
Normal file
BIN
task/health_knowledge/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
task/health_knowledge/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
Binary file not shown.
21
task/health_knowledge/health_knowledge.py
Normal file
21
task/health_knowledge/health_knowledge.py
Normal file
@ -0,0 +1,21 @@
|
||||
from database.database import get_session
|
||||
from database.thealthknowledge.health_knowledge import get_oldest_unused_data, mark_data_as_used
|
||||
from database.tscheduler.model import TScheduler
|
||||
from log.log_manager import log
|
||||
from mail.mail_manager import send_mail
|
||||
|
||||
|
||||
def send_health_knowledge_mail_task(scheduler: TScheduler):
|
||||
with get_session() as db:
|
||||
# 获取需要发送的内容列表
|
||||
health_knowledge = get_oldest_unused_data(db)
|
||||
# 发送邮件
|
||||
subject = health_knowledge.subject
|
||||
content = health_knowledge.knowledge
|
||||
send_mail(subject, content, receiver_email="lu9531@126.com")
|
||||
log(f"send mail success with title {subject}, content {content[:20]}.")
|
||||
# 更新数据库
|
||||
mark_data_as_used(db, health_knowledge)
|
||||
|
||||
if __name__ == '__main__':
|
||||
send_health_knowledge_mail_task(TScheduler())
|
||||
114
task/manager_task.py
Normal file
114
task/manager_task.py
Normal file
@ -0,0 +1,114 @@
|
||||
import functools
|
||||
import importlib
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from functools import partial
|
||||
|
||||
from apscheduler.schedulers.blocking import BlockingScheduler
|
||||
|
||||
from config import config
|
||||
from database.database import get_session
|
||||
from database.tscheduler.crud import get_tasks_by_executor
|
||||
from log.log_manager import log
|
||||
|
||||
"""
|
||||
这是一个特殊的任务,负责管理任务,命名为管理者任务。
|
||||
|
||||
工作流程:
|
||||
1 检索数据库任务数据表
|
||||
2 检查是否已经在任务队列中,如果不在则添加
|
||||
|
||||
任务执行时间间隔为600秒。
|
||||
|
||||
"""
|
||||
|
||||
|
||||
def log_task_execution(task_name: str, start_time: float, end_time: float = None):
|
||||
"""辅助函数,记录任务的开始和结束日志"""
|
||||
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time))
|
||||
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time))
|
||||
if end_time is None:
|
||||
log(f"{task_name} start execute at {start_time_str}")
|
||||
else:
|
||||
elapsed_time = end_time - start_time
|
||||
log(f"{task_name} end execute at {end_time_str}, use time {elapsed_time:.2f} seconds")
|
||||
|
||||
|
||||
def execute_task(task: callable):
|
||||
"""执行任务并记录日志"""
|
||||
start_time = time.time()
|
||||
task_name = task.func.__name__ if isinstance(task, functools.partial) else task.__name__
|
||||
log_task_execution(task_name, start_time) # 先记录开始时间
|
||||
task()
|
||||
end_time = time.time()
|
||||
log_task_execution(task_name, start_time, end_time) # 记录结束时间
|
||||
|
||||
# 从数据库加载任务
|
||||
def load_tasks(scheduler: BlockingScheduler):
|
||||
with get_session() as db:
|
||||
tasks = get_tasks_by_executor(db, config.scheduler_name)
|
||||
for task in tasks:
|
||||
module_path = task.module_path
|
||||
function_name = task.function_name
|
||||
trigger = task.trigger
|
||||
interval_seconds = task.interval_seconds
|
||||
task_id = task.id
|
||||
|
||||
# 动态导入模块和函数
|
||||
module = importlib.import_module(module_path)
|
||||
task_function = partial(execute_task, partial(getattr(module, function_name), task))
|
||||
|
||||
# 检查任务是否已存在
|
||||
if not scheduler.get_job(str(task_id)):
|
||||
if trigger == "interval":
|
||||
scheduler.add_job(
|
||||
task_function,
|
||||
"interval",
|
||||
seconds=interval_seconds,
|
||||
id=str(task_id),
|
||||
replace_existing=True
|
||||
)
|
||||
log(f"Task {task.task_name} added with interval {interval_seconds} seconds")
|
||||
elif trigger == "cron":
|
||||
# 解析 cron 表达式的字段
|
||||
fields = task.cron_expression.split()
|
||||
# 确保字段长度符合七字段格式
|
||||
if len(fields) != 7:
|
||||
raise ValueError("无效的 Quartz cron 表达式")
|
||||
# 替换 Quartz 风格的 `?` 为 APScheduler 可接受的 `*`
|
||||
if fields[5] == '?':
|
||||
fields[5] = '*' # 替换 `day_of_week` 字段中的 `?`
|
||||
# 使用 cron 表达式的字段添加任务
|
||||
scheduler.add_job(
|
||||
task_function, # 要执行的任务
|
||||
'cron', # 使用 cron 触发器
|
||||
second=fields[0], # 秒
|
||||
minute=fields[1], # 分钟
|
||||
hour=fields[2], # 小时
|
||||
day=fields[3], # 日期
|
||||
month=fields[4], # 月份
|
||||
day_of_week=fields[5], # 星期
|
||||
year=fields[6], # 年份
|
||||
id=str(task_id),
|
||||
replace_existing=True
|
||||
)
|
||||
log(f"Task {task.task_name} added with cron {task.cron_expression}")
|
||||
elif trigger == "date":
|
||||
# 如果task.execution_date为None,则设置为当前时间10秒后
|
||||
if task.execution_date is None :
|
||||
task.execution_date = datetime.now() + timedelta(seconds=10)
|
||||
scheduler.add_job(
|
||||
task_function,
|
||||
"date",
|
||||
run_date=task.execution_date,
|
||||
id=str(task_id),
|
||||
replace_existing=True
|
||||
)
|
||||
log(f"Task {task.task_name} added with date {task.execution_date}")
|
||||
else:
|
||||
log(f"Invalid trigger type: {trigger}")
|
||||
|
||||
|
||||
# 管理者任务
|
||||
def manager_task(scheduler: BlockingScheduler):
|
||||
load_tasks(scheduler)
|
||||
0
task/queue/__init__.py
Normal file
0
task/queue/__init__.py
Normal file
BIN
task/queue/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
task/queue/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
BIN
task/queue/__pycache__/task_queue.cpython-312.pyc
Normal file
BIN
task/queue/__pycache__/task_queue.cpython-312.pyc
Normal file
Binary file not shown.
29
task/queue/task_queue.py
Normal file
29
task/queue/task_queue.py
Normal file
@ -0,0 +1,29 @@
|
||||
import importlib
|
||||
from functools import partial
|
||||
|
||||
from database.database import get_session
|
||||
from database.tscheduler.model import TScheduler
|
||||
from database.ttaskqueue.curd import get_tasks_to_finish, finish_task
|
||||
from log.log_manager import log
|
||||
from task.manager_task import execute_task
|
||||
|
||||
|
||||
def start_task_queue(scheduler: TScheduler):
|
||||
with get_session() as db:
|
||||
tasks = get_tasks_to_finish(db)
|
||||
if len(tasks) > 0:
|
||||
log(f'start task queue with task size {len(tasks)}')
|
||||
for task in tasks:
|
||||
# 1. 动态构建任务函数
|
||||
module = importlib.import_module(task.module_path)
|
||||
task_function = partial(execute_task, getattr(module, task.function_name))
|
||||
# 2. 执行任务
|
||||
task_function()
|
||||
# 3. 标记任务完成
|
||||
finish_task(db, task.id)
|
||||
# 4. 打印日志
|
||||
log(f"{task} finish")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
start_task_queue(TScheduler())
|
||||
0
task/reference_message/__init__.py
Normal file
0
task/reference_message/__init__.py
Normal file
BIN
task/reference_message/__pycache__/__init__.cpython-312.pyc
Normal file
BIN
task/reference_message/__pycache__/__init__.cpython-312.pyc
Normal file
Binary file not shown.
Binary file not shown.
27
task/reference_message/reference_message.py
Normal file
27
task/reference_message/reference_message.py
Normal file
@ -0,0 +1,27 @@
|
||||
from database.database import get_session
|
||||
from database.tcontentdispatch.curd import get_contents_to_dispatch, finish_contents_to_dispatch
|
||||
from database.tscheduler.model import TScheduler
|
||||
from log.log_manager import log
|
||||
from mail.mail_manager import send_mail
|
||||
|
||||
|
||||
def send_reference_message_mail_task(scheduler: TScheduler):
|
||||
with get_session() as db:
|
||||
# 获取需要发送的内容列表
|
||||
dispatch_contents = get_contents_to_dispatch(db)
|
||||
# 发送邮件
|
||||
ids = []
|
||||
for dispatch_content in dispatch_contents:
|
||||
subject = dispatch_content.title
|
||||
content = dispatch_content.content
|
||||
send_mail(subject, dispatch_content.content)
|
||||
ids.append(dispatch_content.id)
|
||||
log(f"send mail success with title {subject}, content {content[:20]}.")
|
||||
if dispatch_content.ai_content:
|
||||
send_mail(subject + '[AI]', dispatch_content.ai_content)
|
||||
log(f"send ai content mail success with title {subject}, content {dispatch_content.ai_content[:20]}.")
|
||||
# 更新数据库
|
||||
finish_contents_to_dispatch(db, ids)
|
||||
|
||||
if __name__ == '__main__':
|
||||
send_reference_message_mail_task(TScheduler())
|
||||
Reference in New Issue
Block a user