flume同时使用KafkaSource、KafkaSink导致的问题

KafkaSource 配置topic:topic1
KafkaSink 配置topic:topic2
从topic1拉取数据,经过简单处理后发到topic2
但是你会发现flume一直在循环读写topic1
原因就是KafkaSink中的这段代码:

if (eventTopic == null) {
    eventTopic = topic;
} 

首先从headers中获取TOPIC_HEADER(topic),然后优先使用。然而在KafkaSource中则会将topic1 PUT到Header中,所以导致循环读写topic1。

如何解决呢?

你可以重新KafkaSink,改掉上面那段代码。

或者配置一个拦截器,将KafkaSource 写到Header中的topic给替换掉:

agent.sources.so1.interceptors.i1.type = static
agent.sources.so1.interceptors.i1.key = topic
agent.sources.so1.interceptors.i1.preserveExisting = false
agent.sources.so1.interceptors.i1.value = topic2
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容