Skip to main content
今日目标:Shell 虽然方便,但逻辑一复杂就成了”遗屎山”。今天学习 Python 的 subprocess 模块,学会如何在 Python 代码中优雅地执行系统命令,把 Shell 脚本重构为 Python 脚本。今天不只是写代码,而是要彻底理解 “为什么 subprocess 比 os.system 更安全”“如何正确捕获命令输出” 以及 “如何避免 Shell 注入攻击”

学习内容 (30 mins)

在开始写代码前,先搞懂这些核心概念,否则后面的代码你会看得云里雾里。
什么是 os.system?os.system() 是 Python 早期用于执行系统命令的方法,它会直接调用 Shell 执行命令。为什么不再推荐使用?
  • 无法获取输出
    • os.system() 只能返回状态码(0 表示成功,非 0 表示失败)
    • 无法获取命令的标准输出(stdout)和错误输出(stderr)
    • 输出直接打印到终端,无法在代码中处理
  • 安全隐患
    • 如果命令包含用户输入,容易被 Shell 注入攻击
    • 示例:os.system(f"rm {user_input}") 如果 user_input"file; rm -rf /" 就会执行危险操作
  • 跨平台问题
    • 不同操作系统的命令可能不同
    • Windows 和 Linux 的命令格式差异很大
替代方案:使用 subprocess 模块,更安全、更灵活。
什么是 subprocess?subprocess 是 Python 标准库中用于创建子进程、执行系统命令的模块,是执行外部命令的官方推荐方式。为什么 subprocess 更好?
  • 可以捕获输出:可以获取命令的标准输出和错误输出
  • 更安全:使用列表传参,避免 Shell 注入
  • 更灵活:可以控制输入、输出、错误处理
  • 跨平台:在不同操作系统上行为一致
核心方法
  • subprocess.run()(推荐):
    • 执行命令并等待完成
    • capture_output=True:捕获 stdout 和 stderr
    • text=True:自动将字节流解码为字符串
    • check=True:命令失败时自动抛出异常
    • timeout=秒数:设置超时时间
  • subprocess.Popen()(高级):
    • 用于需要实时处理输出的场景
    • 可以逐行读取输出,不等待命令完成
    • 适合长时间运行的命令
安全规范
  • 推荐:使用列表传参 subprocess.run(["ls", "-l"])
  • 禁止:使用字符串传参 subprocess.run("ls -l", shell=True)(除非必要)
  • 原因:列表形式避免了 Shell 解析,防止注入攻击

代码任务 (90 mins)

1

环境准备

确保虚拟环境已激活:
# 确保虚拟环境已激活(提示符前有 (.venv))
source .venv/bin/activate

# subprocess 是 Python 标准库,无需安装
2

任务 A:基础命令执行

编写 11_run_cmd.py,演示 subprocess 的基本用法。
#!/usr/bin/env python3
"""
Day 11 - subprocess 基础示例
演示如何安全地执行系统命令
"""

import subprocess

# ========== 1. 基本命令执行 ==========
print("=== 基本命令执行 ===")

# 定义要执行的命令(使用列表形式,更安全)
cmd = ["ls", "-l", "/tmp"]

try:
    # 执行命令
    # capture_output=True: 捕获标准输出和错误输出
    # text=True: 将字节流自动解码为字符串
    # check=True: 如果命令失败(返回非 0),自动抛出异常
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        check=True
    )
    
    print("命令执行成功!")
    print(f"返回码: {result.returncode}")  # 0 表示成功
    print(f"标准输出:\n{result.stdout}")
    
except subprocess.CalledProcessError as e:
    # 命令执行失败
    print(f"❌ 命令执行失败!")
    print(f"返回码: {e.returncode}")
    print(f"错误信息: {e.stderr}")

# ========== 2. 不捕获输出的命令 ==========
print("\n=== 不捕获输出(直接打印到终端)===")

# 如果不需要处理输出,可以不捕获
result = subprocess.run(["echo", "Hello from Python!"])
# 输出会直接打印到终端

# ========== 3. 带超时的命令 ==========
print("\n=== 带超时的命令 ===")

