Reading the Loki source: the ingester

Introduction

The ingester is responsible for writing log data to the long-term storage backend and for returning log data on the read path: it buffers incoming logs as in-memory chunks and periodically flushes them to the store.

Related source code

In cmd/loki/main.go:

func main() {
    ...
    t, err := loki.New(config)
    util.CheckFatal("initialising loki", err)

    level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())

    err = t.Run()
    ...
}

In pkg/loki/loki.go:

func New(cfg Config) (*Loki, error) {
    loki := &Loki{
        cfg: cfg,
    }

    loki.setupAuthMiddleware()
    if err := loki.setupModuleManager(); err != nil {
        return nil, err
    }
    storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)

    return loki, nil
}

func (t *Loki) setupModuleManager() error {
    mm := modules.NewManager()

    mm.RegisterModule(Server, t.initServer)
    mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig)
    mm.RegisterModule(MemberlistKV, t.initMemberlistKV)
    mm.RegisterModule(Ring, t.initRing)
    mm.RegisterModule(Overrides, t.initOverrides)
    mm.RegisterModule(Distributor, t.initDistributor)
    mm.RegisterModule(Store, t.initStore)
    mm.RegisterModule(Ingester, t.initIngester)
    mm.RegisterModule(Querier, t.initQuerier)
    mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
    mm.RegisterModule(TableManager, t.initTableManager)
    mm.RegisterModule(All, nil)

    // Add dependencies
    deps := map[string][]string{
        Ring:          {RuntimeConfig, Server, MemberlistKV},
        Overrides:     {RuntimeConfig},
        Distributor:   {Ring, Server, Overrides},
        Store:         {Overrides},
        Ingester:      {Store, Server, MemberlistKV},
        Querier:       {Store, Ring, Server},
        QueryFrontend: {Server, Overrides},
        TableManager:  {Server},
        All:           {Querier, Ingester, Distributor, TableManager},
    }

    for mod, targets := range deps {
        if err := mm.AddDependency(mod, targets...); err != nil {
            return err
        }
    }

    t.moduleManager = mm

    return nil
}

func (t *Loki) Run() error {
    serviceMap, err := t.moduleManager.InitModuleServices(t.cfg.Target)
    if err != nil {
        return err
    }

    t.serviceMap = serviceMap
    ...
    var servs []services.Service
    for _, s := range serviceMap {
        servs = append(servs, s)
    }
    ...
    sm, err := services.NewManager(servs...)
    ...
    err = sm.StartAsync(context.Background())

    ...
}

In github.com/cortexproject/cortex/pkg/util/modules/modules.go:

func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
    if _, ok := m.modules[target]; !ok {
        return nil, fmt.Errorf("unrecognised module name: %s", target)
    }
    servicesMap := map[string]services.Service{}

    // initialize all of our dependencies first
    deps := m.orderedDeps(target)
    deps = append(deps, target) // lastly, initialize the requested module

    for ix, n := range deps {
        mod := m.modules[n]

        var serv services.Service

        if mod.initFn != nil {
            s, err := mod.initFn()
            if err != nil {
                return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
            }

            if s != nil {
                // We pass servicesMap, which isn't yet complete. By the time service starts,
                // it will be fully built, so there is no need for extra synchronization.
                serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:]))
            }
        }

        if serv != nil {
            servicesMap[n] = serv
        }
    }

    return servicesMap, nil
}
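
InitModuleServices walks the ordered dependencies of the requested target and calls each module's init function before the target itself, which is how the dependency map in setupModuleManager takes effect. Below is a minimal sketch of that behaviour, assuming the modules/services API version quoted above; the two module names and their init functions are made up for illustration.

package main

import (
    "fmt"

    "github.com/cortexproject/cortex/pkg/util/modules"
    "github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
    mm := modules.NewManager()

    // Two toy modules; each init function returns a services.Service.
    mm.RegisterModule("store", func() (services.Service, error) {
        fmt.Println("init store")
        return services.NewIdleService(nil, nil), nil
    })
    mm.RegisterModule("ingester", func() (services.Service, error) {
        fmt.Println("init ingester")
        return services.NewIdleService(nil, nil), nil
    })

    // "ingester" depends on "store", so InitModuleServices("ingester")
    // initializes "store" first, then "ingester".
    if err := mm.AddDependency("ingester", "store"); err != nil {
        panic(err)
    }

    svcs, err := mm.InitModuleServices("ingester")
    if err != nil {
        panic(err)
    }
    fmt.Println("services built:", len(svcs)) // 2
}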

In github.com/cortexproject/cortex/pkg/util/services/manager.go:

func NewManager(services ...Service) (*Manager, error) {
    if len(services) == 0 {
        return nil, errors.New("no services")
    }

    m := &Manager{
        services:  services,
        byState:   map[State][]Service{},
        healthyCh: make(chan struct{}),
        stoppedCh: make(chan struct{}),
    }
    ...
    return m, nil
}

func (m *Manager) StartAsync(ctx context.Context) error {
    for _, s := range m.services {
        err := s.StartAsync(ctx)
        if err != nil {
            return err
        }
    }
    return nil
}
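
Run collects every module service into a services.Manager and starts them all asynchronously. A minimal sketch of that pattern, assuming the services API quoted in this post; AwaitHealthy and AwaitStopped are the manager's wait helpers backed by the healthyCh/stoppedCh channels shown above, and the two idle services are placeholders.

package main

import (
    "context"
    "fmt"

    "github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
    // Two placeholder services; NewIdleService builds a service that only
    // reacts to start/stop, which is enough to show the manager lifecycle.
    a := services.NewIdleService(nil, nil)
    b := services.NewIdleService(nil, nil)

    sm, err := services.NewManager(a, b)
    if err != nil {
        panic(err)
    }

    // Start every service, as Loki's Run does.
    if err := sm.StartAsync(context.Background()); err != nil {
        panic(err)
    }
    // Block until all services reach the Running state.
    if err := sm.AwaitHealthy(context.Background()); err != nil {
        panic(err)
    }
    fmt.Println("all services healthy")

    // Shut everything down and wait for termination.
    sm.StopAsync()
    if err := sm.AwaitStopped(context.Background()); err != nil {
        panic(err)
    }
}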

In pkg/loki/modules.go:

func (t *Loki) initStore() (_ services.Service, err error) {
    if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
        t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
        switch t.cfg.Target {
        case Ingester:
            // We do not want ingester to unnecessarily keep downloading files
            t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly
        case Querier:
            // We do not want query to do any updates to index
            t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly
        default:
            t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite
        }
    }

    t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
    if err != nil {
        return
    }

    return services.NewIdleService(nil, func(_ error) error {
        t.store.Stop()
        return nil
    }), nil
}

func (t *Loki) initIngester() (_ services.Service, err error) {
    t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
    t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
    t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

    // We want ingester to also query the store when using boltdb-shipper
    pc := activePeriodConfig(t.cfg.SchemaConfig)
    if pc.IndexType == local.BoltDBShipperType {
        t.cfg.Ingester.QueryStore = true
        mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
        if err != nil {
            return nil, err
        }
        t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
    }

    t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer)
    if err != nil {
        return
    }

    logproto.RegisterPusherServer(t.server.GRPC, t.ingester)
    logproto.RegisterQuerierServer(t.server.GRPC, t.ingester)
    logproto.RegisterIngesterServer(t.server.GRPC, t.ingester)
    grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
    t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
    return t.ingester, nil
}
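
Besides registering the ingester on the gRPC server (Pusher, Querier, Ingester and health services), initIngester exposes an HTTP /flush endpoint that forces in-memory chunks to be flushed. A hedged sketch of calling it; localhost:3100 assumes Loki's default HTTP listen port, adjust to your server config.

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // POST to the /flush handler registered in initIngester above.
    resp, err := http.Post("http://localhost:3100/flush", "", nil)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println("flush responded with:", resp.Status)
}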

In pkg/ingester/ingester.go:

func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
    if cfg.ingesterClientFactory == nil {
        cfg.ingesterClientFactory = client.New
    }
    enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
    if err != nil {
        return nil, err
    }

    i := &Ingester{
        cfg:          cfg,
        clientConfig: clientConfig,
        instances:    map[string]*instance{},
        store:        store,
        loopQuit:     make(chan struct{}),
        flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
        tailersQuit:  make(chan struct{}),
        factory: func() chunkenc.Chunk {
            return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
        },
    }

    i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
    if err != nil {
        return nil, err
    }

    i.lifecyclerWatcher = services.NewFailureWatcher()
    i.lifecyclerWatcher.WatchService(i.lifecycler)

    // Now that the lifecycler has been created, we can create the limiter
    // which depends on it.
    i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

    i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
    return i, nil
}

func (i *Ingester) starting(ctx context.Context) error {
    i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
    for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
        i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
        go i.flushLoop(j)
    }

    // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
    err := i.lifecycler.StartAsync(context.Background())
    if err != nil {
        return err
    }

    err = i.lifecycler.AwaitRunning(ctx)
    if err != nil {
        return err
    }

    // start our loop
    i.loopDone.Add(1)
    go i.loop()
    return nil
}

func (i *Ingester) loop() {
    defer i.loopDone.Done()

    flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
    defer flushTicker.Stop()

    for {
        select {
        case <-flushTicker.C:
            i.sweepUsers(false)

        case <-i.loopQuit:
            return
        }
    }
}

func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
    instanceID, err := user.ExtractOrgID(ctx)
    if err != nil {
        return nil, err
    } else if i.readonly {
        return nil, ErrReadOnly
    }

    instance := i.getOrCreateInstance(instanceID)
    err = instance.Push(ctx, req)
    return &logproto.PushResponse{}, err
}

func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
    inst, ok := i.getInstanceByID(instanceID)
    if ok {
        return inst
    }

    i.instancesMtx.Lock()
    defer i.instancesMtx.Unlock()
    inst, ok = i.instances[instanceID]
    if !ok {
        inst = newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
        i.instances[instanceID] = inst
    }
    return inst
}
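
Push extracts the tenant ID from the request context and hands the entries to that tenant's instance. The sketch below (not from the Loki source) shows what a caller of the Pusher interface, which the ingester implements via RegisterPusherServer in initIngester, would send; the tenant ID, label set and log line are made up, and the field names follow the logproto version quoted here.

package example

import (
    "context"
    "time"

    "github.com/grafana/loki/pkg/logproto"
    "github.com/weaveworks/common/user"
)

// pushOneLine sends a single log line for tenant "fake" to any PusherServer
// implementation, e.g. the *Ingester registered in initIngester above.
func pushOneLine(ctx context.Context, pusher logproto.PusherServer) error {
    // The tenant ID travels in the context; Push reads it back with
    // user.ExtractOrgID as shown above.
    ctx = user.InjectOrgID(ctx, "fake")

    req := &logproto.PushRequest{
        Streams: []logproto.Stream{{
            Labels: `{job="app", env="dev"}`,
            Entries: []logproto.Entry{
                {Timestamp: time.Now(), Line: "hello from the write path"},
            },
        }},
    }
    _, err := pusher.Push(ctx, req)
    return err
}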

In pkg/ingester/flush.go:

func (i *Ingester) flushLoop(j int) {
    defer func() {
        level.Debug(util.Logger).Log("msg", "Ingester.flushLoop() exited")
        i.flushQueuesDone.Done()
    }()

    for {
        o := i.flushQueues[j].Dequeue()
        if o == nil {
            return
        }
        op := o.(*flushOp)

        level.Debug(util.Logger).Log("msg", "flushing stream", "userid", op.userID, "fp", op.fp, "immediate", op.immediate)

        err := i.flushUserSeries(op.userID, op.fp, op.immediate)
        if err != nil {
            level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
        }

        // If we're exiting & we failed to flush, put the failed operation
        // back in the queue at a later point.
        if op.immediate && err != nil {
            op.from = op.from.Add(flushBackoff)
            i.flushQueues[j].Enqueue(op)
        }
    }
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
    instance, ok := i.getInstanceByID(userID)
    if !ok {
        return nil
    }

    chunks, labels := i.collectChunksToFlush(instance, fp, immediate)
    if len(chunks) < 1 {
        return nil
    }

    ctx := user.InjectOrgID(context.Background(), userID)
    ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
    defer cancel()
    err := i.flushChunks(ctx, fp, labels, chunks)
    if err != nil {
        return err
    }

    instance.streamsMtx.Lock()
    for _, chunk := range chunks {
        chunk.flushed = time.Now()
    }
    instance.streamsMtx.Unlock()
    return nil
}


func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
    ...
    if err := i.store.Put(ctx, wireChunks); err != nil {
        return err
    }
    ...
    return nil
}

In pkg/ingester/instance.go:

func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
    i := &instance{
        cfg:        cfg,
        streams:    map[model.Fingerprint]*stream{},
        index:      index.New(),
        instanceID: instanceID,

        streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
        streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

        factory: factory,
        tailers: map[uint32]*tailer{},
        limiter: limiter,

        syncPeriod:  syncPeriod,
        syncMinUtil: syncMinUtil,
    }
    i.mapper = newFPMapper(i.getLabelsFromFingerprint)
    return i
}

func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
    i.streamsMtx.Lock()
    defer i.streamsMtx.Unlock()

    var appendErr error
    for _, s := range req.Streams {

        stream, err := i.getOrCreateStream(s)
        if err != nil {
            appendErr = err
            continue
        }

        prevNumChunks := len(stream.chunks)
        if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil {
            appendErr = err
            continue
        }

        memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
    }

    return appendErr
}

func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
    ...
    stream = newStream(i.cfg, fp, sortedLabels, i.factory)
    ...
}

In pkg/ingester/stream.go:

func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
    ...
    chunk := &s.chunks[len(s.chunks)-1]
    ...
    if err := chunk.chunk.Append(&entries[i]); err != nil {
    ...
}
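
stream.Push appends each entry to the head chunk, and new chunks come from the factory configured in ingester.New, i.e. chunkenc.NewMemChunk. A minimal sketch of that chunk API in isolation, assuming the chunkenc interface of this Loki version; the encoding name and the block/target sizes are illustrative values, not Loki defaults.

package main

import (
    "fmt"
    "time"

    "github.com/grafana/loki/pkg/chunkenc"
    "github.com/grafana/loki/pkg/logproto"
)

func main() {
    // Same calls the ingester's chunk factory makes in New above.
    enc, err := chunkenc.ParseEncoding("gzip")
    if err != nil {
        panic(err)
    }
    c := chunkenc.NewMemChunk(enc, 256*1024, 1024*1024)

    entry := &logproto.Entry{Timestamp: time.Now(), Line: "a log line"}
    // SpaceFor is the same check the stream uses to decide when to cut a chunk.
    if c.SpaceFor(entry) {
        if err := c.Append(entry); err != nil {
            panic(err)
        }
    }
    fmt.Println("appended one entry")
}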

In github.com/cortexproject/cortex/pkg/util/services/basic_service.go:

func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService {
    return &BasicService{
        startFn:             start,
        runningFn:           run,
        stoppingFn:          stop,
        state:               New,
        runningWaitersCh:    make(chan struct{}),
        terminatedWaitersCh: make(chan struct{}),
    }
}

func (b *BasicService) StartAsync(parentContext context.Context) error {
    switched, oldState := b.switchState(New, Starting, func() {
        b.serviceContext, b.serviceCancel = context.WithCancel(parentContext)
        b.notifyListeners(func(l Listener) { l.Starting() }, false)
        go b.main()
    })

    if !switched {
        return invalidServiceStateError(oldState, New)
    }
    return nil
}
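
The Ingester itself is such a BasicService (services.NewBasicService(i.starting, i.running, i.stopping) in New above): StartAsync switches it from New to Starting and runs the three callbacks in order on a background goroutine. A minimal sketch of that lifecycle with made-up callbacks, assuming the services API quoted in this post.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/cortexproject/cortex/pkg/util/services"
)

func main() {
    starting := func(ctx context.Context) error {
        fmt.Println("starting")
        return nil
    }
    running := func(ctx context.Context) error {
        fmt.Println("running until the service context is cancelled")
        <-ctx.Done() // StopAsync cancels the service context
        return nil
    }
    stopping := func(_ error) error {
        fmt.Println("stopping")
        return nil
    }

    s := services.NewBasicService(starting, running, stopping)

    // New -> Starting -> Running, the same transitions the Ingester goes through.
    if err := s.StartAsync(context.Background()); err != nil {
        panic(err)
    }
    if err := s.AwaitRunning(context.Background()); err != nil {
        panic(err)
    }

    time.Sleep(100 * time.Millisecond)

    // Running -> Stopping -> Terminated.
    s.StopAsync()
    if err := s.AwaitTerminated(context.Background()); err != nil {
        panic(err)
    }
}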