ES 操作之批量写-BulkProcessor 原理浅析

最近对线上业务进行重构,涉及到ES同步这一块,在重构过程中,为了ES 写入 性能考虑,大量的采取了 bulk的方式,来保证整体的一个同步速率,针对BulkProcessor 来深入一下,了解下 是如何实现,基于请求数,请求数据量大小 和 固定时间,刷新写入ES 的原理

针对ES 批量写入, 提供了3种方式,在 high-rest-client 中
分别是 bulk bulkAsync bulkProcessor 3种方式。
本文主要针对 bulkProcessor 来进行一些讲述

BulkProcessor

文档介绍


BulkProcessor是一个线程安全的批量处理类,允许方便地设置 刷新 一个新的批量请求 
(基于数量的动作,根据大小,或时间),
容易控制并发批量的数量
请求允许并行执行。

创建流程

How To use ?

来看个demo 创建BulkProcessor

 @Bean(name = "bulkProcessor") // 可以封装为一个bean,非常方便其余地方来进行 写入 操作
  public BulkProcessor bulkProcessor(){

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                    // todo do something
                int i = request.numberOfActions();
                log.error("ES 同步数量{}",i);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                        // todo do something
                Iterator<BulkItemResponse> iterator = response.iterator();
                while (iterator.hasNext()){
                    System.out.println(JSON.toJSONString(iterator.next()));
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                                // todo do something
                log.error("写入ES 重新消费");
            }
        }).setBulkActions(1000) //  达到刷新的条数
                .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB)) // 达到 刷新的大小
                .setFlushInterval(TimeValue.timeValueSeconds(5)) // 固定刷新的时间频率
                .setConcurrentRequests(1) //并发线程数
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) // 重试补偿策略
                .build(); 

    }

使用BulkProcessor

bulkProcessor.add(xxxRequest)

创建过程做了些什么?

  1. 创建一个consumer 对象用来封装传递参数,和请求操作

      BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                    (request, bulkListener) -> Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
    

    我们可以看到用了java 8的函数式编程接口 BiConsumer 关于 BiConsumer 的用法,可以自行百度,因为也是采取的 异步刷新策略, 所以,是一个返回结果的Listener ActionListener<BulkResponse>

  2. 构建并BulkProcess

     return BulkProcessor.builder(bulkConsumer, new BulkProcessor.Listener() {
             ****
             
            }).setBulkActions(1000)
                    .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.MB))
                    .setFlushInterval(TimeValue.timeValueSeconds(5))
                    .setConcurrentRequests(1)
                  .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                    .build();
    
        }
    

    可以很清楚的看到,在build 操作中,我们看到,在build 中,除了 之前定义的consumer,还实现了一个 Listener 接口 (稍后会具体讲到),用来做一些 在批量求情之前和请求之后的处理。

至此为止,BulkProcessor 创建,就OK啦~。

内部逻辑实现

先不说话,我们先上张类图

image

可以看到,在 BulkProcessor 中,有这样的一些类和接口

Listener
Builder
BulkProcessor
Flush
===== 华丽的分界线
BulkProcessor 实现了 Closeable --> 继承自  AutoCloseable (关于AutoCloseable 本文不做过多说明,具体的可以百度,或者等待后续)

那么先从构建开始,我们来看下Builder


/**
* 简单的构建,可以看到,就是一个client 和 listener 这个不会做刷新策略,
*/
public static Builder builder(Client client, Listener listener) {
        Objects.requireNonNull(client, "client");
        Objects.requireNonNull(listener, "listener");
        return new Builder(client::bulk, listener, client.threadPool(), () -> {});
    }



/**
* 所有功能的builder 实现方法
* ScheduledThreadPoolExecutor 用来实现 按照时间频率,来进行 刷新,如 每5s 
*
*/
    public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY); // 接口静态方式,来实现 Executor 的初始化
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS), //
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }


/**
* 构造函数 
* @param consumer 前文定义的consumer request response action
* @param listener listener  BulkProcessor 内置监听器
* @param scheduler elastic 定时调度 类scheduler  
* @paran onClose 关闭时候的运行
*/

    private Builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener,
                        Scheduler scheduler, Runnable onClose) {
            this.consumer = consumer;
            this.listener = listener;
            this.scheduler = scheduler;
            this.onClose = onClose;
        }

通过上述的代码片段,可以很明显的看到,关于初始化构建的一些关键点和要素

看完builder 接下来,我们看下 bulkprocessor 是如何工作的

先看下构造方法

   
   BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests); // BulkRequestHandler 批量执行 handler 操作
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler); //开始刷新任务
        this.onClose = onClose;
    }

startFlushTask 如何进行工作

 private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
   
   // 如果 按照时间刷新 为空,则直接返回 任务为取消状态
   if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }


// 刷新线程
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute(); // 下面方法
            }
        }
    }



/**
*  刷新执行
*
*/

 // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();
                // 刷新 bulkRequest 为下一批做准备
        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

