kafka性能监控之KafkaMetrics Sensor

说起kafka的metrics,很多人应该是即陌生又熟悉,

熟悉是因为阅读源码的过程中,不可避免地会看到metrics.add()的代码.而陌生是因为metrics仅仅只是辅助功能,并不是kafka主要逻辑的一部分,并不会引起读者太多的关注.

在这里首先说明一个容易产生误解的地方,不少文章说kafka使用yammers框架来实现性能监控.这么说其实没有问题,因为kafka确实通过yammers向外暴露了接口,可以通过jmx或者grahite来监视各个性能参数.但是kafka内的性能监控比如producer,consumer的配额限制,并不是通过yammer实现的.而是通过自己的一套metrics框架来实现的.

事实上,kafka有两个metrics包,在看源码的时候很容易混淆

package kafka.metrics

以及

package org.apache.kafka.common.metrics

可以看到这两个包的包名都是metrics,但是他们负责的任务并不相同,而且两个包中的类并没有任何的互相引用关系.可以看作是两个完全独立的包.kafka.mtrics这个包,主要调用yammer的Api,并进行封装,提供给client监测kafka的各个性能参数.而commons.metrics这个包是我这篇文章主要要介绍的,这个包并不是面向client提供服务的,他是为了给kafka中的其他组件,比如replicaManager,PartitionManager,QuatoManager提供调用,让这些Manager了解kafka现在的运行状况,以便作出相应决策的.

首先metrics第一次被初始化,在kafkaServer的startup()方法中

metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime, true)
quotaManagers = QuotaFactory.instantiate(config, metrics, time)

初始化了一个Metrics,并将这个实例传到quotaManagers的构造函数中,这里简单介绍一下quotaManagers.这是kafka中用来限制kafka,producer的传输速度的,比如在config文件下设置producer不能以超过5MB/S的速度传输数据,那么这个限制就是通过quotaManager来实现的.

回到metrics上,跟进代码.

public class Metrics implements Closeable {
 ....
 ....
    private final ConcurrentMap<MetricName, KafkaMetric> metrics;
    private final ConcurrentMap<String, Sensor> sensors;

metrics与sensors这两个concurrentMap是Metrics中两个重要的成员属性.那么什么是KafkaMetric,什么是Sensor呢?

首先分析KafkaMetric

KafkaMetric实现了Metric接口,可以看到它的核心方法value()返回要监控的参数的值.

public interface Metric {

    /**
     * A name for this metric
     */
    public MetricName metricName();

    /**
     * The value of the metric
     */
    public double value();

}

那么KafkaMetric又是如何实现value()方法的呢?

@Override
public double value() {
    synchronized (this.lock) {
        return value(time.milliseconds());
    }
}

double value(long timeMs) {
    return this.measurable.measure(config, timeMs);
}

原来value()是通过kafkaMetric中的另一个成员属性measurable完成

public interface Measurable {

    /**
     * Measure this quantity and return the result as a double
     * @param config The configuration for this metric
     * @param now The POSIX time in milliseconds the measurement is being taken
     * @return The measured value
     */
    public double measure(MetricConfig config, long now);

}

其实这边挺绕的,Metrics有kafkaMetric的成员变量,而kafkaMetric又通过Measurable返回要检测的值.打个比方,Metrics好比是汽车的仪表盘,kafkaMetric就是仪表盘上的一个仪表,Measurable就是对真正要检测的组件的一个封装.来看看一个Measrable的简单实现,在sender.java类中.

metrics.addMetric(m, new Measurable() {
    public double measure(MetricConfig config, long now) {
        return (now - metadata.lastSuccessfulUpdate()) / 1000.0;
    }
});

可以看到measure的实现就是简单地返回要返回的值,因为是直接在目标类中定义的,所以可以直接获得相应变量的引用.

介绍完KafkaMetric,接下来介绍Sensor,也就是下面的ConcurrentMap中的Sensor

private final ConcurrentMap<String, Sensor> sensors;

以下是Sensor类的源码

/**
 * A sensor applies a continuous sequence of numerical values to a set of associated metrics. For example a sensor on
 * message size would record a sequence of message sizes using the {@link #record(double)} api and would maintain a set
 * of metrics about request sizes such as the average or max.
 */
public final class Sensor {
    //一个kafka就只有一个Metrics实例,这个registry就是对这个Metrics的引用
    private final Metrics registry;
    private final String name;
    private final Sensor[] parents;
    private final List<Stat> stats;
    private final List<KafkaMetric> metrics;

这一段的注释很有意义,从注释中可以看到Sensor的作用不同KafkaMetric. KafkaMetric仅仅是返回某一个参数的值,而Sensor有基于某一参数时间序列进行统计的功能,比如平均值,最大值,最小值.那这些统计又是如何实现的呢?答案是List<Stat> stats这个属性成员.

public interface Stat {

