sniffer 嗅探器源码解读

  1. 主要类


    image.png
  2. 作用
    sniffer可以理解为动态更新restclient对象内节点,sniffer通过/_nodes/http的get请求去es拿取到当前的节点,然后进行解析成List<Node>对象,在把节点赋给restClient的一个过程。
  3. 使用方式
    通过使用方式进行代码的解读,更能理解每一步。
  RestClient restClient = RestClient.builder(
    new HttpHost("localhost", 9200, "http"))
    .build();
  Sniffer sniffer = Sniffer.builder(restClient)
    .setSniffIntervalMillis(60000).build();

sniffer官网例子:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/_usage.html

  1. 源码解读
    NodesSniffer接口
public interface NodesSniffer {
    /**
     * Returns the sniffed Elasticsearch nodes.
     */
    List<Node> sniff() throws IOException;
}

该接口是嗅探节点的接口,具体实现:ElasticsearchNodesSniffer类
ElasticsearchNodesSniffer 构造器,需要RestClient对象,请求es的_nodes/http

public ElasticsearchNodesSniffer(RestClient restClient, long sniffRequestTimeoutMillis, Scheme scheme) {
        this.restClient = Objects.requireNonNull(restClient, "restClient cannot be null");
        if (sniffRequestTimeoutMillis < 0) {
            throw new IllegalArgumentException("sniffRequestTimeoutMillis must be greater than 0");
        }
        this.request = new Request("GET", "/_nodes/http");
        request.addParameter("timeout", sniffRequestTimeoutMillis + "ms");
        this.scheme = Objects.requireNonNull(scheme, "scheme cannot be null");
    }

具体实现的sniff,通过JsonParser 去解析json,然后把对象赋值给Node。

 @Override
    public List<Node> sniff() throws IOException {
        Response response = restClient.performRequest(request);
        return readHosts(response.getEntity(), scheme, jsonFactory);
    }

    static List<Node> readHosts(HttpEntity entity, Scheme scheme, JsonFactory jsonFactory) throws IOException {
        try (InputStream inputStream = entity.getContent()) {
            JsonParser parser = jsonFactory.createParser(inputStream);
            if (parser.nextToken() != JsonToken.START_OBJECT) {
                throw new IOException("expected data to start with an object");
            }
            List<Node> nodes = new ArrayList<>();
            while (parser.nextToken() != JsonToken.END_OBJECT) {
                if (parser.getCurrentToken() == JsonToken.START_OBJECT) {
                    if ("nodes".equals(parser.getCurrentName())) {
                        while (parser.nextToken() != JsonToken.END_OBJECT) {
                            JsonToken token = parser.nextToken();
                            assert token == JsonToken.START_OBJECT;
                            String nodeId = parser.getCurrentName();
                            Node node = readNode(nodeId, parser, scheme);
                            if (node != null) {
                                nodes.add(node);
                            }
                        }
                    } else {
                        parser.skipChildren();
                    }
                }
            }
            return nodes;
        }
    }

正如第三步骤一样,Sniffer对象构建,是通过buider方法进行构建的,该方法会返回SnifferBuilder对象,然后调用该对象的build()方法初始化sniffer对象。

public Sniffer build() {
        if (nodesSniffer == null) {
            this.nodesSniffer = new ElasticsearchNodesSniffer(restClient);
        }
        return new Sniffer(restClient, nodesSniffer, sniffIntervalMillis,   
         sniffAfterFailureDelayMillis);
    }

在new Sniffer对象会启动Scheduler延时线程去调用sniff方法,默认是每隔5s嗅探一次。

整体流程如下:
Sniff.builder() ->SnifferBuilder.build()->Sniff.Scheduler启动线程-> 调用sniff()

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容