什么是 Paramiko?
Paramiko 是一个 Python 实现的 SSH 协议库,提供了 SSH 客户端和 SSH 服务器的 API。它允许你通过 SSH 协议远程控制服务器,进行数据传输或在 Shell 中执行命令等操作。
如何安装 Paramiko?
Paramiko 可以使用 pip 安装,命令如下:
pip install paramiko
如何使用 Paramiko 连接 SSH 服务器?
使用 Paramiko 连接 SSH 服务器可以通过如下代码实现:
import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh.close()
如何使用 Paramiko 上传和下载文件?
可以使用 Paramiko 的 SFTP API 上传和下载文件:
import paramiko
# SFTP credentials
sftp_host = 'your_sftp_host'
sftp_user = 'your_sftp_username'
sftp_password = 'your_sftp_password'
# Establish SFTP connection
transport = paramiko.Transport((sftp_host, 22))
transport.connect(username=sftp_user, password=sftp_password)
sftp = transport.open_sftp()
# Download a remote file
remote_file_path = '/path/to/remote/file'
local_file_path = '/path/to/local/file'
sftp.get(remote_file_path, local_file_path)
# Upload a local file
local_file_path = '/path/to/local/file'
remote_file_path = '/path/to/remote/file'
sftp.put(local_file_path, remote_file_path)
# Close SFTP connection
sftp.close()
transport.close()
如何使用 Paramiko 执行 sudo 命令?
可以使用 Paramiko 的 invoke_shell()
方法来模拟一个终端会话,然后执行 sudo 命令:
import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Start a shell session
shell = ssh.invoke_shell()
shell.send('sudo your_command\n')
# Wait for the password prompt
while not shell.recv_ready():
pass
shell.send('your_password\n')
# Execute command
output = shell.recv(1024)
# Print output
print(output.decode())
# Close SSH connection
ssh.close()
** 增加异常处理 **
import paramiko
import json
# SSH连接信息
hostname = "1.1.1.1"
username = "root"
password = "123456"
# 命令
command = "kubectl get cm -n test xxx-configmap -o json"
# 创建SSH客户端
client = paramiko.SSHClient()
# 自动添加主机密钥
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 连接SSH服务器
try:
client.connect(hostname, username=username, password=password)
except paramiko.AuthenticationException:
print("Authentication failed, please verify your password.")
except paramiko.SSHException as sshException:
print("Unable to establish SSH connection: %s" % sshException)
except paramiko.SSHException as e:
print(e)
# 执行命令
try:
stdin, stdout, stderr = client.exec_command(command, timeout=10)
output = stdout.read().decode()
print(output)
result = json.loads(output)
print(result["kind"])
except paramiko.SSHException as sshException:
print("Unable to execute command: %s" % sshException)
# 关闭连接
client.close()
Paramiko 的 SSH 隧道功能
import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Tunnel credentials
tunnel_host = 'your_tunnel_host'
tunnel_port = your_tunnel_port
tunnel_user = 'your_tunnel_username'
tunnel_password = 'your_tunnel_password'
# Establish SSH tunnel
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(tunnel_host, username=tunnel_user, password=tunnel_password)
transport = ssh.get_transport()
local_port = 12345
remote_port = 54321
transport.request_port_forward('', local_port, remote_port)
# Establish SSH connection through the tunnel
ssh_tunnel = paramiko.SSHClient()
ssh_tunnel.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_tunnel.connect(ssh_host, username=ssh_user, password=ssh_password, sock=transport.open_channel('direct-tcpip', ('127.0.0.1', remote_port), ('127.0.0.1', local_port)))
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh_tunnel.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh_tunnel.close()
transport.close()
ssh.close()
以上就是关于 Python Paramiko 模块的文章总结。