为了kafka集群的安全,我们一般会开启认证,kafka的认证方式有多种,详细见http://kafka.apache.org/documentation/#security, 这里只讲下sasl认证方式
1. kafka的认证范围
- kafka client 与 kafka server(broker)
- broker与broker之间
- broker与zookeeper之间
2. 开启认证的步骤
2.1 zookeeper的认证配置
- 在zookeeper安装根目录的conf目录下,创建zk_server_jaas.conf, 文件内容如下
Server { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_kafka="kafka-secret"; };
user_kafka="kafka-secret"; 创建用户kafka, 密码是"kafka-secret", 该账号用于kafka broker与zookeeper连接的时候的认证
- 修改zoo.cfg, 添加一下内容
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl jaasLoginRenew=3600000
- 因为认证的时候用到包org.apache.kafka.common.security.plain.PlainLoginModule, 这个是kafka-client.jar里面,所有需要将相应的jar拷贝到 zookeeper安装根目录的lib目录下, 大概要copy这些jar
kafka-clients-2.0.0.jar lz4-java-1.4.1.jar osgi-resource-locator-1.0.1.jar slf4j-api-1.7.25.jar snappy-java-1.1.7.1.jar
- 然后就是修改zk的启动参数, 修改 bin/zkEnv.sh, 在文件尾加上
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOCFGDIR/zk_server_jaas.conf "
- 然后重新启动zookeeper服务就好(如果是集群,每个集群都进行相应的操作)
2.2 kafka broker的认证配置
- 在安装根目录的config目录下, 创建kafka_server_jaas.conf, 内容如下
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret" user_admin="admin-secret" user_alice="alice-secret"; }; Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-secret"; };
KafkaServer段里面配置了broker之间的认证配置以及client和broker之间的认证配置
KafkaServer.username, KafkaServer.password用于broker之间的相互认证
KafkaServer.user_admin和KafkaServer.user_alice用于client和broker之间的认证, 下面我们client里面都用用户alice进行认证
Client段里面定义username和password用于broker与zookeeper连接的认证
- 修改 config/server.properties
# 本例使用SASL_PLAINTEXT listeners=SASL_PLAINTEXT://127.0.0.1:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN # 配置ACL入口类 authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer # default false | true to accept all the users to use it. # allow.everyone.if.no.acl.found=true # 设置本例中admin为超级用户 super.users=User:admin
- 修改kafka启动脚本, 添加 java.security.auth.login.config 环境变量。打开 bin/kafka-server-start.sh
将最后一句
修改为exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_server_jaas.conf kafka.Kafka "$@"
- 然后重启kafka server就可(如果是集群,所有的broker都进行相应的修改操作)
2.3 kafka client的认证配置
- 在kakfka根目录/config/下创建kafka_client_jaas.conf, 内容如下
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret"; };
- producer 配置修改
- 修改 producer脚本启动参数, 打开bin/kafka-console-producer.sh
将最后一行
修改为exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleProducer "$@"
- 创建一个producer.config 为 console producer指定下面两个属性
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN
- 测试
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --producer.config producer.config
- 修改 producer脚本启动参数, 打开bin/kafka-console-producer.sh
- consumer配置修改
- 修改 consume脚本启动参数, 打开bin/kafka-console-consumer.sh
将最后一行
修改为exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/soft/kafka_cluster_ss/k1/config/kafka_client_jaas.conf kafka.tools.ConsoleConsumer "$@"
- 创建一个consumer.config 为 console consumer指定下面三个属性
security.protocol=SASL_PLAINTEXT sasl.mechanism=PLAIN group.id=test-group
- 修改 consume脚本启动参数, 打开bin/kafka-console-consumer.sh
- 测试
上面修改了console-producer 以及console-consumer的配置以及启动脚本之后,会发现并不能正常的进行消息生成和消费,会提示认证失败,这个时候就需要对用户进行权限分配了- 为topic test_tp1 添加生产者用户alice
[root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer- properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice --producer --topic test_tp1 Adding ACLs for resource `Topic:LITERAL:test_tp1`: User:alice has Allow permission for operations: Describe from hosts: * User:alice has Allow permission for operations: Create from hosts: * User:alice has Allow permission for operations: Write from hosts: * Current ACLs for resource `Topic:LITERAL:test_tp1`: User:alice has Allow permission for operations: Describe from hosts: * User:alice has Allow permission for operations: Create from hosts: * User:alice has Allow permission for operations: Write from hosts: *
- 为topic test_tp1添加消费者用户alice, 消费组位 test_group
[root@test kafka_cluster_ss]# k1/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 --add --allow-principal User:alice -- consumer --topic test_tp1 --group test_group Adding ACLs for resource `Topic:LITERAL:test_tp1`: User:alice has Allow permission for operations: Describe from hosts: * User:alice has Allow permission for operations: Read from hosts: * Adding ACLs for resource `Group:LITERAL:test_group`: User:alice has Allow permission for operations: Read from hosts: * Current ACLs for resource `Topic:LITERAL:test_tp1`: User:alice has Allow permission for operations: Describe from hosts: * User:alice has Allow permission for operations: Create from hosts: * User:alice has Allow permission for operations: Write from hosts: * User:alice has Allow permission for operations: Read from hosts: * Current ACLs for resource `Group:LITERAL:test_group`: User:alice has Allow permission for operations: Read from hosts: *
- 生产者测试
[root@test kafka_cluster_ss]# k1/bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --producer.config producer.config >s >ss1 >ss2 >
- 消费者测试
[root@test kafka_cluster_ss]# k1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test_tp1 --from-beginning --consumer.config consumer.config s1 ss1 ss2
- 为topic test_tp1 添加生产者用户alice
2.4 acl 例子
-
添加acl
假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl
允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如
果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
需要注意的是--allow-host和deny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。
-
删除acl
删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
-
acl列表
我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
-
添加或删除作为生产者或消费者的principal
acl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。
2.6 kafka的一些client的设置
-
librdkafkacpp
std::string broker_list = "localhost:9092,localhost:9093,localhost:9094"; RdKafka::Conf* global_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); global_conf->set("metadata.broker.list", broker_list, err_string); global_conf->set("security.protocol", "sasl_plaintext", err_string); global_conf->set("sasl.mechanisms", "PLAIN", err_string); global_conf->set("sasl.username", username.c_str(), err_string); global_conf->set("sasl.password", password.c_str(), err_string);