paramiko实现SSH交互式命令执行

背景

需要批量在路由器上进行配置,与网元建立SSH连接,同时存在交互操作。比如:键入Configure,进入配置模式成功后才可以键入后续指令。

在这个过程中遇到很多坑,在此分享。

SSHClient

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=device, port=22, username=username, password=password)
stdin, stdout, stderr = ssh.exec_command("configure", bufsize=1024)
res, err = stdout.read(), stderr.read()
result = res if res else err
print(result)

SSHClient是最简单的命令行执行方式,简单的参数填写,优雅的结果处理,这很难不让人赶紧上手一试。

但遗憾的是SSHClient不支持交互式命令执行,其原因在于其exec_command方法每次执行一条命令都会开启一个新的“channel”,从而开启一个新的session,这相当于我们每执行一次命令,都重新登录了一次网元设备,这使得交互式无从谈起。

def exec_command(
    self,
    command,
    bufsize=-1,
    timeout=None,
    get_pty=False,
    environment=None,
):
# 就是他,在这个open_session的说明中也有描述:Request a new channel to the server, 
# of type ``"session"``.
chan = self._transport.open_session(timeout=timeout)
if get_pty:
    chan.get_pty()
chan.settimeout(timeout)
if environment:
    chan.update_environment(environment)
chan.exec_command(command)
stdin = chan.makefile_stdin("wb", bufsize)
stdout = chan.makefile("r", bufsize)
stderr = chan.makefile_stderr("r", bufsize)
return stdin, stdout, stderr

所以我们要创建固定channel,从而创建固定session

交互式连接

# Create a new SSH session over an existing socket, or socket-like object.
trans = paramiko.Transport((devcie, 22))
trans.start_client()
trans.auth_password(username, password)

# 新建channel
channel = trans.open_session(timeout=1200)
# 获取终端
channel.get_pty()
# 激活终端
channel.invoke_shell()
# 执行命令
channel.send(command)
# 结果获取
result = channel.recv(10240)
result = result.decode("utf-8")

产生的问题

以上是交互式连接的过程,但事情并不是一帆风顺的,在这个过程中还有两个问题,这两个问题都由交互结果获取函数channel.recv引起,这将导致。

  1. 交互结果获取存在延迟
  2. 错误处理困难

原因

def read(self, len=1024, buffer=None):
    return self._wrap_ssl_read(len, buffer)


def recv(self, len=1024, flags=0):
    if flags != 0:
        raise ValueError("non-zero flags not allowed in calls to recv")
    return self._wrap_ssl_read(len)


def _wrap_ssl_read(self, len, buffer=None):
    try:
        return self._ssl_io_loop(self.sslobj.read, len, buffer)
    except ssl.SSLError as e:
        if e.errno == ssl.SSL_ERROR_EOF and self.suppress_ragged_eofs:
            return 0  # eof, return 0.
        else:
            raise

        
def _ssl_io_loop(self, func, *args):
    """Performs an I/O loop between incoming/outgoing and the socket."""
    should_loop = True
    ret = None
    while should_loop:
        errno = None
        try:
            # 就是这里,递归的入口
            ret = func(*args)
        except ssl.SSLError as e:
            if e.errno not in (ssl.SSL_ERROR_WANT_READ, ssl.SSL_ERROR_WANT_WRITE):
                # WANT_READ, and WANT_WRITE are expected, others are not.
                raise e
            errno = e.errno
        buf = self.outgoing.read()
        self.socket.sendall(buf)
        if errno is None:
            should_loop = False
        elif errno == ssl.SSL_ERROR_WANT_READ:
            buf = self.socket.recv(SSL_BLOCKSIZE)
            if buf:
                self.incoming.write(buf)
            else:
                self.incoming.write_eof()
    return ret

以上是channel.recv的实现过程,归根结底还是_ssl_io_loop 这个函数会递归的获取ssh交互结果,形成一种循环。这种循环的结束条件就是接受到交互结果,或者是结果读取异常。

如果我们在执行命令之后立刻获取结果,交互可能尚未产生结果,获取失败,结束循环(接收无效),channel.recv仿佛没有执行一样。

这时我们就会想到,既然交互结果的获取具有滞后性,那我们就编写逻辑,使其等待或者循环等待。

这就会引发下一个问题,如果我们循环调用channel.recv时,必须在获取到交互结果后结束我们的循环,不论这个交互的结果是不是你期望的;否则会进入死循环,程序无法推进。

建议

  1. 循环调用channel.recv以获取交互结果。
  2. 全面的结果处理,结果的处理取决于所交互设备的响应内容,一旦获取交互结果,不论是期望与否都要及时退出我们的循环;否则程序无法推进。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容