Problems and fixes when Flume consumes from or produces to Kafka

  1. Background:

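    The problems below came up while using Flume to consume from and produce to a Kafka cluster secured with SASL_SSL. The Kafka dependency bundled with Flume is version 0.9.0, while the Kafka cluster runs version 0.10.0.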

  2. Problems encountered
    1. Problem 1:

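      The Kafka dependency bundled with Flume (0.9.0) does not match the Kafka cluster version (0.10.0). As a result, the Kafka Source fails to start because the bundled client cannot find the PlainLoginModule class: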

      ...
      19/10/18 13:05:18 ERROR source.BasicSourceSemantics: Unexpected error performing start: name = r1
      org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
        at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:514)
        at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577)
        ... 13 more
      Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:299)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
        at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
        ... 16 more
      19/10/18 13:05:18 ERROR source.PollableSourceRunner: Unhandled exception, logging and sleeping for 5000ms
      org.apache.flume.FlumeException: Source had error configuring or starting
        at org.apache.flume.source.AbstractPollableSource.process(AbstractPollableSource.java:53)
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:133)
        at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:647)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:542)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:524)
        at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:514)
        at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
      Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:74)
        at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:60)
        at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:79)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:577)
        ... 13 more
      Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
        at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
        at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
        at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
        at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
        at org.apache.kafka.common.security.kerberos.Login.login(Login.java:299)
        at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
        at org.apache.kafka.common.security.kerberos.LoginManager.<init>(LoginManager.java:44)
        at org.apache.kafka.common.security.kerberos.LoginManager.acquireLoginManager(LoginManager.java:85)
        at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:55)
        ... 16 more
      ...
      

      Fix:
      Replace kafka_2.10-0.9.0-kafka-2.0.2.jar and kafka-clients-0.9.0-kafka-2.0.2.jar in Flume's lib directory with the 0.10.0 jars.
      After the switch to 0.10.0, the Kafka Sink (producing to Kafka) works, but the Kafka Source (consuming from Kafka) runs into Problem 2.

      cp kafka_2.11-0.10.0-kafka-2.1.0.jar /opt/cloudera/parcels/CDH/jars/
      cp kafka-clients-0.10.0-kafka-2.1.0.jar /opt/cloudera/parcels/CDH/jars/
      cd /opt/cloudera/parcels/CDH/lib/flume-ng/lib
      ln -s ../../../jars/kafka_2.11-0.10.0-kafka-2.1.0.jar kafka_2.11-0.10.0-kafka-2.1.0.jar
      ln -s ../../../jars/kafka-clients-0.10.0-kafka-2.1.0.jar kafka-clients-0.10.0-kafka-2.1.0.jar
      rm -rf /opt/cloudera/parcels/CDH/lib/flume-ng/lib/kafka_2.10-0.9.0-kafka-2.0.2.jar
      rm -rf /opt/cloudera/parcels/CDH/lib/flume-ng/lib/kafka-clients-0.9.0-kafka-2.0.2.jar
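A quick way to confirm that the swapped jars actually provide the class named in the stack trace is to try loading it from the same classpath Flume uses. The following is only a minimal sketch using plain JDK calls; the class name `LoginModuleCheck` is made up for this illustration, and the classpath in the usage note assumes the CDH layout shown above.

```java
final class LoginModuleCheck {
    /** Returns true if the named class can be located on the current classpath. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, LoginModuleCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class that the LoginException above complains about
        String cls = args.length > 0
                ? args[0]
                : "org.apache.kafka.common.security.plain.PlainLoginModule";
        System.out.println(cls + " on classpath: " + isOnClasspath(cls));
    }
}
```

Compile it with `javac LoginModuleCheck.java` and run it with something like `java -cp '.:/opt/cloudera/parcels/CDH/lib/flume-ng/lib/*' LoginModuleCheck`. If it still prints `false` after the jar replacement, the new kafka-clients jar is probably not being picked up from the lib directory.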
      
    2. Problem 2:

      After replacing the two jars needed for Kafka with SASL_SSL enabled, the Kafka Source fails with a NoSuchMethodError.

      ...
      19/10/18 10:49:51 INFO authenticator.AbstractLogin: Successfully logged in.
      19/10/18 10:49:51 INFO utils.AppInfoParser: Kafka version : 0.10.2.2
      19/10/18 10:49:51 INFO utils.AppInfoParser: Kafka commitId : cd80bc412b9b9701
      19/10/18 10:49:51 ERROR lifecycle.LifecycleSupervisor: Unable to start PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } - Exception follows.
      java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
        at org.apache.flume.source.kafka.KafkaSource$TopicListSubscriber.subscribe(KafkaSource.java:152)
        at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:517)
        at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      19/10/18 10:49:51 ERROR lifecycle.LifecycleSupervisor: Unsuccessful attempt to shutdown component: {} due to missing dependencies. Please shutdown the agentor disable this component, or the agent will bein an undefined state.
      java.lang.NullPointerException
        at org.apache.flume.source.PollableSourceRunner$PollingRunner.access$200(PollableSourceRunner.java:119)
        at org.apache.flume.source.PollableSourceRunner.stop(PollableSourceRunner.java:90)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:257)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
      19/10/18 10:49:54 INFO lifecycle.LifecycleSupervisor: Component PollableSourceRunner: { source:org.apache.flume.source.kafka.KafkaSource{name:r1,state:IDLE} counterGroup:{ name:null counters:{} } } is in error state, and Flume will notattempt to change its state
      

      Comparing the org.apache.kafka.clients.consumer.Consumer interface in kafka-clients-0.9.0-kafka-2.0.2.jar and kafka-clients-0.10.0-kafka-2.1.0.jar shows that the parameter type of the subscribe methods changed from List to Collection.
      Version 0.9.0:

      public interface Consumer<K, V> extends Closeable {
         Set<TopicPartition> assignment();
         
         Set<String> subscription();
         
         void subscribe(List<String> var1);
         
         void subscribe(List<String> var1, ConsumerRebalanceListener var2);
         
         void assign(List<TopicPartition> var1);
         
         void subscribe(Pattern var1, ConsumerRebalanceListener var2);
         
         void unsubscribe();
         ...
      }
      

      Version 0.10.0:

      public interface Consumer<K, V> extends Closeable {
          public Set<TopicPartition> assignment();
          
          public Set<String> subscription();
          
          public void subscribe(Collection<String> topics);
          
          public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
          
          public void assign(Collection<TopicPartition> partitions);
          
          public void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
          
          public void unsubscribe();
          ...
      }
      

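      Fix:
      The third-party jars that Flume depends on (the Kafka jars) have changed, so flume-kafka-source must be recompiled. When an API that a project depends on changes, the project should be recompiled even if its source code needs no modification; if the API has not changed, recompiling is unnecessary. Here the compiled KafkaSource still references subscribe(List, ConsumerRebalanceListener), which the 0.10.0 client no longer provides. To resolve the exception, change the Kafka version that flume-kafka-source depends on to 0.10.0, rebuild and package it, and then replace the flume-kafka-source jar in Flume's lib directory.
      Reference: https://stackoverflow.com/questions/536971/do-i-have-to-recompile-my-application-when-i-upgrade-a-third-party-jar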
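Why swapping the jar is not enough can be sketched in plain Java. A compiled call site records the exact method descriptor, and the JVM resolves methods by name plus descriptor, so a caller compiled against `subscribe(List, ...)` does not link against a class that only declares `subscribe(Collection, ...)`, even though the same source code would compile fine against either. The classes below (`OldConsumer`, `NewConsumer`, `Listener`) are hypothetical stand-ins, not the real Kafka classes, and reflection is used here only to imitate the exact-signature lookup.

```java
import java.lang.invoke.MethodType;
import java.util.Collection;
import java.util.List;

final class DescriptorDemo {
    // Hypothetical stand-ins for the 0.9.0 and 0.10.0 consumer APIs
    interface Listener {}

    static class OldConsumer {
        public void subscribe(List<String> topics, Listener listener) {}
    }

    static class NewConsumer {
        public void subscribe(Collection<String> topics, Listener listener) {}
    }

    /** JVM descriptor of a void subscribe(firstParam, Listener) method. */
    static String descriptor(Class<?> firstParam) {
        return MethodType.methodType(void.class, firstParam, Listener.class)
                .toMethodDescriptorString();
    }

    /** True if the class declares a public subscribe with exactly these parameter types. */
    static boolean hasSubscribe(Class<?> consumer, Class<?> firstParam) {
        try {
            consumer.getMethod("subscribe", firstParam, Listener.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("compiled against old API: " + descriptor(List.class));
        System.out.println("provided by new API:      " + descriptor(Collection.class));
        System.out.println("new API has subscribe(List, ...): "
                + hasSubscribe(NewConsumer.class, List.class));
    }
}
```

The two printed descriptors differ in their first parameter, which is the same difference visible in the NoSuchMethodError message above (`Ljava/util/List;`). Recompiling flume-kafka-source against the 0.10.0 client makes the compiler emit the `Collection` descriptor instead.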

      1. Download the Flume source package.
        URL: http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.12.1-src.tar.gz

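      2. Unpack the source package, go into the flume-kafka-source project, and change the Kafka dependency version in pom.xml (the original ${kafka.version} reference is commented out below).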

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <!--<version>${kafka.version}</version>-->
            <version>0.10.0-kafka-2.1.0</version>
        </dependency>
        
      3. Build the jar with Maven.

        [root@cdh01 flume-app]# wget http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.12.1-src.tar.gz
        [root@cdh01 flume-app]# tar -zxvf flume-ng-1.6.0-cdh5.12.1-src.tar.gz
        [root@cdh01 flume-app]# cd /data/flume-app/flume-ng-1.6.0-cdh5.12.1/flume-ng-sources/flume-kafka-source
        [root@cdh01 flume-kafka-source]# vim pom.xml
        [root@cdh01 flume-kafka-source]# rm -rf src/test/java/*  # delete the test sources; otherwise the build fails
        [root@cdh01 flume-kafka-source]# mvn clean package
        
        
      4. Replace /opt/cloudera/parcels/CDH/jars/flume-kafka-source-1.6.0-cdh5.12.1.jar

        [root@cdh01 ~]# rm -rf /opt/cloudera/parcels/CDH/jars/flume-kafka-source-1.6.0-cdh5.12.1.jar
        [root@cdh01 ~]# cd /data/flume-app/flume-ng-1.6.0-cdh5.12.1/flume-ng-sources/flume-kafka-source/target
        [root@cdh01 target]# cp flume-kafka-source-1.6.0-cdh5.12.1.jar /opt/cloudera/parcels/CDH/jars
        
  3. Flume configuration files and command

    Run command:

    flume-ng agent --conf-file test-kafka-flume-hdfs.conf --name a1 -Dflume.root.logger=debug,CONSOLE -Djava.security.auth.login.config=/data/flume-app/kafka_client_jaas.conf -Xmx1024m
    

    kafka_client_jaas.conf:

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="username"
      password="123456";
    };
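Before starting the agent, it can help to confirm that the JAAS file parses and that it contains the `KafkaClient` section, which is the section name the Kafka client looks up. The sketch below uses only JDK classes: `Configuration.getInstance("JavaLoginConfig", ...)` loads a JAAS file from a URI without needing the Kafka jars. The class name `JaasCheck` and the `writeTemp` helper are made up for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

final class JaasCheck {
    /** Returns the first login module class named in the section, or null if the section is absent. */
    static String loginModuleName(Path jaasFile, String section) {
        try {
            Configuration cfg = Configuration.getInstance(
                    "JavaLoginConfig", new URIParameter(jaasFile.toUri()));
            AppConfigurationEntry[] entries = cfg.getAppConfigurationEntry(section);
            return (entries == null || entries.length == 0)
                    ? null
                    : entries[0].getLoginModuleName();
        } catch (NoSuchAlgorithmException e) {
            // Also raised when the file cannot be read or parsed
            throw new IllegalStateException("cannot load JAAS file " + jaasFile, e);
        }
    }

    /** Convenience for experiments: writes JAAS text to a temporary file. */
    static Path writeTemp(String jaasText) {
        try {
            Path p = Files.createTempFile("jaas", ".conf");
            Files.write(p, jaasText.getBytes(StandardCharsets.UTF_8));
            return p;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default path taken from the run command above
        Path file = Paths.get(args.length > 0 ? args[0] : "/data/flume-app/kafka_client_jaas.conf");
        System.out.println("KafkaClient login module: " + loginModuleName(file, "KafkaClient"));
    }
}
```

A `null` result means the section is missing or misnamed. This only checks the file itself; whether the named login module class can be loaded still depends on the jars in Flume's lib directory.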
    

    test-kafka-flume-hdfs.conf:

    ## Flume configuration
    ## Components
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = 192.168.1.101:9093,192.168.1.102:9093,192.168.1.103:9093
    a1.sources.r1.kafka.topics = test_topic
    a1.sources.r1.kafka.consumer.group.id = test_group
    a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
    a1.sources.r1.kafka.consumer.security.protocol = SASL_SSL
    a1.sources.r1.kafka.consumer.sasl.mechanism = PLAIN
    a1.sources.r1.kafka.consumer.ssl.truststore.location = /data/flume-app/kafka.client.truststore.jks 
    a1.sources.r1.kafka.consumer.ssl.truststore.password = truststore
    
    ## channel1
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /data/flume-app/channel/checkpointDir/test/checkpoint
    a1.channels.c1.dataDirs = /data/flume-app/channel/checkpointDir/test/data
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.transactionCapacity = 10000
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    ##sink1
    a1.sinks.k1.type = logger
    #a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    #a1.sinks.k1.kafka.producer.max.request.size = 100000
    #a1.sinks.k1.kafka.producer.compression.type = gzip
    #a1.sinks.k1.brokerList = 192.168.1.101:9093,192.168.1.102:9093,192.168.1.103:9093
    #a1.sinks.k1.topic = sink_test
    #a1.sinks.k1.kafka.producer.security.protocol = SASL_SSL
    #a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN
    #a1.sinks.k1.kafka.producer.ssl.truststore.location = /data/flume-app/kafka.client.truststore.jks 
    #a1.sinks.k1.kafka.producer.ssl.truststore.password = truststore
    
    ## Wiring
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
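Because a mistyped property name tends to surface only as an authentication or connection failure at startup, a small pre-flight check of the agent file can save a restart cycle. The sketch below is an assumption-laden illustration: it treats the four source keys that the configuration above sets for SASL_SSL with PLAIN as the ones to look for, and the class name `FlumeKafkaConfigCheck` is made up. It reads the text with `java.util.Properties`, so lines starting with `#` are treated as comments.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

final class FlumeKafkaConfigCheck {
    // Source keys set in the configuration above; treating them as mandatory is an assumption
    private static final List<String> EXPECTED = Arrays.asList(
            "kafka.bootstrap.servers",
            "kafka.consumer.security.protocol",
            "kafka.consumer.sasl.mechanism",
            "kafka.consumer.ssl.truststore.location");

    /** Returns the expected keys that are absent or blank for the given agent and source. */
    static List<String> missingKeys(String configText, String agent, String source) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String prefix = agent + ".sources." + source + ".";
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED) {
            String value = props.getProperty(prefix + key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

For the file above, one would read test-kafka-flume-hdfs.conf into a string and call `missingKeys(text, "a1", "r1")`; an empty list means all four keys are present.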
    