引言
sockets的历史悠久,它们最早在 1971 年的 APPANET 中使用,后来成为1983年发布的Berkeley Software Distribution(BSD)操作系统中的API,称为Berkeley sockets。
Web服务器和浏览器并不是使用sockets的唯一程序,各种规模和类型的客户端 - 服务器(client - server)应用程序也得到了广泛使用。
今天,尽管socket API使用的底层协议已经发展多年,而且已经有新协议出现,但是底层 API 仍然保持不变。
最常见的套接字应用程序类型是客户端 - 服务器(client - server)应用程序,其中一方充当服务器并等待来自客户端的连接。
Socket API介绍
Python中的socket模块提供了一个到Berkeley sockets API的接口,其中的主要接口函数如下:
socket()
bind()
listen()
accept()
connect()
connect_ex()
send()
recv()
close()
这些方便使用的接口函数和系统底层的功能调用相一致。
TCP Sockets
我们准备构建一个基于 TCP 协议的socket对象,为什么使用 TCP 呢,因为:
可靠性:如果在传输过程中因为网络原因导致数据包丢失,会有相关机制检测到并且进行重新传输
按序到达:一方发送到另一方的数据包是按发送顺序被接收的。
对比之下,UDP 协议是不提供这些保证的,但是它的响应效率更高,资源消耗更少。
TCP 协议并不需要我们自己去实现,在底层都已经实现好了,我们只需要使用Python的socket模块,进行协议指定就可以了。socket.SOCK_STREAM表示使用 TCP 协议,socket.SOCK_DGRAM表示使用 UDP 协议
我们来看看基于 TCP 协议socket的 API 调用和数据传送流程图,右边的一列是服务器端(server),左边的一列是客户端(client)。
要实现左边的处于监听状态的server,我们需要按照顺序调用这样几个函数:
socket(): 创建一个socket对象
bind(): 关联对应 ip 地址和端口号
listen(): 允许对象接收其他socket的连接
accept(): 接收其他socket的连接,返回一个元组(conn, addr),conn 是一个新的socket对象,代表这个连接,addr 是连接端的地址信息。
client调用connect()时,会通过 TCP 的三次握手,建立连接。当client连接到server时,server会调用accept()完成这次连接。
双方通过send()和recv()来接收和发送数据,最后通过close()来关闭这次连接,释放资源。一般server端是不关闭的,会继续等待其他的连接。
Echo Client and Server
刚才我们弄清楚了server和client使用socket进行通信的过程,我们现在要自己进行一个简单的也是经典的实现:server复述从client接收的信息。
Echo Server
import socket
HOST = '127.0.0.1' # Standard loopback interface address (localhost)
PORT = 65431 # Port to listen on (non-privileged ports are > 1023)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen()
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
socket.socket()创建了一个socket对象,它实现了上下文管理器协议,我们直接用 with 语句进行创建即可,而且最后不需要调用close()函数。
socket()中的两个参数指明了连接需要的 ip地址类型和传输协议类型,socket.AF_INET 表示使用 IPv4的地址进行连接,socket.SOCK_STREAM 表示使用 TCP 协议进行数据的传输。
bind()用来将socket对象和特定的网络对象和端口号进行关联,函数中的两个参数是由创建socket对象时指定的 ip地址类型 决定的,这里使用的是socket.AF_INET(IPv4),因此,bind()函数接收一个元组对象作为参数(HOST, PORT)
host可以是一个主机名,IP地址,或者空字符串。如果使用的是 IP地址,host必须是 IPv4格式的地址字符串。127.0.0.1是本地环路的标准写法,因此只有在主机上的进程才能够连接到server,如果设置为空字符串,它可以接受所有合法 IPv4地址的连接。
port应该是从1 - 65535的一个整数(0被保留了),它相当于是一个窗口和其他的客户端建立连接,如果想使用1 - 1024的端口,一些系统可能会要求要有管理员权限。
listen()使得server可以接受连接,它可以接受一个参数:backlog,用来指明系统可以接受的连接数量,虽然同一时刻只能与一端建立连接,但是其他的连接请求可以被放入等待队列中,当前面的连接断开,后面的请求会依次被处理,超过这个数量的连接请求再次发起后,会被server直接拒绝。
从Python 3.5开始,这个参数是可选的,如果我们不明确指明,它就采用系统默认值。如果server端在同一时刻会收到大量的连接请求,通常要把这个值调大一些,在Linux中,可以在/proc/sys/net/core/somaxconn看到值的情况,详细请参阅:
Will increasing net.core.somaxconn make a difference?
How TCP backlog works in Linux
accept()监听连接的建立,是一个阻塞式调用,当有client连接之后,它会返回一个代表这个连接的新的socket对象和代表client地址信息的元组。对于 IPv4 的地址连接,地址信息是 (host, port),对于 IPv6 ,(host, port, flowinfo, scopeid)
有一件事情需要特别注意,accept()之后,我们获得了一个新的socket对象,它和server以及client都不同,我们用它来进行和client的通信。
conn, addr = s.accept()
with conn:
print('Connected by', addr)
while True:
data = conn.recv(1024)
if not data:
break
conn.sendall(data)
conn是我们新获得的socket对象,conn.recv()也是一个阻塞式调用,它会等待底层的 I/O 响应,直到获得数据才继续向下执行。外面的while循环保证server端一直监听,通过conn.sendall将数据再发送回去。
Echo Client
import socket
HOST = '127.0.0.1' # The server's hostname or IP address
PORT = 65431 # The port used by the server
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall("Hello, world".encode("utf8"))
data = s.recv(1024)
print('Received', data.decode("utf8"))
和server相比,client更加简单,先是创建了一个socket对象,然后将它和server连接,通过s.sendall()将信息发送给server,通过s.recv()获得来自server的数据,然后将其打印输出。
在发送数据时,只支持发送字节型数据,所以我们要将需要发送的数据进行编码,在收到server端的回应后,将得到的数据进行解码,就能还原出我们能够识别的字符串了。
启动程序
我们要先启动server端,做好监听准备,然后再启动client端,进行连接。
这个信息是在client连接后打印出来的。
可以使用netstat这个命令查看socket的状态,更详细使用可以查阅帮助文档。
查看系统中处于监听状态的socket,过滤出了使用 TCP协议 和 IPv4 地址的对象:
如果先启动了client,会有下面这个经典的错误:
造成的原因可能是端口号写错了,或者server根本就没运行,也可能是在server端存在防火墙阻值了连接建立,下面是一些常见的错误异常:
Exceptionerrno ConstantDescriptionBlockingIOErrorEWOULDBLOCKResource temporarily unavailable. For example, in non-blocking mode, when calling send() and the peer is busy and not reading, the send queue (network buffer) is full. Or there are issues with the network. Hopefully this is a temporary condition.OSErrorADDRINUSEAddress already in use. Make sure there’s not another process running that’s using the same port number and your server is setting the socket option SO_REUSEADDR: socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1).ConnectionResetErrorECONNRESETConnection reset by peer. The remote process crashed or did not close its socket properly (unclean shutdown). Or there’s a firewall or other device in the network path that’s missing rules or misbehaving.TimeoutErrorETIMEDOUTOperation timed out. No response from peer.ConnectionRefusedErrorECONNREFUSEDConnection refused. No application listening on specified port.
连接的建立
现在我们仔细看一下server和client是怎样建通信的:
当使用环路网络((IPv4 address 127.0.0.1 or IPv6 address ::1))的时候,数据没有离开过主机跑到外部的网络。如图所示,环路网络是在主机内部建立的,数据就经过它来发送,从主机上运行的一个程序发送到另一个程序,从主机发到主机。这就是为什么我们喜欢说环路网络和 IP地址 127.0.0.1(IPv4) 或 ::1(IPv6) 都表示主机
如果server使用的时其他的合法IP地址,它就会通过以太网接口与外部网络建立联系:
如何处理多端连接
echo server最大的缺点就是它同一时间只能服务一个client,直到连接的断开,echo client同样也有不足,当client进行如下操作时,有可能s.recv()只返回了一个字节的数据,数据并不完整。
data = s.recv(1024)
这里所设定的参数 1024 表示单次接收的最大数据量,并不是说会返回 1024 字节的数据。在server中使用的send()与之类似,调用后它有一个返回值,标示已经发送出去的数据量,可能是小于我们实际要发送的数据量,比如说有 6666 字节的数据要发送,用上面的发送方式要发送很多此才行,也就是说一次调用send()数据并没有被完整发送,我们需要自己做这个检查来确保数据完整发送了。
因此,这里使用了sendall(),它会不断地帮我们发送数据直到数据全部发送或者出现错误。
所以,目前有两个问题:
怎样同时处理多个连接?
怎样调用send()和recv()直到数据全部发送或接收。
要实现并发,传统方法是使用多线程,最近比较流行的方法是使用在Python3.4中引入的异步IO模块asyncio。
这里准备用更加传统,但是更容易理解的方式来实现,基于系统底层的一个调用:select(),Python中也提供了
对应的模块:selectors
select()通过了一种机制,它来监听操作发生情况,一旦某个操作准备就绪(一般是读就绪或者是写就绪),然后将需要进行这些操作的应用程序select出来,进行相应的读和写操作。到这里,你可能会发现这并没有实现并发,但是它的响应速度非常快,通过异步操作,足够模拟并发的效果了。
Muti-Connection Client and Server
Multi-Connection Server
import selectors
sel = selectors.DefaultSelector()
# ...
lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)
和echo server最大的不同就在于,通过lsock.setblocking(False),将这个socket对象设置成了非阻塞形式,与sel.select()一起使用,就可以在一个或多个socket对象上等待事件,然后在数据准备就绪时进行数据的读写操作。
sel.register()给server注册了我们需要的事件,对server来说,我们需要 I/O 可读,从而进行client发送数据的读入,因此,通过selector.EVENT_READ来指明。
data用来存储和socket有关的任何数据,当sel.select()返回结果时,它也被返回,我们用它作为一个标志,来追踪拥有读入和写入操作的socket对象。
接下来是事件循环:
import selectors
sel = selectors.DefaultSelector()
# ...
while True:
events = sel.select(timeout=None)
for key, mask in events:
if key.data is None:
accept_wrapper(key.fileobj)
else:
service_connection(key, mask)
sel.select(timeout=None)是一个阻塞式调用,直到有socket对象准备好了 I/O 操作,或者等待时间超过设定的timeout。它将返回(key, events)这类元组构成的一个列表,每一个对应一个就绪的socket对象。
key是一个SeletorKey类型的实例,它有一个fileobj的属性,这个属性就是sokect对象。
mask是就绪操作的状态掩码。
如果key.data is None,我们就知道,这是一个server对象,于是要调用accept()方法,用来等待client的连接。不过我们要调用我们自己的accept_wrapper()函数,里面还会包含其他的逻辑。
如果key.data is not None,我们就知道,这是一个client对象,它带着数据来建立连接啦!然后我们要为它提供服务,于是就调用service_connection(key, mask),完成所有的服务逻辑。
def accept_wrapper(sock):
conn, addr = sock.accept() # Should be ready to read
print('accepted connection from', addr)
conn.setblocking(False)
data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
events = selectors.EVENT_READ | selectors.EVENT_WRITE
sel.register(conn, events, data=data)
这个函数用来处理与client的连接,使用conn.setblocking(False)将该对象设置为非阻塞状态,这正是我们在这个版本的程序中所需要的,否则,整个server会停止,直到它返回,这意味着其他socket对象进入等待状态。
然后,使用types.SimplleNamespace()构建了一个data对象,存储我们想保存的数据和socket对象。
因为数据的读写都是通过conn,所以使用selectors.EVENT_READ | selectors.EVENT_WRITE,然后用sel.register(conn, events, data=data)进行注册。
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if data.outb:
print('echoing', repr(data.outb), 'to', data.addr)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]
这就时服务逻辑的核心,key中包含了socket对象和data对象,mask是已经就绪操作的掩码。根据sock可以读,将数据保存在data.outb中,这也将成为写出的数据。
if recv_data:
data.outb += recv_data
else:
print('closing connection to', data.addr)
sel.unregister(sock)
sock.close()
如果没有接收到数据,说明client数据发完了,sock的状态不再被追踪,然后关闭这次连接。
Multi-Connection Client
messages = [b'Message 1 from client.', b'Message 2 from client.']
def start_connections(host, port, num_conns):
server_addr = (host, port)
for i in range(0, num_conns):
connid = i + 1
print('starting connection', connid, 'to', server_addr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(False)
sock.connect_ex(server_addr)
events = selectors.EVENT_READ | selectors.EVENT_WRITE
data = types.SimpleNamespace(connid=connid,
msg_total=sum(len(m) for m in messages),
recv_total=0,
messages=list(messages),
outb=b'')
sel.register(sock, events, data=data)
使用connect_ex()而不是connect(),因为connect()会立即引发BlockingIOError异常。connect_ex()只返回错误码 errno.EINPROGRESS,而不是在连接正在进行时引发异常。连接完成后,socket对象就可以进行读写,并通过select()返回。
连接建立完成后,我们使用了types.SimpleNamespace构建出和socket对象一同保存的数据,里面的messages对我们要发送的数据做了一个拷贝,因为在后续的发送过程中,它会被修改。client需要发送什么,已经发送了什么以及已经接收了什么都要进行追踪,总共要发送的数据字节数也保存在了data对象中。
def service_connection(key, mask):
sock = key.fileobj
data = key.data
if mask & selectors.EVENT_READ:
recv_data = sock.recv(1024) # Should be ready to read
if recv_data:
print('received', repr(recv_data), 'from connection', data.connid)
data.recv_total += len(recv_data)
if not recv_data or data.recv_total == data.msg_total:
print('closing connection', data.connid)
sel.unregister(sock)
sock.close()
if mask & selectors.EVENT_WRITE:
if not data.outb and data.messages:
data.outb = data.messages.pop(0)
if data.outb:
print('sending', repr(data.outb), 'to connection', data.connid)
sent = sock.send(data.outb) # Should be ready to write
data.outb = data.outb[sent:]
client要追踪来自server的数据字节数,如果收到的数据字节数和发送的相等,或者有一次没有收到数据,说明数据接收完成,本次服务目的已经达成,就可以关闭这次连接了。
data.outb用来维护发送的数据,前面提到过,一次发送不一定能将数据全部送出,使用data.outb = data.outb[sent:]来更新数据的发送。发送完毕后,再messages中取出数据准备再次发送。
可以在这里看到最后的完整代码:
server.py
client.py
最后的运行效果如下:
还是要先启动server,进入监听状态,然后client启动,与server建立两条连接,要发送的信息有两条,这里分开发送,先将fist message分别发送到server,然后再发送second message。server端收到信息后进行暂时保存,当两条信息都收到了才开始进行echo,client端收到完整信息后表示服务结束,断开连接。
注:喜欢python + qun:839383765 可以获取Python各类免费最新入门学习资料!