Spring Boot Actuator 之 Metrics
Spring Boot Actuator 的 Metrics 是用来收集系统的总体运行状态的指标数据。
指标类型
- 计数度量器(Counter)
- 瞬态度量器(Gauge)
- 吞吐率度量器(Meter)
- 直方图度量器(Histogram)
- 采样点分位图度量器(Quantile Summary)
监控目标组件
- Dubbo
- Tomcat
- JVM
- DB Connection Pool (Druid)
- MongoDB
- Redis
- RocketMQ
- MethodInterceptor(方法级指标)
Dubbo Metrics
- 自定义
ThreadPool
目的是对线程池运行状况进行监控。代码如下:
/**
 * Dubbo {@code ThreadPool} extension that creates a fixed-size {@link ThreadPoolExecutor}
 * (core size == maximum size) from the URL parameters and, when a {@code MeterRegistry}
 * has been injected, publishes the executor's thread counts as Micrometer gauges.
 */
public class DubboMetricsThreadPool implements ThreadPool {

    public static final String DUBBO_THREADS_PREFIX = "dubbo.threads";

    public static final String THREAD_POOL_NAME = "fixedExt";

    private MeterRegistry simpleMeterRegistry;

    /**
     * Supplies the registry the gauges are registered with; while it is unset no gauges are published.
     *
     * @param simpleMeterRegistry registry to publish to
     */
    public void setSimpleMeterRegistry(MeterRegistry simpleMeterRegistry) {
        this.simpleMeterRegistry = simpleMeterRegistry;
    }

    /**
     * Builds the executor described by {@code url} and registers its gauges.
     *
     * @param url source of the thread name, thread count and queue capacity parameters
     * @return the newly created executor
     */
    @Override
    public Executor getExecutor(URL url) {
        final String threadName = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        final int poolSize = url.getParameter(THREADS_KEY, DEFAULT_THREADS);
        final int queueCapacity = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);

        // 0 -> direct hand-off, negative -> unbounded queue, positive -> bounded queue.
        final java.util.concurrent.BlockingQueue<Runnable> workQueue;
        if (queueCapacity == 0) {
            workQueue = new SynchronousQueue<Runnable>();
        } else if (queueCapacity < 0) {
            workQueue = new LinkedBlockingQueue<Runnable>();
        } else {
            workQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
        }

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize,
                poolSize,
                0,
                TimeUnit.MILLISECONDS,
                workQueue,
                new NamedInternalThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url));
        register(pool);
        return pool;
    }

    /**
     * Publishes active count, current pool size and maximum pool size of {@code executor}.
     * Does nothing when no registry has been set.
     */
    private void register(ThreadPoolExecutor executor) {
        if (simpleMeterRegistry != null) {
            Gauge.builder(DUBBO_THREADS_PREFIX + ".active.count", executor::getActiveCount)
                    .register(simpleMeterRegistry);
            Gauge.builder(DUBBO_THREADS_PREFIX + ".pool.size.current", executor::getPoolSize)
                    .register(simpleMeterRegistry);
            Gauge.builder(DUBBO_THREADS_PREFIX + ".pool.size.max", executor::getMaximumPoolSize)
                    .register(simpleMeterRegistry);
        }
    }
}
- 自定义 Dubbo Filter。目的是对 Dubbo的调用请求进行监控。代码如下:
/**
 * Dubbo {@link Filter} that records the latency of every invocation in a Micrometer
 * {@link Timer} named {@code dubbo.request.latency}.
 *
 * <p>The filter is activated for both the provider and the consumer group. Every sample is
 * tagged with the outcome ({@code result}, {@code exception}), the target ({@code service},
 * {@code method}, {@code code}) and the side it was taken on. While no {@link MeterRegistry}
 * has been injected the filter only delegates to the next invoker.
 */
@Activate(group = {PROVIDER, CONSUMER})
public class DubboMetricsFilter implements Filter {

    private static final String DUBBO_METRICS_NAME = "dubbo.request.latency";
    public static final String EXCEPTION_TAG = "exception";
    public static final String RESULT_TAG = "result";
    private static final String DEFAULT_EXCEPTION_VALUE = "none";
    private static final String DEFAULT_RESULT_SUCCESS_VALUE = "success";
    private static final String DEFAULT_RESULT_ERROR_VALUE = "error";
    private static final String SERVICE_TAG = "service";
    public static final String METHOD_TAG = "method";
    public static final String CODE_TAG = "code";

    private MeterRegistry simpleMeterRegistry;

    /**
     * Times the invocation and records the sample once its outcome is known.
     *
     * @param invoker    next invoker in the chain
     * @param invocation the call being made
     * @return the result returned by {@code invoker}, unchanged
     * @throws RpcException whatever {@code invoker} throws; the failure is recorded before it is rethrown
     */
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (simpleMeterRegistry == null) {
            return invoker.invoke(invocation);
        }
        Timer.Sample sample = Timer.start(simpleMeterRegistry);
        try {
            Result result = invoker.invoke(invocation);
            if (result instanceof AsyncRpcResult) {
                // The call may still be in flight: stop the sample when the response future completes.
                ((AsyncRpcResult) result).getResponseFuture()
                        .whenComplete((response, failure) ->
                                record(sample, failureOf(response, failure), invoker, invocation));
            } else {
                record(sample, failureOf(result, null), invoker, invocation);
            }
            return result;
        } catch (Throwable e) {
            record(sample, e, invoker, invocation);
            throw e;
        }
    }

    /**
     * Determines the failure of a finished call. A throwable delivered by the future wins;
     * otherwise an exception carried inside the result is used, so that calls whose result
     * holds an exception are not reported as successful.
     *
     * @param response the completed result, may be {@code null}
     * @param failure  throwable the call completed with, may be {@code null}
     * @return the failure to record, or {@code null} when the call succeeded
     */
    private Throwable failureOf(Result response, Throwable failure) {
        if (failure != null) {
            return failure;
        }
        if (response != null && response.hasException()) {
            return response.getException();
        }
        return null;
    }

    /**
     * Stops {@code sample} against the tagged timer.
     *
     * @param sample     running sample started in {@link #invoke}
     * @param t          failure of the call, or {@code null} on success
     * @param invoker    invoker the call went through (provides URL side and interface name)
     * @param invocation the call that was made (provides the method name)
     */
    private void record(Timer.Sample sample,
                        Throwable t,
                        Invoker<?> invoker,
                        Invocation invocation) {
        try {
            boolean isProvider = invoker.getUrl().getSide(PROVIDER).equalsIgnoreCase(PROVIDER);
            String serviceName = invoker.getInterface().getName();
            String methodName = RpcUtils.getMethodName(invocation);
            sample.stop(Timer.builder(DUBBO_METRICS_NAME)
                    .tag(RESULT_TAG, getResultTag(t))
                    .tag(SERVICE_TAG, serviceName)
                    .tag(METHOD_TAG, methodName)
                    .tag(CODE_TAG, buildCode(serviceName, methodName))
                    .tag(EXCEPTION_TAG, getExceptionTag(t))
                    .tag(SIDE_KEY, isProvider ? PROVIDER : CONSUMER)
                    .register(simpleMeterRegistry));
        } catch (Throwable ignored) {
            // Deliberately best-effort: a failure while recording metrics must not affect the RPC call.
        }
    }

    /** Builds the value of the {@code code} tag: {@code <service>.<method>}. */
    private String buildCode(String serviceName, String methodName) {
        return serviceName + "." + methodName;
    }

    /**
     * Maps a failure to the value of the {@code result} tag.
     *
     * @param e failure of the call, or {@code null} on success
     * @return {@code "success"}, a specific error kind for a {@link RpcException}, or {@code "error"}
     */
    private String getResultTag(Throwable e) {
        if (e == null) {
            return DEFAULT_RESULT_SUCCESS_VALUE;
        }
        String result = DEFAULT_RESULT_ERROR_VALUE;
        if (e instanceof RpcException) {
            RpcException rpcException = (RpcException) e;
            // The checks are independent; when several match, the last matching one decides the tag.
            if (rpcException.isTimeout()) {
                result = "timeoutError";
            }
            if (rpcException.isBiz()) {
                // NOTE(review): "bisError" looks like a typo of "bizError"; left unchanged because
                // the tag value is visible to whatever queries the metric.
                result = "bisError";
            }
            if (rpcException.isNetwork()) {
                result = "networkError";
            }
            if (rpcException.isSerialization()) {
                result = "serializationError";
            }
        }
        return result;
    }

    /**
     * Maps a failure to the value of the {@code exception} tag: the simple class name of the
     * cause when there is one, otherwise of the throwable itself.
     *
     * @param throwable failure of the call, or {@code null} on success
     * @return {@code "none"} on success, otherwise a non-empty class name
     */
    private String getExceptionTag(Throwable throwable) {
        if (throwable == null) {
            return DEFAULT_EXCEPTION_VALUE;
        }
        Throwable source = throwable.getCause() == null ? throwable : throwable.getCause();
        String simpleName = source.getClass().getSimpleName();
        // Anonymous classes have an empty simple name; fall back to the binary name so the tag is never blank.
        return simpleName.isEmpty() ? source.getClass().getName() : simpleName;
    }

    /**
     * Supplies the registry the timer is registered with; while it is unset no metrics are recorded.
     *
     * @param simpleMeterRegistry registry to publish to
     */
    public void setSimpleMeterRegistry(MeterRegistry simpleMeterRegistry) {
        this.simpleMeterRegistry = simpleMeterRegistry;
    }
}
-
然后创建两个文件
org.apache.dubbo.common.threadpool.ThreadPool
和org.apache.dubbo.rpc.Filter
并放到META-INF/dubbo
下:
META-INF
|--dubbo
|----org.apache.dubbo.common.threadpool.ThreadPool
|----org.apache.dubbo.rpc.Filter
-
文件
org.apache.dubbo.common.threadpool.ThreadPool
如下:fixedExt=me.howard.spring.boot.dubbo.triple.provider.config.DubboMetricsThreadPool
-
文件
org.apache.dubbo.rpc.Filter
如下:metrics1=me.howard.spring.boot.dubbo.triple.provider.filter.DubboMetricsFilter
在 Grafana 上看到 Dubbo 请求相关指标数据:
Provider 端:
配置:
# 请求吞吐量
sum(rate(dubbo_requests_latency_seconds_count{application="xxx-service", instance="pushgateway", side="provider"}[1m]))
# 请求响应时间平均值
sum(rate(dubbo_requests_latency_seconds_sum{application="xxx-service", instance="pushgateway", result="success", side="provider"}[1m])) / sum(rate(dubbo_requests_latency_seconds_count{application="xxx-service", instance="pushgateway", result="success", side="provider"}[1m]))
# 请求响应时间最大值
max(dubbo_requests_latency_seconds_max{application="xxx-service", instance="pushgateway", result="success", side="provider"})
# 请求响应时间最大值 TOP5
topk(5, max(dubbo_requests_latency_seconds_max{application="xxx-service", instance="pushgateway", result="success", side="provider"}) by (id))
# 请求吞吐量 TOP5
topk(5, sum(rate(dubbo_requests_latency_seconds_count{application="xxx-service", instance="pushgateway", side="provider"}[1m])) by (id))
# 请求错误数
sum(rate(dubbo_requests_latency_seconds_count{application="xxx-service", instance="pushgateway", result!="success", side="provider"}[1m]))
# Dubbo Thread Pool 使用情况, 当前活动的线程数
dubbo_threads_active_count{application="xxx-service", instance="pushgateway"}
# Dubbo Thread Pool 使用情况, 当前线程数
dubbo_threads_pool_size{application="xxx-service", instance="pushgateway"}
# Dubbo Thread Pool 使用情况, 目前线程池已使用最大的线程数
dubbo_threads_largest_pool_size{application="xxx-service", instance="pushgateway"}
# Dubbo Thread Pool 使用情况, 配置最大的线程数
dubbo_threads_pool_max_size{application="xxx-service", instance="pushgateway"}
Consumer 端:
配置:见上面 Provider 的配置,只是把条件 side="provider" 改为 side="consumer"
Tomcat Metrics
在 application.yml 添加如下配置,开启 tomcat 相关指标的监控
server:
  tomcat:
    mbeanregistry:
      enabled: true
