Paramiko 模块

什么是 Paramiko?
Paramiko 是一个 Python 实现的 SSH 协议库,提供了 SSH 客户端和 SSH 服务器的 API。它允许你通过 SSH 协议远程控制服务器,进行数据传输或在 Shell 中执行命令等操作。
如何安装 Paramiko?
Paramiko 可以使用 pip 安装,命令如下:
pip install paramiko
如何使用 Paramiko 连接 SSH 服务器?
使用 Paramiko 连接 SSH 服务器可以通过如下代码实现:

import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh.close()

如何使用 Paramiko 上传和下载文件?
可以使用 Paramiko 的 SFTP API 上传和下载文件:

import paramiko
# SFTP credentials
sftp_host = 'your_sftp_host'
sftp_user = 'your_sftp_username'
sftp_password = 'your_sftp_password'
# Establish SFTP connection
transport = paramiko.Transport((sftp_host, 22))
transport.connect(username=sftp_user, password=sftp_password)
sftp = transport.open_sftp()
# Download a remote file
remote_file_path = '/path/to/remote/file'
local_file_path = '/path/to/local/file'
sftp.get(remote_file_path, local_file_path)
# Upload a local file
local_file_path = '/path/to/local/file'
remote_file_path = '/path/to/remote/file'
sftp.put(local_file_path, remote_file_path)
# Close SFTP connection
sftp.close()
transport.close()

如何使用 Paramiko 执行 sudo 命令?
可以使用 Paramiko 的 invoke_shell() 方法来模拟一个终端会话,然后执行 sudo 命令:

import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Start a shell session
shell = ssh.invoke_shell()
shell.send('sudo your_command\n')
# Wait for the password prompt
while not shell.recv_ready():
    pass
shell.send('your_password\n')
# Execute command
output = shell.recv(1024)
# Print output
print(output.decode())
 # Close SSH connection
ssh.close()

** 增加异常处理 **

import paramiko
import json


# SSH连接信息
hostname = "1.1.1.1"
username = "root"
password = "123456"
# 命令
command = "kubectl get cm -n test xxx-configmap -o json"
# 创建SSH客户端
client = paramiko.SSHClient()
# 自动添加主机密钥
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())


# 连接SSH服务器
try:
    client.connect(hostname, username=username, password=password)
except paramiko.AuthenticationException:
    print("Authentication failed, please verify your password.")
except paramiko.SSHException as sshException:
    print("Unable to establish SSH connection: %s" % sshException)
except paramiko.SSHException as e:
    print(e)


# 执行命令
try:
    stdin, stdout, stderr = client.exec_command(command, timeout=10)
    output = stdout.read().decode()
    print(output)
    result = json.loads(output)
    print(result["kind"])
except paramiko.SSHException as sshException:
    print("Unable to execute command: %s" % sshException)

# 关闭连接
client.close()

Paramiko 的 SSH 隧道功能

import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Tunnel credentials
tunnel_host = 'your_tunnel_host'
tunnel_port = your_tunnel_port
tunnel_user = 'your_tunnel_username'
tunnel_password = 'your_tunnel_password'
# Establish SSH tunnel
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(tunnel_host, username=tunnel_user, password=tunnel_password)
transport = ssh.get_transport()
local_port = 12345
remote_port = 54321
transport.request_port_forward('', local_port, remote_port)
# Establish SSH connection through the tunnel
ssh_tunnel = paramiko.SSHClient()
ssh_tunnel.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh_tunnel.connect(ssh_host, username=ssh_user, password=ssh_password, sock=transport.open_channel('direct-tcpip', ('127.0.0.1', remote_port), ('127.0.0.1', local_port)))
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh_tunnel.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh_tunnel.close()
transport.close()
ssh.close()

以上就是关于 Python Paramiko 模块的文章总结。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容