subprocess模块

调用

import subprocess


def run_command(cmd):
    """
    执行系统命令并返回结果

    Args:
        cmd: 要执行的命令(字符串)

    Returns:
        tuple: (returncode, stdout, stderr)
    """
    try:
        result = subprocess.run(
            cmd,
            shell=True,  # 注意:有安全风险
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,  # 自动返回字符串,无需手动 decode
            timeout=30,
            encoding='utf-8'  # text=True 时此参数可省略
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired:
        return -1, "", f"命令执行超时(30秒): {cmd}"
    except subprocess.CalledProcessError as e:
        return e.returncode, e.stdout, e.stderr
    except Exception as e:
        return -1, "", f"执行命令时发生错误: {str(e)}"

直接使用的函数

import subprocess
import time
from typing import Union, List, Tuple

def run_cmd(
    cmd: Union[str, List[str]], 
    timeout: int = 30,
    shell: bool = False,
    cwd: str = None
) -> Tuple[int, str, str]:
    """
    执行命令并返回结果
    
    Args:
        cmd: 命令或命令列表
        timeout: 超时时间(秒)
        shell: 是否使用shell
        cwd: 工作目录
    
    Returns:
        (返回码, 标准输出, 标准错误)
    
    Examples:
        >>> code, out, err = run_cmd(['ls', '-l'])
        >>> code, out, err = run_cmd('ls -l | grep py', shell=True)
    """
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=shell,
            cwd=cwd
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired:
        return -1, "", f"命令执行超时({timeout}秒)"
    except FileNotFoundError:
        return -2, "", f"命令未找到: {cmd}"
    except Exception as e:
        return -3, "", str(e)


# 使用示例
if __name__ == '__main__':
    # 执行命令
    code, out, err = run_cmd(['ls', '-la'])
    if code == 0:
        print(out)
    else:
        print(f"错误: {err}")
    
    # 带超时
    code, out, err = run_cmd(['ping', 'google.com'], timeout=3)
    
    # 使用 shell
    code, out, err = run_cmd('echo "hello" | wc -c', shell=True)

subprocess 是 Python 标准库中用于创建和管理子进程的模块。它允许你运行系统命令、启动另一个 Python 脚本、调用外部程序,并获取它们的输出、错误信息和返回码。
核心功能对比

函数/类    特点  适用场景
subprocess.run()    Python 3.5+ 推荐,阻塞等待完成   大多数简单场景
subprocess.Popen()  更底层,非阻塞,可实时交互   需要持续通信或控制进程
subprocess.call()   旧版本,已过时 不建议新代码使用
subprocess.check_output()   仅获取输出   简单获取标准输出

三、参数详细说明

3.1 run() 函数完整参数

subprocess.run(
    args,                    # 命令: 列表或字符串
    stdin=None,              # 标准输入
    stdout=None,             # 标准输出
    stderr=None,             # 标准错误
    capture_output=False,    # 是否捕获stdout/stderr (3.8+)
    shell=False,             # 是否通过shell执行
    cwd=None,                # 工作目录
    timeout=None,            # 超时秒数
    check=False,             # 失败时抛异常
    encoding=None,           # 编码
    errors=None,             # 编码错误处理
    text=None,               # 是否返回字符串 (3.7+)
    env=None,                # 环境变量
    universal_newlines=None, # 同text (旧参数)
    creationflags=0,         # Windows创建标志
    startupinfo=None,        # Windows启动信息
)

3.2 Popen() 构造函数参数

subprocess.Popen(
    args,
    bufsize=-1,              # 缓冲区大小
    executable=None,         # 替换程序
    stdin=None,
    stdout=None,
    stderr=None,
    preexec_fn=None,         # 执行前调用函数(Unix)
    close_fds=True,          # 关闭所有文件描述符
    shell=False,
    cwd=None,
    env=None,
    universal_newlines=None,
    startupinfo=None,
    creationflags=0,
    restore_signals=True,    # 恢复信号(Unix)
    start_new_session=False, # 新会话(Unix)
    pass_fds=(),             # 传递文件描述符
    encoding=None,
    errors=None,
    text=None,
)

基本使用示例

  1. subprocess.run() - 推荐方式
import subprocess

# 运行命令,等待完成
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)      # 标准输出
print(result.stderr)      # 错误输出
print(result.returncode)  # 返回码(0表示成功)

# 检查执行是否成功(非0返回码会抛出异常)
subprocess.run(['ls', '/nonexistent'], check=True)

