Skip to main content
今日目标:将 Python 代码与 MySQL 数据库”联姻”。但我们今天不走寻常路,将直接学习 异步 (Async) 数据库交互。这是 FastAPI 高性能的关键——如果你的 web 很快,但数据库查询卡住了线程,那整个服务还是慢的。今天不只是写代码,而是要彻底理解 “为什么需要异步数据库”“SQLAlchemy 如何工作” 以及 “如何设计数据库模型”

学习内容 (30 mins)

在开始写代码前,先搞懂这些核心概念,否则后面的代码你会看得云里雾里。
同步 vs 异步 (Sync vs Async)同步方式(传统)
  • 服务员(线程)给厨师下单 → 站在窗口傻等菜做好 → 端菜
  • 这期间他不能服务其他客人
  • 如果有很多客人,需要很多服务员(线程),资源消耗大
异步方式(现代)
  • 服务员给厨师下单 → 转头去服务下一桌 → 等菜好了再回来端
  • 一个服务员可以同时服务多桌客人
  • 资源利用率高,性能更好
在 Web 开发中的应用
  • 数据库 IO:查询数据库需要等待网络和磁盘 IO
  • 同步方式:线程被阻塞,等待数据库响应
  • 异步方式:使用 await,线程可以去处理其他请求,数据库响应后再继续
性能对比
  • 同步:1000 个并发请求需要 1000 个线程(资源消耗大)
  • 异步:1000 个并发请求可能只需要 10 个线程(资源消耗小)
什么时候用异步?
  • 适合:IO 密集型操作(数据库查询、网络请求、文件读写)
  • 不适合:CPU 密集型操作(计算、图像处理)
什么是 SQLAlchemy?SQLAlchemy 是 Python 最流行的 ORM(Object-Relational Mapping)框架,用于将 Python 对象映射到数据库表。为什么需要 ORM?
  • 避免手写 SQL:使用 Python 对象操作数据库,更安全
  • 类型安全:IDE 支持好,减少错误
  • 数据库无关:可以轻松切换数据库(MySQL、PostgreSQL 等)
  • 自动迁移:配合 Alembic 可以自动管理数据库结构变更
SQLAlchemy 核心组件
  • Engine(引擎)
    • 数据库的”驱动器”,负责真正的网络连接
    • 一个应用通常只有一个 Engine 实例
    • 创建方式:create_async_engine(DATABASE_URL)
  • Session(会话)
    • 你的”临时工作区”
    • 对对象的改动(add、commit)都发生在这里
    • 每个请求应该有一个独立的 Session
    • 使用完后必须关闭,否则会泄漏连接
  • Model(模型)
    • Python 类(Class)与数据库表(Table)的映射关系
    • 继承 Base,定义表结构和字段
    • 示例:class User(Base): __tablename__ = "users"
