Kafka Metrics模块解析

[TOC]

背景

Metrics是kafka内部使用的监控模块,主要有以下几个组成部分:

Measurable
Stat
Sensor
Metric

接口分析

1. Measurable

Measurable接口是度量类型最基础的接口,通过measure()方法获取被监控的值。

public interface Measurable extends MetricValueProvider<Double> {
    double measure(MetricConfig config, long now);
}
2. Stat

Stat接口表示需要经过统计计算的度量类型,例如平均值、最大值、最小值等,通过record()方法记录某值并更新度量值。

public interface Stat {
    public void record(MetricConfig config, double value, long timeMs);
}

MeasuleStat继承了Measureable接口和Stat接口,并没有添加新的方法。CompoundStat接口表示多个Stat的组合。
SampledStat是一个比较重要的抽象类,它表示一个抽样的度量值,除了Total外的其他MeasureableStat接口实现都依赖它功能。在SampleStat中可以有多个Sample并通过多个Sample完成对一个值的度量,在每个Sample中都记录了其对应的时间窗口和事件数量,SampledStat在计算最终的结果值时,可以根据这两个值决定是否使用此sample中的数据。SampledStat实现了MeasuleStat接口的record()方法和measure()方法。在record()方法中会根据时间窗口和事件数使用合适的Sample对象进行记录。

public void record(MetricConfig config, double value, long timeMs) {
//      拿到当前时间的sample对象
        Sample sample = current(timeMs);
//      检测当前sample是否已经完成取样        
        if (sample.isComplete(timeMs, config))
            sample = advance(config, timeMs);
//      更新sample对象
        update(sample, config, value, timeMs);
//      smaple对象的事件数加1
        sample.eventCount += 1;
    }

measure()方法首先会将过期的sample重置,之后调用combine方法完成计算。combine方法是抽象方法,不同子类有不同的实现。

public double measure(MetricConfig config, long now) {
//      检查sample是否过期
        purgeObsoleteSamples(config, now);
        return combine(this.samples, config, now);
}

3. Sensor

在实际应用中,对同一个操作需要有多个不同方面的度量,例如需要监控请求的最大长度,同时也需要监控请求的平均长度等。kafka通过将多个相关的度量对象封装在进sensor中实现。

4. Metric

Metrics类,负责统一管理Sensor对象、KafkaMetric对象。

public class Metrics implements Closeable {
//  默认配置信息
    private final MetricConfig config;
//  保存了添加到Metrics中的KafkaMetrics对象
    private final ConcurrentMap<MetricName, KafkaMetric> metrics;
//  保存了添加到Metrics中的Sensor的集合
    private final ConcurrentMap<String, Sensor> sensors;
//  记录了每个Sensor的子Sensor集合
    private final ConcurrentMap<Sensor, List<Sensor>> childrenSensors;
    private final List<MetricsReporter> reporters;
    private final Time time;
    private final ScheduledThreadPoolExecutor metricsScheduler;
    private static final Logger log = LoggerFactory.getLogger(Metrics.class);

//  从sensors集合中获取sensor对象,如果指定的Sensor不存在则创建新Sensor对象,并使用childrenSensors集合记录Sensor的层级关系
    public synchronized Sensor sensor(String name, MetricConfig config, long inactiveSensorExpirationTimeSeconds, Sensor.RecordingLevel recordingLevel, Sensor... parents) {
    //  根据name从sensors集合中获取sensor对象
        Sensor s = getSensor(name);
        if (s == null) {
        //  如果不存在则创建sensor对象
            s = new Sensor(this, name, parents, config == null ? this.config : config, time, inactiveSensorExpirationTimeSeconds, recordingLevel);
            this.sensors.put(name, s);
            if (parents != null) {
            // 通过childrenSensors记录sensor的层级关系
                for (Sensor parent : parents) {
                    List<Sensor> children = childrenSensors.get(parent);
                    if (children == null) {
                        children = new ArrayList<>();
                        childrenSensors.put(parent, children);
                    }
                    children.add(s);
                }
            }
            log.debug("Added sensor with name {}", name);
        }
        return s;
    }
}

使用场景

Producer、Consumer、Broker都会用到。下面以Producer举例。
Producer的构造函数中会初始化Metrics。

MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                    .tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);

Producer主要用Metrics来度量和统计"produce-throttle-time"的相关指标。

public static Sensor throttleTimeSensor(SenderMetricsRegistry metrics) {
        Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
        produceThrottleTimeSensor.add(metrics.produceThrottleTimeAvg, new Avg());
        produceThrottleTimeSensor.add(metrics.produceThrottleTimeMax, new Max());
        return produceThrottleTimeSensor;
    }

如上,metrics首先注册了名为“produce-throttle-time”的sensor。然后给这个sensor加了两个指标,分别是produceThrottleTimeAvg(平均值)和produceThrottleTimeMax(最大值)。这两个指标对应的度量方法分别是Avg的实例对象和Max的实例对象。
什么触发这些指标的统计呢?答案是在客户端收到发送消息的Response后。如下:

throttleTimeSensor.record(responseBody.get(CommonFields.THROTTLE_TIME_MS), now);

这个record方法解析如下:

public void record(double value, long timeMs, boolean checkQuotas) {
        if (shouldRecord()) {
            this.lastRecordTime = timeMs;
//          线程安全
            synchronized (this) {
//          遍历所有stat,这里对应的是上文的Avg和Max
                for (Stat stat : this.stats)
                    stat.record(config, value, timeMs);
                if (checkQuotas)
                    checkQuotas(timeMs);
            }
            for (Sensor parent : parents)
                parent.record(value, timeMs, checkQuotas);
        }
    }

Avg和Max都继承了SampledStat的record()方法。

public void record(MetricConfig config, double value, long timeMs) {
        Sample sample = current(timeMs);
        if (sample.isComplete(timeMs, config))
            sample = advance(config, timeMs);
//      这里的update就由各子类单独实现。
        update(sample, config, value, timeMs);
        sample.eventCount += 1;
    }
// Avg
@Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
//      很简单,先求和
        sample.value += value;
    }
//  Max
@Override
    protected void update(Sample sample, MetricConfig config, double value, long now) {
//      直接取最大值    
        sample.value = Math.max(sample.value, value);
    }

最后这两个指标的计算会由JmxReporter调用,最终的计算逻辑在SampledStat的combine()方法中。指标值最终会呈现在jmx中。

@Override
    public double measure(MetricConfig config, long now) {
        purgeObsoleteSamples(config, now);
//      measure()方法调用combine()方法
        return combine(this.samples, config, now);
    }

// Avg
@Override
    public double combine(List<Sample> samples, MetricConfig config, long now) {
        double total = 0.0;
        long count = 0;
        for (Sample s : samples) {
            total += s.value;
            count += s.eventCount;
        }
        return count == 0 ? 0 : total / count;
    }

// Max
@Override
    public double combine(List<Sample> samples, MetricConfig config, long now) {
        double max = Double.NEGATIVE_INFINITY;
        for (Sample sample : samples)
            max = Math.max(max, sample.value);
        return max;
    }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容