#!/usr/bin/env python3
"""
Day 13 - 运维自动化工具箱
融合文件处理、正则、subprocess、数据库操作的综合实战项目
功能:
1. 日志分析:从日志文件中提取错误信息
2. 系统监控:获取系统负载和资源使用情况
3. 数据存储:将监控数据存入数据库
4. 告警通知:当系统异常时发送告警
"""
import re
import time
import subprocess
import pymysql
from contextlib import contextmanager
from typing import Tuple, Dict, Optional
from datetime import datetime
# ========== 配置区 ==========
LOG_FILE = "app_error.log"
DB_CONFIG = {
"host": "localhost",
"user": "root",
"password": "root",
"database": "infra_db",
"charset": "utf8mb4",
"cursorclass": pymysql.cursors.DictCursor
}
# 告警阈值配置
ERROR_THRESHOLD = 3 # 错误数量阈值
LOAD_THRESHOLD = 2.0 # 系统负载阈值
# ========== 1. 数据库上下文管理器 (Day 12) ==========
@contextmanager
def db_cursor():
"""
数据库游标上下文管理器
自动处理连接的创建、提交、回滚和关闭
"""
conn = None
try:
conn = pymysql.connect(**DB_CONFIG)
cursor = conn.cursor()
yield cursor
conn.commit() # 提交事务
except Exception as e:
if conn:
conn.rollback() # 回滚事务
raise e
finally:
if conn:
conn.close() # 关闭连接
# ========== 2. 系统信息获取 (Day 11) ==========
def get_system_load() -> float:
"""
获取系统 1 分钟负载平均值
返回:
负载值(float),如果获取失败返回 0.0
"""
try:
# 执行 uptime 命令
result = subprocess.run(
["uptime"],
capture_output=True,
text=True,
check=True
)
# 解析输出
# 输出格式: ... load average: 1.50, 1.20, 1.00
# 我们需要第一个值(1分钟负载)
match = re.search(r"load average:\s+([\d\.]+)", result.stdout)
if match:
return float(match.group(1))
else:
print("⚠️ 无法解析负载信息")
return 0.0
except subprocess.CalledProcessError:
print("⚠️ 获取系统负载失败")
return 0.0
except Exception as e:
print(f"⚠️ 未知错误: {e}")
return 0.0
def get_memory_usage() -> Dict[str, float]:
"""
获取内存使用情况
返回:
包含内存信息的字典
"""
try:
# 执行 free -m 命令(以 MB 为单位)
result = subprocess.run(
["free", "-m"],
capture_output=True,
text=True,
check=True
)
# 解析输出
# 输出格式:
# total used free shared buff/cache available
# Mem: 8192 4096 2048 256 2048 4096
lines = result.stdout.strip().split("\n")
if len(lines) < 2:
return {}
mem_line = lines[1]
numbers = [int(x) for x in mem_line.split()[1:]]
if len(numbers) >= 6:
total, used, free, shared, buff_cache, available = numbers[:6]
return {
"total": total,
"used": used,
"free": free,
"available": available,
"usage_percent": round((used / total) * 100, 2)
}
return {}
except Exception as e:
print(f"⚠️ 获取内存信息失败: {e}")
return {}
# ========== 3. 日志分析核心 (Day 10) ==========
def analyze_logs() -> Tuple[int, str, list]:
"""
分析日志文件,提取错误信息
返回:
(错误数量, 最后一条错误信息, 所有错误列表)
"""
error_count = 0
latest_error = ""
error_list = []
try:
with open(LOG_FILE, "r", encoding="utf-8") as f:
for line in f:
# 使用正则表达式匹配错误日志
# 格式: [时间] ERROR: 错误信息
error_match = re.search(r'\[([^\]]+)\]\s+ERROR:\s+(.+)', line)
if error_match:
error_count += 1
timestamp = error_match.group(1)
error_msg = error_match.group(2)
latest_error = error_msg
error_list.append({
"timestamp": timestamp,
"message": error_msg
})
return error_count, latest_error, error_list
except FileNotFoundError:
print(f"⚠️ 日志文件不存在: {LOG_FILE}")
return 0, "", []
except Exception as e:
print(f"⚠️ 分析日志失败: {e}")
return 0, "", []
# ========== 4. 告警推送 ==========
def send_alert(message: str) -> bool:
"""
发送告警通知(模拟)
参数:
message: 告警消息
返回:
是否发送成功
"""
# 在实际环境中,这里会调用钉钉、Slack 等 Webhook
# 示例(需要配置 WEBHOOK_URL):
# import requests
# response = requests.post(
# WEBHOOK_URL,
# json={"msgtype": "text", "text": {"content": message}}
# )
# return response.status_code == 200
print(f"🚨 [告警] {message}")
return True
# ========== 5. 数据存储 ==========
def save_monitor_data(load_avg: float, error_count: int, last_error: str) -> bool:
"""
将监控数据存入数据库
参数:
load_avg: 系统负载
error_count: 错误数量
last_error: 最后一条错误信息
返回:
是否保存成功
"""
try:
with db_cursor() as cursor:
sql = """
INSERT INTO monitor_logs
(load_avg, error_count, last_error_msg)
VALUES (%s, %s, %s)
"""
cursor.execute(sql, (load_avg, error_count, last_error))
print(f"✅ 监控数据已保存到数据库")
return True
except Exception as e:
print(f"❌ 保存数据失败: {e}")
return False
# ========== 主流程 ==========
def main():
"""
主函数:执行完整的监控流程
"""
print("=" * 60)
print("🔍 开始系统监控检查...")
print("=" * 60)
# 1. 获取系统信息
print("\n--- 系统信息采集 ---")
load_avg = get_system_load()
mem_info = get_memory_usage()
print(f"系统负载(1分钟): {load_avg:.2f}")
if mem_info:
print(f"内存使用率: {mem_info['usage_percent']}%")
# 2. 分析日志
print("\n--- 日志分析 ---")
error_count, last_error, error_list = analyze_logs()
print(f"发现错误数量: {error_count}")
if error_list:
print("错误详情:")
for i, err in enumerate(error_list[:3], 1): # 只显示前 3 条
print(f" {i}. [{err['timestamp']}] {err['message']}")
# 3. 保存数据
print("\n--- 数据存储 ---")
save_monitor_data(load_avg, error_count, last_error)
# 4. 告警判断
print("\n--- 告警检查 ---")
alert_triggered = False
if error_count >= ERROR_THRESHOLD:
alert_msg = f"错误数量超过阈值!当前: {error_count}, 阈值: {ERROR_THRESHOLD}"
send_alert(alert_msg)
alert_triggered = True
if load_avg >= LOAD_THRESHOLD:
alert_msg = f"系统负载过高!当前: {load_avg:.2f}, 阈值: {LOAD_THRESHOLD}"
send_alert(alert_msg)
alert_triggered = True
if not alert_triggered:
print("✅ 系统状态正常,无需告警")
print("\n" + "=" * 60)
print("✅ 监控检查完成")
print("=" * 60)
if __name__ == "__main__":
main()