Redispy 源码学习(五) --- RESP协议实现--解码

编码发送数据到redis服务,客户端完成了第一个交互过程,即请求的过程。接下来客户端还要接受并解析服务端的响应回复。这个过程我们需要将RESP协议编码的字节串解析成python的字串。

由于响应回复有多种,并且有多行的存在。因此解析响应的时候要注意对CRLF的处理,即tcp包的数据分界方式。在我们尚为进行真正的网络通信的时候,我们创建一个变量用于表示redis服务器返回的进入的socket缓冲区。此时的代码逻辑与读取真实的socket数据很像,后面我们再介绍redis.py的socket交互。

read_response

redispy中,调用PythonParser类的read_response方法来读取redis的数据。该方法又会相继调用_buffer对象的readline和read方法。后两者分别调用SocketBuffer类的_read_from_socket方法来读取socket。为了模拟从socket中读取数据,我们会修改_read_from_socket方法,使其读socket的数据改成从我们假设的缓冲区变量读取。

class Socket(object):

    def __init__(self, data):
        self.data = data

    def recv(self, length):
        data = self.data[:length]
        self.data=self.data[length:]
        return data

用我们定义的Socket类模拟网络数据流,其中recv方法则从data中返回数据。为了简化学习,我们暂时把所有错误的处理都忽略。

状况回复

从前面的RESP协议可以得知,状态回复以+开头,后面跟着状态消息,最后以CRLF结束。

测试的代码如下:

    data = b'+OK\r\n'
    pp = PythonParser(socket_read_size=65536)
    pp.on_connect(data)
    print(pp.read_response())

打印的结果为b'OK'。我们先看下PythonParser类的定义。

class PythonParser(object):
    encoding = None

    def __init__(self, socket_read_size):
        self.socket_read_size = socket_read_size
        self._sock = None
        self._buffer = None

PythonParser类定义了读取socket的数据大小,已经socket对象和buffer对象。

再看on_connect方法,主要是初始化了我们假定的Socket对象和SocketBuffer对象。

    def on_connect(self, data):
        self._sock = Socket(data)
        self._buffer = SocketBuffer(self._sock, self.socket_read_size)

SocketBuffer

SocketBuffer类的主要职能就是把从socket中读取的数据,以bytes的方式存储到内存中。然后从内存中解析该数据。通过控制buffer的写入和写出的值,可以精确的设置什么时候从socket中读数据。

class SocketBuffer(object):
    def __init__(self, socket, socket_read_size):
        self._sock = socket
        self.socket_read_size = socket_read_size
        self._buffer = BytesIO()
        self.bytes_written = 0
        self.bytes_read = 0

    @property
    def length(self):
        return self.bytes_written - self.bytes_read

    def _read_from_socket(self, length=None):
        pass

    def purge(self):
        pass
        
    def read(self, length):
        pass
        
    def readline(self):
        pass

该类实例化的时候会初始化socket对象和_buffer对象,后者是BytesIO的实例,用于读取写入内存字节数据。

回到我们的测试代码中,一旦调用了on_connect方法,下面就是调用read_response方法。在该方法中,首先会调用_buffer对象的readline方法:

    def readline(self):
        buf = self._buffer
        buf.seek(self.bytes_read)
        data = buf.readline()
        # 处理包结束
        while not data.endswith(SYM_CRLF):
            self._read_from_socket()
            buf.seek(self.bytes_read)
            data = buf.readline()

        self.bytes_read += len(data)
        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

readline方法的主要功能就是从socket中读取一行数据。首先将bytes的指针seek到起始的位置。然后判断是否以CRLF结尾,即表示是否读取了redis的一个编码单位。如果尚未读取,就会调用_read_from_socket方法从socket缓冲区读取数据到内存缓冲区中。最后再从内存中读取一行数据到data变量中。

例如我们的例子中,redis返回的数据是b'+OK\r\n',此时会将所有数据都读取到BytesIO中,然后从BytesIO读取到data,最后返回+OK

下面再看read_response方法:

    def read_response(self):
        response = self._buffer.readline()

        byte, response = byte_to_chr(response[0]), response[1:]

        if byte not in ('-', '+', ':', '$', '*'):
            raise RedisError

        # server returned an error
        if byte == '-':
            response = nativestr(response)
            # 处理错误
            return response
        # single value
        elif byte == '+':
            pass
        # int value
        elif byte == ':':
            response = int(response)
        # bulk response
        elif byte == '$':
            length = int(response)
            if length == -1:
                return None
            response = self._buffer.read(length)
        # multi-bulk response
        elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]
        if isinstance(response, bytes) and self.encoding:
            response = response.decode(self.encoding)
        return response

