Skip to main content
实战日:Day 13 是 Python 阶段的期末考试。我们将融合前 5 天的所有知识点(文件处理、正则、subprocess、PyMySQL),开发一个 端到端的自动化监控与告警工具。今天不只是写代码,而是要彻底理解 “如何将多个知识点整合成完整项目”“如何设计一个可扩展的监控系统” 以及 “如何让脚本在生产环境中稳定运行”

实战目标 | Project Goal

开发工具 13_ops_toolbox.py,实现以下”一条龙”服务:

1. 日志采集

读取 Nginx/App 日志文件,利用正则表达式提取关键错误信息 (Error/Exception)。

2. 系统快照

调用 subprocess 获取当前 CPU/内存负载,作为环境上下文。

3. 数据持久化

将提取的错误信息 + 系统负载写入 MySQL 数据库进行归档。

4. 异常告警

如果错误数超过阈值,触发(模拟)钉钉/Slack 告警

代码任务 (120 mins)

1

环境准备

确保所有依赖已安装:
# 确保虚拟环境已激活
source .venv/bin/activate

# 安装依赖
pip install pymysql requests

# 确保 MySQL 服务正在运行
# docker run -d -p 3306:3306 --name mysql-learn -e MYSQL_ROOT_PASSWORD=root mysql:8.0
创建模拟日志文件 app_error.log
cat > app_error.log << 'EOF'
[2026-01-31 10:00:01] INFO: Service started
[2026-01-31 10:05:23] ERROR: Database connection timeout (IP: 192.168.1.5)
[2026-01-31 10:06:12] ERROR: Out of memory in worker process
[2026-01-31 10:07:00] INFO: Health check passed
[2026-01-31 10:08:15] ERROR: Failed to connect to Redis (IP: 192.168.1.10)
EOF
2

数据库建表

在使用脚本前,先创建数据库表:
-- 连接到 MySQL
-- docker exec -it mysql-learn mysql -uroot -proot

-- 创建数据库(如果不存在)
CREATE DATABASE IF NOT EXISTS infra_db;
USE infra_db;

-- 创建监控日志表
CREATE TABLE IF NOT EXISTS monitor_logs (
    id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY COMMENT '日志ID',
    load_avg FLOAT COMMENT '系统负载(1分钟)',
    error_count INT COMMENT '错误数量',
    last_error_msg VARCHAR(500) COMMENT '最后一条错误信息',
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='系统监控日志表';
3

完整的 Python 代码实现

这个脚本融合了前 5 天的所有知识点,可以作为你未来的”万能脚手架”。
#!/usr/bin/env python3
"""
Day 13 - 运维自动化工具箱
融合文件处理、正则、subprocess、数据库操作的综合实战项目

功能:
1. 日志分析:从日志文件中提取错误信息
2. 系统监控:获取系统负载和资源使用情况
3. 数据存储:将监控数据存入数据库
4. 告警通知:当系统异常时发送告警
"""

import re
import time
import subprocess
import pymysql
from contextlib import contextmanager
from typing import Tuple, Dict, Optional
from datetime import datetime

# ========== 配置区 ==========
LOG_FILE = "app_error.log"
DB_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "root",
    "database": "infra_db",
    "charset": "utf8mb4",
    "cursorclass": pymysql.cursors.DictCursor
}

# 告警阈值配置
ERROR_THRESHOLD = 3  # 错误数量阈值
LOAD_THRESHOLD = 2.0  # 系统负载阈值

# ========== 1. 数据库上下文管理器 (Day 12) ==========
@contextmanager
def db_cursor():
    """
    数据库游标上下文管理器
    
    自动处理连接的创建、提交、回滚和关闭
    """
    conn = None
    try:
        conn = pymysql.connect(**DB_CONFIG)
        cursor = conn.cursor()
        yield cursor
        conn.commit()  # 提交事务
    except Exception as e:
        if conn:
            conn.rollback()  # 回滚事务
        raise e
    finally:
        if conn:
            conn.close()  # 关闭连接

# ========== 2. 系统信息获取 (Day 11) ==========
def get_system_load() -> float:
    """
    获取系统 1 分钟负载平均值
    
    返回:
        负载值(float),如果获取失败返回 0.0
    """
    try:
        # 执行 uptime 命令
        result = subprocess.run(
            ["uptime"],
            capture_output=True,
            text=True,
            check=True
        )
        
        # 解析输出
        # 输出格式: ... load average: 1.50, 1.20, 1.00
        # 我们需要第一个值(1分钟负载)
        match = re.search(r"load average:\s+([\d\.]+)", result.stdout)
        if match:
            return float(match.group(1))
        else:
            print("⚠️  无法解析负载信息")
            return 0.0
            
    except subprocess.CalledProcessError:
        print("⚠️  获取系统负载失败")
        return 0.0
    except Exception as e:
        print(f"⚠️  未知错误: {e}")
        return 0.0

