今天在看网课做项目的时候使用flume的时候拿它和kafka对接,source使用的是一个TAILDIR,channel就是一个普通的memorychannel,因为内容比较简单,我也没对照文档就自己写了,但是sink当时是选择使用一个channel对接两个sink,其中一个是kafkasink,另一个就是loggersink,多加一个loggersink是我想着loggersink读取到控制台的时候能够更直观的显示,但是遇到了一个问题,这两个sink获取到的channel数据不是一样的,而是互相争夺,同一个event只能有其中一个sink获取到(这和channel不同,channel如果一个source对接两个channel,默认情况下,两个channel获取到的数据是一样的),所以跟着官方文档看了下,补习一下多channel和多sink情况下参数配置,在这里做个总结。
ChannelProcessor
Source读取到数据后进入Channelprocessor,首先把读取到的event传递给拦截器,这里的拦截器可以是多个,在编写代码时,使用时需要以类的全限定名的方式把他们添加到集合中最后传给Properties配置文件,拦截器可以对event的header进行修改,修改后event将被传递给channelSelector,channelSelector也就是channel选择器会根据header的内容把event传递到不同的channel。
以上是ChannelProcessor的事件处理过程,但是如果默认情况下,我们使用一个source对接多个channel并且不作参数配置,那么selector.type的类型将会是replicating,也就是把同一个event都发给不同的channel。
可以看到默认的type就是replicating,而根据event的header进行channel选的的type就是下面的multiplexing,官方使用的配置案例上面也写了,
a1.sources.r1.selector.header=state
首先我们知道event的header里储存的数据是以Map的形式,也就是可以储存很多的KV对。
表示在选择器读取event的时候会根据key为state的value进行判断(这个state可以自定义名字,假设改为aaa,那么接下来比较的就是header中kv键值对中key为aaa的value值),以该value的值作为参考来分配到不同的channel。
a1.sources.r1.selector.mapping.CZ=c1
a1.sources.r1.selector.mapping.US=c2 c3
a1.sources.r1.selector.default=c4
后面的CZ US也是可以自定义的名字,表示的是如果key为state的这个kv对里的value="CZ",那么把这个event传递给c1,如果value="US",那么传递给c2和c3(c2和c3应该能获取到同一个event),如果value不为这两个值那么就按照default的配置,把他传递给c4.
multiplexing需要配合拦截器使用如果我们不自定义拦截器,那么event默认为null,自然无法进行选择操作会传递给default。
如果不配置
SinkProcessor
就是有了上面的惯性思维加上偷懒不去看官方文档,导致我一度认为sink的默认传递方式也是replicating,但是sink与channel并不相同。
可以看到sink有三个基本的的配置方式分别为default,failover及load_balance,我们看到最下面一句话,默认的sinkprocessor只接受一个sink,简单的理解就是一个event只能给一个sink。
那我想实现一个channel中的event给多个sink,光靠配置sinkprocessor可以做到吗?接下来看后两种type。
load_balance听名字我们就知道这是做负载均衡的,很遗憾负载均衡的多个sink肯定是无法接收到同一个event的,这个sinkprocessor会帮我们做的只有尽量均匀的将event传递给不同的sink。关键的配置参数
a1.sinkgroups.g1.processor.selector=random
我们看名字就知道只是限定负载均衡的方式为随机发送event到不同的sink。
那么failover呢?
failover其实就是故障转移,你设置的sinks k1 k2 并不都能获取到event,需要同时参考你设置的参数
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
看文档我们知道priority越大的sink,获取到event的优先级更好,而当该sink遇见状况无法正常工作时,才会把其他event交给优先级较低的sink。
a1.sinkgroups.g1.processor.maxpenalty=10000
这个参数我们可以理解为对故障sink的最大心跳时间,假如k2出现故障了,那么event将传递给k1,但是channel还会不定时的访问k2,询问其是否恢复,这个询问时间会不停的增加,而该参数则是配置最大的询问时间间隔,防止因为指数级的时间间隔增加,导致k2恢复后也长期无法接收到evnet。
总结
如果想要同一channel的事件传递给多个sink,果然还是无法实现的,当然是指通过配置文件无法实现,如果愿意研究自定义SInkProcessor的相关组件应该还是可以实现的。但是如果可以使用多个channel,那么配置一个source对应两个channel这两个channel再各自对应各自的sink,就很简单了。