Flume学习-Selector-选择器、Interceptor-拦截器

Selector-选择器

  • Selector是Source的子组件,用于决定数据的分发模式

  • 提供了2种模式:replicating(复制模式)和multiplexing(路由模式/多路复用模式)

  • replicating:当节点收到数据的时候,会将数据复制,然后发往每一个扇出节点,此时每一个节点收到的数据是相同的。

    • 概述
      • Selector 默认是复制模式(replicating),即把source复制,然后分发给多个sink
        *可配置选项说明
配置项 说明
selector.type replicating 表示复制模式,source的selector如果不配置,默认就是这种模式在复制模式下,当source接收到数据后,会复制多分,分发给每一个avro sink
selector.optional 标志通道为可选
  • 示例
a1.sources = r1  
a1.channels = c1 c2 c3  
a1.source.r1.selector.type = replicating(这个是默认的)  
a1.source.r1.channels = c1 c2 c3  
a1.source.r1.selector.optional = c3  
  • multiplexing:监听headers中指定字段的值,根据值来进行分发,此时每一个节点收到的数据是不同的。如果不指定,Selector默认采用的是复制模式。如果需要对数据进行备份,使用replicating;如果需要对数据进行分类,那么使用multiplexing

    • 概述

      • 在这种模式下,用户可以指定转发的规则。selector根据规则进行数据的分发
    • 可配置选项说明

配置项 说明
selector.type multiplexing 表示路由模式
selector.header 指定要监测的头的名称
selector.mapping.* 匹配规则
selector.default 如果未满足匹配规则,则默认发往指定的通道
  • 示例

    • 概述
      01机利用http source接收数据,根据路由规则,发往02,03机。02,03通过avro source接收数据,通过logger sink 打印数据

    • 配置

      • 01机配置示例

        #配置Agent a1 的组件  
        a1.sources=r1  
        a1.sinks=s1 s2  
        a1.channels=c1 c2  
        #描述/配置a1的source1  
        a1.sources.r1.type=http  
        a1.sources.r1.port=8888  
        a1.sources.r1.selector.type=multiplexing  
        a1.sources.r1.selector.header=state  
        a1.sources.r1.selector.mapping.cn=c1  
        a1.sources.r1.selector.mapping.us=c2  
        a1.sources.r1.selector.default=c2  
        #描述sink  
        a1.sinks.s1.type=avro  
        a1.sinks.s1.hostname=192.168.234.212  
        a1.sinks.s1.port=9999  
        a1.sinks.s2.type=avro
        a1.sinks.s2.hostname=192.168.234.213  
        a1.sinks.s2.port=9999  
        #描述内存channel
        a1.channels.c1.type=memory  
        a1.channels.c1.capacity=1000  
        a1.channels.c1.transactionCapacity=100  
        a1.channels.c2.type=memory  
        a1.channels.c2.capacity=1000  
        a1.channels.c2.transactionCapacity=100  
        #为channel 绑定 source和sink  
        a1.sources.r1.channels=c1 c2  
        a1.sinks.s1.channel=c1  
        a1.sinks.s2.channel=c  
        
    * 02,03配置示例:

      ```shell
      #配置Agent a1 的组件  
      a1.sources=r1  
      a1.sinks=s1  
      a1.channels=c1  
      #描述/配置a1的source1  
      a1.sources.r1.type=avro  
      a1.sources.r1.bind=0.0.0.0  
      a1.sources.r1.port=9999  
      #描述sink  
      a1.sinks.s1.type=logger  
      #描述内存channel  
      a1.channels.c1.type=memory  
      a1.channels.c1.capacity=1000  
      a1.channels.c1.transactionCapacity=100  
      #为channel 绑定 source和sink  
      a1.sources.r1.channels=c1  
      a1.sinks.s1.channel=c1  
      ```

