IO多路复用深入浅出

Java程序员进阶三条必经之路:数据库、虚拟机、异步通信。

前言

从零单排高性能问题,这次轮到异步通信了。这个领域入门有点难,需要了解UNIX五种IO模型和TCP协议,熟练使用三大异步通信框架:Netty、NodeJS、Tornado。目前所有标榜异步的通信框架用的都不是异步IO模型,而是IO多路复用中的epoll。因为Python提供了对Linux内核API的友好封装,所以我选择Python来学习IO多路复用。

IO多路复用

  1. select

举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Created on Feb 16, 2016

@author: mountain
'''
import socket
import select
from Queue import Queue

#AF_INET指定使用IPv4协议,如果要用更先进的IPv6,就指定为AF_INET6。
#SOCK_STREAM指定使用面向流的TCP协议,如果要使用面向数据包的UCP协议,就指定SOCK_DGRAM。
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
#设置监听的ip和port
server_address = ('localhost', 1234)
server.bind(server_address)
#设置backlog为5,client向server发起connect,server accept后建立长连接,
#backlog指定排队等待server accept的连接数量,超过这个数量,server将拒绝连接。
server.listen(5)
#注册在socket上的读事件
inputs = [server]
#注册在socket上的写事件
outputs = []
#注册在socket上的异常事件
exceptions = []
#每个socket有一个发送消息的队列
msg_queues = {}
print "server is listening on %s:%s." % server_address
while inputs:
     #第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码
     readable, writable, exceptional = select.select(inputs, outputs, exceptions)
     for sock in readable:
         #client向server发起connect也是读事件,server accept后产生socket加入读队列中
         if sock is server:
             conn, addr = sock.accept()
             conn.setblocking(False)
             inputs.append(conn)
             msg_queues[conn] = Queue()
             print "server accepts a conn."
         else:
             #读取client发过来的数据,最多读取1k byte。
             data = sock.recv(1024)
             #将收到的数据返回给client
             if data:
                 msg_queues[sock].put(data)
                 if sock not in outputs:
                     #下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写
                     outputs.append(sock)
             else:
                 #client传过来的消息为空,说明已断开连接
                 print "server closes a conn."
                 if sock in outputs:
                     outputs.remove(sock)
                 inputs.remove(sock)
                 sock.close()
                 del msg_queues[sock]
     for sock in writable:
         if not msg_queues[sock].empty():
             sock.send(msg_queues[sock].get_nowait())
         if msg_queues[sock].empty():
             outputs.remove(sock)
     for sock in exceptional:
         inputs.remove(sock)
         if sock in outputs:
             outputs.remove(sock)
         sock.close()
         del msg_queues[sock]
[mountain@king ~/workspace/wire]$ telnet localhost 1234
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
1

select有3个缺点:
1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
1. 每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的是就绪socket数目:
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
1. fd数量有限,默认1024。

  1. poll

采用poll重新实现EchoServer,只要搞懂了select,poll也不难,只是api的参数不太一样而已。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Created on Feb 27, 2016

@author: mountain
'''
import select
import socket
import sys
import Queue

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server_address = ('localhost', 1234)
server.bind(server_address)
server.listen(5)
print 'server is listening on %s port %s' % server_address
msg_queues = {}
timeout = 1000 * 60
#POLLIN: There is data to read
#POLLPRI: There is urgent data to read
#POLLOUT: Ready for output
#POLLERR: Error condition of some sort
#POLLHUP: Hung up
#POLLNVAL: Invalid request: descriptor not open
READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR
READ_WRITE = READ_ONLY | select.POLLOUT
poller = select.poll()
#注册需要监听的事件
poller.register(server, READ_ONLY)
#文件描述符和socket映射
fd_to_socket = { server.fileno(): server}
while True:
     events = poller.poll(timeout)
     for fd, flag in events:
         sock = fd_to_socket[fd]
         if flag & (select.POLLIN | select.POLLPRI):
             if sock is server:
                 conn, client_address = sock.accept()
                 conn.setblocking(False)
                 fd_to_socket[conn.fileno()] = conn
                 poller.register(conn, READ_ONLY)
                 msg_queues[conn] = Queue.Queue()
             else:
                 data = sock.recv(1024)
                 if data:
                     msg_queues[sock].put(data)
                     poller.modify(sock, READ_WRITE)
                 else:
                     poller.unregister(sock)
                     sock.close()
                     del msg_queues[sock]
         elif flag & select.POLLHUP:
             poller.unregister(sock)
             sock.close()
             del msg_queues[sock]
         elif flag & select.POLLOUT:
             if not msg_queues[sock].empty():
                 msg = msg_queues[sock].get_nowait()
                 sock.send(msg)
             else:
                 poller.modify(sock, READ_ONLY)
         elif flag & select.POLLERR:
             poller.unregister(sock)
             sock.close()
             del msg_queues[sock]

