在了解到flume的底层实现原理之后,不仅可以根据flume自身提供的API,实现Source的定义,还可以根据项目的实际需求,编写自己的Source,比如Source可以是从网络上下载一个文件,亦或者是从数据库中查询数据,总之都能灵活实现自己的需求!
每个Source生成事件,并为了source转发事件到channel处理器中,source每次生成一个事件,调用channel处理器的processEvent方法将事件写入到channel处理器中,或者使用sprocessEventBatch方法发送事件;processEvent只为了一个事务创建事物,可能导致严重的开销,影响channel的性能,而sprocessEventBatch则是一次性处理一批事件,每次调动一个channel事物,然后批次提交。所以就性能而言应该是选择sprocessEventBatch方法进行自定义source。
自定义Source
自定义的消息有两种类型的Source,PollableSource (轮训拉取)与EventDrivenSource (事件驱动),两者的区别在于PollableSource是通过线程不断去调用process方法,主动拉取消息,而EventDrivenSource是需要触发一个调用机制,即被动等待。在利用PollableSource实现自定义Source时还需要实现Configurable接口,以便在项目中初始化某些配置用的,定义的Source如下:
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Random;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
public class MySource extends AbstractSource implements Configurable, PollableSource {
@Override
public long getBackOffSleepIncrement() {
return 0;
}
@Override
public long getMaxBackOffSleepInterval() {
return 0;
}
@Override
public Status process() throws EventDeliveryException {
Random random = new Random();
int randomNum = random.nextInt(100);
String text = "Hello World :" + random.nextInt(100); //实际需要传的内容
HashMap<String, String> header = new HashMap<String, String>();
header.put("id", Integer.toString(randomNum)); //将id--value放入到header中
this.getChannelProcessor()
.processEventBatch(EventBuilder.withBody(text, Charset.forName("UTF-8"), header)); //prcessEvent()将数据传上去
return Status.READY;
}
@Override
public void configure(Context arg0) {
}
}
自定义的Source比较简单,就是实现了PollableSource,然后在process方法中写我们的逻辑,输出Hello World,并发送给Channel,Source的主要目的就是将数据发送到Channel,到此Source的简单事情就做完了。 configure方法主要是放置一些配置信息,进行初始化一次就可以了,比如我们有些项目需要加载properties、数据库的连接等等 process方法会由PollingRunner线程去不断的调用执行,不理解原理的可以看第二篇文章,执行完后通过getChannelProcess的processEvent方法将我们的数据转换为flume的Event发送到Channel,这个过程在flume启动后会不断去执行。
Flume 构建高可用、可扩展的海量日志采集系统 PDF 下载
链接:http://pan.baidu.com/s/1mipvHby 密码:s56h
参考:
如果你还没看过Flume-ng源码解析系列中的启动流程、Source组件、Channel组件和Sink组件,可以点击下面链接:
Flume-ng源码解析之启动流程
Flume-ng源码解析之Source组件
Flume-ng源码解析之Channel组件
Flume-ng源码解析之Sink组件