TomcatMetrics
类
Spring Actuator 引用 micrometer 工具来采集指标数据
// Registers gauges describing the Tomcat worker thread pool and its connections.
// Every gauge reads one attribute of the MBean matched by ":type=ThreadPool,name=*" through mBeanServer.
private void registerThreadPoolMetrics(MeterRegistry registry) {
registerMetricsEventually(":type=ThreadPool,name=*", (name, allTags) -> {
// Configured maximum number of worker threads ("maxThreads"); per the article this corresponds to server.tomcat.threads.max
Gauge.builder("tomcat.threads.config.max", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "maxThreads")))
.tags(allTags)
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Number of threads currently busy handling requests ("currentThreadsBusy")
Gauge.builder("tomcat.threads.busy", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "currentThreadsBusy")))
.tags(allTags)
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Current number of threads in the pool, busy and idle together ("currentThreadCount")
Gauge.builder("tomcat.threads.current", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "currentThreadCount")))
.tags(allTags)
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Number of connections Tomcat currently has open ("connectionCount")
Gauge.builder("tomcat.connections.current", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "connectionCount")))
.tags(allTags)
.baseUnit(BaseUnits.CONNECTIONS)
.register(registry);
// Number of connections currently kept alive ("keepAliveCount")
Gauge.builder("tomcat.connections.keepalive.current", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "keepAliveCount")))
.tags(allTags)
.baseUnit(BaseUnits.CONNECTIONS)
.register(registry);
// Configured maximum number of connections ("maxConnections"); per the article this corresponds to server.tomcat.max-connections
Gauge.builder("tomcat.connections.config.max", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "maxConnections")))
.tags(allTags)
.baseUnit(BaseUnits.CONNECTIONS)
.register(registry);
});
}
// Registers request-level meters for Tomcat.
// Every meter reads one or two attributes of the MBean matched by ":type=GlobalRequestProcessor,name=*" through mBeanServer.
private void registerGlobalRequestMetrics(MeterRegistry registry) {
registerMetricsEventually(":type=GlobalRequestProcessor,name=*", (name, allTags) -> {
// Total number of bytes sent so far ("bytesSent")
FunctionCounter.builder("tomcat.global.sent", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "bytesSent")))
.tags(allTags)
.baseUnit(BaseUnits.BYTES)
.register(registry);
// Total number of bytes received so far ("bytesReceived")
FunctionCounter.builder("tomcat.global.received", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "bytesReceived")))
.tags(allTags)
.baseUnit(BaseUnits.BYTES)
.register(registry);
// Number of requests that ended in an error ("errorCount")
FunctionCounter.builder("tomcat.global.error", mBeanServer,
s -> safeDouble(() -> s.getAttribute(name, "errorCount")))
.tags(allTags)
.register(registry);
// Request timer: count taken from "requestCount", total time from "processingTime" (milliseconds)
FunctionTimer.builder("tomcat.global.request", mBeanServer,
s -> safeLong(() -> s.getAttribute(name, "requestCount")),
s -> safeDouble(() -> s.getAttribute(name, "processingTime")), TimeUnit.MILLISECONDS)
.tags(allTags)
.register(registry);
// Longest time spent on a single request, in milliseconds ("maxTime")
TimeGauge.builder("tomcat.global.request.max", mBeanServer, TimeUnit.MILLISECONDS,
s -> safeDouble(() -> s.getAttribute(name, "maxTime")))
.tags(allTags)
.register(registry);
});
}
在 Grafana 上看到 Tomcat 相关的指标数据:
配置:
# Request 吞吐量
sum(rate(http_server_requests_seconds_count{application="user-app", instance="pushgateway"}[1m]))
# Request 错误数
sum(rate(http_server_requests_seconds_count{application="user-app", instance="pushgateway", status=~"5.."}[1m]))
# Request 响应时间平均值
sum(rate(http_server_requests_seconds_sum{application="user-app", instance="pushgateway", status!~"5.."}[1m]))/sum(rate(http_server_requests_seconds_count{application="user-app", instance="pushgateway", status!~"5.."}[1m]))
# Request 响应时间最大值
max(http_server_requests_seconds_max{application="user-app", instance="pushgateway", status!~"5.."})
# Tomcat 线程池,当前正在活动的线程数
tomcat_threads_busy_threads{application="user-app", instance="pushgateway"}
# Tomcat 线程池,当前线程数量
tomcat_threads_current_threads{application="user-app", instance="pushgateway"}
# Tomcat 线程池最大线程数
tomcat_threads_config_max_threads{application="user-app", instance="pushgateway"}
JVM Metrics
默认是开启 JVM 相关指标的监控
JvmThreadMetrics
类 :JVM 线程使用情况
/**
 * Binds gauges describing JVM thread usage, read from the platform {@code ThreadMXBean}.
 */
