1. Start with the entry point:
@Path("/v1/cluster")
public class ClusterStatsResource
{
    private final InternalNodeManager nodeManager;
    private final QueryManager queryManager;
    private final boolean isIncludeCoordinator;
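The class holds three fields: the node manager, the query manager, and a flag indicating whether the coordinator is counted.
2. The service it exposes: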
@GET
@Produces(MediaType.APPLICATION_JSON)
public ClusterStats getClusterStats()
{
    long runningQueries = 0;
    long blockedQueries = 0;
    long queuedQueries = 0;
    // ... (rest omitted)
The method returns a ClusterStats object.
3. What fields does ClusterStats hold?
public static class ClusterStats
{
    private final long runningQueries;
    private final long blockedQueries;
    private final long queuedQueries;
    private final long activeWorkers;
    private final long runningDrivers;
    private final double reservedMemory;
    private final double rowInputRate;
    private final double byteInputRate;
    private final double cpuTimeRate;
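From this we can tell what the endpoint returns: the number of running queries, blocked queries, queued queries, active workers, and running drivers, plus reserved memory and the row, byte, and CPU-time input rates.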
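As a rough illustration of how a client might read these values, the sketch below issues a GET against the /v1/cluster path using the JDK's built-in HTTP client (Java 11+). The class name ClusterStatsClient and the base URI http://localhost:8080 are assumptions made for this example; depending on the Presto version and its security configuration, additional headers or authentication may be required.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of Presto.
final class ClusterStatsClient
{
    private ClusterStatsClient() {}

    // Builds a GET request for the /v1/cluster path declared by @Path above.
    // coordinatorBaseUri is an assumption, e.g. "http://localhost:8080".
    static HttpRequest buildRequest(String coordinatorBaseUri)
    {
        return HttpRequest.newBuilder()
                .uri(URI.create(coordinatorBaseUri + "/v1/cluster"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    // Sends the request and returns the raw JSON body as a string.
    static String fetch(String coordinatorBaseUri)
            throws IOException, InterruptedException
    {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest(coordinatorBaseUri),
                HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

Calling ClusterStatsClient.fetch("http://localhost:8080") against a running coordinator should return a JSON document whose keys correspond to the ClusterStats fields listed above.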
4. How are the ClusterStats values obtained?
They are supplied by two of the ClusterStatsResource fields shown at the start:
InternalNodeManager
QueryManager
5. First, what does InternalNodeManager provide?
The constructor below belongs to DiscoveryNodeManager:
@Inject
public DiscoveryNodeManager(
        @ServiceType("presto") ServiceSelector serviceSelector,
        NodeInfo nodeInfo,
        FailureDetector failureDetector,
        NodeVersion expectedNodeVersion,
        @ForNodeManager HttpClient httpClient)
{
    // ... (omitted)
    this.currentNode = refreshNodesInternal();
}
Note the last line, the call to refreshNodesInternal. Its key parts are:
// Fetch the state of all nodes via /v1/service, including location, node state, node ID, UUID, and so on
Set<ServiceDescriptor> services = serviceSelector.selectAllServices().stream()
        .filter(service -> !failureDetector.getFailed().contains(service))
        .collect(toImmutableSet());
// Record the resulting view of all nodes
allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build());
activeNodesByConnectorId = byConnectorIdBuilder.build();
coordinators = coordinatorsBuilder.build();
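The coordinator actively polls the state of the workers every 5 seconds. In addition, 5 seconds after an update completes, the refreshNodesInternal method shown above is called again. It fetches node information via /v1/service and adds any new nodes to nodeStates, a map held by DiscoveryNodeManager.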
@PostConstruct
public void startPollingNodeStates()
{
    // If this node is a coordinator, poll the worker nodes for fresh data every 5 seconds
    if (getCoordinators().contains(currentNode)) {
        nodeStateUpdateExecutor.scheduleWithFixedDelay(() -> {
            AllNodes allNodes = getAllNodes();
<b>From the above we can already see that the activeWorkers value in ClusterStats can be obtained from the nodeStates kept by InternalNodeManager.</b>
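The fixed-delay polling described in this section can be sketched in isolation as follows. This is a minimal illustration, not Presto's code: the class FixedDelayPollingSketch, its method, and the counter that stands in for "poll workers and refresh" are all hypothetical. The one real API it relies on is ScheduledExecutorService.scheduleWithFixedDelay, which measures the delay from the end of one run to the start of the next.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch of a fixed-delay polling loop.
final class FixedDelayPollingSketch
{
    private FixedDelayPollingSketch() {}

    // Runs a stand-in refresh task until it has executed at least `rounds` times,
    // then stops the executor and returns how many times the task actually ran.
    static int pollUntil(int rounds, long delayMillis)
    {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger refreshCount = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(rounds);
        try {
            // The delay is counted from the END of one run to the START of the next,
            // so a slow refresh never overlaps with the following one.
            executor.scheduleWithFixedDelay(() -> {
                refreshCount.incrementAndGet(); // stand-in for "poll workers, refresh node view"
                done.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            done.await(10, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        finally {
            executor.shutdownNow();
        }
        return refreshCount.get();
    }
}
```

In the walkthrough above the delay is 5 seconds; the sketch takes the delay as a parameter so it can be tried with a much shorter interval.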
6. Next, what does QueryManager provide?
Its implementation class is SqlQueryManager.
QueryManager exposes the getAllQueryInfo method, which ClusterStatsResource calls to obtain the query-related values in ClusterStats.
@Override
public List<QueryInfo> getAllQueryInfo()
{
    return queries.values().stream()
            .map(queryExecution -> {
                try {
                    return queryExecution.getQueryInfo();
                }
                catch (RuntimeException ignored) {
                    return null;
                }
            })
            .filter(Objects::nonNull)
            .collect(toImmutableList());
}
When this class creates a query, it adds the query to the queries map:
private final ConcurrentMap<QueryId, QueryExecution> queries = new ConcurrentHashMap<>();
It also attaches a listener to every query; whenever the execution state changes, the query's state, that is its QueryInfo, is updated.
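The register-then-listen pattern just described can be sketched roughly as below. Everything here is a simplified assumption for illustration: QueryRegistrySketch, Execution, and the use of plain strings for query IDs and states are hypothetical stand-ins and do not mirror the real QueryExecution or QueryInfo types.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical sketch: a registry that tracks queries and keeps their latest state via listeners.
final class QueryRegistrySketch
{
    // Simplified stand-in for a query execution that notifies listeners on state changes.
    static final class Execution
    {
        private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

        void addStateChangeListener(Consumer<String> listener)
        {
            listeners.add(listener);
        }

        void transitionTo(String newState)
        {
            // Notify every registered listener of the new state.
            listeners.forEach(listener -> listener.accept(newState));
        }
    }

    // Mirrors the role of the `queries` map: one entry per created query.
    private final ConcurrentMap<String, Execution> queries = new ConcurrentHashMap<>();
    // Latest known state per query, kept current by the listeners.
    private final ConcurrentMap<String, String> latestState = new ConcurrentHashMap<>();

    Execution createQuery(String queryId)
    {
        Execution execution = new Execution();
        latestState.put(queryId, "QUEUED");
        // Register the listener before publishing the execution in the map.
        execution.addStateChangeListener(newState -> latestState.put(queryId, newState));
        queries.put(queryId, execution);
        return execution;
    }

    // Returns an immutable copy of the current state of every known query.
    Map<String, String> snapshot()
    {
        return Map.copyOf(latestState);
    }
}
```

With this shape, a reader of snapshot() never has to ask each execution for its state; the listeners have already pushed the latest value into the map.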
7. Looking back at how the ClusterStats values are computed
public ClusterStats getClusterStats()
{
    long runningQueries = 0;
    long blockedQueries = 0;
    long queuedQueries = 0;

    long activeNodes = nodeManager.getNodes(NodeState.ACTIVE).size();
    if (!isIncludeCoordinator) {
        activeNodes -= 1;
    }

    long runningDrivers = 0;
    double memoryReservation = 0;

    double rowInputRate = 0;
    double byteInputRate = 0;
    double cpuTimeRate = 0;

    for (QueryInfo query : queryManager.getAllQueryInfo()) {
        if (query.getState() == QueryState.QUEUED) {
            queuedQueries++;
        }
        else if (query.getState() == QueryState.RUNNING) {
            if (query.getQueryStats().isFullyBlocked()) {
                blockedQueries++;
            }
            else {
                runningQueries++;
            }
        }

        if (!query.getState().isDone()) {
            double totalExecutionTimeSeconds = query.getQueryStats().getElapsedTime().getValue(SECONDS);
            if (totalExecutionTimeSeconds != 0) {
                byteInputRate += query.getQueryStats().getProcessedInputDataSize().toBytes() / totalExecutionTimeSeconds;
                rowInputRate += query.getQueryStats().getProcessedInputPositions() / totalExecutionTimeSeconds;
                cpuTimeRate += (query.getQueryStats().getTotalCpuTime().getValue(SECONDS)) / totalExecutionTimeSeconds;
            }
            memoryReservation += query.getQueryStats().getTotalMemoryReservation().toBytes();
            runningDrivers += query.getQueryStats().getRunningDrivers();
        }
    }

    return new ClusterStats(runningQueries, blockedQueries, queuedQueries, activeNodes, runningDrivers, memoryReservation, rowInputRate, byteInputRate, cpuTimeRate);
}
As we can see, most of the values are derived from each query's QueryInfo; only the active node count comes from the node manager.
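To make the rate arithmetic in the loop above concrete, here is a small stand-alone sketch of just that part. The types are hypothetical simplifications (QuerySample is not a Presto class); the logic follows the method shown above: finished queries are skipped, queries with zero elapsed time contribute no rate, and every other query contributes its lifetime average (total processed divided by elapsed seconds), so the cluster figure is a sum of per-query averages rather than an instantaneous throughput.

```java
import java.util.List;

// Hypothetical sketch of the rate aggregation performed in getClusterStats().
final class ClusterRateSketch
{
    private ClusterRateSketch() {}

    // Simplified stand-in for the QueryInfo/QueryStats fields used by the rate calculation.
    static final class QuerySample
    {
        final boolean done;
        final double elapsedSeconds;
        final long processedBytes;
        final long processedRows;
        final double cpuSeconds;

        QuerySample(boolean done, double elapsedSeconds, long processedBytes, long processedRows, double cpuSeconds)
        {
            this.done = done;
            this.elapsedSeconds = elapsedSeconds;
            this.processedBytes = processedBytes;
            this.processedRows = processedRows;
            this.cpuSeconds = cpuSeconds;
        }
    }

    // Returns {byteInputRate, rowInputRate, cpuTimeRate} summed over unfinished queries.
    static double[] aggregateRates(List<QuerySample> queries)
    {
        double byteInputRate = 0;
        double rowInputRate = 0;
        double cpuTimeRate = 0;
        for (QuerySample query : queries) {
            // Finished queries are ignored; zero elapsed time would divide by zero.
            if (query.done || query.elapsedSeconds == 0) {
                continue;
            }
            byteInputRate += query.processedBytes / query.elapsedSeconds;
            rowInputRate += query.processedRows / query.elapsedSeconds;
            cpuTimeRate += query.cpuSeconds / query.elapsedSeconds;
        }
        return new double[] {byteInputRate, rowInputRate, cpuTimeRate};
    }
}
```

For example, two unfinished queries that have processed 1000 bytes in 10 seconds and 400 bytes in 4 seconds each average 100 bytes per second, so the aggregated byte input rate is 200 bytes per second. The sketch covers only the three rates; in the original method, memory reservation and running drivers are summed for every unfinished query regardless of elapsed time.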
8. How the state listener is implemented
To be continued.