Linux网络IO模型与python实现

一、背景知识

Socket

概念
  • Socket是应用层与TCP/IP协议族通信的中间软件抽象层,是应用程序通过网络协议进行通信的接口。


socket交互基本流程
  • 服务器端先初始化Socket,然后与端口绑定(bind),对端口进行监听(listen),调用accept,等待客户端连接。在这时如果有个客户端初始化一个Socket,然后连接服务器(connect),如果连接成功,这时客户端与服务器端的连接就建立了。客户端发送数据请求,服务器端接收请求并处理请求,然后把回应数据发送给客户端,客户端读取数据,最后关闭连接,一次交互结束。

Linux网络IO

  • Linux中内存分为用户空间和内核空间两个部分。如果用户想要操作内核空间的数据,则需要把数据从内核空间拷贝到用户空间。
  • 如果服务器收到了从客户端过来的请求,并且想要进行处理,那么需要经过如下步骤:
  1. 服务器的网络驱动接收到消息之后,向内核申请空间,并在收到完整的数据包(这个过程会产生延时,因为有可能是通过分组传送过来的)后,将其复制到内核空间;
  2. 数据从内核空间拷贝到用户空间;
  3. 用户程序进行处理。


二、Linux IO模型

阻塞IO模型

  • python实现
    server
import socket

HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024

# 模拟handle socket
def handle_socket(request):
    return '{} received'.format(request)

s = socket.socket()
s.bind((HOST, PORT))
# 开始 TCP 监听。backlog 参数指定在拒绝连接之前,操作系统可以挂起的最大连接数量。
# 该值至少为 1,大部分应用程序设为 5 就可以了。
s.listen(5)
while True:
    print('waiting for connection...')
    conn, addr = s.accept()
    print('connecting from: {}'.format(addr))
    req = conn.recv(BUFSIZ)
    resp = handle_socket(req.decode('utf-8'))
    conn.send(resp.encode())
    conn.close()

client

import socket

HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024

s = socket.socket()
s.connect((HOST, PORT))
s.send(b'test message')
while True:
    data = s.recv(BUFSIZ)
    if not data:
        break
    print(data.decode('utf-8'))
s.close()
  • 缺点:
    recv只能监视单个socket,因此server同一时间只能服务一个client。
  • 一种改进方案是为每个客户端开启一个单独线程来处理。但由于线程也需要占用资源,不可能无限的开启线程。

非阻塞IO模型

非阻塞的recv系统调用之后,进程没有被阻塞,操作系统立马把结果返回给进程,如果数据还没准备好,则抛出异常,进程可以去做其他的事,然后在发起recv系统调用,重复上述过程(这个过程通常被称为轮询),一直到数据准备好,再拷贝数据到进程进行数据处理。需要注意,拷贝数据的整个过程,进程仍然是属于阻塞状态。

  • python实现
    server
import socket

HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024

# 模拟handle socket
def handle_socket(request):
    return '{} received'.format(request)

server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)
# accept默认是阻塞的,设置后accept成为非阻塞
server.setblocking(False)

conn = None

while True:
    try:
        # print('waiting for connection...')
        conn, addr = server.accept()
        print('connecting from: {}'.format(addr))
    # accept被设置为非阻塞后,要求必须有connect来连接, 否则抛出BlockingIOError
    except BlockingIOError:
        continue
        
    try:
        client_req = conn.recv(BUFSIZ)
        resp = handle_socket(client_req.decode('utf-8'))
        conn.send(resp.encode())
        conn.close()
    except (BlockingIOError, ConnectionResetError):
        pass
  • 缺点:需要不断循环向操作系统拿数据,因此CPU占用率很高

多路复用IO模型

  • 多路复用模型使用一个线程来检查多个文件描述符(Socket)的就绪状态,如果有一个文件描述符就绪,则返回,否则阻塞直到超时。得到就绪状态后进行真正的操作可以在同一个线程里执行,也可以启动多线程执行。
  • IO多路复用有select, poll和epoll三种机制
  • 但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。