def get_memory_usage() -> Dict[str, float]:
    """
    获取内存使用情况
    
    返回:
        包含内存信息的字典
    """
    try:
        # 执行 free -m 命令(以 MB 为单位)
        result = subprocess.run(
            ["free", "-m"],
            capture_output=True,
            text=True,
            check=True
        )
        
        # 解析输出
        # 输出格式:
        #               total        used        free      shared  buff/cache   available
        # Mem:          8192        4096        2048         256        2048        4096
        lines = result.stdout.strip().split("\n")
        if len(lines) < 2:
            return {}
        
        mem_line = lines[1]
        numbers = [int(x) for x in mem_line.split()[1:]]
        
        if len(numbers) >= 6:
            total, used, free, shared, buff_cache, available = numbers[:6]
            return {
                "total": total,
                "used": used,
                "free": free,
                "available": available,
                "usage_percent": round((used / total) * 100, 2)
            }
        return {}
        
    except Exception as e:
        print(f"⚠️  获取内存信息失败: {e}")
        return {}

# ========== 3. 日志分析核心 (Day 10) ==========
def analyze_logs() -> Tuple[int, str, list]:
    """
    分析日志文件,提取错误信息
    
    返回:
        (错误数量, 最后一条错误信息, 所有错误列表)
    """
    error_count = 0
    latest_error = ""
    error_list = []
    
    try:
        with open(LOG_FILE, "r", encoding="utf-8") as f:
            for line in f:
                # 使用正则表达式匹配错误日志
                # 格式: [时间] ERROR: 错误信息
                error_match = re.search(r'\[([^\]]+)\]\s+ERROR:\s+(.+)', line)
                if error_match:
                    error_count += 1
                    timestamp = error_match.group(1)
                    error_msg = error_match.group(2)
                    latest_error = error_msg
                    error_list.append({
                        "timestamp": timestamp,
                        "message": error_msg
                    })
        
        return error_count, latest_error, error_list
        
    except FileNotFoundError:
        print(f"⚠️  日志文件不存在: {LOG_FILE}")
        return 0, "", []
    except Exception as e:
        print(f"⚠️  分析日志失败: {e}")
        return 0, "", []

# ========== 4. 告警推送 ==========
def send_alert(message: str) -> bool:
    """
    发送告警通知(模拟)
    
    参数:
        message: 告警消息
    
    返回:
        是否发送成功
    """
    # 在实际环境中,这里会调用钉钉、Slack 等 Webhook
    # 示例(需要配置 WEBHOOK_URL):
    # import requests
    # response = requests.post(
    #     WEBHOOK_URL,
    #     json={"msgtype": "text", "text": {"content": message}}
    # )
    # return response.status_code == 200
    
    print(f"🚨 [告警] {message}")
    return True

# ========== 5. 数据存储 ==========
def save_monitor_data(load_avg: float, error_count: int, last_error: str) -> bool:
    """
    将监控数据存入数据库
    
    参数:
        load_avg: 系统负载
        error_count: 错误数量
        last_error: 最后一条错误信息
    
    返回:
        是否保存成功
    """
    try:
        with db_cursor() as cursor:
            sql = """
                INSERT INTO monitor_logs 
                (load_avg, error_count, last_error_msg) 
                VALUES (%s, %s, %s)
            """
            cursor.execute(sql, (load_avg, error_count, last_error))
            print(f"✅ 监控数据已保存到数据库")
            return True
    except Exception as e:
        print(f"❌ 保存数据失败: {e}")
        return False