# 使用 shell 模式(注意安全风险)
subprocess.run('echo "Hello" | grep H', shell=True)
  1. subprocess.Popen() - 高级控制
import subprocess

# 启动进程但不等待
proc = subprocess.Popen(['ping', '-c', '4', 'google.com'], 
                        stdout=subprocess.PIPE, 
                        stderr=subprocess.PIPE,
                        text=True)

# 等待完成并获取输出
stdout, stderr = proc.communicate()
print(f"返回码: {proc.returncode}")

# 实时读取输出
proc = subprocess.Popen(['ping', '-c', '4', 'google.com'], 
                        stdout=subprocess.PIPE, 
                        text=True)

while True:
    output = proc.stdout.readline()
    if output == '' and proc.poll() is not None:
        break
    if output:
        print(output.strip())

# 向进程输入数据
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate(input='Hello from Python!')
print(stdout)  # 输出: Hello from Python!
  1. 管道连接多个命令
# 等价于: ls -l | grep ".py" | wc -l
p1 = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', '.py'], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(['wc', '-l'], stdin=p2.stdout, stdout=subprocess.PIPE)

p1.stdout.close()
p2.stdout.close()
result = p3.communicate()[0]
print(f"找到 {result.decode().strip()} 个 Python 文件")

常用参数说明

参数  作用
capture_output=True 捕获 stdout 和 stderr
text=True   返回字符串而非 bytes
check=True  命令失败时抛出 CalledProcessError
shell=True  通过 shell 执行(有安全风险)
timeout=5   超时限制(秒)
env={'KEY':'value'} 设置环境变量
cwd='/path' 设置工作目录

错误处理

import subprocess

try:
    result = subprocess.run(['false'], check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
    print(f"命令失败,返回码: {e.returncode}")
    print(f"错误输出: {e.stderr}")
except subprocess.TimeoutExpired as e:
    print(f"命令超时: {e.timeout}秒")
except FileNotFoundError as e:
    print(f"命令未找到: {e}")

安全注意事项

避免 shell=True 当命令参数来自用户输入时,存在命令注入风险:

python
# 危险!用户输入 "; rm -rf /" 会执行删除操作
user_input = input("输入文件名: ")
subprocess.run(f'ls {user_input}', shell=True)

# 安全:使用列表形式
subprocess.run(['ls', user_input])
必须使用 shell=True 时,确保输入是可信的或经过严格验证。

实际应用示例

import subprocess
import sys

def run_command(cmd, timeout=30):
    """安全运行命令并返回输出"""
    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False
        )
        return {
            'success': result.returncode == 0,
            'stdout': result.stdout,
            'stderr': result.stderr,
            'returncode': result.returncode
        }
    except subprocess.TimeoutExpired:
        return {'success': False, 'error': f'命令执行超过{timeout}秒'}

# 使用示例
result = run_command(['ping', '-c', '1', '8.8.8.8'])
if result['success']:
    print(result['stdout'])
else:
    print(f"错误: {result['stderr']}")

选择 subprocess.run() 处理绝大多数任务,需要实时交互或长时间运行的任务使用 Popen。始终优先使用列表形式的命令参数,避免 shell=True 除非绝对必要。

完整封装的命令执行函数
基础版封装

import subprocess
import sys
from typing import List, Optional, Union, Tuple, Dict, Any

def run_cmd_simple(
    cmd: Union[str, List[str]],
    timeout: int = 30,
    check: bool = False,
    shell: bool = False
) -> Tuple[int, str, str]:
    """
    简单封装的命令执行函数
    
    Args:
        cmd: 命令(字符串或列表)
        timeout: 超时时间(秒)
        check: 是否检查返回码
        shell: 是否使用shell
    
    Returns:
        (returncode, stdout, stderr)
    
    Examples:
        >>> code, out, err = run_cmd_simple(['ls', '-l'])
        >>> code, out, err = run_cmd_simple('ls -l', shell=True)
    """
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=check,
            shell=shell
        )
        return result.returncode, result.stdout, result.stderr
    except subprocess.TimeoutExpired:
        return -1, "", f"Command timeout after {timeout}s"
    except subprocess.CalledProcessError as e:
        return e.returncode, e.stdout, e.stderr
    except FileNotFoundError:
        return -2, "", f"Command not found: {cmd}"

企业级封装

