kafka配置SASL/PLAIN与权限管理调研

kafka配置SASL/PLAIN与权限管理调研

配置SASL鉴权

配置kafka_server_jaas.conf配置文件

$ vi config/kafka_server_jaas.conf
#这里配置了两个用户,admin:admin-secret,alice:alice-secret
#注意username和password是用来登录集群里其他机器用的
#生产环境这里需要通过接口重写
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_alice="alice-secret";
};
#把kafka_server_jaas.conf配置到java环境变量中
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G 
-Djava.security.auth.login.config=/usr/local/kafka_2.12-2.5.0/config/kafka_server_jaas.conf"

修改服务端配置

$ vi config/server.properties
listeners=PLAINTEXT://:9092,SASL_SSL://:9093
sasl.enabled.mechanisms=PLAIN

修改客户端配置

$ vi config/client-ssl.properties 
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice-secret";

验证一下

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config config/client-ssl.properties
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config config/client-ssl.properties --from-beginning

ACL配置

服务端配置

$ vi config/server.properties
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
#注意这里需要配置匿名用户,匿名用户用于集群之间的通信
super.users=User:admin;User:ANONYMOUS

这个时候使用admin用户可以正常消费数据,alice会显示无权限消费

为alice增加topic权限

$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=192.168.0.26:2181 --add --allow-principal User:alice --consumer --topic alice- --resource-pattern-type prefixed --group group1

这条授权命令授权alice用户消费所有以alice-开头的topic,并且消费者组=group1

测试

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic alice-a

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic alice-a

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic alice-a --consumer.config config/client-ssl.properties --from-beginning --group group1

测试是否能够进行模糊订阅

Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.0.12:9093");
props.setProperty("group.id", "group1");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
//通过这个配置加快新topic的生效时间
props.setProperty("metadata.max.age.ms", "5000");


props.setProperty("security.protocol", "SASL_SSL");
props.setProperty("ssl.truststore.location", "C:\\client.truststore.jks");
props.setProperty("ssl.truststore.password", "123");
props.setProperty("sasl.mechanism", "PLAIN");
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"alice\"  password=\"alice-secret\";");
props.setProperty("ssl.endpoint.identification.algorithm", "");


props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Pattern.compile("alice-.*"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

测试成功,新加入的topic只要符合规则就可以立即被授权与生效

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。