Loki日志系统介绍
Loki
是 Grafana Labs 团队最新的开源项目,是一个水平可扩展,高可用性,多租户的日志聚合系统。它的设计非常经济高效且易于操作,因为它不会为日志内容编制索引,而是为每个日志流编制一组标签。项目受 Prometheus 启发,官方的介绍就是:Like Prometheus, but for logs.
,类似于 Prometheus 的日志系统。
1. 介绍
与其他日志聚合系统相比,Loki
具有下面的一些特性:
- 不对日志进行全文索引。
Loki
中存储的是压缩后的非结构化日志,并且只对元数据建立索引,因此Loki
具有操作简单、低成本的优势。 - 使用与 Prometheus 相同的标签。
Loki
通过标签对日志进行索引和分组,这使得日志的扩展和操作效率更高。 - 特别适合储存 Kubernetes Pod 日志。诸如 Pod 标签之类的元数据会被自动删除和编入索引。
- Grafana 原生支持。
Loki
日志系统由以下3个部分组成:
- loki是主服务器,负责存储日志和处理查询。
- promtail是专为loki定制的代理,负责收集日志并将其发送给 loki 。
- Grafana用于 UI展示。
2. 架构
如图所示,Loki
包含Distributor、Ingester、Querier和可选的Query frontend五个组件。每个组件都会起一个用于处理内部请求的 gRPC 服务器和一个用于处理外部 API 请求的 HTTP/1服务器。
i. Distributor
Distributor 是客户端连接的组件,用于收集日志。
在 promtail 收集并将日志发送给Loki 之后, Distributor 就是第一个接收它们的组件,每秒可以接收数百万次写入。Distributor会对接收到的日志流进行正确性校验,并将验证后的chunk日志块分批并行发送到Ingester。
Loki使用一致性哈希来保证数据流和Ingester的一致性,他们共同在一个哈希环上,哈希环的信息可以存放到etcd、Consul或者内存中。当使用Consul作为哈希环的实现时,所有Ingester通过一组token注册到环中,每个token是一个随机的32-bit无符号整数,同时Ingester会上报其状态到哈希环中。由于所有的Distributor使用相同的hash环,写请求可以发送至任意节点。为了保证结果的一致性,Distributor会等待收到至少一半加一个Ingester的回复后才响应客户端。
ii. Ingester
Ingester 接收来自Distributor的日志流,并将日志压缩后存放到所连接的存储后端。
Ingester接受日志流并构建数据块,其操作通常是压缩和追加日志。每个Ingester 的生命周期有PENDING
, JOINING
, ACTIVE
, LEAVING
和 UNHEALTHY
五种状态。处于JOINING
和ACTIVE
状态的Ingester可以接受写请求,处于ACTIVE
和LEAVING
状态时可以接受读请求。
Ingester 将收到的日志流在内存中打包成 chunks ,并定期同步到存储后端。由于存储的数据类型不同,Loki 的数据块和索引可以使用不同的存储。
当满足以下条件时,chunks 会被标记为只读
:
- 当前 chunk 达到配置的最大容量
- 当前 chunk 长时间没有更新
- 发生了定期同步
当旧的 chunk 经过了压缩并被打上了只读
标志后,新的可写的 chunk 就会生成。
iii. Querier
Querier 用来查询日志,可以直接从 Ingester 和后端存储中查询数据。当客户端给定时间区间和标签选择器之后,Querier 就会查找索引来确定所有匹配 chunk ,然后对选中的日志进行 grep并返回查询结果。查询时,Querier先访问所有Ingester用于获取其内存数据,只有当内存中没有符合条件的数据时,才会向存储后端发起同样的查询请求。
需要注意的是,对于每个查询,单个 Querier 会 grep 所有相关的日志。目前 Cortex 中已经实现了并行查询,该功能可以扩展到 Loki,通过分布式的 grep 加速查询。此外,由于副本因子的存在,Querier可能会接收到重复的数据,所以其内置了去重的功能,对拥有同样时间戳、标签组和消息内容的日志进行去重处理。
iv. Query Frontend
Query frontend 是可选组件,其提供了Querier的API并可用于读加速。当系统中有该组件时,所有的读请求都会经由Query frontend而非Querier处理。
Query frontend是无状态的,生产环境中推荐 2 副本来达到调度的均衡。Query frontend会对请求做一些调整,并将请求放入一个内部的队列中。在该场景中,Querier作为workers 不断从队列中获取任务、执行任务,并将结果返回给Query frontend用于聚合。
3. 读写
i. Read Path
参考上一节中关于Querier的介绍,读操作的流程如下:
- Querier 收到 HTTP 请求
- Querier 将请求发送至Ingester 用以获取内存数据
- Ingester 收到请求后返回符合条件的数据
- 如果没有Ingester 返回数据,Querier 从后端存储加载数据并执行查询
- Querier 遍历所有数据并进行去重处理,通过HTTP连接返回最终结果
ii. Write Path
日志数据的写主要依托的是Distributor和Ingester两个组件,整体流程如下:
- Distributor 收到 HTTP 请求,用于存储流数据
- 通过 hash 环对数据流进行 hash
- Distributor将数据流发送到对应的Ingester及其副本上
- Ingester 新建 Chunk 或将数据追加到已有Chunk 上
- Distributor通过 HTTP连接发送响应信息
4. 服务启动
Loki 依据配置文件中的 Target 参数来启动对应的模块,每个模块有若干依赖项,服务启动时会对依赖进行排序并依次执行初始化。Loki 模块的代码入口位于./cmd/loki/main.go
,启动服务流程如下:
该流程较为简单,在解析传入的配置文件后,根据配置文件中指定的Target对即将启动的服务进行初始化。初始化包括对服务依赖的排序和依次启动,具体模块的初始化会在后续内容中进行详细介绍。每个服务在生命周期中有如下状态变化:
5. 深度分析之写链路
i. 第一站——Promtail
前面介绍,Loki通过Promtail收集日志并将其转发至Distributor, Promtail模块的代码入口位于./cmd/promtail/main.go
,服务启动的流程如下图所示,其中标红加粗部分为收集日志的流向:
<figcaption>promtail 日志流向</figcaption>
关于配置文件可以参考后续章节的相关内容。Promtail启动时,先解析配置文件,然后调用promtail 包的 New 函数依次启动三个模块。
1)客户端模块;客户端run时,会进入一个for循环,每隔100ms检测一次。如果有数据的话就通过 sendBatch 进行发送, http 请求将数据发送至配置文件指定的URL(此处为Distributor的地址)。
func (c *client) run() {
....
for {
// 数据来源见下一个模块,由TargetManager采集到数据后通过通道传递过来
case e := <-c.entries:
batch, ok := batches[e.tenantID]
...
// If adding the entry to the batch will increase the size over the max
// size allowed, we do send the current batch and then create a new one
if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
c.sendBatch(e.tenantID, batch)
batches[e.tenantID] = newBatch(e)
break
}
// The max size of the batch isn't reached, so we can add the entry
batch.add(e)
case <-maxWaitCheck.C:
// Send all batches whose max wait time has been reached
for tenantID, batch := range batches {
if batch.age() < c.cfg.BatchWait {
continue
}
c.sendBatch(tenantID, batch)
delete(batches, tenantID)
}
}
}
}
func (c *client) sendBatch(tenantID string, batch *batch) {
buf, entriesCount, err := batch.encode()
...
status, err = c.send(ctx, tenantID, buf)
...
}
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) {
...
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf))
if err != nil {
return -1, err
}
...
// If the tenant ID is not empty promtail is running in multi-tenant mode, so
// we should send it to Loki
if tenantID != "" {
req.Header.Set("X-Scope-OrgID", tenantID)
}
resp, err := c.client.Do(req)
...
return resp.StatusCode, err
}
2)TargetManager模块;根据配置文件,采集指定的日志,并将收集到的日志通过channel发送给上面的client。以FileTargetManager为例:
// pkg/promtail/targets/filetargetmanager.go
func NewFileTargetManager(client api.EntryHandler,...) (*FileTargetManager, error) {
...
s := &targetSyncer{
entryHandler: pipeline.Wrap(client),
}
...
}
// Wrap implements EntryMiddleware
func (p *Pipeline) Wrap(next api.EntryHandler) api.EntryHandler {
return api.EntryHandlerFunc(func(labels model.LabelSet, timestamp time.Time, line string) error {
extracted := map[string]interface{}{}
p.Process(labels, extracted, ×tamp, &line)
// if the labels set contains the __drop__ label we don't send this entry to the next EntryHandler
if _, ok := labels[dropLabel]; ok {
return nil
}
return next.Handle(labels, timestamp, line)
})
}
// pkg/promtail/client/client.go
// Handle implement EntryHandler; adds a new line to the next batch; send is async.
func (c *client) Handle(ls model.LabelSet, t time.Time, s string) error {
tenantID := c.getTenantID(ls)
c.entries <- entry{tenantID, ls, logproto.Entry{
Timestamp: t,
Line: s,
}}
}
- Server模块;该模块对外提供了一个HTTP Server,用户可以通过暴露的接口和promtail交互,进行查询或者数据修改等。
ii. 第二站——Distributor
Promtail 会将收集到的数据通过 http 请求发送至 Distributor ,上节中配置的URL即集群中 Distributor 的地址。Distributor属于Loki的一个角色,如果Loki的配置文件中指定了该角色,上述Loki的服务启动会走到initDistributor()。在./pkg/loki/modules.go
里有一个模块及其依赖的定义,其中Distributor模块的部分如下:
var modules = map[moduleName]module{
Distributor: {
deps: []moduleName{Ring, Server, Overrides},
wrappedService: (*Loki).initDistributor, // 初始化Distributor
},
}
据此可知,Distributor在启动前需要初始化Ring,并启动Server用于处理http和grpc请求。这里的Overrides并不是一个服务,是用于动态加载配置文件的。Distributor的初始化流程如下:
Distributor初始化时,大体分为如下几步:
- 调用ingester的client.New新生成一个连接ingester的客户端,便于后续将数据通过grpc推送至ingester;
- 调用ring的 NewLifecycler 方法,将自己注册到ring中;
- 调用ring client的NewPool方法,获取所有 ingester 的pool;
Distributor 初始化的关键代码如下;
// ./pkg/distributor/distributor.go
// New a distributor creates.
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr) // 客户端
}
}
...
distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, false) // 注册服务
...
d := Distributor{
pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ingestersRing, factory, cortex_util.Logger), //获取ingesters pool
}
}
// ./pkg/ingester/client/client.go
// New returns a new ingester client.
func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
opts := []grpc.DialOption{
grpc.WithInsecure(),
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
}
opts = append(opts, cfg.GRPCClientConfig.DialOption(instrumentation())...)
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, err
}
return ClosableHealthAndIngesterClient{
PusherClient: logproto.NewPusherClient(conn),
QuerierClient: logproto.NewQuerierClient(conn),
IngesterClient: logproto.NewIngesterClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
Closer: conn,
}, nil
}
当有来自Promtail的日志推送请求时,Distributor中日志的流向如下图所示:
当Promtail或者其他客户端通过HTTP请求推送日志到Distributor时,对应的Handler会调用Distributor的Push方法进行处理。大概分为以下几步:
- 获取用户ID;
- 遍历接收到的数据,将数据按照用户ID和标签进行分组;
- 针对每组数据,计算出从环中发现ingester的token值
- 调用ring的Get方法,token作为入参,获取所有的ingesters;
- 循环ingesters调用sendSamples,将数据通过grpc发送至筛选出的ingesters
其中第4步会根据配置文件指定的副本因子个数,从hash环中获取满足条件的ingesters。关键代码如下:
// ./pkg/distributor/distributor.go
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
...
for _, stream := range req.Streams {
...
keys = append(keys, util.TokenFor(userID, stream.Labels)) //遍历,计算token
...
}
...
for i, key := range keys {
replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0]) //获取ingesters
...
}
}
// ./cortex/pkg/ring/ring.go
// Get returns n (or more) ingesters which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet, error) {
...
var (
n = r.cfg.ReplicationFactor //配置的副本因子个数
start = r.search(key) // 获取ingester的起始点
)
// 从起始点开始获取ingesters
for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ {
iterations++
// Wrap i around in the ring.
i %= len(r.ringTokens)
...
}
return ReplicationSet{
Ingesters: liveIngesters,
MaxErrors: maxFailure,
}, nil
}
iii. 第三站——Ingester
Distributor 会将收集到的数据通过 grpc 请求发送至 Ingester ,Ingester属于Loki的一个角色,如果Loki的配置文件中指定了该角色,上述Loki的服务启动会走到initIngester()。在./pkg/loki/modules.go
里有一个模块及其依赖的定义,其中Ingester模块的部分如下:
var modules = map[moduleName]module{
Ingester: {
deps: []moduleName{Store, Server, MemberlistKV},
wrappedService: (*Loki).initIngester,
},
}
据此可知,Ingester在启动前需要初始化底层存储Store,并启动Server用于处理http和grpc请求。这里的MemberlistKV用于获取存储于ring中的节点列表。初始化流程如下:
值得注意的是,Ingester在初始化时会启动两个循环:
- loop :定期(默认30s)执行下面两个动作:
1)将Ingester结构体缓存中的instances数据存入队列;
2)从内存中删除flushed数据块。 - flushLoop: 将队列中的数据进行后续的传送处理
在进一步分析之前,先大概看一下几个嵌套的结构体:
// Ingester builds chunks for incoming log streams.
type Ingester struct {
...
instances map[string]*instance // 用户ID为key
flushQueues []*util.PriorityQueue // 存放instance的元数据,用于后续的flushLoop
...
}
// flushQueues中存放的数据
type flushOp struct {
from model.Time
userID string // 可获取到具体的instance
fp model.Fingerprint // 和具体的stream对应
immediate bool
}
type instance struct {
...
// we use 'mapped' fingerprints here.
streams map[model.Fingerprint]*stream // fp根据labels计算出来
index *index.InvertedIndex // 用于快速查询的反向index
...
}
type stream struct {
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
}
// 记录大概类似于{labelName:{labelValue:fp}}
type InvertedIndex struct {
shards []indexShard
}
type indexShard struct {
idx map[string]indexEntry // map的key为labelName
}
type indexEntry struct {
name string // labelName
fps map[string]indexValueEntry // map的key为labelValue
}
type indexValueEntry struct {
value string // labelValue
fps []model.Fingerprint
}
写流程
当Ingester收到来自Distributor的GRPC请求时,其处理流程如下:
上图所示的过程中,Ingester 首先从ctx中获取用户ID;然后通过getOrCreateInstance 生成新的instance,并将该instance以OrgID为key存入Ingester的map中;之后调用 instance 的Push方法:
- 根据Instance中的labels计算出对应的fp
- 创建新的stream,以fp为key存入Instance结构体的map中
- 在instance结构体的index中添加一个反向的map对,类似于
{labelName:{labelValue:fp}}
- 调用stream的Push方法,在该方法中会根据配置的chunk大小对stream进行分割,并将所有生成的chunk存入stream结构体的chunks数组中
前面提到, Ingester 在初始化的时候会启动两个循环,flushLoop用于定期处理接收到的日志请求,其处理流程如下:
对照前面的结构体定义,此处flush的时候,先从flushQueues获取元数据,然后根据元数据获取所有需要flush的chunks,之后调用store提供的接口存入底层存储。
iv. 第四站——Store
上文提到,在Ingester的初始化时,会依赖于Store的初始化,Store的初始化流程如下:
<figcaption>Store初始化流程</figcaption>
schema_config:
configs:
- from: 2020-05-09
store: tikv // 此处为自定义的IndexStore
object_store: manul // 此处为自定义的ChunkStore
schema: v11
index:
prefix: index_
period: 168h
在Store初始化的时候,会根据配置文件schema_config
指定的store
和object_store
分别创建对应的IndexClient和ChunkClient,并根据配置的时间范围,对不同时间段的数据创建不同的Schema
。关键代码如下:
// 创建Schema
func (cfg PeriodConfig) CreateSchema() (BaseSchema, error) {
buckets, bucketsPeriod := cfg.createBucketsFunc() // 创建bucketFunc,此处为dailyBuckets
switch cfg.Schema {
case "v11":
v10 := v10Entries{rowShards: cfg.RowShards}
return newSeriesStoreSchema(buckets, v11Entries{v10}), nil
}
}
// 在示例配置中,schema采取了推荐的 v11版本,则CreateSchema会返回一个seriesStoreSchema:
func newSeriesStoreSchema(buckets schemaBucketsFunc, entries seriesStoreEntries) seriesStoreSchema {
return seriesStoreSchema{
baseSchema: baseSchema{buckets: buckets, entries: entries},
entries: entries,
}
}
func (cfg *PeriodConfig) dailyBuckets(from, through model.Time, userID string) []Bucket {
var (
fromDay = from.Unix() / secondsInDay
throughDay = through.Unix() / secondsInDay
result []Bucket
)
for i := fromDay; i <= throughDay; i++ {
// tableName 根据时间点做区分,格式为:index.Prefix+t.Unix()/index.PeriodSecs
// hashKey 包含bucket开始日期
// rangeKey 从距离hashKey的offset开始到chunk的结束
relativeFrom := util.Max64(0, int64(from)-(i*millisecondsInDay))
relativeThrough := util.Min64(millisecondsInDay, int64(through)-(i*millisecondsInDay))
result = append(result, Bucket{
from: uint32(relativeFrom),
through: uint32(relativeThrough),
tableName: cfg.IndexTables.TableFor(model.TimeFromUnix(i * secondsInDay)),
hashKey: fmt.Sprintf("%s:d%d", userID, i),
bucketSize: uint32(millisecondsInDay), // helps with deletion of series ids in series store
})
}
return result
}
写流程
接上文Ingester中写入的最后部分,即将数据写入Store的流程,此处分为两部分:写入IndexStore和写入ChunkStore,写入流程如下,标红部分为写入Index的流程:
<figcaption>数据写入存储</figcaption>
写入ChunkStore的部分相对简单,写入前会先计算对应Chunk的ID,然后将ChunkID base64编码后作为后端ChunkStore中文件的名字,ChunkID的生成见后续对IndexStore的分析。数据写入IndexStore分为以下几步:
- 根据Ingester生成的Chunk计算出当前要写入Index中的entries
- 调用IndexStore的Add接口,将数据添加到缓存
- 调用IndexStore的BatchWrite接口,将数据写入IndexStore
此处着重介绍entries的计算方法,entries分为LabelEntries和ChunkEntries两部分,其中LabelEntries的计算方法如下:
1)计算chunkID
return fmt.Sprintf("%s/%x:%x:%x:%x", c.UserID, uint64(c.Fingerprint), int64(c.From), int64(c.Through), c.Checksum)
- 生成bucket,参考上述dailyBuckets的代码
3)生成entries:
const (
// For v9 schema
seriesRangeKeyV1 = '7'
labelSeriesRangeKeyV1 = '8'
// For v11 schema
labelNamesRangeKeyV1 = '9'
)
// 对labels进行sha256后经base64编码
seriesID := labelsSeriesID(labels)
// read first 32 bits of the hash and use this to calculate the shard
// rowShards可配置,默认16;对index分片,以减少大小
shard := binary.BigEndian.Uint32(seriesID) % s.rowShards
// 基于labelNames生成数据
data, err := jsoniter.ConfigFastest.Marshal(labelNames)
entries := []IndexEntry{
// Entry for metricName -> seriesID
{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s", shard, bucket.hashKey, metricName),
RangeValue: encodeRangeKey(seriesRangeKeyV1, seriesID, nil, nil),
},
// Entry for seriesID -> label names
{
TableName: bucket.tableName,
HashValue: string(seriesID),
RangeValue: encodeRangeKey(labelNamesRangeKeyV1, nil, nil, nil),
Value: data,
},
}
// Entries for metricName:labelName -> hash(value):seriesID
for _, v := range labels {
if v.Name == model.MetricNameLabel { // MetricNameLabel = "__name__"
continue
}
valueHash := sha256bytes(v.Value) //对label的值取hash,限制长度
entries = append(entries, IndexEntry{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey, metricName, v.Name),
RangeValue: encodeRangeKey(labelSeriesRangeKeyV1, valueHash, seriesID, nil),
Value: []byte(v.Value),
})
}
ChunkEntries的计算方法如下:
1)计算chunkID
- 生成entries:
seriesID := labelsSeriesID(labels)
encodedThroughBytes := encodeTime(bucket.through)
entries := []IndexEntry{
// Entry for seriesID -> chunkID
{
TableName: bucket.tableName,
HashValue: bucket.hashKey + ":" + string(seriesID),
RangeValue: encodeRangeKey(chunkTimeRangeKeyV3, encodedThroughBytes, nil, []byte(chunkID)),
},
}
由上述生成entries的逻辑可知,有如下的entries在IndexStore中:
- metricName -> seriesID;
- seriesID -> label names
- metricName:labelName -> hash(value):seriesID
- seriesID -> chunkID
6. 深度分析之读链路
i. 第一站——querier
Querier属于Loki的一个角色,用于接收用户的查询请求,如果Loki的配置文件中指定了该角色,上述Loki的服务启动会走到initQuerier()。在./pkg/loki/modules.go
里有一个模块及其依赖的定义,其中Querier模块的部分如下:
var modules = map[moduleName]module{
Querier: {
deps: []moduleName{Store, Ring, Server},
wrappedService: (*Loki).initQuerier,
},
}
据此可知,Querier在启动前需要初始化底层存储Store,初始化Ring,并启动Server用于处理http和grpc请求。初始化流程如下:
Querier初始化时,大体分为如下几步:
- 调用ingester的client.New新生成一个连接ingester的客户端,便于后续将数据通过grpc推送至ingester;
- 调用ring client的NewPool方法,获取所有ingester的池;
- 启动新的 LogQL引擎,用于处理客户端传来的 LogQL查询语句
- 启动服务等待请求输入
查询流程
当有来自客户端的日志查询请求时,Querier日志流程如下图所示,本文以即时请求/loki/api/v1/query为例说明:
当Querier 收到客户端的日志查询请求时,有对应的Handler去处理。在图示的处理流程里,共分为以下几步:
- ParseInstantQuery 分析处理输入的 request 请求,格式化为可识别的结构
- NewInstantQuery 创建一个符合LogQL的查询
- Exec 执行查询,返回一个迭代器
- readStream 遍历迭代器,返回结果
值得注意的是这里的query结构包含了一个engine,而engine中隐含了query结构,querier初始化时的定义如下:
func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, store storage.Store, limits *validation.Overrides) (*Querier, error) {
querier := Querier{
cfg: cfg,
ring: ring,
pool: distributor.NewPool(clientCfg.PoolConfig, ring, clientFactory, util.Logger),
store: store,
limits: limits,
}
querier.engine = logql.NewEngine(cfg.Engine, &querier) // 此处将querier内嵌进engine的结构体
err := services.StartAndAwaitRunning(context.Background(), querier.pool)
if err != nil {
return nil, errors.Wrap(err, "querier pool")
}
return &querier, nil
}
Ingester和Store迭代器
上述第3步,是具体和Ingester和Store交互的地方,其调用流程如下:
如图所示,Select在验证请求之后,分别去底层Store和Ingester中获取数据,最后通过NewHeapIterator 将前两步拿到的迭代器进行组合,并返回给客户端。查询Ingester时,先从ring中获取到所有有效的节点,对所有节点进行并发查询,查询通过GRPC发送到Ingester,Ingester端进行后续处理,具体参见Ingester节。
readStream去重
Select的最后会通过NewHeapIterator将Store和Ingester中获取到的数据进行组合,之后客户端执行readStreams进行迭代,先看一下关键的数据结构:
// heapIterator iterates over a heap of iterators.
type heapIterator struct {
heap interface { //根据请求的时间顺序,逆序建立大顶堆,正序建立小顶堆
heap.Interface
Peek() EntryIterator
}
is []EntryIterator //存放所有的日志条目
prefetched bool //保证只执行一次
stats *stats.ChunkData
tuples []tuple //存放某次迭代时,labels和时间戳相同的日志条目
currEntry logproto.Entry
currLabels string
errs []error
}
type tuple struct {
logproto.Entry
EntryIterator
}
func (i *heapIterator) Next() bool {
i.prefetch() // 遍历is迭代器,将数据放入heap中,prefetched保证只遍历一次
if i.heap.Len() == 0 {
return false
}
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
for i.heap.Len() > 0 {
next := i.heap.Peek() //获取堆顶元素
entry := next.Entry()
// 一次遍历只获取有相同标签和时间错的日志条目
if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) {
break
}
heap.Pop(i.heap) //遍历过的元素会出堆
// insert keeps i.tuples sorted
i.tuples = insert(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
}
// Find in tuples which entry occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
t := mostCommon(i.tuples)
i.currEntry = t.Entry //设置好本次的条目和标签
i.currLabels = t.Labels()
i.tuples = i.tuples[:0] //清空,等待下次调用
return true
}
ii. 第二站——Ingester
当Ingester 收到来自Querier 的GRPC 请求时,其处理流程如下:
上图所示的过程中,Ingester 首先从ctx中获取用户ID;然后通过getOrCreateInstance 从缓存中获取instance;之后调用 instance 的 Query方法,大概流程如下:(此处需要结合上一讲里面提到的Ingester关键数据结构戳此回顾)
- 根据用户传入的查询条件,从index中获取对应的fp
- 根据fp从instance结构体的map中获取对应的stream
- 对获取到的stream进行迭代,并将迭代器返回
iii. 第三站——Store
接上文Querier读取的最后部分,从Store查询的流程比较复杂,大体分为三步:
- 解析客户端输入的logql结构;
- 根据匹配的labels和时间范围调用lazyChunks方法从IndexStore中获取 chunks 的元数据;
- 返回chunk的迭代器,供客户端迭代时调用,迭代时从chunkStore中获取数据
结合上文提到的 Store 的初始化流程和写流程,分别重点分析一下第2步和第3步。
IndexStore读取
<figcaption>IndexStore加载元数据</figcaption>
上图为从IndexStore获取Chunks元数据,步骤如下:
1)获取用户ID
2)根据用户ID,metricName和传入labels获取seriesIDs
3)根据seriesIDs获取chunkIDs
4)生成chunk 的元数据信息
// 参考前面的chunkID的格式:
// `<user id>/<fingerprint>:<start time>:<end time>:<checksum>`.
return Chunk{
UserID: userID,
Fingerprint: model.Fingerprint(fingerprint),
From: model.Time(from),
Through: model.Time(through),
Checksum: uint32(checksum),
ChecksumSet: true,
}
- 根据传入的时间对chunks进行过滤
ChunkStore读取
<figcaption>chunkStore遍历</figcaption>
上图为遍历迭代器的流程,大概流程如下:
1)nextBatch 遍历返回的所有Chunks
2)根据配置文件指定max_chunk_batch_size(默认50个chunks),从chunks元数据中pop即将迭代的数据块
3)以fp为map的key,将数据块按照时间排序,分割成时间区间不重合的chunk组
遍历所有的chunks,以fp分组;之后将相同fp的chunks进一步按照时间区间分组
result := make(map[model.Fingerprint][][]*chunkenc.LazyChunk, len(chunksByFp))
result[fp] = [][]*chunkenc.LazyChunk
4)分别获取每个chunk分组的第一个chunk
5)根据标签和查询条件对chunks进行过滤
6)调用objectclient的接口加载所有chunks
7. Loki中的Ring
在Loki 中,Ring用于Ingester和Distributor的服务注册和发现, Ring 的实现使用了键值存储,如 Consul 或 etcd。KV存储是简单的字典,在 Loki 中,使用了固定的prefix和 key :collectors/ring。
Distributor 和 Querier 通过watch该key的更新,来同步最新的value。但这种方式存在以下问题:
a. 更新操作
每次更新都需要对kv store进行两次操作:
- 读取 ring 数据到缓存;
- CAS (Compare-and-Swap)将更新后的数据回写。
CAS只有当数据在读取之后有更新的情况下才会写kv store。如果多个Ingesters同时更新ring,只有一个会成功,其余的都需要重试。这意味着新一轮的:读、更新内存和CAS循环。随着Ingesters个数的增加,CAS操作失败率也会增加,从而导致大量资源浪费(ingester和kv store)。
b. watch更新
假设有30个ingesters,ingester 每隔 10s进行一次心跳,就会有每秒3次的更新。如果有20个Distributor 和20个Querier,每个都对ring设置了watch,则40个watchers 每秒收到3次的更新,即kv store每秒同步120次更新。
基于上述问题,Loki新上线了一种kv store:memberlist,通过gossip做数据的同步,来提升Loki的可用性。文章的具体链接如下:
https://grafana.com/blog/2020/03/25/how-were-using-gossip-to-improve-cortex-and-loki-availability/
8. 自定义ChunkStore
Loki 使用 Cortex 的 chunk store 作为持久化存储,与其他 Loki 组件不同,chunk store并非独立的服务。它以代码库的形式嵌入到 Loki 的两个组件中:Ingester 和 Querier。
目前Cortex 默认实现了对如下的ChunkStore存储的支持:
如果业务需要实现自定义的ChunkStore,也只需实现 Chunk Store 的标准接口,就可以接入Loki。接口定义见Cortex源码的 pkg/chunk/storage_client.go,在 Cortex 的定义中,ChunkStore 需要实现如下接口:
// Client is for storing and retrieving chunks.
type Client interface {
Stop()
PutChunks(ctx context.Context, chunks []Chunk) error
GetChunks(ctx context.Context, chunks []Chunk) ([]Chunk, error)
DeleteChunk(ctx context.Context, chunkID string) error
}
如果是对象存储, Cortex 已内置实现了以上的接口,在开发时实现暴露的对象存储的接口即可。
// ObjectClient is used to store arbitrary data in Object Store (S3/GCS/Azure/Etc)
type ObjectClient interface {
PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error
GetObject(ctx context.Context, objectKey string) (io.ReadCloser, error)
List(ctx context.Context, prefix string) ([]StorageObject, []StorageCommonPrefix, error)
DeleteObject(ctx context.Context, objectKey string) error
Stop()
}
关于这部分的实现可以参考pkg/chunk/local/fs_object_client.go
9. 自定义IndexStore
目前Cortex 默认实现了对如下的IndexStore存储的支持:
接下来以TiKV为例说明如何实现自定义的IndexStore
a. TiKV
TiKV 是一个为 TiDB 打造的支持事务、数据强一致的分布式 Key-Value 数据库数据库,兼容MySQL协议。TiKV的系统架构如下所示:
TiKV基本概念:
Placement driver (PD). PD 是集群的manager, 负责整个TiKV的调度。
Store. 使用RocksDB作为底层存储方案,TiKV在每个Node上创建两个RocksDB实例:
rocksdb:用于存储大多数的TiKV数据
raftdb:存储Raft logs
Region. k-v数据迁移的基本单位,每个Region会复制到多个Node上,共同构成了一个Raft 组。
Node. 集群的物理节点,可以是VM、容器等。每个 Node 上可以有多个Store,Store里的数据可以跨越多个Region。当Node启动时,Node、Store 和 Region 的元数据会同步至 PD,Region 和 Store 的状态也会定期上报给PD。
Raft. 数据分布在多个 TiKV instance上,通过Raft来保证分布式数据的一致性。
b. 实现
在 Cortex 的定义中,Index 需要实现如下接口:
// IndexClient is a client for the storage of the index (e.g. DynamoDB or Bigtable).
type IndexClient interface {
Stop()
// For the write path.
NewWriteBatch() WriteBatch
BatchWrite(context.Context, WriteBatch) error
// For the read path.
QueryPages(ctx context.Context, queries []IndexQuery, callback func(IndexQuery, ReadBatch) (shouldContinue bool)) error
}
// WriteBatch represents a batch of writes.
type WriteBatch interface {
Add(tableName, hashValue string, rangeValue []byte, value []byte)
Delete(tableName, hashValue string, rangeValue []byte)
}
// ReadBatch represents the results of a QueryPages.
type ReadBatch interface {
Iterator() ReadBatchIterator
}
// ReadBatchIterator is an iterator over a ReadBatch.
type ReadBatchIterator interface {
Next() bool
RangeValue() []byte
Value() []byte
}
根据接口的定义,可以看出 index 是一组以hash key 和 range key作为关键字的条目。其中:
- hash key. 所有的读写操作都会用到
- range key. 写操作必需,读操作可选。查询可基于 prefix 或者 range
在TiKV的实现中,索引的 key 是如下的构成:
const (
separator = "\000"
endpoint = "\xff"
fixedPrefix = "t_index"
)
key:= fmt.Sprintf("%s_%s_%s", fixedPrefix+separator, tableName, hashValue) + string(rangeValue)
具体接口的实现可以结合官方提供的tikv sdk,并参考pkg/chunk/local/boltdb_index_client.go