Spring Boot Actuator: Metrics

Spring Boot Actuator's Metrics support collects metric data that describes the overall runtime state of a system.

Metric types

  • Counter
  • Gauge (instantaneous value)
  • Meter (throughput rate)
  • Histogram
  • Quantile Summary (sampled quantiles)
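
To make the difference between these types concrete, here is a minimal, library-free sketch. It is not Micrometer code: the class and method names are hypothetical and only illustrate the semantics. A counter only ever grows, a gauge is sampled when it is read, and a timer-style summary keeps a count, a total and a maximum.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.DoubleSupplier;

// Hypothetical, library-free illustration of metric semantics (not Micrometer's API).
final class MetricKindsSketch {

    // Counter: a monotonically increasing total.
    static final class SimpleCounter {
        private final LongAdder total = new LongAdder();
        void increment() { total.increment(); }
        long count() { return total.sum(); }
    }

    // Gauge: keeps no state of its own; it samples the observed value when read.
    static final class SimpleGauge {
        private final DoubleSupplier source;
        SimpleGauge(DoubleSupplier source) { this.source = source; }
        double value() { return source.getAsDouble(); }
    }

    // Timer-style summary: number of samples, their sum, and the largest sample.
    static final class SimpleSummary {
        private final LongAdder count = new LongAdder();
        private final LongAdder totalMillis = new LongAdder();
        private final AtomicLong maxMillis = new AtomicLong();

        void record(long millis) {
            count.increment();
            totalMillis.add(millis);
            maxMillis.accumulateAndGet(millis, Math::max);
        }
        long count() { return count.sum(); }
        long totalMillis() { return totalMillis.sum(); }
        long maxMillis() { return maxMillis.get(); }
        double meanMillis() {
            long n = count();
            return n == 0 ? 0.0 : (double) totalMillis() / n;
        }
    }

    public static void main(String[] args) {
        SimpleCounter requests = new SimpleCounter();
        requests.increment();
        requests.increment();

        List<String> queue = new ArrayList<>();
        SimpleGauge queueSize = new SimpleGauge(queue::size);
        queue.add("job"); // the gauge sees this because it samples on read

        SimpleSummary latency = new SimpleSummary();
        latency.record(10);
        latency.record(30);

        System.out.println("requests=" + requests.count()
                + " queueSize=" + queueSize.value()
                + " meanLatencyMs=" + latency.meanMillis());
    }
}
```

A real metrics library adds tags, registries and export on top of these ideas; the sketch only shows why a counter is queried with a rate while a gauge is read directly.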

Monitored components

  • Dubbo
  • Tomcat
  • JVM
  • DB Connection Pool (Druid)
  • MongoDB
  • Redis
  • RocketMQ
  • MethodInterceptor (method-level metrics)

Dubbo Metrics

  1. Define a custom ThreadPool so that the state of the Dubbo thread pool can be monitored. The code is as follows:
public class DubboMetricsThreadPool implements ThreadPool {

    private MeterRegistry simpleMeterRegistry;
    public static final String DUBBO_THREADS_PREFIX = "dubbo.threads";
    public static final String THREAD_POOL_NAME = "fixedExt";

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
        register(executor);
        return executor;
    }

    private void register(ThreadPoolExecutor executor) {
        if (simpleMeterRegistry == null) {
            return;
        }
        Gauge.builder(DUBBO_THREADS_PREFIX + ".active.count", executor::getActiveCount).register(simpleMeterRegistry);
        Gauge.builder(DUBBO_THREADS_PREFIX + ".pool.size.current", executor::getPoolSize).register(simpleMeterRegistry);
        Gauge.builder(DUBBO_THREADS_PREFIX + ".pool.size.max", executor::getMaximumPoolSize).register(simpleMeterRegistry);
    }

    public void setSimpleMeterRegistry(MeterRegistry simpleMeterRegistry) {
        this.simpleMeterRegistry = simpleMeterRegistry;
    }
}
  2. Define a custom Dubbo Filter so that Dubbo call requests can be monitored. The code is as follows:
@Activate(group = {PROVIDER, CONSUMER})
public class DubboMetricsFilter implements Filter {

    private MeterRegistry simpleMeterRegistry;
    private static final String DUBBO_METRICS_NAME = "dubbo.request.latency";
    public static final String EXCEPTION_TAG = "exception";
    public static final String RESULT_TAG = "result";
    private static final String DEFAULT_EXCEPTION_VALUE = "none";
    private static final String DEFAULT_RESULT_SUCCESS_VALUE = "success";
    private static final String DEFAULT_RESULT_ERROR_VALUE = "error";
    private static final String SERVICE_TAG = "service";
    public static final String METHOD_TAG = "method";
    public static final String CODE_TAG = "code";

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (simpleMeterRegistry == null) {
            return invoker.invoke(invocation);
        }
        Timer.Sample sample = Timer.start(simpleMeterRegistry);
        try {
            Result result = invoker.invoke(invocation);
            if (result instanceof AsyncRpcResult) {
                AsyncRpcResult asyncRpcResult = (AsyncRpcResult) result;
                asyncRpcResult.getResponseFuture()
                        .whenComplete((res, exception) -> {
                            record(sample, exception, invoker, invocation);
                        });
            } else {
                record(sample, null, invoker, invocation);
            }
            return result;
        } catch (Throwable e) {
            record(sample, e, invoker, invocation);
            throw e;
        }
    }

    private void record(Timer.Sample sample,
                        Throwable t,
                        Invoker<?> invoker,
                        Invocation invocation) {
        try {
            boolean isProvider = invoker.getUrl().getSide(PROVIDER).equalsIgnoreCase(PROVIDER);
            String serviceName = invoker.getInterface().getName();
            String methodName = RpcUtils.getMethodName(invocation);
            sample.stop(Timer.builder(DUBBO_METRICS_NAME)
                    .tag(RESULT_TAG, getResultTag(t))
                    .tag(SERVICE_TAG, serviceName)
                    .tag(METHOD_TAG, methodName)
                    .tag(CODE_TAG, buildCode(serviceName, methodName))
                    .tag(EXCEPTION_TAG, getExceptionTag(t))
                    .tag(SIDE_KEY, isProvider ? PROVIDER : CONSUMER)
                    .register(simpleMeterRegistry));
        } catch (Throwable ignore) {
            // ignore
        }
    }

    private String buildCode(String serviceName, String methodName) {
        return serviceName + "." + methodName;
    }

    private String getResultTag(Throwable e) {
        if (e == null) {
            return DEFAULT_RESULT_SUCCESS_VALUE;
        }
        String result = DEFAULT_RESULT_ERROR_VALUE;
        if (e instanceof RpcException) {
            RpcException rpcException = (RpcException) e;
            if (rpcException.isTimeout()) {
                result = "timeoutError";
            }
            if (rpcException.isBiz()) {
                result = "bizError";
            }
            if (rpcException.isNetwork()) {
                result = "networkError";
            }
            if (rpcException.isSerialization()) {
                result = "serializationError";
            }
        }
        return result;
    }

    private String getExceptionTag(Throwable throwable) {
        if (throwable == null) {
            return DEFAULT_EXCEPTION_VALUE;
        }
        if (throwable.getCause() == null) {
            return throwable.getClass().getSimpleName();
        }
        return throwable.getCause().getClass().getSimpleName();
    }

    public void setSimpleMeterRegistry(MeterRegistry simpleMeterRegistry) {
        this.simpleMeterRegistry = simpleMeterRegistry;
    }
}
  3. Then create two files, org.apache.dubbo.common.threadpool.ThreadPool and org.apache.dubbo.rpc.Filter, and place them under META-INF/dubbo:

    META-INF
    |--dubbo
    |----org.apache.dubbo.common.threadpool.ThreadPool
    |----org.apache.dubbo.rpc.Filter
    
  4. The file org.apache.dubbo.common.threadpool.ThreadPool contains:

    fixedExt=me.howard.spring.boot.dubbo.triple.provider.config.DubboMetricsThreadPool
    
  5. The file org.apache.dubbo.rpc.Filter contains:

    metrics1=me.howard.spring.boot.dubbo.triple.provider.filter.DubboMetricsFilter
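
The gauges registered in DubboMetricsThreadPool read their values straight from ThreadPoolExecutor. As a rough illustration of what those getters report, the following JDK-only sketch blocks a few tasks inside a fixed pool and samples the pool while they run. The class name and the helper method are hypothetical; only the ThreadPoolExecutor getters come from the JDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: shows what the pool getters return while tasks are running.
final class ThreadPoolGaugeSketch {

    // Returns {active, poolSize, largestPoolSize, maximumPoolSize}, sampled while
    // `busy` tasks are blocked inside a fixed pool of `threads` workers.
    static int[] sampleWhileBusy(int threads, int busy) {
        if (busy > threads) {
            throw new IllegalArgumentException("busy must not exceed threads");
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                threads, threads, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(busy);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < busy; i++) {
                executor.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // keep the worker busy until sampling is done
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            started.await(); // every task is now running on a worker thread
            return new int[] {
                executor.getActiveCount(),
                executor.getPoolSize(),
                executor.getLargestPoolSize(),
                executor.getMaximumPoolSize()
            };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = sampleWhileBusy(4, 2);
        System.out.printf("active=%d pool=%d largest=%d max=%d%n", s[0], s[1], s[2], s[3]);
    }
}
```

Because the pool creates workers lazily, the current pool size can stay below the configured maximum, which is why it helps to chart both values side by side.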
    

The Dubbo request metrics as displayed in Grafana:
Provider side:

Configuration:

# Request throughput (requests per second)
sum(rate(dubbo_request_latency_seconds_count{application="xxx-service", instance="pushgateway", side="provider"}[1m]))
# Average request response time
sum(rate(dubbo_request_latency_seconds_sum{application="xxx-service", instance="pushgateway", result="success", side="provider"}[1m])) / sum(rate(dubbo_request_latency_seconds_count{application="xxx-service", instance="pushgateway", result="success", side="provider"}[1m]))
# Maximum request response time
max(dubbo_request_latency_seconds_max{application="xxx-service", instance="pushgateway", result="success", side="provider"})
# Top 5 by maximum request response time, grouped by the code tag (service.method)
topk(5, max(dubbo_request_latency_seconds_max{application="xxx-service", instance="pushgateway", result="success", side="provider"}) by (code))
# Top 5 by request throughput, grouped by the code tag (service.method)
topk(5, sum(rate(dubbo_request_latency_seconds_count{application="xxx-service", instance="pushgateway", side="provider"}[1m])) by (code))
# Request errors per second
sum(rate(dubbo_request_latency_seconds_count{application="xxx-service", instance="pushgateway", result!="success", side="provider"}[1m]))
# Dubbo thread pool: number of currently active threads
dubbo_threads_active_count{application="xxx-service", instance="pushgateway"}
# Dubbo thread pool: current number of threads
dubbo_threads_pool_size_current{application="xxx-service", instance="pushgateway"}
# Dubbo thread pool: largest number of threads the pool has reached so far
# (needs an extra gauge on executor::getLargestPoolSize, which the ThreadPool code above does not register)
dubbo_threads_largest_pool_size{application="xxx-service", instance="pushgateway"}
# Dubbo thread pool: configured maximum number of threads
dubbo_threads_pool_size_max{application="xxx-service", instance="pushgateway"}
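
The throughput and average-latency queries above both rest on the same arithmetic: take the increase of a counter over a window, divide by the window length, and (for the average) divide the rate of the latency sum by the rate of the request count. A simplified sketch of that arithmetic follows. The class and method names are hypothetical, and the sample numbers are made up. Prometheus's rate() additionally handles counter resets and extrapolates to the window boundaries, which this sketch deliberately ignores.

```java
// Hypothetical helper that mirrors the arithmetic behind rate(sum) / rate(count).
final class RateSketch {

    // Per-second increase of a counter between two scrapes.
    static double perSecondRate(double earlier, double later, double secondsBetween) {
        if (secondsBetween <= 0) {
            throw new IllegalArgumentException("secondsBetween must be positive");
        }
        return (later - earlier) / secondsBetween;
    }

    // Mean latency over the window; NaN when no request was recorded in it.
    static double meanLatencySeconds(double sumEarlier, double sumLater,
                                     double countEarlier, double countLater,
                                     double secondsBetween) {
        double countRate = perSecondRate(countEarlier, countLater, secondsBetween);
        if (countRate == 0.0) {
            return Double.NaN;
        }
        return perSecondRate(sumEarlier, sumLater, secondsBetween) / countRate;
    }

    public static void main(String[] args) {
        // Made-up samples: the count grew from 1200 to 1500 and the latency sum
        // from 36 s to 45 s within a 60 s window.
        double throughput = perSecondRate(1200, 1500, 60);
        double mean = meanLatencySeconds(36.0, 45.0, 1200, 1500, 60);
        System.out.println("throughput (req/s): " + throughput);
        System.out.println("mean latency (s): " + mean);
    }
}
```

With these numbers the throughput is 300 requests over 60 seconds, and the mean latency is 9 seconds of accumulated time spread over those 300 requests.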

Consumer side:

Configuration: same as the Provider queries above, with the label filter changed to side="consumer".

Tomcat Metrics

Add the following configuration to application.yml to enable monitoring of the Tomcat metrics:

server:
  tomcat:
    mbeanregistry:
      enabled: true

TomcatMetrics

Spring Boot Actuator uses Micrometer to collect metric data. The methods below are excerpts from Micrometer's TomcatMetrics class.

// Metrics for Tomcat's worker thread pool
private void registerThreadPoolMetrics(MeterRegistry registry) {
  registerMetricsEventually(":type=ThreadPool,name=*", (name, allTags) -> {
    // Configured maximum number of Tomcat worker threads (server.tomcat.threads.max)
    Gauge.builder("tomcat.threads.config.max", mBeanServer,
                  s -> safeDouble(() -> s.getAttribute(name, "maxThreads")))
      .tags(allTags)
      .baseUnit(BaseUnits.THREADS)
      .register(registry);
    // Number of threads currently busy processing requests
    Gauge.builder("tomcat.threads.busy", mBeanServer,
                  s -> safeDouble(() -> s.getAttribute(name, "currentThreadsBusy")))
      .tags(allTags)
      .baseUnit(BaseUnits.THREADS)
      .register(registry);
    // Current number of threads, busy and idle combined
    Gauge.builder("tomcat.threads.current", mBeanServer,
                  s -> safeDouble(() -> s.getAttribute(name, "currentThreadCount")))
      .tags(allTags)
      .baseUnit(BaseUnits.THREADS)
      .register(registry);
    // Number of connections Tomcat is currently handling
    Gauge.builder("tomcat.connections.current", mBeanServer,
                  s -> safeDouble(() -> s.getAttribute(name, "connectionCount")))
      .tags(allTags)
      .baseUnit(BaseUnits.CONNECTIONS)
      .register(registry);
    // Current number of keep-alive connections
    Gauge.builder("tomcat.connections.keepalive.current", mBeanServer,
                  s -> safeDouble(() -> s.getAttribute(name, "keepAliveCount")))
      .tags(allTags)
      .baseUnit(BaseUnits.CONNECTIONS)
      .register(registry);
    // Configured maximum number of Tomcat connections (server.tomcat.max-connections)
    Gauge.builder("tomcat.connections.config.max", mBeanServer,
                  s -> safeDouble(() -> s.getAttribute(name, "maxConnections")))
      .tags(allTags)
      .baseUnit(BaseUnits.CONNECTIONS)
      .register(registry);
  });
}

// Metrics for Tomcat request processing
private void registerGlobalRequestMetrics(MeterRegistry registry) {
        registerMetricsEventually(":type=GlobalRequestProcessor,name=*", (name, allTags) -> {
            // Total number of bytes sent so far
            FunctionCounter.builder("tomcat.global.sent", mBeanServer,
                s -> safeDouble(() -> s.getAttribute(name, "bytesSent")))
                .tags(allTags)
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
            // Total number of bytes received so far
            FunctionCounter.builder("tomcat.global.received", mBeanServer,
                s -> safeDouble(() -> s.getAttribute(name, "bytesReceived")))
                .tags(allTags)
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
            // Total number of request errors
            FunctionCounter.builder("tomcat.global.error", mBeanServer,
                    s -> safeDouble(() -> s.getAttribute(name, "errorCount")))
                    .tags(allTags)
                    .register(registry);
            // Total number of requests and their total processing time
            FunctionTimer.builder("tomcat.global.request", mBeanServer,
                    s -> safeLong(() -> s.getAttribute(name, "requestCount")),
                    s -> safeDouble(() -> s.getAttribute(name, "processingTime")), TimeUnit.MILLISECONDS)
                    .tags(allTags)
                    .register(registry);
            // Longest time taken to process a request, in milliseconds
            TimeGauge.builder("tomcat.global.request.max", mBeanServer, TimeUnit.MILLISECONDS,
                    s -> safeDouble(() -> s.getAttribute(name, "maxTime")))
                    .tags(allTags)
                    .register(registry);
        });
    }
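
Both methods above follow one pattern: look up an MBean by its ObjectName and read a numeric attribute from it, falling back to a safe value when the read fails. The JDK-only sketch below illustrates that pattern. Tomcat's MBeans are not available outside a running Tomcat, so the example reads the JVM's own java.lang:type=Threading MBean as a stand-in. The class and method names are hypothetical, and returning NaN on failure is an assumption of this sketch.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper: reads a numeric attribute from an MBean by name.
final class MBeanAttributeSketch {

    // Returns the attribute as a double, or NaN when it is missing or not numeric.
    static double readAttribute(MBeanServer server, String objectName, String attribute) {
        try {
            Object value = server.getAttribute(new ObjectName(objectName), attribute);
            return value instanceof Number ? ((Number) value).doubleValue() : Double.NaN;
        } catch (JMException | RuntimeException e) {
            return Double.NaN; // unknown MBean, unknown attribute, or a failing getter
        }
    }

    public static void main(String[] args) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Stand-in for a Tomcat MBean such as the ThreadPool one.
        double threads = readAttribute(server, "java.lang:type=Threading", "ThreadCount");
        double missing = readAttribute(server, "java.lang:type=Threading", "NoSuchAttribute");
        System.out.println("ThreadCount=" + threads + " NoSuchAttribute=" + missing);
    }
}
```

Returning a sentinel instead of throwing keeps a single unreadable attribute from breaking the whole scrape.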

The Tomcat metrics as displayed in Grafana:

Configuration:

# Request throughput (requests per second)
sum(rate(http_server_requests_seconds_count{application="user-app", instance="pushgateway"}[1m]))
# Request errors per second (5xx responses)
sum(rate(http_server_requests_seconds_count{application="user-app", instance="pushgateway", status=~"5.."}[1m]))
# Average request response time
sum(rate(http_server_requests_seconds_sum{application="user-app", instance="pushgateway", status!~"5.."}[1m]))/sum(rate(http_server_requests_seconds_count{application="user-app", instance="pushgateway", status!~"5.."}[1m]))
# Maximum request response time
max(http_server_requests_seconds_max{application="user-app", instance="pushgateway", status!~"5.."})
# Tomcat thread pool: number of currently busy threads
tomcat_threads_busy_threads{application="user-app", instance="pushgateway"}
# Tomcat thread pool: current number of threads
tomcat_threads_current_threads{application="user-app", instance="pushgateway"}
# Tomcat thread pool: configured maximum number of threads
tomcat_threads_config_max_threads{application="user-app", instance="pushgateway"}

JVM Metrics

Monitoring of the JVM metrics is enabled by default.

JvmThreadMetrics class: JVM thread usage

public class JvmThreadMetrics implements MeterBinder {

    @Override
    public void bindTo(MeterRegistry registry) {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        // Peak number of live threads since the JVM started (or since the peak was reset)
        Gauge.builder("jvm.threads.peak", threadBean, ThreadMXBean::getPeakThreadCount)
                .tags(tags)
                .description("The peak live thread count since the Java virtual machine started or peak was reset")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
        // Current number of live daemon threads
        Gauge.builder("jvm.threads.daemon", threadBean, ThreadMXBean::getDaemonThreadCount)
                .tags(tags)
                .description("The current number of live daemon threads")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
        // Current number of live threads, daemon and non-daemon combined
        Gauge.builder("jvm.threads.live", threadBean, ThreadMXBean::getThreadCount)
                .tags(tags)
                .description("The current number of live threads including both daemon and non-daemon threads")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);

        try {
            threadBean.getAllThreadIds();
            // Count the threads in each thread state, such as RUNNABLE, WAITING, TIMED_WAITING, BLOCKED and TERMINATED
            for (Thread.State state : Thread.State.values()) {
                Gauge.builder("jvm.threads.states", threadBean, (bean) -> getThreadStateCount(bean, state))
                        .tags(Tags.concat(tags, "state", getStateTagValue(state)))
                        .description("The current number of threads having " + state + " state")
                        .baseUnit(BaseUnits.THREADS)
                        .register(registry);
            }
        } catch (Error error) {
            // An error will be thrown for unsupported operations
            // e.g. SubstrateVM does not support getAllThreadIds
        }
    }
}
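
The excerpt calls getThreadStateCount without showing it. As a rough sketch of how such a per-state count can be derived from ThreadMXBean using only the JDK, consider the following. The class and method names are hypothetical and this is not presented as Micrometer's implementation.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.EnumMap;
import java.util.Map;

// Hypothetical helper: counts live threads per Thread.State via ThreadMXBean.
final class ThreadStateSketch {

    static Map<Thread.State, Integer> countByState(ThreadMXBean bean) {
        Map<Thread.State, Integer> counts = new EnumMap<>(Thread.State.class);
        for (Thread.State state : Thread.State.values()) {
            counts.put(state, 0); // report every state, even when no thread is in it
        }
        for (ThreadInfo info : bean.getThreadInfo(bean.getAllThreadIds())) {
            if (info != null) { // null means the thread ended between the two calls
                counts.merge(info.getThreadState(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        System.out.println("live=" + bean.getThreadCount()
                + " daemon=" + bean.getDaemonThreadCount()
                + " peak=" + bean.getPeakThreadCount());
        System.out.println("by state: " + countByState(bean));
    }
}
```

The per-state numbers are a snapshot taken thread by thread, so their sum can differ slightly from the live thread count read a moment earlier.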

Grafana configuration:

# Live threads, daemon and non-daemon combined
jvm_threads_live_threads{application="user-app", instance="pushgateway"}
# Live daemon threads
jvm_threads_daemon_threads{application="user-app", instance="pushgateway"}
# Peak number of live threads so far
jvm_threads_peak_threads{application="user-app", instance="pushgateway"}
# Thread count per state (legend: {{state}})
jvm_threads_states_threads{application="user-app", instance="pushgateway"}

JvmMemoryMetrics class: JVM memory usage

public class JvmMemoryMetrics implements MeterBinder {

    @Override
    public void bindTo(MeterRegistry registry) {
        // Buffer pools (direct buffers and mapped buffers)
        for (BufferPoolMXBean bufferPoolBean : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            Iterable<Tag> tagsWithId = Tags.concat(tags, "id", bufferPoolBean.getName());
            // Number of buffers in the pool
            Gauge.builder("jvm.buffer.count", bufferPoolBean, BufferPoolMXBean::getCount)
                    .tags(tagsWithId)
                    .description("An estimate of the number of buffers in the pool")
                    .baseUnit(BaseUnits.BUFFERS)
                    .register(registry);
            // Memory currently used by the buffer pool
            Gauge.builder("jvm.buffer.memory.used", bufferPoolBean, BufferPoolMXBean::getMemoryUsed)
                .tags(tagsWithId)
                .description("An estimate of the memory that the Java virtual machine is using for this buffer pool")
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
            // Total capacity of the buffers in the pool
            Gauge.builder("jvm.buffer.total.capacity", bufferPoolBean, BufferPoolMXBean::getTotalCapacity)
                .tags(tagsWithId)
                .description("An estimate of the total capacity of the buffers in this pool")
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
        }

        // Heap pools (names as reported by the Parallel collector): PS Eden Space, PS Old Gen, PS Survivor Space
        // Non-heap pools: Metaspace, Compressed Class Space, Code Cache
        for (MemoryPoolMXBean memoryPoolBean : ManagementFactory.getPlatformMXBeans(MemoryPoolMXBean.class)) {
            String area = MemoryType.HEAP.equals(memoryPoolBean.getType()) ? "heap" : "nonheap";
            Iterable<Tag> tagsWithId = Tags.concat(tags, "id", memoryPoolBean.getName(), "area", area);
            // Amount of memory currently used
            Gauge.builder("jvm.memory.used", memoryPoolBean, (mem) -> getUsageValue(mem, MemoryUsage::getUsed))
                .tags(tagsWithId)
                .description("The amount of used memory")
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
            // Committed size: memory guaranteed to be available to the JVM
            // The committed size is the amount of memory guaranteed to be available for use by the Java virtual machine. 
            // The committed memory size is always greater than or equal to the used size.
            Gauge.builder("jvm.memory.committed", memoryPoolBean, (mem) -> getUsageValue(mem, MemoryUsage::getCommitted))
                .tags(tagsWithId)
                .description("The amount of memory in bytes that is committed for the Java virtual machine to use")
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
            // Configured maximum size
            Gauge.builder("jvm.memory.max", memoryPoolBean, (mem) -> getUsageValue(mem, MemoryUsage::getMax))
                .tags(tagsWithId)
                .description("The maximum amount of memory in bytes that can be used for memory management")
                .baseUnit(BaseUnits.BYTES)
                .register(registry);
        }
    }

}
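
The gauges above are thin wrappers around MemoryPoolMXBean. To see the same numbers without any metrics library, a JDK-only sketch such as the following can be used. The class and method names are hypothetical. One detail is worth calling out: a pool without a configured limit reports its maximum as -1, so the sketch skips such pools when summing maxima.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;

// Hypothetical helper: aggregates memory pool usage per area (heap or non-heap).
final class MemoryAreaSketch {

    // Sum of used bytes across all pools of the given type.
    static long usedBytes(MemoryType type) {
        long total = 0;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage(); // null when the pool is no longer valid
            if (pool.getType() == type && usage != null) {
                total += usage.getUsed();
            }
        }
        return total;
    }

    // Sum of max bytes across pools that define a limit; pools reporting -1 are skipped.
    static long definedMaxBytes(MemoryType type) {
        long total = 0;
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            MemoryUsage usage = pool.getUsage();
            if (pool.getType() == type && usage != null && usage.getMax() >= 0) {
                total += usage.getMax();
            }
        }
        return total;
    }

    // Usage as a percentage, or NaN when no usable maximum is known.
    static double usagePercent(long used, long max) {
        return max > 0 ? used * 100.0 / max : Double.NaN;
    }

    public static void main(String[] args) {
        long used = usedBytes(MemoryType.HEAP);
        long max = definedMaxBytes(MemoryType.HEAP);
        System.out.println("heap used=" + used + " max=" + max
                + " usage%=" + usagePercent(used, max));
    }
}
```

Which pools define a maximum depends on the garbage collector and JVM flags, so a usage ratio built from summed maxima should be read as an approximation.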

Viewing the metrics in Grafana:

Configuration:

# Heap memory usage (%)
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="heap"})*100/sum(jvm_memory_max_bytes{application="user-app",instance="pushgateway", area="heap"})
# Non-heap memory usage (%); pools without a configured limit report max as -1, which distorts this ratio
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="nonheap"})*100/sum(jvm_memory_max_bytes{application="user-app",instance="pushgateway", area="nonheap"})
# Heap memory used
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="heap"})
# Heap memory committed
sum(jvm_memory_committed_bytes{application="user-app", instance="pushgateway", area="heap"})
# Heap memory configured maximum
sum(jvm_memory_max_bytes{application="user-app", instance="pushgateway", area="heap"})
# Non-heap memory used
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="nonheap"})
# Non-heap memory committed
sum(jvm_memory_committed_bytes{application="user-app", instance="pushgateway", area="nonheap"})
# Non-heap memory configured maximum
sum(jvm_memory_max_bytes{application="user-app", instance="pushgateway", area="nonheap"})

# Direct (off-heap) memory: number of buffers
jvm_buffer_count_buffers{application="user-app", instance="pushgateway", id="direct"}
# Direct (off-heap) memory: bytes used
jvm_buffer_memory_used_bytes{application="user-app", instance="pushgateway", id="direct"}
# Direct (off-heap) memory: total capacity
jvm_buffer_total_capacity_bytes{application="user-app", instance="pushgateway", id="direct"}

# Mapped buffers: number of buffers
jvm_buffer_count_buffers{application="user-app", instance="pushgateway", id="mapped"}
# Mapped buffers: bytes used
jvm_buffer_memory_used_bytes{application="user-app", instance="pushgateway", id="mapped"}
# Mapped buffers: total capacity
jvm_buffer_total_capacity_bytes{application="user-app", instance="pushgateway", id="mapped"}

# Young generation: Eden space
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"PS Eden Space"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"PS Eden Space"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"PS Eden Space"}

# Old generation
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"PS Old Gen"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"PS Old Gen"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"PS Old Gen"}

# Young generation: Survivor space
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"PS Survivor Space"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"PS Survivor Space"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"PS Survivor Space"}

# Metaspace (method area)
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"Metaspace"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"Metaspace"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"Metaspace"}

# Compressed Class Space (used when -XX:+UseCompressedClassPointers is enabled)
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"Compressed Class Space"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"Compressed Class Space"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"Compressed Class Space"}

# Code Cache: holds JIT-compiled machine code and the code generated for JNI (native) calls
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"Code Cache"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"Code Cache"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"Code Cache"}

JvmGcMetrics class: GC metrics

public class JvmGcMetrics implements MeterBinder, AutoCloseable {
    @Override
    public void bindTo(MeterRegistry registry) {
        if (!this.managementExtensionsPresent) {
            return;
        }
                
        double maxLongLivedPoolBytes = getLongLivedHeapPools().mapToDouble(mem -> getUsageValue(mem, MemoryUsage::getMax)).sum();

        AtomicLong maxDataSize = new AtomicLong((long) maxLongLivedPoolBytes);
        // Maximum size of the long-lived (old generation) heap memory pool
        Gauge.builder("jvm.gc.max.data.size", maxDataSize, AtomicLong::get)
            .tags(tags)
            .description("Max size of long-lived heap memory pool")
            .baseUnit(BaseUnits.BYTES)
            .register(registry);

        AtomicLong liveDataSize = new AtomicLong();
        // Size of the long-lived heap memory pool after reclamation
        Gauge.builder("jvm.gc.live.data.size", liveDataSize, AtomicLong::get)
            .tags(tags)
            .description("Size of long-lived heap memory pool after reclamation")
            .baseUnit(BaseUnits.BYTES)
            .register(registry);
        // Bytes allocated in the young generation between one GC and the next
        Counter allocatedBytes = Counter.builder("jvm.gc.memory.allocated").tags(tags)
            .baseUnit(BaseUnits.BYTES)
            .description("Incremented for an increase in the size of the (young) heap memory pool after one GC to before the next")
            .register(registry);
        // Bytes promoted to the old generation (sum of the positive increases of the old generation pool across each GC)
        Counter promotedBytes = (isGenerationalGc) ? Counter.builder("jvm.gc.memory.promoted").tags(tags)
                    .baseUnit(BaseUnits.BYTES)
                    .description("Count of positive increases in the size of the old generation memory pool before GC to after GC")
                    .register(registry) : null;

        final AtomicLong allocationPoolSizeAfter = new AtomicLong(0L);

        for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
            if (!(mbean instanceof NotificationEmitter)) {
                continue;
            }
            NotificationListener notificationListener = (notification, ref) -> {
                CompositeData cd = (CompositeData) notification.getUserData();
                GarbageCollectionNotificationInfo notificationInfo = GarbageCollectionNotificationInfo.from(cd);

                String gcCause = notificationInfo.getGcCause();
                String gcAction = notificationInfo.getGcAction();
                GcInfo gcInfo = notificationInfo.getGcInfo();
                long duration = gcInfo.getDuration();
                if (isConcurrentPhase(gcCause, notificationInfo.getGcName())) {
                    // Time spent in a concurrent GC phase
                    Timer.builder("jvm.gc.concurrent.phase.time")
                            .tags(tags)
                            .tags("action", gcAction, "cause", gcCause)
                            .description("Time spent in concurrent phase")
                            .register(registry)
                            .record(duration, TimeUnit.MILLISECONDS);
                } else {
                    // Time spent in a GC pause
                    Timer.builder("jvm.gc.pause")
                            .tags(tags)
                            .tags("action", gcAction, "cause", gcCause)
                            .description("Time spent in GC pause")
                            .register(registry)
                            .record(duration, TimeUnit.MILLISECONDS);
                }

                final Map<String, MemoryUsage> before = gcInfo.getMemoryUsageBeforeGc();
                final Map<String, MemoryUsage> after = gcInfo.getMemoryUsageAfterGc();

                countPoolSizeDelta(before, after, allocatedBytes, allocationPoolSizeAfter, allocationPoolName);

                final long longLivedBefore = longLivedPoolNames.stream().mapToLong(pool -> before.get(pool).getUsed()).sum();
                final long longLivedAfter = longLivedPoolNames.stream().mapToLong(pool -> after.get(pool).getUsed()).sum();
                if (isGenerationalGc) {
                    final long delta = longLivedAfter - longLivedBefore;
                    if (delta > 0L) {
                        promotedBytes.increment(delta);
                    }
                }

                // Some GC implementations such as G1 can reduce the old gen size as part of a minor GC. To track the
                // live data size we record the value if we see a reduction in the old gen heap size or
                // after a major GC.
                if (longLivedAfter < longLivedBefore || isMajorGc(notificationInfo.getGcName())) {
                    liveDataSize.set(longLivedAfter);
                    maxDataSize.set(longLivedPoolNames.stream().mapToLong(pool -> after.get(pool).getMax()).sum());
                }
            };
            NotificationEmitter notificationEmitter = (NotificationEmitter) mbean;
            notificationEmitter.addNotificationListener(notificationListener, notification -> notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION), null);
            notificationListenerCleanUpRunnables.add(() -> {
                try {
                    notificationEmitter.removeNotificationListener(notificationListener);
                } catch (ListenerNotFoundException ignore) {
                }
            });
        }
    }

}
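
The bookkeeping at the end of the notification listener is easy to misread, so here is the same rule reduced to plain arithmetic. This is a simplified sketch with hypothetical names, not the library's code: it leaves out the generational-GC check and the JMX plumbing, and it takes the old generation sizes as plain numbers.

```java
// Hypothetical, simplified model of the promoted-bytes and live-data-size bookkeeping.
final class GcBookkeepingSketch {

    private long promotedBytes; // behaves like a counter: it only ever grows
    private long liveDataSize;  // behaves like a gauge: it holds the last recorded value

    // oldGenBefore / oldGenAfter: used bytes of the long-lived pool around one GC.
    void onGc(long oldGenBefore, long oldGenAfter, boolean majorGc) {
        long delta = oldGenAfter - oldGenBefore;
        if (delta > 0) {
            promotedBytes += delta; // the old generation grew, so objects were promoted
        }
        if (oldGenAfter < oldGenBefore || majorGc) {
            liveDataSize = oldGenAfter; // what survived a collection of the old generation
        }
    }

    long promotedBytes() { return promotedBytes; }

    long liveDataSize() { return liveDataSize; }

    public static void main(String[] args) {
        GcBookkeepingSketch gc = new GcBookkeepingSketch();
        gc.onGc(100, 150, false); // minor GC promotes 50 bytes
        gc.onGc(150, 120, false); // the old generation shrank: record the live data size
        gc.onGc(120, 130, true);  // major GC: promotes 10 bytes and records the live data size
        System.out.println("promoted=" + gc.promotedBytes() + " live=" + gc.liveDataSize());
    }
}
```

The promoted value is a running total of bytes, which is why it is charted through rate() rather than read directly.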

Viewing the metrics in Grafana:

Configuration:

# GC pauses per second
rate(jvm_gc_pause_seconds_count{application="user-app", instance="pushgateway"}[1m])
# Average GC pause time
rate(jvm_gc_pause_seconds_sum{application="$application", instance="$instance"}[1m])/rate(jvm_gc_pause_seconds_count{application="$application", instance="$instance"}[1m])
# Maximum GC pause time
jvm_gc_pause_seconds_max{application="$application", instance="$instance"}

# Allocation rate: bytes by which the young generation pool grew between one GC and the next, per second
rate(jvm_gc_memory_allocated_bytes_total{application="user-app", instance="pushgateway"}[1m])
# Promotion rate: bytes by which the old generation pool grew from before a GC to after it, per second
rate(jvm_gc_memory_promoted_bytes_total{application="user-app", instance="pushgateway"}[1m])

ClassLoaderMetrics class: class loader usage

public class ClassLoaderMetrics implements MeterBinder {
    @Override
    public void bindTo(MeterRegistry registry) {
        ClassLoadingMXBean classLoadingBean = ManagementFactory.getClassLoadingMXBean();
        // Number of classes currently loaded in the JVM
        Gauge.builder("jvm.classes.loaded", classLoadingBean, ClassLoadingMXBean::getLoadedClassCount)
                .tags(tags)
                .description("The number of classes that are currently loaded in the Java virtual machine")
                .baseUnit(BaseUnits.CLASSES)
                .register(registry);
        // Number of classes unloaded since the JVM started
        FunctionCounter.builder("jvm.classes.unloaded", classLoadingBean, ClassLoadingMXBean::getUnloadedClassCount)
                .tags(tags)
                .description("The total number of classes unloaded since the Java virtual machine has started execution")
                .baseUnit(BaseUnits.CLASSES)
                .register(registry);
    }
}
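
The two meters above map directly onto ClassLoadingMXBean getters. A small JDK-only sketch for reading the same values outside a metrics registry is shown below. The class and method names are hypothetical.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical helper: snapshots the class loading counters of the running JVM.
final class ClassLoadingSketch {

    // Returns {currently loaded, unloaded since start, total loaded since start}.
    static long[] snapshot() {
        ClassLoadingMXBean bean = ManagementFactory.getClassLoadingMXBean();
        return new long[] {
            bean.getLoadedClassCount(),
            bean.getUnloadedClassCount(),
            bean.getTotalLoadedClassCount()
        };
    }

    public static void main(String[] args) {
        long[] s = snapshot();
        System.out.println("loaded=" + s[0] + " unloaded=" + s[1] + " totalLoaded=" + s[2]);
    }
}
```

The currently loaded count can go down as well as up, which is why it is exposed as a gauge, while the unloaded count only grows and is exposed as a counter.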

Viewing the metrics in Grafana:

Configuration:

# Number of loaded classes
jvm_classes_loaded_classes{application="user-app", instance="pushgateway"}

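# Change in the number of loaded classes
# delta() returns the difference in a metric's value over the given time range. In the example below it is the change in the number of loaded classes between one minute ago and now.
# delta should only be used with gauges.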
delta(jvm_classes_loaded_classes{application="user-app",instance="pushgateway"}[1m])

JvmCompilationMetrics class: JVM (JIT) compilation

public class JvmCompilationMetrics implements MeterBinder {
    @Override
    public void bindTo(MeterRegistry registry) {
        CompilationMXBean compilationBean = ManagementFactory.getCompilationMXBean();
        if (compilationBean != null && compilationBean.isCompilationTimeMonitoringSupported()) {
            // Accumulated time the JVM has spent in compilation
            FunctionCounter.builder("jvm.compilation.time", compilationBean, CompilationMXBean::getTotalCompilationTime)
                    .tags(Tags.concat(tags, "compiler", compilationBean.getName()))
                    .description("The approximate accumulated elapsed time spent in compilation")
                    .baseUnit(BaseUnits.MILLISECONDS)
                    .register(registry);
        }
    }
}
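
The null check and the isCompilationTimeMonitoringSupported() guard matter because a JVM without a compiler returns no CompilationMXBean at all. A JDK-only sketch of the same guarded read follows, with a hypothetical class and method name.

```java
import java.lang.management.CompilationMXBean;
import java.lang.management.ManagementFactory;
import java.util.OptionalLong;

// Hypothetical helper: reads the accumulated JIT compilation time when available.
final class CompilationTimeSketch {

    // Empty when the JVM has no compilation system or does not track compilation time.
    static OptionalLong totalCompilationMillis() {
        CompilationMXBean bean = ManagementFactory.getCompilationMXBean();
        if (bean == null || !bean.isCompilationTimeMonitoringSupported()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(bean.getTotalCompilationTime());
    }

    public static void main(String[] args) {
        OptionalLong millis = totalCompilationMillis();
        System.out.println(millis.isPresent()
                ? "compilation time (ms): " + millis.getAsLong()
                : "compilation time monitoring is not supported on this JVM");
    }
}
```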

ExecutorServiceMetrics class: thread pool usage

public class ExecutorServiceMetrics implements MeterBinder {

    // For ThreadPoolExecutor and ScheduledThreadPoolExecutor
    private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
        if (tp == null) {
            return;
        }
        // Number of tasks the pool has completed
        FunctionCounter.builder(metricPrefix + "executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)
                .tags(tags)
                .description("The approximate total number of tasks that have completed execution")
                .baseUnit(BaseUnits.TASKS)
                .register(registry);
        // Number of threads currently executing tasks
        Gauge.builder(metricPrefix + "executor.active", tp, ThreadPoolExecutor::getActiveCount)
                .tags(tags)
                .description("The approximate number of threads that are actively executing tasks")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
        // Number of tasks waiting in the task queue
        Gauge.builder(metricPrefix + "executor.queued", tp, tpRef -> tpRef.getQueue().size())
                .tags(tags)
                .description("The approximate number of tasks that are queued for execution")
                .baseUnit(BaseUnits.TASKS)
                .register(registry);
        // Remaining capacity of the task queue (meaningful for bounded queues)
        Gauge.builder(metricPrefix + "executor.queue.remaining", tp, tpRef -> tpRef.getQueue().remainingCapacity())
                .tags(tags)
                .description("The number of additional elements that this queue can ideally accept without blocking")
                .baseUnit(BaseUnits.TASKS)
                .register(registry);
        // Current number of threads in the pool
        Gauge.builder(metricPrefix + "executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)
                .tags(tags)
                .description("The current number of threads in the pool")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
        // Configured core pool size
        Gauge.builder(metricPrefix + "executor.pool.core", tp, ThreadPoolExecutor::getCorePoolSize)
                .tags(tags)
                .description("The core number of threads for the pool")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
        // Configured maximum pool size
        Gauge.builder(metricPrefix + "executor.pool.max", tp, ThreadPoolExecutor::getMaximumPoolSize)
                .tags(tags)
                .description("The maximum allowed number of threads in the pool")
                .baseUnit(BaseUnits.THREADS)
                .register(registry);
    }

    // Handles ForkJoinPool
    private void monitor(MeterRegistry registry, ForkJoinPool fj) {
        // Estimated number of tasks stolen from another worker's queue
        FunctionCounter.builder(metricPrefix + "executor.steals", fj, ForkJoinPool::getStealCount)
                .tags(tags)
                .description("Estimate of the total number of tasks stolen from " +
                        "one thread's work queue by another. The reported value " +
                        "underestimates the actual total number of steals when the pool " +
                        "is not quiescent")
                .register(registry);
        // Estimated number of tasks held in the worker queues
        Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
                .tags(tags)
                .description("An estimate of the total number of tasks currently held in queues by worker threads")
                .register(registry);
        // Estimated number of threads stealing or executing tasks
        Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
                .tags(tags)
                .description("An estimate of the number of threads that are currently stealing or executing tasks")
                .register(registry);
        // Estimated number of worker threads that are not blocked or waiting
        Gauge.builder(metricPrefix + "executor.running", fj, ForkJoinPool::getRunningThreadCount)
                .tags(tags)
                .description("An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
                .register(registry);
    }
}
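The gauges above only poll getters that `ThreadPoolExecutor` already exposes. As a minimal JDK-only sketch (the class name `PoolSnapshotDemo` and the pool sizes are illustrative assumptions, and no Micrometer registry is involved), the same values can be read directly:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSnapshotDemo {

    // Small pool with a bounded queue, so remainingCapacity() is meaningful.
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(2, 4, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    }

    // Reads the same getters that the gauges and the function counter are bound to.
    static String snapshot(ThreadPoolExecutor tp) {
        return "completed=" + tp.getCompletedTaskCount()
                + " active=" + tp.getActiveCount()
                + " queued=" + tp.getQueue().size()
                + " queueRemaining=" + tp.getQueue().remainingCapacity()
                + " poolSize=" + tp.getPoolSize()
                + " core=" + tp.getCorePoolSize()
                + " max=" + tp.getMaximumPoolSize();
    }

    // Snapshot of a pool that has not received any task yet.
    static String idleSnapshot() {
        ThreadPoolExecutor tp = newPool();
        try {
            return snapshot(tp);
        } finally {
            tp.shutdown();
        }
    }

    // Runs n trivial tasks, waits for termination, and returns the completed-task count.
    static long runAndCount(int n) {
        ThreadPoolExecutor tp = newPool();
        for (int i = 0; i < n; i++) {
            tp.execute(() -> { });
        }
        tp.shutdown();
        try {
            tp.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return tp.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println(idleSnapshot());
        System.out.println("completed after 5 tasks: " + runAndCount(5));
    }
}
```

Most of these getters are documented as approximate while the pool is running, which is why the sketch only reads the completed count after the pool has terminated.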

DiskSpaceMetrics class: disk usage

public class DiskSpaceMetrics implements MeterBinder {

    @Override
    public void bindTo(MeterRegistry registry) {
        Iterable<Tag> tagsWithPath = Tags.concat(tags, "path", absolutePath);
        // Usable disk space for the path
        Gauge.builder("disk.free", path, File::getUsableSpace)
                .tags(tagsWithPath)
                .description("Usable space for path")
                .baseUnit(BaseUnits.BYTES)
                .strongReference(true)
                .register(registry);
        // Total disk space for the path (not the used space)
        Gauge.builder("disk.total", path, File::getTotalSpace)
                .tags(tagsWithPath)
                .description("Total space for path")
                .baseUnit(BaseUnits.BYTES)
                .strongReference(true)
                .register(registry);
    }
}
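Only the free and total sizes are exported, so used space has to be derived from the two. A minimal sketch, assuming the current directory as the path; `DiskUsageDemo` and `usedBytes` are illustrative names, not part of Micrometer:

```java
import java.io.File;

class DiskUsageDemo {

    // Used bytes = total - free, clamped at zero in case the two reads are inconsistent.
    static long usedBytes(long totalBytes, long freeBytes) {
        return Math.max(0L, totalBytes - freeBytes);
    }

    public static void main(String[] args) {
        File path = new File(".");
        long total = path.getTotalSpace();   // same call as the disk.total gauge
        long free = path.getUsableSpace();   // same call as the disk.free gauge
        System.out.println("total=" + total + " free=" + free + " used=" + usedBytes(total, free));
    }
}
```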

System Metrics

FileDescriptorMetrics class: file descriptor usage

Reports the file descriptors opened by the JVM process, including sockets and regular files.

public class FileDescriptorMetrics implements MeterBinder {

    @Override
    public void bindTo(MeterRegistry registry) {
        if (openFilesMethod != null) {
            // Number of file descriptors the process currently has open
            Gauge.builder("process.files.open", osBean, x -> invoke(openFilesMethod))
                    .tags(tags)
                    .description("The open file descriptor count")
                    .baseUnit(BaseUnits.FILES)
                    .register(registry);
        }
        // Maximum number of file descriptors the process may open
        if (maxFilesMethod != null) {
            Gauge.builder("process.files.max", osBean, x -> invoke(maxFilesMethod))
                    .tags(tags)
                    .description("The maximum file descriptor count")
                    .baseUnit(BaseUnits.FILES)
                    .register(registry);
        }
    }
}
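The two gauges are most useful as a ratio of open to maximum descriptors. The sketch below casts directly to `com.sun.management.UnixOperatingSystemMXBean`, which assumes a HotSpot/OpenJDK runtime on a Unix-like system (the listing above looks the methods up through `invoke(...)` instead). `FileDescriptorDemo` is an illustrative name:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class FileDescriptorDemo {

    // Returns open/max as a ratio, or -1 when the platform does not expose FD counts.
    static double fdUsageRatio() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            long max = unix.getMaxFileDescriptorCount();
            return max > 0 ? (double) unix.getOpenFileDescriptorCount() / max : -1;
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("fd usage ratio: " + fdUsageRatio());
    }
}
```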

View the metrics in Grafana:

Configuration:

# Number of open file descriptors
process_files_open_files{application="user-app", instance="pushgateway"}
# Maximum number of file descriptors
process_files_max_files{application="user-app", instance="pushgateway"}

UptimeMetrics class: uptime metrics

public class UptimeMetrics implements MeterBinder {

    @Override
    public void bindTo(MeterRegistry registry) {
        // How long the process has been running
        TimeGauge.builder("process.uptime", runtimeMXBean, TimeUnit.MILLISECONDS, RuntimeMXBean::getUptime)
            .tags(tags)
            .description("The uptime of the Java virtual machine")
            .register(registry);
        // Time at which the process started
        TimeGauge.builder("process.start.time", runtimeMXBean, TimeUnit.MILLISECONDS, RuntimeMXBean::getStartTime)
            .tags(tags)
            .description("Start time of the process since unix epoch.")
            .register(registry);
    }
}

View the metrics in Grafana:

Configuration:

# Uptime
process_uptime_seconds{application="user-app", instance="pushgateway"}
# Start time
process_start_time_seconds{application="user-app", instance="pushgateway"}*1000
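The `*1000` in the start-time query is most likely a unit conversion: the JVM reports both values in milliseconds, the Prometheus series are exported in seconds (hence the `_seconds` suffix), and Grafana's date/time display units expect epoch milliseconds. A minimal sketch of that round trip, with `UptimeDemo` and its helper names as illustrative assumptions:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;

class UptimeDemo {

    // Milliseconds (as returned by RuntimeMXBean) to seconds (the exported base unit).
    static double millisToSeconds(long millis) {
        return millis / 1000.0;
    }

    // Seconds back to epoch milliseconds, which is what the *1000 in the query does.
    static double secondsToMillis(double seconds) {
        return seconds * 1000.0;
    }

    public static void main(String[] args) {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        double uptimeSeconds = millisToSeconds(runtime.getUptime());
        double startSeconds = millisToSeconds(runtime.getStartTime());
        System.out.println("uptime (s): " + uptimeSeconds);
        System.out.println("start time (epoch ms): " + secondsToMillis(startSeconds));
    }
}
```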

ProcessorMetrics class: CPU usage

public class ProcessorMetrics implements MeterBinder {

    @Override
    public void bindTo(MeterRegistry registry) {
        Runtime runtime = Runtime.getRuntime();
        // Number of CPU cores available to the JVM
        Gauge.builder("system.cpu.count", runtime, Runtime::availableProcessors)
            .tags(tags)
            .description("The number of processors available to the Java virtual machine")
            .register(registry);
            
        if (operatingSystemBean.getSystemLoadAverage() >= 0) {
            // System load average over the last minute
            Gauge.builder("system.load.average.1m", operatingSystemBean, OperatingSystemMXBean::getSystemLoadAverage)
                .tags(tags)
                .description("The sum of the number of runnable entities queued to available processors and the number " +
                    "of runnable entities running on the available processors averaged over a period of time")
                .register(registry);
        }

        if (systemCpuUsage != null) {
            // Recent CPU usage of the whole system
            Gauge.builder("system.cpu.usage", operatingSystemBean, x -> invoke(systemCpuUsage))
                .tags(tags)
                .description("The \"recent cpu usage\" for the whole system")
                .register(registry);
        }

        if (processCpuUsage != null) {
            // Recent CPU usage of this JVM process
            Gauge.builder("process.cpu.usage", operatingSystemBean, x -> invoke(processCpuUsage))
                .tags(tags)
                .description("The \"recent cpu usage\" for the Java Virtual Machine process")
                .register(registry);
        }
    }
}

View the metrics in Grafana:

# System-wide CPU usage
system_cpu_usage{application="user-app", instance="pushgateway"}
# CPU usage of the process
process_cpu_usage{application="user-app", instance="pushgateway"}
# Average process CPU usage over the last hour
avg_over_time(process_cpu_usage{application="user-app", instance="pushgateway"}[1h])
# 1-minute load average
system_load_average_1m{application="user-app", instance="pushgateway"}
# Number of CPU cores
system_cpu_count{application="user-app", instance="pushgateway"}
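A load average is hard to judge without the core count, so the two series are often combined into a per-core figure. A minimal JDK-only sketch; `CpuLoadDemo` and `loadPerCore` are illustrative names, and the process CPU read assumes a runtime that provides `com.sun.management.OperatingSystemMXBean`:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class CpuLoadDemo {

    // Load average divided by core count; a negative load average means "not available".
    static double loadPerCore(double loadAverage, int cores) {
        return loadAverage < 0 ? -1 : loadAverage / cores;
    }

    public static void main(String[] args) {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("load per core: " + loadPerCore(os.getSystemLoadAverage(), cores));
        if (os instanceof com.sun.management.OperatingSystemMXBean) {
            // A value in [0, 1], or negative when the JVM cannot determine it.
            double processCpu = ((com.sun.management.OperatingSystemMXBean) os).getProcessCpuLoad();
            System.out.println("process cpu usage: " + processCpu);
        }
    }
}
```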

DB Connection Pool

The database connection pool used here is Alibaba's Druid. First, look at how the Spring Boot Actuator source collects connection pool metrics.

DataSourcePoolMetrics class: database connection pool usage

public class DataSourcePoolMetrics implements MeterBinder {

    private final DataSource dataSource;

    @Override
    public void bindTo(MeterRegistry registry) {
        if (this.metadataProvider.getDataSourcePoolMetadata(this.dataSource) != null) {
            // Number of active connections
            bindPoolMetadata(registry, "active",
                    "Current number of active connections that have been allocated from the data source.",
                    DataSourcePoolMetadata::getActive);
            // Number of idle connections
            bindPoolMetadata(registry, "idle", "Number of established but idle connections.",
                    DataSourcePoolMetadata::getIdle);
            // Configured maximum number of connections
            bindPoolMetadata(registry, "max",
                    "Maximum number of active connections that can be allocated at the same time.",
                    DataSourcePoolMetadata::getMax);
            bindPoolMetadata(registry, "min", "Minimum number of idle connections in the pool.",
                    DataSourcePoolMetadata::getMin);
        }
    }
    // Provider of DataSource pool metadata
    private static class CachingDataSourcePoolMetadataProvider implements DataSourcePoolMetadataProvider {
        // Cache of pool metadata, keyed by the DataSource beans in the Spring container
        private static final Map<DataSource, DataSourcePoolMetadata> cache = new ConcurrentReferenceHashMap<>();

        private final DataSourcePoolMetadataProvider metadataProvider;

        CachingDataSourcePoolMetadataProvider(DataSourcePoolMetadataProvider metadataProvider) {
            this.metadataProvider = metadataProvider;
        }

        <N extends Number> Function<DataSource, N> getValueFunction(Function<DataSourcePoolMetadata, N> function) {
            return (dataSource) -> function.apply(getDataSourcePoolMetadata(dataSource));
        }
        // Looks up the metadata for a DataSource, caching the result
        @Override
        public DataSourcePoolMetadata getDataSourcePoolMetadata(DataSource dataSource) {
            DataSourcePoolMetadata metadata = cache.get(dataSource);
            if (metadata == null) {
                metadata = this.metadataProvider.getDataSourcePoolMetadata(dataSource);
                cache.put(dataSource, metadata);
            }
            return metadata;
        }
    }
}

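To collect metrics for DruidDataSource, we need to register a metadata provider for the Druid connection pool in the Spring container.

Spring's extension points make it straightforward to integrate a third-party component this way.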

@Configuration(proxyBeanMethods = false)
public class DataSourcePoolMetadataProvidersConfiguration {

    @Bean
    public DataSourcePoolMetadataProvider druidDataSourceMetadataProvider() {
        return dataSource -> {
            DruidDataSource druidDataSource = DataSourceUnwrapper.unwrap(dataSource, DruidDataSource.class);
            if (druidDataSource != null) {
                return new DruidDataSourcePoolMetadata(druidDataSource);
            }
            return null;
        };
    }

}

public class DruidDataSourcePoolMetadata extends AbstractDataSourcePoolMetadata<DruidDataSource> {

    public DruidDataSourcePoolMetadata(DruidDataSource dataSource) {
        super(dataSource);
    }

    @Override
    public Integer getActive() {
        return getDataSource().getActiveCount();
    }

    @Override
    public Integer getMax() {
        return getDataSource().getMaxActive();
    }

    @Override
    public Integer getMin() {
        return getDataSource().getMinIdle();
    }

    @Override
    public String getValidationQuery() {
        return getDataSource().getValidationQuery();
    }

    @Override
    public Boolean getDefaultAutoCommit() {
        return getDataSource().isDefaultAutoCommit();
    }

    @Override
    public Integer getIdle() {
        // Idle connections currently held in the pool; getMaxIdle() is only a configured limit
        return getDataSource().getPoolingCount();
    }
}

The metrics as seen in Grafana:

Configuration:

jdbc_connections_active{application="user-app", instance="pushgateway"}

jdbc_connections_idle{application="user-app", instance="pushgateway"}

jdbc_connections_max{application="user-app", instance="pushgateway"}

MongoDB Metrics

MongoDB client configuration:

@Configuration
public class MongoConfiguration {
  
    @Bean
    public MongoDatabaseFactory mongoDatabaseFactory(MeterRegistry meterRegistry) {
        // connectionString and properties are supplied by the surrounding configuration (not shown)
        MongoClientSettings settings = MongoClientSettings.builder()
                // MongoMetricsCommandListener collects metrics for every MongoDB command
                .addCommandListener(new MongoMetricsCommandListener(meterRegistry))
                .applyConnectionString(new ConnectionString(connectionString))
                // MongoMetricsConnectionPoolListener collects connection pool metrics
                .applyToConnectionPoolSettings(builder -> builder.addConnectionPoolListener(new MongoMetricsConnectionPoolListener(meterRegistry)))
                .build();
        return new SimpleMongoClientDatabaseFactory(MongoClients.create(settings), properties.getDatabase());
    }
}

MongoMetricsCommandListener class: metrics for MongoDB commands

public class MongoMetricsCommandListener implements CommandListener {

    private void timeCommand(CommandEvent event, long elapsedTimeInNanoseconds) {
        // Times each MongoDB command
        Timer.builder("mongodb.driver.commands")
                .description("Timer of mongodb commands")
                .tags(tagsProvider.commandTags(event))
                .register(registry)
                .record(elapsedTimeInNanoseconds, TimeUnit.NANOSECONDS);
    }
}

MongoMetricsConnectionPoolListener class: connection pool usage

public class MongoMetricsConnectionPoolListener implements ConnectionPoolListener {
    @Override
    public void connectionPoolCreated(ConnectionPoolCreatedEvent event) {
        List<Meter> connectionMeters = new ArrayList<>();
        // Current number of connections in the pool, both idle and in use
        connectionMeters.add(registerGauge(event, METRIC_PREFIX + "size",
                "the current size of the connection pool, including idle and and in-use members", poolSizes));
        // Number of connections currently in use
        connectionMeters.add(registerGauge(event, METRIC_PREFIX + "checkedout",
                "the count of connections that are currently in use", checkedOutCounts));
        // Size of the queue of requests waiting for a connection
        connectionMeters.add(registerGauge(event, METRIC_PREFIX + "waitqueuesize",
                "the current size of the wait queue for a connection from the pool", waitQueueSizes));
        meters.put(event.getServerId(), connectionMeters);
    }
}

View the metrics in Grafana:

Configuration:

# Request throughput
sum(rate(mongodb_driver_commands_seconds_count{application="user-app", instance="pushgateway"}[1m]))
# Average response time
sum(rate(mongodb_driver_commands_seconds_sum{application="user-app", instance="pushgateway"}[1m]))/sum(rate(mongodb_driver_commands_seconds_count{application="user-app", instance="pushgateway"}[1m]))
# Maximum response time
max(mongodb_driver_commands_seconds_max{application="user-app", instance="pushgateway"})
# Error rate
sum(rate(mongodb_driver_commands_seconds_count{application="user-app", instance="pushgateway", status!="SUCCESS"}[1m]))

# Connection Pool 
mongodb_driver_pool_size{application="user-app", instance="pushgateway"}
mongodb_driver_pool_checkedout{application="user-app", instance="pushgateway"}
mongodb_driver_pool_waitqueuesize{application="user-app", instance="pushgateway"}
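The `_count`, `_sum` and `_max` series in the queries above all come from a single timer, and the average is simply the sum divided by the count. The class below is a deliberately simplified, hypothetical stand-in for illustration; it is not Micrometer's `Timer` (whose maximum decays over a time window and which is thread-safe):

```java
import java.util.concurrent.TimeUnit;

class MiniTimer {

    private long count;        // number of recorded events   -> *_seconds_count
    private long totalNanos;   // sum of all durations        -> *_seconds_sum
    private long maxNanos;     // largest duration seen so far -> *_seconds_max

    // Records one duration; not thread-safe, for illustration only.
    void record(long amount, TimeUnit unit) {
        long nanos = unit.toNanos(amount);
        count++;
        totalNanos += nanos;
        maxNanos = Math.max(maxNanos, nanos);
    }

    long count() {
        return count;
    }

    double totalSeconds() {
        return totalNanos / 1e9;
    }

    double maxSeconds() {
        return maxNanos / 1e9;
    }

    // Same arithmetic as dividing the _sum series by the _count series.
    double meanSeconds() {
        return count == 0 ? 0.0 : totalSeconds() / count;
    }

    public static void main(String[] args) {
        MiniTimer timer = new MiniTimer();
        timer.record(100, TimeUnit.MILLISECONDS);
        timer.record(300, TimeUnit.MILLISECONDS);
        System.out.println("count=" + timer.count() + " mean=" + timer.meanSeconds()
                + " max=" + timer.maxSeconds());
    }
}
```

In the PromQL version both series are wrapped in `rate(...)` first, so the result is the average over the query window rather than over the whole lifetime of the process.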

Redis Metrics

RedisCacheMetrics class: cache hit, miss, eviction, get, put, and delete metrics

public class RedisCacheMetrics extends CacheMeterBinder {
    @Override
    protected void bindImplementationSpecificMetrics(MeterRegistry registry) {
        // Number of keys removed from the cache
        FunctionCounter.builder("cache.removals", this.cache, (cache) -> cache.getStatistics().getDeletes())
                .tags(getTagsWithCacheName()).description("Cache removals").register(registry);
        // Number of get requests that are still pending
        FunctionCounter.builder("cache.gets", this.cache, (cache) -> cache.getStatistics().getPending())
                .tags(getTagsWithCacheName()).tag("result", "pending").description("The number of pending requests")
                .register(registry);
        // Time the cache has spent waiting on a lock
        TimeGauge
                .builder("cache.lock.duration", this.cache, TimeUnit.NANOSECONDS,
                        (cache) -> cache.getStatistics().getLockWaitDuration(TimeUnit.NANOSECONDS))
                .tags(getTagsWithCacheName()).description("The time the cache has spent waiting on a lock")
                .register(registry);
    }
  
    // Inherited from the parent class, CacheMeterBinder
    @Override
    public final void bindTo(MeterRegistry registry) {
        if (size() != null) {
            // Number of key-value entries currently in the cache
            Gauge.builder("cache.size", cache.get(),
                    c -> {
                        Long size = size();
                        return size == null ? 0 : size;
                    })
                    .tags(tags)
                    .description("The number of entries in this cache. This may be an approximation, depending on the type of cache.")
                    .register(registry);
        }

        if (missCount() != null) {
            // Number of lookups that missed the cache
            FunctionCounter.builder("cache.gets", cache.get(),
                    c -> {
                        Long misses = missCount();
                        return misses == null ? 0 : misses;
                    })
                    .tags(tags).tag("result", "miss")
                    .description("the number of times cache lookup methods have returned an uncached (newly loaded) value, or null")
                    .register(registry);
        }
        // Number of lookups that hit the cache
        FunctionCounter.builder("cache.gets", cache.get(), c -> hitCount())
                .tags(tags).tag("result", "hit")
                .description("The number of times cache lookup methods have returned a cached value.")
                .register(registry);
        // Number of entries put into the cache
        FunctionCounter.builder("cache.puts", cache.get(), c -> putCount())
                .tags(tags)
                .description("The number of entries added to the cache")
                .register(registry);

        if (evictionCount() != null) {
            // Number of entries evicted from the cache
            FunctionCounter.builder("cache.evictions", cache.get(),
                    c -> {
                        Long evictions = evictionCount();
                        return evictions == null ? 0 : evictions;
                    })
                    .tags(tags)
                    .description("cache evictions")
                    .register(registry);
        }
        // Metrics that are specific to each cache implementation
        bindImplementationSpecificMetrics(registry);
    }

}
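The hit and miss counters share the metric name `cache.gets` and differ only in the `result` tag, so a hit ratio has to be computed from the two. A minimal sketch of that arithmetic; `CacheHitRatioDemo` is an illustrative name and the counts are made-up inputs:

```java
class CacheHitRatioDemo {

    // hits / (hits + misses); returns 0 when there have been no lookups at all.
    static double hitRatio(long hits, long misses) {
        long total = hits + misses;
        return total == 0 ? 0.0 : (double) hits / total;
    }

    public static void main(String[] args) {
        System.out.println("hit ratio: " + hitRatio(3, 1));
    }
}
```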

RocketMQ Metrics

The Producer and the Consumer Listener are wrapped in an extra layer that records the metrics. Part of the code follows:

// Record the metrics
private void record(String destination, String tag, Exception e, long duration, String side) {
  List<Tag> list = Arrays.asList(
    Tag.of("topic", destination)
    , Tag.of("messageTag", tag == null ? "none" : tag)
    , Tag.of("exception", e == null ? "none" : e.getClass().getSimpleName())
    , Tag.of("application", applicationName)
    , Tag.of("side", side));
  if (meterRegistry != null) {
    try {
      meterRegistry.timer(Constants.REQUEST_METRICS_NAME, list)
        .record(duration, TimeUnit.MILLISECONDS);
    } catch (Throwable t) {
      // A metrics failure must never break message handling; log it and move on
      log.error("record metrics throw exception", t);
    }
  }
}
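The listing above does not show how the wrapper obtains `duration` and `e`. One plausible shape, sketched under assumptions: the `Recorder` interface is a hypothetical stand-in for the `record(...)` method, and `timed` and `exceptionTagAfter` are illustrative helpers rather than the author's code.

```java
import java.util.concurrent.Callable;

class TimedSendDemo {

    // Hypothetical stand-in for the record(...) method shown above.
    interface Recorder {
        void record(String destination, String tag, Exception e, long durationMillis, String side);
    }

    // Runs the action, then reports its duration and failure (if any) exactly once.
    static <T> T timed(Recorder recorder, String destination, String tag, String side,
                       Callable<T> action) throws Exception {
        long start = System.nanoTime();
        Exception failure = null;
        try {
            return action.call();
        } catch (Exception e) {
            failure = e;
            throw e;   // the caller still sees the original exception
        } finally {
            long millis = (System.nanoTime() - start) / 1_000_000;
            recorder.record(destination, tag, failure, millis, side);
        }
    }

    // Helper for the demo: returns the value the "exception" tag would take for an action.
    static String exceptionTagAfter(Callable<?> action) {
        String[] seen = new String[1];
        Recorder recorder = (destination, tag, e, durationMillis, side) ->
                seen[0] = (e == null ? "none" : e.getClass().getSimpleName());
        try {
            timed(recorder, "demo-topic", null, "producer", action);
        } catch (Exception ignored) {
            // the failure was already captured by the recorder
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(exceptionTagAfter(() -> "sent"));
        System.out.println(exceptionTagAfter(() -> {
            throw new IllegalStateException("broker down");
        }));
    }
}
```

Recording in `finally` keeps the success and failure paths on the same timer, so the error rate can be separated out later through the `exception` tag.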

The metrics as seen in Grafana:

Configuration:

# Producer: request throughput
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", side="producer"}[1m]))

# Producer: average latency
sum(rate(mq_requests_latency_seconds_sum{application="kifs-course-service", instance="pushgateway", exception="none", side="producer"}[1m])) / sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception="none", side="producer"}[1m]))

# Producer: maximum latency
max(mq_requests_latency_seconds_max{application="kifs-course-service", instance="pushgateway", exception="none", side="producer"})

# Producer: error rate
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception!="none", side="producer"}[1m]))

# Consumer: request throughput
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", side="consumer"}[1m]))

# Consumer: average latency
sum(rate(mq_requests_latency_seconds_sum{application="kifs-course-service", instance="pushgateway", exception="none", side="consumer"}[1m])) / sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception="none", side="consumer"}[1m]))

# Consumer: maximum latency
max(mq_requests_latency_seconds_max{application="kifs-course-service", instance="pushgateway", exception="none", side="consumer"})

# Consumer: error rate
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception!="none", side="consumer"}[1m]))

Method Metrics

Configuration:

@Configuration
//@ConditionalOnProperty(prefix = "management.metrics.methods", name = "enable")
public class MethodMetricsConfiguration {

    @Bean
    @ConditionalOnClass(value = MeterRegistry.class)
    public MethodMetricsAspect methodMetricsAspect(MeterRegistry meterRegistry) {
        return new MethodMetricsAspect(meterRegistry, this::skipControllers);
    }


    private boolean skipControllers(ProceedingJoinPoint pjp) {
        Class<?> targetClass = pjp.getTarget().getClass();
        return targetClass.isAnnotationPresent(RestController.class) ||
                targetClass.isAnnotationPresent(Controller.class);
    }
}

Annotation:

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE, ElementType.METHOD})
public @interface Monitored {

    String value() default "";

    String[] extraTags() default {};

    double[] percentiles() default {};

    String description() default "none";

    boolean isAlert() default false;
}

Interceptor:

@Aspect
@Slf4j
public class MethodMetricsAspect {

    public static final Predicate<ProceedingJoinPoint> DONT_SKIP_ANYTHING = pjp -> false;
    public static final String DEFAULT_METHOD_METRICS_NAME = "method.invoke.timed";
    public static final String DEFAULT_EXCEPTION_TAG_VALUE = "none";
    public static final String EXCEPTION_TAG = "exception";
    private static final String DEFAULT_METHOD_DESCRIPTION = "method timer";
    private final MeterRegistry registry;
    private final Function<ProceedingJoinPoint, Iterable<Tag>> tagsBasedOnJoinPoint;
    private final Predicate<ProceedingJoinPoint> shouldSkip;

    public MethodMetricsAspect() {
        this(Metrics.globalRegistry);
    }

    public MethodMetricsAspect(MeterRegistry registry) {
        this(registry, DONT_SKIP_ANYTHING);
    }

    public MethodMetricsAspect(MeterRegistry registry,
                               Function<ProceedingJoinPoint, Iterable<Tag>> tagsBasedOnJoinPoint) {
        this(registry, tagsBasedOnJoinPoint, DONT_SKIP_ANYTHING);
    }

    public MethodMetricsAspect(MeterRegistry registry,
                               Predicate<ProceedingJoinPoint> shouldSkip) {
        this(registry,
                pjp -> Tags.of("class", pjp.getStaticPart().getSignature().getDeclaringTypeName(),
                        "method", pjp.getStaticPart().getSignature().getName()),
                shouldSkip);
    }

    public MethodMetricsAspect(MeterRegistry registry,
                               Function<ProceedingJoinPoint, Iterable<Tag>> tagsBasedOnJoinPoint,
                               Predicate<ProceedingJoinPoint> shouldSkip) {
        this.registry = registry;
        this.tagsBasedOnJoinPoint = tagsBasedOnJoinPoint;
        this.shouldSkip = shouldSkip;
    }

    @Around("execution(@me.howard.actuator.annotation.Monitored * *.*(..))")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        if (shouldSkip.test(pjp)) {
            return pjp.proceed();
        }
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        Monitored monitored = method.getAnnotation(Monitored.class);
        if (monitored == null) {
            method = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes());
            monitored = method.getAnnotation(Monitored.class);
        }

        final boolean stopWhenCompleted = CompletionStage.class.isAssignableFrom(method.getReturnType());
        return handleWithTimer(pjp, monitored, stopWhenCompleted);
    }

    private Object handleWithTimer(ProceedingJoinPoint pjp, Monitored monitored, boolean stopWhenCompleted) throws Throwable {
        Timer.Sample sample = Timer.start(registry);
        if (stopWhenCompleted) {
            try {
                return ((CompletionStage<?>) pjp.proceed()).whenComplete((result, exception) -> {
                    record(pjp, monitored, sample, getExceptionTag(exception));
                });
            } catch (Exception e) {
                record(pjp, monitored, sample, e.getClass().getSimpleName());
                throw e;
            }
        }

        String exceptionClass = DEFAULT_EXCEPTION_TAG_VALUE;
        try {
            return pjp.proceed();
        } catch (Exception e) {
            exceptionClass = e.getClass().getSimpleName();
            throw e;
        } finally {
            record(pjp, monitored, sample, exceptionClass);
        }
    }

    private void record(ProceedingJoinPoint pjp, Monitored monitored, Timer.Sample sample, String exceptionClass) {
        try {
            sample.stop(Timer.builder(DEFAULT_METHOD_METRICS_NAME)
                    .description(DEFAULT_METHOD_DESCRIPTION)
                    .tags(monitored.extraTags().length % 2 == 0 ? monitored.extraTags() : new String[0])
                    .tags(EXCEPTION_TAG, exceptionClass)
                    .tags(tagsBasedOnJoinPoint.apply(pjp))
                    .tags("description", monitored.description())
                    .publishPercentiles(monitored.percentiles().length == 0 ? null : monitored.percentiles())
                    .register(registry));
        } catch (Exception e) {
            // ignore
            log.warn("metrics record exception", e);
        }
    }

    private String getExceptionTag(Throwable t) {
        if (t == null) {
            return DEFAULT_EXCEPTION_TAG_VALUE;
        }
        if (t.getCause() == null) {
            return t.getClass().getSimpleName();
        }
        return t.getCause().getClass().getSimpleName();
    }
}
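`getExceptionTag` looks at `getCause()` because a `CompletionStage` may hand its callback either the original exception or a `CompletionException` that wraps it, depending on how the stage failed. The JDK-only sketch below reproduces both cases; `ExceptionTagDemo` and its helper methods are illustrative names, and `exceptionTag` mirrors the logic of the method above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ExceptionTagDemo {

    // Same rule as getExceptionTag: prefer the cause when there is one.
    static String exceptionTag(Throwable t) {
        if (t == null) {
            return "none";
        }
        Throwable root = (t.getCause() == null) ? t : t.getCause();
        return root.getClass().getSimpleName();
    }

    // Attaches a whenComplete callback, waits for it, and returns the tag it computed.
    static String tagOf(CompletableFuture<?> future) {
        AtomicReference<String> tag = new AtomicReference<>();
        future.whenComplete((result, ex) -> tag.set(exceptionTag(ex)))
                .handle((result, ex) -> null)   // swallow the failure so join() does not throw
                .join();
        return tag.get();
    }

    // Failure thrown inside an async task: the callback receives a wrapping CompletionException.
    static String tagOfAsyncFailure() {
        return tagOf(CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        }));
    }

    // Failure set directly on the future: the callback receives the exception unwrapped.
    static String tagOfDirectFailure() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));
        return tagOf(future);
    }

    static String tagOfSuccess() {
        return tagOf(CompletableFuture.completedFuture("ok"));
    }

    public static void main(String[] args) {
        System.out.println(tagOfAsyncFailure());
        System.out.println(tagOfDirectFailure());
        System.out.println(tagOfSuccess());
    }
}
```

One side effect of this rule is that any exception carrying a cause is tagged by its cause, even when it is not a wrapper added by `CompletableFuture`.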

Other components will be added later.
