*现在我们开发往往不断使用封装好的web框架, 运行web服务也有相当多的容器, 但是其原理往往都离不开socket. 像是nginx底层就是采用类似python中epoll的异步监听方式加上socket结合来做. * 本文采取从最简单的socket通信实现聊天机器人, 到伪并发实现聊天机器人, 最后采用异步监听方式实现聊天机器人, 逐步推进.
首先我们实现一个最简单版的的socket服务端, server_s1.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
HOST='127.0.0.1'
PORT=9999
sockaddr=(HOST,PORT)
sk=socket.socket()
sk.bind(sockaddr)
sk.listen(5)
conn,address=sk.accept()
ret_bytes=conn.recv(1024)
print(str(ret_bytes,encoding='utf-8'))
conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8'))
sk.close()
-
sk=socket.socket()
这里创建socket对象 - 通过
sk.bind(sockaddr)
传入一个元组对象以此来设置服务端ip和port -
sk.listen(5)
表示设置最大等待连接数为5个 -
conn,address=sk.accept()
此时阻塞进程, 循环等待被连接, 返回连接对象和包含连接信息的对象 -
ret_bytes=conn.recv(1024)
等待接受1024个字节的信息 -
conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8'))
将接受的信息加上, 已收到!
重新发送给客户端. 注意, 在python2中可以传递str类型的数据, 但是在python3中只能传递byte类型的数据 -
sk.close()
关闭连接
至此简单的服务端已经写好了, 我们看看客户端, client_c1.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
HOST='127.0.0.1'
PORT=9999
sockaddr=(HOST,PORT)
ct=socket.socket()
ct.connect(sockaddr)
ct.sendall(bytes('第一次连接',encoding='utf-8'))
ret_bytes=ct.recv(1024)
print(str(ret_bytes,encoding='utf-8'))
ct.close()
- 客户端中需要连接服务端, 通过
ct.connect(sockaddr)
来执行
到现在为止, 已经把简单聊天机器人已经写好了, 客户端向服务端发送第一次连接
, 服务端接受输出到客户端并回馈给客户端第一次连接, 已收到!
接下来我们试着让这个服务端更健壮一些, 尝试让它可以不断的返回客户端发送过来的内容
这是第二个版本的服务端, server_s2.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
HOST='127.0.0.1'
PORT=9999
sockaddr=(HOST,PORT)
sk=socket.socket()
sk.bind(sockaddr)
sk.listen(5)
while True:
conn,address=sk.accept()
while True:
try:
ret_bytes=conn.recv(1024)
except Exception as ex:
print("已从",address,"断开")
break
else:
conn.sendall(ret_bytes+bytes(', 已收到!',encoding='utf-8'))
sk.close()
- 最内层的循环表示一旦连接则一直等待客户端发送消息并发回去, 直到连接断开
- 最外层的循环表示即使断开连接但是服务器仍处于等待其他客户端连接
- 加入异常处理表示, 客户端断开连接, 服务端仅仅断开此次连接
接下来看看客户端文件, client_c2.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
HOST='127.0.0.1'
PORT=9999
sockaddr=(HOST,PORT)
ct=socket.socket()
ct.connect(sockaddr)
while True:
inp=input("请输入要发送的内容: ")
ct.sendall(bytes(inp,encoding='utf-8'))
ret_bytes=ct.recv(1024)
print(str(ret_bytes,encoding='utf-8'))
ct.close()
- 客户端仅仅需要将要发送内容的部分放到循环中即可
现在第二个版本已经可以连续不断的处理同一连接的消息, 即使断开也不会影响服务器的健壮性. 但是, 我们的服务器功能还很单一, 只能一次处理一个客户端的连接. 接下来将用select模块实现伪并发处理客户端连接
这里是第三个版本的服务端文件, server_s3.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select
HOST = '127.0.0.1'
PORT = 9999
sockaddr = (HOST, PORT)
sk = socket.socket()
sk.bind(sockaddr)
sk.listen(5)
sk_inps = [sk, ]
while True:
change_list, keep_list, error_list = select.select(sk_inps, [], sk_inps, 1)
for sk_tmp in change_list:
if sk_tmp == sk:
conn, address = sk_tmp.accept()
sk_inps.append(conn)
else:
try:
ret_bytes = sk_tmp.recv(1024)
except Exception as ex:
sk_inps.remove(sk_tmp)
print("已从", sk_tmp.getpeername(), "断开")
else:
sk_tmp.sendall(ret_bytes + bytes(', 已收到!', encoding='utf-8'))
for sk_tmp in error_list:
sk_inps.remove(sk_tmp)
sk.close()
我们首先来看一下循环的过程
- 在
change_list, keep_list, error_list = select.select(sk_inps, [], sk_inps, 1)
中,select.select()
会自动监控起参数的内容, 当第一个参数中的对象发生变化时候会将该对象加到change_list中, 该次循环结束时change_list便会自动清空. 第一个参数中的变化对于sk对象, 这里只有客户端连接sk对象或者与sk对象断开两种情况 - 接着我们遍历change_lis中的内容, 当有客户端连接时候, 如图所见, chang_list中只有sk对象, 此时我们将客户端的连接conn加入到sk_inps中, 让select下次循环时候也监控conn对象的变化
- 当客户端发送消息时候意味着conn对象的变化, 此时change_list中加入该连接对象, 根据此对象, 我们可以处理客户端发送来的消息
- 通过以上方式, 让服务端轮流处理每个客户端连接, 由于cpu现在的处理速度极快, 给人的感觉就是并发处理多个客户端请求, 实际上是伪装并发处理
-
sk_inps.remove(sk_tmp)
这一句中, 一旦客户端断开连接, 则服务端就会捕捉到异常并将该客户端对象从监控列表sk_inps
中移除 - 接着我们来说是
select.select()
中的第二个参数, 该参数中有什么对象则keep_list
中就会加入什么对象, 该参数对于读写分离的伪并发处理有很大意义, 我们稍后再做介绍 -
select.select()
的第三个参数是当被监控的对象出现错误或者异常时候就将出错的对象加入到error_list
中, 随后我们遍历error_list
并根据里边的出错对象将其从sk_inps
中除去
该版本的客户端延续上一版本即可, 无需更改. 至此, 我们就建立一个能并发简单处理多客户端连接的服务器. 但是, 对于change_list
中遍历时候我们既有读又有写的操作, 这样当后期的处理复杂的时候, 代码维护很难再进行下去. 接下来我们接着开发我们的伪并发处理的最终版本
这里是服务的文件, server_s4.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
import select
HOST = '127.0.0.1'
PORT = 9997
sockaddr = (HOST, PORT)
sk = socket.socket()
sk.bind(sockaddr)
sk.listen(5)
sk_inps = [sk, ]
sk_outs=[]
message_dic={}
while True:
change_list, keep_list, error_list = select.select(sk_inps, sk_outs, sk_inps, 1)
for sk_tmp in change_list:
if sk_tmp == sk:
conn, address = sk_tmp.accept()
sk_inps.append(conn)
message_dic[conn]=[]
else:
try:
ret_bytes = sk_tmp.recv(1024)
except Exception as ex:
sk_inps.remove(sk_tmp)
print("已从", sk_tmp.getpeername(), "断开")
del message_dic[sk_tmp]
else:
sk_outs.append(sk_tmp)
message_dic[sk_tmp].append(str(ret_bytes,encoding='utf-8'))
for conn in keep_list:
message= message_dic[conn][0]
conn.sendall(bytes(message+", 已收到!",encoding='utf-8'))
del message_dic[conn][0]
sk_outs.remove(conn)
for sk_tmp in error_list:
sk_inps.remove(sk_tmp)
sk.close()
-
sk_outs=[]
中保存发送消息的客户端连接对象 -
message_dic={}
中保存消息内容 - 当客户端发送消息时候, 我们在一个for循环中将其连接对象和消息内容分别保存起来, 在第二个循环中我处理消息内容
以上就是伪并发处理客户端请求所有内容, 究其本质其实是IO多路复用原理. 同时python中也提供了真正的并发处理模块socketserver, 下面我们采用socketserver来实现
首先看我们的服务端文件, server_s5.py
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socketserver
HOST = '127.0.0.1'
PORT = 9997
sockaddr = (HOST, PORT)
class MySocket(socketserver.BaseRequestHandler):
def handle(self):
conn = self.request
while True:
try:
ret_bytes = conn.recv(1024)
except Exception as ex:
print("已从", self.client_address, "断开")
break
else:
conn.sendall(ret_bytes + bytes(', 已收到!', encoding='utf-8'))
if __name__ == "__main__":
server = socketserver.ThreadingTCPServer(sockaddr, MySocket)
server.serve_forever()
- 其原理只是将上述的IO多路复用改成了
threading
线程处理, 再加上本来的Socket内容形成 -
server = socketserver.ThreadingTCPServer(sockaddr, MySocket)
该句会将Socket服务端设置ip和port等内容封装到对象中, 执行初始化时候需要加入自己写的继承socketserver.BaseRequestHandler
的类 -
server.serve_forever()
此句执行时候会使得对象调用handle(self)
方法, 在该方法中我们对客户端连接进行处理
以上我们将Socket从基础原理到复杂自定义已经使用封装好的模块使用介绍完毕. 接下来我们补充一些理论知识和常用的Socket参数和方法: **
首先我们来回顾一下OSI模型和TCP/IP协议簇,如图(图片引自网络)
每层都有相对应的协议,但是socket API只是操作系统提供的一个用于网络编程的接口, 如图(图片引自网络)
根据 socket 传输数据方式的不同(其实就是使用协议的不同), 导致其与不同层打交道
-
Stream sockets
, 是一种面向连接
的 socket, 使用 TCP 协议. -
Datagram sockets
,无连接
的 socket,使用 UDP 协议. -
Raw sockets
, 通常用在路由器或其他网络设备中, 这种socket直接由网络层通向应用层.
以下是注意点:
- 在我们创建对象时候
sk=socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)
实际上默认传入了参数, 第一个参数表示ip协议,ocket.AF_INET
表示ipv4协议(默认就是), 第二个参数表示传输数据格式,socket.SOCK_STREAM
表示tcp协议(默认就是),socket.SOCK_DGRAM
表示udp协议 -
ret_bytes=conn.recv(1024)
中表示最多接受1024个字节; 若没有接受到内容则会阻塞进程, 等待接受内容 -
send()
可能会发送部分内容,sendall()
本质就是内部循环调用send()
直到将内容发送完毕, 建议使用sendall()
- 当用socket做ftp文件传输时候会产生粘包问题, 此时只需在发送文件大小之后等待接受服务端返回一个确认码后, 再发送文件即可解决