zinx V0.7 读写分离

七、Zinx的读写分离模型 · 语雀 (yuque.com)

  1. 对框架进行读写分离 我们要实现的是客户端发送消息给客户端 客户端的Reader对消息进行接收 然后Reader通过管道将消息发送给Writer而不是直接发回给客户端 最后Writer将消息发回给客户端
    因为之前没有管道服务端不知道该什么时候将消息发回 可能客户端发送了两次 服务端才将这两次消息一并返回
    因此我们需要实现读写分离
消息处理
  1. 因为我们之前的Reader是在Connection模块中实现的 所以我们在Connection结构体中新增添1个Channel
type Connection struct {
  ···
  // 无缓冲的管道 用于读写的Goroutine之间的消息通信
    MsgChan chan []byte
}

这里是channel存的是字节切片而不是Message 因为Writer要发送给客户端的消息是已经序列化好的二进制流
这里就是我们的Writer模块

/*
    主要是写消息的Goroutine 专门发送给客户端的模块
*/
func (c *Connection) StartWriter() {
    fmt.Println("[Writer Goroutinr is running]")
    defer fmt.Println(c.GetRemoteAddr().String(), " [conn Writer exit!]")
    defer fmt.Println("Writer is exit!")

    // 不断阻塞等待Msgchan的消息
    for {
        select {
        // data就是已经序列化好的Message 二进制流的结构MsgDatalen|MsgId|MsgData
        case data := <-c.MsgChan:
            // 有数据要写给客户端
            if _, err := c.Conn.Write(data); err != nil {
                fmt.Println("Send data error", err, " Conn Writer exit")
                return
            }
        case <-c.ExitChannel: //可读代表Reader已经退出 此时Writer也要退出
            return
        }
    }
}

然后我们在Connection的Start中同时开启Reader和Writer这两个协程

func (c *Connection) Start() {
    fmt.Println("Conn Start() ... Conn ID = ", c.ConnId)
    // 启动从当前链接的读数据的业务
    // Todo 启动从当前链接写数据的业务
    go c.StartReader()
    go c.StartWriter()
}

此时两者同时运行 当Reader读到消息后 将调用路由对应的处理方法 而我们的处理方法中调用了SendMsg

func (this *PingRouter) Handle(request ziface.IRequest) {
    fmt.Println("Call PingRouter Handle...")
    fmt.Println("Recv from client: msgId= ", request.GetMsgId(),
        ", data= ", string(request.GetData()))
    // 回写数据
    err := request.GetConnection().SendMsg(200, []byte("ping..ping..ping.."))
    if err != nil {
        fmt.Println(err)
        return
    }
}

SendMsg是将服务端要发送的内容序列化好后并传给客户端

// 提供一个SendMsg方法 将我们要发送给客户端的数据 先进行封包再发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
    if c.IsClosed == true {
        return errors.New("Connection closed when send msg")
    }
    // 将data进行封包 MsgDataLen|MsgId|MsgData
    dp := NewDataPack()
    // 这个msg是已经序列化的二进制msg
    binaryMsg, err := dp.Pack(NewMsgPackage(msgId, data))
    if err != nil {
        fmt.Println("Pack error msg id = ", msgId)
        return errors.New("Pack error msg")
    }
    // 把数据发送给管道
    c.MsgChan <- binaryMsg
    return nil
}

看函数中的倒数第二行 将封包了的数据输送进管道 而当前的MsgChan我们初始化时是设置为无缓冲的 所以一有数据进入管道 Writer端通过select监听到有数据就会立马接收数据并进行处理

基本的流程就是客户端发送封装好的消息->Reader进行接收->路由的处理方法进行处理->SendMsg数据发送给管道->Writer监听到数据并进行处理

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容