Elasticsearch RestHighLevelClient发起请求的过程分析

现在我们在用JAVA做ES应用开发的时候,通常会使用RestHighLevelClient来进行发送请求,早期没有RestHighLevelClient的时候,是直接使用Transport进行转发请求。

1.ES请求流转

首先我们来看下,从client发出http请求到ES集群后的整个流程。
1)首先请求到达集群节点后,由Netty4HttpServerTransport接受请求,通过RequestHandler类转到Controller,再有Controller根据http请求,找打注册在上面的Action。
2)根据Http请求选择的TransportXXXAction会判断当前请求的shard是否在当前节点,如果在,直接访问lucene,如果不在,则需要队请求转发
3)Node内部的请求转发都是基于Netty4Transpor的,默认是9200端口,可以理解为Elasticsearch内部的RPC通讯
4)请求到达node2之后,经过对应的XXXHandler处理后,会访问node2的lucene


image.png

2.RestHighLevelClient的请求流程

2.1新建client

HttpHost httpHost = new HttpHost("localhost", 9200, "http");
RestClientBuilder builder =  RestClient.builder(httpHost);
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
        new UsernamePasswordCredentials("elastic", "123456"));
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
    @Override
    public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
        builder.setConnectTimeout(3000);
        return builder;
    }
});
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
        httpAsyncClientBuilder.setMaxConnTotal(30);
        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpAsyncClientBuilder;
    }
});
client = new RestHighLevelClient(builder);

如上代码所示,在新建client的时候,可以指定请求的节点,鉴权,请求超时,http线程池等参数。示例只是用了一个节点,如果才用多个节点的,在builder的时候可以传入多个HttpHost

2.2请求体构造

我们以Search请求为例,来进行举例,和search类似,client对很多方法都提供了同步和异步的方法


image.png

请求最主要的参数就是SearchRequest,这里需要放入当前请求的索引,查询条件等

SearchRequest最主要的两个参数,一个是indices,还有一个就是SearchSourceBuilder,indices表示请求的索引,SearchSourceBuilder表示请求的语法

用RestHighLevelClient的一个很重要的原因,就是它的语法很大程度上和ES的DSL是一一对应的,比如SearchSourceBuilder我们可以理解为DSL最外层的{}, SearchSourceBuilder内部的成员变量有,
QueryBuilder,fetchSourceContext, aggregations等,这些成员变量内部的接口也和DSL语法基本差不多。

在SearchRequest构造完成之后,我们可以调用toString()方法生成DSL,当然在使用过程中我们也可以在kibina等工具内用DSL调好请求,然后在Search的时候直接使用DSL,但是这种方式不利于维护查询的语法。不建议使用。

2.3请求发送

执行search方法后,最终会执行到RestClient:performRequest

public Response performRequest(Request request) throws IOException {
    SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
    performRequestAsyncNoCatch(request, listener);
    return listener.get();
}

通过该方法,我们看到无论我们在外面选择的是同步或者异步的方法,其实clinet内部都是按照异步处理的。所以2.1中介绍的线程池配置就很关键,需要根据不同的业务选择不同的线程池大小。

我们来看下这段代码的主要逻辑performRequestAsyncNoCatch(request, listener),该方法中经过一些参数校验和请求封装后,进入方法

performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
        request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);

这里有个关键的方法,nextNode(),我们看一下nextNode()做了些什么

/**
 * Returns a non-empty {@link Iterator} of nodes to be used for a request
 * that match the {@link NodeSelector}.
 * <p>
 * If there are no living nodes that match the {@link NodeSelector}
 * this will return the dead node that matches the {@link NodeSelector}
 * that is closest to being revived.
 * @throws IOException if no nodes are available
 */
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
    NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
    Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
    return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
}

我们先来看一下RestClient的几个成员变量,即selectNodes的几个入参

private final AtomicInteger lastNodeIndex = new AtomicInteger(0);  //上一次请求的node编号
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();  //node的状态map,表示某个node是否连不上
private final NodeSelector nodeSelector; //node 选择器,用于负载均衡
private volatile NodeTuple<List<Node>> nodeTuple; //当前client配置的所有协调节点信息

理解了这几个变量之后,我们再看selectNodes方法

/**
 * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
 * if the previous attempt failed and so on. Package private for testing.
 */
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
                                  AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
    /*
     * Sort the nodes into living and dead lists.
     */
    List<Node> livingNodes = new ArrayList<>(nodeTuple.nodes.size() - blacklist.size());
    List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
    for (Node node : nodeTuple.nodes) { //1
        DeadHostState deadness = blacklist.get(node.getHost());
        if (deadness == null) {
            livingNodes.add(node);
            continue;
        }
        if (deadness.shallBeRetried()) {
            livingNodes.add(node);
            continue;
        }
        deadNodes.add(new DeadNode(node, deadness));
    }

    if (false == livingNodes.isEmpty()) {//2
        /*
         * Normal state: there is at least one living node. If the
         * selector is ok with any over the living nodes then use them
         * for the request.
         */
        List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
        nodeSelector.select(selectedLivingNodes);
        if (false == selectedLivingNodes.isEmpty()) {
            /*
             * Rotate the list using a global counter as the distance so subsequent
             * requests will try the nodes in a different order.
             */
            Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
            return selectedLivingNodes;
        }
    }

    /*
     * Last resort: there are no good nodes to use, either because
     * the selector rejected all the living nodes or because there aren't
     * any living ones. Either way, we want to revive a single dead node
     * that the NodeSelectors are OK with. We do this by passing the dead
     * nodes through the NodeSelector so it can have its say in which nodes
     * are ok. If the selector is ok with any of the nodes then we will take
     * the one in the list that has the lowest revival time and try it.
     */
    if (false == deadNodes.isEmpty()) {//3
        final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
        /*
         * We'd like NodeSelectors to remove items directly from deadNodes
         * so we can find the minimum after it is filtered without having
         * to compare many things. This saves us a sort on the unfiltered
         * list.
         */
        nodeSelector.select(new Iterable<Node>() {
            @Override
            public Iterator<Node> iterator() {
                return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
            }
        });
        if (false == selectedDeadNodes.isEmpty()) {
            return singletonList(Collections.min(selectedDeadNodes).node);
        }
    }
    throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
            + "living " + livingNodes + " and dead " + deadNodes);
}

(1)该方法的第一步,就是从节点状态map内选出所有的node,如果不是dead node,则直接加入到livingNodes列表,如果是的话,判断一下是否需要充实(根据dead的时间)。这里每次请求结束后会根据请求结果更新blacklist的值
(2)第二步,判断liveingNodes是否为空,不为空,则配合负载算法,重排序livingNodes,在后面使用过程中,从livingNodes中选择Node
(3)如果,liveingNodes为空,判断deadNodes是否为空,不为空的话,从deadNodes中选择一个最快被解禁的node,作为请求的Node(死马当活马医)

3总结

看起RestHighLevelClient很简单,其实内部还是有很多复杂逻辑的,有兴趣的可以深入了解下

更多精彩内容,请关注公众号


公众号二维码.jpg
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342