import subprocess
import os
import signal
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Union
from dataclasses import dataclass, field
from enum import Enum

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class CommandStatus(Enum):
    """命令执行状态"""
    SUCCESS = "success"
    FAILED = "failed"
    TIMEOUT = "timeout"
    NOT_FOUND = "not_found"
    INTERRUPTED = "interrupted"

@dataclass
class CommandResult:
    """命令执行结果"""
    status: CommandStatus
    returncode: int
    stdout: str
    stderr: str
    command: str
    execution_time: float
    start_time: datetime = field(default_factory=datetime.now)
    
    @property
    def success(self) -> bool:
        return self.status == CommandStatus.SUCCESS
    
    @property
    def output(self) -> str:
        """优先返回 stdout,如果为空则返回 stderr"""
        return self.stdout if self.stdout else self.stderr
    
    def __str__(self) -> str:
        return f"CommandResult(status={self.status.value}, returncode={self.returncode}, time={self.execution_time:.2f}s)"


class CommandExecutor:
    """
    企业级命令执行器
    
    特性:
    - 自动重试
    - 超时控制
    - 日志记录
    - 环境变量管理
    - 工作目录切换
    - 实时输出回调
    - 信号处理
    """
    
    def __init__(
        self,
        default_timeout: int = 30,
        default_retry: int = 0,
        default_retry_delay: float = 1.0,
        log_output: bool = True,
        raise_on_error: bool = False
    ):
        """
        初始化执行器
        
        Args:
            default_timeout: 默认超时时间(秒)
            default_retry: 默认重试次数
            default_retry_delay: 默认重试延迟(秒)
            log_output: 是否记录输出到日志
            raise_on_error: 错误时是否抛出异常
        """
        self.default_timeout = default_timeout
        self.default_retry = default_retry
        self.default_retry_delay = default_retry_delay
        self.log_output = log_output
        self.raise_on_error = raise_on_error
        
    def execute(
        self,
        cmd: Union[str, List[str]],
        timeout: Optional[int] = None,
        retry: Optional[int] = None,
        retry_delay: Optional[float] = None,
        shell: bool = False,
        cwd: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
        input_data: Optional[str] = None,
        realtime_output: bool = False,
        output_callback: Optional[callable] = None,
        check: bool = False,
        **kwargs
    ) -> CommandResult:
        """
        执行命令
        
        Args:
            cmd: 要执行的命令
            timeout: 超时时间(秒),None则使用默认值
            retry: 重试次数,None则使用默认值
            retry_delay: 重试延迟,None则使用默认值
            shell: 是否使用shell执行
            cwd: 工作目录
            env: 环境变量
            input_data: 发送到stdin的数据
            realtime_output: 是否实时输出(会阻塞直到命令完成)
            output_callback: 实时输出的回调函数
            check: 是否检查返回码(失败时抛出异常)
            **kwargs: 传递给Popen的其他参数
        
        Returns:
            CommandResult对象
        
        Raises:
            subprocess.CalledProcessError: 当check=True且命令失败时
            subprocess.TimeoutExpired: 当超时发生时
        """
        timeout = timeout or self.default_timeout
        retry = retry if retry is not None else self.default_retry
        retry_delay = retry_delay if retry_delay is not None else self.default_retry_delay
        
        # 准备环境变量
        final_env = None
        if env:
            final_env = os.environ.copy()
            final_env.update(env)
        
        # 命令字符串用于日志
        cmd_str = cmd if isinstance(cmd, str) else ' '.join(cmd)
        
        # 重试逻辑
        for attempt in range(retry + 1):
            try:
                if realtime_output:
                    result = self._execute_realtime(
                        cmd, timeout, shell, cwd, final_env, 
                        input_data, output_callback, **kwargs
                    )
                else:
                    result = self._execute_blocking(
                        cmd, timeout, shell, cwd, final_env, 
                        input_data, **kwargs
                    )
                
                # 记录日志
                if self.log_output:
                    self._log_result(cmd_str, result)
                
                # 检查返回码
                if check and not result.success:
                    raise subprocess.CalledProcessError(
                        result.returncode, cmd_str, result.stdout, result.stderr
                    )
                
                return result
                
            except subprocess.TimeoutExpired as e:
                logger.warning(f"Command timeout (attempt {attempt + 1}/{retry + 1}): {cmd_str}")
                if attempt == retry:
                    return CommandResult(
                        status=CommandStatus.TIMEOUT,
                        returncode=-1,
                        stdout="",
                        stderr=str(e),
                        command=cmd_str,
                        execution_time=timeout
                    )
                import time
                time.sleep(retry_delay)
                
            except FileNotFoundError as e:
                logger.error(f"Command not found: {cmd_str}")
                return CommandResult(
                    status=CommandStatus.NOT_FOUND,
                    returncode=-2,
                    stdout="",
                    stderr=str(e),
                    command=cmd_str,
                    execution_time=0
                )
                
            except KeyboardInterrupt:
                logger.warning(f"Command interrupted: {cmd_str}")
                return CommandResult(
                    status=CommandStatus.INTERRUPTED,
                    returncode=-3,
                    stdout="",
                    stderr="Interrupted by user",
                    command=cmd_str,
                    execution_time=0
                )
        
        # 不应该到达这里
        return CommandResult(
            status=CommandStatus.FAILED,
            returncode=-999,
            stdout="",
            stderr="Unknown error",
            command=cmd_str,
            execution_time=0
        )
    
    def _execute_blocking(
        self,
        cmd: Union[str, List[str]],
        timeout: int,
        shell: bool,
        cwd: Optional[str],
        env: Optional[Dict[str, str]],
        input_data: Optional[str],
        **kwargs
    ) -> CommandResult:
        """阻塞执行命令"""
        import time
        
        start_time = time.time()
        
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            shell=shell,
            cwd=cwd,
            env=env,
            input=input_data,
            **kwargs
        )
        
        execution_time = time.time() - start_time
        
        return CommandResult(
            status=CommandStatus.SUCCESS if result.returncode == 0 else CommandStatus.FAILED,
            returncode=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            command=cmd if isinstance(cmd, str) else ' '.join(cmd),
            execution_time=execution_time
        )
    
    def _execute_realtime(
        self,
        cmd: Union[str, List[str]],
        timeout: int,
        shell: bool,
        cwd: Optional[str],
        env: Optional[Dict[str, str]],
        input_data: Optional[str],
        output_callback: Optional[callable],
        **kwargs
    ) -> CommandResult:
        """实时输出执行命令"""
        import time
        import threading
        
        start_time = time.time()
        stdout_lines = []
        stderr_lines = []
        
        # 启动进程
        proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE if input_data else None,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            shell=shell,
            cwd=cwd,
            env=env,
            **kwargs
        )
        
        def read_output(pipe, lines_list, callback, is_stderr=False):
            for line in iter(pipe.readline, ''):
                if line:
                    lines_list.append(line)
                    if callback:
                        callback(line, is_stderr)
            pipe.close()
        
        # 启动输出读取线程
        stdout_thread = threading.Thread(
            target=read_output, 
            args=(proc.stdout, stdout_lines, output_callback, False)
        )
        stderr_thread = threading.Thread(
            target=read_output,
            args=(proc.stderr, stderr_lines, output_callback, True)
        )
        
        stdout_thread.start()
        stderr_thread.start()
        
        # 发送输入数据
        if input_data:
            proc.stdin.write(input_data)
            proc.stdin.close()
        
        # 等待进程结束(带超时)
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
            raise
        
        # 等待线程结束
        stdout_thread.join()
        stderr_thread.join()
        
        execution_time = time.time() - start_time
        
        return CommandResult(
            status=CommandStatus.SUCCESS if proc.returncode == 0 else CommandStatus.FAILED,
            returncode=proc.returncode,
            stdout=''.join(stdout_lines),
            stderr=''.join(stderr_lines),
            command=cmd if isinstance(cmd, str) else ' '.join(cmd),
            execution_time=execution_time
        )
    
    def _log_result(self, cmd: str, result: CommandResult):
        """记录执行结果到日志"""
        if result.success:
            logger.info(f"Command succeeded: {cmd} (time={result.execution_time:.2f}s)")
        else:
            logger.error(f"Command failed: {cmd} (code={result.returncode}, time={result.execution_time:.2f}s)")
        
        if self.log_output and result.stdout:
            logger.debug(f"STDOUT: {result.stdout[:500]}")
        if self.log_output and result.stderr:
            logger.warning(f"STDERR: {result.stderr[:500]}")
    
    def execute_async(
        self,
        cmd: Union[str, List[str]],
        callback: Optional[callable] = None,
        **kwargs
    ) -> subprocess.Popen:
        """
        异步执行命令(非阻塞)
        
        Args:
            cmd: 命令
            callback: 完成时的回调函数
            **kwargs: 传递给Popen的参数
        
        Returns:
            Popen对象,可用于后续通信
        """
        proc = subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            **kwargs
        )
        
        if callback:
            def wait_and_callback():
                stdout, stderr = proc.communicate()
                callback(proc.returncode, stdout, stderr)
            
            import threading
            thread = threading.Thread(target=wait_and_callback)
            thread.start()
        
        return proc
    
    def get_output(
        self,
        cmd: Union[str, List[str]],
        **kwargs
    ) -> str:
        """
        获取命令输出(仅stdout)
        
        Returns:
            标准输出字符串,失败时返回空字符串
        """
        result = self.execute(cmd, **kwargs)
        return result.stdout if result.success else ""
    
    def check_output(
        self,
        cmd: Union[str, List[str]],
        **kwargs
    ) -> str:
        """
        获取命令输出,失败时抛出异常
        
        Returns:
            标准输出字符串
        
        Raises:
            subprocess.CalledProcessError: 命令失败时
        """
        result = self.execute(cmd, check=True, **kwargs)
        return result.stdout


