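Questions
- What are the two kinds of ES Client?
- How many connections does each one open, and to how many Nodes?
- Can both Clients find out about all the Nodes in the Cluster?
- Do the other Nodes become aware of this Client's existence? (Unanswered, to be updated)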
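This chapter is about the Client. We start with the source code of TransportClient; if you read the previous article, you will notice that initializing a TransportClient is very similar to initializing a Node. Then we look at RestClient, newly introduced in ES 5.x: a somewhat simpler, more lightweight Client (the key points being that it does not need to keep long-lived connections to the Nodes, and that it is not tightly coupled to the ES version).
This article only covers how a Client is initialized and how it maintains its relationship with the other Nodes. How to issue queries through the Client is out of scope.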
TransportClient
First, let's look at the key parameters the official Elasticsearch documentation gives for creating a Client:
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// on shutdown
client.close();
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
//Add transport addresses and do something with the client...
Settings settings = Settings.builder()
.put("client.transport.sniff", true).build();
TransportClient client = new PreBuiltTransportClient(settings);
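The most important things when creating a Client are the ones listed above:
- The IP addresses to connect to
- The cluster to connect to
- Whether to enable sniffing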
PreBuiltTransportClient
This is really just a thin convenience subclass of TransportClient that pre-installs plugins; it has essentially no logic of its own.
public class PreBuiltTransportClient extends TransportClient {
...
/**
* Creates a new transport client with pre-installed plugins.
*
* @param settings the settings passed to this transport client
* @param plugins a collection of additional plugins to run with this client
* @param hostFailureListener a failure listener that is invoked if a node is disconnected; this can be <code>null</code>
*/
public PreBuiltTransportClient(
Settings settings,
Collection<Class<? extends Plugin>> plugins,
HostFailureListener hostFailureListener) {
super(settings, Settings.EMPTY, addPlugins(plugins, PRE_INSTALLED_PLUGINS), hostFailureListener);
}
Let's go straight into the TransportClient class. First come four important settings.
public static final Setting<TimeValue> CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL =
Setting.positiveTimeSetting("client.transport.nodes_sampler_interval", timeValueSeconds(5), Setting.Property.NodeScope);
public static final Setting<TimeValue> CLIENT_TRANSPORT_PING_TIMEOUT =
Setting.positiveTimeSetting("client.transport.ping_timeout", timeValueSeconds(5), Setting.Property.NodeScope);
public static final Setting<Boolean> CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME =
Setting.boolSetting("client.transport.ignore_cluster_name", false, Setting.Property.NodeScope);
public static final Setting<Boolean> CLIENT_TRANSPORT_SNIFF =
Setting.boolSetting("client.transport.sniff", false, Setting.Property.NodeScope);
Both nodes_sampler_interval and ping_timeout default to 5s. The construction logic is very similar to the initialization of a Node:
private static ClientTemplate buildTemplate(Settings providedSettings, Settings defaultSettings,
Collection<Class<? extends Plugin>> plugins, HostFailureListener failureListner) {
if (Node.NODE_NAME_SETTING.exists(providedSettings) == false) {
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final NetworkService networkService = new NetworkService(settings, Collections.emptyList());
try {
final List<Setting<?>> additionalSettings = new ArrayList<>(pluginsService.getPluginSettings());
final List<String> additionalSettingsFilter = new ArrayList<>(pluginsService.getPluginSettingsFilter());
for (final ExecutorBuilder<?> builder : threadPool.builders()) {
additionalSettings.addAll(builder.getRegisteredSettings());
}
SettingsModule settingsModule = new SettingsModule(settings, additionalSettings, additionalSettingsFilter);
SearchModule searchModule = new SearchModule(settings, true, pluginsService.filterPlugins(SearchPlugin.class));
List<NamedWriteableRegistry.Entry> entries = new ArrayList<>();
entries.addAll(NetworkModule.getNamedWriteables());
entries.addAll(searchModule.getNamedWriteables());
entries.addAll(ClusterModule.getNamedWriteables());
entries.addAll(pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedWriteables().stream())
.collect(Collectors.toList()));
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(entries);
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(Stream.of(
searchModule.getNamedXContents().stream(),
pluginsService.filterPlugins(Plugin.class).stream()
.flatMap(p -> p.getNamedXContent().stream())
).flatMap(Function.identity()).collect(toList()));
ModulesBuilder modules = new ModulesBuilder();
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.createGuiceModules()) {
modules.add(pluginModule);
}
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getIndexScopedSettings(),
settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool,
pluginsService.filterPlugins(ActionPlugin.class), null, null);
modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(),
boundTransportAddress -> DiscoveryNode.createLocal(settings, new TransportAddress(TransportAddress.META_ADDRESS, 0),
UUIDs.randomBase64UUID()), null);
modules.add((b -> {
b.bind(BigArrays.class).toInstance(bigArrays);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
b.bind(Transport.class).toInstance(transport);
b.bind(TransportService.class).toInstance(transportService);
b.bind(NetworkService.class).toInstance(networkService);
}));
Injector injector = modules.createInjector();
final TransportClientNodesService nodesService =
new TransportClientNodesService(settings, transportService, threadPool, failureListner == null
? (t, e) -> {} : failureListner);
final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,
actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
List<LifecycleComponent> pluginLifecycleComponents = new ArrayList<>(pluginsService.getGuiceServiceClasses().stream()
.map(injector::getInstance).collect(Collectors.toList()));
resourcesToClose.addAll(pluginLifecycleComponents);
transportService.start();
transportService.acceptIncomingRequests();
ClientTemplate transportClient = new ClientTemplate(injector, pluginLifecycleComponents, nodesService, proxy, namedWriteableRegistry);
resourcesToClose.clear();
return transportClient;
} finally {
IOUtils.closeWhileHandlingException(resourcesToClose);
}
}
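Of course, a Client does not initialize everything a Node does, but the important pieces are all there, such as ThreadPool, NetworkModule, TransportService and so on. At the end it creates a ClientTemplate, a class that wraps five fields: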
private static final class ClientTemplate {
final Injector injector;
private final List<LifecycleComponent> pluginLifecycleComponents;
private final TransportClientNodesService nodesService;
private final TransportProxyClient proxy;
private final NamedWriteableRegistry namedWriteableRegistry;
private ClientTemplate(Injector injector, List<LifecycleComponent> pluginLifecycleComponents,
TransportClientNodesService nodesService, TransportProxyClient proxy, NamedWriteableRegistry namedWriteableRegistry) {
this.injector = injector;
this.pluginLifecycleComponents = pluginLifecycleComponents;
this.nodesService = nodesService;
this.proxy = proxy;
this.namedWriteableRegistry = namedWriteableRegistry;
}
Settings getSettings() {
return injector.getInstance(Settings.class);
}
ThreadPool getThreadPool() {
return injector.getInstance(ThreadPool.class);
}
}
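The injector needs no introduction: it manages the dependencies between beans. I will come back to proxy later. I don't yet know exactly what namedWriteableRegistry is; my guess is that it has to do with serializing requests and responses, and I will update this once I understand it. For now, look at nodesService, which is a TransportClientNodesService. Put simply, it is what the Client uses to discover the surrounding Nodes and manage communication with them, and it presumably plays a role similar to NodeService.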
TransportClientNodesService(Settings settings, TransportService transportService,
ThreadPool threadPool, TransportClient.HostFailureListener hostFailureListener) {
super(settings);
this.clusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.threadPool = threadPool;
this.minCompatibilityVersion = Version.CURRENT.minimumCompatibilityVersion();
this.nodesSamplerInterval = TransportClient.CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
this.pingTimeout = TransportClient.CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
this.ignoreClusterName = TransportClient.CLIENT_TRANSPORT_IGNORE_CLUSTER_NAME.get(this.settings);
if (logger.isDebugEnabled()) {
logger.debug("node_sampler_interval[{}]", nodesSamplerInterval);
}
if (TransportClient.CLIENT_TRANSPORT_SNIFF.get(this.settings)) {
this.nodesSampler = new SniffNodesSampler();
} else {
this.nodesSampler = new SimpleNodeSampler();
}
this.hostFailureListener = hostFailureListener;
this.nodesSamplerFuture = threadPool.schedule(nodesSamplerInterval, ThreadPool.Names.GENERIC, new ScheduledNodeSampler());
}
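This is where the important sniff setting comes in: depending on the configuration, either a SniffNodesSampler or a SimpleNodeSampler is created. Note that a scheduled task is also set up at the end, which pings the other nodes at the configured interval (every 5s by default).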
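To make the "pick a sampler, then keep re-running it" idea concrete, here is a minimal sketch in plain Java. It does not use any Elasticsearch classes: Sampler, choose and runRounds are hypothetical names, the counters stand in for real sampling work, and I am assuming the scheduled task re-arms itself after each round because the constructor only makes a one-shot schedule(...) call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SamplerSchedulingSketch {
    /** Hypothetical stand-in for NodeSampler: one sampling round. */
    interface Sampler {
        void sample();
    }

    /** Mirrors the client.transport.sniff branch: the flag decides which strategy is used. */
    static Sampler choose(boolean sniff, AtomicInteger simpleRounds, AtomicInteger sniffRounds) {
        if (sniff) {
            return () -> sniffRounds.incrementAndGet();
        }
        return () -> simpleRounds.incrementAndGet();
    }

    /**
     * Runs the chosen sampler a fixed number of times. Each run schedules the
     * next one, which is how a one-shot schedule call becomes a periodic task.
     * Returns {simpleRounds, sniffRounds}.
     */
    static int[] runRounds(boolean sniff, int rounds, long intervalMillis) {
        if (rounds < 1) {
            throw new IllegalArgumentException("rounds must be positive");
        }
        AtomicInteger simpleRounds = new AtomicInteger();
        AtomicInteger sniffRounds = new AtomicInteger();
        Sampler sampler = choose(sniff, simpleRounds, sniffRounds);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(rounds);
        Runnable task = new Runnable() {
            @Override
            public void run() {
                sampler.sample();
                remaining.countDown();
                if (remaining.getCount() > 0) {
                    // Re-arm the timer for the next round.
                    scheduler.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
                }
            }
        };
        try {
            scheduler.schedule(task, intervalMillis, TimeUnit.MILLISECONDS);
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return new int[] {simpleRounds.get(), sniffRounds.get()};
    }

    public static void main(String[] args) {
        int[] counts = runRounds(true, 3, 10L);
        System.out.println("simple rounds=" + counts[0] + ", sniff rounds=" + counts[1]);
    }
}
```

The real client keeps sampling until it is closed; the fixed round count here only exists so that the example terminates.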
class SimpleNodeSampler extends NodeSampler {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
handler);
final LivenessResponse livenessResponse = handler.txGet();
if (!ignoreClusterName && !clusterName.equals(livenessResponse.getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName);
newFilteredNodes.add(listedNode);
} else {
// use discovered information but do keep the original transport address,
// so people can control which address is exactly used.
DiscoveryNode nodeWithInfo = livenessResponse.getDiscoveryNode();
newNodes.add(new DiscoveryNode(nodeWithInfo.getName(), nodeWithInfo.getId(), nodeWithInfo.getEphemeralId(),
nodeWithInfo.getHostName(), nodeWithInfo.getHostAddress(), listedNode.getAddress(),
nodeWithInfo.getAttributes(), nodeWithInfo.getRoles(), nodeWithInfo.getVersion()));
}
} catch (ConnectTransportException e) {
logger.debug(
(Supplier<?>)
() -> new ParameterizedMessage("failed to connect to node [{}], ignoring...", listedNode), e);
hostFailureListener.onNodeDisconnected(listedNode, e);
} catch (Exception e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage("failed to get node info for {}, disconnecting...", listedNode), e);
}
}
nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}
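SimpleNodeSampler is simple: it sends a liveness request (TransportLivenessAction) to each of the configured listedNodes, using the STATE request type. Note that the connection is obtained through TransportService (backed by Netty4Transport underneath), and the work runs on the GENERIC thread pool. Every Node that answers successfully is recorded, and only one connection is kept to each Node.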
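The classification logic in doSample() can be boiled down to a few lines. The sketch below is a simplification in plain Java, not the Elasticsearch implementation: nodes are plain address strings, the ping function is a hypothetical stand-in for the liveness request that returns the cluster name or throws, and the failed list replaces the hostFailureListener callback.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class SimpleSamplingSketch {
    /** Outcome of one sampling round. */
    static final class Result {
        final List<String> live = new ArrayList<>();     // answered and belong to our cluster
        final List<String> filtered = new ArrayList<>(); // answered, but from another cluster
        final List<String> failed = new ArrayList<>();   // did not answer at all
    }

    /**
     * Pings every listed node once and sorts it into one of three buckets.
     * The ping function returns the cluster name reported by the node,
     * or throws a RuntimeException if the node cannot be reached.
     */
    static Result sample(List<String> listedNodes, String expectedCluster,
                         boolean ignoreClusterName, Function<String, String> ping) {
        Result result = new Result();
        for (String address : listedNodes) {
            try {
                String cluster = ping.apply(address);
                if (!ignoreClusterName && !expectedCluster.equals(cluster)) {
                    result.filtered.add(address);
                } else {
                    result.live.add(address);
                }
            } catch (RuntimeException e) {
                result.failed.add(address);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> reachable = new HashMap<>();
        reachable.put("host1:9300", "prod");
        reachable.put("host2:9300", "staging");
        Function<String, String> ping = address -> {
            String cluster = reachable.get(address);
            if (cluster == null) {
                throw new IllegalStateException("cannot connect to " + address);
            }
            return cluster;
        };
        Result r = sample(Arrays.asList("host1:9300", "host2:9300", "host3:9300"), "prod", false, ping);
        System.out.println("live=" + r.live + " filtered=" + r.filtered + " failed=" + r.failed);
    }
}
```

With ignoreClusterName set to true, host2 would land in the live list as well, which matches the client.transport.ignore_cluster_name branch in the original code.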
Now let's look at SniffNodesSampler:
class SniffNodesSampler extends NodeSampler {
@Override
protected void doSample() {
// the nodes we are going to ping include the core listed nodes that were added
// and the last round of discovered nodes
Set<DiscoveryNode> nodesToPing = new HashSet<>();
for (DiscoveryNode node : listedNodes) {
nodesToPing.add(node);
}
for (DiscoveryNode node : nodes) {
nodesToPing.add(node);
}
final CountDownLatch latch = new CountDownLatch(nodesToPing.size());
final ConcurrentMap<DiscoveryNode, ClusterStateResponse> clusterStateResponses = ConcurrentCollections.newConcurrentMap();
try {
for (final DiscoveryNode nodeToPing : nodesToPing) {
threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new AbstractRunnable() {
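It also starts by pinging every node, in this case both the listedNodes and the nodes discovered in the previous round. Note that the work runs on the MANAGEMENT thread pool.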
@Override
protected void doRun() throws Exception {
Transport.Connection pingConnection = null;
if (nodes.contains(nodeToPing)) {
try {
pingConnection = transportService.getConnection(nodeToPing);
} catch (NodeNotConnectedException e) {
// will use a temp connection
}
}
if (pingConnection == null) {
logger.trace("connecting to cluster node [{}]", nodeToPing);
connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
pingConnection = connectionToClose;
}
transportService.sendRequest(pingConnection, ClusterStateAction.NAME,
Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE)
.withTimeout(pingTimeout).build(),
new TransportResponseHandler<ClusterStateResponse>() {
@Override
public ClusterStateResponse newInstance() {
return new ClusterStateResponse();
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
@Override
public void handleResponse(ClusterStateResponse response) {
clusterStateResponses.put(nodeToPing, response);
onDone();
}
@Override
public void handleException(TransportException e) {
logger.info(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to get local cluster state for {}, disconnecting...", nodeToPing), e);
try {
hostFailureListener.onNodeDisconnected(nodeToPing, e);
} finally {
onDone();
}
}
});
}
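Again the connection is opened through TransportService. Pay special attention here: as I read it, this mode actually establishes a whole set of connections, connectionToClose = transportService.openConnection(nodeToPing, LISTED_NODES_PROFILE);
with a certain number of connections per request type, so in this mode a Client keeps multiple connections to each Node. The callbacks are simple: successes and failures are sorted out, and the cluster state sent back by each node is saved with clusterStateResponses.put(nodeToPing, response);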
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
for (Map.Entry<DiscoveryNode, ClusterStateResponse> entry : clusterStateResponses.entrySet()) {
if (!ignoreClusterName && !clusterName.equals(entry.getValue().getClusterName())) {
logger.warn("node {} not part of the cluster {}, ignoring...",
entry.getValue().getState().nodes().getLocalNode(), clusterName);
newFilteredNodes.add(entry.getKey());
continue;
}
for (ObjectCursor<DiscoveryNode> cursor : entry.getValue().getState().nodes().getDataNodes().values()) {
newNodes.add(cursor.value);
}
}
nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
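Finally the responses are merged, and the resulting nodes are validated once more and kept. The nodes held here are in effect the data nodes of the whole cluster (note the getDataNodes() call); after that, the scheduler takes over and pings them at every interval.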
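The overall shape of a sniffing round (ping listed plus known nodes in parallel, wait on a latch, merge what each one reports) can be sketched without any Elasticsearch types. Everything below is an illustrative assumption: nodes are plain strings, fetchClusterNodes is a hypothetical stand-in for the cluster state request, and a small fixed thread pool replaces the MANAGEMENT pool.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class SniffSamplingSketch {
    /**
     * One sniffing round. Every listed or previously known node is asked for
     * its view of the cluster; the union of all views is the new node set.
     * fetchClusterNodes throws a RuntimeException for unreachable nodes.
     */
    static Set<String> sniff(Set<String> listedNodes, Set<String> knownNodes,
                             Function<String, Set<String>> fetchClusterNodes) {
        Set<String> nodesToPing = new HashSet<>(listedNodes);
        nodesToPing.addAll(knownNodes);
        CountDownLatch latch = new CountDownLatch(nodesToPing.size());
        ConcurrentMap<String, Set<String>> responses = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (String node : nodesToPing) {
                pool.execute(() -> {
                    try {
                        responses.put(node, fetchClusterNodes.apply(node));
                    } catch (RuntimeException e) {
                        // Unreachable node: it contributes nothing this round.
                    } finally {
                        latch.countDown(); // always count down, success or failure
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        Set<String> discovered = new TreeSet<>();
        for (Set<String> view : responses.values()) {
            discovered.addAll(view);
        }
        return discovered;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> views = new HashMap<>();
        views.put("a", new HashSet<>(Arrays.asList("a", "b", "c")));
        Set<String> found = sniff(Collections.singleton("a"), Collections.singleton("b"), node -> {
            Set<String> view = views.get(node);
            if (view == null) {
                throw new IllegalStateException("no answer from " + node);
            }
            return view;
        });
        System.out.println(found);
    }
}
```

The important detail carried over from the original is that the latch is counted down on both the success and the failure path; otherwise one dead node would block the whole round.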
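From this we can also see that the connections are already established at this point, so for any later request the client only needs to take a connection (or a connection of a given type) to some node and send the request. That is the job of the proxy mentioned earlier, so let's go back and see what it is.
The proxy is created in buildTemplate as well and then stored in the ClientTemplate: final TransportProxyClient proxy = new TransportProxyClient(settings, transportService, nodesService,actionModule.getActions().values().stream().map(x -> x.getAction()).collect(Collectors.toList()));
It keeps just two fields, nodesService and proxies: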
private final TransportClientNodesService nodesService;
private final Map<Action, TransportActionNodeProxy> proxies;
TransportProxyClient(Settings settings, TransportService transportService,
TransportClientNodesService nodesService, List<GenericAction> actions) {
this.nodesService = nodesService;
Map<Action, TransportActionNodeProxy> proxies = new HashMap<>();
for (GenericAction action : actions) {
if (action instanceof Action) {
proxies.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.proxies = unmodifiableMap(proxies);
}
public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends
ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action,
final Request request, ActionListener<Response> listener) {
final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
nodesService.execute((n, l) -> proxy.execute(n, request, l), listener);
}
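The call is simple too: look up the proxy for the action, wrap proxy.execute() in a function, and hand that function to nodesService to run. In plain terms, proxies records how each kind of action sends its request, and nodesService picks one of the known nodes to send it to: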
public <Response> void execute(NodeListenerCallback<Response> callback, ActionListener<Response> listener) {
// we first read nodes before checking the closed state; this
// is because otherwise we could be subject to a race where we
// read the state as not being closed, and then the client is
// closed and the nodes list is cleared, and then a
// NoNodeAvailableException is thrown
// it is important that the order of first setting the state of
// closed and then clearing the list of nodes is maintained in
// the close method
final List<DiscoveryNode> nodes = this.nodes;
if (closed) {
throw new IllegalStateException("transport client is closed");
}
ensureNodesAreAvailable(nodes);
int index = getNodeNumber();
RetryListener<Response> retryListener = new RetryListener<>(callback, listener, nodes, index, hostFailureListener);
DiscoveryNode node = retryListener.getNode(0);
try {
callback.doWithNode(node, retryListener);
} catch (Exception e) {
try {
//this exception can't come from the TransportService as it doesn't throw exception at all
listener.onFailure(e);
} finally {
retryListener.maybeNodeFailed(node, e);
}
}
}
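As a rough illustration of "pick a node, and move on to another one if it fails", here is a synchronous sketch in plain Java. The real code is asynchronous and works through RetryListener; the rotating counter below is my assumption about what getNodeNumber() is for, since its body is not shown above, and all names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class RetryOverNodesSketch {
    // Rotating start index so that consecutive requests begin at different nodes.
    private final AtomicInteger counter = new AtomicInteger();

    /**
     * Tries the call on one node and, on failure, on each remaining node in turn.
     * Throws if the node list is empty or every node fails.
     */
    <R> R execute(List<String> nodes, Function<String, R> call) {
        if (nodes.isEmpty()) {
            throw new IllegalStateException("no node available");
        }
        int start = Math.floorMod(counter.getAndIncrement(), nodes.size());
        RuntimeException last = null;
        for (int i = 0; i < nodes.size(); i++) {
            String node = nodes.get((start + i) % nodes.size());
            try {
                return call.apply(node);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next node
            }
        }
        throw new IllegalStateException("all nodes failed", last);
    }

    public static void main(String[] args) {
        RetryOverNodesSketch client = new RetryOverNodesSketch();
        List<String> nodes = Arrays.asList("n1", "n2", "n3");
        Function<String, String> call = node -> {
            if (node.equals("n1")) {
                throw new IllegalStateException("n1 is down");
            }
            return "ok from " + node;
        };
        for (int i = 0; i < 3; i++) {
            System.out.println(client.execute(nodes, call));
        }
    }
}
```

Note how the snapshot of the node list is passed in once per call, echoing the comment in the original about reading nodes before checking the closed flag.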
RestClient
RestClient could hardly be simpler: reading the first 100 lines or so of the source is basically enough.
public class RestClient implements Closeable {
private static final Log logger = LogFactory.getLog(RestClient.class);
private final CloseableHttpAsyncClient client;
//we don't rely on default headers supported by HttpAsyncClient as those cannot be replaced
private final Header[] defaultHeaders;
private final long maxRetryTimeoutMillis;
private final String pathPrefix;
private final AtomicInteger lastHostIndex = new AtomicInteger(0);
private volatile HostTuple<Set<HttpHost>> hostTuple;
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
private final FailureListener failureListener;
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
HttpHost[] hosts, String pathPrefix, FailureListener failureListener) {
this.client = client;
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
this.defaultHeaders = defaultHeaders;
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
setHosts(hosts);
}
/**
* Returns a new {@link RestClientBuilder} to help with {@link RestClient} creation.
*/
public static RestClientBuilder builder(HttpHost... hosts) {
return new RestClientBuilder(hosts);
}
/**
* Replaces the hosts that the client communicates with.
* @see HttpHost
*/
public synchronized void setHosts(HttpHost... hosts) {
if (hosts == null || hosts.length == 0) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
Set<HttpHost> httpHosts = new HashSet<>();
AuthCache authCache = new BasicAuthCache();
for (HttpHost host : hosts) {
Objects.requireNonNull(host, "host cannot be null");
httpHosts.add(host);
authCache.put(host, new BasicScheme());
}
this.hostTuple = new HostTuple<>(Collections.unmodifiableSet(httpHosts), authCache);
this.blacklist.clear();
}
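The biggest difference from TransportClient is that no sniffing happens here; the class only maintains the hosts that were configured. In case authentication is needed, an authCache entry is kept for each host, and all the actual requests are handed to the Apache HTTP client (CloseableHttpAsyncClient).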
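The pattern inside setHosts() is worth isolating: validate the input, build a fresh immutable snapshot, publish it through a volatile field, and clear the blacklist. Below is a minimal sketch of that pattern in plain Java; hosts are plain strings, and markDead, hosts and blacklisted are hypothetical helpers added only so the effect can be observed.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class HostRegistrySketch {
    // Readers always see a complete, immutable snapshot of the host set.
    private volatile Set<String> hosts = Collections.emptySet();
    // Hosts that recently failed, mapped to the time they were marked dead.
    private final ConcurrentMap<String, Long> blacklist = new ConcurrentHashMap<>();

    /** Replaces the whole host set and forgets earlier failures. */
    synchronized void setHosts(String... newHosts) {
        if (newHosts == null || newHosts.length == 0) {
            throw new IllegalArgumentException("hosts must not be null nor empty");
        }
        Set<String> copy = new LinkedHashSet<>();
        for (String host : newHosts) {
            copy.add(Objects.requireNonNull(host, "host cannot be null"));
        }
        this.hosts = Collections.unmodifiableSet(copy);
        this.blacklist.clear();
    }

    /** Hypothetical helper: records that a host failed. */
    void markDead(String host) {
        blacklist.put(host, System.nanoTime());
    }

    Set<String> hosts() {
        return hosts;
    }

    int blacklisted() {
        return blacklist.size();
    }

    public static void main(String[] args) {
        HostRegistrySketch registry = new HostRegistrySketch();
        registry.setHosts("host1:9200", "host2:9200");
        registry.markDead("host1:9200");
        System.out.println(registry.hosts() + " blacklisted=" + registry.blacklisted());
        registry.setHosts("host3:9200");
        System.out.println(registry.hosts() + " blacklisted=" + registry.blacklisted());
    }
}
```

Swapping in a whole new immutable set, rather than mutating the old one, means request threads never observe a half-updated host list.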
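The Sniffer component
What if I really like RestClient but still want sniffing? The thoughtful ES team provides a small add-on for exactly that: bind it to your client, and it sniffs out all the machines in the cluster and calls back the client's setHosts():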
void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
if (running.compareAndSet(false, true)) {
try {
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
logger.debug("sniffed hosts: " + sniffedHosts);
if (excludeHost != null) {
sniffedHosts.remove(excludeHost);
}
if (sniffedHosts.isEmpty()) {
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
} else {
this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
}
} catch (Exception e) {
logger.error("error while sniffing nodes", e);
} finally {
scheduleNextRun(nextSniffDelayMillis);
running.set(false);
}
}
}
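The running.compareAndSet(false, true) guard is what keeps two sniffing rounds from overlapping. The sketch below isolates that guard in plain Java. It is a simplification under stated assumptions: hosts are strings, the Supplier and Consumer are hypothetical stand-ins for hostsSniffer.sniffHosts() and restClient.setHosts(), and the rescheduling of the next run is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

class SniffRoundSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final Supplier<List<String>> hostsSniffer;
    private final Consumer<List<String>> setHosts;

    SniffRoundSketch(Supplier<List<String>> hostsSniffer, Consumer<List<String>> setHosts) {
        this.hostsSniffer = hostsSniffer;
        this.setHosts = setHosts;
    }

    /**
     * Runs one sniffing round unless another round is already in progress.
     * Returns true if this call actually ran the round.
     */
    boolean sniff(String excludeHost) {
        if (!running.compareAndSet(false, true)) {
            return false; // another round holds the flag, skip this one
        }
        try {
            List<String> sniffed = new ArrayList<>(hostsSniffer.get());
            if (excludeHost != null) {
                sniffed.remove(excludeHost);
            }
            if (!sniffed.isEmpty()) {
                setHosts.accept(sniffed); // keep the old hosts if nothing was found
            }
            return true;
        } finally {
            running.set(false);
        }
    }

    public static void main(String[] args) {
        List<List<String>> applied = new ArrayList<>();
        SniffRoundSketch sniffer = new SniffRoundSketch(
                () -> Arrays.asList("h1", "h2", "h3"), applied::add);
        System.out.println("ran=" + sniffer.sniff("h2") + " applied=" + applied);
    }
}
```

Because the flag is released in finally, an exception thrown while sniffing does not leave the sniffer permanently stuck in the running state.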
End of this article.