该方法会读取stocketbuffer对象的返回,即上面的+OK。通过判断第一个字节的类型来判断回复的类型。此时比较简单,直接返回OK。错误回复也类似,直接把错误类型和错误信息返回即可。

分段读取

上面的例子中,socket的recv一次调用的字节是65536。可以把socket缓冲区的数据全部读取。如果设定的大小是每次只读取一个字节呢?

修改测试代码再运行,我们看见输入依然正常。因为在readline代码中,while not data.endswith(SYM_CRLF)的判断可以帮我们断定什么时候读取完。无论一次读多少个字节,data的数据从BytesIO读取都是一行,因此最后总会读到CRLF中的\n。此时data的数据就是以\r\n结尾,结束从socket中读数据。由此可以,tcp的读取数据是没有界限的,就像流水一样,除非我们在协议中规定以什么字符标记作为分界。上面描述的过程大致录制了一个小视频,点击下载

数字回复

数字回复和状态回复类似,只不过回复的token类型以:开头,其他过程和状态回复类似。不同在于客户端的解析要转换成数字类型。

批量回复

状态回复很简单,redis操作中,批量回复也很常见。并且会比较复杂。基于上面的代码运行原理。我们首先也是读取一行,然后接触回复类型。因为批量回复的token会告诉我们返回的字串的长度。可以根据该信息确定我们read_byte位置,然后将剩余的socket全部读取。

例如返回的数据如果是 $6\r\nfoobar\r\n, 经过第一次readline的数据,我们得到的response为$6\r\n。当确定了返回类型是批量回复,将会继续调用read方法,将剩下的数据(foobar\r\n)读取。read的代码如下:

    def read(self, length):
        length = length + 2
        if length > self.length:
            self._read_from_socket(length - self.length)

        self._buffer.seek(self.bytes_read)
        data = self._buffer.read(length)
        self.bytes_read += len(data)

        if self.bytes_read == self.bytes_written:
            self.purge()

        return data[:-2]

read方法比readline简单。它只需要判断BytesIO中的数据是否是所有redis的数据。对于$6\r\nfoobar\r\n而言,如果一次读5个字节,那么readline调用之后,BytesIO中还有一个f字符,即长度为1。因为返回了字符串是6+2个字节(最后的CRLF),因此8>1,说明还要从socket中读取7个字节。即再次调用_read_from_socket方法,与readline类似,读取到CRLF结束并返回。当读取完毕之后,需要调用self.purge情况buffer对象。为了更好的展示这个过程,也录制了一个小视频

由于每次读取5个socket字节,因此在从socket中读取了两次。如果多读了呢。多读了也没有关系,即使BytesIO多读了socket的数据。在buffer对象读取的时候还有一个length参数,这个参数会保证以CRLF结尾。这也是redis设计协议的时候,为什么字符串返回要在$后加上字节的长度。

多批量回复

多批量回复以*开头,这个编码格式和请求的命令一样。多个字节串分别编码,然后再和*参数数结合。例如下面一个回复样式:

*3\r\n$3\r\n777\r\n$6\r\n\xe4\xbd\xa0\xe5\xa5\xbd\r\n$5\r\nhello\r\n

再看read_response中解析多批量回复的代码:

elif byte == '*':
            length = int(response)
            if length == -1:
                return None
            response = [self.read_response() for i in range(length)]

一旦是多批量回复,因为*后跟着返回的参数个数,而这些参数个数的编码和批量回复的一模一样。既然如此,那么递归调用read_response,再解析出来的批量回复组合起来即可。

特殊类型回复

RESP的回复我们都介绍了,所谓的特殊。是数据情况特别的时候,比如返回空字符串的时候,token会是0,返回Nil值的时候,token可能是-1。具体这些情况,可以参考官方文档的案例。

总结

经过上面的分析,我们了解了redispy是如何解析redis服务器返回的RESP编码的数据。解码的关键在于对socket数据的读取。尽管我们是模拟了socket对象。上面的代码和实际socket交互是完全一样的。因为真实的socket.recv调用也只是应用层的程序代码从socket的缓冲区读取数据。缓存区直接的IO则是内核在tcp层处理的内容。

我们把真实的socket.recv读取数据从内核转移到一个Socket类,这样的模拟也是合理的,并且易于调试。不然还得先模拟发送命令给redis,然后打断点等待回复。

尽管我们的模拟抽象很好,可是真实的编码还是需要处理socket的数据流,尤其是对于通信错误的处理。完整的代码可以阅读redis.py项目。

签名我们介绍了编码,创建连接和现在接受数据并解码。接下来将会实现redis.py中的另外一个特性,连接池的实现。

文中相关代码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容