public class JvmThreadMetrics implements MeterBinder {
/**
 * Registers the peak, daemon, live and per-state thread gauges.
 *
 * @param registry registry the gauges are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
// Peak number of live threads since the JVM started or the peak was reset
Gauge.builder("jvm.threads.peak", threadBean, ThreadMXBean::getPeakThreadCount)
.tags(tags)
.description("The peak live thread count since the Java virtual machine started or peak was reset")
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Current number of live daemon threads
Gauge.builder("jvm.threads.daemon", threadBean, ThreadMXBean::getDaemonThreadCount)
.tags(tags)
.description("The current number of live daemon threads")
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Current number of live threads, daemon and non-daemon together
Gauge.builder("jvm.threads.live", threadBean, ThreadMXBean::getThreadCount)
.tags(tags)
.description("The current number of live threads including both daemon and non-daemon threads")
.baseUnit(BaseUnits.THREADS)
.register(registry);
try {
// Probe call: if it throws an Error, the per-state gauges below are not registered
threadBean.getAllThreadIds();
// One gauge per Thread.State value (NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED), distinguished by the "state" tag
for (Thread.State state : Thread.State.values()) {
Gauge.builder("jvm.threads.states", threadBean, (bean) -> getThreadStateCount(bean, state))
.tags(Tags.concat(tags, "state", getStateTagValue(state)))
.description("The current number of threads having " + state + " state")
.baseUnit(BaseUnits.THREADS)
.register(registry);
}
} catch (Error error) {
// An error will be thrown for unsupported operations
// e.g. SubstrateVM does not support getAllThreadIds
}
}
}
在 Grafana 上配置:
# 正在活动的线程数,包括 daemon 和 non-daemon
jvm_threads_live_threads{application="user-app", instance="pushgateway"}
# 正在活动的daemon线程数
jvm_threads_daemon_threads{application="user-app", instance="pushgateway"}
# 目前为止 JVM 使用的线程数最大峰值
jvm_threads_peak_threads{application="user-app", instance="pushgateway"}
# 各线程状态汇总 {{state}}
jvm_threads_states_threads{application="user-app", instance="pushgateway"}
JvmMemoryMetrics
类:JVM 内存使用情况
/**
 * Binds gauges describing JVM memory usage: one set per buffer pool and one set per memory pool.
 */
public class JvmMemoryMetrics implements MeterBinder {
/**
 * Registers buffer-pool gauges (tagged with "id") and memory-pool gauges (tagged with "id" and "area").
 *
 * @param registry registry the gauges are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
// Buffer pools as reported by the platform BufferPoolMXBeans (per the article: the direct and mapped buffer pools)
for (BufferPoolMXBean bufferPoolBean : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
Iterable<Tag> tagsWithId = Tags.concat(tags, "id", bufferPoolBean.getName());
// Estimated number of buffers in the pool
Gauge.builder("jvm.buffer.count", bufferPoolBean, BufferPoolMXBean::getCount)
.tags(tagsWithId)
.description("An estimate of the number of buffers in the pool")
.baseUnit(BaseUnits.BUFFERS)
.register(registry);
// Estimated memory the JVM is using for this buffer pool
Gauge.builder("jvm.buffer.memory.used", bufferPoolBean, BufferPoolMXBean::getMemoryUsed)
.tags(tagsWithId)
.description("An estimate of the memory that the Java virtual machine is using for this buffer pool")
.baseUnit(BaseUnits.BYTES)
.register(registry);
// Estimated total capacity of the buffers in this pool
Gauge.builder("jvm.buffer.total.capacity", bufferPoolBean, BufferPoolMXBean::getTotalCapacity)
.tags(tagsWithId)
.description("An estimate of the total capacity of the buffers in this pool")
.baseUnit(BaseUnits.BYTES)
.register(registry);
}
// Memory pools; the pool names depend on the garbage collector in use.
// Heap examples: PS Eden Space, PS Old Gen, PS Survivor Space
// Non-heap examples: Metaspace, Compressed Class Space, Code Cache
for (MemoryPoolMXBean memoryPoolBean : ManagementFactory.getPlatformMXBeans(MemoryPoolMXBean.class)) {
// Every pool whose type is not HEAP is tagged as "nonheap"
String area = MemoryType.HEAP.equals(memoryPoolBean.getType()) ? "heap" : "nonheap";
Iterable<Tag> tagsWithId = Tags.concat(tags, "id", memoryPoolBean.getName(), "area", area);
// Amount of memory currently used
Gauge.builder("jvm.memory.used", memoryPoolBean, (mem) -> getUsageValue(mem, MemoryUsage::getUsed))
.tags(tagsWithId)
.description("The amount of used memory")
.baseUnit(BaseUnits.BYTES)
.register(registry);
// Amount of memory committed for the JVM to use
// The committed size is the amount of memory guaranteed to be available for use by the Java virtual machine.
// The committed memory size is always greater than or equal to the used size.
Gauge.builder("jvm.memory.committed", memoryPoolBean, (mem) -> getUsageValue(mem, MemoryUsage::getCommitted))
.tags(tagsWithId)
.description("The amount of memory in bytes that is committed for the Java virtual machine to use")
.baseUnit(BaseUnits.BYTES)
.register(registry);
// Maximum amount of memory that can be used for memory management
Gauge.builder("jvm.memory.max", memoryPoolBean, (mem) -> getUsageValue(mem, MemoryUsage::getMax))
.tags(tagsWithId)
.description("The maximum amount of memory in bytes that can be used for memory management")
.baseUnit(BaseUnits.BYTES)
.register(registry);
}
}
}
在 Grafana 上查看指标:
配置:
# 堆内存使用率
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="heap"})*100/sum(jvm_memory_max_bytes{application="user-app",instance="pushgateway", area="heap"})
# 非堆内存使用率
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="nonheap"})*100/sum(jvm_memory_max_bytes{application="user-app",instance="pushgateway", area="nonheap"})
# 堆内存已使用大小
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="heap"})
# 堆内存可使用大小
sum(jvm_memory_committed_bytes{application="user-app", instance="pushgateway", area="heap"})
# 堆内存配置最大值
sum(jvm_memory_max_bytes{application="user-app", instance="pushgateway", area="heap"})
# 非堆内存已使用大小
sum(jvm_memory_used_bytes{application="user-app", instance="pushgateway", area="nonheap"})
# 非堆内存可使用大小
sum(jvm_memory_committed_bytes{application="user-app", instance="pushgateway", area="nonheap"})
# 非堆内存设置最大大小
sum(jvm_memory_max_bytes{application="user-app", instance="pushgateway", area="nonheap"})
# 直接内存(堆外内存)buffer 数量
jvm_buffer_count_buffers{application="user-app", instance="pushgateway", id="direct"}
# 直接内存(堆外内存)已使用大小
jvm_buffer_memory_used_bytes{application="user-app", instance="pushgateway", id="direct"}
# 直接内存(堆外内存)总容量
jvm_buffer_total_capacity_bytes{application="user-app", instance="pushgateway", id="direct"}
# Mapped Buffer 数量
jvm_buffer_count_buffers{application="user-app", instance="pushgateway", id="mapped"}
# Mapped Buffer 已使用大小
jvm_buffer_memory_used_bytes{application="user-app", instance="pushgateway", id="mapped"}
# Mapped Buffer 总容量
jvm_buffer_total_capacity_bytes{application="user-app", instance="pushgateway", id="mapped"}
# 新生代 Eden 区
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"PS Eden Space"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"PS Eden Space"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"PS Eden Space"}
# 老年代
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"PS Old Gen"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"PS Old Gen"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"PS Old Gen"}
# 新生代 Survivor 区
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"PS Survivor Space"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"PS Survivor Space"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"PS Survivor Space"}
# Metaspace 区(方法区)
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"Metaspace"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"Metaspace"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"Metaspace"}
# Compressed Class Space:压缩类指针空间(启用 -XX:+UseCompressedClassPointers 时存在)
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"Compressed Class Space"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"Compressed Class Space"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"Compressed Class Space"}
# Code Cache 包括 JIT编译后的机器码,或 Java 使用 JNI 调用
jvm_memory_used_bytes{application="user-app", instance="pushgateway", id=~"Code Cache"}
jvm_memory_committed_bytes{application="user-app", instance="pushgateway", id=~"Code Cache"}
jvm_memory_max_bytes{application="user-app", instance="pushgateway", id=~"Code Cache"}
JvmGcMetrics
类: GC 使用情况指标
/**
 * Binds garbage-collection meters. Gauges and counters are created up front; timers are
 * recorded from a notification listener attached to every GarbageCollectorMXBean that is
 * also a NotificationEmitter.
 */
public class JvmGcMetrics implements MeterBinder, AutoCloseable {
/**
 * Registers the GC meters and installs the GC notification listeners.
 * Returns without registering anything when managementExtensionsPresent is false.
 *
 * @param registry registry the meters are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
if (!this.managementExtensionsPresent) {
return;
}
// Initial value: sum of the max usage of all long-lived heap pools
double maxLongLivedPoolBytes = getLongLivedHeapPools().mapToDouble(mem -> getUsageValue(mem, MemoryUsage::getMax)).sum();
AtomicLong maxDataSize = new AtomicLong((long) maxLongLivedPoolBytes);
// Maximum size of the long-lived heap memory pool(s); refreshed by the listener below
Gauge.builder("jvm.gc.max.data.size", maxDataSize, AtomicLong::get)
.tags(tags)
.description("Max size of long-lived heap memory pool")
.baseUnit(BaseUnits.BYTES)
.register(registry);
AtomicLong liveDataSize = new AtomicLong();
// Size of the long-lived heap memory pool(s) after reclamation; set by the listener below
Gauge.builder("jvm.gc.live.data.size", liveDataSize, AtomicLong::get)
.tags(tags)
.description("Size of long-lived heap memory pool after reclamation")
.baseUnit(BaseUnits.BYTES)
.register(registry);
// Bytes allocated in the (young) allocation pool between one GC and the next; updated through countPoolSizeDelta
Counter allocatedBytes = Counter.builder("jvm.gc.memory.allocated").tags(tags)
.baseUnit(BaseUnits.BYTES)
.description("Incremented for an increase in the size of the (young) heap memory pool after one GC to before the next")
.register(registry);
// Bytes by which the long-lived pools grew across a GC; only created when isGenerationalGc is true, otherwise null
Counter promotedBytes = (isGenerationalGc) ? Counter.builder("jvm.gc.memory.promoted").tags(tags)
.baseUnit(BaseUnits.BYTES)
.description("Count of positive increases in the size of the old generation memory pool before GC to after GC")
.register(registry) : null;
final AtomicLong allocationPoolSizeAfter = new AtomicLong(0L);
for (GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
// Beans that cannot emit notifications are skipped
if (!(mbean instanceof NotificationEmitter)) {
continue;
}
NotificationListener notificationListener = (notification, ref) -> {
CompositeData cd = (CompositeData) notification.getUserData();
GarbageCollectionNotificationInfo notificationInfo = GarbageCollectionNotificationInfo.from(cd);
String gcCause = notificationInfo.getGcCause();
String gcAction = notificationInfo.getGcAction();
GcInfo gcInfo = notificationInfo.getGcInfo();
long duration = gcInfo.getDuration();
if (isConcurrentPhase(gcCause, notificationInfo.getGcName())) {
// Time spent in a concurrent GC phase
Timer.builder("jvm.gc.concurrent.phase.time")
.tags(tags)
.tags("action", gcAction, "cause", gcCause)
.description("Time spent in concurrent phase")
.register(registry)
.record(duration, TimeUnit.MILLISECONDS);
} else {
// Time spent in a GC pause
Timer.builder("jvm.gc.pause")
.tags(tags)
.tags("action", gcAction, "cause", gcCause)
.description("Time spent in GC pause")
.register(registry)
.record(duration, TimeUnit.MILLISECONDS);
}
final Map<String, MemoryUsage> before = gcInfo.getMemoryUsageBeforeGc();
final Map<String, MemoryUsage> after = gcInfo.getMemoryUsageAfterGc();
countPoolSizeDelta(before, after, allocatedBytes, allocationPoolSizeAfter, allocationPoolName);
// Used bytes summed over all long-lived pools, before and after this GC
final long longLivedBefore = longLivedPoolNames.stream().mapToLong(pool -> before.get(pool).getUsed()).sum();
final long longLivedAfter = longLivedPoolNames.stream().mapToLong(pool -> after.get(pool).getUsed()).sum();
if (isGenerationalGc) {
// Only growth of the long-lived pools is counted as promotion
final long delta = longLivedAfter - longLivedBefore;
if (delta > 0L) {
promotedBytes.increment(delta);
}
}
// Some GC implementations such as G1 can reduce the old gen size as part of a minor GC. To track the
// live data size we record the value if we see a reduction in the old gen heap size or
// after a major GC.
if (longLivedAfter < longLivedBefore || isMajorGc(notificationInfo.getGcName())) {
liveDataSize.set(longLivedAfter);
maxDataSize.set(longLivedPoolNames.stream().mapToLong(pool -> after.get(pool).getMax()).sum());
}
};
NotificationEmitter notificationEmitter = (NotificationEmitter) mbean;
// The filter lets only GARBAGE_COLLECTION_NOTIFICATION events reach the listener
notificationEmitter.addNotificationListener(notificationListener, notification -> notification.getType().equals(GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION), null);
// Remember how to detach this listener again
notificationListenerCleanUpRunnables.add(() -> {
try {
notificationEmitter.removeNotificationListener(notificationListener);
} catch (ListenerNotFoundException ignore) {
}
});
}
}
}
在 Grafana 上查看指标:
配置:
# GC 暂停次数/秒
rate(jvm_gc_pause_seconds_count{application="user-app", instance="pushgateway"}[1m])
# GC 暂停平均时间
rate(jvm_gc_pause_seconds_sum{application="$application", instance="$instance"}[1m])/rate(jvm_gc_pause_seconds_count{application="$application", instance="$instance"}[1m])
# GC 暂停最大时间
jvm_gc_pause_seconds_max{application="$application", instance="$instance"}
# 在一次 GC之后到下一次 GC 之前,年轻代增加的大小
rate(jvm_gc_memory_allocated_bytes_total{application="user-app", instance="pushgateway"}[1m])
# 在一次 GC 之前到 GC 之后,老年代内存池增加的大小(即晋升到老年代的字节数)
rate(jvm_gc_memory_promoted_bytes_total{application="user-app", instance="pushgateway"}[1m])
ClassLoaderMetrics
类加载器的使用情况
/**
 * Binds class-loading meters read from the platform {@code ClassLoadingMXBean}.
 */
public class ClassLoaderMetrics implements MeterBinder {
/**
 * Registers the loaded-classes gauge and the unloaded-classes counter.
 *
 * @param registry registry the meters are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
ClassLoadingMXBean classLoadingBean = ManagementFactory.getClassLoadingMXBean();
// Number of classes currently loaded in the JVM
Gauge.builder("jvm.classes.loaded", classLoadingBean, ClassLoadingMXBean::getLoadedClassCount)
.tags(tags)
.description("The number of classes that are currently loaded in the Java virtual machine")
.baseUnit(BaseUnits.CLASSES)
.register(registry);
// Total number of classes unloaded since the JVM started
FunctionCounter.builder("jvm.classes.unloaded", classLoadingBean, ClassLoadingMXBean::getUnloadedClassCount)
.tags(tags)
.description("The total number of classes unloaded since the Java virtual machine has started execution")
.baseUnit(BaseUnits.CLASSES)
.register(registry);
}
}
在 Grafana 上查看指标:
配置:
# 已加载类的数量
jvm_classes_loaded_classes{application="user-app", instance="pushgateway"}
# 类加载数量的变化
# delta function 表示在指定的时间范围内的指标的差值。如下面的示例:类加载器加载类的数量在当前时间和1分钟之间的数量变化值
# delta should only be used with gauges.
delta(jvm_classes_loaded_classes{application="user-app",instance="pushgateway"}[1m])
JvmCompilationMetrics
类: JVM 编译时的情况
/**
 * Binds a counter for the time the JVM has spent compiling, read from the platform {@code CompilationMXBean}.
 */
public class JvmCompilationMetrics implements MeterBinder {
/**
 * Registers the compilation-time counter, tagged with the compiler name.
 * Nothing is registered when there is no compilation bean or it does not support time monitoring.
 *
 * @param registry registry the counter is registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
CompilationMXBean compilationBean = ManagementFactory.getCompilationMXBean();
if (compilationBean != null && compilationBean.isCompilationTimeMonitoringSupported()) {
// Approximate accumulated time spent in compilation, in milliseconds
FunctionCounter.builder("jvm.compilation.time", compilationBean, CompilationMXBean::getTotalCompilationTime)
.tags(Tags.concat(tags, "compiler", compilationBean.getName()))
.description("The approximate accumulated elapsed time spent in compilation")
.baseUnit(BaseUnits.MILLISECONDS)
.register(registry);
}
}
}
ExecutorServiceMetrics
类:线程池使用情况
/**
 * Binds meters describing thread-pool usage. Every meter name is prefixed with metricPrefix.
 */
public class ExecutorServiceMetrics implements MeterBinder {
// Meters for a ThreadPoolExecutor (ScheduledThreadPoolExecutor is a subclass and is covered too).
// Does nothing when tp is null.
private void monitor(MeterRegistry registry, @Nullable ThreadPoolExecutor tp) {
if (tp == null) {
return;
}
// Approximate total number of tasks that have completed execution
FunctionCounter.builder(metricPrefix + "executor.completed", tp, ThreadPoolExecutor::getCompletedTaskCount)
.tags(tags)
.description("The approximate total number of tasks that have completed execution")
.baseUnit(BaseUnits.TASKS)
.register(registry);
// Approximate number of threads that are actively executing tasks
Gauge.builder(metricPrefix + "executor.active", tp, ThreadPoolExecutor::getActiveCount)
.tags(tags)
.description("The approximate number of threads that are actively executing tasks")
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Number of tasks currently waiting in the work queue
Gauge.builder(metricPrefix + "executor.queued", tp, tpRef -> tpRef.getQueue().size())
.tags(tags)
.description("The approximate number of tasks that are queued for execution")
.baseUnit(BaseUnits.TASKS)
.register(registry);
// Remaining capacity of the work queue (meaningful for bounded queues)
Gauge.builder(metricPrefix + "executor.queue.remaining", tp, tpRef -> tpRef.getQueue().remainingCapacity())
.tags(tags)
.description("The number of additional elements that this queue can ideally accept without blocking")
.baseUnit(BaseUnits.TASKS)
.register(registry);
// Current number of threads in the pool
Gauge.builder(metricPrefix + "executor.pool.size", tp, ThreadPoolExecutor::getPoolSize)
.tags(tags)
.description("The current number of threads in the pool")
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Configured core number of threads
Gauge.builder(metricPrefix + "executor.pool.core", tp, ThreadPoolExecutor::getCorePoolSize)
.tags(tags)
.description("The core number of threads for the pool")
.baseUnit(BaseUnits.THREADS)
.register(registry);
// Configured maximum number of threads
Gauge.builder(metricPrefix + "executor.pool.max", tp, ThreadPoolExecutor::getMaximumPoolSize)
.tags(tags)
.description("The maximum allowed number of threads in the pool")
.baseUnit(BaseUnits.THREADS)
.register(registry);
}
// Meters for a ForkJoinPool
private void monitor(MeterRegistry registry, ForkJoinPool fj) {
// Estimated total number of tasks stolen from one thread's work queue by another
FunctionCounter.builder(metricPrefix + "executor.steals", fj, ForkJoinPool::getStealCount)
.tags(tags)
.description("Estimate of the total number of tasks stolen from " +
"one thread's work queue by another. The reported value " +
"underestimates the actual total number of steals when the pool " +
"is not quiescent")
.register(registry);
// Estimated number of tasks currently held in the worker threads' queues
Gauge.builder(metricPrefix + "executor.queued", fj, ForkJoinPool::getQueuedTaskCount)
.tags(tags)
.description("An estimate of the total number of tasks currently held in queues by worker threads")
.register(registry);
// Estimated number of threads currently stealing or executing tasks
Gauge.builder(metricPrefix + "executor.active", fj, ForkJoinPool::getActiveThreadCount)
.tags(tags)
.description("An estimate of the number of threads that are currently stealing or executing tasks")
.register(registry);
// Estimated number of worker threads that are not blocked waiting to join tasks or for other synchronization
Gauge.builder(metricPrefix + "executor.running", fj, ForkJoinPool::getRunningThreadCount)
.tags(tags)
.description("An estimate of the number of worker threads that are not blocked waiting to join tasks or for other managed synchronization threads")
.register(registry);
}
}
DiskSpaceMetrics
类:磁盘使用情况
/**
 * Binds gauges for the usable and total space of the file-system location held in {@code path}.
 */
public class DiskSpaceMetrics implements MeterBinder {
/**
 * Registers the "disk.free" and "disk.total" gauges, both tagged with the absolute path.
 *
 * @param registry registry the gauges are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
Iterable<Tag> tagsWithPath = Tags.concat(tags, "path", absolutePath);
// Usable space for the path, in bytes
Gauge.builder("disk.free", path, File::getUsableSpace)
.tags(tagsWithPath)
.description("Usable space for path")
.baseUnit(BaseUnits.BYTES)
.strongReference(true)
.register(registry);
// Total space for the path, in bytes (the whole capacity, not the used part)
Gauge.builder("disk.total", path, File::getTotalSpace)
.tags(tagsWithPath)
.description("Total space for path")
.baseUnit(BaseUnits.BYTES)
.strongReference(true)
.register(registry);
}
}
System Metrics
FileDescriptorMetrics
类:文件描述符使用情况
表示该 JVM 进程中打开文件描述符的使用情况,包括:socket file 和 file 等
/**
 * Binds gauges for the file-descriptor usage of the process.
 * Each gauge is only registered when the corresponding accessor method was found.
 */
public class FileDescriptorMetrics implements MeterBinder {
/**
 * Registers "process.files.open" and "process.files.max" when openFilesMethod / maxFilesMethod are available.
 *
 * @param registry registry the gauges are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
if (openFilesMethod != null) {
// Number of file descriptors the process currently has open
Gauge.builder("process.files.open", osBean, x -> invoke(openFilesMethod))
.tags(tags)
.description("The open file descriptor count")
.baseUnit(BaseUnits.FILES)
.register(registry);
}
// Maximum number of file descriptors the process may open
if (maxFilesMethod != null) {
Gauge.builder("process.files.max", osBean, x -> invoke(maxFilesMethod))
.tags(tags)
.description("The maximum file descriptor count")
.baseUnit(BaseUnits.FILES)
.register(registry);
}
}
}
在 Grafana 上查看指标:
配置:
# 打开 FD 数量
process_files_open_files{application="user-app", instance="pushgateway"}
# 支持最大的 FD 数量
process_files_max_files{application="user-app", instance="pushgateway"}
UptimeMetrics
类:运行时间指标
/**
 * Binds time gauges for the uptime and the start time of the JVM, read from runtimeMXBean.
 */
public class UptimeMetrics implements MeterBinder {
/**
 * Registers "process.uptime" and "process.start.time"; both source values are in milliseconds.
 *
 * @param registry registry the gauges are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
// How long the JVM has been running
TimeGauge.builder("process.uptime", runtimeMXBean, TimeUnit.MILLISECONDS, RuntimeMXBean::getUptime)
.tags(tags)
.description("The uptime of the Java virtual machine")
.register(registry);
// When the process started, measured since the unix epoch
TimeGauge.builder("process.start.time", runtimeMXBean, TimeUnit.MILLISECONDS, RuntimeMXBean::getStartTime)
.tags(tags)
.description("Start time of the process since unix epoch.")
.register(registry);
}
}
在 Grafana 上查看指标:
配置:
# 运行时长
process_uptime_seconds{application="user-app", instance="pushgateway"}
# 启动时间
process_start_time_seconds{application="user-app", instance="pushgateway"}*1000
ProcessorMetrics
类:进程 CPU 的使用情况
/**
 * Binds gauges for CPU count, system load and CPU usage of the system and of this process.
 */
public class ProcessorMetrics implements MeterBinder {
/**
 * Registers "system.cpu.count" unconditionally; the load and usage gauges are registered
 * only when the corresponding value or accessor is available.
 *
 * @param registry registry the gauges are registered with
 */
@Override
public void bindTo(MeterRegistry registry) {
Runtime runtime = Runtime.getRuntime();
// Number of processors available to the JVM
Gauge.builder("system.cpu.count", runtime, Runtime::availableProcessors)
.tags(tags)
.description("The number of processors available to the Java virtual machine")
.register(registry);
// Registered only when the load average reported at bind time is not negative
if (operatingSystemBean.getSystemLoadAverage() >= 0) {
// System load average of the operating system
Gauge.builder("system.load.average.1m", operatingSystemBean, OperatingSystemMXBean::getSystemLoadAverage)
.tags(tags)
.description("The sum of the number of runnable entities queued to available processors and the number " +
"of runnable entities running on the available processors averaged over a period of time")
.register(registry);
}
if (systemCpuUsage != null) {
// Recent CPU usage of the whole system
Gauge.builder("system.cpu.usage", operatingSystemBean, x -> invoke(systemCpuUsage))
.tags(tags)
.description("The \"recent cpu usage\" for the whole system")
.register(registry);
}
if (processCpuUsage != null) {
// Recent CPU usage of this JVM process
Gauge.builder("process.cpu.usage", operatingSystemBean, x -> invoke(processCpuUsage))
.tags(tags)
.description("The \"recent cpu usage\" for the Java Virtual Machine process")
.register(registry);
}
}
}
在 Grafana 上查看指标:
# 操作系统 占用 CPU 率
system_cpu_usage{application="user-app", instance="pushgateway"}
# 进程 占用 CPU 率
process_cpu_usage{application="user-app", instance="pushgateway"}
# 进程最近 1 小时的平均 CPU 使用率
avg_over_time(process_cpu_usage{application="user-app", instance="pushgateway"}[1h])
# 平均1分钟的负载情况
system_load_average_1m{application="user-app", instance="pushgateway"}
# CPU 核数
system_cpu_count{application="user-app", instance="pushgateway"}
DB Connection Pool
数据库连接池使用的是阿里的 Druid,下面先看 Spring Actuator 的源码是如何采集 Connection Pool 指标。
DataSourcePoolMetrics
类:针对数据库连接池的使用情况
/**
 * Binds connection-pool gauges (active, idle, max, min) for a single {@link DataSource}.
 *
 * <p>Excerpt: the {@code metadataProvider} field and {@code bindPoolMetadata(...)} are declared
 * outside the shown code.
 */
public class DataSourcePoolMetrics implements MeterBinder {

