Loki中拥有这众多的limit策略,有的已经开放到配置文件中,还有的配置代码中已经实现但还没开放出来。大部分情况下开发者给了出一些默认参数足够优秀,不过有的时候我们也不免需要微调。那么小白这次先简单捡几个比较重要的策略来说明下Limits_Config中到底限制了什么。
1. 限流器
小白前段时间无意间重启升级了下Loki的服务,由于过程持续了一段时间,当服务恢复时客户端在push日志时总是会收到如下的报错,
429 Too Many Requests Ingestion rate limit exceeded (limit 10485760 bytes/sec)
这直接导致我们部分服务器上的日志采集器缓冲区一直处于拥堵,引起日志采集的延迟。
看起来这个是触发了Loki的限流策略了,查了下官方文档,发现在limits_config中两个参数控制Loki Distributor的日志接收的速率:
limits_config:
#令牌桶注入token的速率
ingestion_rate_mb: | default = 4]
#令牌桶的容量
ingestion_burst_size_mb: | default = 6]
经过摸索发现Loki的Distributor中关于限速的方法也是采用Golang标准库的time/rate
限流器来实现的。它的大致逻辑如下:
首先distributor处理日志push请求时声明了protobuf的编码,其中可以包括多个日志流,以及每个流里面的label信息和Entry中的日志时间戳和条目信息。
type PushRequest struct {
Streams []Stream `protobuf:"bytes,1,rep,name=streams,proto3,customtype=Stream" json:"streams"`
}
type Stream struct {
Labels string `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
}
type Entry struct {
Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
Line string `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}
从日志流的Entry中取出日志条目并计算长度得到validatedSamplesSize,之后将本次日志的长度传给限流器
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
for _, stream := range req.Streams {
if err := d.validator.ValidateLabels(userID, stream); err != nil {
validationErr = err
continue
}
entries := make([]logproto.Entry, 0, len(stream.Entries))
for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(userID, stream.Labels, entry); err != nil {
validationErr = err
continue
}
entries = append(entries, entry)
validatedSamplesSize += len(entry.Line)
validatedSamplesCount++
}
获取当前时间,以及将租户ID和validatedSamplesSize传给ingestionRateLimiter.AllowN限流器处理,如果限流成功则忽略这次提交,同时打印上述截图中的报错,
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) {
validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount))
validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize))
//抛出429 Too Many Requests异常
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg(int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize))
}
接下来就是具体的限流器内的处理,我们看到Distributor主要通过RateLimiter.AllowN的方式获取令牌。也就是说截止当前时间,判断限流器桶中令牌数与日志条目容量数N(validatedSamplesSize),满足则返回true,同时从桶中消费N个令牌,反正则返回失败。这部分我们可以在这里面找到
https://github.com/cortexproject/cortex/blob/master/pkg/util/limiter/rate_limiter.go
到这里,我们需要改变的就是令牌桶的大小和速率即可,也就是一开始limits_config中的两个参数。我们将其适当调大就可以立马得到效果:
limits_config:
ingestion_rate_mb: 32
ingestion_burst_size_mb: 64
可以看到在调整了rate和burst之后,绿色框框内的客户端的日志缓冲区很快的就被消费清空了。
扩展
除此之外,Loki的限流器还有一个全局的动态配置
limits_config:
ingestion_rate_strategy: | default = "local"
默认情况下为local,如果你的Loki是一个分布式系统话,local会将上述的限流器的令牌桶作用在每个distributor中,这会极大提高Loki日志收取的吞吐量。如果配置的为global的话,distributor则会用
ingestion_rate_mb / ring.HealthyInstancesCount
得到全局的每个distributor的租户限制速率。这部分代码在ingestion_rate_strategy.go得以实现
type globalStrategy struct {
limits *validation.Overrides
ring ReadLifecycler
}
func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
return &globalStrategy{
limits: limits,
ring: ring,
}
}
func (s *globalStrategy) Limit(userID string) float64 {
numDistributors := s.ring.HealthyInstancesCount()
if numDistributors == 0 {
return s.limits.IngestionRateBytes(userID)
}
return s.limits.IngestionRateBytes(userID) / float64(numDistributors)
}
根据租户开启全局的日志限速策略,目前应该还没有人用于实践吧
总结:
限流器是后台服务中非常重要的一个组件,它可以通过限制请求数或流量的方式来保护后台服务避免过载,令牌桶是一个常见的实现方法。Loki显然在这里是利用令牌桶的方式来对日志流的容量进行限制。
更多关于Golang标准库中time/rate的使用,可以查考https://godoc.org/golang.org/x/time/rate
2. 标签长度限制
Loki的Limit_Config关于长度的限制包括对label键值对的大小限制等,其主要包含以下几个:
limits_config:
# label的key最大长度
max_label_name_length: | default = 1024
# label的value最大长度
max_label_value_length: | default = 2048
# 每个流中的最大label个数
max_label_names_per_series: | default = 30
标签限制的这部分代码在distributor的validator.go中,比如在这里计算labels内的键值对个数,超过配置最大数则放弃同时返回BadRequest
ls, err := util.ToClientLabels(stream.Labels)
numLabelNames := len(ls)
if numLabelNames > v.MaxLabelNamesPerSeries(userID) {
validation.DiscardedSamples.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Inc()
bytes := 0
for _, e := range stream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.MaxLabelNamesPerSeries, userID).Add(float64(bytes))
return httpgrpc.Errorf(http.StatusBadRequest, validation.MaxLabelNamesPerSeriesErrorMsg(cortex_client.FromLabelAdaptersToMetric(ls).String(), numLabelNames, v.MaxLabelNamesPerSeries(userID)))
}
比如下面这里distributor会取出labels中的name和value的长度分别和配置中的最大长度进行对比,以及对label中的name进行比较,防止重名。
maxLabelNameLength := v.MaxLabelNameLength(userID)
maxLabelValueLength := v.MaxLabelValueLength(userID)
lastLabelName := ""
for _, l := range ls {
if len(l.Name) > maxLabelNameLength {
updateMetrics(validation.LabelNameTooLong, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelNameTooLongErrorMsg(stream.Labels, l.Name)) } else if len(l.Value) > maxLabelValueLength {
updateMetrics(validation.LabelValueTooLong, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.LabelValueTooLongErrorMsg(stream.Labels, l.Value))
} else if cmp := strings.Compare(lastLabelName, l.Name); cmp == 0 {
updateMetrics(validation.DuplicateLabelNames, userID, stream)
return httpgrpc.Errorf(http.StatusBadRequest, validation.DuplicateLabelNamesErrorMsg(stream.Labels, l.Name))
}
lastLabelName = l.Name
}
总结:
loki中对于日志标签的长度限制并没有特别的强调,不过秉着无界的label会拖垮Loki性能的准则上,我们不宜将max_label_names_per_series配置得过大
3. 租户限制
默认情况下我们部署的Loki都只有一个租户,它使用的是一个叫fake的租户id。那么在Loki的限制策略中,涉及到租户的包含如下几个参数:
limits_config:
# 每个租户的最大日志流个数
max_streams_per_user: | default = 10000
#启用全局的租户最大日志流个数,默认0关闭
#一旦配置,每个租户日志流将有ingester注册到hash环上同时状态为
#HEALTH的个数动态计算得出,任何ingester的数量变化都会动态生效到这个值
max_global_streams_per_user: default = 0
在这里我们可以看到max_streams_per_user的local和global之间的计算方法,算法如下
(max_global_streams_per_user / max_streams_per_user) * replication_factor
复制因子的配置来至ingest_config.replication_factor
以下是limiter.go中对于全局最大日志流数的代码实现:
func (l *Limiter) convertGlobalToLocalLimit(globalLimit int) int {
if globalLimit == 0 {
return 0
}
//从环中得到健康的ingester数
numIngesters := l.ring.HealthyInstancesCount()
if numIngesters > 0 {
//全局的最大值除以健康的ingester个数,再乘个复制因子
return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
}
return 0
}
在AssertMaxStreamsPerUser函数中简单处理local和global的限制数据后,对日志流个数判断,低于计算后的limit则返回空,否则抛出超过租户最大限制数的异常
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {
localLimit := l.limits.MaxLocalStreamsPerUser(userID)
globalLimit := l.limits.MaxGlobalStreamsPerUser(userID)
//根据上述算法计算出全局的限制
adjustedGlobalLimit := l.convertGlobalToLocalLimit(globalLimit)
//当adjustedGlobalLimit不为0且localLimit大于adjustedGlobalLimit时
//calculatedLimit就为动态生成的limit,反之则为localLimit的限制数
calculatedLimit := l.minNonZero(localLimit, adjustedGlobalLimit)
if calculatedLimit == 0 {
calculatedLimit = math.MaxInt32
}
//如果日志流小于calculatedLimit则返回空,否则返回超过租户日志流个数异常
if streams < calculatedLimit {
return nil
}
return fmt.Errorf(errMaxStreamsPerUserLimitExceeded, userID, streams, calculatedLimit, localLimit, globalLimit, adjustedGlobalLimit)
}
func (l *Limiter) minNonZero(first, second int) int {
if first == 0 || (second != 0 && first > second) {
return second
}
return first
}
在ingester的instance.go中,loki对调用上述AssertMaxStreamsPerUser方法来判断租户流的限制,如果返回不为空,则放弃本次处理,并抛出TooManyRequests异常
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
if err != nil {
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
bytes += len(e.Line)
}
validation.DiscardedBytes.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(bytes))
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.StreamLimitErrorMsg())
}
总结:
可以看到Loki对于全局的动态配置计算还是比较简单,如果启用max_global_streams_per_user的话,应要合理的配置复制因子的个数。如果复制因子的个数 > 健康的ingester个数,则计算出来的租户限制会比配置文件中定义要大许多。
4. 日志条目大小限制
在Loki中,对于客户端push到distributor中产生的每条日志流是可以对其做条目的大小限制的,这个在配置里面默认是不限制,也就是说每行的日志大小可以是无限😄,当然大部分情况下我们都不会去限制这个,如果有的同学环境特殊,可以考虑开启对每行日志的大小限制。
limits_config:
# 日志条目的大小限制,默认不限制
max_line_size: | default = none
这部分的实现也比较简单,在distributor的validator.go中,ValidateEntry函数会对获取日志流中的条目长度进行比较,当MaxLineSize不为0且日志流中的条目长度大于MaxLineSize则放弃本次处理,并抛出LineTooLong异常
func (v Validator) ValidateEntry(userID string, labels string, entry logproto.Entry) error {
if maxSize := v.MaxLineSize(userID); maxSize != 0 && len(entry.Line) > maxSize {
validation.DiscardedSamples.WithLabelValues(validation.LineTooLong, userID).Inc()
validation.DiscardedBytes.WithLabelValues(validation.LineTooLong, userID).Add(float64(len(entry.Line)))
return httpgrpc.Errorf(http.StatusBadRequest, validation.LineTooLongErrorMsg(maxSize, len(entry.Line), labels))
}
}
5. 查询限制
在Loki的查询中有一个限制日志返回行数的限制,它直接控制了你在grafana或者其他平台里面能够从loki里面拿到的日志行数,大部分情况下5000行可以满足需求,如果你觉得不够可以将其调大或者设置成0
limits_config:
# 查询返回条目的限制
max_entries_limit_per_query: | default = 5000
除此之外,对于单次查询的限制还有的chunk、stream和series的,不过大部分场景我们不会去对此做调整,小白这里就不在详细分析。
limits_config:
# 单个查询最多匹配的chunk个数
max_chunks_per_query: | default = 2000000
# 限制查询是匹配到的chunk大小,默认0为不限制
max_query_length: | default = 0
# 单词查询最多匹配到的日志流个数
max_streams_matchers_per_query: | default = 1000
# 限制查询时最大的日志度量个数
max_query_series: | default = 500
# 查询的并发数
max_query_parallelism | default = 14
# 允许租户缓存结果的有效时间
max_cache_freshness_per_query |default = 1m.
总结
事实上Loki中关于limits的部分大都不需要进行额外的调整,除非我们对Loki的日志存储有明确的场景定位,这才有机会去调整这些参数。
比如你的Loki是一个内部的日志,不涉及到多租户,那你的limit策略值大都配置的比较高,如果你的loki是一个多租户场景下日志,每个租户都有自己明确的日志,这个时候你可能关注的就是合理限制单个租户的日志接受和查询,避免单个租户的行为影响集群。不过目前Loki似乎还没有针对租户身份信息来做shard服务,或许后面还有更好的方式来应对多租户的场景
关注公众号「云原生小白」,获取更多精彩内容