如何配置KafkaChannel

完整配置

# Define spooling source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/flume
a1.sources.s1.channels = c1

a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex =.*\\|(.*)\\|.*
a1.sources.s1.interceptors.i1.serializers = e1
a1.sources.s1.interceptors.i1.serializers.e1.name = key

# Define a kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = host1:6667,host2:6667,host3:6667
a1.channels.c1.kafka.topic = test
a1.channels.c1.parseAsFlumeEvent = false

a1.channels = c1
a1.sources = s1
a1.sinks =k1

例如在/tmp/flume下面放置一个文件,内容a|b|c
那么通过上面的配置,消费一下kafka的test,看一下结果

sh /usr/hdp/2.5.0.0-1245/kafka/bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic test  --property print.key=true

b       a|b|c
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容