# ========== 主流程 ==========
def main():
    """
    主函数:执行完整的监控流程
    """
    print("=" * 60)
    print("🔍 开始系统监控检查...")
    print("=" * 60)
    
    # 1. 获取系统信息
    print("\n--- 系统信息采集 ---")
    load_avg = get_system_load()
    mem_info = get_memory_usage()
    
    print(f"系统负载(1分钟): {load_avg:.2f}")
    if mem_info:
        print(f"内存使用率: {mem_info['usage_percent']}%")
    
    # 2. 分析日志
    print("\n--- 日志分析 ---")
    error_count, last_error, error_list = analyze_logs()
    print(f"发现错误数量: {error_count}")
    if error_list:
        print("错误详情:")
        for i, err in enumerate(error_list[:3], 1):  # 只显示前 3 条
            print(f"  {i}. [{err['timestamp']}] {err['message']}")
    
    # 3. 保存数据
    print("\n--- 数据存储 ---")
    save_monitor_data(load_avg, error_count, last_error)
    
    # 4. 告警判断
    print("\n--- 告警检查 ---")
    alert_triggered = False
    
    if error_count >= ERROR_THRESHOLD:
        alert_msg = f"错误数量超过阈值!当前: {error_count}, 阈值: {ERROR_THRESHOLD}"
        send_alert(alert_msg)
        alert_triggered = True
    
    if load_avg >= LOAD_THRESHOLD:
        alert_msg = f"系统负载过高!当前: {load_avg:.2f}, 阈值: {LOAD_THRESHOLD}"
        send_alert(alert_msg)
        alert_triggered = True
    
    if not alert_triggered:
        print("✅ 系统状态正常,无需告警")
    
    print("\n" + "=" * 60)
    print("✅ 监控检查完成")
    print("=" * 60)

if __name__ == "__main__":
    main()
代码结构说明
  • 配置区:集中管理所有配置,便于修改
  • 数据库管理:使用上下文管理器确保连接正确关闭
  • 系统监控:使用 subprocess 获取系统信息
  • 日志分析:使用正则表达式提取错误信息
  • 数据存储:使用参数化查询防止 SQL 注入
  • 告警机制:根据阈值判断是否需要告警
运行脚本
python 13_ops_toolbox.py
验证步骤
  1. 确保 MySQL 服务正在运行
  2. 确保 app_error.log 文件存在
  3. 运行脚本,检查输出是否正确
  4. 在数据库中验证数据是否插入成功:
    SELECT * FROM monitor_logs ORDER BY created_at DESC LIMIT 5;
    
  5. 测试告警功能:修改日志文件增加错误数量,观察是否触发告警
常见错误
  • FileNotFoundError: [Errno 2] No such file or directory: 'app_error.log' - 日志文件不存在,需要先创建
  • pymysql.err.OperationalError: (2003, "Can't connect to MySQL server") - 数据库服务未启动
  • subprocess.CalledProcessError - 系统命令执行失败,检查命令是否存在

拓展任务 (30 mins)

挑战 1:定时执行

任务:研究如何使用 Linux 的 crontab 让脚本定时执行。步骤
  1. 编辑 crontab:crontab -e
  2. 添加定时任务:* * * * * /path/to/python3 /path/to/13_ops_toolbox.py >> /tmp/ops.log 2>&1
  3. 验证:等待 1 分钟后检查日志文件
提示:不要在 Python 里写 while True + sleep,使用 crontab 更可靠。

挑战 2:真实告警

任务:配置真实的钉钉或 Slack Webhook,实现真正的告警推送。提示
  • 钉钉:创建自定义机器人,获取 Webhook URL
  • Slack:创建 Incoming Webhook,获取 URL
  • 使用 requests 库发送 POST 请求

今日产出物

  • 13_ops_toolbox.py - 完整的运维自动化工具箱
  • app_error.log - 模拟日志文件
  • monitor_logs 表 - 数据库监控日志表

参考代码

查看参考代码

在 GitHub 查看完整的示例代码

在线运行

使用在线编辑器测试代码

实际应用场景

监控工具在生产环境中的应用

  • 日志监控:实时监控应用日志,及时发现错误
  • 系统监控:定期检查系统资源使用情况
  • 告警通知:异常情况及时通知运维人员
  • 数据归档:将监控数据存入数据库,便于后续分析
  • 自动化运维:减少人工巡检,提高效率

项目设计要点

  • 模块化设计:将功能拆分成独立函数,便于测试和维护
  • 配置集中管理:所有配置集中在顶部,便于修改
  • 错误处理:完善的异常处理,确保脚本稳定运行
  • 可扩展性:预留扩展接口,便于后续添加功能
  • 文档完善:代码注释清晰,便于团队协作
与 Day 14 的关联:今天完成的综合项目,明天会进行复盘和总结,回顾第二周学到的所有知识点,并为第三周的 FastAPI 学习做准备。

上一天: MySQL操作

Day 12 | Python 操作 MySQL

下一天: 第二阶段复盘

Day 14 | 第二阶段复盘