subprocess模块

直接使用的函数

import subprocess
import time
from typing import Union, List, Tuple

def run_cmd(
    cmd: Union[str, List[str]], 
    timeout: int = 30,
    shell: bool = False,
    cwd: str = None
) -> Tuple[int, str, str]:
    """
    执行命令并返回结果
    
    Args:
        cmd: 命令或命令列表
        timeout: 超时时间(秒)
        shell: 是否使用shell
        cwd: 工作目录
    
    Returns:
        (返回码, 标准输出, 标准错误)
    
    Examples:
        >>> code, out, err = run_cmd(['ls', '-l'])
        >>> code, out, err = run_cmd('ls -l | grep py', shell=True)
    """
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=shell,
            cwd=cwd
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired:
        return -1, "", f"命令执行超时({timeout}秒)"
    except FileNotFoundError:
        return -2, "", f"命令未找到: {cmd}"
    except Exception as e:
        return -3, "", str(e)


# 使用示例
if __name__ == '__main__':
    # 执行命令
    code, out, err = run_cmd(['ls', '-la'])
    if code == 0:
        print(out)
    else:
        print(f"错误: {err}")
    
    # 带超时
    code, out, err = run_cmd(['ping', 'google.com'], timeout=3)
    
    # 使用 shell
    code, out, err = run_cmd('echo "hello" | wc -c', shell=True)

subprocess 是 Python 标准库中用于创建和管理子进程的模块。它允许你运行系统命令、启动另一个 Python 脚本、调用外部程序,并获取它们的输出、错误信息和返回码。
核心功能对比

函数/类    特点  适用场景
subprocess.run()    Python 3.5+ 推荐,阻塞等待完成   大多数简单场景
subprocess.Popen()  更底层,非阻塞,可实时交互   需要持续通信或控制进程
subprocess.call()   旧版本,已过时 不建议新代码使用
subprocess.check_output()   仅获取输出   简单获取标准输出

基本使用示例

  1. subprocess.run() - 推荐方式
import subprocess

# 运行命令,等待完成
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)      # 标准输出
print(result.stderr)      # 错误输出
print(result.returncode)  # 返回码(0表示成功)

# 检查执行是否成功(非0返回码会抛出异常)
subprocess.run(['ls', '/nonexistent'], check=True)

# 使用 shell 模式(注意安全风险)
subprocess.run('echo "Hello" | grep H', shell=True)
  1. subprocess.Popen() - 高级控制
import subprocess

# 启动进程但不等待
proc = subprocess.Popen(['ping', '-c', '4', 'google.com'], 
                        stdout=subprocess.PIPE, 
                        stderr=subprocess.PIPE,
                        text=True)

# 等待完成并获取输出
stdout, stderr = proc.communicate()
print(f"返回码: {proc.returncode}")

# 实时读取输出
proc = subprocess.Popen(['ping', '-c', '4', 'google.com'], 
                        stdout=subprocess.PIPE, 
                        text=True)

while True:
    output = proc.stdout.readline()
    if output == '' and proc.poll() is not None:
        break
    if output:
        print(output.strip())

# 向进程输入数据
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate(input='Hello from Python!')
print(stdout)  # 输出: Hello from Python!
  1. 管道连接多个命令
# 等价于: ls -l | grep ".py" | wc -l
p1 = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', '.py'], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(['wc', '-l'], stdin=p2.stdout, stdout=subprocess.PIPE)

p1.stdout.close()
p2.stdout.close()
result = p3.communicate()[0]
print(f"找到 {result.decode().strip()} 个 Python 文件")

常用参数说明

参数  作用
capture_output=True 捕获 stdout 和 stderr
text=True   返回字符串而非 bytes
check=True  命令失败时抛出 CalledProcessError
shell=True  通过 shell 执行(有安全风险)
timeout=5   超时限制(秒)
env={'KEY':'value'} 设置环境变量
cwd='/path' 设置工作目录

错误处理

import subprocess

try:
    result = subprocess.run(['false'], check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
    print(f"命令失败,返回码: {e.returncode}")
    print(f"错误输出: {e.stderr}")
except subprocess.TimeoutExpired as e:
    print(f"命令超时: {e.timeout}秒")
except FileNotFoundError as e:
    print(f"命令未找到: {e}")

安全注意事项

避免 shell=True 当命令参数来自用户输入时,存在命令注入风险:

python
# 危险!用户输入 "; rm -rf /" 会执行删除操作
user_input = input("输入文件名: ")
subprocess.run(f'ls {user_input}', shell=True)

# 安全:使用列表形式
subprocess.run(['ls', user_input])
必须使用 shell=True 时,确保输入是可信的或经过严格验证。

实际应用示例

import subprocess
import sys

def run_command(cmd, timeout=30):
    """安全运行命令并返回输出"""
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False
        )
        return {
            'success': result.returncode == 0,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        }
    except subprocess.TimeoutExpired:
        return {'success': False, 'error': f'命令执行超过{timeout}秒'}