# 全局默认执行器实例
_default_executor = CommandExecutor()

def run_cmd(
    cmd: Union[str, List[str]],
    **kwargs
) -> CommandResult:
    """
    便捷函数:执行命令并返回结果
    
    Examples:
        >>> result = run_cmd(['ls', '-l'])
        >>> if result.success:
        ...     print(result.stdout)
        
        >>> result = run_cmd('ping 127.0.0.1 -c 4', shell=True, timeout=10)
        
        >>> # 实时输出
        >>> def on_output(line, is_error):
        ...     print(f"{'ERROR' if is_error else 'OUT'}: {line}", end='')
        >>> result = run_cmd(['ping', 'google.com'], realtime_output=True, output_callback=on_output)
    """
    return _default_executor.execute(cmd, **kwargs)

def get_cmd_output(
    cmd: Union[str, List[str]],
    **kwargs
) -> str:
    """
    便捷函数:快速获取命令输出
    
    Examples:
        >>> output = get_cmd_output(['date'])
        >>> print(f"Current date: {output}")
    """
    return _default_executor.get_output(cmd, **kwargs)

def check_cmd_output(
    cmd: Union[str, List[str]],
    **kwargs
) -> str:
    """
    便捷函数:获取输出,失败抛异常
    
    Examples:
        >>> output = check_cmd_output(['which', 'python'])
        >>> print(f"Python location: {output}")
    """
    return _default_executor.check_output(cmd, **kwargs)

