Skip to main content
实战阶段 I:今天我们将构建整个系统的“心脏”——Master 节点。它的核心要求是:高并发写入性能(对接无数个 Agent)和 数据安全性。我们将结合 Week 3 学到的 FastAPI 和 Pydantic 知识,打造一个工业级的后端服务。

学习内容 (30 mins)

在开始写代码前,先复习一下我们要用到的关键库,特别是异步数据库操作。
Python 的性能瓶颈
  • 同步 IO (Sync): 传统 Flask/Django 是阻塞的。当代码执行 db.query() 时,整个线程都在傻等数据库返回,CPU 处于闲置状态。如果来了 1000 个请求,你的服务器就卡死了。
  • 异步 IO (Async): 使用 async/await。当代码执行 await db.execute() 时,Python 会把控制权交还给事件循环 (Event Loop),去处理别人的请求。等数据库查好了,再像回调一样通知回来。
  • Master 的场景: 我们是一个写密集型的应用,数据库 IO 是最大的瓶颈,所以必须用 Async。
现代 ORM 的写法
  • 旧写法: session.add(obj); session.commit()
  • 新写法: session.add(obj); await session.commit()
  • 驱动变化: 我们不能再用 pymysql(它是同步的),而要换成 aiomysql
垃圾进,垃圾出 (GIGO)
  • Agent 可能会发疯,上报 CPU = -50% 或者 CPU = 9999%,或者传个字符串过来。
  • 如果我们不做校验直接存库,后续画图就会报错。
  • Pydantic 作用: 在数据进入业务逻辑之前,像安检员一样,把不合格的数据直接挡回去 (Return 422)。

开发任务 (90 mins)

1

步骤 1: 依赖安装与数据库配置

我们将使用 aiomysql 作为异步驱动。
# 如果没安装,请先执行
pip install fastapi uvicorn sqlalchemy aiomysql pymysql cryptography
验证步骤:
  1. 运行 python3 master/database.py。如果没有报错,说明驱动安装成功。
2

步骤 2: 定义数据模型 (Model & Schema)

这是最容易混淆的地方:Model 是给数据库看的,Schema 是给 API 看的。
from sqlalchemy import Column, Integer, String, Float, DateTime
from sqlalchemy.sql import func
from database import Base

# ORM 模型:对应数据库表结构
class Metric(Base):
    __tablename__ = "server_metrics"
    
    id = Column(Integer, primary_key=True)
    hostname = Column(String(64), index=True)
    ip = Column(String(15))
    cpu = Column(Float)
    mem = Column(Float)
    disk = Column(Float)
    # server_default 让数据库自己处理时间
    created_at = Column(DateTime, server_default=func.now())
3

步骤 3: 核心 API 实现

编写 main.py,把所有模块串起来。
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, desc
import models, database, schemas

app = FastAPI(title="Hybrid-Monitor Master")

# 启动钩子:自动建表
# 注意:生产环境通常用 Alembic 做迁移,但开发环境这样最快
@app.on_event("startup")
async def startup():
    async with database.engine.begin() as conn:
        await conn.run_sync(models.Base.metadata.create_all)

# 接口 A: 接收上报 (POST)
# response_model=dict 简化返回,我们只需要告诉 Agent 成功了
@app.post("/api/v1/report", status_code=201)
async def report_metric(
    data: schemas.MetricReport, 
    db: AsyncSession = Depends(database.get_db)
):
    # 1. 将 Pydantic Schema 转为 ORM Model
    # **data.dict() 是解包语法,相当于 hostname=data.hostname, ...
    new_metric = models.Metric(**data.dict())
    
    # 2. 添加到 Session
    db.add(new_metric)
    
    # 3. 异步提交
    # ⚠️ 必须 await commit,否则数据不会入库
    await db.commit()
    
    return {"msg": "saved"}

# 接口 B: 获取全网负载最高的 5 台机器 (GET)
@app.get("/api/v1/top")
async def get_top_cpu(db: AsyncSession = Depends(database.get_db)):
    # 异步查询语法: select(模型).order_by(...).limit(...)
    query = select(models.Metric).order_by(desc(models.Metric.cpu)).limit(5)
    
    result = await db.execute(query)
    
    # scalars().all() 将结果转换为 Model 对象列表
    return result.scalars().all()

if __name__ == "__main__":
    import uvicorn
    # 启动服务
    uvicorn.run(app, host="0.0.0.0", port=8000)
代码解释
  • async def report_metric(...):异步函数,支持高并发
  • data: schemas.MetricReport:Pydantic 自动验证请求数据
  • Depends(database.get_db):依赖注入,自动管理数据库会话
  • await db.commit():异步提交,必须加 await
异步编程的设计哲学
  • IO 不阻塞:数据库操作时,线程可以去处理其他请求
  • 高并发:一个线程可以处理多个请求,提高吞吐量
  • 资源管理:依赖注入自动管理资源生命周期
验证步骤:
  1. 启动 MySQL 容器 (回顾 Day 01)
  2. 运行服务: python3 master/main.py
  3. 打开浏览器访问 http://localhost:8000/docs (Swagger UI)
  4. 测试 POST /api/v1/report:
    {
      "hostname": "test-box",
      "ip": "1.1.1.1",
      "cpu": 50,
      "mem": 50,
      "disk": 50
    }
    
  5. 如果返回 201 且控制台打印了 INSERT SQL,说明成功!
  6. 测试 GET /api/v1/top,应该能看到上报的数据
常见错误
  • GreenletError: 这是一个经典的 SQLAlchemy 错误,通常意味这你在 Async 环境里用了 Sync 的代码(比如直接访问关联属性)。确保所有 DB 操作都有 await
  • Connection Refused: 数据库没起,或者端口不对。
  • 422 Validation Error: 测试 JSON 数据类型不对。

拓展任务 (30 mins)

查询优化

任务:目前的 /top 接口每次都查全表排序,效率极低。挑战:修改 SQL 查询,只查询 最近 1 分钟内 上报的数据中的 Top 5。
  • 提示:from datetime import datetime, timedelta
  • where(models.Metric.created_at > datetime.now() - timedelta(minutes=1))

结构优化

任务:将代码拆分到 routers/ 目录。挑战:把 API 路由逻辑从 main.py 移到 routers/metrics.py,保持 main.py 干净。这是大型项目的标准做法。

今日产出物

  • database.py - 异步数据库连接
  • models.py & schemas.py - 双层数据模型
  • main.py - 核心 API 逻辑

参考代码

查看参考代码

GitHub 完整代码仓库

SQLAlchemy 异步指南

官方 AsyncIO 文档

实际应用场景

CMDB 资产管理

  • 我们今天写的是动态监控。
  • 其实稍微改改,增加一个 /register 接口,让 Agent 启动时上报 CPU 型号、内存大小、OS 版本,这不就是一个简单的 CMDB (配置管理数据库) 吗?
Next Step: 大脑已经准备好了。下一步(Pt.2),我们将去构建那只勤劳的“小蜜蜂”——Go Agent。Go 环境准备好了吗?

上一阶段: 需求分析

Day 29 | 综合大项目需求分析

下一阶段: Agent 开发

Day 30 Pt.2 | 探针 Agent 开发