用Cloudera Manager 安装 的kafka默认情况下,可以开启 Kerberos 和Sentry做安全认证和授权服务。
但有时有些特殊需求,不适合在Kerberos中添加用户,可以通过添加其他认证机制来解决此问题。
本文讲述了 如何让 Cloudera Manager中的kafka 支持 PLAIN和 SCRAM认证机制
版本:CDH-->5.13, Cloudera Manager-->5.13, kafka-->3.1.0对应的社区版:1.0.1
Step 1:登录 Cloudera Manager 控制台,进入kafka 组件的configuration 中。
Step 2:修改如下选项
Sentry Service:none 说明:关闭Sentry,因为SCRAM和PLAIN的授权操作都是通过kafka-acl来完成的,授权信息存储在zookeeper中,而Sentry 是独立的权限控制系统,只能支持Kerberos,不能支持SCRAM和PLAIN。
advertised.host.name:说明:配置每个broker的IP地址或者主机名
advertised.port:9093 说明:此示例中开启了PLAINTEXT-->9092和SASL_PLAINTEXT-->9093端口
security.inter.broker.protocol:INFERRED 说明:自动推测协议类型,推测结果为SASL_PLAINTEXT
authenticate.zookeeper.connection:true 连接 zookeeper是 启用SASL协议,因为 zookeeper开启了kerberos认证,因此此处需要勾选。
super.users:kafka 启动kafka服务的 超级用户。默认为kafka用户
broker_java_opts:尾部增加 -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf,指定kafka broker启动是 加载本地jaas文件/etc/kafka/kafka_server_jaas.conf,后面需要为每个机器设置kafka_server_jaas.conf文件
Kafka Broker Advanced Configuration Snippet (Safety Valve) for kafka.properties:内容如下
[
listeners=PLAINTEXT://0.0.0.0:9092,SASL_PLAINTEXT://0.0.0.0:9093
sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=GSSAPI
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
]
另外在每个broker上的此项配置中再增加如下信息
[
advertised.listeners=PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093
]
Step 3: 在每个broker主机上,添加/etc/kafka/kafka_server_jaas.conf,内容如下:
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username=kafka
password=thisIsSecret;
org.apache.kafka.common.security.plain.PlainLoginModule required
username=kafka
password=thisIsSecret
user_kafka=thisIsSecret
user_alice=thisIsSecret;
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useKeyTab=true
storeKey=true
useTicketCache=true
keyTab="/etc/kafka/kafka.keytab"
principal="kafka/hostname@example.com";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=true
keyTab="/etc/kafka/kafka.keytab"
principal="kafka/hostname@example.com";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=true
keyTab="/etc/kafka/kafka.keytab"
principal="kafka/hostname@example.com";
};
配置项中的principal 改为 实际的kerberos中的principal,及对应的keytab文件。
至此 可以通过PLAIN 和Kerberos 两种认证机制来访问kafka。
如果需要使用SCRAM机制,可以通过如下命令添加kafka和alice用户
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=thisIsSecret]' --entity-type users --entity-name kafka
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=thisIsSecret]' --entity-type users --entity-name alice
至此可以通过PLAIN,KERBEROS,SCRAM三种认证机制来访问kafka集群。
创建topic:
kafka-topics --zookeeper localhost:2181--list
kafka-topics --create --zookeeper localhost:2181--partitions 30 --replication-factor 1 --topic test
授权:
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --operation IdempotentWrite --allow-principal User:* --allow-host=* --cluster
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --operation All --allow-principal User:* --allow-host=* --transactional-id=*
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --operation All --allow-principal User:* --allow-host=* --cluster
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host=* --operation All --group=* --topic test
/opt/cloudera/parcels/KAFKA-3.0.0-1.3.0.0.p0.40/lib/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host=* --operation All --group=* --topic __consumer_offsets
客户端访问:
在客户端 创建如下plain_alice_jaas.conf 文件
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=alice
password=thisIsSecret;
};
scram_alice_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username=alice
password=thisIsSecret;
};
启动PLAIN机制的producer
export KAFKA_OPTS=" -Djava.security.auth.login.config=`pwd`/plain_alice_jaas.conf"
kafka-console-producer --broker-list localhost:9093 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN --topic test
启动SCRAM机制的consumer
export KAFKA_OPTS=" -Djava.security.auth.login.config=`pwd`/scram_alice_jaas.conf"
kafka-console-consumer --new-consumer --bootstrap-server localhost:9093 --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=SCRAM-SHA-256 --topic test
Kerberos认知机制如下
krb5_alice_jaas.conf文件
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=true
keyTab="/etc/kafka/alice.keytab"
principal="alice/hostname@example.com";
};
export KAFKA_OPTS=" -Djava.security.auth.login.config=`pwd`/krb5_alice_jaas.conf"
kafka-console-consumer --new-consumer --bootstrap-server localhost:9093 --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=GSSAPI --topic test