Skip to main content
今日目标:Python 与 MySQL 是天生一对。今天我们不使用沉重的 ORM,而是学习使用轻量的 PyMySQL 驱动,手写 SQL 来实现”增删改查”,这是最高效的 Infra 开发方式。今天不只是写代码,而是要彻底理解 “为什么参数化查询能防止 SQL 注入”“上下文管理器如何确保连接关闭” 以及 “如何设计一个健壮的数据库操作类”

学习内容 (30 mins)

在开始写代码前,先搞懂这些核心概念,否则后面的代码你会看得云里雾里。
什么是 PyMySQL?PyMySQL 是一个纯 Python 实现的 MySQL 客户端库,用于连接和操作 MySQL 数据库。为什么选择 PyMySQL?
  • 轻量级:纯 Python 实现,无需编译
  • 简单易用:API 简洁,学习曲线平缓
  • 兼容性好:兼容 MySQL 5.5+ 和 MariaDB
  • 适合运维:不需要复杂的 ORM,直接写 SQL 更灵活
核心概念
  • 连接对象 (Connection)
    • 建立与数据库的 TCP 连接
    • 需要提供:host、user、password、database
    • 连接后需要手动关闭,否则会泄漏资源
  • 游标对象 (Cursor)
    • 用于执行 SQL 语句和获取结果
    • 一个连接可以有多个游标
    • 执行 SELECT 后需要 fetch 获取结果
  • 事务控制
    • commit():提交事务(INSERT/UPDATE/DELETE 必须提交才生效)
    • rollback():回滚事务(撤销未提交的更改)
    • 默认自动提交是关闭的,需要手动 commit
SQL 注入攻击SQL 注入是通过在输入中插入恶意 SQL 代码来攻击数据库的技术。危险示例
# ❌ 危险:使用字符串拼接
username = "admin'; DROP TABLE users; --"
sql = f"SELECT * FROM users WHERE username='{username}'"
# 实际执行的 SQL:
# SELECT * FROM users WHERE username='admin'; DROP TABLE users; --'
# 这会删除 users 表!
安全做法
# ✅ 安全:使用参数化查询
username = "admin'; DROP TABLE users; --"
sql = "SELECT * FROM users WHERE username=%s"
cursor.execute(sql, (username,))
# PyMySQL 会自动转义特殊字符,防止注入
参数化查询的优势
  • 防止注入:驱动会自动转义特殊字符
  • 性能更好:数据库可以缓存执行计划
  • 类型安全:驱动会自动处理类型转换
最佳实践
  • 禁止:使用 f-string 或 % 格式化拼接 SQL
  • 推荐:使用 %s 占位符 + execute(sql, (param,))
  • 推荐:使用上下文管理器确保连接关闭

代码任务 (90 mins)

1

环境准备

确保虚拟环境已激活,并安装 PyMySQL:
# 确保虚拟环境已激活(提示符前有 (.venv))
source .venv/bin/activate

# 安装 PyMySQL
pip install pymysql

# 确保 MySQL 服务正在运行(可以使用 Docker)
# docker run -d -p 3306:3306 --name mysql-learn -e MYSQL_ROOT_PASSWORD=root mysql:8.0
2

任务 A:数据库连接与基本操作

编写 12_db_basic.py,演示基本的数据库操作。
#!/usr/bin/env python3
"""
Day 12 - PyMySQL 基础操作
演示数据库连接、查询和更新
"""

import pymysql

# ========== 数据库配置 ==========
DB_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "root",
    "database": "infra_db",
    "charset": "utf8mb4",
    "cursorclass": pymysql.cursors.DictCursor  # 返回字典格式,更易用
}

# ========== 1. 基本连接和查询 ==========
print("=== 基本连接和查询 ===")

try:
    # 建立连接
    conn = pymysql.connect(**DB_CONFIG)
    print("✅ 数据库连接成功")
    
    # 创建游标
    cursor = conn.cursor()
    
    # 执行查询(参数化查询,防止 SQL 注入)
    sql = "SELECT * FROM users WHERE id = %s"
    cursor.execute(sql, (1,))  # %s 会被替换为 1,自动转义
    
    # 获取结果
    result = cursor.fetchone()  # 获取一条记录
    if result:
        print(f"查询结果: {result}")
    else:
        print("未找到记录")
    
    # 关闭游标和连接
    cursor.close()
    conn.close()
    print("✅ 连接已关闭")
    
except pymysql.Error as e:
    print(f"❌ 数据库错误: {e}")

# ========== 2. 插入数据 ==========
print("\n=== 插入数据 ===")