看到这里,关于时间的定时调度,我们其实是很清楚了,那么 关于数据量 和 大小的判断策略在哪儿?

   
/**
* 各种添加操作
*/
public BulkProcessor add(DocWriteRequest request, @Nullable Object payload) {
        internalAdd(request, payload);
        return this;
    }

    /**
    * 我们可以看到,在添加之后,会做一个操作
    * executeIfNeeded 如果需要,则进行执行
    */
    private synchronized void internalAdd(DocWriteRequest request, @Nullable Object payload) {
        ensureOpen();
        bulkRequest.add(request, payload);
        executeIfNeeded();
    }


/**
* 如果超过限制,则执行刷新操作
*/
 private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }


    /**
    * 这这儿,我们终于看到了 关于action 和 size 的判断操作,
    *
    */
    private boolean isOverTheLimit() {
        if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
            return true;
        }
        if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
            return true;
        }
        return false;
    }

通过上述的分析,关于按照时间,数据size,大小来进行flush 执行的入口我们都已经很清楚了

针对数据大小的设置。在每次添加的时候,做判断是否 超过限制
针对 时间的频次控制,交由ScheduledThreadPoolExecutor 来去做监控

下来,让我们看下具体的执行以及重试策略,和 返回值的处理

  public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
          // listener 填充 request 和执行ID
            listener.beforeBulk(executionId, bulkRequest);
            //通过信号量来进行资源的控制 来自于我们设置的 setConcurrentRequests
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
          // 进行执行并按照补偿重试策略如果失败
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                            //结果写入  ActionListener --> BulkProcessor.Listener 的转换
              @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            }, Settings.EMPTY);
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

最终的执行,在 RetryHandler 中,继续往下看

    public void execute(BulkRequest bulkRequest) {
            this.currentBulkRequest = bulkRequest;
            consumer.accept(bulkRequest, this);
        }

对,没错,只有一个操作, consumer.accept(bulkRequest, this);

再一次展现了 java 8 函数式接口的功能强大之处 此处 consumer.accept(bulkRequest, this);

执行的操作即 Es6XServiceImpl.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

如何设置重试策略,以及数据的筛选

 @Override
        public void onResponse(BulkResponse bulkItemResponses) {
            if (!bulkItemResponses.hasFailures()) {
                // we're done here, include all responses
                addResponses(bulkItemResponses, (r -> true));
                finishHim();
            } else {
                if (canRetry(bulkItemResponses)) {
                    addResponses(bulkItemResponses, (r -> !r.isFailed()));
                    retry(createBulkRequestForRetry(bulkItemResponses));
                } else {
                    addResponses(bulkItemResponses, (r -> true));
                    finishHim();
                }
            }
        }


/**
* 只针对失败的请求,放入到重试策略中
*/
   private void addResponses(BulkResponse response, Predicate<BulkItemResponse> filter) {
            for (BulkItemResponse bulkItemResponse : response) {
                if (filter.test(bulkItemResponse)) {
                    // Use client-side lock here to avoid visibility issues. This method may be called multiple times
                    // (based on how many retries we have to issue) and relying that the response handling code will be
                    // scheduled on the same thread is fragile.
                    synchronized (responses) {
                        responses.add(bulkItemResponse);
                    }
                }
            }
        }

再一次展示了函数式接口的强大之处

补充一张流程图

BulkProcessor.png

对这次的代码一些了解,对其中的一些设计理念和模式,学习到了不少的知识

如consumer, Listener,模式的应用。

如consumer ,Predicate 等函数式接口的用法。

如对线程的统一封装,来去做更统一的抽象处理

在后续的代码设计中,也会尝试采取这种设计模式,来写出更优雅的代码。es java api的代码内的不少设计模式,自我感觉还是很nice,后续会继续学习

仅代表个人见解,如有不对之出,欢迎指出

个人原创,转载请备明出处

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容

  • 对于开发人员来说,设计模式有时候就是一道坎,但是设计模式又非常有用,过了这道坎,它可以让你水平提高一个档次。而在a...
    WANKUN阅读 256评论 0 2
  • 对象的创建与销毁 Item 1: 使用static工厂方法,而不是构造函数创建对象:仅仅是创建对象的方法,并非Fa...
    孙小磊阅读 1,973评论 0 3
  • 说在前头~ 看完能动动小手点个心么?由衷感谢。 对于开发人员来说,设计模式有时候就是一道坎,但是设计模式又非常有...
    S_ZY阅读 3,138评论 3 60
  • 转载自:https://xiaobailong24.me/2017/03/18/Android-RxJava2.x...
    Young1657阅读 2,016评论 1 9
  • 你总是自我感觉良好 以为全世界都在为你骄傲 毕竟你拥有着无与伦比的美貌 这个世界那片白云,都在为你倾倒 可是你永远...
    幻梦邪魂阅读 306评论 8 3