fasthttp是如何做到比net/http快十倍的

标准库标准库net/http的实现原理,不过有个fasthttp的库号称比net/http快十倍呢!
哇,性能太强了吧,话不多说,和大家一起看看fasthttp Server端的底层实现,来看看到底是如何做到性能如此之快的,有哪些优秀的特性值得我们学习和借鉴的!

Server端处理流程对比

在进行了解fasthttp底层代码实现之前,我们先对两者处理请求的方式进行一个回顾和对比,了解完两者的基本的情况之后,再对fasthttp的实现最进一步分析。

net/http处理流程

源《图文讲透Golang标准库 net/http实现原理 -- 服务端》中讲的比较详细了,这里再把大致流程整理以下,整体流程如下:

image.png

  1. 将路由和对应的handler注册到一个 map 中,用做后续键值路由匹配
  2. 注册完之后就是开启循环监听连接,每获取到一个连接就会创建一个 Goroutine进行处理
  3. 在创建好的 Goroutine 里面会循环的等待接收请求数据,然后根据请求的地址去键值路由map中匹配对应的handler
  4. 执行匹配到的处理器handler
    net/http 的实现是一个连接新建一个 goroutine,如果在连接数非常多的时候,每个连接都会创建一个 Goroutine 就会给系统带来一定的压力。这也就造成了 net/http在处理高并发时的瓶颈。

每次来了一个连接,都要实例化一个连接对象,这谁受得了a,要脱毛了

fasthttp处理流程

再看看fasthttp处理请求的流程:


image.png
  1. 启动监听
  2. 循环监听端口获取连接,建立workerPool
  3. 循环尝试获取连接 net.Conn,先会去 ready 队列里获取 workerChan,获取不到就会去对象池获取
  4. 将获取到的的连接net.Conn 发送到 workerChan 的 channel 中
  5. 开启一个 Goroutine 一直循环获取 workerChan 这个 channel 中的数据
  6. 获取到channel中的net.Conn之后就会对请求进行处理
    workerChan 其实就是一个连接处理对象,这个对象里面有一个 channel 用来传递连接;每个 workerChan 在后台都会有一个 Goroutine 循环获取 channel 中的连接,然后进行处理。

workerChan是在workerPool临时对象分别存取

fasthttp为什么快

fasthttp的优化主要有以下几个点:

  • 连接复用,如slice中有可复用的workerChan就从ready这个slice中获取,没有可复用的就在workerChanPool创建一个,万一池子满了(默认是 256 * 1024个)就报错。
  • 对于内存复用,就是大量使用了sync.Pool(你知道的,sync.Pool复用对象有啥好处),有人统计过,用了整整30个sync.Pool,context、request对象、header、response对象都用了sync.Pool ....
  • 利用unsafe.Pointer指针进行[]byte 和 string 转换,避免[]byte到string转换时带来的内存分配和拷贝带来的消耗 。

知道了fasthttp为什么快,接下来我们看下它是如何处理监听处理请求的,在哪些地方用到了这些特性。

底层实现

简单案例

import (
    "github.com/buaazp/fasthttprouter"
    "github.com/valyala/fasthttp"
    "log"
)

func main() {
    //创建路由
    r := fasthttprouter.New()
    r.GET("/", Index)
 if err := fasthttp.ListenAndServe(":8083", r.Handler); err != nil {
        log.Fatalf("ListenAndServe fatal: %s", err)
    }

}
func Index(ctx *fasthttp.RequestCtx) {
    ctx.WriteString("hello xiaou code!")
}

这个案例同样是几样代码就启动了一个服务。
创建路由、为不同的路由执行关联不同的处理函数handler,接着跟net/http一样调用 ListenAndServe 函数进行启动服务监听,等待请求进行处理。

workerPool结构

workerpool 对象表示 连接处理 工作池,这样可以控制连接建立后的处理方式,而不是像标准库 net/http 一样,对每个请求连接都启动一个 goroutine 处理, 内部的 ready 字段存储空闲的 workerChan 对象,workerChanPool 字段表示管理 workerChan 的对象池。workerPool结构体如下:

type workerPool struct {
    //匹配请求对应的handler
    WorkerFunc ServeHandler
    //最大同时处理的请求数
    MaxWorkersCount int
    
    LogAllErrors bool
    //最大空闲工作时间
    MaxIdleWorkerDuration time.Duration

    Logger Logger
    //互斥锁
    lock         sync.Mutex
    //work数量
    workersCount int
    mustStop     bool
    // 空闲的 workerChan
    ready []*workerChan
    //是否关闭workerPool
    stopCh chan struct{}
    //sync.Pool  workerChan 的对象池
    workerChanPool sync.Pool

    connState func(net.Conn, ConnState)
}

WorkerFunc :这个属性挺重要的,因为给它赋值的是

Server.serveConnready:存储了空闲的

workerChanworkerChanPool:是workerChan 的对象池,在sync.Pool中存取临时对象,可减少内存分配

启动服务

ListenAndServe是启动服务监听的入口,内部的调用过程如下:


image.png
Server.Serve

Serve方法为来自给监听到的连接提供处理服务,直到超过了最大限制(256 * 1024)才会报错。