try:
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    # 插入数据(参数化查询)
    sql = "INSERT INTO users (username, email) VALUES (%s, %s)"
    cursor.execute(sql, ("python_user", "python@example.com"))
    
    # 获取插入的 ID
    new_id = cursor.lastrowid
    print(f"✅ 插入成功,新记录 ID: {new_id}")
    
    # 重要:必须提交事务,否则数据不会保存
    conn.commit()
    
    cursor.close()
    conn.close()
    
except pymysql.Error as e:
    # 如果出错,回滚事务
    conn.rollback()
    print(f"❌ 插入失败: {e}")

# ========== 3. 批量插入 ==========
print("\n=== 批量插入 ===")

try:
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    # 准备批量数据
    users_data = [
        ("user1", "user1@example.com"),
        ("user2", "user2@example.com"),
        ("user3", "user3@example.com"),
    ]
    
    # 批量插入(使用 executemany)
    sql = "INSERT INTO users (username, email) VALUES (%s, %s)"
    cursor.executemany(sql, users_data)
    
    # 提交事务
    conn.commit()
    print(f"✅ 批量插入成功,插入了 {cursor.rowcount} 条记录")
    
    cursor.close()
    conn.close()
    
except pymysql.Error as e:
    conn.rollback()
    print(f"❌ 批量插入失败: {e}")

# ========== 4. 更新和删除 ==========
print("\n=== 更新和删除 ===")

try:
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    
    # 更新数据
    sql = "UPDATE users SET email = %s WHERE username = %s"
    cursor.execute(sql, ("newemail@example.com", "python_user"))
    print(f"✅ 更新了 {cursor.rowcount} 条记录")
    
    # 删除数据(谨慎!)
    # sql = "DELETE FROM users WHERE username = %s"
    # cursor.execute(sql, ("python_user",))
    # print(f"✅ 删除了 {cursor.rowcount} 条记录")
    
    # 提交事务
    conn.commit()
    
    cursor.close()
    conn.close()
    
except pymysql.Error as e:
    conn.rollback()
    print(f"❌ 操作失败: {e}")
代码解释
  • 连接配置:使用字典存储配置,便于管理
  • 参数化查询:使用 %s 占位符,防止 SQL 注入
  • 事务控制:INSERT/UPDATE/DELETE 必须 commit 才生效
  • 错误处理:捕获异常并回滚事务
运行脚本
python 12_db_basic.py
验证步骤
  1. 确保 MySQL 服务正在运行
  2. 确保 infra_db 数据库和 users 表已创建
  3. 运行脚本,检查输出是否正确
  4. 在数据库中验证数据是否插入成功
常见错误
  • pymysql.err.OperationalError: (2003, "Can't connect to MySQL server") - 数据库服务未启动或连接配置错误
  • pymysql.err.ProgrammingError: (1146, "Table 'infra_db.users' doesn't exist") - 表不存在,需要先创建表
  • pymysql.err.IntegrityError: (1062, "Duplicate entry") - 违反唯一约束,检查主键或唯一索引
3

任务 B:数据库管理类封装

编写 12_db_manager.py,实现一个健壮的数据库管理类。
#!/usr/bin/env python3
"""
Day 12 - 数据库管理类
使用上下文管理器确保连接正确关闭
"""

import pymysql
from contextlib import contextmanager
from typing import Optional, Dict, List, Any

class DBManager:
    """数据库管理类"""
    
    def __init__(self, config: Dict[str, Any]):
        """
        初始化数据库管理器
        
        参数:
            config: 数据库配置字典
        """
        self.config = config
    
    @contextmanager
    def get_cursor(self):
        """
        获取数据库游标的上下文管理器
        
        使用示例:
            with db.get_cursor() as cursor:
                cursor.execute("SELECT * FROM users")
                result = cursor.fetchall()
        """
        conn = None
        try:
            # 建立连接
            conn = pymysql.connect(**self.config)
            cursor = conn.cursor()
            
            # 返回游标
            yield cursor
            
            # 如果没有异常,提交事务
            conn.commit()
            
        except Exception as e:
            # 如果发生异常,回滚事务
            if conn:
                conn.rollback()
            raise e
        finally:
            # 无论是否异常,都关闭连接
            if conn:
                conn.close()
    
    def query_one(self, sql: str, params: Optional[tuple] = None) -> Optional[Dict]:
        """
        查询单条记录
        
        参数:
            sql: SQL 语句
            params: 参数元组
        
        返回:
            字典或 None
        """
        with self.get_cursor() as cursor:
            cursor.execute(sql, params or ())
            return cursor.fetchone()
    
    def query_all(self, sql: str, params: Optional[tuple] = None) -> List[Dict]:
        """
        查询多条记录
        
        参数:
            sql: SQL 语句
            params: 参数元组
        
        返回:
            字典列表
        """
        with self.get_cursor() as cursor:
            cursor.execute(sql, params or ())
            return cursor.fetchall()
    
    def execute(self, sql: str, params: Optional[tuple] = None) -> int:
        """
        执行 INSERT/UPDATE/DELETE 语句
        
        参数:
            sql: SQL 语句
            params: 参数元组
        
        返回:
            受影响的行数
        """
        with self.get_cursor() as cursor:
            cursor.execute(sql, params or ())
            return cursor.rowcount
    
    def execute_many(self, sql: str, params_list: List[tuple]) -> int:
        """
        批量执行 INSERT/UPDATE/DELETE 语句
        
        参数:
            sql: SQL 语句
            params_list: 参数列表
        
        返回:
            受影响的行数
        """
        with self.get_cursor() as cursor:
            cursor.executemany(sql, params_list)
            return cursor.rowcount

