func serveRequest(s *Server, responsesChan chan<- *serverMessage, stopChan <-chan struct{}, m *serverMessage, workersCh <-chan struct{}) {
request := m.Request
m.Request = nil
clientAddr := m.ClientAddr
m.ClientAddr = ""
skipResponse := (m.ID == 0)
if skipResponse {
m.Response = nil
m.Error = ""
s.Stats.incRPCCalls()
serverMessagePool.Put(m)
}
t := time.Now()
response, err := callHandlerWithRecover(s.LogError, s.Handler, clientAddr, s.Addr, request)
s.Stats.incRPCTime(uint64(time.Since(t).Seconds() * 1000))
if !skipResponse {
m.Response = response
m.Error = err
// Select hack for better performance.
// See https://github.com/valyala/gorpc/pull/1 for details.
select {
case responsesChan <- m:
default:
select {
case responsesChan <- m:
case <-stopChan:
}
}
}
<-workersCh
}
- 消息分为两种,- ID不为0的时候,即!skipResponse的时候,将处理后的消息放入responsesChan交由serverWriter处理,返回给client
- ID为0的标记为skipResponse,并放回消息池中。这种类似于不需要回应的rpc函数调用。
- 具体的处理函数如下,调用handler(s.Handler)并附加了一层崩溃记录恢复。
- s.Handler可以实现类似于函数字典,提供函数注册和函数调用的功能。
- workersCh实际上是对worker计数,所以结束的时候计数-1
func callHandlerWithRecover(logErrorFunc LoggerFunc, handler HandlerFunc, clientAddr, serverAddr string, request interface{}) (response interface{}, errStr string) {
defer func() {
if x := recover(); x != nil {
stackTrace := make([]byte, 1<<20)
n := runtime.Stack(stackTrace, false)
errStr = fmt.Sprintf("Panic occured: %v\nStack trace: %s", x, stackTrace[:n])
logErrorFunc("gorpc.Server: [%s]->[%s]. %s", clientAddr, serverAddr, errStr)
}
}()
response = handler(clientAddr, request)
return
}