try:
    # timeout=5: 5 秒后如果命令未完成,会抛出 TimeoutExpired 异常
    result = subprocess.run(
        ["sleep", "2"],  # 模拟一个需要 2 秒的命令
        timeout=5,
        capture_output=True,
        text=True
    )
    print("✅ 命令在超时前完成")
except subprocess.TimeoutExpired:
    print("❌ 命令执行超时")

# ========== 4. 获取命令返回码 ==========
print("\n=== 获取命令返回码 ===")

# 不设置 check=True,即使失败也不会抛出异常
result = subprocess.run(["ls", "/nonexistent"], capture_output=True, text=True)

if result.returncode == 0:
    print("✅ 命令执行成功")
else:
    print(f"❌ 命令执行失败,返回码: {result.returncode}")
    print(f"错误信息: {result.stderr}")

# ========== 5. 执行 Shell 命令(谨慎使用)==========
print("\n=== 执行 Shell 命令(谨慎使用)===")

# 只有在确实需要 Shell 特性(如管道、重定向)时才使用 shell=True
# 注意:使用 shell=True 时,应该传递字符串而不是列表
result = subprocess.run(
    "echo $HOME | wc -c",  # 使用 Shell 管道
    shell=True,
    capture_output=True,
    text=True
)
print(f"HOME 路径长度: {result.stdout.strip()}")
代码解释
  • 列表传参["ls", "-l"]"ls -l" 更安全
  • capture_output=True:捕获输出,可以在代码中处理
  • text=True:自动解码字节流为字符串
  • check=True:失败时自动抛出异常
  • timeout:防止命令无限执行
运行脚本
python 11_run_cmd.py
验证步骤
  1. 脚本能正常运行,无语法错误
  2. 观察命令输出是否正确
  3. 测试错误处理(如访问不存在的目录)
常见错误
  • FileNotFoundError: [Errno 2] No such file or directory - 命令不存在,检查命令路径
  • subprocess.TimeoutExpired - 命令执行超时,增加 timeout 或优化命令
  • subprocess.CalledProcessError - 命令执行失败,检查命令和参数
3

任务 B:磁盘监控脚本重构

将 Day 05 的 Shell 磁盘监控脚本重构为 Python 版本 11_disk_monitor.py
#!/usr/bin/env python3
"""
Day 11 - 磁盘监控脚本(Python 版本)
将 Shell 脚本重构为 Python,演示 subprocess 的实际应用
"""

import subprocess
import re

def get_disk_usage(mount_point: str = "/") -> int:
    """
    获取指定挂载点的磁盘使用率
    
    参数:
        mount_point: 挂载点路径(默认 "/")
    
    返回:
        磁盘使用率(百分比,0-100)
    """
    try:
        # 执行 df -h 命令
        # -h: 人类可读格式(KB, MB, GB)
        result = subprocess.run(
            ["df", "-h", mount_point],
            capture_output=True,
            text=True,
            check=True
        )
        
        # 解析输出
        # 输出格式示例:
        # Filesystem      Size  Used Avail Use% Mounted on
        # /dev/sda1        50G   25G   25G  50% /
        lines = result.stdout.strip().split("\n")
        
        if len(lines) < 2:
            raise ValueError("无法解析 df 命令输出")
        
        # 第二行是数据行(第一行是表头)
        data_line = lines[1]
        
        # 分割行,获取各个字段
        # 使用正则表达式提取使用率(去掉 % 符号)
        match = re.search(r'(\d+)%', data_line)
        if match:
            usage_percent = int(match.group(1))
            return usage_percent
        else:
            raise ValueError("无法从输出中提取使用率")
            
    except subprocess.CalledProcessError as e:
        print(f"❌ 执行 df 命令失败: {e}")
        return -1
    except Exception as e:
        print(f"❌ 解析磁盘使用率失败: {e}")
        return -1

