Flume的三要素(Source/Channel/Sink)、拦截器、选择器、Sink组

三要素(Source/Channel/Sink)

  • Source:负责接收数据到flume的组件

    • 1.Netcat:基于TCP端口的数据源接收器
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 192.0.0.2
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # 绑定相关组件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
      • logger4j配置文件log4j.properties(控制台输出+文件输出)
      # 设置Logger的日志级别为INFO,同时增加两个日志输出项叫A1,A2.
      log4j.rootLogger=INFO, A1, A2
      
      # A1这个设置项被配置为控制台输出 ConsoleAppender.
      log4j.appender.A1=org.apache.log4j.ConsoleAppender
      
      # A1 输出项的输出格式.
      log4j.appender.A1.layout=org.apache.log4j.PatternLayout
      log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
      
      # A2 这个配置项的设置,文件输出
      log4j.appender.A2=org.apache.log4j.FileAppender
      
      # 设置日志的文件名
      log4j.appender.A2.File=./logs/log.out
      
      # 定义输出的日志格式
      log4j.appender.A2.layout=org.apache.log4j.PatternLayout
      log4j.appender.A2.layout.conversionPattern=%m%n
      
      • 启动程序
        flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example1.conf --conf /root/trainging/flume-1.9.0/conf/

      • 发送数据
        telnet 192.0.0.2 44444

    • 2.Exec:基于命令行标准输出来产生数据的数据源接收器
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /root/data/flume/access.log
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # 绑定相关组件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
      • logger4j配置文件log4j.properties(控制台输出+文件输出):配置同上

      • 启动程序
        flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example2.conf --conf /root/trainging/flume-1.9.0/conf/

      • 发送数据
        echo hello >> /root/data/flume/access.log

    • 3.avro:高扩展的RPC数据源(最常用)
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = avro
      a1.sources.r1.bind = 192.0.0.2
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100
      
      # 绑定相关组件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
      • logger4j配置文件log4j.properties(控制台输出+文件输出):配置同上

      • 启动程序
        flume-ng agent --name a1 --conf-file /root/trainging/flume-1.9.0/conf/example3.conf --conf /root/trainging/flume-1.9.0/conf/

      • 发送端Java代码

      package pkg01;
      
      import java.nio.charset.Charset;
      import org.apache.flume.Event;
      import org.apache.flume.EventDeliveryException;
      import org.apache.flume.api.RpcClient;
      import org.apache.flume.api.RpcClientFactory;
      import org.apache.flume.event.EventBuilder;
      
      public class class1 {
      
          public static void main(String[] args) throws EventDeliveryException {
      
              String ip = "192.0.0.2";
              int port = 44444;
      
              RpcClient client = RpcClientFactory.getDefaultInstance(ip, port); // Avro
      //      RpcClient client = RpcClientFactory.getThriftInstance(ip, port); // Thrift
              Event event = EventBuilder.withBody("hello flume!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!",
                      Charset.forName("UTF8"));
              client.append(event);
              client.close();
          }
      }
      
  • Channel:位于Source和Sink之间的缓冲块,允许Source和Sink运行在不同速率上

    • 1.MemoryChannel:建立在内存中的通道,数据存储在JVM的堆上
      • 允许数据少量丢失可使用内存通道,不允许数据丢失可使用文件通道
      • 内存通道支持事务特性,如下所示:


        image.png
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1
      
      # 配置Source部分
      a1.sources.r1.type = netcat
      a1.sources.r1.bind = 192.0.0.2
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      
      # 配置Channel部分
        # 设置内存通道
      a1.channels.c1.type = memory
        # 可以存最大10万个事件event
      a1.channels.c1.capacity = 100000
        # 每个事务可以存取最大100个事件event
      a1.channels.c1.transactionCapacity = 100
        # 内存通道大小为500MB
      a1.channels.c1.byteCapacity = 500000000
        # 其中10%存放头文件,500MB*10%
      a1.channels.c1.byteCapacityBufferPercentage = 10
      
      # 绑定相关组件
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
      
  • Sink:从channel中获取数据并推送给下一级Flume Agent或者存储数据到指定位置

    • Sink组概念:


      image.png
    • Sink优化:


      image.png
    • Sink事务特性:


      image.png
    • 一般配置:
      # sink类型
      a1.sinks.k1.type = hdfs
      # hdfs目录
      a1.sinks.k1.hdfs.path = /user/hduser/logs/data_%Y-%m-%d
      # 文件前缀
      a1.sinks.k1.hdfs.filePrefix = retail
      # 文件后缀
      a1.sinks.k1.hdfs.fileSuffix = .txt
      # 60秒或128兆或100个事件,则关闭文件
      a1.sinks.k1.hdfs.rollInterval = 60
      a1.sinks.k1.hdfs.rollSize  = 128000000
      a1.sinks.k1.hdfs.rollCount = 100
      # 30秒没数据写入则关闭文件
      a1.sinks.k1.hdfs.idleTimeout= 30
      # 最多打开100个文件
      a1.sinks.k1.hdfs.maxOpenFiles= 30
      # 文件不压缩
      # a1.sinks.k1.hdfs.fileType = DataStream
      # 文件压缩,snappy方式压缩
      a1.sinks.k1.hdfs.fileType = CompressedStream
      a1.sinks.k1.hdfs.codeC = snappy
      # 使用本地时间戳
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      # 每10分钟写到一个bucket(目录)
      a1.sinks.k1.hdfs.round = true
      a1.sinks.k1.hdfs.roundUnit = minute
      a1.sinks.k1.hdfs.roundValue= 10
      
  • 拦截器:工作在Source和Channel之间,在Source接收到数据后,拦截器基于自定义的规则删除或转换相关事件,如果一个链路上存在多个拦截器,将按顺序依次执行

    • 主机拦截器:Agent将主机IP或主机名添加到事件的报文头中,事件报文头使用hostHeader配置,默认是host
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = host
      # 是否覆盖原有的值
      a1.sources.r1.interceptors.i1.preserveExisting = true
      # 默认是使用IP,这里不使用IP
      a1.sources.r1.interceptors.i1.useIP = false
      # 使用主机名
      a1.sources.r1.interceptors.i1.hostHeader = hostname
      
      image.png

      image.png
    • 时间拦截器(常用):事件报文头带有timestamp键,可以方便HDFS Sink进行分桶
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = timestamp
      
      image.png
    • 静态拦截器:简单的将固定报文的KV对插入到报文事件中
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = static
      a1.sources.r1.interceptors.i1.key = Zone
      a1.sources.r1.interceptors.i1.value = NEW_YORK
      
      image.png
    • UUID拦截器:通过拦截器给每个事件添加唯一标识符UUID
      a1.sources.r1.interceptors = i1
      a1.sources.r1.interceptors.i1.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
      
      image.png
      • 正则过滤拦截器:选择性保留数据
        a1.sources.r1.interceptors = i1 i2
        
        a1.sources.r1.interceptors.i1.type = regex_filter
        a1.sources.r1.interceptors.i1.regex = .*info*
        # 保留此类事件
        a1.sources.r1.interceptors.i1.excludeEvents = false 
        
        a1.sources.r1.interceptors.i2.type = regex_filter
        a1.sources.r1.interceptors.i2.regex = .*data3*
        # 排除此类事件
        a1.sources.r1.interceptors.i2.excludeEvents = true
        
        image.png
      • 自定义拦截器:自定义的拦截器,java实现
        Map<String, String> headers = new HashMap<String, String>(); 
        headers.put("ClientServer", "Client01srv");
        
        List<Event> events = new ArrayList<Event>();
        events.add(EventBuilder.withBody("info ", Charset.forName("UTF8"), headers));
        
        RpcClient client = RpcClientFactory.getDefaultInstance(ip, port); // Avro
        client.appendBatch(events);
        client.close();
        
        image.png
  • 通道选择器:Channel处理器选择哪些事件进入哪个Channel

    • 复制Channel选择器:复制Source过来的数据到每个Channel中
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1 k2
      a1.channels = c1 c2
      
      # 配置Source部分
      a1.sources.r1.type = avro
      a1.sources.r1.bind = 10.0.1.213
      a1.sources.r1.port = 44444
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 100
      
      a1.channels.c2.type = memory
      a1.channels.c2.capacity = 100
      
      # 绑定相关组件
      a1.sources.r1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2
      
    • 多路复用Channel选择器:根据事件头数据中key对应的value来选择此事件进入哪个Channel
      # 配置Agent中的三要素
      a1.sources = r1
      a1.sinks = k1 k2
      a1.channels = c1 c2
      
      # 配置Source部分
      a1.sources.r1.type = avro
      a1.sources.r1.bind = 10.0.1.213
      a1.sources.r1.port = 44444
      a1.sources.r1.selector.type = multiplexing
      a1.sources.r1.selector.header = op
      # op为1,事件进入 c1 Channel
      a1.sources.r1.selector.mapping.1 = c1
      # op为2,事件进入 c2 Channel
      a1.sources.r1.selector.mapping.2 = c2
      # op为其他值,事件进入 c2 Channel
      a1.sources.r1.selector.default = c2
      
      # 配置Sink部分
      a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      
      # 配置Channel部分
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 100
      
      a1.channels.c2.type = memory
      a1.channels.c2.capacity = 100
      
      # 绑定相关组件
      a1.sources.r1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2
      
  • Sink组:多个Sink组成一个Sink组,可用于负载均衡和故障转移

    • 负载均衡:多条链路都发送数据,不保证数据顺序
      image.png
      # 配置Sink部分
      a1.sinkgroups = g1
      a1.sinkgroups.g1.sinks = k1 k2
      a1.sinkgroups.g1.processor.type = load_balance
      a1.sinkgroups.g1.processor.backoff = true
      #a1.sinkgroups.g1.processor.selector = random
      a1.sinkgroups.g1.processor.selector = round_robin
      a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      
    • 故障转移:一个链路坏了,其他链路会按照优先级起来顶替工作,保证数据顺序
      image.png
      # 配置Sink部分
      a1.sinkgroups = g1
      a1.sinkgroups.g1.sinks = k1 k2
      a1.sinkgroups.g1.processor.type = failover
      a1.sinkgroups.g1.processor.priority.k1 = 5
      a1.sinkgroups.g1.processor.priority.k2 = 10
      a1.sinkgroups.g1.processor.maxpenalty = 10000
      #a1.sinks.k1.type = logger
      a1.sinks.k2.type = logger
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = localhost
      a1.sinks.k1.port = 44444
      
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容