Flume自定义source

在了解到flume的底层实现原理之后,不仅可以根据flume自身提供的API,实现Source的定义,还可以根据项目的实际需求,编写自己的Source,比如Source可以是从网络上下载一个文件,亦或者是从数据库中查询数据,总之都能灵活实现自己的需求!

每个Source生成事件,并为了source转发事件到channel处理器中,source每次生成一个事件,调用channel处理器的processEvent方法将事件写入到channel处理器中,或者使用sprocessEventBatch方法发送事件;processEvent只为了一个事务创建事物,可能导致严重的开销,影响channel的性能,而sprocessEventBatch则是一次性处理一批事件,每次调动一个channel事物,然后批次提交。所以就性能而言应该是选择sprocessEventBatch方法进行自定义source。

自定义Source
自定义的消息有两种类型的Source,PollableSource (轮训拉取)与EventDrivenSource (事件驱动),两者的区别在于PollableSource是通过线程不断去调用process方法,主动拉取消息,而EventDrivenSource是需要触发一个调用机制,即被动等待。在利用PollableSource实现自定义Source时还需要实现Configurable接口,以便在项目中初始化某些配置用的,定义的Source如下:

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Random;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

public class MySource extends AbstractSource implements Configurable, PollableSource {

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    @Override
    public Status process() throws EventDeliveryException {
        Random random = new Random();
        int randomNum = random.nextInt(100);
        String text = "Hello World :" + random.nextInt(100);  //实际需要传的内容
        HashMap<String, String> header = new HashMap<String, String>();
        header.put("id", Integer.toString(randomNum));   //将id--value放入到header中
        this.getChannelProcessor()
                .processEventBatch(EventBuilder.withBody(text, Charset.forName("UTF-8"), header)); //prcessEvent()将数据传上去

        return Status.READY;
    }

    @Override
    public void configure(Context arg0) {

    }

}

自定义的Source比较简单,就是实现了PollableSource,然后在process方法中写我们的逻辑,输出Hello World,并发送给Channel,Source的主要目的就是将数据发送到Channel,到此Source的简单事情就做完了。 configure方法主要是放置一些配置信息,进行初始化一次就可以了,比如我们有些项目需要加载properties、数据库的连接等等 process方法会由PollingRunner线程去不断的调用执行,不理解原理的可以看第二篇文章,执行完后通过getChannelProcess的processEvent方法将我们的数据转换为flume的Event发送到Channel,这个过程在flume启动后会不断去执行。

Flume 构建高可用、可扩展的海量日志采集系统 PDF 下载
链接:http://pan.baidu.com/s/1mipvHby 密码:s56h

参考:
如果你还没看过Flume-ng源码解析系列中的启动流程、Source组件、Channel组件和Sink组件,可以点击下面链接:
Flume-ng源码解析之启动流程
Flume-ng源码解析之Source组件
Flume-ng源码解析之Channel组件
Flume-ng源码解析之Sink组件

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 博客原文 翻译作品,水平有限,如有错误,烦请留言指正。原文请见 官网英文文档 引言 概述 Apache Flume...
    rabbitGYK阅读 13,940评论 13 34
  • 1、通过CocoaPods安装项目名称项目信息 AFNetworking网络请求组件 FMDB本地数据库组件 SD...
    阳明AI阅读 16,040评论 3 119
  • 介绍 概述 Apache Flume是为有效收集聚合和移动大量来自不同源到中心数据存储而设计的可分布,可靠的,可用...
    ximengchj阅读 8,881评论 0 13
  • 我叫简娜,我很不幸因为5岁的疾病我一直停留在九十多厘米。同时我又是幸运的。因为我有爱我的父母,她们为了给我全部的爱...
    莫晓楠阅读 4,181评论 8 1
  • 贴心的交互是关心用户的 这句话可以理解成男朋友总是需要记住女朋友的生日,爱好,习惯,好的交互也是一样,需要记住用户...
    IAnUEdesign阅读 3,730评论 0 1