目标
研究Soul网关的Monitor插件是如何采集metrics
Monitor Plugin
如果网关需要采集metrics,需要在pom.xml里加上Monitor插件依赖:
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-monitor</artifactId>
<version>${project.version}</version>
</dependency>
我们看看这个jar包下有哪些类,我们发现只有一个类MonitorPluginConfiguration
,主要功能是创建Spring Bean。
public class MonitorPluginConfiguration {
// 创建monitor plugin Bean
@Bean
public SoulPlugin monitorPlugin() {
return new MonitorPlugin();
}
// 创建 monitor plugin配置处理器 Bean
@Bean
public PluginDataHandler monitorPluginDataHandler() {
return new MonitorPluginDataHandler();
}
}
MonitorPlugin类功能是设置埋点
public class MonitorPlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// 负责埋点数据,metrics label: http_request_total,
// label values: request uri 和 request method
MetricsTrackerFacade.getInstance()
.counterInc(MetricsLabelEnum.HTTP_REQUEST_TOTAL.getName(),
exchange.getRequest().getURI().getPath(),
exchange.getRequest().getMethodValue());
return chain.execute(exchange);
}
}
MonitorPluginDataHandler类功能是创建并启动埋点数据采集。
public class MonitorPluginDataHandler implements PluginDataHandler {
// 处理插件配置
@Override
public void handlerPlugin(final PluginData pluginData) {
// 如果插件配置不为null,并且是开启状态,则启动metrics采集器
if (Objects.nonNull(pluginData) && pluginData.getEnabled()) {
// 从插件配置读取metrics的配置信息,如ip、port等数据
MetricsConfig monitorConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), MetricsConfig.class);
// 配置验证
if (!checkConfig(monitorConfig)) {
return;
}
// 如果没启动,则启动
if (!MetricsTrackerFacade.getInstance().isStarted()) {
start(monitorConfig);
} else if (!monitorConfig.equals(Singleton.INST.get(MetricsConfig.class))) {
// 否则,重启
restart(monitorConfig);
}
} else {
// 停止
stop();
}
}
private void start(final MetricsConfig monitorConfig) {
// metrics采集器功能由第三方提供,soul网关提供了SPI,可由用户自已去实现
// 启动
MetricsTrackerFacade.getInstance().start(monitorConfig);
Singleton.INST.single(MetricsConfig.class, monitorConfig);
}
private void stop() {
MetricsTrackerFacade.getInstance().stop();
}
}
Soul网关的metrics采集默认是用prometheus实现的,当然soul网关也提供Java SPI,由用户自已实现采用其它第三方的工具。下面我们看看Soul是如何用prometheus实现的
MetricsTrackerFacade类
public final class MetricsTrackerFacade {
// 启动采集器
public void start(final MetricsConfig metricsConfig) {
if (this.isStarted.compareAndSet(false, true)) {
// 创建采集器,采用Dubbo SPI的思想。通过加载classpath下的org.dromara.soul.metrics.spi.MetricsTrackerManager文件中的类
metricsTrackerManager = ExtensionLoader.getExtensionLoader(MetricsTrackerManager.class).getJoin(metricsConfig.getMetricsName());
// prometheus启动,由PrometheusMetricsTrackerManager类实现
metricsTrackerManager.start(metricsConfig);
// 获取当前java进程拥有的cpu核数,可做为创建线程池的最大线程数
Integer threadCount = Optional.ofNullable(metricsConfig.getThreadCount()).orElse(Runtime.getRuntime().availableProcessors());
// 如果是配置异步采集数据,则创建线程池 MetricsTrackerHandler.getInstance().init(metricsConfig.getAsync(), threadCount, metricsTrackerManager);
} else {
log.info("metrics tracker has started !");
}
}
}
MetricsTrackerHandler类负责真正执行数据埋点
public final class MetricsTrackerHandler {
public void init(final boolean async, final int threadCount, final MetricsTrackerManager metricsTrackerManager) {
this.async = async; // 是否异步
this.metricsTrackerManager = metricsTrackerManager;
if (async) {
// 如果是异步,则创建线程池,线程池的线程数量是cpu核数,
// QUEUE_SIZE是线程池中的允许的任务最大数量,默认5000
executorService = new MetricsThreadPoolExecutor(threadCount, QUEUE_SIZE);
}
}
}
// 对指标进行计数
public void counterInc(final String metricsLabel, final String... labelValues) {
if (async) {
// 由子线程去执行
executorService.execute(() -> handlerCounter(metricsLabel, labelValues));
} else {
// 在当前线程执行
handlerCounter(metricsLabel, labelValues);
}
}
MetricsThreadPoolExecutor线程池,负责异步采集metrics数据
public final class MetricsThreadPoolExecutor extends ThreadPoolExecutor {
public MetricsThreadPoolExecutor(final int threadCount, final int queueSize) {
// 创建线程池最大线程数是threadCount
// 有界的任务队列
// 当任务队列已满,采取的拒绝策略
super(threadCount, threadCount, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(queueSize),
SoulThreadFactory.create("metrics", true), new CallerWaitPolicy());
}
// 拒绝策略,采用的是阻塞当前线程。
private static class CallerWaitPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
try {
// 此方法,是将任务放入队列中,当队列已满则进入阻塞
// 当前线程处于WAITTING状态。
executor.getQueue().put(r);
} catch (InterruptedException ex) {
log.error("InterruptedException, discard {}", r);
}
}
}
}
PrometheusMetricsTrackerManager具体的prometheus的实现
public final class PrometheusMetricsTrackerManager implements MetricsTrackerManager {
private HTTPServer server;
public void start(final MetricsConfig metricsConfig) {
// 注册jmx信息,jvm相关监控数据
register(metricsConfig.getJmxConfig());
InetSocketAddress inetSocketAddress;
if ("".equals(metricsConfig.getHost()) || null == metricsConfig.getHost()) {
inetSocketAddress = new InetSocketAddress(metricsConfig.getPort());
} else {
inetSocketAddress = new InetSocketAddress(metricsConfig.getHost(), metricsConfig.getPort());
}
// 创建HTTP Server,用来prometheus server会定时通过ip:port来拉取metrics数据。
server = new HTTPServer(inetSocketAddress, CollectorRegistry.defaultRegistry, true);
}
}
总结
- 通过阅读源码学了Java SPI的知识
- 学习了门面设计模式
- 了解prometheus采集了哪些metrics数据