前言
之前公司有一个需求要实现把rocketmq和rabbitmq中的数据对接到kafka中,虽然flume中已经提供了很多种类的source,但是没有提供rocketmq和rabbitmq的相关source,因此就调研了一下flume source插件如何开发。经过调研之后发现实现flume source有两种方式,一种是实现EventDrivenSource,一种是实现PollableSource,这两种有什么区别呢?
EventDrivenSource:是事件驱动型的source,意思就是源有了数据之后,会主动的推送给我们的source。Rabbitmq就是这样的机制,因此开发rabbitmq的source需要实现EventDrivenSource
PollableSource:是source主动去源服务器拉取机制。Rocketmq、kafka是这种机制。因此开发rocketmq需要实现PollableSource
正文
上面整体上介绍了开发flume source有几种实现方式,下面我们讨论本篇博客的知识点,我们实现PollableSource类型的source时,需要实现对应的方法如下图
process:该方法要实现的从rocketmq拉取消息的,再有数据的情况下返回Status. READY,再消费完robketmq topic的数据,没有新数据进到topic的时候返回Status. BACKOFF,这样可以使当前线程sleep一定的时间,不至于让当前线程一直空转,浪费cpu的资源。
具体应该sleep多长时间呢,是由下面两个方法getBackOffSleepIncrement和getMaxBackOffSleepInterval一块儿完成控制,到底是如何控制的呢,我们通过源码分析一下,源码如下图
从以上红框中的代码可以清楚的看出,两个方法所起的作用,counterGroup.incrementAndGet("runner.backoffs.consecutive")相当于一个累加器,累加调用process方法空转的次数,source.getBackOffSleepIncrement()是返回每次空转之后,需要累加的sleep的值,source.getMaxBackOffSleepInterval()是sleep的最大值,当累加sleep时间超过最大值是,就按照最大值sleep,如果没有超过最大值,就按照累加sleep进行sleep,当有数据进来时,就重新开始计算累加值,把counterGroup.incrementAndGet("runner.backoffs.consecutive")清空
以上,我们就介绍完了,Status. READY和Status. BACKOFF的区别,并且介绍了时如何控制当前先sleep的。