了解RESP协议,并实现了pack编码,下一步就是发送命令给redis服务器,等待结果返回就好啦。
当然,想要发送命令,还需要先创建连接。如果连接都不存在,就无所谓发送命令了。我们已经了解发送网络数据需要先创建socket连接。因此下面写了一个简单的客户端代码,基于前面编码RESP的代码:
command = Connection().pack_command(*args)
print(command)
import socket
address = ('127.0.0.1', 6379)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(address)
sock.sendall(command[0])
print(sock.recv(1024))
可以看到服务返回了并打印了b'+PONG\r\n'
结果。一切工作正常。等等,如果每次我们发送数据都需要这么原始的创建socket,然后调用连接,发送数据,那么这个脚本也太丑了。因此,我们的主题就是学习redispy是如何抽象封装连接管理的。
连接
redispy定义了一个Connection类用于连接的管理,而定义了一个PythonParser类用来适配Hiredis。同时还实现了一个SocketBuffer方法用于收发处理socket数据。当然,本篇只讨论创建连接发送数据。
我们的客户端代码为
conn = Connection()
conn.send_command("PING")
print(conn.read_response())
创建一个Connection实例,然后发送命令到redis服务器,最后再读取服务器的响应。当然,conn.read_response的源码分析暂时可以忽略。
send_command 方法内会先打包编码命令,然后再调用send_packed_command方法发送命令。send_packed_command方法的源码如下:
def send_packed_command(self, command):
"""将编码后的redis命令发送到redis服务器"""
if not self._sock:
self.connect()
try:
if isinstance(command, str):
command = [command]
for item in command:
self._sock.sendall(item)
except socket.timeout:
self.disconnect()
raise TimeoutError('Timeout writing to socket')
except socket.error:
e = sys.exc_info()[1]
self.disconnect()
if len(e.args) == 1:
errno, errmsg = 'UNKNOWN', e.args[0]
else:
errno = e.args[0]
errmsg = e.args[1]
raise ConnectionError("Error {} while writing to socket. {}.".format(errno, errmsg))
except:
self.disconnect()
raise
Connection实例化的时候会初始一些熟悉,例如 _sock和_parser对象,两者分别是用于通信的socket和解析resp协议的适配器。
send_packed_command首先会检查_sock
,即socket是否创建。如果尚未创建,则会调用connect方法创建连接。connect返回创建的socket连接。
一旦有了socket,就可以直接调用sendall方法发送命令了。再socket交互的过程中,还得注意可能产生的异常和错误。
创建连接
Conneciton使用connect方法创建连接,该方法如下:
def connect(self):
if self._sock:
return
try:
sock = self._connect()
except socket.error:
e = sys.exc_info()[1]
raise ConnectionError(self._error_message(e))
self._sock = sock
try:
self.on_connect()
except RedisError:
self.disconnect()
raise
同样也会先检查一下_sock
对象,然后调用_connect
方法创建socket。再创建完毕socket之后,还调用了一个on_connect
方法做一些连接后的工作。
_connect
的源码略多,但是很清晰:
def _connect(self):
""" 创建 socket 连接,并返回socket对象
"""
err = None
for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
try:
# 创建 tcp socket 对象
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in iteritems(self.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# 设置创建连接的超时时间
sock.settimeout(self.socket_connect_timeout)
sock.connect(socket_address)
# 设置连接之后的超时时间
sock.settimeout(self.socket_timeout)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise socket.error("socket.getaddrinfo returned an empty list")
首先调用socket的getaddrinfo方法,确定通信的族协议和socket类型。即IP4和tcp通信方式。然后设置socket选项,开启TCP_NODELAY模式,即禁用Nagle
算法。TCP为了保证网络的性能,通常会把缓冲区的数据积累到一定程度再一起发送。而对于redis这种一问一答的通信方式,需要你尽可能的发送消息,而不用等待,因此会禁用掉该算法。
Nagle算法的基本定义是任意时刻,最多只能有一个未被确认的小段。 所谓“小段”,指的是小于MSS尺寸的数据块,
所谓“未被确认”,是指一个数据块发送出去后,没有收到对方发送的ACK确认该数据已收到。
Nagle算法只允许一个未被ACK的包存在于网络,它并不管包的大小,因此它事实上就是一个扩展的停-等协议,只不过它是基于包停-等的,而不是基于字节停-等的。
TCP_NODELAY 选项
默认情况下,发送数据采用Nagle 算法。这样虽然提高了网络吞吐量,但是实时性却降低了,在一些交互性很强的应用程序来说是不允许的,
使用TCP_NODELAY选项可以禁止Nagle 算法。
然后就是对长连接(keepalive)的选项设置,最后设置连接的超时时间,然后调用socket的connect方法创建连接。创建连接后,自然要把连接对象的socket返回。
连接后初始化
redispy在创建了连接之后,会调用on_connect 方法做一些初始化工作:
def on_connect(self):
self._parser.on_connect(self)
if self.password:
self.send_command('AUTH', self.password)
if nativestr(self.read_response()) != 'OK':
raise AuthenticationError('Invalid Password')
if self.db:
self.send_command('SELECT', self.db)
if nativestr(self.read_response()) != 'OK':
raise ConnectionError('Invalid Database')
on_connect会先通过PythonParser的on_connect方法,初始化ScoketBuffer,这样做的目的就是为了接下来接受redis的回复。暂时可以忽略,主要看其他的逻辑。
如果需要认证,就再调用 send_command 方法发送认证的命令,如果创建连接指定了数据库,就发送选择数据库的指令,默认是选择数据库0。
这里的代码很有意思,我们的入口就是 send_command 方法。send_command 内的流程中又会先检查连接,连接没有就创建,连接有了才返回。此时再次调用send_command方法,显然已经有了连接,因此代码会走到self._sock.sendall(item)
发送命令。
从这里也可以看出,redispy创建连接是惰性的,只有发送查询的命令的时候,才会检查连接,从而决定是否创建socket连接。
总结
redispy中创建连接并没有很多高深的内容。通过封装一个连接对象管理连接。代码不复杂并不代码没有用。实际上,正式因为设计了这么简单的连接方式,才为后面的连接池实现带来了便利。
收发数据都是基于socket,socket创建是表示可以进入数据的收发状态,但这只是最底层的连接。redis的逻辑连接还需要我们认证或者选择数据库。因为后者已经涉及了socket数据通信,因此也是再连接创立之后的操作。redispy为了考虑这种方法,很好的复用了send_command 方法。通过on_connect的方式创建redis的逻辑连接。
至于read_response的内容,将会是RESP协议的解析,和redis回复内容的话题。我们将会在接下来的文中分析。