es实战-插件开发

参考连接:

es插件开发教程
IDEA debug es插件
官方插件开发指导

插件开发:

1.拦截修改request以及response插件:

主类:EsPlugin:配置参数,拦截器以及模块注入

package xpackPlugin;

import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import xpackPlugin.EsModule;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class EsPlugin extends Plugin implements ActionPlugin, NetworkPlugin {
    public SetOnce<TcpAuditActionFilter> TcpAuditActionFilter=new SetOnce();
    public Settings settings;
    public static Client client;

    public EsPlugin(Settings settings){
        super();
        this.settings = settings;
    }

    public List<ActionFilter> getActionFilters() {
        //接口实现类可以向上转型为接口
        //添加拦截器
        List<ActionFilter> filters = new ArrayList();
        TcpAuditActionFilter.set(new TcpAuditActionFilter());
        filters.add(TcpAuditActionFilter.get());
        return filters;
    }

    @Override
    public Collection<Module> createGuiceModules() {
        //注入插件
        List<Module> modules = new ArrayList();
        modules.add(new EsModule());
        return  modules;
    }

    @Override
    public List<Setting<?>> getSettings() {
        //添加yml配置文件字段
        ArrayList<Setting<?>> settings = new ArrayList();
        settings.addAll(super.getSettings());
        //settings.add(Setting.simpleString("zk.ip",new Setting.Property[]{Setting.Property.NodeScope}));
        return settings;
    }

    @Override
    public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
        this.client=client;
        return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry);
    }
}

请求拦截类TcpAuditActionFilter :拦截request以及response并对其进行修改

package xpackPlugin;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilter;
import org.elasticsearch.action.support.ActionFilterChain;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.security.action.user.PutUserRequest;

import java.io.IOException;

public class TcpAuditActionFilter implements ActionFilter {
    public int order() {
        return Integer.MAX_VALUE;
    }

    public <Request extends ActionRequest, Response extends ActionResponse> void apply(
            Task task,
            String s,
            Request request,
            ActionListener<Response> actionListener,
            ActionFilterChain<Request, Response> actionFilterChain) {
        ActionListener<Response> myActionListener = new AnotherActionListener(actionListener,
                request, System.currentTimeMillis());
        //对各种请求进行分类处理,之后发回es
        System.out.println(request.toString());
        if (request instanceof PutUserRequest) {
            PutUserRequest temp=new PutUserRequest();
            actionFilterChain.proceed(task, s, (Request)temp, myActionListener);
        } else {
            actionFilterChain.proceed(task, s, request, myActionListener);
        }

    }

    class AnotherActionListener<Response extends ActionResponse, Request extends ActionRequest> implements ActionListener<Response> {
        private ActionListener<Response> actionListener;
        private Request request;
        private long startTime;

        public AnotherActionListener(ActionListener<Response> actionListener, Request request, long startTime) {
            this.actionListener = actionListener;
            this.request = request;
            this.startTime = startTime;
        }

        public void onResponse(Response response) {
            //对es的响应进行分类和更改,部分response不支持构造函数,需要看源码查找构造方法
            if (response instanceof GetIndexResponse) {
                GetIndexResponse temp= null;
                try {
                    temp = GetIndexResponse.fromXContent(null);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                actionListener.onResponse((Response)temp);
            } {
                actionListener.onResponse(response);
            }
        }

        public void onFailure(Exception e) {
            actionListener.onFailure(e);
        }

    }

}

模块注入类EsMoudle:将插件拦截注入到es

package xpackPlugin;

import org.elasticsearch.common.inject.AbstractModule;

public class EsModule extends AbstractModule {
    protected void configure() {
        bind(TcpAuditActionFilter.class).asEagerSingleton();
    }
}

到此,拦截请求类插件代码开发完成,下面介绍自定义request handler类插件发开

2.自定义request请求插件开发:

可以自己定义一些请求,响应可以自定义也可以把请求改写发到es进行操作

插件主类SciPlugin:定义插件主类

package sciPlugin;

import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
import org.elasticsearch.xpack.core.XPackPlugin;

import java.nio.file.Path;
import java.util.List;
import java.util.function.Supplier;

import static java.util.Collections.singletonList;

public class SciPlugin extends Plugin implements ActionPlugin {
    public SciPlugin(Settings settings, Path configPath) {
        super();
    }
    @Override
    public List<RestHandler> getRestHandlers(final Settings settings,
                                             final RestController restController,
                                             final ClusterSettings clusterSettings,
                                             final IndexScopedSettings indexScopedSettings,
                                             final SettingsFilter settingsFilter,
                                             final IndexNameExpressionResolver indexNameExpressionResolver,
                                             final Supplier<DiscoveryNodes> nodesInCluster) {

        return singletonList(new SciHandler(settings, restController));
    }
}

请求处理类SciHandler:定义请求以及响应结果

package sciPlugin;

import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.XPackClient;
import org.elasticsearch.xpack.core.rest.XPackRestHandler;
import org.elasticsearch.xpack.core.security.action.role.PutRoleRequest;

import java.io.IOException;

import static org.elasticsearch.rest.RestRequest.Method.POST;

public class SciHandler extends BaseRestHandler {
    protected SciHandler(Settings settings,final RestController controller) {
        super(settings);
        controller.registerHandler(POST, "/user", this);
    }

    @Override
    public String getName() {
        return "sciplugin";
    }

    @Override
    protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient nodeClient) throws IOException {
        BytesReference a = restRequest.requiredContent();
        XContentType b = restRequest.getXContentType();
        String c = XContentHelper.convertToJson(a, false, false, b);
        PutRoleRequest putRoleRequest=new PutRoleRequest();
        putRoleRequest.name("caster");
        return (channel) -> {
            new XPackClient(nodeClient).security().putRole(putRoleRequest,new RestToXContentListener(channel));
//            new XPackClient(nodeClient).security().putUser(putRoleRequest,new RestToXContentListener(channel));
        };
    }
}

