置顶

FastAPI + Celery 实战:异步任务里调用 Redis 和数据库的全解析及生产级组织方案

作者:admin | 分类:顶尖机器人 | 浏览:3 | 日期:2026年04月02日
在现代Web开发中,FastAPI凭借其高性能异步特性成为构建API的热门选择,而Celery则是处理异步任务的利器。但在异步任务中调用Redis和数据库时,稍有不慎就会引发连接泄露、资源耗尽等问题。本文将从踩坑案例出发,解析问题根源,并给出生产级的代码组织方案。 一、异步任务调用Redis和数据库的常见陷阱 很多开发者在使用FastAPI的BackgroundTasks时,会直接将请求生命周期内创建的数据库Session或RedisConnection传递给异步任务,看似简单高效,实则隐藏巨大风险。 曾有一个用户头像处理项目,上传头像后异步生成缩略图,将结果存入MySQL,同时把用户ID和任务ID存入Redis做状态追踪。上线初期运行正常,但几天后开始随机报错:MySQL连接已关闭、Redis连接数超限,甚至出现任务执行到一半数据写入失败的情况。 经过排查发现,问题出在资源生命周期管理上。FastAPI的BackgroundTasks在请求响应返回后执行,但请求结束时,框架会自动关闭请求生命周期内创建的数据库会话和Redis连接。此时异步任务再使用这些已关闭的资源,必然报错。核心教训是:绝不能在异步任务中复用请求生命周期内的资源对象。 二、核心原理:资源生命周期分离 正确的做法是遵循“谁用谁创建,用完自己关”的原则,让异步任务拥有独立的资源管理逻辑。在任务函数内部重新创建数据库Session和Redis连接,任务执行完毕后确保资源被正确关闭或归还。 这意味着要摒弃依赖外部传递资源的方式,转而在任务内部实现资源的初始化与释放。虽然增加了代码量,但能从根本上避免资源生命周期错乱问题,保证异步任务的稳定性。 三、生产级代码组织方案 1. 项目结构设计 合理的项目结构是代码可维护的基础,推荐如下结构: project/ ├── app/ │ ├── api/ # 路由层 │ ├── core/ # 核心配置 │ │ ├── database.py # 数据库配置 │ │ └── redis_client.py # Redis配置 │ ├── models/ # 数据库模型 │ ├── schemas/ # Pydantic模型 │ └── tasks/ # 异步任务模块 │ ├── __init__.py │ ├── user_tasks.py # 具体任务实现 │ └── worker.py # 任务资源管理工具 └── requirements.txt 2. 核心资源管理实现 在tasks/worker.py中定义资源管理的辅助函数,使用上下文管理器确保资源正确释放: from sqlalchemy.orm import sessionmaker from app.core.database import engine from app.core.redis_client import get_redis_client from contextlib import contextmanager # 模块级别创建Session工厂 SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) @contextmanager def get_db_session(): """每个任务独立创建数据库会话,用完即关""" db = SessionLocal() try: yield db db.commit() except Exception: db.rollback() raise finally: db.close() @contextmanager def get_redis_conn(): """每个任务独立创建Redis连接,用完归还""" redis_conn = get_redis_client() try: yield redis_conn finally: redis_conn.close() 3. 异步任务实现 在tasks/user_tasks.py中编写具体任务逻辑,使用上述资源管理函数: from app.tasks.worker import get_db_session, get_redis_conn from app.models import UserAvatar from PIL import Image import os @celery_app.task(bind=True, max_retries=3) def process_avatar(self, user_id: int, avatar_path: str): """处理用户头像,生成缩略图并更新数据库和Redis""" try: # 生成不同尺寸缩略图 with Image.open(avatar_path) as img: sizes = [(100, 100), (200, 200), (400, 400)] thumb_paths = [] for size in sizes: thumb = img.resize(size) thumb_path = f"avatars/thumb_{size}_{user_id}.jpg" thumb.save(thumb_path) thumb_paths.append(thumb_path) # 更新数据库 with get_db_session() as db: user_avatar = UserAvatar(user_id=user_id, thumb_paths=thumb_paths) db.add(user_avatar) # 更新Redis状态 with get_redis_conn() as redis: redis.set(f"task_status:{self.request.id}", "completed") redis.set(f"user_avatar:{user_id}", ",".join(thumb_paths)) return {"status": "success", "user_id": user_id} except Exception as exc: # 异常重试 with get_redis_conn() as redis: redis.set(f"task_status:{self.request.id}", f"failed: {str(exc)}") raise self.retry(exc=exc, countdown=2**self.request.retries) 4. FastAPI接口集成 在api/endpoints.py中编写接口,触发异步任务: from fastapi import APIRouter, UploadFile, File from app.tasks.user_tasks import process_avatar import shutil router = APIRouter() @router.post("/upload-avatar") async def upload_avatar(user_id: int, file: UploadFile = File(...)): """上传用户头像并触发异步处理任务""" # 保存上传文件到临时目录 temp_path = f"temp/{user_id}_{file.filename}" with open(temp_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) # 触发异步任务 task = process_avatar.delay(user_id, temp_path) return {"task_id": task.id, "message": "头像处理任务已启动"} 四、总结 通过资源生命周期分离和合理的代码组织,我们可以在FastAPI + Celery的架构中安全、高效地调用Redis和数据库。这种方案不仅解决了连接泄露等问题,还提高了代码的可维护性和可扩展性。在实际项目中,还可以结合Celery的任务状态跟踪、重试机制等特性,进一步提升系统的稳定性和用户体验。