loki源码阅读之querier

简介

querier处理查询,获取来自ingester和长期存储的日志数据,在从后端存储中执行相同的查询前,先从ingester的内存中查询.

相关源码

cmd/loki/main.go中

func main() {
    ...
    t, err := loki.New(config)
    util.CheckFatal("initialising loki", err)

    level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())

    err = t.Run()
    ...
}

pkg/loki/loki.go中

func New(cfg Config) (*Loki, error) {
    loki := &Loki{
        cfg: cfg,
    }

    loki.setupAuthMiddleware()
    if err := loki.setupModuleManager(); err != nil {
        return nil, err
    }
    storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)

    return loki, nil
}

func (t *Loki) setupModuleManager() error {
    mm := modules.NewManager()

    mm.RegisterModule(Server, t.initServer)
    mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig)
    mm.RegisterModule(MemberlistKV, t.initMemberlistKV)
    mm.RegisterModule(Ring, t.initRing)
    mm.RegisterModule(Overrides, t.initOverrides)
    mm.RegisterModule(Distributor, t.initDistributor)
    mm.RegisterModule(Store, t.initStore)
    mm.RegisterModule(Ingester, t.initIngester)
    mm.RegisterModule(Querier, t.initQuerier)
    mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
    mm.RegisterModule(TableManager, t.initTableManager)
    mm.RegisterModule(All, nil)

    // Add dependencies
    deps := map[string][]string{
        Ring:          {RuntimeConfig, Server, MemberlistKV},
        Overrides:     {RuntimeConfig},
        Distributor:   {Ring, Server, Overrides},
        Store:         {Overrides},
        Ingester:      {Store, Server, MemberlistKV},
        Querier:       {Store, Ring, Server},
        QueryFrontend: {Server, Overrides},
        TableManager:  {Server},
        All:           {Querier, Ingester, Distributor, TableManager},
    }

    for mod, targets := range deps {
        if err := mm.AddDependency(mod, targets...); err != nil {
            return err
        }
    }

    t.moduleManager = mm

    return nil
}

func (t *Loki) Run() error {
    serviceMap, err := t.moduleManager.InitModuleServices(t.cfg.Target)
    if err != nil {
        return err
    }

    t.serviceMap = serviceMap
    ...
    var servs []services.Service
    for _, s := range serviceMap {
        servs = append(servs, s)
    }
    ...
    sm, err := services.NewManager(servs...)
    ...
    err = sm.StartAsync(context.Background())

    ...
}

github.com/cortexproject/cortex/pkg/util/modules/modules.go中

func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
    if _, ok := m.modules[target]; !ok {
        return nil, fmt.Errorf("unrecognised module name: %s", target)
    }
    servicesMap := map[string]services.Service{}

    // initialize all of our dependencies first
    deps := m.orderedDeps(target)
    deps = append(deps, target) // lastly, initialize the requested module

    for ix, n := range deps {
        mod := m.modules[n]

        var serv services.Service

        if mod.initFn != nil {
            s, err := mod.initFn()
            if err != nil {
                return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
            }

            if s != nil {
                // We pass servicesMap, which isn't yet complete. By the time service starts,
                // it will be fully built, so there is no need for extra synchronization.
                serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:]))
            }
        }

        if serv != nil {
            servicesMap[n] = serv
        }
    }

    return servicesMap, nil
}

github.com/cortexproject/cortex/pkg/util/services/manager.go中

func NewManager(services ...Service) (*Manager, error) {
    if len(services) == 0 {
        return nil, errors.New("no services")
    }

    m := &Manager{
        services:  services,
        byState:   map[State][]Service{},
        healthyCh: make(chan struct{}),
        stoppedCh: make(chan struct{}),
    }
    ...
    return m, nil
}

func (m *Manager) StartAsync(ctx context.Context) error {
    for _, s := range m.services {
        err := s.StartAsync(ctx)
        if err != nil {
            return err
        }
    }
    return nil
}

pkg/loki/modules.go中

