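Reading the Elasticsearch (ES) source code requires a basic understanding of Guice, the dependency injection framework; a little background is enough to follow along. This post also gives a basic look at how ES uses Guice. The ES source version is 2.4.1.
1 Initializing TransportClient
When working with Elasticsearch, we first create a Client, as in the following code: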
    private static Client getClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
                .put("client.transport.ping_timeout", "1200s")
                .build();
        Client client = null;
        try {
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return client;
    }
In other words, this instantiates a TransportClient object. Here is the source of the builder's build() method:
    public TransportClient build() {
        // omitted...
        final ThreadPool threadPool = new ThreadPool(settings);
        NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
        boolean success = false;
        try {
            ModulesBuilder modules = new ModulesBuilder();
            modules.add(new Version.Module(version));
            // plugin modules must be added here, before others or we can get crazy injection errors...
            for (Module pluginModule : pluginsService.nodeModules()) {
                modules.add(pluginModule);
            }
            modules.add(new PluginsModule(pluginsService));
            modules.add(new SettingsModule(this.settings));
            modules.add(new NetworkModule(namedWriteableRegistry));
            modules.add(new ClusterNameModule(this.settings));
            modules.add(new ThreadPoolModule(threadPool));
            modules.add(new TransportModule(this.settings, namedWriteableRegistry));
            modules.add(new SearchModule() {
                @Override
                protected void configure() {
                    // noop
                }
            });
            modules.add(new ActionModule(true));
            modules.add(new ClientTransportModule(hostFailedListener));
            modules.add(new CircuitBreakerModule(this.settings));
            pluginsService.processModules(modules);
            Injector injector = modules.createInjector();
            final TransportService transportService = injector.getInstance(TransportService.class);
            transportService.start();
            transportService.acceptIncomingRequests();
            TransportClient transportClient = new TransportClient(injector); // instantiate the TransportClient object
            success = true;
            return transportClient;
        } finally {
            if (!success) {
                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
            }
        }
    }
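As the code shows, build() first uses Guice to wire up the relevant modules. For example, the create, read, update and delete actions are concrete implementations contained in ActionModule; each module handles a different job. The statement TransportClient transportClient = new TransportClient(injector) then instantiates the TransportClient object. Here is its constructor: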
    private TransportClient(Injector injector) {
        super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
        this.injector = injector;
        nodesService = injector.getInstance(TransportClientNodesService.class);
        proxy = injector.getInstance(TransportProxyClient.class); // the proxy class that actually performs execute
    }
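nodesService is the class that manages the nodes, and proxy is the proxy class that actually carries out the execute calls. The next section therefore looks at TransportProxyClient.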
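The pattern in this section (modules contribute bindings, an injector is created from them, and the client pulls its collaborators out with getInstance) can be sketched in plain Java without Guice. This is only a minimal illustration under stated assumptions: MiniModule, MiniModulesBuilder, MiniInjector, FakeSettings, FakeThreadPool and FakeClient are all hypothetical names invented here, and the sketch binds ready-made instances, whereas Guice constructs objects and resolves their dependencies itself.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these types are Guice or Elasticsearch classes.
class MiniInjectorSketch {

    /** A module contributes bindings, loosely like a Guice module's configure(). */
    interface MiniModule {
        void configure(Map<Class<?>, Object> bindings);
    }

    /** Maps a type to a ready-made instance and hands it out on request. */
    static final class MiniInjector {
        private final Map<Class<?>, Object> bindings;

        MiniInjector(Map<Class<?>, Object> bindings) {
            this.bindings = bindings;
        }

        <T> T getInstance(Class<T> type) {
            Object instance = bindings.get(type);
            if (instance == null) {
                throw new IllegalStateException("No binding for " + type.getName());
            }
            return type.cast(instance);
        }
    }

    /** Collects modules first, then lets each one configure the shared bindings. */
    static final class MiniModulesBuilder {
        private final List<MiniModule> modules = new ArrayList<>();

        MiniModulesBuilder add(MiniModule module) {
            modules.add(module);
            return this;
        }

        MiniInjector createInjector() {
            Map<Class<?>, Object> bindings = new HashMap<>();
            for (MiniModule module : modules) {
                module.configure(bindings);
            }
            return new MiniInjector(bindings);
        }
    }

    static final class FakeSettings {
        final String clusterName;
        FakeSettings(String clusterName) { this.clusterName = clusterName; }
    }

    static final class FakeThreadPool {
        final int size;
        FakeThreadPool(int size) { this.size = size; }
    }

    /** Like the TransportClient constructor: pull collaborators out of the injector. */
    static final class FakeClient {
        final FakeSettings settings;
        final FakeThreadPool threadPool;

        FakeClient(MiniInjector injector) {
            this.settings = injector.getInstance(FakeSettings.class);
            this.threadPool = injector.getInstance(FakeThreadPool.class);
        }
    }

    static FakeClient build(String clusterName) {
        MiniInjector injector = new MiniModulesBuilder()
                .add(bindings -> bindings.put(FakeSettings.class, new FakeSettings(clusterName)))
                .add(bindings -> bindings.put(FakeThreadPool.class, new FakeThreadPool(4)))
                .createInjector();
        return new FakeClient(injector);
    }

    public static void main(String[] args) {
        FakeClient client = build("demo-cluster");
        System.out.println(client.settings.clusterName + " / " + client.threadPool.size);
    }
}
```

Asking the sketch's injector for a type that no module bound fails fast with an exception, which is roughly the kind of error the "plugin modules must be added here" comment in build() is guarding against.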
2 TransportProxyClient
    @Inject
    public TransportProxyClient(Settings settings, TransportService transportService,
                                TransportClientNodesService nodesService, Map<String, GenericAction> actions) {
        this.nodesService = nodesService;
        MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
        for (GenericAction action : actions.values()) {
            if (action instanceof Action) {
                actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
            }
        }
        this.proxies = actionsBuilder.immutableMap();
    }
TransportProxyClient uses constructor injection. Settings is bound by SettingsModule, TransportService by TransportModule, TransportClientNodesService by ClientTransportModule, and Map<String, GenericAction> actions is produced by the bindings in ActionModule. The proxies field is the important one: it is a Map whose keys are Action objects and whose values are TransportActionNodeProxy instances. So what are these Actions?
Recall that initializing TransportClient binds many modules, one of which is ActionModule. Its configure method looks like this:
    protected void configure() {
        Multibinder<ActionFilter> actionFilterMultibinder = Multibinder.newSetBinder(binder(), ActionFilter.class);
        for (Class<? extends ActionFilter> actionFilter : actionFilters) {
            actionFilterMultibinder.addBinding().to(actionFilter);
        }
        bind(ActionFilters.class).asEagerSingleton();
        bind(AutoCreateIndex.class).asEagerSingleton();
        bind(DestructiveOperations.class).asEagerSingleton();
        registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
        registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
        registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
        registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
        registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);
        registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
        registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
        registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
        registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
        registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
        registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
        registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
        registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
        registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
        registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
        registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
        registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
        registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
        registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
        registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
        registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);
        registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
        registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
        registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
        registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
        registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
        registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
        registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
        registerAction(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
        registerAction(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
        registerAction(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
        registerAction(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
        registerAction(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class);
        registerAction(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
        registerAction(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
        registerAction(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
        registerAction(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
        registerAction(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
        registerAction(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
        registerAction(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
        registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
        registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
        registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
        registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
        registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
        registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
        registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
        registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
        registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
        registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);
        registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class);
        registerAction(GetWarmersAction.INSTANCE, TransportGetWarmersAction.class);
        registerAction(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
        registerAction(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
        registerAction(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);
        registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
        registerAction(GetAction.INSTANCE, TransportGetAction.class);
        registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class,
                TransportDfsOnlyAction.class);
        registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
                TransportShardMultiTermsVectorAction.class);
        registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
        registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
        registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
        registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class);
        registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
                TransportShardMultiGetAction.class);
        registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
                TransportShardBulkAction.class);
        registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
        registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
        registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
        registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
        registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
        registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
        registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
        registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
        registerAction(RenderSearchTemplateAction.INSTANCE, TransportRenderSearchTemplateAction.class);
        // Indexed scripts
        registerAction(PutIndexedScriptAction.INSTANCE, TransportPutIndexedScriptAction.class);
        registerAction(GetIndexedScriptAction.INSTANCE, TransportGetIndexedScriptAction.class);
        registerAction(DeleteIndexedScriptAction.INSTANCE, TransportDeleteIndexedScriptAction.class);
        registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);
        // register Name -> GenericAction Map that can be injected to instances.
        MapBinder<String, GenericAction> actionsBinder
                = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
        for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
            actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
        }
        // ... omitted
    }
    public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(
            GenericAction<Request, Response> action,
            Class<? extends TransportAction<Request, Response>> transportAction,
            Class... supportTransportActions) {
        actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
    }
Above, ActionModule registers each Action through the registerAction method, which adds it to the actions map. MapBinder<String, GenericAction> then makes that data injectable into other instances, so the map is available when TransportProxyClient is constructed:
    // register Name -> GenericAction Map that can be injected to instances.
    MapBinder<String, GenericAction> actionsBinder
            = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
    for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
        actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
    }
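The register-then-inject flow of this section can be sketched without Guice, as a rough illustration rather than the actual implementation. FakeAction, FakeNodeProxy, FakeActionModule and FakeProxyClient are hypothetical stand-ins, the action names are made up, and the map is passed to the constructor by hand where Guice's MapBinder would supply it.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: stand-in types only, no Guice or Elasticsearch classes.
class ActionRegistrySketch {

    /** Stand-in for GenericAction: a singleton identified by its name. */
    static final class FakeAction {
        final String name;
        FakeAction(String name) { this.name = name; }
    }

    /** Stand-in for TransportActionNodeProxy: forwards one action to a node. */
    static final class FakeNodeProxy {
        private final FakeAction action;
        FakeNodeProxy(FakeAction action) { this.action = action; }
        String describe(String node) { return action.name + " -> " + node; }
    }

    /** Plays the role of ActionModule: registerAction fills a name -> action map. */
    static final class FakeActionModule {
        private final Map<String, FakeAction> actions = new LinkedHashMap<>();

        void registerAction(FakeAction action) {
            actions.put(action.name, action);
        }

        /** Roughly what the MapBinder would expose for injection. */
        Map<String, FakeAction> boundActions() {
            return Collections.unmodifiableMap(actions);
        }
    }

    /** Plays the role of TransportProxyClient: builds one proxy per injected action. */
    static final class FakeProxyClient {
        private final Map<FakeAction, FakeNodeProxy> proxies;

        FakeProxyClient(Map<String, FakeAction> actions) {
            Map<FakeAction, FakeNodeProxy> built = new LinkedHashMap<>();
            for (FakeAction action : actions.values()) {
                built.put(action, new FakeNodeProxy(action));
            }
            this.proxies = Collections.unmodifiableMap(built);
        }

        FakeNodeProxy proxyFor(FakeAction action) {
            return proxies.get(action);
        }
    }

    // Illustrative action names, not the real Elasticsearch action names.
    static final FakeAction GET = new FakeAction("demo/get");
    static final FakeAction INDEX = new FakeAction("demo/index");

    static FakeProxyClient buildClient() {
        FakeActionModule module = new FakeActionModule();
        module.registerAction(GET);
        module.registerAction(INDEX);
        return new FakeProxyClient(module.boundActions());
    }

    public static void main(String[] args) {
        FakeProxyClient client = buildClient();
        System.out.println(client.proxyFor(GET).describe("node-1"));
    }
}
```

In this sketch the proxies map is keyed by object identity, so a lookup only succeeds with the same singleton instance that was registered. This matches the way the source code passes constants such as GetAction.INSTANCE around.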
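3 Request flow
We frequently use ES's create, read, update and delete operations as well as the Admin operations. All of them are ultimately executed through the proxy class TransportProxyClient. A Get operation serves as the example here; other requests work similarly.
First, a demo of a Get: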
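    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import org.elasticsearch.action.ListenableActionFuture;
    import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
    import org.elasticsearch.action.get.GetRequestBuilder;
    import org.elasticsearch.action.get.GetResponse;
    import org.elasticsearch.client.AdminClient;
    import org.elasticsearch.client.Client;
    import org.elasticsearch.client.IndicesAdminClient;
    import org.elasticsearch.client.transport.TransportClient;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.common.transport.InetSocketTransportAddress;

    public class App {
        public static void main(String[] args) {
            create();
        }
        public static void create() {
            Client client = getClient();
            getInfo(client);
            //getIndice(client);
        }
        // Fetch one document by index, type and id, then print its source.
        private static void getInfo(Client client) {
            GetRequestBuilder getRequestBuilder = client.prepareGet("face_fixedperson", "Fixedperson", "126tc");
            GetResponse getFields = getRequestBuilder.execute().actionGet();
            String sourceAsString = getFields.getSourceAsString();
            System.out.println("-----" + sourceAsString);
        }
        // List the names of all indices through the admin client.
        private static void getIndice(Client client) {
            //Index index = new Index();
            AdminClient admin = client.admin();
            IndicesAdminClient indices = admin.indices();
            ListenableActionFuture<GetIndexResponse> execute = indices.prepareGetIndex().execute();
            GetIndexResponse getIndexResponse = execute.actionGet();
            String[] indices1 = getIndexResponse.getIndices();
            for (String in : indices1) {
                System.out.println(in);
            }
        }
        private static Client getClient() {
            Settings settings = Settings.builder()
                    .put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
                    .put("client.transport.ping_timeout", "1200s")
                    .build();
            Client client = null;
            try {
                client = TransportClient.builder().settings(settings).build()
                        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
            return client;
        }
    }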
When getRequestBuilder.execute() is called, control first passes to the following method of ActionRequestBuilder:
    public void execute(ActionListener<Response> listener) {
        client.execute(action, beforeExecute(request), listener);
    }
It then reaches the execute method of AbstractClient:
    public final <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        listener = threadedWrapper.wrap(listener);
        doExecute(action, request, listener);
    }
The doExecute called here is the one implemented by TransportClient, which simply hands the actual execute work to TransportProxyClient:
    @Override
    protected <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        proxy.execute(action, request, listener);
    }
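The call chain so far (request builder, then the final execute in the abstract client, then the overridden doExecute, then the proxy) can be sketched in plain Java as follows. Every type here is a hypothetical stand-in, and responses are fixed to String for brevity. The no-argument execute() in the sketch adapts a CompletableFuture to the listener interface; that is an assumption about the general shape of the mechanism, not a copy of the Elasticsearch code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: stand-in types only, no Elasticsearch classes.
class ExecuteChainSketch {

    /** Stand-in for ActionListener, fixed to String responses for brevity. */
    interface Listener {
        void onResponse(String response);
        void onFailure(Throwable error);
    }

    /** Stand-in for TransportProxyClient: where the work really happens. */
    static final class FakeProxy {
        void execute(String action, String request, Listener listener) {
            listener.onResponse(action + ":" + request);
        }
    }

    /** Stand-in for AbstractClient: execute() is final, doExecute() is the hook. */
    abstract static class BaseClient {
        final void execute(String action, String request, Listener listener) {
            // The real method applies headers and wraps the listener at this point.
            doExecute(action, request, listener);
        }

        protected abstract void doExecute(String action, String request, Listener listener);
    }

    /** Stand-in for TransportClient: doExecute() only delegates to the proxy. */
    static final class ProxyingClient extends BaseClient {
        private final FakeProxy proxy = new FakeProxy();

        @Override
        protected void doExecute(String action, String request, Listener listener) {
            proxy.execute(action, request, listener);
        }
    }

    /** Stand-in for ActionRequestBuilder. */
    static final class RequestBuilder {
        private final BaseClient client;
        private final String action;
        private final String request;

        RequestBuilder(BaseClient client, String action, String request) {
            this.client = client;
            this.action = action;
            this.request = request;
        }

        /** Listener-based entry point, mirroring execute(ActionListener). */
        void execute(Listener listener) {
            client.execute(action, request, listener);
        }

        /** Assumed shape of the no-argument variant: adapt a future to a listener. */
        CompletableFuture<String> execute() {
            CompletableFuture<String> future = new CompletableFuture<>();
            execute(new Listener() {
                @Override public void onResponse(String response) { future.complete(response); }
                @Override public void onFailure(Throwable error) { future.completeExceptionally(error); }
            });
            return future;
        }
    }

    static String run(String action, String request) {
        return new RequestBuilder(new ProxyingClient(), action, request).execute().join();
    }

    public static void main(String[] args) {
        System.out.println(run("get", "126tc"));
    }
}
```

Keeping execute() final and exposing only doExecute() as the extension point means each client type shares the same pre-processing and differs only in where the request is finally sent.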
Next, look at the execute method of TransportProxyClient:
    public <Request extends ActionRequest, Response extends ActionResponse,
            RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>>
    void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
        final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
        nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
            @Override
            public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
                proxy.execute(node, request, listener);
            }
        }, listener);
    }
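The statement TransportActionNodeProxy<Request, Response> proxy = proxies.get(action); looks up the TransportActionNodeProxy for the given Action; see the TransportProxyClient constructor above for how that map is built.
nodesService.execute then picks one of the available nodes and invokes the callback, which calls proxy.execute(node, request, listener) and so sends the request to that node.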
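The node-selection step can be sketched as a small callback-based dispatcher. This is a minimal illustration, not the Elasticsearch implementation: FakeNodesService, NodeCallback and Listener are hypothetical names, nodes are plain strings, and the rotating-counter selection policy is an assumption chosen to keep the output deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: stand-in types only, no Elasticsearch classes.
class NodeSelectionSketch {

    /** Stand-in for ActionListener, fixed to String responses for brevity. */
    interface Listener {
        void onResponse(String response);
        void onFailure(Throwable error);
    }

    /** Stand-in for NodeListenerCallback: what to do once a node is chosen. */
    interface NodeCallback {
        void doWithNode(String node, Listener listener);
    }

    /** Stand-in for TransportClientNodesService. */
    static final class FakeNodesService {
        private final List<String> nodes;
        private final AtomicInteger counter = new AtomicInteger();

        FakeNodesService(List<String> nodes) {
            this.nodes = new ArrayList<>(nodes);
        }

        void execute(NodeCallback callback, Listener listener) {
            if (nodes.isEmpty()) {
                listener.onFailure(new IllegalStateException("no node available"));
                return;
            }
            // Assumed policy: rotate through the list; floorMod keeps the index
            // valid even after the counter overflows to a negative value.
            int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
            try {
                callback.doWithNode(nodes.get(index), listener);
            } catch (RuntimeException e) {
                listener.onFailure(e);
            }
        }
    }

    /** Sends the same fake "get" request several times and collects the replies. */
    static List<String> dispatch(List<String> nodes, int times) {
        FakeNodesService service = new FakeNodesService(nodes);
        List<String> results = new ArrayList<>();
        Listener collector = new Listener() {
            @Override public void onResponse(String response) { results.add(response); }
            @Override public void onFailure(Throwable error) { results.add("failed: " + error.getMessage()); }
        };
        for (int i = 0; i < times; i++) {
            service.execute((node, listener) -> listener.onResponse("get@" + node), collector);
        }
        return results;
    }

    public static void main(String[] args) {
        // prints [get@node-a, get@node-b, get@node-a]
        System.out.println(dispatch(Arrays.asList("node-a", "node-b"), 3));
    }
}
```

Passing the work as a callback keeps node selection and request sending separate: the service decides where, and the caller decides what.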
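Broadly speaking, when ES initializes the TransportClient it binds a variety of modules, and at the same time it initializes the proxy class TransportProxyClient. When a request is submitted through TransportClient, it is handed to the proxy class for the actual execution; the proxy selects one of the available nodes and sends the request to it. This is only the prelude to how ES handles operations; the details of specific operations will be covered in later posts.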