Selector-选择器
Selector是Source的子组件,用于决定数据的分发模式
提供了2种模式:replicating(复制模式)和multiplexing(路由模式/多路复用模式)
-
replicating:当节点收到数据的时候,会将数据复制,然后发往每一个扇出节点,此时每一个节点收到的数据是相同的。
- 概述
- Selector 默认是复制模式(replicating),即把source复制,然后分发给多个sink
*可配置选项说明
- Selector 默认是复制模式(replicating),即把source复制,然后分发给多个sink
- 概述
| 配置项 | 说明 |
|---|---|
| selector.type | replicating 表示复制模式,source的selector如果不配置,默认就是这种模式在复制模式下,当source接收到数据后,会复制多分,分发给每一个avro sink |
| selector.optional | 标志通道为可选 |
- 示例
a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating(这个是默认的)
a1.source.r1.channels = c1 c2 c3
a1.source.r1.selector.optional = c3
-
multiplexing:监听headers中指定字段的值,根据值来进行分发,此时每一个节点收到的数据是不同的。如果不指定,Selector默认采用的是复制模式。如果需要对数据进行备份,使用replicating;如果需要对数据进行分类,那么使用multiplexing
-
概述
- 在这种模式下,用户可以指定转发的规则。selector根据规则进行数据的分发
可配置选项说明
-
| 配置项 | 说明 |
|---|---|
| selector.type | multiplexing 表示路由模式 |
| selector.header | 指定要监测的头的名称 |
| selector.mapping.* | 匹配规则 |
| selector.default | 如果未满足匹配规则,则默认发往指定的通道 |
-
示例
概述
01机利用http source接收数据,根据路由规则,发往02,03机。02,03通过avro source接收数据,通过logger sink 打印数据-
配置
-
01机配置示例
#配置Agent a1 的组件 a1.sources=r1 a1.sinks=s1 s2 a1.channels=c1 c2 #描述/配置a1的source1 a1.sources.r1.type=http a1.sources.r1.port=8888 a1.sources.r1.selector.type=multiplexing a1.sources.r1.selector.header=state a1.sources.r1.selector.mapping.cn=c1 a1.sources.r1.selector.mapping.us=c2 a1.sources.r1.selector.default=c2 #描述sink a1.sinks.s1.type=avro a1.sinks.s1.hostname=192.168.234.212 a1.sinks.s1.port=9999 a1.sinks.s2.type=avro a1.sinks.s2.hostname=192.168.234.213 a1.sinks.s2.port=9999 #描述内存channel a1.channels.c1.type=memory a1.channels.c1.capacity=1000 a1.channels.c1.transactionCapacity=100 a1.channels.c2.type=memory a1.channels.c2.capacity=1000 a1.channels.c2.transactionCapacity=100 #为channel 绑定 source和sink a1.sources.r1.channels=c1 c2 a1.sinks.s1.channel=c1 a1.sinks.s2.channel=c
-
* 02,03配置示例:
```shell
#配置Agent a1 的组件
a1.sources=r1
a1.sinks=s1
a1.channels=c1
#描述/配置a1的source1
a1.sources.r1.type=avro
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=9999
#描述sink
a1.sinks.s1.type=logger
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
#为channel 绑定 source和sink
a1.sources.r1.channels=c1
a1.sinks.s1.channel=c1
```
Interceptor - 拦截器:
Interceptor是Source的子组件,也是Flume中唯一可以直接更改原始数据的组件
不同于Selector,Interceptor可以配置多个,形成拦截器链,后边的拦截器可以拦截之前的数据
Flume有能力在运行阶段修改/删除Event,这是通过拦截器(Interceptors)来实现的
拦截器需要实现org.apache.flume.interceptor.Interceptor接口
拦截器可以修改或删除事件基于开发者在选择器中选择的任何条件
拦截器采用了责任链模式,多个拦截器可以按指定顺序拦截
一个拦截器返回的事件列表被传递给链中的下一个拦截器
如果一个拦截器需要删除事件,它只需要在返回的事件集中不包含要删除的事件即可
如果要删除所有事件,只需返回一个空列表
-
常见子类:
-
Timestamp:会在headers中添加一个字段timestamp,用于标记日志被收集的时间。结合HDFS Sink,可以实现对日志进行按天存放的效果
-
概述
- 这个拦截器在事件头中插入以毫秒为单位的当前处理时间
- 头的名字为timestamp,值为当前处理的时间戳
- 如果在之前已经有这个时间戳,则保留原有的时间戳
可配置项说明
-
| 配置项 | 说明 |
|---|---|
| type | timestamp |
| preserveExisting | false 如果时间戳已经存在是否保留 |
- 配置示例
#配置Agent a1 的组件
a1.sources=r1
a1.sinks=s1 s2
a1.channels=c1 c2
#描述/配置a1的source1
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.212
a1.sinks.s1.port=9999
a1.sinks.s2.type=avro
a1.sinks.s2.hostname=192.168.234.213
a1.sinks.s2.port=9999
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
#为channel 绑定 source和sink
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
结果:

