Kafka 3.3.1版本配置SASL认证和较老的版本配置SASL认证有区别,本文将使用一个较为简单的方式做一个记录
首先在Kafka的官网下载3.3.1版本,然后再安装JDK,JDK的版本要求在8到18都可以,其他的JDK版本没有测试过,下载好Kafka后是一个gz文件,自行解压到服务器合适的目录中
zookeeper配置SASL认证
在3.3.1版本的Kafka中已经集成了zookeeper,我们只需要给zookeeper做好相关sasl的配置即可
-
创建配置文件zookeeper_server_jaas.conf
该文件为zookeeper的SASL用户配置文件,文件名称可自定义,存放路径也可自定义,在这里,我会把该文件存放到Kafka解压后的config目录下,文件内容如下 :
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zkClusterAdmin"
password="zkClusterAdminPassword"
user_zkClient="zkClientPassword";
};
- username和password为zookeeper做为集群部署时使用的用户名和密码
- user_zkClient表示添加一个叫zkClient的用户,密码为zkClientPassword,这个用户名和密码将用于kafka连接zookeeper时使用
- user_开头的用户为zookeeper分配给其他客户端连接时使用的用户名和密码
- 注意有两个“
;
”号不能少且位置也不能错 - 注意配置中的“
Server
”,这个为zookeeper启动时读取配置的默认名称,大小写严格,当然也可以自定义名称,如需使用自定义名称,则需要配合下面第3步
使用
-
修改zookeeper的启动配置文件 config/zookeeper.properties
在该文件中添加如下三行内容:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
- authProvider.1表示开启认证功能,其中的“1”表示开启认证功能的优先级,这个数越小优先级越高,应该不能小于0
- requireClientAuthScheme表示认证的方式,配置为SASL的认证方式
-
修改zookeeper的启动文件bin/zookeeper-server-start.sh
需要找到该文件中export KAFKA_HEAP_OPTS=
这行,在其配置的参数后面添加-Djava.security.auth.login.config=zookeeper_server_jaas.conf
,如下所示:
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx2048M -Xms512M -Djava.security.auth.login.config=zookeeper_server_jaas.conf"
fi
...
- 如果在zookeeper_server_jaas.conf配置文件中使用的配置项名称为自定义名称,例如使用的是zkServer,则还需要添加配置参数
-Dzookeeper.sasl.serverconfig=zkServer
,完整配置如下所示:
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx2048M -Xms512M -Djava.security.auth.login.config=zookeeper_server_jaas.conf -Dzookeeper.sasl.serverconfig=zkServer"
fi
...
-
-Djava.security.auth.login.config=
配置的是你实际zookeeper_server_jaas.conf文件名和相对或绝对路径,
比如在本文中,我的config和bin是平级目录,zookeeper-server-start.sh在bin目录下,那么正确的配置为-Djava.security.auth.login.config=../config/zookeeper_server_jaas.conf
经过以上三个步骤,zookeeper就已经配置完成了,zookeeper就可以正常启动了:
./zookeeper-server-start.sh ../config/zookeeper.properties
- zookeeper的启动端口默认为2181,可自行在zookeeper.properties文件中修改
kafka配置SASL认证
-
创建配置文件kafka_server_jaas.conf
该文件为kafkaServer的用户配置和连接zookeeper的用户配置文件,文件名称可自定义,存放路径也可自定义,这里我把文件都放到解压的config目录下,内容如下
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="brokerAdmin"
password="borkerAdminPassword"
user_borkerAdmin="borkerAdminPassword"
user_borkerClient="borkerClientPassword";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="zkClient"
password="zkClientPassword";
};
在KafkaServer配置项中,
username="brokerAdmin"
、password="borkerAdminPassword"
和user_borkerAdmin="borkerAdminPassword"
这三项不能少且必须配置,在第三项配置user_borkerAdmin="borkerAdminPassword"
中"user_"后面必须为username的值,=
后面必须为password的值,这项配置非常重要且必须保证配置正确,否则kafka启动时会报鉴权失败且启动不会成功,假如你配置的username="admin", password="abcd",那么第三项则为: user_admin="abcd"user_borkerClient="borkerClientPassword"
表示创建一个叫borkerClient的用户,密码为borkerClientPassword,这个用于consumer、producer、config、acl等连接kafka时使用的用户及密码,当然也可以给不同的端创建不同的账号和密码,这儿可以添加多个用户及密码,比如:user_a="aPassword",user_b="bPassword",最后一个用户配置那行记得以“;
”结尾username和password是broker的管理账号和密码,同时也是作为broker集群同步时使用的用户名和密码
在Client配置项中,username和password为zookeeper_server_jaas.conf中配置的客户端连接用户名及密码
KafkaServer和Client这两个配置项名称
暂时不自定义
,保持大小写严格;可自定义,只是难得找 如何使用自定义名称的配置同样注意 KafkaServer和Client配置项中的两个"
;
"号不能少且位置不能变
-
修改KafkaServer配置文件 server.properties
该文件在解压的config目录下,在该文件内容末尾添加如下配置项:
listeners=SASL_PLAINTEXT://yourIp:yourPort
advertised.listeners=SASL_PLAINTEXT://yourHostName:yourPort
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:brokerAdmin
配置中的authorizer.class.name老版本是使用的
kafka.security.auth.SimpleAclAuthorizer
这个类,新版本使用的是kafka.security.authorizer.AclAuthorizer
这个类配置中的allow.everyone.if.no.acl.found配置为true,如果设置在false,目前我知道的唯一后果就是我们在kafka_server_jaas.conf中配置的以user_开头的用户就不好使了,需要使用bin/kafka-configs.sh添加用户,再使用bin/kafka-acls.sh配置用户权限才能正常使用,网上对这个配置项的说法是:开启之后,borker节点之间的认证和同步才能正常进行,borker节点之间使用的正是“everyone”这个用户进行同步的,但是如果是这样,那kafka_server_jaas.conf文件中的KafkaServer的username又算个什么事儿呢?没搞明白
listeners和advertised.listeners请配置为自己的IP和端口,一般listeners中的yourIp配置为局域网中本机的IP地址,advertised.listeners中的yourHostName配置为你服务器的域名,没有域名公网IP也行,如果没有公网IP就配置为局域网IP,yourPort配置为你的kafka端口
super.users=User:后面的用户名“brokerAdmin”为kafka_server_jaas.conf里KafkaServer中的username的值
-
修改KafkaServer启动文件 kafka-server-start.sh
该文件存在于解压的bin目录下,找到文件中export KAFKA_HEAP_OPTS=
这行,在其配置参数中添加配置参数-Djava.security.auth.login.config=kafka_server_jaas.conf
,如下所示:
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx4G -Xms1G -Djava.security.auth.login.config=kafka_server_jaas.conf"
fi
...
-
-Djava.security.auth.login.config=
你实际kafka_server_jaas.conf的名称及相对或绝对路径,我的config和bin是平级目录,kafka-server-start.sh在bin目录下,那么正确的配置为- Djava.security.auth.login.config=../config/kafka_server_jaas.conf
到此KafkaServer就已经配置完成了,可以正常启动KafkaServer了:
./kafka-server-start.sh ../config/server.properties
从第4步开始就不再是必须配置的步骤了
-
添加kafka_client_jaas.conf文件
如果想使用kafka-console-consumer.sh或kafka-console-producer.sh测试我们创建的Kafka服务是否正常可用,则需要创建该配置,该配置文件是连接KafkaServer的用户配置文件,文件名称和存放路径均可自定义,我还是把该文件存放到解压的config目录中,内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="borkerClient"
password="borkerClientPassword";
};
- 配置中的username和password为kafka_server_jaas.conf中配置的user_开头的用户及对应的密码
- KafkaClient这个名称不能变,保持大小严格
- 同样两个“
;
”不能少且位置不能变
-
配置kafka-console-consumer.sh和kafka-console-producer.sh启动文件
kafka-console-consumer.sh和kafka-console-producer.sh都在解压的bin目录中,依次打开这两个文件,同样找到export KAFKA_HEAP_OPTS=
这一行,在其配置参数中添加参数-Djava.security.auth.login.config=kafka_client_jaas.conf
,如下所示:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=kafka_client_jaas.conf"
fi
-
配置consumer.properties和producer.properties文件
consumer.properties和producer.properties在解压的config目录下,依次打开这两个文件,在末尾添加以下内容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- 注意,在这两个配置文件中,
bootstrap.servers=ip:port
要修改为你在配置文件server.properties中配置的listeners或advertised.listeners的IP和端口
7.创建配置文件command_config.properties
该文件其实也是配置的连接Kafka的用户配置文件,但是该文件用于kafka-acls.sh,kafka-configs.sh等文件,
同样的名称和路径可自定义,我这里还是把该文件放到解压的config目录中,内容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="borkerClient" password="borkerClientPassword";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
- 配置中的username和passowrd为kafka_server_jaas.conf中配置的user_开头的用户及对应的密码
- password="borkerClientPassword"后面的那个“
;
”不能少
-
验证KafkaServer是否正常运行
创建一个topic,命令为:
./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1 --command-config ../config/command_config.properties
- --bootstrap-server 后面给KafkaServer的IP和配置的端口,不再是zookeeper的ip和端口,这是新版Kafka的使用方式
- --command-config 后面给上面创建的command_config.properties文件
- 以上命令是创建了一个名称叫“test”的topic
开启两个shell窗口,一个用于生产消息,另一个用于消费消息
消费消息这边命令如下:
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties
生产消息这边命令如下:
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config ../config/producer.properties
然后在生产消息这边发送几条消息,在消费消息那边就可以看到生产的消息了。说明kafka配置SASL后服务正常运行
在创建topic命令中,如果不加载认证方式SASL,将会出现认证失败导致无法创建topic,命令如下:
./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1
- 以上命令,如果KafkaServer没有配置SASL认证,可以正常使用
同样消息者和生产者不添加用户配置信息,也无法生产消息和消费消息:
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
- 以上两个命令,如果KafkaServer没有配置SASL认证,可以正常使用
到此,关于Kafka配置SASL认证的所有步骤都已经写完了,看的朋友们,希望你们要清晰地去理解各个配置项的意义和作用,不要搞混也不要搞错,错一步就无法发消息和接收消息,甚至于连一个用于测试的topic都创建不了,我第一次尝试,由于配置错误,连KafkaServer都启动不了
如果不想创建上面第4步的kafka_client_jaas.conf配置文件,这个配置文件当前只用于kafka-console-consumer.sh和kafka-console-producer.sh两个文件,我们只需要在config目录下的consumer.properties和producer.properties文件中,在上面第6步的基础上,再添加一行配置:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="borkerClient" password="borkerClientPassword";
即可
在consumer.properties和producer.properties中完整的添加内容如下:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="borkerClient" password="borkerClientPassword";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
就和上面第7步中的command_config.properties文件中的内容一样,通过这样配置之后,我们就不需要修改kafka-console-consumer.sh和kafka-console-producer.sh两个文件了,当然消费消息和生产消息依然要加载各自的properties
# 创建topic指令依然保持一致
./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1 --command-config ../config/command_config.properties
# 消费消息
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties
# 生产消息
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config ../config/producer.properties
再记录一些简单的kafka的命令使用
# 以下操作都必须使用kafka的管理员进行添加
# 添加用户
新:./kafka-configs.sh --bootstrap-server ip:port --alter --add-config 'SCRAM-SHA-256=[password=abcd],SCRAM-SHA-512=[password=abcd]' --entity-type users --entity-name usera --command-config ../config/command_config.properties
密码和用户名称根据实际配置的进行修改
# 给指定用户添加指定topic【读和写权限】
新:./kafka-acls.sh --bootstrap-server ip:port -add --allow-principal User:usera --operation Read --operation Write --topic topicName --command-config ../config/command_config.properties
当topic为*的时候,表示分配给指定用户所有topic的读和写权限,--operation 后面可以给*,表示分配topic的所有权限
# 给用户添加指定topic【消息生产权限】
新:./kafka-acls.sh --bootstrap-server ip:port --add --allow-principal User:usera --producer --topic topicName --command-config ../config/command_config.properties
当topic为*的时候,表示分配给指定用户所有topic的生产者权限
# 给用户添加指定topic【消息消费权限】
新:./kafka-acls.sh --bootstrap-server 1ip:port --add --allow-principal User:usera --consumer --topic topicName --group group-1 --command-config ../config/command_config.properties
当topic为*的时候,表示分配给指定用户所有topic的消息消费权限,group名称根据该用户实际所在的组名称设置
# 查看指定topic的开放权限信息(主要是查看对哪些用户开放了哪些权限)
./kafka-acls.sh --bootstrap-server ip:port--list --topic topicName --command-config ../config/command_config.properties
# 删除指定topic的指定用户的【写】权限
旧:./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation Write --allow-principal User:userName --allow-host ***.***.***.*** --remove --topic topicName
新:./kafka-acls.sh --bootstrap-server ip:port --operation Write --allow-principal User:user --allow-host ***.***.***.*** --remove --topic topicName --command-config ../config/command_config.properties
当User为*的时候,表示所有用户,当topic为*的时候,表示所有topic
# 删除指定topic的所有权限,当一个topic的所有权限都删除,表示这个topic任何用户都可以操作
旧:./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --topic topicName
新:./kafka-acls.sh --bootstrap-server ip:port --remove --topic topicName --command-config ../config/command_config.properties
# 删除指定topic多个用户的读和写权限 未验证
旧:./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:crawler --allow-principal User:Alice --allow-host ***.***.***.*** --allow-host ***.***.***.*** --operation Read --operation Write --topic topicName
新:./kafka-acls.sh --bootstrap-server ip:port --remove --allow-principal User:crawler --allow-principal User:Alice --allow-host ***.***.***.*** --allow-host ***.***.***.*** --operation Read --operation Write --topic topicName --command-config ../config/command_config.properties
所谓的新老指令,其实就是把老指令中的
--authorizer-properties zookeeper.connect=zookeeperIp:zookeeperPort
替换为--bootstrap-server kafkaServerIp:kafkaServerPort
如果KafkaServer配置了SASL认证,通过执行 bin目录下各个sh文件的 --help指令,查看有没有command-config选项,如果有该选项,就在该sh命令后面添加
--command-config ../config/command_config.properties
参数目前发现kafka-console-consumer.sh和kafka-console-producer.sh这两个sh文件有自己properties加载选项:
kafka-console-consumer.sh 的properties加载选项是 --consumer.config ../config/consumer.properties
kafka-console-producer.sh 的properties加载选项是 --producer.config ../config/producer.properties