最近想研究一下关于长链接的相关内容,在B站上看到了Zinx框架的视频,是Golang语言的框架,本着更好的理解框架的内容,按照整个Zinx课程的进度,制作一个Python版本的Zinx框架。有关于Zinx框架的具体内容,可以看框架作者的介绍。
python版本的Zinx,基于Gevent 22.10.2,使用协程功能。
golang版本的Zinx项目,项目中两个文件夹,ziface和znet。
- ziface主要是存放一些Zinx框架的全部模块的抽象层接口类。
- znet模块是zinx框架中网络相关功能的实现,所有网络相关模块都会定义在znet模块中。
└── zinx
├── ziface
│ └──
└── znet
├──
python中的关键字没有interface,但是可以使用抽象基类(abstract base class)和第三方库来实现类似于接口的功能。在实际开发中,我们可以根据具体需求选择合适的实现方式。
暂时使用抽象基类的形式模拟接口的实现。
在之前的章节中,服务端读取到消息之后,就会开启一个协程进行消息的处理。本节就是将流程改为读取到消息后,将待处理的消息存放到每一个worker的任务队列中,池中空闲的worker会领取自身任务队列中的消息进行处理,防止开启过多的协程进行处理。
让用户可以在配置文件中配置worker的数量,需要再GlobalObj中添加一个成员变量WorkerPoolSize 。
# -*- coding: utf-8 -*-
from typing import Optional
from ziface.iserver import IServer
import json
import os
class GlobalObj:
def __init__(self, host: str, tcp_port: int, name: str, version: str, max_packet_size: int, max_conn: int, worker_pool_size: int):
self.TcpServer: Optional[IServer] = None # 当前zinx的全局Server对象
self.Host: str = host # 当前服务器主机IP
self.TcpPort: int = tcp_port # 当前服务器主机监听端口号
self.Name: str = name # 当前服务器名称
self.Version: str = version # 当前Zinx版本号
self.MaxPacketSize: int = max_packet_size # 都需数据包的最大值
self.MaxConn: int = max_conn # 当前服务器主机允许的最大链接个数
self.WorkerPoolSize = worker_pool_size
self.Reload()
def Reload(self):
"""
读取用户的配置文件,覆盖默认配置
:return:
"""
root_path = os.getcwd()
conf_path = "/conf/"
conf_name = "zinx.json"
real_path = root_path + conf_path + conf_name
if not os.path.exists(real_path):
return
with open(real_path, 'r', encoding='utf-8') as jsonfile:
data = json.load(jsonfile)
for key in data.keys():
if key in self.__dict__.keys():
self.__dict__[key] = data[key]
def SetTcpServer(self, tcp):
"""
记录一下TCPServer
:param tcp:
:return:
"""
self.TcpServer = tcp
# 全局对象
GlobalObject = GlobalObj("0.0.0.0", 8986, "ZinxServerApp", "测试版本", 4096, 12000, 10)
# 顺便做一个全局Events
GlobalGevents: list = []
imsghandler中添加两个函数,StartWorkerPool开启工作池、SendMsgToTaskQueue将消息放入工作池队列。
# -*- coding: utf-8 -*-
from ziface.irequest import IRequest
from ziface.irouter import IRouter
from abc import ABC, abstractmethod
class IMsgHandler(ABC):
"""
消息管理抽象层
"""
@abstractmethod
def DoMsgHandler(self, request: IRequest):
"""
调度/执行对应的Router消息处理方法
:param request:
:return:
"""
pass
@abstractmethod
def AddRouter(self, msgID: int, router: IRouter):
"""
为消息添加具体的处理逻辑
:param msgID:
:param router:
:return:
"""
pass
@abstractmethod
def StartWorkerPool(self):
"""
启动worker工作池
:return:
"""
pass
@abstractmethod
def SendMsgToTaskQueue(self, request: IRequest):
"""
将消息交给TaskQueue, 由worker进行处理
:param request:
:return:
"""
pass
&esmp;在msgHandler实现一下,添加了两个成员函数WorkerPoolSize表示worker数量、TaskQueue每一个worker对应的消息队列。StartWorkerPool开启了工作池,StartOneWorker将每一个worker放入协程中运行,SendMsgToTaskQueue将消息放入对应worker的消息队列。
# -*- coding: utf-8 -*-
from typing import Dict, List
import gevent
from ziface.irequest import IRequest
from ziface.irouter import IRouter
from ziface.imsghandler import IMsgHandler
from utils.globalobj import GlobalObject, GlobalGevents
from gevent.queue import Queue
class MsgHandler(IMsgHandler):
"""
消息管理实现层
"""
def __init__(self):
self.Apis: Dict[int, IRouter] = {} # type : dict[int, IRouter]
self.WorkerPoolSize = GlobalObject.WorkerPoolSize # worker数量
self.TaskQueue: List[Queue] = [] # 每一个worker对应一个Queue,所有Queue用List管理
def DoMsgHandler(self, request: IRequest):
"""
调度/执行对应的Router消息处理方法
:param request:
:return:
"""
# 判断当前request绑定的API处理方法是否已经存在
if request.GetMsgID() not in self.Apis.keys():
print("接口 msgId = ", request.GetMsgID(), " 未找到!")
return
# 执行对应处理方法
self.Apis[request.GetMsgID()].PreHandle(request)
self.Apis[request.GetMsgID()].Handle(request)
self.Apis[request.GetMsgID()].PostHandle(request)
def AddRouter(self, msgID: int, router: IRouter):
"""
为消息添加具体的处理逻辑
:param msgID:
:param router:
:return:
"""
# 1 判断当前msg绑定的API处理方法是否已经存在
if msgID in self.Apis.keys():
print("接口 msgId = ", msgID, " 已存在,无法再次添加!")
return
# 2 添加msg与api的绑定关系
self.Apis[msgID] = router
print("接口 msgId = ", msgID, " 添加成功!")
def StartWorkerPool(self):
# 遍历需要启动worker的数量,依此启动
for i in range(0, self.WorkerPoolSize):
# 一个worker被启动
# 给当前worker对应的任务队列开辟空间
self.TaskQueue.append(Queue())
# 启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
g1 = gevent.spawn(self.StartOneWorker, i)
GlobalGevents.append(g1)
def StartOneWorker(self, workerID: int):
print("Worker ID = ", workerID, " 已启动。")
# 不断的等待队列中的消息
while True:
# 有消息则取出队列的Request,并执行绑定的业务方法
try:
temp_req = self.TaskQueue[workerID].get_nowait()
self.DoMsgHandler(temp_req)
except:
# get_nowait并不会主动切换其他协程,使用sleep切换到其他协程,保证不在当前协程中循环运行
gevent.sleep(0)
continue
def SendMsgToTaskQueue(self, request: IRequest):
"""
根据ConnID来分配当前的连接应该由哪个worker负责处理 轮询的平均分配法则
:return:
"""
# 得到需要处理此条连接的workerID
worker_id = request.GetConnection().GetConnID() % self.WorkerPoolSize
print("添加 ConnID=", request.GetConnection().GetConnID(), " 请求 msgID=", request.GetMsgID(), "到 workerID=",
worker_id)
# 将请求消息发送给任务队列
self.TaskQueue[worker_id].put(request)
def NewMsgHandler() -> IMsgHandler:
return MsgHandler()
消息队列和多任务模块封装好了,接下来就是集成了。
在Server的Start中,完成率先启动。
def Start(self):
"""
启动服务器方法
:return:
"""
print("[启动] 服务监听,IP地址:%s, 端口:%s,已经启动\n" % (self.ip, self.port))
self.msgHandler.StartWorkerPool()
g1 = gevent.spawn(self.InitTCP)
GlobalGevents.append(g1)
在Connection中StartReader中,将消息队列集成进去。设置worker数量则开启任务队列,不设置则不开启。
def StartReader(self):
"""
处理读业务
:return:
"""
print("开启读业务")
while True:
try:
dp = NewDataPack()
# 读取客户端的Msg head
head_data = self.Conn.recv(dp.GetHeadLen())
if len(head_data) == 0:
# head_data 长度为0表示客户端已经退出
break
# 拆包,得到msgID 和 dataLen 放在msg中
msg = dp.Unpack(head_data)
# 根据 dataLen 读取 data,放在msg.Data中
data_content = self.Conn.recv(msg.GetDataLen())
msg.SetData(data_content)
# 得到当前conn数据的Request请求数据
req = NewRequest(self, msg)
# 执行注册的路由方法
self.RunHandlerForWorker(req)
g1 = gevent.spawn(self.RunHandler(req))
GlobalGevents.append(g1)
except Exception as e:
print("读取数据异常 ", e)
break
print("读业务关闭,ID", self.ConnID)
self.Stop()
def RunHandlerForWorker(self, req):
"""
采用任务队列运行Handler,如果未设置worker数量或数量为0时,默认不开启队列运行
:param req:
:return:
"""
if GlobalObject.WorkerPoolSize <= 0:
return
# 已经启动工作池机制,将消息交给Worker处理
self.msgHandler.SendMsgToTaskQueue(req)
def RunHandler(self, request: IRequest):
"""
采用协程运行Handler,如果设置了worker数量,默认不开启协程运行。
:param request:
:return:
"""
if GlobalObject.WorkerPoolSize > 0:
return
self.msgHandler.DoMsgHandler(request)
服务接口做完了。在demo\taskqueue中做一个客户端和服务端测试一下,代码与demo\msghandler一样即可。
此时发送和接收都正常。消息队列和多任务完成。