使用示例

# 示例1: 基础使用
result = run_cmd(['ls', '-la'])
if result.success:
    print(f"成功: {result.stdout}")
else:
    print(f"失败: {result.stderr}")

# 示例2: 带超时和重试
result = run_cmd(
    ['curl', 'https://api.example.com/data'],
    timeout=5,
    retry=3,
    retry_delay=2
)

# 示例3: 实时输出
def print_output(line, is_error):
    prefix = "ERROR" if is_error else "OUT"
    print(f"[{prefix}] {line.rstrip()}")

result = run_cmd(
    ['ping', '8.8.8.8', '-c', '10'],
    realtime_output=True,
    output_callback=print_output
)

# 示例4: 环境变量和工作目录
result = run_cmd(
    ['python', 'build.py'],
    cwd='/project',
    env={'DEBUG': '1', 'PATH': '/custom/bin'}
)

# 示例5: 管道输入
result = run_cmd(
    ['grep', 'error'],
    input_data="line1\nline2 error\nline3",
    capture_output=True
)

# 示例6: 快速获取输出
output = get_cmd_output(['hostname'])
print(f"Hostname: {output.strip()}")

# 示例7: 检查命令执行
try:
    output = check_cmd_output(['git', 'status'])
    print("Git status:", output)
except subprocess.CalledProcessError as e:
    print(f"Git command failed: {e}")

# 示例8: 异步执行
proc = _default_executor.execute_async(
    ['long_running_task'],
    callback=lambda code, out, err: print(f"Done with code {code}")
)
# 继续做其他事...
proc.wait()  # 可选:等待完成

高级版本(带日志和回调)

import subprocess  # 用于执行系统命令
import shlex  # 用于安全地分割命令字符串
import logging  # 用于记录日志
from typing import Optional, Callable, Tuple, Union, List  # 类型注解

# 配置日志系统
logging.basicConfig(level=logging.INFO)  # 设置日志级别为INFO(会显示INFO及以上级别的日志)
logger = logging.getLogger(__name__)  # 获取当前模块的日志记录器

