Influxdb的Http请求处理流程

Http请求的处理流程

HTTPDService服务的添加

  1. 在 Server的启动过程中会添加并启动各种service, 其中就包括这个HTTPDService:appendHTTPDService(c httpd.Config) 定义在 cmd/influxdb/run/server.go
    srv := httpd.NewService(c)
    srv.Handler.MetaClient = s.MetaClient
    srv.Handler.QueryAuthorizer = meta.NewQueryAuthorizer(s.MetaClient)
    srv.Handler.WriteAuthorizer = meta.NewWriteAuthorizer(s.MetaClient)
    srv.Handler.QueryExecutor = s.QueryExecutor
    srv.Handler.Monitor = s.Monitor
    srv.Handler.PointsWriter = s.PointsWriter
    srv.Handler.Version = s.buildInfo.Version
    srv.Handler.BuildType = "OSS"
    ss := storage.NewStore(s.TSDBStore, s.MetaClient)
    srv.Handler.Store = ss
    srv.Handler.Controller = control.NewController(ss, s.Logger)

    s.Services = append(s.Services, srv)
  1. 从上面的代码可以看出,主要是初始化这个Handler, 这个Handler类负责处理具体的Http Request,生成相应的Response;

HTTPDService分析

  1. Httpd Service的具体实现在 services/httpd目录下
  2. 这个http服务使用golang提供的net/http包实现
  3. 流程解析:
    3.1 创建Service:
    func NewService(c Config) *Service {
    s := &Service{
        addr:           c.BindAddress, //http服务监控的地址,端口
        https:          c.HTTPSEnabled,
        cert:           c.HTTPSCertificate,
        key:            c.HTTPSPrivateKey,
        limit:          c.MaxConnectionLimit,
        tlsConfig:      c.TLS,
        err:            make(chan error),
        unixSocket:     c.UnixSocketEnabled,
        unixSocketPerm: uint32(c.UnixSocketPermissions),
        bindSocket:     c.BindSocket,
        Handler:        NewHandler(c),  // 创建Handler
        Logger:         zap.NewNop(),
    }
    if s.tlsConfig == nil {
        s.tlsConfig = new(tls.Config)
    }

3.2 启动Service:

func (s *Service) Open() error {
    s.Handler.Open() // Handler必要的初始化,主要是日志文件的设置

    // Open listener.
    if s.https {
         ...
        //tls listener支持
        s.ln = listener
    } else {
        ...
        listener, err := net.Listen("tcp", s.addr)
        s.ln = listener
    }

    // Open unix socket listener.
    if s.unixSocket {
        ...
        s.unixSocketListener = listener
        go s.serveUnixSocket()
    }

    // Enforce a connection limit if one has been given.
    // 使用这个LimitListener,同时仅能接收s.limit个连接,超过的connect则自动被close掉
    if s.limit > 0 {
        s.ln = LimitListener(s.ln, s.limit)
    }

    ...

    // Begin listening for requests in a separate goroutine.
    go s.serveTCP()
    return nil
}

3.3 关键函数之NewHandler():

h := &Handler{
        mux:            pat.New(),
        Config:         &c,
        Logger:         zap.NewNop(),
        CLFLogger:      log.New(os.Stderr, "[httpd] ", 0),
        stats:          &Statistics{},
        requestTracker: NewRequestTracker(),
    }

    // Limit the number of concurrent & enqueued write requests.
    h.writeThrottler = NewThrottler(c.MaxConcurrentWriteLimit, c.MaxEnqueuedWriteLimit)
    h.writeThrottler.EnqueueTimeout = c.EnqueuedWriteTimeout

    h.AddRoutes([]Route{
    ...
    //添加各个不同url的路由信息
    }
    
    h.AddRoutes(fluxRoute)

3.4 关键函数之s.serverTCP(),使用之前初始化的listener和handler启动真正的http服务

    err := http.Serve(listener, s.Handler)
    if err != nil && !strings.Contains(err.Error(), "closed") {
        s.err <- fmt.Errorf("listener failed: addr=%s, err=%s", s.Addr(), err)
    }

连接数限制

  1. 使用 LimitListener实现,在原始的Listener外包了一层还实现这个限制功能
  2. LimitListener定义: 从下面的代码可以看出创建了一个带缓冲区的chan, 其缓冲区大小为要限制的连接数的大小
type limitListener struct {
    net.Listener
    sem chan struct{}
}
func LimitListener(l net.Listener, n int) net.Listener {
    return &limitListener{Listener: l, sem: make(chan struct{}, n)}
}
  1. 接收连接:
func (l *limitListener) Accept() (net.Conn, error) {
    for {
        c, err := l.Listener.Accept()
        if err != nil {
            return nil, err
        }
        
        // 如果接收的连接数达到sem chan缓冲区的大小,下面这个select将进入default分支,立即close掉当前连接
        // 否则返回封装后的limitListenerConn, 它在close时调用l.release, 读取sem chan中数据,释放缓冲区空间
        select {
        case l.sem <- struct{}{}:
            return &limitListenerConn{Conn: c, release: l.release}, nil
        default:
            c.Close()
        }
    }
}

Query请求的处理流程

  1. 主要实现在 func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.User)

  2. 调整 ResponseWriter: 根据请求中的Accept头,来使用不同的ResponseWriter, 作用是设置Http Reponse中对应的Content-Type和格式化Body部分,目前支持三种类型:text/csvapplication/jsonapplication/x-msgpack, 具体实现可在 services/httpd/response_writer.go

  3. 解析http request: 包括 uri和body部分, 最后生成 influxql.QueryExecutionOptions
    3.1 生成 influxql.Query: 通常在request uri中的q=是query语句,比如:select * from m1, 会经过influxql.NewParserp.ParseQuery()的处理
    3.2 生成ExecutionOptions:

opts := query.ExecutionOptions{
Database:        db,
RetentionPolicy: r.FormValue("rp"),
ChunkSize:       chunkSize,
ReadOnly:        r.Method == "GET",
NodeID:          nodeID,
}
  1. 设置closing chan, 当当前的http连接断开时,close掉这个closing chan, 即通过当前正在处理的query请求,作相应的处理
var closing chan struct{}
    if !async {
        closing = make(chan struct{})
        if notifier, ok := w.(http.CloseNotifier); ok {
            // CloseNotify() is not guaranteed to send a notification when the query
            // is closed. Use this channel to signal that the query is finished to
            // prevent lingering goroutines that may be stuck.
            done := make(chan struct{})
            defer close(done)

            notify := notifier.CloseNotify()
            go func() {
                // Wait for either the request to finish
                // or for the client to disconnect
                select {
                case <-done:
                case <-notify:
                    close(closing)
                }
            }()
            opts.AbortCh = done
        } else {
            defer close(closing)
        }
    }
  1. 执行具体的query操作: results := h.QueryExecutor.ExecuteQuery(q, opts, closing), 返回results是个chan, 所有的query结果都从这个chan循环读取出来;
  2. 非chunked方式的Response的合成:所有结果合部缓存在内存中,从上面5中的chan循环读取出来result, 先作h.Config.MaxRowLimit返回行数的限制检查,再作merge,为了相同Series的数据连续存放和节省内存占用.
        l := len(resp.Results)
        if l == 0 {
            resp.Results = append(resp.Results, r)
        } else if resp.Results[l-1].StatementID == r.StatementID { //相同StatemnetID的result是连续返回的,中间没有间隔
            if r.Err != nil {
                resp.Results[l-1] = r
                continue
            }

            cr := resp.Results[l-1]
            rowsMerged := 0
            if len(cr.Series) > 0 {
                lastSeries := cr.Series[len(cr.Series)-1]

                for _, row := range r.Series {
                    if !lastSeries.SameSeries(row) { //相同Series的row是连续返回的,中间没有间隔
                        // Next row is for a different series than last.
                        break
                    }
                    // Values are for the same series, so append them.
                    lastSeries.Values = append(lastSeries.Values, row.Values...)
                    rowsMerged++
                }
            }

            // Append remaining rows as new rows.
            r.Series = r.Series[rowsMerged:]
            cr.Series = append(cr.Series, r.Series...)
            cr.Messages = append(cr.Messages, r.Messages...)
            cr.Partial = r.Partial
        } else {
            resp.Results = append(resp.Results, r)
        }
  1. chunked方式的Response: 从上面5中的chan循环读取出来result, 每条result立即返回到client:
// Write out result immediately if chunked.
        if chunked {
            n, _ := rw.WriteResponse(Response{
                Results: []*query.Result{r},
            })
            atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
            w.(http.Flusher).Flush()
            continue
        }
  1. async请求处理: 简单讲就是不返回任何的查询结果,也就是不支持,返回的http code是StatusNoContent
if async {
        go h.async(q, results)
        h.writeHeader(w, http.StatusNoContent)
        return
    }

Write请求的处理流程

  1. 写入的line protocol例子:insert test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10,对应到http request:
    1.1 uri部分: /write?consistency=all&db=my_test_db_2&precision=ns&rp=
    1.2 body部分: test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10\n
  2. 实现在 func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User)中;
    2.1 解析uri和body部分:
    database := r.URL.Query().Get("db")
    ...
    if h.Config.MaxBodySize > 0 { //限制body读取的大小
        body = truncateReader(body, int64(h.Config.MaxBodySize))
    }
    if r.Header.Get("Content-Encoding") == "gzip" {
       //body解压缩
    }
    ...
    _, err := buf.ReadFrom(body) //读取body部分
    ...
    //解析 point
    points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(), r.URL.Query().Get("precision"))
    
    //决定多复本情况下的写入一致性策略
    level := r.URL.Query().Get("consistency")
    ...
    // 写入point
    h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points); influxdb.IsClientError(err)

    // 失败的话返回client返回信息
    h.httpError(..)
    
    // 成功时返回
    h.writeHeader(w, http.StatusNoContent)