Interceptor - 拦截器:

  • Interceptor是Source的子组件,也是Flume中唯一可以直接更改原始数据的组件

  • 不同于Selector,Interceptor可以配置多个,形成拦截器链,后边的拦截器可以拦截之前的数据

  • Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的

  • 拦截器需要实现org.apache.flume.interceptor.Interceptor接口

  • 拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件

  • 拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截

  • 一个拦截器返回的事件列表被传递给链中的下一个拦截器

  • 如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可

  • 如果要删除所有事件,只需返回一个空列表

  • 常见子类:

  • Timestamp:会在headers中添加一个字段timestamp,用于标记日志被收集的时间。结合HDFS Sink,可以实现对日志进行按天存放的效果

    • 概述

      • 这个拦截器在事件头中插入以毫秒为单位的当前处理时间
      • 头的名字为timestamp,值为当前处理的时间戳
      • 如果在之前已经有这个时间戳,则保留原有的时间戳
    • 可配置项说明

配置项 说明
type timestamp
preserveExisting false 如果时间戳已经存在是否保留
  • 配置示例
#配置Agent a1 的组件  
a1.sources=r1  
a1.sinks=s1 s2  
a1.channels=c1 c2  
#描述/配置a1的source1  
a1.sources.r1.type=http  
a1.sources.r1.port=8888  
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp  
#描述sink  
a1.sinks.s1.type=avro  
a1.sinks.s1.hostname=192.168.234.212  
a1.sinks.s1.port=9999  
a1.sinks.s2.type=avro  
a1.sinks.s2.hostname=192.168.234.213  
a1.sinks.s2.port=9999  
#描述内存channel  
a1.channels.c1.type=memory  
a1.channels.c1.capacity=1000  
a1.channels.c1.transactionCapacity=100  
a1.channels.c2.type=memory  
a1.channels.c2.capacity=1000  
a1.channels.c2.transactionCapacity=100  
#为channel 绑定 source和sink  
a1.sources.r1.channels=c1 c2 
a1.sinks.s1.channel=c1  
a1.sinks.s2.channel=c2  

结果:

未命名图片.png

  • Host:会在headers中添加一个字段host,用于标记日志是被哪个节点收集的

    • 概述

      • 这个拦截器插入当前处理Agent的主机名或ip
      • 头的名字为host或配置的名称
      • 值是主机名或ip地址,基于配置
    • 可配置项说明

配置参数 说明
type host
preserveExisting false 如果主机名已经存在是否保留
useIP true 如果配置为true则用IP,配置为false则用主机名
hostHeader host 加入头时使用的名称
  • 配置示例
#配置Agent a1 的组件  
a1.sources=r1  
a1.sinks=s1 s2  
a1.channels=c1 c2  
#描述/配置a1的source1  
a1.sources.r1.type=http  
a1.sources.r1.port=8888  
a1.sources.r1.interceptors=i1  
a1.sources.r1.interceptors.i1.type=host  
#描述sink  
a1.sinks.s1.type=avro  
a1.sinks.s1.hostname=192.168.234.212  
a1.sinks.s1.port=9999  
a1.sinks.s2.type=avro  
a1.sinks.s2.hostname=192.168.234.213  
a1.sinks.s2.port=9999  
#描述内存channel  
a1.channels.c1.type=memory  
a1.channels.c1.capacity=1000  
a1.channels.c1.transactionCapacity=100  
a1.channels.c2.type=memory  
a1.channels.c2.capacity=1000  
a1.channels.c2.transactionCapacity=100  
#为channel 绑定 source和sink  
a1.sources.r1.channels=c1 c2  
a1.sinks.s1.channel=c1  
a1.sinks.s2.channel=c2  

结果:

未命名图片.png

  • Static:会在headers中添加一个用户指定的字段

    • 概述

      • 此拦截器允许用户增加静态头信息使用静态的值到所有事件
      • 目前的实现中不允许一次指定多个头
      • 如果需要增加多个静态头可以指定多个Static interceptors
    • 可配置项说明

配置项 说明
type static
preserveExisting true
key key 要增加的头名
value value 要增加的头值
  • 配置示例