def run_command(
    cmd: Union[str, List[str]],  # 命令:可以是字符串"ls -l"或列表["ls", "-l"]
    shell: bool = False,  # 是否通过shell执行(False更安全)
    timeout: int = 30,  # 超时时间(秒)
    cwd: Optional[str] = None,  # 工作目录:命令在这个目录下执行,None表示当前目录
    env: Optional[dict] = None,  # 环境变量:None表示继承父进程的环境变量
    on_output: Optional[Callable[[str], None]] = None,  # 实时输出回调函数(每行输出都会调用这个函数)
    on_error: Optional[Callable[[str], None]] = None,  # 实时错误输出回调函数
    raise_on_error: bool = False  # 命令失败时是否抛出异常
) -> Tuple[int, str, str]:  # 返回值:(返回码, 标准输出, 错误输出)
    """
    增强版命令执行函数 - 支持实时输出、工作目录、环境变量等高级功能
    
    这个函数比基础版本更强大,可以:
    1. 实时获取命令输出(不用等命令执行完)
    2. 指定工作目录
    3. 自定义环境变量
    4. 记录执行日志
    
    参数详细说明:
        cmd: 要执行的命令
            - 字符串形式: "ls -la"
            - 列表形式: ["ls", "-la"] (推荐)
            
        shell: 是否通过shell执行
            - False: 直接执行程序(推荐,安全)
            - True: 通过/bin/sh执行(支持管道、通配符等)
            
        timeout: 超时时间(秒)
            - 命令执行超过这个时间会被强制终止
            - 默认30秒
            
        cwd: 工作目录(Current Working Directory)
            - 设置命令执行时的当前目录
            - 例如: cwd="/tmp" 会在/tmp目录下执行命令
            - None表示使用当前目录
            
        env: 环境变量字典
            - 设置命令执行时的环境变量
            - 例如: env={"PATH": "/usr/bin", "MY_VAR": "hello"}
            - None表示继承父进程的环境变量
            
        on_output: 实时输出回调函数
            - 每次有新的一行标准输出时都会调用这个函数
            - 函数签名: def callback(line: str) -> None
            - 用途: 实时显示进度、保存到文件等
            
        on_error: 实时错误回调函数
            - 每次有新的一行错误输出时都会调用
            - 用途: 实时显示错误信息
            
        raise_on_error: 错误时是否抛出异常
            - False: 命令失败时返回错误码,不抛异常
            - True: 命令失败时抛出CalledProcessError异常
    
    返回值说明:
        returncode: 命令返回码(0=成功,非0=失败)
        stdout: 完整的标准输出(所有输出拼接成的字符串)
        stderr: 完整的错误输出(所有错误拼接成的字符串)
    
    使用示例:
        # 示例1:实时打印输出
        >>> def print_line(line):
        ...     print(f"实时输出: {line}")
        ...
        >>> run_command(['ping', '-c', '3', 'google.com'], on_output=print_line)
        
        # 示例2:指定工作目录
        >>> run_command(['ls', '-la'], cwd='/tmp')
        
        # 示例3:自定义环境变量
        >>> run_command(['python', 'script.py'], env={'PYTHONPATH': '/my/libs'})
    """
    
    # ==================== 第1步:预处理命令参数 ====================
    # 如果命令是字符串且不使用shell,需要分割成列表
    # 例如:"ls -l" -> ["ls", "-l"]
    if isinstance(cmd, str) and not shell:
        cmd = shlex.split(cmd)  # shlex.split会正确处理引号和转义字符
    
    # 记录日志(方便调试和追踪)
    # 如果是列表,用空格连接成字符串再记录
    logger.info(f"执行命令: {cmd if isinstance(cmd, str) else ' '.join(cmd)}")
    
    # ==================== 第2步:初始化数据结构 ====================
    stdout_lines = []  # 存储所有标准输出行(用于最后的返回值)
    stderr_lines = []  # 存储所有错误输出行(用于最后的返回值)
    
    # ==================== 第3步:创建子进程 ====================
    # 使用Popen而不是run,因为Popen支持实时读取输出
    process = subprocess.Popen(
        cmd,                    # 要执行的命令
        shell=shell,           # 是否使用shell
        stdout=subprocess.PIPE, # 捕获标准输出
        stderr=subprocess.PIPE, # 捕获错误输出
        text=True,             # 以文本形式返回(而不是字节)
        cwd=cwd,               # 工作目录(None表示当前目录)
        env=env,               # 环境变量(None表示继承父进程)
        bufsize=1              # 行缓冲(1表示遇到换行符就刷新缓冲区)
    )
    
    # ==================== 第4步:实时读取输出(try块) ====================
    try:
        # 导入需要的模块
        import select  # select用于监控文件描述符(跨平台)
        import sys     # sys用于判断操作系统类型
        
        # ---------- 4.1 设置非阻塞模式(仅Unix/Linux)----------
        # 非阻塞模式:读取时如果没有数据就立即返回,而不是等待
        # Windows不支持fcntl,所以只在Unix/Linux下启用
        if sys.platform != 'win32':  # sys.platform != 'win32' 表示不是Windows系统
            import fcntl  # fcntl用于设置文件描述符属性(Unix专用)
            import os     # os模块提供操作系统接口
            
            # 为标准输出和标准错误设置非阻塞模式
            for pipe in [process.stdout, process.stderr]:
                if pipe:  # 如果管道存在
                    fd = pipe.fileno()  # 获取文件描述符(整数)
                    # 获取当前文件状态标志
                    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
                    # 添加非阻塞标志(O_NONBLOCK)
                    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
        
        # ---------- 4.2 循环读取输出直到进程结束 ----------
        while True:
            # 检查进程是否结束
            # poll()返回None表示还在运行,返回其他值表示已结束(返回值就是返回码)
            returncode = process.poll()
            
            # ----- 读取标准输出(正常输出)-----
            if process.stdout:
                # iter(process.stdout.readline, '') 会持续读取直到遇到空字符串
                # 每次读取一行(遇到换行符)
                for line in iter(process.stdout.readline, ''):
                    if line:  # 有内容
                        # 移除末尾的换行符,添加到列表
                        line = line.rstrip()
                        stdout_lines.append(line)
                        # 如果提供了回调函数,调用它(实时输出)
                        if on_output:
                            on_output(line)
                    else:  # 读到空字符串表示没有更多数据了
                        break
            
            # ----- 读取标准错误(错误输出)-----
            if process.stderr:
                for line in iter(process.stderr.readline, ''):
                    if line:
                        line = line.rstrip()
                        stderr_lines.append(line)
                        # 如果提供了错误回调函数,调用它
                        if on_error:
                            on_error(line)
                    else:
                        break
            
            # ----- 处理进程结束后的剩余数据 -----
            # 进程已经结束(returncode不是None)
            if returncode is not None:
                # 读取标准输出中剩余的所有数据
                if process.stdout:
                    remaining_out = process.stdout.read()
                    if remaining_out:  # 有剩余内容
                        # splitlines()按行分割,且会保留内容(不包括换行符)
                        lines = remaining_out.splitlines()
                        stdout_lines.extend(lines)  # 添加到列表
                        # 如果有回调函数,也实时输出这些剩余行
                        if on_output:
                            for line in lines:
                                on_output(line)
                
                # 读取标准错误中剩余的所有数据
                if process.stderr:
                    remaining_err = process.stderr.read()
                    if remaining_err:
                        lines = remaining_err.splitlines()
                        stderr_lines.extend(lines)
                        if on_error:
                            for line in lines:
                                on_error(line)
                
                # 所有数据读取完毕,退出循环
                break
            
            # ----- 避免忙等待(busy waiting)-----
            # 如果进程还在运行,休眠0.1秒再继续循环
            # 防止CPU占用率过高
            import time
            time.sleep(0.1)
        
        # ---------- 4.3 处理结果 ----------
        # 将列表中的所有行用换行符连接成一个字符串
        stdout_str = '\n'.join(stdout_lines)
        stderr_str = '\n'.join(stderr_lines)
        
        # 如果命令失败(返回码非0)且要求抛出异常
        if returncode != 0 and raise_on_error:
            # 抛出CalledProcessError异常,包含返回码、命令、输出等信息
            raise subprocess.CalledProcessError(returncode, cmd, stdout_str, stderr_str)
        
        # 返回结果(返回码、标准输出、错误输出)
        return returncode, stdout_str, stderr_str
    
    # ==================== 第5步:异常处理 ====================
    # 异常1:命令执行超时
    except subprocess.TimeoutExpired:
        # 超时了,强制杀死进程
        process.kill()   # 发送SIGKILL信号终止进程
        process.wait()   # 等待进程真正结束
        # 记录错误日志
        logger.error(f"命令超时: {cmd}")
        # 返回超时错误
        return -1, "", f"命令执行超时({timeout}秒)"
    
    # 异常2:其他任何异常
    except Exception as e:
        # 记录错误日志
        logger.error(f"命令执行失败: {e}")
        # 返回错误信息
        return -1, "", str(e)
    
    # ==================== 第6步:清理资源(finally块) ====================
    finally:
        # 无论是否发生异常,都要关闭管道
        # 释放系统资源,避免文件描述符泄漏
        if process.stdout:
            process.stdout.close()  # 关闭标准输出管道
        if process.stderr:
            process.stderr.close()  # 关闭错误输出管道
        if process.stdin:
            process.stdin.close()   # 关闭标准输入管道(本例中没有用到)