    private final DataSource dataSource;

    /**
     * Registers the pool gauges, but only when pool metadata can be resolved for the data source.
     *
     * @param registry the registry that receives the gauges
     */
    @Override
    public void bindTo(MeterRegistry registry) {
        if (this.metadataProvider.getDataSourcePoolMetadata(this.dataSource) == null) {
            // No metadata available for this data source: nothing to publish.
            return;
        }
        // Connections currently handed out by the pool.
        bindPoolMetadata(registry, "active",
                "Current number of active connections that have been allocated from the data source.",
                DataSourcePoolMetadata::getActive);
        // Connections that are established but currently idle.
        bindPoolMetadata(registry, "idle", "Number of established but idle connections.",
                DataSourcePoolMetadata::getIdle);
        // Configured upper bound of the pool.
        bindPoolMetadata(registry, "max",
                "Maximum number of active connections that can be allocated at the same time.",
                DataSourcePoolMetadata::getMax);
        // Configured minimum number of idle connections.
        bindPoolMetadata(registry, "min", "Minimum number of idle connections in the pool.",
                DataSourcePoolMetadata::getMin);
    }

    /**
     * Metadata provider that remembers the metadata resolved for each data source and delegates
     * to the wrapped provider on a cache miss.
     */
    private static class CachingDataSourcePoolMetadataProvider implements DataSourcePoolMetadataProvider {

