编码发送数据到redis服务,客户端完成了第一个交互过程,即请求的过程。接下来客户端还要接受并解析服务端的响应回复。这个过程我们需要将RESP协议编码的字节串解析成python的字串。
由于响应回复有多种,并且有多行的存在。因此解析响应的时候要注意对CRLF的处理,即tcp包的数据分界方式。在我们尚为进行真正的网络通信的时候,我们创建一个变量用于表示redis服务器返回的进入的socket缓冲区。此时的代码逻辑与读取真实的socket数据很像,后面我们再介绍redis.py的socket交互。
read_response
redispy中,调用PythonParser
类的read_response
方法来读取redis的数据。该方法又会相继调用_buffer对象的readline和read方法。后两者分别调用SocketBuffer
类的_read_from_socket方法来读取socket。为了模拟从socket中读取数据,我们会修改_read_from_socket方法,使其读socket的数据改成从我们假设的缓冲区变量读取。
class Socket(object):
def __init__(self, data):
self.data = data
def recv(self, length):
data = self.data[:length]
self.data=self.data[length:]
return data
用我们定义的Socket类模拟网络数据流,其中recv方法则从data中返回数据。为了简化学习,我们暂时把所有错误的处理都忽略。
状况回复
从前面的RESP协议可以得知,状态回复以+
开头,后面跟着状态消息,最后以CRLF结束。
测试的代码如下:
data = b'+OK\r\n'
pp = PythonParser(socket_read_size=65536)
pp.on_connect(data)
print(pp.read_response())
打印的结果为b'OK'
。我们先看下PythonParser
类的定义。
class PythonParser(object):
encoding = None
def __init__(self, socket_read_size):
self.socket_read_size = socket_read_size
self._sock = None
self._buffer = None
PythonParser
类定义了读取socket的数据大小,已经socket对象和buffer对象。
再看on_connect
方法,主要是初始化了我们假定的Socket对象和SocketBuffer
对象。
def on_connect(self, data):
self._sock = Socket(data)
self._buffer = SocketBuffer(self._sock, self.socket_read_size)
SocketBuffer
SocketBuffer类的主要职能就是把从socket中读取的数据,以bytes的方式存储到内存中。然后从内存中解析该数据。通过控制buffer的写入和写出的值,可以精确的设置什么时候从socket中读数据。
class SocketBuffer(object):
def __init__(self, socket, socket_read_size):
self._sock = socket
self.socket_read_size = socket_read_size
self._buffer = BytesIO()
self.bytes_written = 0
self.bytes_read = 0
@property
def length(self):
return self.bytes_written - self.bytes_read
def _read_from_socket(self, length=None):
pass
def purge(self):
pass
def read(self, length):
pass
def readline(self):
pass
该类实例化的时候会初始化socket对象和_buffer对象,后者是BytesIO的实例,用于读取写入内存字节数据。
回到我们的测试代码中,一旦调用了on_connect方法,下面就是调用read_response方法。在该方法中,首先会调用_buffer
对象的readline
方法:
def readline(self):
buf = self._buffer
buf.seek(self.bytes_read)
data = buf.readline()
# 处理包结束
while not data.endswith(SYM_CRLF):
self._read_from_socket()
buf.seek(self.bytes_read)
data = buf.readline()
self.bytes_read += len(data)
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
readline
方法的主要功能就是从socket中读取一行数据。首先将bytes的指针seek到起始的位置。然后判断是否以CRLF结尾,即表示是否读取了redis的一个编码单位。如果尚未读取,就会调用_read_from_socket
方法从socket缓冲区读取数据到内存缓冲区中。最后再从内存中读取一行数据到data变量中。
例如我们的例子中,redis返回的数据是b'+OK\r\n'
,此时会将所有数据都读取到BytesIO中,然后从BytesIO读取到data,最后返回+OK
。
下面再看read_response
方法:
def read_response(self):
response = self._buffer.readline()
byte, response = byte_to_chr(response[0]), response[1:]
if byte not in ('-', '+', ':', '$', '*'):
raise RedisError
# server returned an error
if byte == '-':
response = nativestr(response)
# 处理错误
return response
# single value
elif byte == '+':
pass
# int value
elif byte == ':':
response = int(response)
# bulk response
elif byte == '$':
length = int(response)
if length == -1:
return None
response = self._buffer.read(length)
# multi-bulk response
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self.read_response() for i in range(length)]
if isinstance(response, bytes) and self.encoding:
response = response.decode(self.encoding)
return response
该方法会读取stocketbuffer对象的返回,即上面的+OK
。通过判断第一个字节的类型来判断回复的类型。此时比较简单,直接返回OK
。错误回复也类似,直接把错误类型和错误信息返回即可。
分段读取
上面的例子中,socket的recv一次调用的字节是65536。可以把socket缓冲区的数据全部读取。如果设定的大小是每次只读取一个字节呢?
修改测试代码再运行,我们看见输入依然正常。因为在readline代码中,while not data.endswith(SYM_CRLF)
的判断可以帮我们断定什么时候读取完。无论一次读多少个字节,data的数据从BytesIO读取都是一行,因此最后总会读到CRLF中的\n。此时data的数据就是以\r\n
结尾,结束从socket中读数据。由此可以,tcp的读取数据是没有界限的,就像流水一样,除非我们在协议中规定以什么字符标记作为分界。上面描述的过程大致录制了一个小视频,点击下载。
数字回复
数字回复和状态回复类似,只不过回复的token类型以:
开头,其他过程和状态回复类似。不同在于客户端的解析要转换成数字类型。
批量回复
状态回复很简单,redis操作中,批量回复也很常见。并且会比较复杂。基于上面的代码运行原理。我们首先也是读取一行,然后接触回复类型。因为批量回复的token会告诉我们返回的字串的长度。可以根据该信息确定我们read_byte位置,然后将剩余的socket全部读取。
例如返回的数据如果是 $6\r\nfoobar\r\n
, 经过第一次readline的数据,我们得到的response为$6\r\n
。当确定了返回类型是批量回复,将会继续调用read
方法,将剩下的数据(foobar\r\n)读取。read的代码如下:
def read(self, length):
length = length + 2
if length > self.length:
self._read_from_socket(length - self.length)
self._buffer.seek(self.bytes_read)
data = self._buffer.read(length)
self.bytes_read += len(data)
if self.bytes_read == self.bytes_written:
self.purge()
return data[:-2]
read
方法比readline
简单。它只需要判断BytesIO中的数据是否是所有redis的数据。对于$6\r\nfoobar\r\n
而言,如果一次读5个字节,那么readline调用之后,BytesIO中还有一个f
字符,即长度为1。因为返回了字符串是6+2个字节(最后的CRLF),因此8>1,说明还要从socket中读取7个字节。即再次调用_read_from_socket
方法,与readline类似,读取到CRLF结束并返回。当读取完毕之后,需要调用self.purge情况buffer对象。为了更好的展示这个过程,也录制了一个小视频。
由于每次读取5个socket字节,因此在从socket中读取了两次。如果多读了呢。多读了也没有关系,即使BytesIO多读了socket的数据。在buffer对象读取的时候还有一个length参数,这个参数会保证以CRLF结尾。这也是redis设计协议的时候,为什么字符串返回要在$
后加上字节的长度。
多批量回复
多批量回复以*
开头,这个编码格式和请求的命令一样。多个字节串分别编码,然后再和*
参数数结合。例如下面一个回复样式:
*3\r\n$3\r\n777\r\n$6\r\n\xe4\xbd\xa0\xe5\xa5\xbd\r\n$5\r\nhello\r\n
再看read_response中解析多批量回复的代码:
elif byte == '*':
length = int(response)
if length == -1:
return None
response = [self.read_response() for i in range(length)]
一旦是多批量回复,因为*
后跟着返回的参数个数,而这些参数个数的编码和批量回复的一模一样。既然如此,那么递归调用read_response,再解析出来的批量回复组合起来即可。
特殊类型回复
RESP的回复我们都介绍了,所谓的特殊。是数据情况特别的时候,比如返回空字符串的时候,token会是0,返回Nil值的时候,token可能是-1。具体这些情况,可以参考官方文档的案例。
总结
经过上面的分析,我们了解了redispy是如何解析redis服务器返回的RESP编码的数据。解码的关键在于对socket数据的读取。尽管我们是模拟了socket对象。上面的代码和实际socket交互是完全一样的。因为真实的socket.recv调用也只是应用层的程序代码从socket的缓冲区读取数据。缓存区直接的IO则是内核在tcp层处理的内容。
我们把真实的socket.recv读取数据从内核转移到一个Socket类,这样的模拟也是合理的,并且易于调试。不然还得先模拟发送命令给redis,然后打断点等待回复。
尽管我们的模拟抽象很好,可是真实的编码还是需要处理socket的数据流,尤其是对于通信错误的处理。完整的代码可以阅读redis.py项目。
签名我们介绍了编码,创建连接和现在接受数据并解码。接下来将会实现redis.py中的另外一个特性,连接池的实现。
文中相关代码