# 使用示例
result = run_command(['ping', '-c', '1', '8.8.8.8'])
if result['success']:
    print(result['stdout'])
else:
    print(f"错误: {result['stderr']}")

选择 subprocess.run() 处理绝大多数任务,需要实时交互或长时间运行的任务使用 Popen。始终优先使用列表形式的命令参数,避免 shell=True 除非绝对必要。

完整封装的命令执行函数
基础版封装

import subprocess
import sys
from typing import List, Optional, Union, Tuple, Dict, Any

def run_cmd_simple(
    cmd: Union[str, List[str]],
    timeout: int = 30,
    check: bool = False,
    shell: bool = False
) -> Tuple[int, str, str]:
    """
    简单封装的命令执行函数
    
    Args:
        cmd: 命令(字符串或列表)
        timeout: 超时时间(秒)
        check: 是否检查返回码
        shell: 是否使用shell
    
    Returns:
        (returncode, stdout, stderr)
    
    Examples:
        >>> code, out, err = run_cmd_simple(['ls', '-l'])
        >>> code, out, err = run_cmd_simple('ls -l', shell=True)
    """
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check,
            shell=shell
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired:
        return -1, "", f"Command timeout after {timeout}s"
    except subprocess.CalledProcessError as e:
        return e.returncode, e.stdout, e.stderr
    except FileNotFoundError:
        return -2, "", f"Command not found: {cmd}"

企业级封装

import subprocess
import os
import signal
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Union
from dataclasses import dataclass, field
from enum import Enum

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CommandStatus(Enum):
    """命令执行状态"""
    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"
    NOT_FOUND = "not_found"
    INTERRUPTED = "interrupted"

@dataclass
class CommandResult:
    """命令执行结果"""
    status: CommandStatus
    returncode: int
    stdout: str
    stderr: str
    command: str
    execution_time: float
    start_time: datetime = field(default_factory=datetime.now)
    
    @property
    def success(self) -> bool:
        return self.status == CommandStatus.SUCCESS
    
    @property
    def output(self) -> str:
        """优先返回 stdout,如果为空则返回 stderr"""
        return self.stdout if self.stdout else self.stderr
    
    def __str__(self) -> str:
        return f"CommandResult(status={self.status.value}, returncode={self.returncode}, time={self.execution_time:.2f}s)"


