现在我们在用JAVA做ES应用开发的时候,通常会使用RestHighLevelClient来进行发送请求,早期没有RestHighLevelClient的时候,是直接使用Transport进行转发请求。
1.ES请求流转
首先我们来看下,从client发出http请求到ES集群后的整个流程。
1)首先请求到达集群节点后,由Netty4HttpServerTransport接受请求,通过RequestHandler类转到Controller,再有Controller根据http请求,找打注册在上面的Action。
2)根据Http请求选择的TransportXXXAction会判断当前请求的shard是否在当前节点,如果在,直接访问lucene,如果不在,则需要队请求转发
3)Node内部的请求转发都是基于Netty4Transpor的,默认是9200端口,可以理解为Elasticsearch内部的RPC通讯
4)请求到达node2之后,经过对应的XXXHandler处理后,会访问node2的lucene
2.RestHighLevelClient的请求流程
2.1新建client
HttpHost httpHost = new HttpHost("localhost", 9200, "http");
RestClientBuilder builder = RestClient.builder(httpHost);
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials("elastic", "123456"));
builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
builder.setConnectTimeout(3000);
return builder;
}
});
builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
httpAsyncClientBuilder.setMaxConnTotal(30);
httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
return httpAsyncClientBuilder;
}
});
client = new RestHighLevelClient(builder);
如上代码所示,在新建client的时候,可以指定请求的节点,鉴权,请求超时,http线程池等参数。示例只是用了一个节点,如果才用多个节点的,在builder的时候可以传入多个HttpHost
2.2请求体构造
我们以Search请求为例,来进行举例,和search类似,client对很多方法都提供了同步和异步的方法
请求最主要的参数就是SearchRequest,这里需要放入当前请求的索引,查询条件等
SearchRequest最主要的两个参数,一个是indices,还有一个就是SearchSourceBuilder,indices表示请求的索引,SearchSourceBuilder表示请求的语法
用RestHighLevelClient的一个很重要的原因,就是它的语法很大程度上和ES的DSL是一一对应的,比如SearchSourceBuilder我们可以理解为DSL最外层的{}, SearchSourceBuilder内部的成员变量有,
QueryBuilder,fetchSourceContext, aggregations等,这些成员变量内部的接口也和DSL语法基本差不多。
在SearchRequest构造完成之后,我们可以调用toString()方法生成DSL,当然在使用过程中我们也可以在kibina等工具内用DSL调好请求,然后在Search的时候直接使用DSL,但是这种方式不利于维护查询的语法。不建议使用。
2.3请求发送
执行search方法后,最终会执行到RestClient:performRequest
public Response performRequest(Request request) throws IOException {
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
performRequestAsyncNoCatch(request, listener);
return listener.get();
}
通过该方法,我们看到无论我们在外面选择的是同步或者异步的方法,其实clinet内部都是按照异步处理的。所以2.1中介绍的线程池配置就很关键,需要根据不同的业务选择不同的线程池大小。
我们来看下这段代码的主要逻辑performRequestAsyncNoCatch(request, listener),该方法中经过一些参数校验和请求封装后,进入方法
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
这里有个关键的方法,nextNode(),我们看一下nextNode()做了些什么
/**
* Returns a non-empty {@link Iterator} of nodes to be used for a request
* that match the {@link NodeSelector}.
* <p>
* If there are no living nodes that match the {@link NodeSelector}
* this will return the dead node that matches the {@link NodeSelector}
* that is closest to being revived.
* @throws IOException if no nodes are available
*/
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
Iterable<Node> hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
}
我们先来看一下RestClient的几个成员变量,即selectNodes的几个入参
private final AtomicInteger lastNodeIndex = new AtomicInteger(0); //上一次请求的node编号
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>(); //node的状态map,表示某个node是否连不上
private final NodeSelector nodeSelector; //node 选择器,用于负载均衡
private volatile NodeTuple<List<Node>> nodeTuple; //当前client配置的所有协调节点信息
理解了这几个变量之后,我们再看selectNodes方法
/**
* Select nodes to try and sorts them so that the first one will be tried initially, then the following ones
* if the previous attempt failed and so on. Package private for testing.
*/
static Iterable<Node> selectNodes(NodeTuple<List<Node>> nodeTuple, Map<HttpHost, DeadHostState> blacklist,
AtomicInteger lastNodeIndex, NodeSelector nodeSelector) throws IOException {
/*
* Sort the nodes into living and dead lists.
*/
List<Node> livingNodes = new ArrayList<>(nodeTuple.nodes.size() - blacklist.size());
List<DeadNode> deadNodes = new ArrayList<>(blacklist.size());
for (Node node : nodeTuple.nodes) { //1
DeadHostState deadness = blacklist.get(node.getHost());
if (deadness == null) {
livingNodes.add(node);
continue;
}
if (deadness.shallBeRetried()) {
livingNodes.add(node);
continue;
}
deadNodes.add(new DeadNode(node, deadness));
}
if (false == livingNodes.isEmpty()) {//2
/*
* Normal state: there is at least one living node. If the
* selector is ok with any over the living nodes then use them
* for the request.
*/
List<Node> selectedLivingNodes = new ArrayList<>(livingNodes);
nodeSelector.select(selectedLivingNodes);
if (false == selectedLivingNodes.isEmpty()) {
/*
* Rotate the list using a global counter as the distance so subsequent
* requests will try the nodes in a different order.
*/
Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement());
return selectedLivingNodes;
}
}
/*
* Last resort: there are no good nodes to use, either because
* the selector rejected all the living nodes or because there aren't
* any living ones. Either way, we want to revive a single dead node
* that the NodeSelectors are OK with. We do this by passing the dead
* nodes through the NodeSelector so it can have its say in which nodes
* are ok. If the selector is ok with any of the nodes then we will take
* the one in the list that has the lowest revival time and try it.
*/
if (false == deadNodes.isEmpty()) {//3
final List<DeadNode> selectedDeadNodes = new ArrayList<>(deadNodes);
/*
* We'd like NodeSelectors to remove items directly from deadNodes
* so we can find the minimum after it is filtered without having
* to compare many things. This saves us a sort on the unfiltered
* list.
*/
nodeSelector.select(new Iterable<Node>() {
@Override
public Iterator<Node> iterator() {
return new DeadNodeIteratorAdapter(selectedDeadNodes.iterator());
}
});
if (false == selectedDeadNodes.isEmpty()) {
return singletonList(Collections.min(selectedDeadNodes).node);
}
}
throw new IOException("NodeSelector [" + nodeSelector + "] rejected all nodes, "
+ "living " + livingNodes + " and dead " + deadNodes);
}
(1)该方法的第一步,就是从节点状态map内选出所有的node,如果不是dead node,则直接加入到livingNodes列表,如果是的话,判断一下是否需要充实(根据dead的时间)。这里每次请求结束后会根据请求结果更新blacklist的值
(2)第二步,判断liveingNodes是否为空,不为空,则配合负载算法,重排序livingNodes,在后面使用过程中,从livingNodes中选择Node
(3)如果,liveingNodes为空,判断deadNodes是否为空,不为空的话,从deadNodes中选择一个最快被解禁的node,作为请求的Node(死马当活马医)
3总结
看起RestHighLevelClient很简单,其实内部还是有很多复杂逻辑的,有兴趣的可以深入了解下
更多精彩内容,请关注公众号