背景
需要批量在路由器上进行配置,与网元建立SSH连接,同时存在交互操作。比如:键入Configure
,进入配置模式成功后才可以键入后续指令。
在这个过程中遇到很多坑,在此分享。
SSHClient
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=device, port=22, username=username, password=password)
stdin, stdout, stderr = ssh.exec_command("configure", bufsize=1024)
res, err = stdout.read(), stderr.read()
result = res if res else err
print(result)
SSHClient是最简单的命令行执行方式,简单的参数填写,优雅的结果处理,这很难不让人赶紧上手一试。
但遗憾的是SSHClient不支持交互式命令执行,其原因在于其exec_command
方法每次执行一条命令都会开启一个新的“channel”,从而开启一个新的session,这相当于我们每执行一次命令,都重新登录了一次网元设备,这使得交互式无从谈起。
def exec_command(
self,
command,
bufsize=-1,
timeout=None,
get_pty=False,
environment=None,
):
# 就是他,在这个open_session的说明中也有描述:Request a new channel to the server,
# of type ``"session"``.
chan = self._transport.open_session(timeout=timeout)
if get_pty:
chan.get_pty()
chan.settimeout(timeout)
if environment:
chan.update_environment(environment)
chan.exec_command(command)
stdin = chan.makefile_stdin("wb", bufsize)
stdout = chan.makefile("r", bufsize)
stderr = chan.makefile_stderr("r", bufsize)
return stdin, stdout, stderr
所以我们要创建固定channel,从而创建固定session
交互式连接
# Create a new SSH session over an existing socket, or socket-like object.
trans = paramiko.Transport((devcie, 22))
trans.start_client()
trans.auth_password(username, password)
# 新建channel
channel = trans.open_session(timeout=1200)
# 获取终端
channel.get_pty()
# 激活终端
channel.invoke_shell()
# 执行命令
channel.send(command)
# 结果获取
result = channel.recv(10240)
result = result.decode("utf-8")
产生的问题
以上是交互式连接的过程,但事情并不是一帆风顺的,在这个过程中还有两个问题,这两个问题都由交互结果获取函数channel.recv
引起,这将导致。
- 交互结果获取存在延迟
- 错误处理困难
原因
def read(self, len=1024, buffer=None):
return self._wrap_ssl_read(len, buffer)
def recv(self, len=1024, flags=0):
if flags != 0:
raise ValueError("non-zero flags not allowed in calls to recv")
return self._wrap_ssl_read(len)
def _wrap_ssl_read(self, len, buffer=None):
try:
return self._ssl_io_loop(self.sslobj.read, len, buffer)
except ssl.SSLError as e:
if e.errno == ssl.SSL_ERROR_EOF and self.suppress_ragged_eofs:
return 0 # eof, return 0.
else:
raise
def _ssl_io_loop(self, func, *args):
"""Performs an I/O loop between incoming/outgoing and the socket."""
should_loop = True
ret = None
while should_loop:
errno = None
try:
# 就是这里,递归的入口
ret = func(*args)
except ssl.SSLError as e:
if e.errno not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
# WANT_READ, and WANT_WRITE are expected, others are not.
raise e
errno = e.errno
buf = self.outgoing.read()
self.socket.sendall(buf)
if errno is None:
should_loop = False
elif errno == ssl.SSL_ERROR_WANT_READ:
buf = self.socket.recv(SSL_BLOCKSIZE)
if buf:
self.incoming.write(buf)
else:
self.incoming.write_eof()
return ret
以上是channel.recv
的实现过程,归根结底还是_ssl_io_loop
这个函数会递归的获取ssh交互结果,形成一种循环。这种循环的结束条件就是接受到交互结果,或者是结果读取异常。
如果我们在执行命令之后立刻获取结果,交互可能尚未产生结果,获取失败,结束循环(接收无效),channel.recv
仿佛没有执行一样。
这时我们就会想到,既然交互结果的获取具有滞后性,那我们就编写逻辑,使其等待或者循环等待。
这就会引发下一个问题,如果我们循环调用channel.recv
时,必须在获取到交互结果后结束我们的循环,不论这个交互的结果是不是你期望的;否则会进入死循环,程序无法推进。
建议
- 循环调用
channel.recv
以获取交互结果。 - 全面的结果处理,结果的处理取决于所交互设备的响应内容,一旦获取交互结果,不论是期望与否都要及时退出我们的循环;否则程序无法推进。