kafka在SASL/PLAIN模式下如何配置ACL
这里只介绍SASL/PLAIN模式下,如何配置ACL的问题,也就是假定SASL/PLAIN已经安装配置好了。关于如何配置SASL/PLAIN请参考前面一篇介绍:kafka如何配置SASL/PLAIN认证模式。
这个地方也分两种情况,就是kafka本身要不要也支持ACL:
- 如果不支持那么任何人都可以连接到zookeeper读取修改ACL配置。
- 如果支持那么对所有连接到zookeeper的访问也支持SASL认证。
因为ACL的数据是存储在zookeeper里面的。
- zookeeper不支持ACL
参考前面文章,zookeeper就不需要配置SASL信息。
- kafka端的配置server.properties
增加额外的下面三项:
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
# allow.everyone.if.no.acl.found=false
super.users=User:admin
这样就可以了。
- 创建ACL
$ /opt/kafka/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=zookeeper.example.com:2181 \
--add \
--allow-principal User:"kafkaclient" \
--operation All \
--topic "test-topic" \
--group "*" \
--cluster
- 查看ACL
$ /opt/kafka/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=zookeeper.example.com:2181 \
--list \
--topic "test-topic"
- zookeeper也支持ACL
- kafka端的配置server.properties
增加额外的下面三项:
authorizer.class.name = kafka.security.auth.SimpleAclAuthorizer
# allow.everyone.if.no.acl.found=false
super.users=User:admin
zookeeper.set.acl=true
相比较前面zookeeper不支持ACL的场景多了一条zookeeper.set.acl=true
。
- zookeeper端的配置zoo.cfg
增加额外的下面三项:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
- 创建ACL
# for topic
$ /opt/kafka/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=zookeeper.example.com:2181 \
--add \
--allow-principal User:"kafkaclient1" \
--operation All \
--resource-pattern-type prefixed \
--topic "kafkaclient1--" \
--cluster
# for group
$ /opt/kafka/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=zookeeper.example.com:2181 \
--add \
--allow-principal User:"kafkaclient1" \
--operation All \
--group "*" \
--cluster
授予客户端kafkaclient1只能访问以"kafkaclient1--"开头的topic。
- 查看ACL
$ /opt/kafka/bin/kafka-acls.sh \
--authorizer-properties zookeeper.connect=zookeeper.example.com:2181 \
--list \
--resource-pattern-type prefixed \
--topic "kafkaclient1--"
上面3和4两步,我们没有看到哪里指定了SASL需要的用户信息,实际上是从环境变量KAFKA_OPTS去读取的;请参考本文最后的producer和consumer的执行步骤,显示的设定了KAFKA_OPTS值。
- 一个比较完整的zookeeper同时支持ACL的配置例子
- kafka端的配置文件server.properties
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# allow.everyone.if.no.acl.found=false
super.users=User:admin
zookeeper.set.acl=true
- kafka启动JVM命令添加java.security.auth.login.config
exec $base_dir/kafka-run-class.sh \
$EXTRA_ARGS \
-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf \
kafka.Kafka \
"$@"
或者设置到环境变量:
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_server_jaas.conf
- 文件/path/to/kafka_server_jaas.conf的内容
$ cat /path/to/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd"
user_kafkaclient1="kafkaclient1pwd"
user_kafkaclient2="kafkaclient2pwd";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd";
};
- zookeeper端的配置文件zoo.cfg
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
- zookeeper启动JVM命令添加java.security.auth.login.config
exec $base_dir/kafka-run-class.sh \
$EXTRA_ARGS \
-Djava.security.auth.login.config=/path/to/zookeeper_jaas.conf \
org.apache.zookeeper.server.quorum.QuorumPeerMain \
"$@"
或者设置到环境变量:
export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/zookeeper_jaas.conf
- 文件/path/to/zookeeper_jaas.conf的内容
$ cat /path/to/zookeeper_jaas.conf
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd";
};
- producer和consumer如何访问
- 定义client访问的jaas配置文件/path/to/kafka_client_jaas.conf
$ cat /path/to/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafkaclient1"
password="kafkaclient1pwd";
};
- 定义client访问的配置文件/path/to/client-sasl.properties
$ cat /path/to/client-sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- 启动producer
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
$ /opt/kafka/bin/kafka-console-producer.sh \
--broker-list kafka.example.com:9092 \
--topic kafkaclient1--topic \
--producer.config /path/to/client-sasl.properties
- 启动consumer
$ export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf"
$ /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka.example.com:9092 \
--topic kafkaclient1--topic \
--consumer.config /path/to/client-sasl.properties --from-beginning
- 关于jaas文件
前面我们用到了好几个jaas文件
- kafka_server_jaas.conf
- kafka_client_jaas.conf
- zookeeper_jaas.conf
偷懒的做法我们可以把他们合并成一个,然后用到的时候全部指向这一个文件就行。例如:
$ cat kafka_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd"
user_kafkaclient1="kafkaclient1pwd"
user_kafkaclient2="kafkaclient2pwd";
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafkaclient1"
password="kafkaclient1pwd";
};
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd"
user_admin="adminpwd";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="adminpwd";
};
这个文件就包含4节内容:
- KafkaServer给kafka server使用。
- KafkaClient给kafka client用来连接kafka server的时候使用。
- Server给zookeeper使用。
- Client给zookeeper client用来连接zookeeper的时候使用,我们碰到两种场景:
4.1. kafka server作为zookeeper的client使用。
4.2 命令行工具/opt/kafka/bin/kafka-acls.sh访问zookeeper时作为客户端使用。