def check_disk_health(threshold: int = 80) -> bool:
    """
    检查磁盘健康状态
    
    参数:
        threshold: 告警阈值(默认 80%)
    
    返回:
        True 表示健康,False 表示需要告警
    """
    usage = get_disk_usage()
    
    if usage < 0:
        print("❌ 无法获取磁盘使用率")
        return False
    
    print(f"当前磁盘使用率: {usage}%")
    
    if usage > threshold:
        print(f"🚨 告警: 磁盘使用率超过 {threshold}%!")
        return False
    else:
        print("✅ 磁盘空间充足")
        return True

def get_memory_usage() -> dict:
    """
    获取内存使用情况
    
    返回:
        包含内存信息的字典
    """
    try:
        # 执行 free -m 命令(以 MB 为单位)
        result = subprocess.run(
            ["free", "-m"],
            capture_output=True,
            text=True,
            check=True
        )
        
        # 解析输出
        # 输出格式示例:
        #               total        used        free      shared  buff/cache   available
        # Mem:          8192        4096        2048         256        2048        4096
        lines = result.stdout.strip().split("\n")
        mem_line = lines[1]  # 第二行是内存信息
        
        # 提取数字
        numbers = [int(x) for x in mem_line.split()[1:]]
        total, used, free, shared, buff_cache, available = numbers
        
        return {
            "total": total,
            "used": used,
            "free": free,
            "available": available,
            "usage_percent": round((used / total) * 100, 2)
        }
        
    except Exception as e:
        print(f"❌ 获取内存信息失败: {e}")
        return {}

if __name__ == "__main__":
    print("=" * 50)
    print("系统资源监控")
    print("=" * 50)
    
    # 检查磁盘
    print("\n--- 磁盘检查 ---")
    check_disk_health(threshold=80)
    
    # 检查内存
    print("\n--- 内存检查 ---")
    mem_info = get_memory_usage()
    if mem_info:
        print(f"总内存: {mem_info['total']} MB")
        print(f"已使用: {mem_info['used']} MB")
        print(f"可用: {mem_info['available']} MB")
        print(f"使用率: {mem_info['usage_percent']}%")
代码解释
  • 命令执行:使用 subprocess.run() 执行系统命令
  • 输出解析:使用字符串处理和正则表达式解析命令输出
  • 错误处理:捕获异常,优雅处理错误
  • 函数封装:将功能封装成函数,便于复用
运行脚本
python 11_disk_monitor.py
验证步骤
  1. 脚本能正常运行,无语法错误
  2. 检查磁盘使用率输出是否正确
  3. 检查内存使用率输出是否正确
  4. 测试告警功能(如果使用率超过阈值)

拓展任务 (30 mins)

挑战 1:实时输出处理

任务:使用 subprocess.Popen 实现实时打印 ping 命令的输出。提示
process = subprocess.Popen(["ping", "-c", "5", "google.com"], 
                           stdout=subprocess.PIPE, 
                           text=True)
for line in process.stdout:
    print(line, end='')

挑战 2:命令管道

任务:在 Python 中实现 Shell 管道功能,如 ps aux | grep python提示:使用两个 Popen 对象,将第一个的输出连接到第二个的输入。

今日产出物

  • 11_run_cmd.py - subprocess 基础示例
  • 11_disk_monitor.py - 磁盘监控脚本(Python 版本)

参考代码

查看参考代码

在 GitHub 查看完整的示例代码

在线运行

使用在线编辑器测试代码

实际应用场景

subprocess 在运维中的应用

  • 系统监控:执行系统命令获取资源使用情况(CPU、内存、磁盘)
  • 服务管理:启动、停止、重启服务
  • 文件操作:批量处理文件、备份数据
  • 日志收集:执行命令收集日志信息
  • 自动化部署:执行部署脚本、运行测试

最佳实践

  • 使用列表传参:避免 Shell 注入攻击
  • 设置超时:防止命令无限执行
  • 捕获异常:优雅处理命令失败
  • 解析输出:使用正则表达式或字符串处理解析命令输出
  • 函数封装:将命令执行封装成函数,便于复用和测试
与 Day 12 的关联:今天学习的 subprocess,明天会学习如何用 Python 操作 MySQL 数据库,将系统监控和数据库存储结合起来。

上一天: 文件处理

Day 10 | 文件处理与正则实战

下一天: 数据库操作

Day 12 | Python 操作 MySQL