其他Http request请求的处理不一一详述

补充一下Influxdb中的Handler.AddRoute的实现

  1. 其作用就是添加http uri的路由信息,将相应的uri与具体的handler函数对应起来;
  2. Route的定义
 type Route struct {
    Name           string
    Method         string
    Pattern        string
    Gzipped        bool
    LoggingEnabled bool
    HandlerFunc    interface{}
}

  //query请求对应的Route
   Route{
            "query", // Query serving route.
            "POST", "/query", true, true, h.serveQuery,
        }
        
    //写请求对应的Route
    Route{
        "write", // Data-ingest route.
        "POST", "/write", true, writeLogEnabled, h.serveWrite,
    }
  1. Influxdb使用了golang提供的net/http包来实现它的http服务,具体的http请求都会对应到相应的http.Handler, 而http.Handler又使用了http.HandlerFunc来产生,参见:HandlerFunc, 这个AddRout就利用了HandlerFunc将handler层层包装,添加各种功能;
  2. 我们来剖析一下AddRoute的处理流程
    4.1 处理框架
// 针于每个route分别处理
for _, r := range routes {
        //利用route的定义和当前influxdb的config来包装生成handler
        var handler http.Handler
        ... //对handler进行层层包装
        //将route和handler添加到mux, 这里这个使用了第三方的模式复用器: https://github.com/bmizerany/pat
        h.mux.Add(r.Method, r.Pattern, handler)
}

