TCP/IP协议笔记3-处理多客户端连接

第三篇总结下TCP交互数据流与多进程编程以及python中多客户端编程的几种实现方案,测试环境为macos10.12和ubuntu16.04。

1 交互数据流

先看一段简单的代码,这里先把服务端更加简化一下,只接收一次数据就关闭客户端的连接,客户端代码不变,如下所示。

#onceserver.py
import socket

def start_server(ip, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        sock.bind((ip, port))
        sock.listen(1)

        while True:
            conn, cliaddr = sock.accept()
            print 'server connect from: ', cliaddr

            data = conn.recv(1024)
            print 'server received:', data
            conn.send(data.upper())
            conn.close()
    except Exception, ex:
        print 'exception occured:', ex
    finally:
        sock.close()

if __name__ == "__main__":
    start_server('127.0.0.1', 7777)


#client.py
from socket import *
import sys

def start_client(ip, port):
    try:
        sock = socket(AF_INET, SOCK_STREAM, 0)
        sock.connect((ip, port))
        print 'connected'
        while True:
            data = sys.stdin.readline().strip()
            if not data: break

            sock.send(data)
            result = sock.recv(1024)
            if not result:
                print 'other side has closed'
            else:
                print 'response from server:%s' % result
        sock.close()
    except Exception, ex:
        print ex

if __name__ == "__main__":
    start_client('127.0.0.1', 7777)

先开一个终端python onceserver.py,再开另一个终端运行python client.py,然后在客户端依次输入haha, hehe, wawa,可以发现结果如下:

ssj@ssj-mbp ~/Prog/network $ python client.py    
connected
haha
response from server:HAHA
hehe
other side has closed
wawa
[Errno 32] Broken pipe

而对应到wireshark里面,可以看到数据包如下,出现这个结果也很容易解释了:序号5的数据包是客户端发送了4个字节的数据haha给服务端;序号6的数据包是服务端回应一个ACK包,可以看到序号6的ACK的值比序号5上一个Seq的增加了4,这是因为传输了4个字节的数据,所以请求的下一个seq的值加了4。接着的序号7的数据包是服务端发给客户端的4个字节的数据HAHA,ACK的值不变,PSH标志置位。序号8是客户端对这四个字节的ACK包。序号9则是服务端关闭连接的FIN包,然后序号10是客户端对FIN的ACK包。

图1 交互数据包

前一段都是正常的,下面看看后面的输入产生这个结果的原因,这个时候,服务端已经关闭了该连接,我们在客户端再次输入hehe,这时对应序号11,而由于服务端已经关闭了连接,所以回应了一个RST包,对应序号12。客户端send完数据后就不管了,收到RST包后,发现数据为0,所以打印出other side has closed,但是这个时候并不能立刻通知应用程序,而是保存在内核的TCP协议层,这样直到最后再一次准备发送wawa的时候,由于TCP协议层已经处于RST状态了,因此不会将数据发出,而是发一个SIGPIPE信号给应用层,SIGPIPE信号的缺省处理动作是终止程序,所以看到上面的现象。为了避免客户端异常退出,上面的代码应该在判断对方关闭了连接后break出循环,而不是继续send。而服务端要多次接收数据,则改成之前文章中那样。

2 处理多客户端请求-多进程方案

上一节修正后的服务端和客户端代码如下:

#server.py
import socket

def start_server(ip, port):
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        listensock.bind((ip, port))
        listensock.listen(5)

        while True:
            conn, cliaddr = listensock.accept()
            print 'server connect from: ', cliaddr

            while True:
                data = conn.recv(1024)
                if not data:
                    print 'client closed:', cliaddr
                    break

                print 'server received:', data
                conn.send(data.upper())
            conn.close()
    except Exception, ex:
        print 'exception occured:', ex
    finally:
        listensock.close()

if __name__ == "__main__":
    start_server('127.0.0.1', 7777)


#client.py
from socket import *
import sys

def start_client(ip, port):
    try:
        sock = socket(AF_INET, SOCK_STREAM, 0)

        sock.connect((ip, port))
        print 'connected'
        while True:
            data = sys.stdin.readline().strip()
            if not data: break

            sock.send(data)
            result = sock.recv(1024)
            if not result:
                print 'other side has closed'
                break
            else:
                print 'response from server:%s' % result
        sock.close()
    except Exception, ex:
        print ex

if __name__ == "__main__":
    start_client('127.0.0.1', 7777)

这个时候开启第一个终端,运行python server.py,这时候再开启第二个终端运行python client.py,输入数据,也得到了正常的回应,可是当我们开启另外一个终端运行第二个客户端的时候,会发现发送数据后并只得到了一个ACK回应,服务端并没有发送数据过来。原因也很简单,服务端还卡在第二个循环里面,第一个客户端连接不退出,服务端不会再次运行accept函数处理新的连接。

处理多客户端有几种方式,比如多进程,一个进程对应一个连接,还有多线程,以及进程和线程混合模式等。当然还有更好的select,epoll等方案可以一个进程处理多个客户端,这节就用多进程的来实现下多客户端处理。修改代码如下:

import socket
import os
import sys

def start_server(ip, port):
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        listensock.bind((ip, port))
        listensock.listen(5)

        while True:
            conn, cliaddr = listensock.accept()

            try:
                pid = os.fork()
            except OSError, e:
                break

            if pid == 0:
                print 'server connect from: ', cliaddr
                listensock.close()
                while True:
                    data = conn.recv(1024)
                    if not data:
                        print 'client closed:', cliaddr
                        break

                    print 'server received:', data
                    conn.send(data.upper())
                conn.close()
                os._exit(0)
            else:
                conn.close()
    except Exception, ex:
        print 'exception occured:', ex
    finally:
        listensock.close()

这样每次来一个连接,就创建一个新的子进程来处理,处理完子进程退出,就可以达到处理多个客户端的情况了。注意的是,这里子进程退出了而父进程也不进行回收处理的话,子进程会变成僵尸进程,如下图所示,一个客户端退出后,可以看到多了一个Python的僵尸进程,状态是Z+,在linux下面会显示状态为<defunct>

➜  data ps aux|grep Python
ssj              7908   0.0  0.0        0      0 s001  Z+    4:14下午   0:00.00 (Python)

为什么会有僵尸进程的存在呢?我们知道一个进程在终止时会关闭所有文件描述符,释放在用户空间分配的内存,但是它的进程控制块(PCB)还保留着,内核在其中保存了一些信息:如果是正常终止则保存着退出状态,如果是异常终止则保存着导致该进程终止的信号是哪个。如果一个进程已经终止,但是它的父进程尚未调用wait或waitpid对它进行清理,这时的进程状态称为僵尸进程。也可以参考下stackoverflow上面的这个问题 why-zombie-processes-exist

为了解决僵尸进程问题,父进程需要处理SIGCHLD信号并调用wait清理僵尸进程,当然为了简单起见,我这里是在父进程里面直接忽略SIGCHLD信号,相当于直接告诉系统,我不关心子进程的状态,不要产生僵尸进程,这样也可以达到解决僵尸进程的目的,修改后的代码如下:

......
import signal #导入signal模块

def start_server(ip, port):
    signal.signal(signal.SIGCHLD, signal.SIG_IGN) #忽略SIGCHLD信号
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
......

另外说一句,与僵尸进程对应的还有个孤儿进程,就是父进程已经退出,而子进程还没有退出时所处的状态,孤儿进程的父进程退出后会被init进程接管,也就是说它的父进程会被设置为1,子进程运行结束会被init进程回收,不会产生僵尸进程。另外一点,如果要终止一个僵尸进程是不能通过kill命令来实现的,因为僵尸进程已经终止了,没法再kill,正确的方法是kill掉僵尸进程的父进程,让init进程接管僵尸进程并回收。

3 处理多客户端请求-select方案

在之前提到的TCP编程中,其中的socket是阻塞socket,因为python程序会停止运行,直到一个event发生。其中accept()调用会阻塞,直到接收到一个客户端连接。而recv()调用也会阻塞,直到这次接收客户端数据完成(或者没有更多的数据要接收)。send()调用也会阻塞,直到将这次需要返回给客户端的数据都放到Linux的发送缓冲队列中。使用多进程或者多线程来处理多客户端请求,容易引起性能问题,异步socket是一种不错的解决方案。异步socket在python的API里面有select,poll,epoll三种,其中epoll性能最好,select性能较差,因为它每次都要轮询程序锁需要的所有socket去查找感兴趣的event。注意一下,select在这里虽然称之为异步socket,并不是说它的读取和写入不阻塞,只是因为select函数给你找到了已经有的读事件和写事件的socket,你在accept,recv,send调用的时候可以直接读取到数据而不需要再等待,因为数据已经到达

select几乎在所有平台都能支持,良好的跨平台支持是它为数不多的优点了。select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,如果要增大则需要修改参数重新编译内核。另外,select()所维护的socket文件描述符的数据结构,随着文件描述符数量的增大,调用select()扫描所有的socket的开销也会增加。poll()与select()类似,这里就不再讨论。select()将就绪的读写事件的socket告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()的时候将再次返回这些socket,所以它们一般不会丢失消息(比如在下面代码中第一次不处理wset中的socket,第二次select的时候还是会返回对应的socket的集合)。这种方式称为水平触发(Level Triggered),后面会看到epoll里面支持水平触发和垂直触发。

select服务端的实现如下所示:

#selectserver.py
import socket
import os
import select
import Queue

def start_server(ip, port):
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        listensock.bind((ip, port))
        listensock.listen(511)
        inputs = [listensock]
        outputs = []

        msg_queue = {}
        while inputs:
            print 'waiting for next event'
            rset, wset, expset = select.select(inputs, outputs, inputs)
            if not rset and not wset and not expset:
                print 'timeout'
                break

            print 'rset %s, wset:%s' % (rset, wset)
            #处理读事件
            for s in rset:
                if s is listensock: #如果是监听socket,则accept接受连接。
                    conn, cliaddr = s.accept()
                    print 'connect from ', cliaddr
                    inputs.append(conn)
                    msg_queue[conn] = Queue.Queue() #为每个连接分配一个队列接收数据
                else:
                    data = s.recv(1024)
                    if data:
                        print 'server received %s from %s' % (data, s.getpeername())
                        msg_queue[s].put(data)
                        if s not in outputs:
                            outputs.append(s)
                    else:
                        print 'client %s closed' % s.getpeername()
                        if s in outputs:
                            outputs.remove(s) //客户端关闭,将对应socket从outputs中移除。
                        inputs.remove(s)
                        del msg_queue[s]
                        s.close()

            #处理写事件
            for s in wset:
                try:
                    #用get_nowait()防止阻塞,如果队列为空会抛出Empty异常,python队列用get会阻塞。
                    next_msg = msg_queue[s].get_nowait()                     
                    print  'server sending %s to %s' % (next_msg.upper(), s.getpeername())
                    s.send(next_msg.upper())
                except Queue.Empty:
                    print s.getpeername(), 'queue empty'
                    outputs.remove(s)

            #处理异常
            for s in expset:
                print 'exception on %s' % s.getpeername()
                inputs.remove(s)
                if s in outputs:
                    outputs.remove(s)
                s.close()
                del msg_queue[s]

    except Exception, ex:
        print 'exception occured:', ex
    finally:
        listensock.close()

if __name__ == "__main__":
    start_server('127.0.0.1', 7777)

运行python selectserver.py,然后在另一个终端开启python client.py,输入数据hehe,可以看到服务端的输出如下,也就是说,select会阻塞等待,等到有事件来的时候,select函数会遍历所有的socket,找到有读取事件和写入事件的socket,然后读取事件的socket设置在rset中,写入事件的socket的设置在wset中,异常的socket在exception中,然后分别处理即可。注意读取事件有个特例是监听关键字,要单独处理。

ssj@ssj-mbp ~/Prog/network/data $ python selectserver.py
waiting for next event
rset [<socket._socketobject object at 0x1022e37c0>], wset:[]
connect from  ('127.0.0.1', 61612)
waiting for next event
rset [<socket._socketobject object at 0x1022e39f0>], wset:[]
server received haha from ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
server sending HAHA to ('127.0.0.1', 61612)
waiting for next event
rset [], wset:[<socket._socketobject object at 0x1022e39f0>]
('127.0.0.1', 61612) queue empty
waiting for next event

4 处理多客户端请求-epoll方案

上一节的select方案是不需要多进程了,只要有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道I/O事件发生,但却并不知道是那几个socket的I/O事件(可能有一个,多个,甚至全部),于是只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。轮询的时间复杂度为O(n),而且socket越多,时间越长。epoll就是对select的改进,它不再需要轮询所有的socket了,而是把哪个socket发生了什么I/O事件直接通知给我们,如下代码中的epoll.poll()方法就是返回有I/O事件的socket的文件描述符和事件类型,大大降低了时间复杂度,提高了性能。关于epoll的原理可以参见参考资料5,python中的API已经简化了不少操作。

epoll有水平触发(LT, level triggered)和边缘触发(ET, edge triggered)两种方式。其中LT是默认的工作方式,LT模式同时支持block和no-block socket,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知,这种模式编程出错误可能性要小一点。而ET是一种加速模式,当一个新的事件到来时,ET模式下可以从poll调用中获取到这个事件,可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从poll调用中获取这个事件的,使用ET方式的epoll代码可以参见参考资料4。macos没有epoll方法,这里用的测试环境为Ubuntu16.04.

python中使用epoll代码如下:

import socket
import os
import select
import Queue

def start_server(ip, port):
    listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listensock.bind((ip, port))
    listensock.listen(511)
    listensock.setblocking(0)
    epoll = select.epoll()
    epoll.register(listensock.fileno(), select.EPOLLIN)

    try:
        connections = {}
        msg_queue = {}

        while True:
            events = epoll.poll(1)
            for fileno, event in events:
                if fileno == listensock.fileno():
                    conn, cliaddr = listensock.accept()
                    conn.setblocking(0)
                    epoll.register(conn.fileno(), select.EPOLLIN)
                    connections[conn.fileno()] = conn
                    msg_queue[conn.fileno()] = Queue.Queue()
                elif event & select.EPOLLIN:
                    data = connections[fileno].recv(1024)
                    if data:
                        print 'server recv ', data 
                        msg_queue[fileno].put(data)
                        epoll.modify(fileno, select.EPOLLOUT)
                    else:
                        print 'no data recv, server close ', fileno 
                        epoll.modify(fileno, select.EPOLLHUP)
                        connections[fileno].shutdown(socket.SHUT_RDWR)
                elif event & select.EPOLLOUT:
                    try:
                        data = msg_queue[fileno].get_nowait()
                        print 'server send ', data
                        connections[fileno].send(data.upper())
                    except Queue.Empty:
                        epoll.modify(fileno, select.EPOLLIN)
                elif event & select.EPOLLHUP:
                    print 'close ', fileno
                    epoll.unregister(fileno)
                    connections[fileno].close()
                    del connections[fileno]
    except Exception, ex:
        print 'exception occured:', ex
    finally:
        epoll.unregister(listensock.fileno())
        epoll.close()
        listensock.close()


if __name__ == "__main__":
    start_server('127.0.0.1', 7777)

5 参考资料

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容