class CommandExecutor:
    """
    企业级命令执行器
    
    特性:
    - 自动重试
    - 超时控制
    - 日志记录
    - 环境变量管理
    - 工作目录切换
    - 实时输出回调
    - 信号处理
    """
    
    def __init__(
        self,
        default_timeout: int = 30,
        default_retry: int = 0,
        default_retry_delay: float = 1.0,
        log_output: bool = True,
        raise_on_error: bool = False
    ):
        """
        初始化执行器
        
        Args:
            default_timeout: 默认超时时间(秒)
            default_retry: 默认重试次数
            default_retry_delay: 默认重试延迟(秒)
            log_output: 是否记录输出到日志
            raise_on_error: 错误时是否抛出异常
        """
        self.default_timeout = default_timeout
        self.default_retry = default_retry
        self.default_retry_delay = default_retry_delay
        self.log_output = log_output
        self.raise_on_error = raise_on_error
        
    def execute(
        self,
        cmd: Union[str, List[str]],
        timeout: Optional[int] = None,
        retry: Optional[int] = None,
        retry_delay: Optional[float] = None,
        shell: bool = False,
        cwd: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        input_data: Optional[str] = None,
        realtime_output: bool = False,
        output_callback: Optional[callable] = None,
        check: bool = False,
        **kwargs
    ) -> CommandResult:
        """
        执行命令
        
        Args:
            cmd: 要执行的命令
            timeout: 超时时间(秒),None则使用默认值
            retry: 重试次数,None则使用默认值
            retry_delay: 重试延迟,None则使用默认值
            shell: 是否使用shell执行
            cwd: 工作目录
            env: 环境变量
            input_data: 发送到stdin的数据
            realtime_output: 是否实时输出(会阻塞直到命令完成)
            output_callback: 实时输出的回调函数
            check: 是否检查返回码(失败时抛出异常)
            **kwargs: 传递给Popen的其他参数
        
        Returns:
            CommandResult对象
        
        Raises:
            subprocess.CalledProcessError: 当check=True且命令失败时
            subprocess.TimeoutExpired: 当超时发生时
        """
        timeout = timeout or self.default_timeout
        retry = retry if retry is not None else self.default_retry
        retry_delay = retry_delay if retry_delay is not None else self.default_retry_delay
        
        # 准备环境变量
        final_env = None
        if env:
            final_env = os.environ.copy()
            final_env.update(env)
        
        # 命令字符串用于日志
        cmd_str = cmd if isinstance(cmd, str) else ' '.join(cmd)
        
        # 重试逻辑
        for attempt in range(retry + 1):
            try:
                if realtime_output:
                    result = self._execute_realtime(
                        cmd, timeout, shell, cwd, final_env, 
                        input_data, output_callback, **kwargs
                    )
                else:
                    result = self._execute_blocking(
                        cmd, timeout, shell, cwd, final_env, 
                        input_data, **kwargs
                    )
                
                # 记录日志
                if self.log_output:
                    self._log_result(cmd_str, result)
                
                # 检查返回码
                if check and not result.success:
                    raise subprocess.CalledProcessError(
                        result.returncode, cmd_str, result.stdout, result.stderr
                    )
                
                return result
                
            except subprocess.TimeoutExpired as e:
                logger.warning(f"Command timeout (attempt {attempt + 1}/{retry + 1}): {cmd_str}")
                if attempt == retry:
                    return CommandResult(
                        status=CommandStatus.TIMEOUT,
                        returncode=-1,
                        stdout="",
                        stderr=str(e),
                        command=cmd_str,
                        execution_time=timeout
                    )
                import time
                time.sleep(retry_delay)
                
            except FileNotFoundError as e:
                logger.error(f"Command not found: {cmd_str}")
                return CommandResult(
                    status=CommandStatus.NOT_FOUND,
                    returncode=-2,
                    stdout="",
                    stderr=str(e),
                    command=cmd_str,
                    execution_time=0
                )
                
            except KeyboardInterrupt:
                logger.warning(f"Command interrupted: {cmd_str}")
                return CommandResult(
                    status=CommandStatus.INTERRUPTED,
                    returncode=-3,
                    stdout="",
                    stderr="Interrupted by user",
                    command=cmd_str,
                    execution_time=0
                )
        
        # 不应该到达这里
        return CommandResult(
            status=CommandStatus.FAILED,
            returncode=-999,
            stdout="",
            stderr="Unknown error",
            command=cmd_str,
            execution_time=0
        )
    
    def _execute_blocking(
        self,
        cmd: Union[str, List[str]],
        timeout: int,
        shell: bool,
        cwd: Optional[str],
        env: Optional[Dict[str, str]],
        input_data: Optional[str],
        **kwargs
    ) -> CommandResult:
        """阻塞执行命令"""
        import time
        
        start_time = time.time()
        
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=shell,
            cwd=cwd,
            env=env,
            input=input_data,
            **kwargs
        )
        
        execution_time = time.time() - start_time
        
        return CommandResult(
            status=CommandStatus.SUCCESS if result.returncode == 0 else CommandStatus.FAILED,
            returncode=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            command=cmd if isinstance(cmd, str) else ' '.join(cmd),
            execution_time=execution_time
        )
    
    def _execute_realtime(
        self,
        cmd: Union[str, List[str]],
        timeout: int,
        shell: bool,
        cwd: Optional[str],
        env: Optional[Dict[str, str]],
        input_data: Optional[str],
        output_callback: Optional[callable],
        **kwargs
    ) -> CommandResult:
        """实时输出执行命令"""
        import time
        import threading
        
        start_time = time.time()
        stdout_lines = []
        stderr_lines = []
        
        # 启动进程
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE if input_data else None,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            shell=shell,
            cwd=cwd,
            env=env,
            **kwargs
        )
        
        def read_output(pipe, lines_list, callback, is_stderr=False):
            for line in iter(pipe.readline, ''):
                if line:
                    lines_list.append(line)
                    if callback:
                        callback(line, is_stderr)
            pipe.close()
        
        # 启动输出读取线程
        stdout_thread = threading.Thread(
            target=read_output, 
            args=(proc.stdout, stdout_lines, output_callback, False)
        )
        stderr_thread = threading.Thread(
            target=read_output,
            args=(proc.stderr, stderr_lines, output_callback, True)
        )
        
        stdout_thread.start()
        stderr_thread.start()
        
        # 发送输入数据
        if input_data:
            proc.stdin.write(input_data)
            proc.stdin.close()
        
        # 等待进程结束(带超时)
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            raise
        
        # 等待线程结束
        stdout_thread.join()
        stderr_thread.join()
        
        execution_time = time.time() - start_time
        
        return CommandResult(
            status=CommandStatus.SUCCESS if proc.returncode == 0 else CommandStatus.FAILED,
            returncode=proc.returncode,
            stdout=''.join(stdout_lines),
            stderr=''.join(stderr_lines),
            command=cmd if isinstance(cmd, str) else ' '.join(cmd),
            execution_time=execution_time
        )
    
    def _log_result(self, cmd: str, result: CommandResult):
        """记录执行结果到日志"""
        if result.success:
            logger.info(f"Command succeeded: {cmd} (time={result.execution_time:.2f}s)")
        else:
            logger.error(f"Command failed: {cmd} (code={result.returncode}, time={result.execution_time:.2f}s)")
        
        if self.log_output and result.stdout:
            logger.debug(f"STDOUT: {result.stdout[:500]}")
        if self.log_output and result.stderr:
            logger.warning(f"STDERR: {result.stderr[:500]}")
    
    def execute_async(
        self,
        cmd: Union[str, List[str]],
        callback: Optional[callable] = None,
        **kwargs
    ) -> subprocess.Popen:
        """
        异步执行命令(非阻塞)
        
        Args:
            cmd: 命令
            callback: 完成时的回调函数
            **kwargs: 传递给Popen的参数
        
        Returns:
            Popen对象,可用于后续通信
        """
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            **kwargs
        )
        
        if callback:
            def wait_and_callback():
                stdout, stderr = proc.communicate()
                callback(proc.returncode, stdout, stderr)
            
            import threading
            thread = threading.Thread(target=wait_and_callback)
            thread.start()
        
        return proc
    
    def get_output(
        self,
        cmd: Union[str, List[str]],
        **kwargs
    ) -> str:
        """
        获取命令输出(仅stdout)
        
        Returns:
            标准输出字符串,失败时返回空字符串
        """
        result = self.execute(cmd, **kwargs)
        return result.stdout if result.success else ""
    
    def check_output(
        self,
        cmd: Union[str, List[str]],
        **kwargs
    ) -> str:
        """
        获取命令输出,失败时抛出异常
        
        Returns:
            标准输出字符串
        
        Raises:
            subprocess.CalledProcessError: 命令失败时
        """
        result = self.execute(cmd, check=True, **kwargs)
        return result.stdout


