之前遇到过dubbo线程池耗尽的问题,但作为服务端,很难监控到自己的线程池是否已满。由于之前已经打通了micrometer+prometheus+grafana来做java服务的实时监控,这里直接通过metrics把线程池指标实时上报到prometheus,非常方便。
1. 获取到dubbo的线程池
因为dubbo线程数是一个波动的值,所以采用仪表盘(gauge)来存储数据
dubbo 2.6.8
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.store.DataStore;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor;
@Slf4j
public class DubboThreadPoolUtil {

    private static final String DEFAULT_PORT = "20880";

    /**
     * Looks up the Dubbo thread pool registered under the default port ({@code 20880}).
     *
     * @return the pool, or an empty {@link Optional} when none is found
     */
    public static Optional<ThreadPoolExecutor> getThreadPool() {
        return getThreadPool(DEFAULT_PORT);
    }

    /**
     * Looks up the Dubbo thread pool registered under the given port.
     *
     * <p>The executors are read from the default {@link DataStore} extension under
     * {@link Constants#EXECUTOR_SERVICE_COMPONENT_KEY}.
     *
     * @param port key used to look the executor up in the data store
     * @return the pool, or an empty {@link Optional} when the entry is missing or is not a
     *     {@link ThreadPoolExecutor}
     */
    public static Optional<ThreadPoolExecutor> getThreadPool(String port) {
        Map<String, Object> executorsByPort = ExtensionLoader.getExtensionLoader(DataStore.class)
                .getDefaultExtension()
                .get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
        Object candidate = executorsByPort.get(port);
        // instanceof is false for null, so a missing entry also yields an empty Optional.
        if (candidate instanceof ThreadPoolExecutor) {
            return Optional.of((ThreadPoolExecutor) candidate);
        }
        return Optional.empty();
    }
}
2. 搭配micrometer+prometheus使用
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.common.store.DataStore;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import lombok.extern.slf4j.Slf4j;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
public class DubboThreadPoolUtil {

    private static final String DEFAULT_PORT = "20880";

    /** Poller that waits for the Dubbo pool to appear; guarded by the class lock. */
    private static ScheduledThreadPoolExecutor scheduledExecutor;

    /**
     * The pool whose gauges have been registered; guarded by the class lock.
     * Kept in a static field so the observed pool stays strongly reachable
     * (Micrometer gauges are documented to hold only a weak reference to the observed object).
     */
    private static ThreadPoolExecutor dubboExecutor;

    /**
     * Looks up the Dubbo thread pool registered under the default port ({@code 20880}).
     *
     * @return the pool, or an empty {@link Optional} when none is found
     */
    public static Optional<ThreadPoolExecutor> getThreadPool() {
        return getThreadPool(DEFAULT_PORT);
    }

    /**
     * Looks up the Dubbo thread pool registered under the given port.
     *
     * <p>The executors are read from the default {@link DataStore} extension under
     * {@link Constants#EXECUTOR_SERVICE_COMPONENT_KEY}.
     *
     * @param port key used to look the executor up in the data store
     * @return the pool, or an empty {@link Optional} when the data store has no executors, the
     *     entry is missing, or the entry is not a {@link ThreadPoolExecutor}
     */
    public static Optional<ThreadPoolExecutor> getThreadPool(String port) {
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        Map<String, Object> executors = dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY);
        return Optional.ofNullable(executors)
                .map(byPort -> byPort.get(port))
                .filter(ThreadPoolExecutor.class::isInstance)
                .map(ThreadPoolExecutor.class::cast);
    }

    /**
     * Starts watching the Dubbo thread pool on the default port ({@code 20880}).
     */
    public static void addMetricsWatcher() {
        addMetricsWatcher(DEFAULT_PORT);
    }

    /**
     * Registers {@code dubbo_threadPool} gauges (active and maximum thread count) for the Dubbo
     * thread pool on the given port.
     *
     * <p>The pool may not exist yet when this is called, so a background task retries the lookup
     * every second. Once the gauges are registered the task stops itself: gauges are sampled
     * lazily by the registry and do not need to be re-registered.
     *
     * <p>Only one pool is tracked; calls made after the gauges have been registered are no-ops.
     *
     * @param port key used to look the executor up in the data store
     */
    public static synchronized void addMetricsWatcher(String port) {
        if (dubboExecutor != null) {
            // Gauges are already in place; nothing left to do.
            return;
        }
        if (scheduledExecutor == null) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder()
                    .setNameFormat("dubbo-threadPool-watcher-%d")
                    // Daemon, so a pool that never shows up cannot keep the JVM alive.
                    .setDaemon(true)
                    .build();
            scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
        }
        scheduledExecutor.scheduleWithFixedDelay(() -> tryRegisterGauges(port), 0, 1, TimeUnit.SECONDS);
    }

    /**
     * Performs one lookup attempt; on success registers the gauges and shuts the poller down.
     */
    private static synchronized void tryRegisterGauges(String port) {
        if (dubboExecutor != null) {
            // Another scheduled attempt already finished the job.
            return;
        }
        try {
            Optional<ThreadPoolExecutor> threadPoolOptional = getThreadPool(port);
            if (!threadPoolOptional.isPresent()) {
                log.info("添加dubbo线程池监控【未成功,等1s重试】");
                return;
            }
            ThreadPoolExecutor pool = threadPoolOptional.get();
            Metrics.gauge("dubbo_threadPool", Collections.singletonList(Tag.of("indicator", "活跃线程数")), pool, ThreadPoolExecutor::getActiveCount);
            Metrics.gauge("dubbo_threadPool", Collections.singletonList(Tag.of("indicator", "最大线程数")), pool, ThreadPoolExecutor::getMaximumPoolSize);
            dubboExecutor = pool;
            log.info("添加dubbo线程池监控【完毕】");
            // Registration is one-off: stop polling and release the watcher thread.
            scheduledExecutor.shutdown();
            scheduledExecutor = null;
        } catch (RuntimeException e) {
            // An exception escaping a scheduleWithFixedDelay task suppresses all later runs,
            // so log it here and let the next tick retry.
            log.warn("添加dubbo线程池监控【异常,等1s重试】", e);
        }
    }
}
3. 配置grafana图
max(dubbo_threadPool{indicator="活跃线程数"})
max(dubbo_threadPool{indicator="最大线程数"})
效果如下
值得注意的是,dubbo线程池默认采用的是SynchronousQueue(不缓存任务),因此活跃线程数达到最大线程数后,新请求不会排队等待,而是会被直接拒绝。