func (t *Loki) initQuerier() (services.Service, error) {
    level.Debug(util.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))
    worker, err := frontend.NewWorker(t.cfg.Worker, cortex_querier.Config{MaxConcurrent: t.cfg.Querier.MaxConcurrent}, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
    if err != nil {
        return nil, err
    }
    if t.cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
        t.cfg.Querier.IngesterQueryStoreMaxLookback = t.cfg.Ingester.QueryStoreMaxLookBackPeriod
    }
    t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
    if err != nil {
        return nil, err
    }

    httpMiddleware := middleware.Merge(
        serverutil.RecoveryHTTPMiddleware,
        t.httpAuthMiddleware,
        serverutil.NewPrepopulateMiddleware(),
    )
    t.server.HTTP.Handle("/loki/api/v1/query_range", httpMiddleware.Wrap(http.HandlerFunc(t.querier.RangeQueryHandler)))
    t.server.HTTP.Handle("/loki/api/v1/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.InstantQueryHandler)))
    // Prometheus compatibility requires `loki/api/v1/labels` however we already released `loki/api/v1/label`
    // which is a little more consistent with `/loki/api/v1/label/{name}/values` so we are going to handle both paths.
    t.server.HTTP.Handle("/loki/api/v1/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
    t.server.HTTP.Handle("/loki/api/v1/labels", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
    t.server.HTTP.Handle("/loki/api/v1/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
    t.server.HTTP.Handle("/loki/api/v1/tail", httpMiddleware.Wrap(http.HandlerFunc(t.querier.TailHandler)))
    t.server.HTTP.Handle("/loki/api/v1/series", httpMiddleware.Wrap(http.HandlerFunc(t.querier.SeriesHandler)))

    t.server.HTTP.Handle("/api/prom/query", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LogQueryHandler)))
    t.server.HTTP.Handle("/api/prom/label", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
    t.server.HTTP.Handle("/api/prom/label/{name}/values", httpMiddleware.Wrap(http.HandlerFunc(t.querier.LabelHandler)))
    t.server.HTTP.Handle("/api/prom/tail", httpMiddleware.Wrap(http.HandlerFunc(t.querier.TailHandler)))
    t.server.HTTP.Handle("/api/prom/series", httpMiddleware.Wrap(http.HandlerFunc(t.querier.SeriesHandler)))
    return worker, nil // ok if worker is nil here
}

pkg/querier/querier.go中

func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
    factory := func(addr string) (ring_client.PoolClient, error) {
        return client.New(clientCfg, addr)
    }

    return newQuerier(cfg, clientCfg, factory, ring, store, limits)
}

func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
    querier := Querier{
        cfg:    cfg,
        ring:   ring,
        pool:   distributor.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
        store:  store,
        limits: limits,
    }

    querier.engine = logql.NewEngine(cfg.Engine, &querier)
    err := services.StartAndAwaitRunning(context.Background(), querier.pool)
    if err != nil {
        return nil, errors.Wrap(err, "querier pool")
    }

    return &querier, nil
}

pkg/querier/http.go中

func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
    // Enforce the query timeout while querying backends
    ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
    defer cancel()

    request, err := loghttp.ParseInstantQuery(r)
    if err != nil {
        serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, err.Error()), w)
        return
    }

    if err := q.validateEntriesLimits(ctx, request.Limit); err != nil {
        serverutil.WriteError(err, w)
        return
    }

    params := logql.NewLiteralParams(
        request.Query,
        request.Ts,
        request.Ts,
        0,
        0,
        request.Direction,
        request.Limit,
        nil,
    )
    query := q.engine.Query(params)
    result, err := query.Exec(ctx)
    if err != nil {
        serverutil.WriteError(err, w)
        return
    }

    if err := marshal.WriteQueryResponseJSON(result, w); err != nil {
        serverutil.WriteError(err, w)
        return
    }
}

func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.EntryIterator, error) {
    err := q.validateQueryRequest(ctx, params.QueryRequest)
    if err != nil {
        return nil, err
    }

    var chunkStoreIter iter.EntryIterator

    if q.cfg.IngesterQueryStoreMaxLookback == 0 {
        // IngesterQueryStoreMaxLookback is zero, the default state, query the store normally
        chunkStoreIter, err = q.store.LazyQuery(ctx, params)
        if err != nil {
            return nil, err
        }
    } else if q.cfg.IngesterQueryStoreMaxLookback > 0 {
        // IngesterQueryStoreMaxLookback is greater than zero
        // Adjust the store query range to only query for data ingesters are not already querying for
        adjustedEnd := params.End.Add(-q.cfg.IngesterQueryStoreMaxLookback)
        if params.Start.After(adjustedEnd) {
            chunkStoreIter = iter.NoopIterator
        } else {
            // Make a copy of the request before modifying
            // because the initial request is used below to query ingesters
            queryRequestCopy := *params.QueryRequest
            newParams := logql.SelectParams{
                QueryRequest: &queryRequestCopy,
            }
            newParams.End = adjustedEnd
            chunkStoreIter, err = q.store.LazyQuery(ctx, newParams)
            if err != nil {
                return nil, err
            }
        }
    } else {
        // IngesterQueryStoreMaxLookback is less than zero
        // ingesters will be querying all the way back in time so there is no reason to query the store
        chunkStoreIter = iter.NoopIterator
    }

    // skip ingester queries only when QueryIngestersWithin is enabled (not the zero value) and
    // the end of the query is earlier than the lookback
    if lookback := time.Now().Add(-q.cfg.QueryIngestersWithin); q.cfg.QueryIngestersWithin != 0 && params.GetEnd().Before(lookback) {
        return chunkStoreIter, nil
    }

    iters, err := q.queryIngesters(ctx, params)
    if err != nil {
        return nil, err
    }

    return iter.NewHeapIterator(ctx, append(iters, chunkStoreIter), params.Direction), nil
}