        // Metadata already resolved, keyed by data source; shared by all instances of this class.
        private static final Map<DataSource, DataSourcePoolMetadata> cache = new ConcurrentReferenceHashMap<>();

        private final DataSourcePoolMetadataProvider metadataProvider;

        CachingDataSourcePoolMetadataProvider(DataSourcePoolMetadataProvider metadataProvider) {
            this.metadataProvider = metadataProvider;
        }

        /**
         * Adapts a metadata accessor into a function that works directly on a data source.
         *
         * @param function accessor applied to the metadata of the data source
         * @return a function resolving the metadata first and then applying {@code function}
         */
        <N extends Number> Function<DataSource, N> getValueFunction(Function<DataSourcePoolMetadata, N> function) {
            return source -> {
                DataSourcePoolMetadata resolved = getDataSourcePoolMetadata(source);
                return function.apply(resolved);
            };
        }

        /**
         * Returns the metadata for the data source, asking the delegate only when the cache
         * holds no non-null entry for it.
         */
        @Override
        public DataSourcePoolMetadata getDataSourcePoolMetadata(DataSource dataSource) {
            DataSourcePoolMetadata cached = cache.get(dataSource);
            if (cached != null) {
                return cached;
            }
            DataSourcePoolMetadata resolved = this.metadataProvider.getDataSourcePoolMetadata(dataSource);
            cache.put(dataSource, resolved);
            return resolved;
        }
    }
}
我们需要把 Druid 连接池相关的 DataSource 元数据信息注册到 Spring 容器中,这样就可以采集 DruidDataSource 相关的指标数据。
Spring 为我们提供了强大的扩展能力,使我们可以很方便地集成第三方组件。
/**
 * Registers a {@link DataSourcePoolMetadataProvider} that understands Druid data sources.
 */
@Configuration(proxyBeanMethods = false)
public class DataSourcePoolMetadataProvidersConfiguration {