# 全局默认执行器实例
_default_executor = CommandExecutor()

def run_cmd(
    cmd: Union[str, List[str]],
    **kwargs
) -> CommandResult:
    """
    便捷函数:执行命令并返回结果
    
    Examples:
        >>> result = run_cmd(['ls', '-l'])
        >>> if result.success:
        ...     print(result.stdout)
        
        >>> result = run_cmd('ping 127.0.0.1 -c 4', shell=True, timeout=10)
        
        >>> # 实时输出
        >>> def on_output(line, is_error):
        ...     print(f"{'ERROR' if is_error else 'OUT'}: {line}", end='')
        >>> result = run_cmd(['ping', 'google.com'], realtime_output=True, output_callback=on_output)
    """
    return _default_executor.execute(cmd, **kwargs)

def get_cmd_output(
    cmd: Union[str, List[str]],
    **kwargs
) -> str:
    """
    便捷函数:快速获取命令输出
    
    Examples:
        >>> output = get_cmd_output(['date'])
        >>> print(f"Current date: {output}")
    """
    return _default_executor.get_output(cmd, **kwargs)

def check_cmd_output(
    cmd: Union[str, List[str]],
    **kwargs
) -> str:
    """
    便捷函数:获取输出,失败抛异常
    
    Examples:
        >>> output = check_cmd_output(['which', 'python'])
        >>> print(f"Python location: {output}")
    """
    return _default_executor.check_output(cmd, **kwargs)

使用示例

# 示例1: 基础使用
result = run_cmd(['ls', '-la'])
if result.success:
    print(f"成功: {result.stdout}")
else:
    print(f"失败: {result.stderr}")

# 示例2: 带超时和重试
result = run_cmd(
    ['curl', 'https://api.example.com/data'],
    timeout=5,
    retry=3,
    retry_delay=2
)

# 示例3: 实时输出
def print_output(line, is_error):
    prefix = "ERROR" if is_error else "OUT"
    print(f"[{prefix}] {line.rstrip()}")

result = run_cmd(
    ['ping', '8.8.8.8', '-c', '10'],
    realtime_output=True,
    output_callback=print_output
)

# 示例4: 环境变量和工作目录
result = run_cmd(
    ['python', 'build.py'],
    cwd='/project',
    env={'DEBUG': '1', 'PATH': '/custom/bin'}
)

# 示例5: 管道输入
result = run_cmd(
    ['grep', 'error'],
    input_data="line1\nline2 error\nline3",
    capture_output=True
)

# 示例6: 快速获取输出
output = get_cmd_output(['hostname'])
print(f"Hostname: {output.strip()}")

# 示例7: 检查命令执行
try:
    output = check_cmd_output(['git', 'status'])
    print("Git status:", output)
except subprocess.CalledProcessError as e:
    print(f"Git command failed: {e}")

# 示例8: 异步执行
proc = _default_executor.execute_async(
    ['long_running_task'],
    callback=lambda code, out, err: print(f"Done with code {code}")
)
# 继续做其他事...
proc.wait()  # 可选:等待完成
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容