pkg/logql/engine.go中


func NewEngine(opts EngineOpts, q Querier) *Engine {
    opts.applyDefault()
    return &Engine{
        timeout:   opts.Timeout,
        evaluator: NewDefaultEvaluator(q, opts.MaxLookBackPeriod),
    }
}
func (ng *Engine) Query(params Params) Query {
    return &query{
        timeout:   ng.timeout,
        params:    params,
        evaluator: ng.evaluator,
        parse: func(_ context.Context, query string) (Expr, error) {
            return ParseExpr(query)
        },
    }
}

func (q *query) Exec(ctx context.Context) (Result, error) {
    log, ctx := spanlogger.New(ctx, "query.Exec")
    defer log.Finish()

    rangeType := GetRangeType(q.params)
    timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType)))
    defer timer.ObserveDuration()

    // records query statistics
    var statResult stats.Result
    start := time.Now()
    ctx = stats.NewContext(ctx)

    data, err := q.Eval(ctx)

    statResult = stats.Snapshot(ctx, time.Since(start))
    statResult.Log(level.Debug(log))

    status := "200"
    if err != nil {
        status = "500"
        if IsParseError(err) {
            status = "400"
        }
    }
    RecordMetrics(ctx, q.params, status, statResult)

    return Result{
        Data:       data,
        Statistics: statResult,
    }, err
}

func (q *query) Eval(ctx context.Context) (parser.Value, error) {
    ctx, cancel := context.WithTimeout(ctx, q.timeout)
    defer cancel()

    expr, err := q.parse(ctx, q.params.Query())
    if err != nil {
        return nil, err
    }

    switch e := expr.(type) {
    case SampleExpr:
        value, err := q.evalSample(ctx, e)
        return value, err

    case LogSelectorExpr:
        iter, err := q.evaluator.Iterator(ctx, e, q.params)
        if err != nil {
            return nil, err
        }

        defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
        streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
        return streams, err
    default:
        return nil, errors.New("Unexpected type (%T): cannot evaluate")
    }
}

func (ng *Engine) Query(params Params) Query {
    return &query{
        timeout:   ng.timeout,
        params:    params,
        evaluator: ng.evaluator,
        parse: func(_ context.Context, query string) (Expr, error) {
            return ParseExpr(query)
        },
    }
}

pkg/logql/evaluator.go中

// NewDefaultEvaluator constructs a DefaultEvaluator
func NewDefaultEvaluator(querier Querier, maxLookBackPeriod time.Duration) *DefaultEvaluator {
    return &DefaultEvaluator{
        querier:           querier,
        maxLookBackPeriod: maxLookBackPeriod,
    }

}

func (ev *DefaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, q Params) (iter.EntryIterator, error) {
    params := SelectParams{
        QueryRequest: &logproto.QueryRequest{
            Start:     q.Start(),
            End:       q.End(),
            Limit:     q.Limit(),
            Direction: q.Direction(),
            Selector:  expr.String(),
            Shards:    q.Shards(),
        },
    }

    if GetRangeType(q) == InstantType {
        params.Start = params.Start.Add(-ev.maxLookBackPeriod)
    }

    return ev.querier.Select(ctx, params)

}

github.com/cortexproject/cortex/pkg/util/services/basic_service.go中

func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService {
    return &BasicService{
        startFn:             start,
        runningFn:           run,
        stoppingFn:          stop,
        state:               New,
        runningWaitersCh:    make(chan struct{}),
        terminatedWaitersCh: make(chan struct{}),
    }
}

func (b *BasicService) StartAsync(parentContext context.Context) error {
    switched, oldState := b.switchState(New, Starting, func() {
        b.serviceContext, b.serviceCancel = context.WithCancel(parentContext)
        b.notifyListeners(func(l Listener) { l.Starting() }, false)
        go b.main()
    })

    if !switched {
        return invalidServiceStateError(oldState, New)
    }
    return nil
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,576评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,515评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,017评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,626评论 1 296
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,625评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,255评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,825评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,729评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,271评论 1 320
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,363评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,498评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,183评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,867评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,338评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,458评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,906评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,507评论 2 359