Spark-Streaming Kafka In Kerberos

最近在HDP2.6的环境里尝试了Kerberos,在各组件运行正常的情况下最终成功运行spark-streaming应用,总结一下就是一叶障目,不见泰山,坑多梯子少。尤其在国内,关于Kerberos的资料较少,但在生产环境中,Kerberos又是如鲠在喉,无法忽视。

因此分享这篇文章,希望能给还在苦苦爬坑的小伙伴们一点帮助。

  • 我们的HDP为单用户ocsp安装,多用户需要根据以下步骤进行细微修改

确认OCSP各组件的Kerberos工作正常

1. Kafka

  • 使用kafka-topics.sh创建topic

  • 使用kafka producer和consumer需要先kinit
    kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/host-10-1-236-122@ASIAINFO.COM

  • 使用producer发送消息,consumer消费消息

    • kafka producer
      /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-producer.sh --topic kerin --broker-list host-10-1-236-122:6667 --security-protocol PLAINTEXTSASL

    • kafka consumer
      /usr/hdp/2.6.0.3-8/kafka/bin/kafka-console-consumer.sh --topic kerin --security-protocol PLAINTEXTSASL --bootstrap-server host-10-1-236-122:6667

FAQ:

  • 使用kafka producer和consumer需要先kinit kinit -kt /etc/security/keytabs/kafka.service.keytab ocsp/<hostname>@ASIAINFO.COM
  • 否则:
    • kafka producer 报错:

      [2017-07-19 10:44:56,582] WARN Error while fetching metadata with correlation id 0 : {kertest=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)

    • kafka consumer 报错:

      javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user


2. Hive

  • kinit
  • 使用beeline登录

3. Phoenix

  • 使用sqlline与principal,keytab登录

进行Spark,Kafka针对Kerberos相关配置

1. 先放上最后提交任务的命令

spark-submit  --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default  --principal ocsp-yg@ASIAINFO.COM --keytab /etc/security/keytabs/hdfs.headless.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar 
  • --principal与--keytab这两个参数为spark需要的Kerberos认证信息

  • --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf"为driver连接kafka用到的认证信息,因此使用本地绝对路径

  • --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf"为executor连接kafka用到的Kerberos认证信息,因此使用container中的相对路径./

  • jaas文件中定义了principal与keytab,由于我们使用了yarn-client模式,driver需要的文件在本地文件系统,executor需要的文件需要我们使用--files的方式上传,即--files "/usr/OCSP/conf/kafka_client_jaas.conf,/usr/OCSP/conf/ocsp.keytab"

  • 有的文档中说--files中传keytab文件会与spark本身的--keytab 冲突,其实是因为他们对spark和kafka使用了相同的principal和keytab,在上述命令中我为了清晰起见,让spark使用了principal ocsp-yg@ASIAINFO.COM,keytab hdfs.headless.keytab,让spark连接kafka时使用了principal ocsp/ASIAINFO.COM(principal其实是在jaas文件中指定的,3中详细讲jaas文件) keytab ocsp.keytab,当spark提交任务时,yarn会将--keytab后面的keytab文件与--files里的文件先后上传,即 hdfs.headless.keytab与ocsp.keytab均会被上传,spark与kafka各取所需,即可正常工作。当spark与kafka要使用相同的keytab文件时,比如都用ocsp.keytab,那么yarn会先后上传两次ocsp.keytab,在spark正使用的时候更新了keytab,造成异常退出

  • 因此如果spark与kafka需要使用相同的keytab文件,我们只需要在--files里不要上传keytab即可避免冲突

spark-submit  --class <classname> --master yarn --deploy-mode client --executor-memory 2g --executor-cores 2 --driver-memory 2g --num-executors 2 --queue default  --principal ocsp@ASIAINFO.COM --keytab /etc/security/keytabs/ocsp.keytab --files "/usr/OCSP/conf/kafka_client_jaas.conf" --driver-java-options "-Djava.security.auth.login.config=/usr/OCSP/conf/kafka_client_jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" --jars <your jars>,/usr/OCSP/lib/spark-kafka-0-10-connector-assembly_2.10-1.0.1.jar /usr/OCSP/lib/ocsp-core_1.6-2.1.0.jar 
  • 还有一个问题是本例中drvier和executor使用了相同的kafka_client_jaas.conf,这也会造成一些问题,3中会详细说明

2. 生成keytab和principal

  • 在KDC Server上执行
    kadmin -p admin/admin@ASIAINFO.COM
  • 生成principal,principal最好使用ocsp的用户名+domain
    addprinc -randkey ocsp/ASIAINFO.COM
  • 生成keytab
    ktadd -k /data/ocsp.keytab ocsp/ASIAINFO.COM
  • 将keytab文件copy到spark driver所在的机器(因为OCSP默认使用yarn-client模式)

3. 创建spark读取kafka的jaas配置文件

  • 配置文件kafka_client_jaas.conf样例如下:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=false
  useKeyTab=true
  principal="ocsp@ASIAINFO.COM"
  keyTab="./ocsp.keytab"
  renewTicket=true
  storeKey=true
  serviceName="ocsp";
};
  • 其中useTicketCache指从系统的cash中读取credential信息,useKeyTab指从指定的keyTab文件读取credential

  • principal和keytab用第二步生成的principal与keytab,注意:k�eytab的路径

    • 如果这个conf文件是给driver读取,则我们要用keytab文件在本地的绝对路径
    • 如果这个conf文件是executor读取,则我们要用keytab文件在container中的相对路径,即./ocsp.keytab
    • 如果为了方便起见,drvier与executor要使用相同的jaas文件,路径配置为./ocsp.keytab,我们需要将keytab文件copy到运行spark-submit的当前路径
    • 如果driver和executor要使用不同的jaas文件,则driver的jaas文件中,keytab应为本地绝对路径,executor的jaas文件中,keytab应为相对路径./

4. 配置spark1.6+kafka0.10 jar包

<dependency>
                    <groupId>com.hortonworks</groupId>
                    <artifactId>spark-kafka-0-10-connector-main_2.10</artifactId>
                    <version>1.0.1</version>
                    <scope>system</scope>
                    <systemPath>${project.basedir}/../lib/spark-kafka-0-10-connector_2.10-1.0.1.jar</systemPath>
</dependency>

5. 修改Spark读取Kafka部分

  • 需要import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
  • 我们使用的DirectApi读取kafka
KafkaUtils.createDirectStream[String, String](
        SSC,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](TopicsSet, KafkaParams))

KafkaParams配置如下:

val KafkaParams = Map[String, Object]("auto.offset.reset" -> "latest"
            , "key.deserializer" -> classOf[StringDeserializer]
            , "value.deserializer" -> classOf[StringDeserializer]
            , "security.protocol" -> "SASL_PLAINTEXT"
            , "bootstrap.servers" -> "kafka-server1:6667"
            , "group.id" -> "test")

6. 修改Spark写Kafka部分

  • 写kafka调用的是kafka官方的库
<dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>0.10.1.1</version>
</dependency>
  • 代码中需要import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerConfig, ProducerRecord}
val props = new Properties()
      props.put("bootstrap.servers", dsConf.get("metadata.broker.list", ""))
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      if (MainFrameConf.KERBEROS_ENABLE == "true"){
        props.put("security.protocol","SASL_PLAINTEXT")
      }
new KafkaProducer[String, String](props)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容