# ==================== 使用示例 ====================

def example_usage():
    """展示增强版函数的各种用法"""
    
    # 示例1:实时显示命令输出(像在终端一样)
    def print_progress(line):
        print(f"[实时] {line}")
    
    print("=== 示例1:实时显示ping输出 ===")
    code, out, err = run_command(
        ['ping', '-c', '3', '127.0.0.1'],
        on_output=print_progress,  # 每行输出都会实时打印
        timeout=10
    )
    print(f"命令结束,返回码: {code}")
    
    # 示例2:在指定目录执行命令
    print("\n=== 示例2:在/tmp目录执行ls ===")
    code, out, err = run_command(
        ['ls', '-la'],
        cwd='/tmp',  # 切换到/tmp目录执行
        shell=False
    )
    print(f"/tmp目录下的文件:\n{out}")
    
    # 示例3:自定义环境变量
    print("\n=== 示例3:使用自定义环境变量 ===")
    code, out, err = run_command(
        ['python', '-c', 'import os; print(os.environ.get("MY_VAR", "not found"))'],
        env={'MY_VAR': 'hello world', 'PATH': '/usr/bin'},  # 自定义环境
        shell=False
    )
    print(f"输出: {out}")
    
    # 示例4:收集ping命令的统计信息(实时显示+最后汇总)
    print("\n=== 示例4:实时显示并收集完整输出 ===")
    output_lines = []  # 用于收集所有输出
    
    def collect_output(line):
        print(f"实时: {line}")  # 实时显示
        output_lines.append(line)  # 同时收集
    
    code, out, err = run_command(
        ['ping', '-c', '3', 'google.com'],
        on_output=collect_output,
        timeout=10
    )
    
    print(f"\n总计收集到 {len(output_lines)} 行输出")
    if code == 0:
        print("命令执行成功")
    else:
        print(f"命令执行失败,返回码: {code}")