func (s *Server) Serve(ln net.Listener) error {
    //最大连接处理数
    maxWorkersCount := s.getConcurrency()

    s.mu.Lock()
    s.ln = append(s.ln, ln)
    if s.done == nil {
        s.done = make(chan struct{})
    }
    if s.concurrencyCh == nil {
        s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    }
    s.mu.Unlock()
    //workerPool进行初始化
    wp := &workerPool{
        WorkerFunc:            s.serveConn,
        MaxWorkersCount:       maxWorkersCount,
        LogAllErrors:          s.LogAllErrors,
        MaxIdleWorkerDuration: s.MaxIdleWorkerDuration,
        Logger:                s.logger(),
        connState:             s.setState,
    }
    //开启协程,处理协程池的清理工作
    wp.Start()
    atomic.AddInt32(&s.open, 1)
    defer atomic.AddInt32(&s.open, -1)

    for {
        // 阻塞等待,获取连接net.Conn
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            ...
            return err
        }
        s.setState(c, StateNew)
        atomic.AddInt32(&s.open, 1)
        //处理获取到的连接net.Conn
        if !wp.Serve(c) {
            //未能处理,说明已达到最大worker限制
            ...
        }
        c = nil
    }
}

从上面的注释中我们可以看出 Server 方法主要做了以下几件事:

  1. 初始化 worker Pool,并启动

  2. net.Listener循环接收请求

  3. 将接收到的请求交给workerChan 处理

注意:这里如果超过了设定的最大连接数(默认是 256 * 1024个)就直接报错了

Start开启协程池

workerPool进行初始化之后接着就调用Start开启,这里主要是指定sync.Pool变量workerChanPool的创建函数。

接着开启一个协程,该Goroutine的目的是进行定时清理 workerPool 中的 ready 中保存的空闲 workerChan,清理频率为每 10s 启动一次。

清理规则是使用二进制搜索算法找出最近可以清理的工作者的索引

func (wp *workerPool) Start() {
    //wp的关闭channel是否为空
    if wp.stopCh != nil {
        return
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    //指定workerChanPool的创建函数
    wp.workerChanPool.New = func() interface{} {
        return &workerChan{
            ch: make(chan net.Conn, workerChanCap),
        }
    }
    //开启协程
    go func() {
        var scratch []*workerChan
        for {
            //清理空闲超时的 workerChan
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                // 间隔10 s
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
} 

开启一个清理Goroutine的目的是为了避免在流量高峰创建了大量协程,之后不再使用,造成协程浪费。

清理流程是在wp.clean()方法中实现的。

接收连接

acceptConn函数通过调用net.Listener的accept方法去接受连接,这里获取连接的方式跟net/http调用的其实都是一样的。

func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
    for {
        c, err := ln.Accept()
        if err != nil {
            //err判断
            ...
        }
        //校验是否net.TCPConn连接
       // 校验每个ip对应的连接数
        if s.MaxConnsPerIP > 0 {
            pic := wrapPerIPConn(s, c)
            if pic == nil {
                ...
                continue
            }
            c = pic
        }
        return c, nil
    }
} 

获取 workerChan

func (wp *workerPool) Serve(c net.Conn) bool {
    //获取 workerChan 
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    //将连接放到channel中
    ch.ch <- c
    //返回true
    return true
}

这里调用的getCh()函数实现了获取workerChan,获取到之后将之前接受的连接net.Conn放到workerChan结构体的channel通道中。

我们看下workerChan这个结构体

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

lastUseTime:最后一次被使用的时间,这个值在进行清理workerChan的时候是会用到的

ch:用来传递获取到的连接net.Conn,获取到连接时接收,处理请求时获取

getCh方法:
func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    //从ready队列中拿workerChan
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        //ready队列不为空,从队尾拿workerChan
        ch = ready[n]
        //队尾置为nil
        ready[n] = nil
        //重新将ready赋值给wp.ready
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()
    //ready中获取不到workerChan,则从对象池中新建一个
    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        ch = vch.(*workerChan)
        //开启一个goroutine执行
        go func() {
            //处理ch中channel中的数据
            wp.workerFunc(ch)
            //处理完后将workerChan放回对象池
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
} 
image.png

getCh()方法的目的就是获取workerChan,流程如下:

  • 先会去 ready 空闲队列中获取 workerChan

  • ready 获取不到则从对象池中创建一个新的 workerChan

  • 并启动 Goroutine 用来处理 channel 中的数据

workPool中的ready是一个FILO的栈,每次从队尾取出workChan

处理连接

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch {
        //channel的值是nil,退出
        if c == nil {
            break
        }
        //执行请求,并处理
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
            ...
        }
        ...
        //将当前workerChan放入ready队列
        if !wp.release(ch) {
            break
        }
    }

    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}
执行流程
  • 先遍历workerChan的channel,看是否有连接net.Conn

  • 获取到连接之后就执行WorkerFunc 函数处理请求

  • 请求处理完之后将当前workerChan放入ready队列

WorkerFunc 函数实际上是 Server 的 serveConn 方法

一开始开代码的时候我还没发现呢,细看了之后在Server.Serve()启动服务时将Server.serveConn()方法赋值给了workerPool的WorkerFunc()。

image.png

要想了解实现的朋友可以搜下这方面的代码:

func (s *Server) ServeConn(c net.Conn) error {
    ...
    err := s.serveConn(c)
    ...
}

里面的代码会比较多,不过里面的流程就是是获取到请求的参数,找到对应的 handler 进行请求处理,然后返回 响应给客户端。

这里的实现代码可以看到context、request对象的sync.Pool实现,这里就不一一贴出来了。

总结

fasthttp和net/http在实现上还是有较大区别,通过对实现原理的分析,知道了fasthttp速度快是利用了大量sync.Pool对象复用 、[]byte 和 string利用万能指针unsafe.Pointer进行转换等优化技巧。

如果你的业务需要支撑较高的 QPS 并且保持一致的低延迟时间,那么采用 fasthttp 是一个较好的选择。不过net/http兼容性更高,在多数情况下反而是更好的选择!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容