import project
All checks were successful
Gitea Actions Demo / deploy (push) Successful in 35s

This commit is contained in:
2025-11-20 21:41:13 +08:00
commit 32d15e8c92
21 changed files with 401 additions and 0 deletions

6
.dockerignore Normal file
View File

@ -0,0 +1,6 @@
.git
.gitea
.gitignore
Readme.md
Dockerfile
docker-compose.yml

16
.env Normal file
View File

@ -0,0 +1,16 @@
ENV=dev
DEBUG=true
TIMEZONE=UTC
APP_NAME=MemeApp
# 日志配置
LOG_LEVEL=DEBUG
LOG_TYPE=console
# 数据库配置
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASS=123456
DB_NAME=mydb

15
.env.prod Normal file
View File

@ -0,0 +1,15 @@
ENV=prod
DEBUG=false
# 日志配置
LOG_LEVEL=INFO
LOG_TYPE=file
LOG_FILE_PATH=logs
# 数据库配置
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASS=123456
DB_NAME=mydb

14
.env.test Normal file
View File

@ -0,0 +1,14 @@
ENV=test
DEBUG=true
# 日志配置
LOG_LEVEL=DEBUG
LOG_TYPE=console
# 数据库配置
DB_HOST=localhost
DB_PORT=5432
DB_USER=postgres
DB_PASS=123456
DB_NAME=mydb

View File

@ -0,0 +1,30 @@
name: Gitea Actions Demo
run-name: ${{ gitea.actor }} is testing out Gitea Actions 🚀
on: [push]
jobs:
deploy:
runs-on: ubuntu-latest
container:
image: gitea/runner-images:ubuntu-latest
steps:
- name: clone project code
run: git clone ${{ gitea.server_url }}/${{ gitea.repository }} .
- name: List files
run: ls -la
- name: Stop running containers
run: |
docker compose down || true
- name: Remove old image
run: |
IMAGE_NAME=$(basename "$PWD")
echo "Removing old image: $IMAGE_NAME"
docker images | grep "$IMAGE_NAME" && docker rmi -f $(docker images "$IMAGE_NAME" -q) || echo "No old image found."
- name: Build new image
run: |
docker build -t $(basename "$PWD"):latest .
- name: Start containers
run: |
docker compose up -d
- name: Show container status
run: docker ps

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
__pycache__

27
Dockerfile Normal file
View File

@ -0,0 +1,27 @@
# 使用官方轻量级 Python 基础镜像
FROM python:3.12-slim
# 时区(可选)
ENV TZ=Asia/Shanghai
# 设置工作目录(容器内路径)
WORKDIR /app
# 将项目文件复制到容器中
COPY . /app
# (可选)如果你有 requirements.txt则先复制并安装依赖
RUN if [ -f requirements.txt ]; then \
pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple/; \
fi
# 设置环境变量(防止 Python 缓存文件)
ENV PYTHONUNBUFFERED=1
# 暴露端口
EXPOSE 8000
#----------------------------
# 启动 FastAPI生产模式
#----------------------------
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

0
README.md Normal file
View File

9
api/deps.py Normal file
View File

@ -0,0 +1,9 @@
from config.database import SessionLocal
from fastapi import Depends
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

6
api/v1/routers.py Normal file
View File

@ -0,0 +1,6 @@
from fastapi import APIRouter
# from .users import router as user_router
api_router = APIRouter()
# api_router.include_router(user_router, prefix="/users", tags=["users"])

18
config/app.py Normal file
View File

@ -0,0 +1,18 @@
from fastapi import FastAPI
from config.settings import settings
from api.v1.routers import api_router
def create_app(lifespan=None) -> FastAPI:
app = FastAPI(
title=settings.APP_NAME,
debug=settings.DEBUG,
lifespan=lifespan,
)
# 注册路由
app.include_router(api_router, prefix="/api/v1")
return app
# app = create_app()

18
config/database.py Normal file
View File

@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from config.settings import settings
SQLALCHEMY_SYNC_URL = (
f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASS}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)
engine = create_engine(
SQLALCHEMY_SYNC_URL,
echo=False, # 开发可改 True
future=True
)
SessionLocal = scoped_session(
sessionmaker(bind=engine, autoflush=False, autocommit=False)
)

22
config/env_loader.py Normal file
View File

@ -0,0 +1,22 @@
import os
from dotenv import load_dotenv
def load_env():
"""
自动根据 ENV 加载对应的 .env 文件
"""
base_file = ".env"
prod_file = ".env.prod"
test_file = ".env.test"
# 先加载基础 .env
if os.path.exists(base_file):
load_dotenv(base_file)
# 根据参数 ENV 再加载其他环境
env = os.getenv("ENV", "dev")
if env == "prod" and os.path.exists(prod_file):
load_dotenv(prod_file, override=True)
elif env == "test" and os.path.exists(test_file):
load_dotenv(test_file, override=True)

33
config/settings.py Normal file
View File