# ==================== 关键概念解释 ====================

"""
关键概念解释:

1. 实时输出 vs 等待输出:
   - 基础版(run):等待命令执行完,一次性获取所有输出
   - 增强版(Popen):边执行边获取输出,可以实时显示进度

2. 阻塞 vs 非阻塞:
   - 阻塞:读取时如果没有数据会一直等待(类似排队)
   - 非阻塞:读取时如果没有数据立即返回(类似看一眼没有就走)
   - 使用非阻塞可以更灵活地处理输出

3. 回调函数(Callback):
   - 是一个"回头调用"的函数
   - 我们定义好,让程序在特定事件发生时自动调用
   - 例如:有输出时自动调用on_output函数

4. 文件描述符(File Descriptor):
   - 操作系统为每个打开的文件/管道分配的一个整数编号
   - 0=标准输入,1=标准输出,2=标准错误
   - 通过这个编号可以操作对应的文件/管道

5. 忙等待(Busy Waiting):
   - 不断循环检查某个条件,非常消耗CPU
   - 使用sleep()可以降低CPU占用率

注意事项:
1. Windows系统不支持非阻塞模式,会有小差异
2. 如果回调函数执行太慢,会阻塞命令的执行
3. 超时后进程会被强制杀死,可能导致资源未释放
4. 大量输出时,内存消耗会很大
"""

if __name__ == "__main__":
    # 简单的测试
    print("测试增强版函数...")
    
    # 测试1:基本功能
    code, out, err = run_command(['echo', 'hello world'])
    print(f"结果: {out}")
    
    # 测试2:实时输出
    def show_output(x):
        print(f"实时看到: {x}")
    
    code, out, err = run_command(
        ['python', '-c', 'import time; [print(i) or time.sleep(1) for i in range(3)]'],
        on_output=show_output
    )
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容