    /**
     * Creates a provider that unwraps the given data source to a {@link DruidDataSource} and
     * exposes it as {@link DruidDataSourcePoolMetadata}.
     *
     * @return a provider yielding {@code null} for data sources that are not backed by Druid
     */
    @Bean
    public DataSourcePoolMetadataProvider druidDataSourceMetadataProvider() {
        return dataSource -> {
            DruidDataSource unwrapped = DataSourceUnwrapper.unwrap(dataSource, DruidDataSource.class);
            return unwrapped == null ? null : new DruidDataSourcePoolMetadata(unwrapped);
        };
    }
}
/**
 * {@link DataSourcePoolMetadata} backed by an Alibaba {@link DruidDataSource}.
 *
 * <p>Every accessor reads the value straight from the wrapped data source.
 */
public class DruidDataSourcePoolMetadata extends AbstractDataSourcePoolMetadata<DruidDataSource> {

    /**
     * @param dataSource the Druid data source whose pool state is exposed
     */
    public DruidDataSourcePoolMetadata(DruidDataSource dataSource) {
        super(dataSource);
    }

    /** @return the number of connections currently borrowed from the pool */
    @Override
    public Integer getActive() {
        return getDataSource().getActiveCount();
    }

    /**
     * Returns the number of connections currently idle in the pool.
     *
     * <p>This reads Druid's pooling count, i.e. the live number of pooled connections that are
     * not borrowed. The configured {@code maxIdle} setting is deliberately not used here: it is
     * a configuration value, so the idle gauge would stay constant regardless of pool activity.
     *
     * @return the current idle connection count
     */
    @Override
    public Integer getIdle() {
        return getDataSource().getPoolingCount();
    }

    /** @return the configured maximum number of active connections */
    @Override
    public Integer getMax() {
        return getDataSource().getMaxActive();
    }

    /** @return the configured minimum number of idle connections */
    @Override
    public Integer getMin() {
        return getDataSource().getMinIdle();
    }

    /** @return the validation query configured on the data source */
    @Override
    public String getValidationQuery() {
        return getDataSource().getValidationQuery();
    }

    /** @return the default auto-commit setting configured on the data source */
    @Override
    public Boolean getDefaultAutoCommit() {
        return getDataSource().isDefaultAutoCommit();
    }
}
在 Grafana 上看到指标如下:
配置:
jdbc_connections_active{application="user-app", instance="pushgateway"}
jdbc_connections_idle{application="user-app", instance="pushgateway"}
jdbc_connections_max{application="user-app", instance="pushgateway"}
MongoDB Metrics
MongoDB Client 的配置:
@Configuration
public class MongoConfiguration {
@Bean
public MongoDatabaseFactory mongoDatabaseFactory(MeterRegistry meterRegistry) {
MongoClientSettings settings = MongoClientSettings.builder()
// MongoMetricsCommandListener 采集对 MongoDB 所有操作的指标
.addCommandListener(new MongoMetricsCommandListener(meterRegistry))
.applyConnectionString(new ConnectionString(connectionString))
// MongoMetricsConnectionPoolListener 采集 Connection Pool 的指标
.applyToConnectionPoolSettings(builder -> builder.addConnectionPoolListener(new MongoMetricsConnectionPoolListener(meterRegistry)))
.build();
return new SimpleMongoClientDatabaseFactory(MongoClients.create(settings), properties.getDatabase());
}
}
MongoMetricsCommandListener
类:采集 MongoDB 操作请求的指标
/**
 * Command listener that records the duration of MongoDB commands as a timer.
 *
 * <p>Excerpt: {@code registry} and {@code tagsProvider} are declared outside the shown code.
 */
public class MongoMetricsCommandListener implements CommandListener {

    /**
     * Records one command execution on the {@code mongodb.driver.commands} timer.
     *
     * @param event the command event the tags are derived from
     * @param elapsedTimeInNanoseconds the command duration in nanoseconds
     */
    private void timeCommand(CommandEvent event, long elapsedTimeInNanoseconds) {
        Timer commandTimer = Timer.builder("mongodb.driver.commands")
                .description("Timer of mongodb commands")
                .tags(tagsProvider.commandTags(event))
                .register(registry);
        commandTimer.record(elapsedTimeInNanoseconds, TimeUnit.NANOSECONDS);
    }
}
MongoMetricsConnectionPoolListener
类:Connection Pool 的使用情况
/**
 * Connection-pool listener that publishes gauges describing the pool's usage.
 *
 * <p>Excerpt: {@code METRIC_PREFIX}, {@code registerGauge(...)}, {@code meters} and the
 * {@code poolSizes} / {@code checkedOutCounts} / {@code waitQueueSizes} holders are declared
 * outside the shown code.
 */
public class MongoMetricsConnectionPoolListener implements ConnectionPoolListener {