未命名图片.png
-
Host:会在headers中添加一个字段host,用于标记日志是被哪个节点收集的
-
概述
- 这个拦截器插入当前处理Agent的主机名或ip
- 头的名字为host或配置的名称
- 值是主机名或ip地址,基于配置
可配置项说明
-
| 配置参数 | 说明 |
|---|---|
| type | host |
| preserveExisting | false 如果主机名已经存在是否保留 |
| useIP | true 如果配置为true则用IP,配置为false则用主机名 |
| hostHeader | host 加入头时使用的名称 |
- 配置示例
#配置Agent a1 的组件
a1.sources=r1
a1.sinks=s1 s2
a1.channels=c1 c2
#描述/配置a1的source1
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=host
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.212
a1.sinks.s1.port=9999
a1.sinks.s2.type=avro
a1.sinks.s2.hostname=192.168.234.213
a1.sinks.s2.port=9999
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
#为channel 绑定 source和sink
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
结果:

未命名图片.png
-
Static:会在headers中添加一个用户指定的字段
-
概述
- 此拦截器允许用户增加静态头信息使用静态的值到所有事件
- 目前的实现中不允许一次指定多个头
- 如果需要增加多个静态头可以指定多个Static interceptors
可配置项说明
-
| 配置项 | 说明 |
|---|---|
| type | static |
| preserveExisting | true |
| key | key 要增加的头名 |
| value | value 要增加的头值 |
- 配置示例
#配置Agent a1 的组件
a1.sources=r1
a1.sinks=s1 s2
a1.channels=c1 c2
#描述/配置a1的source1
a1.sources.r1.type=http
a1.sources.r1.port=8888
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=addr
a1.sources.r1.interceptors.i1.value=beijing
#描述sink
a1.sinks.s1.type=avro
a1.sinks.s1.hostname=192.168.234.212
a1.sinks.s1.port=9999
a1.sinks.s2.type=avro
a1.sinks.s2.hostname=192.168.234.213
a1.sinks.s2.port=9999
#描述内存channel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.channels.c2.type=memory
a1.channels.c2.capacity=1000
a1.channels.c2.transactionCapacity=100
#为channel 绑定 source和sink
a1.sources.r1.channels=c1 c2
a1.sinks.s1.channel=c1
a1.sinks.s2.channel=c2
结果:

未命名图片.png
-
UUID:会在headers中添加一个字段id,用于标记数据的唯一性
-
概述
- 这个拦截器在所有事件头中增加一个全局一致性标志,其实就是UUID
可配置项说明
-
| 配置项 | 说明 |
|---|---|
| type | org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder |
| headerName | id 头名称 |
| preserveExisting | true 如果头已经存在,是否保留 |
| prefix | “” 在UUID前拼接的字符串前缀 |
- 配置示例
未命名图片.png
结果示意:
未命名图片.png
-
Search And Replace:在使用的时候需要指定正则表达式,那么符合正则表达式的数据就会被替换掉
-
概述
- 这个拦截器提供了简单的基于字符串的正则搜索和替换功能
可配置项说明
-
| 配置项 | 说明 |
|---|---|
| type | search_replace |
| searchPattern | 要搜索和替换的正则表达式 |
| replaceString | 要替换为的字符串 |
| charset | UTF-8 字符集编码,默认utf-8 |
- 配置示例
未命名图片.png
发送的数据示意(比如在body里写一些数字):
未命名图片.png
结果示意:
未命名图片.png
-
Regex Filtering:如果没有指定excludeEvents属性或者这个属性的值为false,那么就表示符合正则表达式的数据会被留下;如果excludeEvents的值为true,那么符合正则表达式的数据就会被过滤
-
概述
- 此拦截器通过解析事件体去匹配给定正则表达式来筛选事件
- 所提供的正则表达式即可以用来包含或刨除事件
可配置项说明
-
| 配置项 | 说明 |
|---|---|
| type | regex_filter |
| regex | ”.*” 所要匹配的正则表达式 |
| excludeEvents | false 如果是true则刨除匹配的事件,false则包含匹配的事件 |
- 配置示例
未命名图片.png
结果将过滤到以jp开头的信息,即如果发送的是以jp开头的信息,则收不到
未命名图片.png
-
Regex Extractor
-
概述
- 使用指定正则表达式匹配事件,并将匹配到的组作为头加入到事件中
- 它也支持插件化的序列化器用来格式化匹配到的组在加入他们作为头之前
可配置项说明
-
| 配置项 | 说明 |
|---|---|
| type | regex_extractor |
| regex | 要匹配的正则表达式 |
| serializers | 匹配对象列表 |
- 配置示例
# 01 define agent name, source/sink/channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 02 source,http,jsonhandler
a1.sources.r1.type = http
a1.sources.r1.bind = master
a1.sources.r1.port = 6666
a1.sources.r1.handler = org.apache.flume.source.http.JSONHandler
# 03 regex extractor interceptor,match event body to extract character and digital
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (^[a-zA-Z]*)\\s([0-9]*$) # regex匹配并进行分组,匹配结果将有两个部分, 注意\s空白字符要进行转义
# specify key for 2 matched part
a1.sources.r1.interceptors.i1.serializers = s1 s2
# key name
a1.sources.r1.interceptors.i1.serializers.s1.name = word
a1.sources.r1.interceptors.i1.serializers.s2.name = digital
# 04 logger sink
a1.sinks.k1.type = logger
# 05 channel,memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 06 bind source,sink to channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
发送事件:

573045-20171203164805476-1277297547.png
结果示意:

573045-20171203164829382-1566280927.png