# ========== 使用示例 ==========
if __name__ == "__main__":
    # 数据库配置
    db_config = {
        "host": "localhost",
        "user": "root",
        "password": "root",
        "database": "infra_db",
        "charset": "utf8mb4",
        "cursorclass": pymysql.cursors.DictCursor
    }
    
    # 创建数据库管理器
    db = DBManager(db_config)
    
    # 1. 查询单条记录
    print("=== 查询单条记录 ===")
    user = db.query_one("SELECT * FROM users WHERE id = %s", (1,))
    if user:
        print(f"用户: {user['username']}, 邮箱: {user['email']}")
    
    # 2. 查询多条记录
    print("\n=== 查询多条记录 ===")
    users = db.query_all("SELECT * FROM users LIMIT 5")
    print(f"找到 {len(users)} 个用户")
    
    # 3. 插入数据
    print("\n=== 插入数据 ===")
    rows = db.execute(
        "INSERT INTO users (username, email) VALUES (%s, %s)",
        ("db_manager_user", "db@example.com")
    )
    print(f"插入了 {rows} 条记录")
    
    # 4. 批量插入
    print("\n=== 批量插入 ===")
    users_data = [
        ("batch_user1", "batch1@example.com"),
        ("batch_user2", "batch2@example.com"),
    ]
    rows = db.execute_many(
        "INSERT INTO users (username, email) VALUES (%s, %s)",
        users_data
    )
    print(f"批量插入了 {rows} 条记录")
代码解释
  • 上下文管理器:使用 @contextmanager 确保连接自动关闭
  • 类型注解:使用类型提示提高代码可读性
  • 方法封装:将常用操作封装成方法,便于使用
  • 异常处理:自动回滚事务,确保数据一致性
运行脚本
python 12_db_manager.py
验证步骤
  1. 脚本能正常运行,无语法错误
  2. 检查查询结果是否正确
  3. 验证数据是否插入成功
  4. 确认连接是否正确关闭(不会泄漏)

拓展任务 (30 mins)

挑战 1:系统监控入库

任务:结合 Day 11 学到的 subprocess,写一个脚本:每隔 1 分钟获取一次系统 CPU 使用率,并存入到 MySQL 的 server_monitor 表中。提示
  • 使用 time.sleep(60) 实现定时
  • 使用 subprocess 执行 topvmstat 命令
  • 使用 DBManager 类插入数据

挑战 2:连接池

任务:研究如何使用连接池(如 pymysql.pooling)来管理数据库连接,提高性能。提示:连接池可以复用连接,减少建立连接的开销。

今日产出物

  • 12_db_basic.py - PyMySQL 基础操作示例
  • 12_db_manager.py - 数据库管理类

参考代码

查看参考代码

在 GitHub 查看完整的示例代码

在线运行

使用在线编辑器测试代码

实际应用场景

数据库操作在运维中的应用

  • 监控数据存储:将系统监控数据存入数据库
  • 日志归档:将日志信息存入数据库便于查询
  • 配置管理:从数据库读取和更新配置
  • 报表生成:从数据库查询数据生成报表

最佳实践

  • 参数化查询:始终使用参数化查询,防止 SQL 注入
  • 上下文管理器:使用 with 语句确保连接关闭
  • 事务控制:合理使用事务,确保数据一致性
  • 错误处理:捕获异常并回滚事务
  • 连接管理:避免连接泄漏,及时关闭连接
与 Day 13 的关联:今天学习的数据库操作,明天会结合文件处理、正则表达式和 subprocess,开发一个完整的自动化监控工具。

上一天: 系统调用

Day 11 | Python 调用 Shell

下一天: 阶段实战

Day 13 | 第二阶段综合实战