    /**
     * Record the given value
     * @param config The configuration to use for this metric
     * @param value The value to record
     * @param timeMs The POSIX time in milliseconds this value occurred
     */
    public void record(MetricConfig config, double value, long timeMs);

}

可以看到Stat是一个接口,其中有一个record方法可以记录一个采样数值,下面看一个例子,max这个功能如何用Stat来实现?

public final class Max extends SampledStat {

    public Max() {
        super(Double.NEGATIVE_INFINITY);
    }

    @Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
        sample.value = Math.max(sample.value, value);
    }

    @Override
    public double combine(List<Sample> samples, MetricConfig config, long now) {
        double max = Double.NEGATIVE_INFINITY;
        for (int i = 0; i < samples.size(); i++)
            max = Math.max(max, samples.get(i).value);
        return max;
    }

}

是不是很简单,update相当于冒一次泡,把当前的值与历史的最大值比较.combine相当于用一次完整的冒泡排序找出最大值,需要注意的是,max是继承SampleStat的,而SampleStat是Stat接口的实现类.那我们回到Sensor类上来.

public void record(double value, long timeMs) {
    this.lastRecordTime = timeMs;
    synchronized (this) {
        // increment all the stats
        for (int i = 0; i < this.stats.size(); i++)
            this.stats.get(i).record(config, value, timeMs);
        checkQuotas(timeMs);
    }
    for (int i = 0; i < parents.length; i++)
        parents[i].record(value, timeMs);
}

record方法,每个注册于其中的stats提交值,同时如果自己有父sensor的话,向父sensor提交.

public void checkQuotas(long timeMs) {
    for (int i = 0; i < this.metrics.size(); i++) {
        KafkaMetric metric = this.metrics.get(i);
        MetricConfig config = metric.config();
        if (config != null) {
            Quota quota = config.quota();
            if (quota != null) {
                double value = metric.value(timeMs);
                if (!quota.acceptable(value)) {
                    throw new QuotaViolationException(
                        metric.metricName(),
                        value,
                        quota.bound());
                }
            }
        }
    }
}

checkQuotas,通过这里其实是遍历注册在sensor上的每一个KafkaMetric来检查他们的值有没有超过config文件中设置的配额.注意这里的QuotaVioLationException,是不是很熟悉.在QuatoManager中,如果有一个client的上传/下载速度超过指定配额.那么就会抛出这个异常

try {
  clientSensors.quotaSensor.record(value)
  // trigger the callback immediately if quota is not violated
  callback(0)
} catch {
  case qve: QuotaViolationException =>
    // Compute the delay
    val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId))
    throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota))
    clientSensors.throttleTimeSensor.record(throttleTimeMs)
    // If delayed, add the element to the delayQueue
    delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
    delayQueueSensor.record()
    logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}

最后,Sensor会初始化一个线程专门用来清除长时间没有使用的Sensor.这个线程名为"SensorExpiryThread"

class ExpireSensorTask implements Runnable {
    public void run() {
        for (Map.Entry<String, Sensor> sensorEntry : sensors.entrySet()) {
            // removeSensor also locks the sensor object. This is fine because synchronized is reentrant
            // There is however a minor race condition here. Assume we have a parent sensor P and child sensor C.
            // Calling record on C would cause a record on P as well.
            // So expiration time for P == expiration time for C. If the record on P happens via C just after P is removed,
            // that will cause C to also get removed.
            // Since the expiration time is typically high it is not expected to be a significant concern
            // and thus not necessary to optimize
            synchronized (sensorEntry.getValue()) {
                if (sensorEntry.getValue().hasExpired()) {
                    log.debug("Removing expired sensor {}", sensorEntry.getKey());
                    removeSensor(sensorEntry.getKey());
                }
            }
        }
    }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容