    /**
     * Registers the gauges for a newly created pool and stores them under the pool's server id.
     *
     * @param event the pool-created event
     */
    @Override
    public void connectionPoolCreated(ConnectionPoolCreatedEvent event) {
        // Connections currently in the pool, idle and in-use alike.
        Meter poolSize = registerGauge(event, METRIC_PREFIX + "size",
                "the current size of the connection pool, including idle and and in-use members", poolSizes);
        // Connections currently in use.
        Meter checkedOut = registerGauge(event, METRIC_PREFIX + "checkedout",
                "the count of connections that are currently in use", checkedOutCounts);
        // Size of the queue waiting for a connection from the pool.
        Meter waitQueue = registerGauge(event, METRIC_PREFIX + "waitqueuesize",
                "the current size of the wait queue for a connection from the pool", waitQueueSizes);

        List<Meter> registered = new ArrayList<>();
        registered.add(poolSize);
        registered.add(checkedOut);
        registered.add(waitQueue);
        meters.put(event.getServerId(), registered);
    }
}
在 Grafana 上查看指标:
配置:
# 请求吞吐量
sum(rate(mongodb_driver_commands_seconds_count{application="user-app", instance="pushgateway"}[1m]))
# 请求响应时间 平均值
sum(rate(mongodb_driver_commands_seconds_sum{application="user-app", instance="pushgateway"}[1m]))/sum(rate(mongodb_driver_commands_seconds_count{application="user-app", instance="pushgateway"}[1m]))
# 请求响应时间 最大值
max(mongodb_driver_commands_seconds_max{application="user-app", instance="pushgateway"})
# 请求 错误数
sum(rate(mongodb_driver_commands_seconds_count{application="user-app", instance="pushgateway", status!="SUCCESS"}[1m]))
# Connection Pool
mongodb_driver_pool_size{application="user-app", instance="pushgateway"}
mongodb_driver_pool_checkedout{application="user-app", instance="pushgateway"}
mongodb_driver_pool_waitqueuesize{application="user-app", instance="pushgateway"}
Redis Metrics
RedisCacheMetrics
类:缓存命中、未命中、驱逐、GET、PUT、DELETE相关指标
/**
 * Cache metrics for a Redis-backed cache: size, hits, misses, puts, evictions, plus the
 * Redis-specific removals, pending gets and lock wait time.
 *
 * <p>Excerpt: {@code cache}, {@code tags}, {@code getTagsWithCacheName()} and the
 * {@code size()} / {@code hitCount()} / {@code missCount()} / {@code putCount()} /
 * {@code evictionCount()} accessors are declared outside the shown code.
 */
public class RedisCacheMetrics extends CacheMeterBinder {

    /**
     * Registers the metrics that only this cache implementation provides.
     *
     * @param registry the registry that receives the meters
     */
    @Override
    protected void bindImplementationSpecificMetrics(MeterRegistry registry) {
        // Number of keys deleted from the cache.
        FunctionCounter.builder("cache.removals", this.cache, target -> target.getStatistics().getDeletes())
                .tags(getTagsWithCacheName())
                .description("Cache removals")
                .register(registry);

        // Number of get requests that are still pending.
        FunctionCounter.builder("cache.gets", this.cache, target -> target.getStatistics().getPending())
                .tags(getTagsWithCacheName())
                .tag("result", "pending")
                .description("The number of pending requests")
                .register(registry);

        // Time the cache has spent waiting on a lock, reported in nanoseconds.
        TimeGauge.builder("cache.lock.duration", this.cache, TimeUnit.NANOSECONDS,
                        target -> target.getStatistics().getLockWaitDuration(TimeUnit.NANOSECONDS))
                .tags(getTagsWithCacheName())
                .description("The time the cache has spent waiting on a lock")
                .register(registry);
    }

    /**
     * Registers the generic cache meters and then the implementation-specific ones. Meters whose
     * accessor currently returns {@code null} are not registered.
     *
     * @param registry the registry that receives the meters
     */
    @Override
    public final void bindTo(MeterRegistry registry) {
        // Number of key-value entries currently held by the cache.
        if (size() != null) {
            Gauge.builder("cache.size", cache.get(), target -> zeroIfNull(size()))
                    .tags(tags)
                    .description("The number of entries in this cache. This may be an approximation, depending on the type of cache.")
                    .register(registry);
        }

        // Number of lookups that missed the cache.
        if (missCount() != null) {
            FunctionCounter.builder("cache.gets", cache.get(), target -> zeroIfNull(missCount()))
                    .tags(tags).tag("result", "miss")
                    .description("the number of times cache lookup methods have returned an uncached (newly loaded) value, or null")
                    .register(registry);
        }

        // Number of lookups that hit the cache.
        FunctionCounter.builder("cache.gets", cache.get(), target -> hitCount())
                .tags(tags).tag("result", "hit")
                .description("The number of times cache lookup methods have returned a cached value.")
                .register(registry);

        // Number of entries put into the cache.
        FunctionCounter.builder("cache.puts", cache.get(), target -> putCount())
                .tags(tags)
                .description("The number of entries added to the cache")
                .register(registry);

        // Number of entries evicted from the cache.
        if (evictionCount() != null) {
            FunctionCounter.builder("cache.evictions", cache.get(), target -> zeroIfNull(evictionCount()))
                    .tags(tags)
                    .description("cache evictions")
                    .register(registry);
        }

        // Each cache implementation contributes its own additional meters.
        bindImplementationSpecificMetrics(registry);
    }

    /** Maps a {@code null} statistic to zero and unboxes any other value. */
    private static long zeroIfNull(Long value) {
        return value == null ? 0 : value;
    }
}
RocketMQ Metrics
对 Producer 和 Consumer Listener 二次封装,部分代码如下:
// 采集指标
/**
 * Records the latency of one message send/consume operation on the request timer.
 *
 * <p>Recording is best-effort: nothing is recorded when no registry is configured, and any
 * failure while building the tags or recording the sample is logged instead of being propagated
 * into the messaging path.
 *
 * @param destination the topic, published as the {@code topic} tag
 * @param tag the message tag, published as {@code messageTag}; {@code null} becomes {@code "none"}
 * @param e the failure of the operation, or {@code null} when it succeeded; its simple class
 *          name is published as the {@code exception} tag
 * @param duration the elapsed time in milliseconds
 * @param side the value of the {@code side} tag
 */
private void record(String destination, String tag, Exception e, long duration, String side) {
    if (meterRegistry == null) {
        return;
    }
    try {
        // Tags are built inside the try block so that a rejected tag value (for example a null
        // topic or side) is logged here rather than thrown into the caller.
        List<Tag> tags = Arrays.asList(
                Tag.of("topic", destination),
                Tag.of("messageTag", tag == null ? "none" : tag),
                Tag.of("exception", e == null ? "none" : e.getClass().getSimpleName()),
                Tag.of("application", applicationName),
                Tag.of("side", side));
        meterRegistry.timer(Constants.REQUEST_METRICS_NAME, tags)
                .record(duration, TimeUnit.MILLISECONDS);
    } catch (Throwable t) {
        // Metrics must never break message handling; log and continue.
        log.error("record metrics throw exception", t);
    }
}
在 Grafana 上看到指标:
配置:
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", side="producer"}[1m]))
sum(rate(mq_requests_latency_seconds_sum{application="kifs-course-service", instance="pushgateway", exception="none", side="producer"}[1m])) / sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception="none", side="producer"}[1m]))
max(mq_requests_latency_seconds_max{application="kifs-course-service", instance="pushgateway", exception="none", side="producer"})
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception!="none", side="producer"}[1m]))
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", side="consumer"}[1m]))
sum(rate(mq_requests_latency_seconds_sum{application="kifs-course-service", instance="pushgateway", exception="none", side="consumer"}[1m])) / sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception="none", side="consumer"}[1m]))
max(mq_requests_latency_seconds_max{application="kifs-course-service", instance="pushgateway", exception="none", side="consumer"})
sum(rate(mq_requests_latency_seconds_count{application="kifs-course-service", instance="pushgateway", exception!="none", side="consumer"}[1m]))
Method Metrics
Configuration:
/**
 * Registers the {@link MethodMetricsAspect} and excludes Spring MVC controllers from it.
 */
@Configuration
//@ConditionalOnProperty(prefix = "management.metrics.methods", name = "enable")
public class MethodMetricsConfiguration {

    /**
     * Creates the aspect with a skip rule that bypasses controller classes.
     *
     * @param meterRegistry the registry the aspect records into
     * @return the configured aspect
     */
    @Bean
    @ConditionalOnClass(value = MeterRegistry.class)
    public MethodMetricsAspect methodMetricsAspect(MeterRegistry meterRegistry) {
        return new MethodMetricsAspect(meterRegistry, this::skipControllers);
    }