@ -0,0 +1,33 @@
from pydantic_settings import BaseSettings
from pydantic import Field
from config.env_loader import load_env
# 先加载 ENV & .env
load_env()
class Settings(BaseSettings):
# 环境
ENV: str = Field("dev")
DEBUG: bool = Field(True)
TIMEZONE: str = Field("UTC")
APP_NAME: str = Field("MemeApp")
# 日志
LOG_LEVEL: str = Field("LOG_LEVEL")
LOG_FILE_PATH: str = Field("logs")
LOG_TYPE: str = Field("console")
# 数据库
DB_HOST: str
DB_PORT: int
DB_USER: str
DB_PASS: str
DB_NAME: str
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
# 全局唯一配置实例
settings = Settings()

7
docker-compose.yml Normal file
View File

@ -0,0 +1,7 @@
version: "3.8"
services:
app:
image: meme:latest
container_name: meme
restart: always

84
main.py Normal file
View File

@ -0,0 +1,84 @@
from fastapi import FastAPI
from fastapi.concurrency import asynccontextmanager
from config.settings import settings
from utils.logger import logger
from scheduler.scheduler import scheduler
import scheduler.jobs as jobs
from config.app import create_app
"""
启动方式:
python -m uvicorn main:app --reload
说明:
- 使用 FastAPI lifespan 管理 APScheduler 生命周期(替代已废弃的 on_event
- 避免 Uvicorn reload 模式下调度器重复启动
- 所有 Job 在应用启动时统一注册
"""
def _add_jobs():
"""
注册所有定时任务。
注意:
- 增加重复检查,避免 reload 或多进程导致重复添加。
- replace_existing=True 保证任务更新时无需手动删除。
"""
if not scheduler.get_job("heartbeat-job"):
scheduler.add_job(
jobs.job_heartbeat,
trigger="interval",
seconds=10,
id="heartbeat-job",
replace_existing=True,
)
logger.info("Job 'heartbeat-job' registered.")
else:
logger.info("Job 'heartbeat-job' already exists. Skipped.")
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
FastAPI 应用生命周期管理。
startup:
- 注册定时任务
- 启动 APScheduler避免 reload 环境下重复启动)
shutdown:
- 安全关闭 APScheduler
yield:
- 让 FastAPI 在此期间正常运行
"""
logger.info("==== Lifespan: startup ====")
# 防止 Uvicorn reload 启动两个进程导致重复启动 scheduler
if scheduler.running:
logger.warning("Scheduler already running. Skip start.")
else:
_add_jobs()
try:
scheduler.start()
logger.info("APScheduler started.")
except Exception:
logger.exception("Failed to start APScheduler:")
raise
yield # ---- 应用运行中 ----
logger.info("==== Lifespan: shutdown ====")
# 仅在 scheduler 正常运行时关闭
if scheduler.running:
scheduler.shutdown(wait=False)
logger.info("APScheduler stopped.")
else:
logger.info("Scheduler was not running. Skip shutdown.")
# 创建 FastAPI 应用(注入 lifespan 管理逻辑)
app = create_app(lifespan=lifespan)

4
models/base.py Normal file
View File

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
python-dotenv
pydantic-settings
loguru
sqlalchemy
psycopg
psycopg_binary
alembic
apscheduler
fastapi
uvicorn
gunicorn

6
scheduler/jobs.py Normal file
View File

@ -0,0 +1,6 @@
from utils.logger import logger
import datetime
def job_heartbeat():
logger.info(f"[heartbeat] {datetime.datetime.now()}")

37
scheduler/scheduler.py Normal file
View File

@ -0,0 +1,37 @@
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from loguru import logger
from config.settings import settings
SCHEDULER_DB_URL = (
f"postgresql+psycopg://{settings.DB_USER}:{settings.DB_PASS}"
f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)
def create_scheduler():
job_stores = {
"default": SQLAlchemyJobStore(url=SCHEDULER_DB_URL)
}
executors = {
"default": ThreadPoolExecutor(10),
"processpool": ProcessPoolExecutor(2)
}
job_defaults = {
"coalesce": False, # 重叠任务处理
"max_instances": 3
}
scheduler = BackgroundScheduler(
# jobstores=job_stores,
executors=executors,
job_defaults=job_defaults,
timezone=settings.TIMEZONE,
)
return scheduler
scheduler = create_scheduler()

37
utils/logger.py Normal file
View File

@ -0,0 +1,37 @@
import sys
import os
from loguru import logger
from config.settings import settings
# 移除默认的 handler否则重复输出
logger.remove()
if "console" in settings.LOG_TYPE:
# ======== 控制台输出 ========
logger.add(
sys.stdout,
level=settings.LOG_LEVEL,
format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> "
"| <level>{level: <8}</level> "
"| <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> "
"- <level>{message}</level>",
)
if "file" in settings.LOG_TYPE:
# 日志目录
LOG_DIR = settings.LOG_FILE_PATH
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
# ======== 文件输出(按天切割)========
logger.add(
f"{LOG_DIR}/app_{{time:YYYY-MM-DD}}.log",
rotation="00:00", # 每天 0 点切割
retention="7 days", # 保存 7 天
encoding="utf-8",
level=settings.LOG_LEVEL,
enqueue=True, # 多线程安全
compression="zip", # 自动压缩旧日志
)
__all__ = ["logger"]