select
  • select()的机制中提供一种fd_set的数据结构,它是一个long类型的数组,数组的每一个元素与一打开的文件句柄(Socket)建立联系。程序发起一个select调用,select使整个进程阻塞。select会不断轮询fd_set中的所有socket,当任何一个socket收到数据,就会唤醒进程。

  • 当进程 被唤醒后,它知道至少有一个 Socket 接收了数据。程序只需遍历一遍 Socket 列表,就可以得到就绪的 Socket

  • 进程再进行read操作,直接从缓冲中把数据拷贝到进程。

  • python实现

import socket
import select

HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024

# 模拟handle socket
def handle_socket(request):
    return '{} received'.format(request)

server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)
server.setblocking(False)

inputs = [server, ]

while True:
    print('waiting for connection...')
    # 监听第一个列表的文件描述符,如果其中有文件描述符发生改变,则捕获并放到rlist中
    # rlist-- wait until ready for reading
    # wlist -- wait until ready for writing
    # xlist -- wait for an ``exceptional condition''
    rlist, wlist, elist = select.select(inputs, [], [])

    for r in rlist:
        # 当客户端第一次连接服务端时
        if r == server:
            conn, addr = r.accept()
            inputs.append(conn)
            print('connecting from: {}'.format(addr))
        # 当客户端连接上服务端之后,再次发送数据时
        else:
            client_req = r.recv(BUFSIZ)
            resp = handle_socket(client_req.decode('utf-8'))
            r.send(resp.encode())
            inputs.remove(r)
            r.close()
  • 缺点:
  1. 每次调用 Select 都需要将进程加入到所有监视 Socket 的等待队列,每次唤醒都需要从每个队列中移除。这里涉及了两次遍历,而且每次都要将整个 FDS 列表传递给内核,有一定的开销。
  2. 为了减少数据拷贝带来的性能损坏,内核对被监控的fd_set集合大小做了限制,并且这个是通过宏控制的,大小不可改变(限制为1024)
poll

poll的机制与select类似,只是poll没有最大文件描述符数量的限制。因此poll仍然有select的缺点1。

epoll
  • epoll先用 epoll_ctl 维护等待队列,再调用 epoll_wait 阻塞进程。
  • epoll维护了一个就绪列表Rdlist,当程序执行到 epoll_wait 时,如果 Rdlist 已经引用了 Socket,那么 epoll_wait 直接返回,唤醒进程。如果 Rdlist 为空,阻塞进程。进程能够通过Rdlist获取就绪的socket,从而避免了遍历。
  • epoll使用mmap将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址,使得这块物理内存对内核和对用户均可见,减少用户态和内核态之间的数据交换。加速内核与用户空间的消息传递。
  • epoll使用红黑树数据结构来维护需要监视的 Socket。当添加或者删除一个套接字时(epoll_ctl),都在红黑树上去处理。
  • epoll模型的工作模式:

1)LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。

2)ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。

  • python实现:
import socket
import select

HOST = socket.gethostname()
PORT = 12345
BUFSIZ = 1024

def handle_socket(request):
    return '{} received'.format(request)

server = socket.socket()
server.bind((HOST, PORT))
server.listen(5)
server.setblocking(False)

#创建epoll对象
epoll = select.epoll()

#将创建的套接字添加到epoll的事件监听中
#事件类型:
#select.EPOLLIN 可读事件
#select.EPOLLOUT 可写事件
#select.EPOLLERR   错误事件
#select.EPOLLHUP   客户端断开事件
epoll.register(server.fileno(), select.EPOLLIN)

conns = {}

while True:
    print('waiting for connection...')
    #轮询注册的事件集合
    epoll_list = epoll.poll()
    for fd, events in epoll_list:
        #新连接
        if fd == server.fileno():
            conn, addr = server.accept()
            #注册新连接fd到待读事件集合
            epoll.register(conn.fileno(), select.EPOLLIN)
            conns[conn.fileno()] = conn
        #可读事件
        elif events == select.EPOLLIN:
            client_req = conns[fd].recv(BUFSIZ)
            resp = handle_socket(client_req.decode('utf-8'))
            conns[fd].send(resp.encode())
            epoll.unregister(fd)
            conns[fd].close()
            del conns[fd]
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容