gorpc之serverReader

func serverReader(s *Server, r io.Reader, clientAddr string, responsesChan chan<- *serverMessage,
    stopChan <-chan struct{}, done chan<- struct{}, enabledCompression bool, workersCh chan struct{}) {

    defer func() {
        if r := recover(); r != nil {
            s.LogError("gorpc.Server: [%s]->[%s]. Panic when reading data from client: %v", clientAddr, s.Addr, r)
        }
        close(done)
    }()

    d := newMessageDecoder(r, s.RecvBufferSize, enabledCompression, &s.Stats)
    defer d.Close()

    var wr wireRequest
    for {
        if err := d.Decode(&wr); err != nil {
            if !isClientDisconnect(err) && !isServerStop(stopChan) {
                s.LogError("gorpc.Server: [%s]->[%s]. Cannot decode request: [%s]", clientAddr, s.Addr, err)
            }
            return
        }

        m := serverMessagePool.Get().(*serverMessage)
        m.ID = wr.ID
        m.Request = wr.Request
        m.ClientAddr = clientAddr

        wr.ID = 0
        wr.Request = nil

        select {
        case workersCh <- struct{}{}:
        default:
            select {
            case workersCh <- struct{}{}:
            case <-stopChan:
                return
            }
        }
        go serveRequest(s, responsesChan, stopChan, m, workersCh)
    }
}
  • 增加宕机恢复时,且在函数结束时通知上层结束(close(done))
  • 申请了一个decoder,主要是对gob格式进行解码,长度为RecvBufferSize(默认65536)
  • 从中解析请求(ID/Request),
  • 并且从message池中获取一个msg,并将ID/Request/clientAddr放置进去
  • 将wireRequest置空,便于释放
  • 写入工作管道,意思工作计数+1,如果达到上限的就等待,处理完的serveRequest会将工作计数-1.
  • 并在default中监听stopChan(控制停止管道,如果停止,则直接函数返回)

比较神奇的地方是这种看起来很挫的写法,op更快..
具体描述

看起来很搓

select {
case workersCh <- struct{}{}:
default:
    select {
    case workersCh <- struct{}{}:
    case <-stopChan:
        return
    }
}

看起来简洁

select {
case workersCh <- struct{}{}:
case <-stopChan:
    return
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。