#配置Agent a1 的组件  
a1.sources=r1  
a1.sinks=s1 s2  
a1.channels=c1 c2  
#描述/配置a1的source1  
a1.sources.r1.type=http  
a1.sources.r1.port=8888  
a1.sources.r1.interceptors=i1  
a1.sources.r1.interceptors.i1.type=static  
a1.sources.r1.interceptors.i1.key=addr  
a1.sources.r1.interceptors.i1.value=beijing  
#描述sink  
a1.sinks.s1.type=avro  
a1.sinks.s1.hostname=192.168.234.212  
a1.sinks.s1.port=9999  
a1.sinks.s2.type=avro  
a1.sinks.s2.hostname=192.168.234.213  
a1.sinks.s2.port=9999  
#描述内存channel  
a1.channels.c1.type=memory  
a1.channels.c1.capacity=1000  
a1.channels.c1.transactionCapacity=100  
a1.channels.c2.type=memory  
a1.channels.c2.capacity=1000  
a1.channels.c2.transactionCapacity=100  
#为channel 绑定 source和sink  
a1.sources.r1.channels=c1 c2  
a1.sinks.s1.channel=c1  
a1.sinks.s2.channel=c2  

结果:

未命名图片.png

  • UUID:会在headers中添加一个字段id,用于标记数据的唯一性

    • 概述

      • 这个拦截器在所有事件头中增加一个全局一致性标志,其实就是UUID
    • 可配置项说明

配置项 说明
type org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
headerName id 头名称
preserveExisting true 如果头已经存在,是否保留
prefix “” 在UUID前拼接的字符串前缀
  • 配置示例
    未命名图片.png

    结果示意:
    未命名图片.png
  • Search And Replace:在使用的时候需要指定正则表达式,那么符合正则表达式的数据就会被替换掉

    • 概述

      • 这个拦截器提供了简单的基于字符串的正则搜索和替换功能
    • 可配置项说明

配置项 说明
type search_replace
searchPattern 要搜索和替换的正则表达式
replaceString 要替换为的字符串
charset UTF-8 字符集编码,默认utf-8
  • 配置示例
    未命名图片.png

    发送的数据示意(比如在body里写一些数字):
    未命名图片.png

    结果示意:
    未命名图片.png
  • Regex Filtering:如果没有指定excludeEvents属性或者这个属性的值为false,那么就表示符合正则表达式的数据会被留下;如果excludeEvents的值为true,那么符合正则表达式的数据就会被过滤

    • 概述

      • 此拦截器通过解析事件体去匹配给定正则表达式来筛选事件
      • 所提供的正则表达式即可以用来包含或刨除事件
    • 可配置项说明

配置项 说明
type regex_filter
regex ”.*” 所要匹配的正则表达式
excludeEvents false 如果是true则刨除匹配的事件,false则包含匹配的事件
  • 配置示例
    未命名图片.png

    结果将过滤到以jp开头的信息,即如果发送的是以jp开头的信息,则收不到
    未命名图片.png
  • Regex Extractor

    • 概述

      • 使用指定正则表达式匹配事件,并将匹配到的组作为头加入到事件中
      • 它也支持插件化的序列化器用来格式化匹配到的组在加入他们作为头之前
    • 可配置项说明

配置项 说明
type regex_extractor
regex 要匹配的正则表达式
serializers 匹配对象列表
  • 配置示例
# 01 define agent name, source/sink/channel   
a1.sources = r1  
a1.sinks = k1  
a1.channels = c1  
# 02 source,http,jsonhandler  
a1.sources.r1.type = http  
a1.sources.r1.bind = master  
a1.sources.r1.port = 6666  
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler  
# 03 regex extractor interceptor,match event body to extract character and digital  
a1.sources.r1.interceptors = i1    
a1.sources.r1.interceptors.i1.type = regex_extractor  
a1.sources.r1.interceptors.i1.regex = (^[a-zA-Z]*)\\s([0-9]*$)  # regex匹配并进行分组,匹配结果将有两个部分, 注意\s空白字符要进行转义  
# specify key for 2 matched part  
a1.sources.r1.interceptors.i1.serializers = s1 s2  
# key name  
a1.sources.r1.interceptors.i1.serializers.s1.name = word  
a1.sources.r1.interceptors.i1.serializers.s2.name = digital   
# 04 logger sink  
a1.sinks.k1.type = logger  
# 05 channel,memory  
a1.channels.c1.type = memory  
a1.channels.c1.capacity = 1000  
a1.channels.c1.transactionCapacity = 100  
# 06 bind source,sink to channel  
a1.sources.r1.channels = c1  
a1.sinks.k1.channel = c1    

发送事件:

573045-20171203164805476-1277297547.png

结果示意:
573045-20171203164829382-1566280927.png

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容