参考连接:
es插件开发教程
IDEA debug es插件
官方插件开发指导
插件开发:
1.拦截修改request以及response插件:
主类:EsPlugin:配置参数,拦截器以及模块注入
package xpackPlugin;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import xpackPlugin.EsModule;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class EsPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
public SetOnce<TcpAuditActionFilter> TcpAuditActionFilter=new SetOnce();
public Settings settings;
public static Client client;
public EsPlugin(Settings settings){
super();
this.settings = settings;
}
public List<ActionFilter> getActionFilters() {
//接口实现类可以向上转型为接口
//添加拦截器
List<ActionFilter> filters = new ArrayList();
TcpAuditActionFilter.set(new TcpAuditActionFilter());
filters.add(TcpAuditActionFilter.get());
return filters;
}
@Override
public Collection<Module> createGuiceModules() {
//注入插件
List<Module> modules = new ArrayList();
modules.add(new EsModule());
return modules;
}
@Override
public List<Setting<?>> getSettings() {
//添加yml配置文件字段
ArrayList<Setting<?>> settings = new ArrayList();
settings.addAll(super.getSettings());
//settings.add(Setting.simpleString("zk.ip",new Setting.Property[]{Setting.Property.NodeScope}));
return settings;
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
this.client=client;
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry);
}
}
请求拦截类TcpAuditActionFilter :拦截request以及response并对其进行修改
package xpackPlugin;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.security.action.user.PutUserRequest;
import java.io.IOException;
public class TcpAuditActionFilter implements ActionFilter {
public int order() {
return Integer.MAX_VALUE;
}
public <Request extends ActionRequest, Response extends ActionResponse> void apply(
Task task,
String s,
Request request,
ActionListener<Response> actionListener,
ActionFilterChain<Request, Response> actionFilterChain) {
ActionListener<Response> myActionListener = new AnotherActionListener(actionListener,
request, System.currentTimeMillis());
//对各种请求进行分类处理,之后发回es
System.out.println(request.toString());
if (request instanceof PutUserRequest) {
PutUserRequest temp=new PutUserRequest();
actionFilterChain.proceed(task, s, (Request)temp, myActionListener);
} else {
actionFilterChain.proceed(task, s, request, myActionListener);
}
}
class AnotherActionListener<Response extends ActionResponse, Request extends ActionRequest> implements ActionListener<Response> {
private ActionListener<Response> actionListener;
private Request request;
private long startTime;
public AnotherActionListener(ActionListener<Response> actionListener, Request request, long startTime) {
this.actionListener = actionListener;
this.request = request;
this.startTime = startTime;
}
public void onResponse(Response response) {
//对es的响应进行分类和更改,部分response不支持构造函数,需要看源码查找构造方法
if (response instanceof GetIndexResponse) {
GetIndexResponse temp= null;
try {
temp = GetIndexResponse.fromXContent(null);
} catch (IOException e) {
e.printStackTrace();
}
actionListener.onResponse((Response)temp);
} {
actionListener.onResponse(response);
}
}
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}
}
模块注入类EsMoudle:将插件拦截注入到es
package xpackPlugin;
import org.elasticsearch.common.inject.AbstractModule;
public class EsModule extends AbstractModule {
protected void configure() {
bind(TcpAuditActionFilter.class).asEagerSingleton();
}
}
到此,拦截请求类插件代码开发完成,下面介绍自定义request handler类插件发开
2.自定义request请求插件开发:
可以自己定义一些请求,响应可以自定义也可以把请求改写发到es进行操作
插件主类SciPlugin:定义插件主类
package sciPlugin;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.XPackPlugin;
import java.nio.file.Path;
import java.util.List;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
public class SciPlugin extends Plugin implements ActionPlugin {
public SciPlugin(Settings settings, Path configPath) {
super();
}
@Override
public List<RestHandler> getRestHandlers(final Settings settings,
final RestController restController,
final ClusterSettings clusterSettings,
final IndexScopedSettings indexScopedSettings,
final SettingsFilter settingsFilter,
final IndexNameExpressionResolver indexNameExpressionResolver,
final Supplier<DiscoveryNodes> nodesInCluster) {
return singletonList(new SciHandler(settings, restController));
}
}
请求处理类SciHandler:定义请求以及响应结果
package sciPlugin;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.rest.XPackRestHandler;
import org.elasticsearch.xpack.core.security.action.role.PutRoleRequest;
import java.io.IOException;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class SciHandler extends BaseRestHandler {
protected SciHandler(Settings settings,final RestController controller) {
super(settings);
controller.registerHandler(POST, "/user", this);
}
@Override
public String getName() {
return "sciplugin";
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
BytesReference a = restRequest.requiredContent();
XContentType b = restRequest.getXContentType();
String c = XContentHelper.convertToJson(a, false, false, b);
PutRoleRequest putRoleRequest=new PutRoleRequest();
putRoleRequest.name("caster");
return (channel) -> {
new XPackClient(nodeClient).security().putRole(putRoleRequest,new RestToXContentListener(channel));
// new XPackClient(nodeClient).security().putUser(putRoleRequest,new RestToXContentListener(channel));
};
}
}
其中controller.registerHandler(POST, "/user", this)为注册请求头,可以定义多种类型请求头,也可以在里面加入传参,建议开发时参考es源码插件进行开发,例如设置mapping类:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.elasticsearch.rest.action.admin.indices;
import java.io.IOException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.action.RestToXContentListener;
public class RestPutMappingAction extends BaseRestHandler {
private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(RestPutMappingAction.class));
public RestPutMappingAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(Method.PUT, "/{index}/_mapping/", this);
controller.registerHandler(Method.PUT, "/{index}/{type}/_mapping", this);
controller.registerHandler(Method.PUT, "/{index}/_mapping/{type}", this);
controller.registerHandler(Method.PUT, "/_mapping/{type}", this);
controller.registerHandler(Method.POST, "/{index}/_mapping/", this);
controller.registerHandler(Method.POST, "/{index}/{type}/_mapping", this);
controller.registerHandler(Method.POST, "/{index}/_mapping/{type}", this);
controller.registerHandler(Method.POST, "/_mapping/{type}", this);
controller.registerHandler(Method.PUT, "/{index}/_mappings/", this);
controller.registerHandler(Method.PUT, "/{index}/{type}/_mappings", this);
controller.registerHandler(Method.PUT, "/{index}/_mappings/{type}", this);
controller.registerHandler(Method.PUT, "/_mappings/{type}", this);
controller.registerHandler(Method.POST, "/{index}/_mappings/", this);
controller.registerHandler(Method.POST, "/{index}/{type}/_mappings", this);
controller.registerHandler(Method.POST, "/{index}/_mappings/{type}", this);
controller.registerHandler(Method.POST, "/_mappings/{type}", this);
}
public String getName() {
return "put_mapping_action";
}
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
PutMappingRequest putMappingRequest = Requests.putMappingRequest(Strings.splitStringByCommaToArray(request.param("index")));
putMappingRequest.type(request.param("type"));
putMappingRequest.source(request.requiredContent(), request.getXContentType());
if (request.hasParam("update_all_types")) {
DEPRECATION_LOGGER.deprecated("[update_all_types] is deprecated since indices may not have more than one type anymore", new Object[0]);
}
putMappingRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false));
putMappingRequest.timeout(request.paramAsTime("timeout", putMappingRequest.timeout()));
putMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putMappingRequest.masterNodeTimeout()));
putMappingRequest.indicesOptions(IndicesOptions.fromRequest(request, putMappingRequest.indicesOptions()));
return (channel) -> {
client.admin().indices().putMapping(putMappingRequest, new RestToXContentListener(channel));
};
}
}
3.自定义插件的环境配置与部署:
1.IDEA开发环境:
maven配置:如果不需要依赖xpack模块就不需要引入xpack的依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>es6.5plugin</groupId>
<artifactId>es6.5plugin</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>elasticsearch-releases</id>
<url>https://artifacts.elastic.co/maven</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.5.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>6.5.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>x-pack-transport</artifactId>
<version>6.5.4</version>
</dependency>
</dependencies>
</project>
打包:artifacts打包不需要包含依赖包
远程debug es调试插件代码:使用IDEA自带remote功能
Run->Rdit Configurations->左上角添加Remote->配置es集群的host和要debug监控的端口,如图:
2.es集群环境配置和插件部署
插件打包后放到es的plugins目录上下面:新建文件,里面包含源码jar包和插件说明
配置文件内容如下:
description=EsPlugin
version=1.0
name=EsPlugin
classname=sciPlugin.SciPlugin
java.version=1.8
elasticsearch.version=6.5.4
extended.plugins=x-pack-core
具体含义可参考文章头部es官方问题,需要注意插件如果依赖es原生组件,一定要填写在extended.plugins后,貌似会影响启动模块的顺序,导致找不到依赖。
最后配置es config目录下的jvm配置文件jvm.options:
添加: -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8888
这样启动es是以debug方式启动,启动后idea运行debug插件程序就可以拦截到es请求进行继续开发调试了
es内置modules其实就是plugin,存放目录和自定义插件不同,但是加载和配置文件相同,如图:
es启动的时候会根据plugin-descriptor.properties来按顺序启动各个模块,extended.plugins在这里会起作用,可以直接在插件中指定需要依赖的内置module减少依赖包(如:x-pack-core),有些module不允许依赖(会报错: cannot extend non-extensible plugin [percolator])。不同module,plugin之间如果没有依赖关系,可以在文件夹下放置同样的依赖包,并不会引起冲突。