4.2 添加验证处理`handler = authenticate(hf, h, h.Config.AuthEnabled)

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        // influxdb的config里没有启动验证,走下面的逻辑
        if !requireAuthentication {
            inner(w, r, nil)
            return
        }
        
        // 验证通过会生成这个 meta.User,传过最终的请求处理函数,作授权验证
        var user meta.User

        // TODO corylanou: never allow this in the future without users
        if requireAuthentication && h.MetaClient.AdminUserExists() {
            creds, err := parseCredentials(r)
            if err != nil {
                atomic.AddInt64(&h.stats.AuthenticationFailures, 1)
                h.httpError(w, err.Error(), http.StatusUnauthorized)
                return
            }

            // http 验证支持两种,User和jwt Bearer验证,这都有对应的rfc,具体内容不展开了
            // 其中user验证又包括 basic auth和uri中自带username和password两种方式
            // 如果验证不通过,就直接返回给客户端 h.httpError(w, "xxxx", http.StatusUnauthorized)
            switch creds.Method {
            case UserAuthentication:
                ...
            case BearerAuthentication:
                ...
            default:
                h.httpError(w, "unsupported authentication", http.StatusUnauthorized)
            }

        }
        
        // 调用最终的请求处理函数
        inner(w, r, user)
    })

4.3 handler = cors(handler) : 给response添加cors headers
4.4 handler = requestID(handler) : 给response添加request id
4.5 handler = h.recovery(handler, r.Name) : 在处理请求过程中捕获panic

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容