poll解决了select的第三个缺点,fd数量不受限制,但是失去了select的跨平台特性,它的linux内核api是这样的:

int poll (struct pollfd *fds, unsigned int nfds, int timeout);
struct pollfd { 
     int fd; /* file descriptor */
     short events; /* requested events to watch */
     short revents; /* returned events witnessed */
};
  1. epoll

用法与poll几乎一样。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
Created on Feb 28, 2016

@author: mountain
'''
import select
import socket
import Queue

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server_address = ('localhost', 1234)
server.bind(server_address)
server.listen(5)
print 'server is listening on %s port %s' % server_address
msg_queues = {}
timeout = 60
READ_ONLY = select.EPOLLIN | select.EPOLLPRI
READ_WRITE = READ_ONLY | select.EPOLLOUT
epoll = select.epoll()
#注册需要监听的事件
epoll.register(server, READ_ONLY)
#文件描述符和socket映射
fd_to_socket = { server.fileno(): server}
while True:
     events = epoll.poll(timeout)
     for fd, flag in events:
         sock = fd_to_socket[fd]
         if flag & READ_ONLY:
             if sock is server:
                 conn, client_address = sock.accept()
                 conn.setblocking(False)
                 fd_to_socket[conn.fileno()] = conn
                 epoll.register(conn, READ_ONLY)
                 msg_queues[conn] = Queue.Queue()
             else:
                 data = sock.recv(1024)
                 if data:
                     msg_queues[sock].put(data)
                     epoll.modify(sock, READ_WRITE)
                 else:
                     epoll.unregister(sock)
                     sock.close()
                     del msg_queues[sock]
         elif flag & select.EPOLLHUP:
             epoll.unregister(sock)
             sock.close()
             del msg_queues[sock]
         elif flag & select.EPOLLOUT:
             if not msg_queues[sock].empty():
                 msg = msg_queues[sock].get_nowait()
                 sock.send(msg)
             else:
                 epoll.modify(sock, READ_ONLY)
         elif flag & select.EPOLLERR:
             epoll.unregister(sock)
             sock.close()
             del msg_queues[sock]

epoll解决了select的三个缺点,是目前最好的IO多路复用解决方案。为了更好地理解epoll,我们来看一下linux内核api的用法。

int epoll_create(int size)//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)//注册事件,每个fd只拷贝一次。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)/*等待IO事件,事件发生时,
内核调用回调函数,把就绪fd放入就绪链表中,并唤醒epoll_wait,epoll_wait只需要遍历就绪链表即可,
而select和poll都是遍历所有fd,这效率高下立判。*/
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 看到网上有不少讨论epoll,但大多不够详细准确,以前面试有被问到这个问题。不去更深入的了解,只能停留在知其然...
    电台_Fang阅读 11,569评论 0 8
  • 大纲 一.Socket简介 二.BSD Socket编程准备 1.地址 2.端口 3.网络字节序 4.半相关与全相...
    y角阅读 2,387评论 2 11
  • 我有一位好闺蜜,从小一起长大,大学在一个城市,工作后也会经常见面的那种,反正就是大家所谓的‘’后天亲人”。刚刚我的...
    哆啦的梦阅读 440评论 0 1
  • 这一篇文章是写在14年11月30日,在我看来没有什么文笔可言,字字句句皆出本心。我仍然记得,当时的我是留着泪把它写...
    Ta_nG阅读 767评论 0 1
  • A财富目标 1,公司四个月以内收入50万 2,自己可以独立承担自己的责任。 B,伴侣的目标 身高一米六到一米七,长...
    雪痕情阅读 92评论 0 1