直接使用的函数
import subprocess
import time
from typing import Union, List, Tuple
def run_cmd(
cmd: Union[str, List[str]],
timeout: int = 30,
shell: bool = False,
cwd: str = None
) -> Tuple[int, str, str]:
"""
执行命令并返回结果
Args:
cmd: 命令或命令列表
timeout: 超时时间(秒)
shell: 是否使用shell
cwd: 工作目录
Returns:
(返回码, 标准输出, 标准错误)
Examples:
>>> code, out, err = run_cmd(['ls', '-l'])
>>> code, out, err = run_cmd('ls -l | grep py', shell=True)
"""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
shell=shell,
cwd=cwd
)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return -1, "", f"命令执行超时({timeout}秒)"
except FileNotFoundError:
return -2, "", f"命令未找到: {cmd}"
except Exception as e:
return -3, "", str(e)
# 使用示例
if __name__ == '__main__':
# 执行命令
code, out, err = run_cmd(['ls', '-la'])
if code == 0:
print(out)
else:
print(f"错误: {err}")
# 带超时
code, out, err = run_cmd(['ping', 'google.com'], timeout=3)
# 使用 shell
code, out, err = run_cmd('echo "hello" | wc -c', shell=True)
subprocess 是 Python 标准库中用于创建和管理子进程的模块。它允许你运行系统命令、启动另一个 Python 脚本、调用外部程序,并获取它们的输出、错误信息和返回码。
核心功能对比
函数/类 特点 适用场景
subprocess.run() Python 3.5+ 推荐,阻塞等待完成 大多数简单场景
subprocess.Popen() 更底层,非阻塞,可实时交互 需要持续通信或控制进程
subprocess.call() 旧版本,已过时 不建议新代码使用
subprocess.check_output() 仅获取输出 简单获取标准输出
基本使用示例
- subprocess.run() - 推荐方式
import subprocess
# 运行命令,等待完成
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout) # 标准输出
print(result.stderr) # 错误输出
print(result.returncode) # 返回码(0表示成功)
# 检查执行是否成功(非0返回码会抛出异常)
subprocess.run(['ls', '/nonexistent'], check=True)
# 使用 shell 模式(注意安全风险)
subprocess.run('echo "Hello" | grep H', shell=True)
- subprocess.Popen() - 高级控制
import subprocess
# 启动进程但不等待
proc = subprocess.Popen(['ping', '-c', '4', 'google.com'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
# 等待完成并获取输出
stdout, stderr = proc.communicate()
print(f"返回码: {proc.returncode}")
# 实时读取输出
proc = subprocess.Popen(['ping', '-c', '4', 'google.com'],
stdout=subprocess.PIPE,
text=True)
while True:
output = proc.stdout.readline()
if output == '' and proc.poll() is not None:
break
if output:
print(output.strip())
# 向进程输入数据
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
stdout, _ = proc.communicate(input='Hello from Python!')
print(stdout) # 输出: Hello from Python!
- 管道连接多个命令
# 等价于: ls -l | grep ".py" | wc -l
p1 = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', '.py'], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(['wc', '-l'], stdin=p2.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
p2.stdout.close()
result = p3.communicate()[0]
print(f"找到 {result.decode().strip()} 个 Python 文件")
常用参数说明
参数 作用
capture_output=True 捕获 stdout 和 stderr
text=True 返回字符串而非 bytes
check=True 命令失败时抛出 CalledProcessError
shell=True 通过 shell 执行(有安全风险)
timeout=5 超时限制(秒)
env={'KEY':'value'} 设置环境变量
cwd='/path' 设置工作目录
错误处理
import subprocess
try:
result = subprocess.run(['false'], check=True, capture_output=True, text=True)
except subprocess.CalledProcessError as e:
print(f"命令失败,返回码: {e.returncode}")
print(f"错误输出: {e.stderr}")
except subprocess.TimeoutExpired as e:
print(f"命令超时: {e.timeout}秒")
except FileNotFoundError as e:
print(f"命令未找到: {e}")
安全注意事项
避免 shell=True 当命令参数来自用户输入时,存在命令注入风险:
python
# 危险!用户输入 "; rm -rf /" 会执行删除操作
user_input = input("输入文件名: ")
subprocess.run(f'ls {user_input}', shell=True)
# 安全:使用列表形式
subprocess.run(['ls', user_input])
必须使用 shell=True 时,确保输入是可信的或经过严格验证。
实际应用示例
import subprocess
import sys
def run_command(cmd, timeout=30):
"""安全运行命令并返回输出"""
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
check=False
)
return {
'success': result.returncode == 0,
'stdout': result.stdout,
'stderr': result.stderr,
'returncode': result.returncode
}
except subprocess.TimeoutExpired:
return {'success': False, 'error': f'命令执行超过{timeout}秒'}
# 使用示例
result = run_command(['ping', '-c', '1', '8.8.8.8'])
if result['success']:
print(result['stdout'])
else:
print(f"错误: {result['stderr']}")
选择 subprocess.run() 处理绝大多数任务,需要实时交互或长时间运行的任务使用 Popen。始终优先使用列表形式的命令参数,避免 shell=True 除非绝对必要。
完整封装的命令执行函数
基础版封装
import subprocess
import sys
from typing import List, Optional, Union, Tuple, Dict, Any
def run_cmd_simple(
cmd: Union[str, List[str]],
timeout: int = 30,
check: bool = False,
shell: bool = False
) -> Tuple[int, str, str]:
"""
简单封装的命令执行函数
Args:
cmd: 命令(字符串或列表)
timeout: 超时时间(秒)
check: 是否检查返回码
shell: 是否使用shell
Returns:
(returncode, stdout, stderr)
Examples:
>>> code, out, err = run_cmd_simple(['ls', '-l'])
>>> code, out, err = run_cmd_simple('ls -l', shell=True)
"""
try:
result = subprocess.run(
cmd,
shell=True,
capture_output=True,
text=True,
timeout=timeout,
check=check,
shell=shell
)
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
return -1, "", f"Command timeout after {timeout}s"
except subprocess.CalledProcessError as e:
return e.returncode, e.stdout, e.stderr
except FileNotFoundError:
return -2, "", f"Command not found: {cmd}"
企业级封装
import subprocess
import os
import signal
import logging
from datetime import datetime
from typing import Optional, Dict, Any, List, Union
from dataclasses import dataclass, field
from enum import Enum
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class CommandStatus(Enum):
"""命令执行状态"""
SUCCESS = "success"
FAILED = "failed"
TIMEOUT = "timeout"
NOT_FOUND = "not_found"
INTERRUPTED = "interrupted"
@dataclass
class CommandResult:
"""命令执行结果"""
status: CommandStatus
returncode: int
stdout: str
stderr: str
command: str
execution_time: float
start_time: datetime = field(default_factory=datetime.now)
@property
def success(self) -> bool:
return self.status == CommandStatus.SUCCESS
@property
def output(self) -> str:
"""优先返回 stdout,如果为空则返回 stderr"""
return self.stdout if self.stdout else self.stderr
def __str__(self) -> str:
return f"CommandResult(status={self.status.value}, returncode={self.returncode}, time={self.execution_time:.2f}s)"
class CommandExecutor:
"""
企业级命令执行器
特性:
- 自动重试
- 超时控制
- 日志记录
- 环境变量管理
- 工作目录切换
- 实时输出回调
- 信号处理
"""
def __init__(
self,
default_timeout: int = 30,
default_retry: int = 0,
default_retry_delay: float = 1.0,
log_output: bool = True,
raise_on_error: bool = False
):
"""
初始化执行器
Args:
default_timeout: 默认超时时间(秒)
default_retry: 默认重试次数
default_retry_delay: 默认重试延迟(秒)
log_output: 是否记录输出到日志
raise_on_error: 错误时是否抛出异常
"""
self.default_timeout = default_timeout
self.default_retry = default_retry
self.default_retry_delay = default_retry_delay
self.log_output = log_output
self.raise_on_error = raise_on_error
def execute(
self,
cmd: Union[str, List[str]],
timeout: Optional[int] = None,
retry: Optional[int] = None,
retry_delay: Optional[float] = None,
shell: bool = False,
cwd: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
input_data: Optional[str] = None,
realtime_output: bool = False,
output_callback: Optional[callable] = None,
check: bool = False,
**kwargs
) -> CommandResult:
"""
执行命令
Args:
cmd: 要执行的命令
timeout: 超时时间(秒),None则使用默认值
retry: 重试次数,None则使用默认值
retry_delay: 重试延迟,None则使用默认值
shell: 是否使用shell执行
cwd: 工作目录
env: 环境变量
input_data: 发送到stdin的数据
realtime_output: 是否实时输出(会阻塞直到命令完成)
output_callback: 实时输出的回调函数
check: 是否检查返回码(失败时抛出异常)
**kwargs: 传递给Popen的其他参数
Returns:
CommandResult对象
Raises:
subprocess.CalledProcessError: 当check=True且命令失败时
subprocess.TimeoutExpired: 当超时发生时
"""
timeout = timeout or self.default_timeout
retry = retry if retry is not None else self.default_retry
retry_delay = retry_delay if retry_delay is not None else self.default_retry_delay
# 准备环境变量
final_env = None
if env:
final_env = os.environ.copy()
final_env.update(env)
# 命令字符串用于日志
cmd_str = cmd if isinstance(cmd, str) else ' '.join(cmd)
# 重试逻辑
for attempt in range(retry + 1):
try:
if realtime_output:
result = self._execute_realtime(
cmd, timeout, shell, cwd, final_env,
input_data, output_callback, **kwargs
)
else:
result = self._execute_blocking(
cmd, timeout, shell, cwd, final_env,
input_data, **kwargs
)
# 记录日志
if self.log_output:
self._log_result(cmd_str, result)
# 检查返回码
if check and not result.success:
raise subprocess.CalledProcessError(
result.returncode, cmd_str, result.stdout, result.stderr
)
return result
except subprocess.TimeoutExpired as e:
logger.warning(f"Command timeout (attempt {attempt + 1}/{retry + 1}): {cmd_str}")
if attempt == retry:
return CommandResult(
status=CommandStatus.TIMEOUT,
returncode=-1,
stdout="",
stderr=str(e),
command=cmd_str,
execution_time=timeout
)
import time
time.sleep(retry_delay)
except FileNotFoundError as e:
logger.error(f"Command not found: {cmd_str}")
return CommandResult(
status=CommandStatus.NOT_FOUND,
returncode=-2,
stdout="",
stderr=str(e),
command=cmd_str,
execution_time=0
)
except KeyboardInterrupt:
logger.warning(f"Command interrupted: {cmd_str}")
return CommandResult(
status=CommandStatus.INTERRUPTED,
returncode=-3,
stdout="",
stderr="Interrupted by user",
command=cmd_str,
execution_time=0
)
# 不应该到达这里
return CommandResult(
status=CommandStatus.FAILED,
returncode=-999,
stdout="",
stderr="Unknown error",
command=cmd_str,
execution_time=0
)
def _execute_blocking(
self,
cmd: Union[str, List[str]],
timeout: int,
shell: bool,
cwd: Optional[str],
env: Optional[Dict[str, str]],
input_data: Optional[str],
**kwargs
) -> CommandResult:
"""阻塞执行命令"""
import time
start_time = time.time()
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
shell=shell,
cwd=cwd,
env=env,
input=input_data,
**kwargs
)
execution_time = time.time() - start_time
return CommandResult(
status=CommandStatus.SUCCESS if result.returncode == 0 else CommandStatus.FAILED,
returncode=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
command=cmd if isinstance(cmd, str) else ' '.join(cmd),
execution_time=execution_time
)
def _execute_realtime(
self,
cmd: Union[str, List[str]],
timeout: int,
shell: bool,
cwd: Optional[str],
env: Optional[Dict[str, str]],
input_data: Optional[str],
output_callback: Optional[callable],
**kwargs
) -> CommandResult:
"""实时输出执行命令"""
import time
import threading
start_time = time.time()
stdout_lines = []
stderr_lines = []
# 启动进程
proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE if input_data else None,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=shell,
cwd=cwd,
env=env,
**kwargs
)
def read_output(pipe, lines_list, callback, is_stderr=False):
for line in iter(pipe.readline, ''):
if line:
lines_list.append(line)
if callback:
callback(line, is_stderr)
pipe.close()
# 启动输出读取线程
stdout_thread = threading.Thread(
target=read_output,
args=(proc.stdout, stdout_lines, output_callback, False)
)
stderr_thread = threading.Thread(
target=read_output,
args=(proc.stderr, stderr_lines, output_callback, True)
)
stdout_thread.start()
stderr_thread.start()
# 发送输入数据
if input_data:
proc.stdin.write(input_data)
proc.stdin.close()
# 等待进程结束(带超时)
try:
proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
raise
# 等待线程结束
stdout_thread.join()
stderr_thread.join()
execution_time = time.time() - start_time
return CommandResult(
status=CommandStatus.SUCCESS if proc.returncode == 0 else CommandStatus.FAILED,
returncode=proc.returncode,
stdout=''.join(stdout_lines),
stderr=''.join(stderr_lines),
command=cmd if isinstance(cmd, str) else ' '.join(cmd),
execution_time=execution_time
)
def _log_result(self, cmd: str, result: CommandResult):
"""记录执行结果到日志"""
if result.success:
logger.info(f"Command succeeded: {cmd} (time={result.execution_time:.2f}s)")
else:
logger.error(f"Command failed: {cmd} (code={result.returncode}, time={result.execution_time:.2f}s)")
if self.log_output and result.stdout:
logger.debug(f"STDOUT: {result.stdout[:500]}")
if self.log_output and result.stderr:
logger.warning(f"STDERR: {result.stderr[:500]}")
def execute_async(
self,
cmd: Union[str, List[str]],
callback: Optional[callable] = None,
**kwargs
) -> subprocess.Popen:
"""
异步执行命令(非阻塞)
Args:
cmd: 命令
callback: 完成时的回调函数
**kwargs: 传递给Popen的参数
Returns:
Popen对象,可用于后续通信
"""
proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
**kwargs
)
if callback:
def wait_and_callback():
stdout, stderr = proc.communicate()
callback(proc.returncode, stdout, stderr)
import threading
thread = threading.Thread(target=wait_and_callback)
thread.start()
return proc
def get_output(
self,
cmd: Union[str, List[str]],
**kwargs
) -> str:
"""
获取命令输出(仅stdout)
Returns:
标准输出字符串,失败时返回空字符串
"""
result = self.execute(cmd, **kwargs)
return result.stdout if result.success else ""
def check_output(
self,
cmd: Union[str, List[str]],
**kwargs
) -> str:
"""
获取命令输出,失败时抛出异常
Returns:
标准输出字符串
Raises:
subprocess.CalledProcessError: 命令失败时
"""
result = self.execute(cmd, check=True, **kwargs)
return result.stdout
# 全局默认执行器实例
_default_executor = CommandExecutor()
def run_cmd(
cmd: Union[str, List[str]],
**kwargs
) -> CommandResult:
"""
便捷函数:执行命令并返回结果
Examples:
>>> result = run_cmd(['ls', '-l'])
>>> if result.success:
... print(result.stdout)
>>> result = run_cmd('ping 127.0.0.1 -c 4', shell=True, timeout=10)
>>> # 实时输出
>>> def on_output(line, is_error):
... print(f"{'ERROR' if is_error else 'OUT'}: {line}", end='')
>>> result = run_cmd(['ping', 'google.com'], realtime_output=True, output_callback=on_output)
"""
return _default_executor.execute(cmd, **kwargs)
def get_cmd_output(
cmd: Union[str, List[str]],
**kwargs
) -> str:
"""
便捷函数:快速获取命令输出
Examples:
>>> output = get_cmd_output(['date'])
>>> print(f"Current date: {output}")
"""
return _default_executor.get_output(cmd, **kwargs)
def check_cmd_output(
cmd: Union[str, List[str]],
**kwargs
) -> str:
"""
便捷函数:获取输出,失败抛异常
Examples:
>>> output = check_cmd_output(['which', 'python'])
>>> print(f"Python location: {output}")
"""
return _default_executor.check_output(cmd, **kwargs)
使用示例
# 示例1: 基础使用
result = run_cmd(['ls', '-la'])
if result.success:
print(f"成功: {result.stdout}")
else:
print(f"失败: {result.stderr}")
# 示例2: 带超时和重试
result = run_cmd(
['curl', 'https://api.example.com/data'],
timeout=5,
retry=3,
retry_delay=2
)
# 示例3: 实时输出
def print_output(line, is_error):
prefix = "ERROR" if is_error else "OUT"
print(f"[{prefix}] {line.rstrip()}")
result = run_cmd(
['ping', '8.8.8.8', '-c', '10'],
realtime_output=True,
output_callback=print_output
)
# 示例4: 环境变量和工作目录
result = run_cmd(
['python', 'build.py'],
cwd='/project',
env={'DEBUG': '1', 'PATH': '/custom/bin'}
)
# 示例5: 管道输入
result = run_cmd(
['grep', 'error'],
input_data="line1\nline2 error\nline3",
capture_output=True
)
# 示例6: 快速获取输出
output = get_cmd_output(['hostname'])
print(f"Hostname: {output.strip()}")
# 示例7: 检查命令执行
try:
output = check_cmd_output(['git', 'status'])
print("Git status:", output)
except subprocess.CalledProcessError as e:
print(f"Git command failed: {e}")
# 示例8: 异步执行
proc = _default_executor.execute_async(
['long_running_task'],
callback=lambda code, out, err: print(f"Done with code {code}")
)
# 继续做其他事...
proc.wait() # 可选:等待完成