func serverReader(s *Server, r io.Reader, clientAddr string, responsesChan chan<- *serverMessage,
stopChan <-chan struct{}, done chan<- struct{}, enabledCompression bool, workersCh chan struct{}) {
defer func() {
if r := recover(); r != nil {
s.LogError("gorpc.Server: [%s]->[%s]. Panic when reading data from client: %v", clientAddr, s.Addr, r)
}
close(done)
}()
d := newMessageDecoder(r, s.RecvBufferSize, enabledCompression, &s.Stats)
defer d.Close()
var wr wireRequest
for {
if err := d.Decode(&wr); err != nil {
if !isClientDisconnect(err) && !isServerStop(stopChan) {
s.LogError("gorpc.Server: [%s]->[%s]. Cannot decode request: [%s]", clientAddr, s.Addr, err)
}
return
}
m := serverMessagePool.Get().(*serverMessage)
m.ID = wr.ID
m.Request = wr.Request
m.ClientAddr = clientAddr
wr.ID = 0
wr.Request = nil
select {
case workersCh <- struct{}{}:
default:
select {
case workersCh <- struct{}{}:
case <-stopChan:
return
}
}
go serveRequest(s, responsesChan, stopChan, m, workersCh)
}
}
- 增加宕机恢复时,且在函数结束时通知上层结束(close(done))
- 申请了一个decoder,主要是对gob格式进行解码,长度为RecvBufferSize(默认65536)
- 从中解析请求(ID/Request),
- 并且从message池中获取一个msg,并将ID/Request/clientAddr放置进去
- 将wireRequest置空,便于释放
- 写入工作管道,意思工作计数+1,如果达到上限的就等待,处理完的serveRequest会将工作计数-1.
- 并在default中监听stopChan(控制停止管道,如果停止,则直接函数返回)
比较神奇的地方是这种看起来很挫的写法,op更快..
具体描述
看起来很搓
select {
case workersCh <- struct{}{}:
default:
select {
case workersCh <- struct{}{}:
case <-stopChan:
return
}
}
看起来简洁
select {
case workersCh <- struct{}{}:
case <-stopChan:
return
}