-
主要类
- 作用
sniffer可以理解为动态更新restclient对象内节点,sniffer通过/_nodes/http的get请求去es拿取到当前的节点,然后进行解析成List<Node>对象,在把节点赋给restClient的一个过程。 - 使用方式
通过使用方式进行代码的解读,更能理解每一步。
RestClient restClient = RestClient.builder(
new HttpHost("localhost", 9200, "http"))
.build();
Sniffer sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(60000).build();
sniffer官网例子:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/_usage.html
- 源码解读
NodesSniffer接口
public interface NodesSniffer {
/**
* Returns the sniffed Elasticsearch nodes.
*/
List<Node> sniff() throws IOException;
}
该接口是嗅探节点的接口,具体实现:ElasticsearchNodesSniffer类
ElasticsearchNodesSniffer 构造器,需要RestClient对象,请求es的_nodes/http
public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {
this.restClient = Objects.requireNonNull(restClient, "restClient cannot be null");
if (sniffRequestTimeoutMillis < 0) {
throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
}
this.request = new Request("GET", "/_nodes/http");
request.addParameter("timeout", sniffRequestTimeoutMillis + "ms");
this.scheme = Objects.requireNonNull(scheme, "scheme cannot be null");
}
具体实现的sniff,通过JsonParser 去解析json,然后把对象赋值给Node。
@Override
public List<Node> sniff() throws IOException {
Response response = restClient.performRequest(request);
return readHosts(response.getEntity(), scheme, jsonFactory);
}
static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {
try (InputStream inputStream = entity.getContent()) {
JsonParser parser = jsonFactory.createParser(inputStream);
if (parser.nextToken() != JsonToken.START_OBJECT) {
throw new IOException("expected data to start with an object");
}
List<Node> nodes = new ArrayList<>();
while (parser.nextToken() != JsonToken.END_OBJECT) {
if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
if ("nodes".equals(parser.getCurrentName())) {
while (parser.nextToken() != JsonToken.END_OBJECT) {
JsonToken token = parser.nextToken();
assert token == JsonToken.START_OBJECT;
String nodeId = parser.getCurrentName();
Node node = readNode(nodeId, parser, scheme);
if (node != null) {
nodes.add(node);
}
}
} else {
parser.skipChildren();
}
}
}
return nodes;
}
}
正如第三步骤一样,Sniffer对象构建,是通过buider方法进行构建的,该方法会返回SnifferBuilder对象,然后调用该对象的build()方法初始化sniffer对象。
public Sniffer build() {
if (nodesSniffer == null) {
this.nodesSniffer = new ElasticsearchNodesSniffer(restClient);
}
return new Sniffer(restClient, nodesSniffer, sniffIntervalMillis,
sniffAfterFailureDelayMillis);
}
在new Sniffer对象会启动Scheduler延时线程去调用sniff方法,默认是每隔5s嗅探一次。
整体流程如下:
Sniff.builder() ->SnifferBuilder.build()->Sniff.Scheduler启动线程-> 调用sniff()