    /**
     * Tells whether the join point's target class is annotated as a controller.
     *
     * @param pjp the intercepted join point
     * @return {@code true} when the target class carries {@code @RestController} or {@code @Controller}
     */
    private boolean skipControllers(ProceedingJoinPoint pjp) {
        Class<?> type = pjp.getTarget().getClass();
        if (type.isAnnotationPresent(RestController.class)) {
            return true;
        }
        return type.isAnnotationPresent(Controller.class);
    }
}
Annotation:
/**
 * Marks a method (or type / meta-annotation) whose invocations should be timed by
 * {@code MethodMetricsAspect}.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE, ElementType.METHOD})
public @interface Monitored {
/** Free-form value; the aspect shown alongside this annotation does not read it. */
String value() default "";
/**
 * Additional timer tags as alternating key/value entries. The aspect ignores the whole array
 * when its length is odd.
 */
String[] extraTags() default {};
/** Percentiles to publish for the timer; an empty array publishes none. */
double[] percentiles() default {};
/** Text published as the value of the timer's {@code description} tag. */
String description() default "none";
/** Alert flag; the aspect shown alongside this annotation does not read it. */
boolean isAlert() default false;
}
Interceptor:
/**
 * Aspect that times every method annotated with {@link Monitored} and records the duration on a
 * {@link Timer} named {@code method.invoke.timed}.
 *
 * <p>Each sample is tagged with the annotation's extra tags, the simple class name of the thrown
 * exception ({@code "none"} on success), the tags derived from the join point, and the
 * annotation's description. Methods returning a {@link CompletionStage} are timed until the
 * stage completes. Metric recording is best-effort: a failure to record is logged and never
 * alters the outcome of the intercepted method.
 */
@Aspect
@Slf4j
public class MethodMetricsAspect {

    /** Skip rule that never skips a join point. */
    public static final Predicate<ProceedingJoinPoint> DONT_SKIP_ANYTHING = pjp -> false;
    public static final String DEFAULT_METHOD_METRICS_NAME = "method.invoke.timed";
    public static final String DEFAULT_EXCEPTION_TAG_VALUE = "none";
    public static final String EXCEPTION_TAG = "exception";
    private static final String DEFAULT_METHOD_DESCRIPTION = "method timer";

    private final MeterRegistry registry;
    private final Function<ProceedingJoinPoint, Iterable<Tag>> tagsBasedOnJoinPoint;
    private final Predicate<ProceedingJoinPoint> shouldSkip;

    /** Creates an aspect that records into {@link Metrics#globalRegistry} and skips nothing. */
    public MethodMetricsAspect() {
        this(Metrics.globalRegistry);
    }

    /**
     * @param registry the registry to record into
     */
    public MethodMetricsAspect(MeterRegistry registry) {
        this(registry, DONT_SKIP_ANYTHING);
    }

    /**
     * @param registry the registry to record into
     * @param tagsBasedOnJoinPoint derives the per-invocation tags from the join point
     */
    public MethodMetricsAspect(MeterRegistry registry,
                               Function<ProceedingJoinPoint, Iterable<Tag>> tagsBasedOnJoinPoint) {
        this(registry, tagsBasedOnJoinPoint, DONT_SKIP_ANYTHING);
    }

    /**
     * Uses the default tags: {@code class} (declaring type name) and {@code method} (method name).
     *
     * @param registry the registry to record into
     * @param shouldSkip join points for which this returns {@code true} are not timed
     */
    public MethodMetricsAspect(MeterRegistry registry,
                               Predicate<ProceedingJoinPoint> shouldSkip) {
        this(registry,
                pjp -> Tags.of("class", pjp.getStaticPart().getSignature().getDeclaringTypeName(),
                        "method", pjp.getStaticPart().getSignature().getName()),
                shouldSkip);
    }

    /**
     * @param registry the registry to record into
     * @param tagsBasedOnJoinPoint derives the per-invocation tags from the join point
     * @param shouldSkip join points for which this returns {@code true} are not timed
     */
    public MethodMetricsAspect(MeterRegistry registry,
                               Function<ProceedingJoinPoint, Iterable<Tag>> tagsBasedOnJoinPoint,
                               Predicate<ProceedingJoinPoint> shouldSkip) {
        this.registry = registry;
        this.tagsBasedOnJoinPoint = tagsBasedOnJoinPoint;
        this.shouldSkip = shouldSkip;
    }

    /**
     * Times the intercepted method unless the skip rule applies or no {@link Monitored}
     * annotation can be resolved for it.
     *
     * @param pjp the intercepted join point
     * @return whatever the intercepted method returns
     * @throws Throwable whatever the intercepted method throws
     */
    // The pointcut's parentheses are balanced; the previous expression carried a stray trailing ')'.
    @Around("execution(@me.howard.actuator.annotation.Monitored * *.*(..))")
    public Object around(ProceedingJoinPoint pjp) throws Throwable {
        if (shouldSkip.test(pjp)) {
            return pjp.proceed();
        }
        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        Monitored monitored = method.getAnnotation(Monitored.class);
        if (monitored == null) {
            // The signature may point at a method that does not carry the annotation itself;
            // retry on the same-named method of the target class.
            try {
                method = pjp.getTarget().getClass().getMethod(method.getName(), method.getParameterTypes());
                monitored = method.getAnnotation(Monitored.class);
            } catch (NoSuchMethodException ex) {
                // getMethod only finds public methods; a failed lookup must not fail the call.
                log.debug("no public method found to resolve @Monitored", ex);
            }
        }
        if (monitored == null) {
            // Without the annotation there is nothing to configure the timer with.
            return pjp.proceed();
        }
        final boolean stopWhenCompleted = CompletionStage.class.isAssignableFrom(method.getReturnType());
        return handleWithTimer(pjp, monitored, stopWhenCompleted);
    }

    /**
     * Runs the intercepted method and records its duration.
     *
     * @param pjp the intercepted join point
     * @param monitored the annotation configuring the timer
     * @param stopWhenCompleted when {@code true} the sample is stopped once the returned
     *        {@link CompletionStage} completes rather than when the method returns
     */
    private Object handleWithTimer(ProceedingJoinPoint pjp, Monitored monitored, boolean stopWhenCompleted) throws Throwable {
        Timer.Sample sample = Timer.start(registry);
        if (stopWhenCompleted) {
            Object result;
            try {
                result = pjp.proceed();
            } catch (Throwable t) {
                record(pjp, monitored, sample, t.getClass().getSimpleName());
                throw t;
            }
            if (result == null) {
                // A null stage can never complete: record now and hand the null back unchanged
                // instead of failing with a NullPointerException inside the aspect.
                record(pjp, monitored, sample, DEFAULT_EXCEPTION_TAG_VALUE);
                return null;
            }
            return ((CompletionStage<?>) result).whenComplete(
                    (value, failure) -> record(pjp, monitored, sample, getExceptionTag(failure)));
        }
        String exceptionClass = DEFAULT_EXCEPTION_TAG_VALUE;
        try {
            return pjp.proceed();
        } catch (Throwable t) {
            // Catch Throwable so that Errors are tagged as well; the throwable is rethrown as-is.
            exceptionClass = t.getClass().getSimpleName();
            throw t;
        } finally {
            record(pjp, monitored, sample, exceptionClass);
        }
    }

    /**
     * Stops the sample on the timer built from the annotation and join point. Any exception
     * raised while building or registering the timer is logged and swallowed on purpose.
     */
    private void record(ProceedingJoinPoint pjp, Monitored monitored, Timer.Sample sample, String exceptionClass) {
        try {
            String[] extraTags = monitored.extraTags();
            double[] percentiles = monitored.percentiles();
            sample.stop(Timer.builder(DEFAULT_METHOD_METRICS_NAME)
                    .description(DEFAULT_METHOD_DESCRIPTION)
                    // Extra tags must come as key/value pairs; an odd-length array is ignored.
                    .tags(extraTags.length % 2 == 0 ? extraTags : new String[0])
                    .tags(EXCEPTION_TAG, exceptionClass)
                    .tags(tagsBasedOnJoinPoint.apply(pjp))
                    .tags("description", monitored.description())
                    .publishPercentiles(percentiles.length == 0 ? null : percentiles)
                    .register(registry));
        } catch (Exception e) {
            // Best-effort: a metrics failure must not affect the intercepted call.
            log.warn("metrics record exception", e);
        }
    }

    /**
     * Derives the exception tag for an asynchronous completion.
     *
     * @param t the completion failure, or {@code null} on success
     * @return {@code "none"} on success, otherwise the simple class name of the cause when one
     *         is present, or of {@code t} itself
     */
    private String getExceptionTag(Throwable t) {
        if (t == null) {
            return DEFAULT_EXCEPTION_TAG_VALUE;
        }
        if (t.getCause() == null) {
            return t.getClass().getSimpleName();
        }
        return t.getCause().getClass().getSimpleName();
    }
}