其中controller.registerHandler(POST, "/user", this)为注册请求头,可以定义多种类型请求头,也可以在里面加入传参,建议开发时参考es源码插件进行开发,例如设置mapping类:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package org.elasticsearch.rest.action.admin.indices;

import java.io.IOException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer;
import org.elasticsearch.rest.RestRequest.Method;
import org.elasticsearch.rest.action.RestToXContentListener;

public class RestPutMappingAction extends BaseRestHandler {
    private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(Loggers.getLogger(RestPutMappingAction.class));

    public RestPutMappingAction(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(Method.PUT, "/{index}/_mapping/", this);
        controller.registerHandler(Method.PUT, "/{index}/{type}/_mapping", this);
        controller.registerHandler(Method.PUT, "/{index}/_mapping/{type}", this);
        controller.registerHandler(Method.PUT, "/_mapping/{type}", this);
        controller.registerHandler(Method.POST, "/{index}/_mapping/", this);
        controller.registerHandler(Method.POST, "/{index}/{type}/_mapping", this);
        controller.registerHandler(Method.POST, "/{index}/_mapping/{type}", this);
        controller.registerHandler(Method.POST, "/_mapping/{type}", this);
        controller.registerHandler(Method.PUT, "/{index}/_mappings/", this);
        controller.registerHandler(Method.PUT, "/{index}/{type}/_mappings", this);
        controller.registerHandler(Method.PUT, "/{index}/_mappings/{type}", this);
        controller.registerHandler(Method.PUT, "/_mappings/{type}", this);
        controller.registerHandler(Method.POST, "/{index}/_mappings/", this);
        controller.registerHandler(Method.POST, "/{index}/{type}/_mappings", this);
        controller.registerHandler(Method.POST, "/{index}/_mappings/{type}", this);
        controller.registerHandler(Method.POST, "/_mappings/{type}", this);
    }

    public String getName() {
        return "put_mapping_action";
    }

    public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        PutMappingRequest putMappingRequest = Requests.putMappingRequest(Strings.splitStringByCommaToArray(request.param("index")));
        putMappingRequest.type(request.param("type"));
        putMappingRequest.source(request.requiredContent(), request.getXContentType());
        if (request.hasParam("update_all_types")) {
            DEPRECATION_LOGGER.deprecated("[update_all_types] is deprecated since indices may not have more than one type anymore", new Object[0]);
        }

        putMappingRequest.updateAllTypes(request.paramAsBoolean("update_all_types", false));
        putMappingRequest.timeout(request.paramAsTime("timeout", putMappingRequest.timeout()));
        putMappingRequest.masterNodeTimeout(request.paramAsTime("master_timeout", putMappingRequest.masterNodeTimeout()));
        putMappingRequest.indicesOptions(IndicesOptions.fromRequest(request, putMappingRequest.indicesOptions()));
        return (channel) -> {
            client.admin().indices().putMapping(putMappingRequest, new RestToXContentListener(channel));
        };
    }
}

3.自定义插件的环境配置与部署:

1.IDEA开发环境:

maven配置:如果不需要依赖xpack模块就不需要引入xpack的依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>es6.5plugin</groupId>
    <artifactId>es6.5plugin</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>elasticsearch-releases</id>
            <url>https://artifacts.elastic.co/maven</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.5.4</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>x-pack-transport</artifactId>
            <version>6.5.4</version>
        </dependency>
    </dependencies>

</project>

打包:artifacts打包不需要包含依赖包
远程debug es调试插件代码:使用IDEA自带remote功能
Run->Rdit Configurations->左上角添加Remote->配置es集群的host和要debug监控的端口,如图:

image

2.es集群环境配置和插件部署

插件打包后放到es的plugins目录上下面:新建文件,里面包含源码jar包和插件说明


image

配置文件内容如下:

description=EsPlugin
version=1.0
name=EsPlugin
classname=sciPlugin.SciPlugin
java.version=1.8
elasticsearch.version=6.5.4
extended.plugins=x-pack-core

具体含义可参考文章头部es官方问题,需要注意插件如果依赖es原生组件,一定要填写在extended.plugins后,貌似会影响启动模块的顺序,导致找不到依赖。

最后配置es config目录下的jvm配置文件jvm.options:

添加: -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=8888
这样启动es是以debug方式启动,启动后idea运行debug插件程序就可以拦截到es请求进行继续开发调试了
es内置modules其实就是plugin,存放目录和自定义插件不同,但是加载和配置文件相同,如图:

image

es启动的时候会根据plugin-descriptor.properties来按顺序启动各个模块,extended.plugins在这里会起作用,可以直接在插件中指定需要依赖的内置module减少依赖包(如:x-pack-core),有些module不允许依赖(会报错: cannot extend non-extensible plugin [percolator])。不同module,plugin之间如果没有依赖关系,可以在文件夹下放置同样的依赖包,并不会引起冲突。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354

推荐阅读更多精彩内容

  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,523评论 16 22
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,564评论 0 11
  • 可爱进取,孤独成精。努力飞翔,天堂翱翔。战争美好,孤独进取。胆大飞翔,成就辉煌。努力进取,遥望,和谐家园。可爱游走...
    赵原野阅读 2,727评论 1 1
  • 在妖界我有个名头叫胡百晓,无论是何事,只要找到胡百晓即可有解决的办法。因为是只狐狸大家以讹传讹叫我“倾城百晓”,...
    猫九0110阅读 3,261评论 7 3