异步 SQLAlchemy 的关键
  • create_async_engine:创建异步引擎(不是 create_engine
  • AsyncSession:使用异步会话(不是 Session
  • await:所有数据库操作都要加 await
  • 驱动选择:MySQL 用 aiomysql,PostgreSQL 用 asyncpg

代码任务 (90 mins)

1

环境准备

确保虚拟环境已激活,并安装必要的包:
# 确保虚拟环境已激活
source .venv/bin/activate

# 安装 SQLAlchemy(ORM 框架)
pip install sqlalchemy

# 安装异步 MySQL 驱动
pip install aiomysql

# 如果使用 PostgreSQL,安装:
# pip install asyncpg

# 确保 MySQL 服务正在运行
# docker run -d -p 3306:3306 --name mysql-learn -e MYSQL_ROOT_PASSWORD=root mysql:8.0
2

任务 A:配置数据库核心 (Engine & Model)

新建 database.py 文件,配置数据库连接和模型。
#!/usr/bin/env python3
"""
Day 18 - 数据库配置和模型定义
演示如何使用 SQLAlchemy 异步引擎和模型定义
"""

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime

# ========== 1. 连接字符串 ==========
# 格式: mysql+aiomysql://用户名:密码@地址:端口/库名
# 重点: 必须明确指定 +aiomysql(表示使用异步驱动)
# 如果是 PostgreSQL: postgresql+asyncpg://...
DATABASE_URL = "mysql+aiomysql://root:root@localhost/infra_db"

# ========== 2. 创建异步引擎 ==========
# echo=True: 会打印生成的 SQL 语句,方便调试(生产环境应该设为 False)
# pool_pre_ping=True: 连接池预检查,自动重连断开的连接
engine = create_async_engine(
    DATABASE_URL, 
    echo=True,
    pool_pre_ping=True
)

# ========== 3. 创建会话工厂 ==========
# 以后每次请求进来,都从这里拿一个新的 session
# class_=AsyncSession: 指定使用异步会话
# expire_on_commit=False: 提交后不使对象过期(便于返回数据)
AsyncSessionLocal = sessionmaker(
    engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)

# ========== 4. 声明基类 ==========
# 所有的 Model 都要继承这个 Base
Base = declarative_base()

# ========== 5. 定义 User 模型 ==========
class User(Base):
    """
    用户模型
    对应数据库中的 users 表
    """
    __tablename__ = "users"  # 对应数据库里的表名
    
    # 主键:自增整数
    id = Column(Integer, primary_key=True, index=True)
    
    # 用户名:字符串,唯一,非空
    username = Column(String(50), unique=True, nullable=False)
    
    # 邮箱:字符串,可为空
    email = Column(String(100), nullable=True)
    
    # 创建时间:日期时间,默认当前时间
    created_at = Column(DateTime, default=datetime.utcnow)
代码解释
  • create_async_engine:创建异步引擎(不是 create_engine
  • mysql+aiomysql://:指定使用异步 MySQL 驱动
  • AsyncSessionLocal:会话工厂,用于创建数据库会话
  • Base:所有模型的基类
  • Column:定义数据库列
验证步骤
  1. 检查文件语法是否正确
  2. 尝试导入:python -c "from database import engine, User; print('OK')"
3

任务 B:编写 CRUD 接口

修改 18_main.py,实现数据库的增删改查操作。
#!/usr/bin/env python3
"""
Day 18 - FastAPI 数据库整合实战
演示如何使用异步 SQLAlchemy 进行数据库操作
"""

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.future import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import AsyncSessionLocal, User, Base, engine

# 初始化 FastAPI 应用
app = FastAPI(
    title="Database Integration API",
    description="演示 FastAPI + SQLAlchemy 异步数据库操作",
    version="1.0.0"
)

# ========== 启动时自动建表 ==========
# 仅用于开发环境,生产环境请使用 Alembic 进行数据库迁移
@app.on_event("startup")
async def startup():
    """
    应用启动时自动创建数据库表
    """
    async with engine.begin() as conn:
        # 创建所有继承自 Base 的模型对应的表
        await conn.run_sync(Base.metadata.create_all)
    print("✅ Database tables created")

# ========== 关键依赖: get_db ==========
# 这是一个 Generator 用于依赖注入
# 它的作用是:请求来了 -> 开连接 -> 给路由用 -> 路由跑完 -> 关连接
async def get_db():
    """
    获取数据库会话的依赖函数
    
    使用 Generator 模式,确保会话在使用完后自动关闭
    """
    async with AsyncSessionLocal() as session:
        # yield 表示这是一个 Generator
        # FastAPI 会在路由函数执行完后自动关闭 session
        yield session

# ========== 1. 创建用户 (POST) ==========
@app.post("/users/", status_code=201)
async def create_user(
    username: str,
    email: str = None,
    db: AsyncSession = Depends(get_db)
):
    """
    创建新用户
    
    参数:
        username: 用户名(必填)
        email: 邮箱(可选)
        db: 数据库会话(依赖注入)
    
    返回:
        User: 创建的用户对象
    """
    # 1. 检查用户名是否已存在
    result = await db.execute(
        select(User).where(User.username == username)
    )
    existing_user = result.scalars().first()
    
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail="Username already exists"
        )
    
    # 2. 实例化 ORM 对象(此时还没进数据库)
    new_user = User(username=username, email=email)
    
    # 3. 添加到暂存区
    db.add(new_user)
    
    # 4. 提交事务(这一步才真正执行 SQL INSERT)
    # 注意: 必须加 await,因为这是网络 IO
    await db.commit()
    
    # 5. 刷新对象(为了拿回数据库生成的 id 和 created_at)
    await db.refresh(new_user)
    
    return new_user

# ========== 2. 查询用户列表 (GET) ==========
@app.get("/users/")
async def read_users(
    skip: int = 0,
    limit: int = 10,
    db: AsyncSession = Depends(get_db)
):
    """
    分页查询用户列表
    
    参数:
        skip: 跳过多少条记录(默认 0)
        limit: 返回多少条记录(默认 10)
        db: 数据库会话(依赖注入)
    
    返回:
        List[User]: 用户列表
    """
    # 新版 SQLAlchemy 查询语法: select(Model)
    # offset(skip): 跳过 skip 条
    # limit(limit): 只取 limit 条
    result = await db.execute(
        select(User).offset(skip).limit(limit)
    )
    
    # scalars().all() 把结果集转换成纯净的列表
    users = result.scalars().all()
    return users

# ========== 3. 查询单个用户 (GET) ==========
@app.get("/users/{user_id}")
async def read_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    """
    查询单个用户
    
    参数:
        user_id: 用户 ID(路径参数)
        db: 数据库会话(依赖注入)
    
    返回:
        User: 用户对象
    """
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalars().first()
    
    if not user:
        raise HTTPException(
            status_code=404,
            detail="User not found"
        )
    
    return user

# ========== 4. 更新用户 (PUT) ==========
@app.put("/users/{user_id}")
async def update_user(
    user_id: int,
    email: str = None,
    db: AsyncSession = Depends(get_db)
):
    """
    更新用户信息
    
    参数:
        user_id: 用户 ID(路径参数)
        email: 新邮箱(可选)
        db: 数据库会话(依赖注入)
    
    返回:
        User: 更新后的用户对象
    """
    # 1. 查询用户
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalars().first()
    
    if not user:
        raise HTTPException(
            status_code=404,
            detail="User not found"
        )
    
    # 2. 更新字段
    if email is not None:
        user.email = email
    
    # 3. 提交事务
    await db.commit()
    await db.refresh(user)
    
    return user

# ========== 5. 删除用户 (DELETE) ==========
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(
    user_id: int,
    db: AsyncSession = Depends(get_db)
):
    """
    删除用户
    
    参数:
        user_id: 用户 ID(路径参数)
        db: 数据库会话(依赖注入)
    
    返回:
        204 No Content(无响应体)
    """
    # 1. 查询用户
    result = await db.execute(
        select(User).where(User.id == user_id)
    )
    user = result.scalars().first()
    
    if not user:
        raise HTTPException(
            status_code=404,
            detail="User not found"
        )
    
    # 2. 删除用户
    await db.delete(user)
    
    # 3. 提交事务
    await db.commit()
    
    # 204 状态码表示成功但无响应体
    return None
代码解释
  • get_db():依赖函数,自动管理数据库会话的生命周期
  • select(User):新版 SQLAlchemy 查询语法
  • await db.execute():执行查询(必须加 await)
  • db.add():添加对象到会话
  • await db.commit():提交事务(必须加 await)
  • await db.refresh():刷新对象,获取数据库生成的值
运行脚本
# 启动服务器
uvicorn 18_main:app --reload
验证步骤
  1. 启动服务器后,检查控制台是否输出 “Database tables created”
  2. 访问 http://localhost:8000/docs 打开 Swagger UI
  3. 测试创建用户:
    • 点击 POST /users/
    • 输入 username: "test_user"email: "test@example.com"
    • 执行,应该返回 201 和用户信息
  4. 测试查询用户列表:
    • 点击 GET /users/
    • 执行,应该看到用户列表
  5. 在数据库中验证:
    docker exec -it mysql-learn mysql -uroot -proot -e "USE infra_db; SELECT * FROM users;"
    
常见错误
  • ModuleNotFoundError: No module named 'aiomysql' - 未安装 aiomysql,执行 pip install aiomysql
  • Can't connect to MySQL server - 数据库服务未启动或连接配置错误
  • Table 'infra_db.users' doesn't exist - 表不存在,检查是否执行了建表操作
  • RuntimeError: This event loop is already running - 异步事件循环冲突,检查代码中是否有同步调用

拓展任务 (30 mins)

挑战 1:数据库迁移 (Alembic)

任务:研究如何使用 Alembic 进行数据库迁移。步骤
  1. 安装 Alembic:pip install alembic
  2. 初始化:alembic init alembic
  3. 生成迁移:alembic revision --autogenerate -m "create users table"
  4. 执行迁移:alembic upgrade head
提示:生产环境必须使用 Alembic,不要用 Base.metadata.create_all()

挑战 2:关联查询

任务:创建一个 Post 模型,与 User 建立一对多关系。提示
from sqlalchemy.orm import relationship

class Post(Base):
    user_id = Column(Integer, ForeignKey('users.id'))
    user = relationship("User", back_populates="posts")

今日产出物

  • database.py - 数据库配置和模型定义
  • 18_main.py - 完整的 CRUD API 实现

参考代码

查看参考代码

在 GitHub 查看完整的示例代码

在线运行

使用在线编辑器测试代码

实际应用场景

异步数据库在 API 开发中的应用

  • 高并发场景:处理大量并发请求,提高性能
  • 资源优化:减少线程数量,降低资源消耗
  • 响应速度:异步 IO 不会阻塞其他请求
  • 可扩展性:支持更多并发连接

ORM 设计最佳实践

  • 模型分离:将模型定义放在独立的文件中
  • 会话管理:使用依赖注入自动管理会话生命周期
  • 事务控制:合理使用 commit 和 rollback
  • 查询优化:使用 select 而不是 query,使用索引
  • 迁移管理:使用 Alembic 管理数据库结构变更
与 Day 19 的关联:今天学习的数据库操作,明天会学习如何将数据库配置移到环境变量,并使用 Docker 部署应用。

上一天: 依赖注入

Day 17 | 依赖注入与中间件

下一天: 生产部署

Day 19 | 环境变量与 Docker 部署