简介
querier 负责处理查询请求：它从 ingester 和长期存储中获取日志数据。对于同一个查询，querier 会先查询 ingester 内存中的数据，然后再到后端存储中执行相同的查询。
相关源码
cmd/loki/main.go中
// main is the Loki entry point as excerpted here: it builds a Loki instance
// from the parsed configuration and then runs it. Lines consisting of "..."
// mark code that was elided from the original file.
func main() {
...
// Build the application object from the configuration.
t, err := loki.New(config)
// NOTE(review): CheckFatal presumably aborts the process when err is non-nil — confirm.
util.CheckFatal("initialising loki", err)
level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())
// Initialise and start the services for the configured target.
err = t.Run()
...
}
pkg/loki/loki.go中
// New builds a Loki instance around cfg. It calls setupAuthMiddleware, then
// setupModuleManager, and finally registers the custom index clients with the
// default Prometheus registerer. An error from setupModuleManager is returned
// unchanged together with a nil instance.
func New(cfg Config) (*Loki, error) {
	app := &Loki{cfg: cfg}
	app.setupAuthMiddleware()

	err := app.setupModuleManager()
	if err != nil {
		return nil, err
	}

	storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)
	return app, nil
}
// setupModuleManager creates a module manager, registers every module's init
// function with it, declares the dependencies between modules and stores the
// manager on t. The All module is registered with a nil init function; it only
// exists to pull in its dependencies. The first error returned by
// AddDependency aborts the setup and is returned to the caller.
func (t *Loki) setupModuleManager() error {
	manager := modules.NewManager()

	manager.RegisterModule(Server, t.initServer)
	manager.RegisterModule(RuntimeConfig, t.initRuntimeConfig)
	manager.RegisterModule(MemberlistKV, t.initMemberlistKV)
	manager.RegisterModule(Ring, t.initRing)
	manager.RegisterModule(Overrides, t.initOverrides)
	manager.RegisterModule(Distributor, t.initDistributor)
	manager.RegisterModule(Store, t.initStore)
	manager.RegisterModule(Ingester, t.initIngester)
	manager.RegisterModule(Querier, t.initQuerier)
	manager.RegisterModule(QueryFrontend, t.initQueryFrontend)
	manager.RegisterModule(TableManager, t.initTableManager)
	manager.RegisterModule(All, nil)

	// Module name -> names of the modules it depends on.
	dependencies := map[string][]string{
		Ring:          {RuntimeConfig, Server, MemberlistKV},
		Overrides:     {RuntimeConfig},
		Distributor:   {Ring, Server, Overrides},
		Store:         {Overrides},
		Ingester:      {Store, Server, MemberlistKV},
		Querier:       {Store, Ring, Server},
		QueryFrontend: {Server, Overrides},
		TableManager:  {Server},
		All:           {Querier, Ingester, Distributor, TableManager},
	}

	for name, requires := range dependencies {
		err := manager.AddDependency(name, requires...)
		if err != nil {
			return err
		}
	}

	t.moduleManager = manager
	return nil
}
// Run initialises the services for the configured target module, collects them
// into a services manager and starts them asynchronously. Lines consisting of
// "..." mark code that was elided from the original function.
func (t *Loki) Run() error {
// Initialise the target module and its dependencies; the result maps module
// name to the service created for it.
serviceMap, err := t.moduleManager.InitModuleServices(t.cfg.Target)
if err != nil {
return err
}
t.serviceMap = serviceMap
...
// Flatten the map into a slice because services.NewManager takes a variadic list.
var servs []services.Service
for _, s := range serviceMap {
servs = append(servs, s)
}
...
sm, err := services.NewManager(servs...)
...
// Kick off every service; StartAsync does not wait for them to be running.
err = sm.StartAsync(context.Background())
...
}
github.com/cortexproject/cortex/pkg/util/modules/modules.go中
// InitModuleServices initialises the target module together with all of its
// dependencies and returns a map from module name to the wrapped service
// created for it. Dependencies are initialised first, the target last.
// Modules without an init function, or whose init function returns a nil
// service, get no entry in the map. An unknown target or a failing init
// function yields a nil map and an error.
func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
	if _, known := m.modules[target]; !known {
		return nil, fmt.Errorf("unrecognised module name: %s", target)
	}

	// Dependencies in initialisation order, followed by the requested module.
	order := append(m.orderedDeps(target), target)

	servicesMap := map[string]services.Service{}
	for i, name := range order {
		mod := m.modules[name]
		if mod.initFn == nil {
			continue
		}

		inner, err := mod.initFn()
		if err != nil {
			msg := fmt.Sprintf("error initialising module: %s", name)
			return nil, errors.Wrap(err, msg)
		}
		if inner == nil {
			continue
		}

		// We pass servicesMap, which isn't yet complete. By the time service starts,
		// it will be fully built, so there is no need for extra synchronization.
		var wrapped services.Service = newModuleServiceWrapper(servicesMap, name, inner, mod.deps, m.findInverseDependencies(name, order[i+1:]))
		if wrapped != nil {
			servicesMap[name] = wrapped
		}
	}

	return servicesMap, nil
}
github.com/cortexproject/cortex/pkg/util/services/manager.go中
// NewManager creates a Manager for the given services. It returns an error
// when called without any service. Lines consisting of "..." mark code that
// was elided from the original function.
func NewManager(services ...Service) (*Manager, error) {
if len(services) == 0 {
return nil, errors.New("no services")
}
m := &Manager{
services: services,
// Services grouped by their current state; starts out empty here.
byState: map[State][]Service{},
// NOTE(review): presumably closed to signal the healthy / stopped
// transitions — confirm against the elided code.
healthyCh: make(chan struct{}),
stoppedCh: make(chan struct{}),
}
...
return m, nil
}
// StartAsync calls StartAsync on each managed service in order, passing ctx to
// every one of them. It stops at the first service that returns an error and
// returns that error; services after it are not started.
func (m *Manager) StartAsync(ctx context.Context) error {
	for _, svc := range m.services {
		if err := svc.StartAsync(ctx); err != nil {
			return err
		}
	}
	return nil
}
pkg/loki/modules.go中
// initQuerier creates the query-frontend worker and the querier, then registers
// the querier's HTTP handlers on the server. Every handler is wrapped in the
// same middleware chain (recovery, auth, prepopulate). The worker is returned
// as the module's service.
func (t *Loki) initQuerier() (services.Service, error) {
	level.Debug(util.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))

	querierCfg := cortex_querier.Config{MaxConcurrent: t.cfg.Querier.MaxConcurrent}
	grpcServer := httpgrpc_server.NewServer(t.server.HTTPServer.Handler)
	worker, err := frontend.NewWorker(t.cfg.Worker, querierCfg, grpcServer, util.Logger)
	if err != nil {
		return nil, err
	}

	// A non-zero ingester look-back setting overrides the querier's own value.
	if lookback := t.cfg.Ingester.QueryStoreMaxLookBackPeriod; lookback != 0 {
		t.cfg.Querier.IngesterQueryStoreMaxLookback = lookback
	}

	t.querier, err = querier.New(t.cfg.Querier, t.cfg.IngesterClient, t.ring, t.store, t.overrides)
	if err != nil {
		return nil, err
	}

	httpMiddleware := middleware.Merge(
		serverutil.RecoveryHTTPMiddleware,
		t.httpAuthMiddleware,
		serverutil.NewPrepopulateMiddleware(),
	)

	routes := []struct {
		path    string
		handler http.HandlerFunc
	}{
		{"/loki/api/v1/query_range", t.querier.RangeQueryHandler},
		{"/loki/api/v1/query", t.querier.InstantQueryHandler},
		// Prometheus compatibility requires `loki/api/v1/labels` however we already released `loki/api/v1/label`
		// which is a little more consistent with `/loki/api/v1/label/{name}/values` so we are going to handle both paths.
		{"/loki/api/v1/label", t.querier.LabelHandler},
		{"/loki/api/v1/labels", t.querier.LabelHandler},
		{"/loki/api/v1/label/{name}/values", t.querier.LabelHandler},
		{"/loki/api/v1/tail", t.querier.TailHandler},
		{"/loki/api/v1/series", t.querier.SeriesHandler},
		{"/api/prom/query", t.querier.LogQueryHandler},
		{"/api/prom/label", t.querier.LabelHandler},
		{"/api/prom/label/{name}/values", t.querier.LabelHandler},
		{"/api/prom/tail", t.querier.TailHandler},
		{"/api/prom/series", t.querier.SeriesHandler},
	}
	for _, route := range routes {
		t.server.HTTP.Handle(route.path, httpMiddleware.Wrap(route.handler))
	}

	return worker, nil // ok if worker is nil here
}
pkg/querier/querier.go中
// New creates a Querier whose pool clients are built with client.New from
// clientCfg and the address handed to the factory. All other arguments are
// forwarded unchanged to newQuerier.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
	return newQuerier(
		cfg,
		clientCfg,
		func(addr string) (ring_client.PoolClient, error) {
			return client.New(clientCfg, addr)
		},
		ring,
		store,
		limits,
	)
}
// newQuerier assembles a Querier from its collaborators: it creates the client
// pool from clientCfg.PoolConfig and clientFactory, builds a LogQL engine that
// uses the new Querier as its data source, and then starts the pool via
// services.StartAndAwaitRunning. A start-up failure is returned wrapped with
// the "querier pool" prefix.
func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
	q := &Querier{
		cfg:    cfg,
		ring:   ring,
		pool:   distributor.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
		store:  store,
		limits: limits,
	}
	// The engine calls back into the querier, so it can only be built once q exists.
	q.engine = logql.NewEngine(cfg.Engine, q)

	if err := services.StartAndAwaitRunning(context.Background(), q.pool); err != nil {
		return nil, errors.Wrap(err, "querier pool")
	}
	return q, nil
}
pkg/querier/http.go中
// InstantQueryHandler is the HTTP handler for instant queries. It parses the
// request, validates the requested entry limit, runs the query through the
// LogQL engine with start and end both set to the request timestamp, and
// writes the result as JSON. Any failure is reported through
// serverutil.WriteError; a request that cannot be parsed is reported as
// 400 Bad Request.
func (q *Querier) InstantQueryHandler(w http.ResponseWriter, r *http.Request) {
	// Enforce the query timeout while querying backends
	ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(q.cfg.QueryTimeout))
	defer cancel()

	request, err := loghttp.ParseInstantQuery(r)
	if err != nil {
		// Pass the message as an argument, never as the format string: the
		// parse error can echo user input, and any '%' in it would otherwise
		// be interpreted as a formatting verb and garble the response.
		serverutil.WriteError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), w)
		return
	}

	if err := q.validateEntriesLimits(ctx, request.Limit); err != nil {
		serverutil.WriteError(err, w)
		return
	}

	// An instant query evaluates at a single timestamp: start == end, and the
	// step and interval arguments are zero.
	params := logql.NewLiteralParams(
		request.Query,
		request.Ts,
		request.Ts,
		0,
		0,
		request.Direction,
		request.Limit,
		nil,
	)
	query := q.engine.Query(params)
	result, err := query.Exec(ctx)
	if err != nil {
		serverutil.WriteError(err, w)
		return
	}

	if err := marshal.WriteQueryResponseJSON(result, w); err != nil {
		serverutil.WriteError(err, w)
		return
	}
}
// Select returns an entry iterator for params that merges data from the store
// and from the ingesters.
//
// Which part of the range is read from the store depends on
// cfg.IngesterQueryStoreMaxLookback:
//   - zero: the store is queried for the full range;
//   - positive: the store query ends that much earlier than the request, or is
//     skipped when the request starts after that adjusted end;
//   - negative: the store is not queried at all.
//
// Ingesters are skipped only when cfg.QueryIngestersWithin is non-zero and the
// request ends before now minus that duration; in that case the store iterator
// is returned on its own.
func (q *Querier) Select(ctx context.Context, params logql.SelectParams) (iter.EntryIterator, error) {
	if err := q.validateQueryRequest(ctx, params.QueryRequest); err != nil {
		return nil, err
	}

	var (
		storeIter iter.EntryIterator
		err       error
	)
	maxLookback := q.cfg.IngesterQueryStoreMaxLookback
	switch {
	case maxLookback == 0:
		// IngesterQueryStoreMaxLookback is zero, the default state, query the store normally
		storeIter, err = q.store.LazyQuery(ctx, params)
		if err != nil {
			return nil, err
		}

	case maxLookback > 0:
		// Adjust the store query range to only query for data ingesters are not already querying for
		storeEnd := params.End.Add(-maxLookback)
		if !params.Start.After(storeEnd) {
			// Make a copy of the request before modifying
			// because the initial request is used below to query ingesters
			requestCopy := *params.QueryRequest
			storeParams := logql.SelectParams{QueryRequest: &requestCopy}
			storeParams.End = storeEnd
			storeIter, err = q.store.LazyQuery(ctx, storeParams)
			if err != nil {
				return nil, err
			}
		} else {
			storeIter = iter.NoopIterator
		}

	default:
		// IngesterQueryStoreMaxLookback is less than zero
		// ingesters will be querying all the way back in time so there is no reason to query the store
		storeIter = iter.NoopIterator
	}

	// skip ingester queries only when QueryIngestersWithin is enabled (not the zero value) and
	// the end of the query is earlier than the lookback
	if within := q.cfg.QueryIngestersWithin; within != 0 {
		cutoff := time.Now().Add(-within)
		if params.GetEnd().Before(cutoff) {
			return storeIter, nil
		}
	}

	ingesterIters, err := q.queryIngesters(ctx, params)
	if err != nil {
		return nil, err
	}

	return iter.NewHeapIterator(ctx, append(ingesterIters, storeIter), params.Direction), nil
}
pkg/logql/engine.go中
// NewEngine creates an Engine that reads its data through q. It calls
// opts.applyDefault() first, then takes the timeout from opts and builds the
// default evaluator with opts.MaxLookBackPeriod.
func NewEngine(opts EngineOpts, q Querier) *Engine {
	opts.applyDefault()

	engine := &Engine{}
	engine.timeout = opts.Timeout
	engine.evaluator = NewDefaultEvaluator(q, opts.MaxLookBackPeriod)
	return engine
}
// Query creates a new query for params that shares the engine's timeout and
// evaluator. The query parses its expression with ParseExpr; the context given
// to the parse function is ignored.
func (ng *Engine) Query(params Params) Query {
	parseFn := func(_ context.Context, input string) (Expr, error) {
		return ParseExpr(input)
	}

	return &query{
		timeout:   ng.timeout,
		params:    params,
		evaluator: ng.evaluator,
		parse:     parseFn,
	}
}
// Exec evaluates the query and returns its data together with execution
// statistics. It opens a span logger, observes the query duration in the
// queryTime metric labelled by range type, and records metrics with a status
// of "200" on success, "400" for parse errors and "500" for any other error.
// The evaluation error, if any, is returned alongside the result.
func (q *query) Exec(ctx context.Context) (Result, error) {
	spanLog, ctx := spanlogger.New(ctx, "query.Exec")
	defer spanLog.Finish()

	rangeType := GetRangeType(q.params)
	timer := prometheus.NewTimer(queryTime.WithLabelValues(string(rangeType)))
	defer timer.ObserveDuration()

	// records query statistics
	began := time.Now()
	ctx = stats.NewContext(ctx)
	data, err := q.Eval(ctx)

	statResult := stats.Snapshot(ctx, time.Since(began))
	statResult.Log(level.Debug(spanLog))

	status := "200"
	switch {
	case err == nil:
		// keep the success status
	case IsParseError(err):
		status = "400"
	default:
		status = "500"
	}
	RecordMetrics(ctx, q.params, status, statResult)

	result := Result{
		Data:       data,
		Statistics: statResult,
	}
	return result, err
}
// Eval parses the query string and evaluates the resulting expression under
// the query's timeout.
//
// A SampleExpr is evaluated through evalSample. A LogSelectorExpr is turned
// into an entry iterator by the evaluator, read into streams with the query's
// limit, direction and interval, and the iterator is closed on return. Any
// other expression type yields an error that names the unexpected type.
func (q *query) Eval(ctx context.Context) (parser.Value, error) {
	ctx, cancel := context.WithTimeout(ctx, q.timeout)
	defer cancel()

	expr, err := q.parse(ctx, q.params.Query())
	if err != nil {
		return nil, err
	}

	switch e := expr.(type) {
	case SampleExpr:
		value, err := q.evalSample(ctx, e)
		return value, err

	case LogSelectorExpr:
		iter, err := q.evaluator.Iterator(ctx, e, q.params)
		if err != nil {
			return nil, err
		}

		// Log, rather than return, a failure to close the iterator.
		defer helpers.LogErrorWithContext(ctx, "closing iterator", iter.Close)
		streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval())
		return streams, err

	default:
		// The message contains a %T verb, so it must go through a formatting
		// function; errors.New would emit the verb literally.
		return nil, fmt.Errorf("Unexpected type (%T): cannot evaluate", e)
	}
}
// Query creates a new query for params that shares the engine's timeout and
// evaluator. The query parses its expression with ParseExpr; the context given
// to the parse function is ignored.
func (ng *Engine) Query(params Params) Query {
	parseFn := func(_ context.Context, input string) (Expr, error) {
		return ParseExpr(input)
	}

	return &query{
		timeout:   ng.timeout,
		params:    params,
		evaluator: ng.evaluator,
		parse:     parseFn,
	}
}
pkg/logql/evaluator.go中
// NewDefaultEvaluator returns a DefaultEvaluator that selects data through
// querier and keeps maxLookBackPeriod for later use when building selections.
func NewDefaultEvaluator(querier Querier, maxLookBackPeriod time.Duration) *DefaultEvaluator {
	ev := &DefaultEvaluator{}
	ev.querier = querier
	ev.maxLookBackPeriod = maxLookBackPeriod
	return ev
}
// Iterator builds a query request from q and the selector string of expr and
// passes it to the evaluator's querier, returning the resulting entry
// iterator. For instant queries the start of the request is moved back by the
// evaluator's maxLookBackPeriod.
func (ev *DefaultEvaluator) Iterator(ctx context.Context, expr LogSelectorExpr, q Params) (iter.EntryIterator, error) {
	request := &logproto.QueryRequest{
		Start:     q.Start(),
		End:       q.End(),
		Limit:     q.Limit(),
		Direction: q.Direction(),
		Selector:  expr.String(),
		Shards:    q.Shards(),
	}

	// Widen the window backwards for instant queries.
	if GetRangeType(q) == InstantType {
		request.Start = request.Start.Add(-ev.maxLookBackPeriod)
	}

	return ev.querier.Select(ctx, SelectParams{QueryRequest: request})
}
github.com/cortexproject/cortex/pkg/util/services/basic_service.go中
// NewBasicService returns a BasicService in the New state that uses start, run
// and stop as its starting, running and stopping functions. Both waiter
// channels are created unbuffered.
func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService {
	svc := &BasicService{
		state:               New,
		runningWaitersCh:    make(chan struct{}),
		terminatedWaitersCh: make(chan struct{}),
	}
	svc.startFn = start
	svc.runningFn = run
	svc.stoppingFn = stop
	return svc
}
// StartAsync asks the service to move from the New state to Starting. The
// callback handed to switchState derives the service context from
// parentContext, notifies listeners that the service is starting, and launches
// b.main in its own goroutine. If the state switch does not happen, an
// invalid-state error built from the state reported by switchState is returned.
func (b *BasicService) StartAsync(parentContext context.Context) error {
	launch := func() {
		b.serviceContext, b.serviceCancel = context.WithCancel(parentContext)
		b.notifyListeners(func(l Listener) { l.Starting() }, false)
		go b.main()
	}

	switched, previous := b.switchState(New, Starting, launch)
	if switched {
		return nil
	}
	return invalidServiceStateError(previous, New)
}