Redispy 源码学习(四) --- 创建连接

了解RESP协议,并实现了pack编码,下一步就是发送命令给redis服务器,等待结果返回就好啦。

当然,想要发送命令,还需要先创建连接。如果连接都不存在,就无所谓发送命令了。我们已经了解发送网络数据需要先创建socket连接。因此下面写了一个简单的客户端代码,基于前面编码RESP的代码:

    command = Connection().pack_command(*args)
    print(command)

    import socket

    address = ('127.0.0.1', 6379)
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(address)
    sock.sendall(command[0])
    print(sock.recv(1024))

可以看到服务返回了并打印了b'+PONG\r\n'结果。一切工作正常。等等,如果每次我们发送数据都需要这么原始的创建socket,然后调用连接,发送数据,那么这个脚本也太丑了。因此,我们的主题就是学习redispy是如何抽象封装连接管理的。

连接

redispy定义了一个Connection类用于连接的管理,而定义了一个PythonParser类用来适配Hiredis。同时还实现了一个SocketBuffer方法用于收发处理socket数据。当然,本篇只讨论创建连接发送数据。

我们的客户端代码为

    conn = Connection()
    conn.send_command("PING")
    print(conn.read_response())

创建一个Connection实例,然后发送命令到redis服务器,最后再读取服务器的响应。当然,conn.read_response的源码分析暂时可以忽略。

send_command 方法内会先打包编码命令,然后再调用send_packed_command方法发送命令。send_packed_command方法的源码如下:

    def send_packed_command(self, command):
        """将编码后的redis命令发送到redis服务器"""
        if not self._sock:
            self.connect()
        try:
            if isinstance(command, str):
                command = [command]
            for item in command:
                self._sock.sendall(item)
        except socket.timeout:
            self.disconnect()
            raise TimeoutError('Timeout writing to socket')
        except socket.error:
            e = sys.exc_info()[1]
            self.disconnect()
            if len(e.args) == 1:
                errno, errmsg = 'UNKNOWN', e.args[0]
            else:
                errno = e.args[0]
                errmsg = e.args[1]
            raise ConnectionError("Error {} while writing to socket. {}.".format(errno, errmsg))
        except:
            self.disconnect()
            raise

Connection实例化的时候会初始一些熟悉,例如 _sock和_parser对象,两者分别是用于通信的socket和解析resp协议的适配器。

send_packed_command首先会检查_sock,即socket是否创建。如果尚未创建,则会调用connect方法创建连接。connect返回创建的socket连接。

一旦有了socket,就可以直接调用sendall方法发送命令了。再socket交互的过程中,还得注意可能产生的异常和错误。

创建连接

Conneciton使用connect方法创建连接,该方法如下:

    def connect(self):
        if self._sock:
            return
        try:
            sock = self._connect()
        except socket.error:
            e = sys.exc_info()[1]
            raise ConnectionError(self._error_message(e))

        self._sock = sock

        try:
            self.on_connect()
        except RedisError:
            self.disconnect()
            raise

同样也会先检查一下_sock对象,然后调用_connect方法创建socket。再创建完毕socket之后,还调用了一个on_connect方法做一些连接后的工作。

_connect的源码略多,但是很清晰:

    def _connect(self):
        """ 创建 socket 连接,并返回socket对象
        """
        err = None
        for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
            family, socktype, proto, canonname, socket_address = res
            try:
                # 创建 tcp socket 对象
                sock = socket.socket(family, socktype, proto)
                # TCP_NODELAY
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

                if self.socket_keepalive:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    for k, v in iteritems(self.socket_keepalive_options):
                        sock.setsockopt(socket.SOL_TCP, k, v)
                # 设置创建连接的超时时间
                sock.settimeout(self.socket_connect_timeout)
                sock.connect(socket_address)
                # 设置连接之后的超时时间
                sock.settimeout(self.socket_timeout)
                return sock
            except socket.error as _:
                err = _
                if sock is not None:
                    sock.close()
        if err is not None:
            raise err
        raise socket.error("socket.getaddrinfo returned an empty list")

首先调用socket的getaddrinfo方法,确定通信的族协议和socket类型。即IP4和tcp通信方式。然后设置socket选项,开启TCP_NODELAY模式,即禁用Nagle算法。TCP为了保证网络的性能,通常会把缓冲区的数据积累到一定程度再一起发送。而对于redis这种一问一答的通信方式,需要你尽可能的发送消息,而不用等待,因此会禁用掉该算法。

Nagle算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓“小段”,指的是小于MSS尺寸的数据块,
所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的ACK确认该数据已收到。
Nagle算法只允许一个未被ACK的包存在于网络,它并不管包的大小,因此它事实上就是一个扩展的停-等协议,只不过它是基于包停-等的,而不是基于字节停-等的。
TCP_NODELAY 选项
默认情况下,发送数据采用Nagle 算法。这样虽然提高了网络吞吐量,但是实时性却降低了,在一些交互性很强的应用程序来说是不允许的,
使用TCP_NODELAY选项可以禁止Nagle 算法。

然后就是对长连接(keepalive)的选项设置,最后设置连接的超时时间,然后调用socket的connect方法创建连接。创建连接后,自然要把连接对象的socket返回。

连接后初始化

redispy在创建了连接之后,会调用on_connect 方法做一些初始化工作:

    def on_connect(self):
        
        self._parser.on_connect(self)
        
        if self.password:
            self.send_command('AUTH', self.password)
            if nativestr(self.read_response()) != 'OK':
                raise AuthenticationError('Invalid Password')


        if self.db:
            self.send_command('SELECT', self.db)
            if nativestr(self.read_response()) != 'OK':
                raise ConnectionError('Invalid Database')

on_connect会先通过PythonParser的on_connect方法,初始化ScoketBuffer,这样做的目的就是为了接下来接受redis的回复。暂时可以忽略,主要看其他的逻辑。

如果需要认证,就再调用 send_command 方法发送认证的命令,如果创建连接指定了数据库,就发送选择数据库的指令,默认是选择数据库0。

这里的代码很有意思,我们的入口就是 send_command 方法。send_command 内的流程中又会先检查连接,连接没有就创建,连接有了才返回。此时再次调用send_command方法,显然已经有了连接,因此代码会走到self._sock.sendall(item)发送命令。

从这里也可以看出,redispy创建连接是惰性的,只有发送查询的命令的时候,才会检查连接,从而决定是否创建socket连接。

总结

redispy中创建连接并没有很多高深的内容。通过封装一个连接对象管理连接。代码不复杂并不代码没有用。实际上,正式因为设计了这么简单的连接方式,才为后面的连接池实现带来了便利。

收发数据都是基于socket,socket创建是表示可以进入数据的收发状态,但这只是最底层的连接。redis的逻辑连接还需要我们认证或者选择数据库。因为后者已经涉及了socket数据通信,因此也是再连接创立之后的操作。redispy为了考虑这种方法,很好的复用了send_command 方法。通过on_connect的方式创建redis的逻辑连接。

至于read_response的内容,将会是RESP协议的解析,和redis回复内容的话题。我们将会在接下来的文中分析。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352

推荐阅读更多精彩内容