01.flume source的实现方式以及PollableSource的process方法返回值READY和BACKOFF区别

前言

        之前公司有一个需求要实现把rocketmq和rabbitmq中的数据对接到kafka中,虽然flume中已经提供了很多种类的source,但是没有提供rocketmq和rabbitmq的相关source,因此就调研了一下flume source插件如何开发。经过调研之后发现实现flume source有两种方式,一种是实现EventDrivenSource,一种是实现PollableSource,这两种有什么区别呢?

        EventDrivenSource:是事件驱动型的source,意思就是源有了数据之后,会主动的推送给我们的source。Rabbitmq就是这样的机制,因此开发rabbitmq的source需要实现EventDrivenSource

        PollableSource:是source主动去源服务器拉取机制。Rocketmq、kafka是这种机制。因此开发rocketmq需要实现PollableSource

正文

        上面整体上介绍了开发flume source有几种实现方式,下面我们讨论本篇博客的知识点,我们实现PollableSource类型的source时,需要实现对应的方法如下图

1

 process:该方法要实现的从rocketmq拉取消息的,再有数据的情况下返回Status. READY,再消费完robketmq topic的数据,没有新数据进到topic的时候返回Status. BACKOFF,这样可以使当前线程sleep一定的时间,不至于让当前线程一直空转,浪费cpu的资源。

具体应该sleep多长时间呢,是由下面两个方法getBackOffSleepIncrement和getMaxBackOffSleepInterval一块儿完成控制,到底是如何控制的呢,我们通过源码分析一下,源码如下图

2

从以上红框中的代码可以清楚的看出,两个方法所起的作用,counterGroup.incrementAndGet("runner.backoffs.consecutive")相当于一个累加器,累加调用process方法空转的次数,source.getBackOffSleepIncrement()是返回每次空转之后,需要累加的sleep的值,source.getMaxBackOffSleepInterval()是sleep的最大值,当累加sleep时间超过最大值是,就按照最大值sleep,如果没有超过最大值,就按照累加sleep进行sleep,当有数据进来时,就重新开始计算累加值,把counterGroup.incrementAndGet("runner.backoffs.consecutive")清空

以上,我们就介绍完了,Status. READY和Status. BACKOFF的区别,并且介绍了时如何控制当前先sleep的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。