Kafka3.3.1版本配置SASL认证

Kafka 3.3.1版本配置SASL认证和较老的版本配置SASL认证有区别,本文将使用一个较为简单的方式做一个记录

首先在Kafka的官网下载3.3.1版本,然后再安装JDK,JDK的版本要求在8到18都可以,其他的JDK版本没有测试过,下载好Kafka后是一个gz文件,自行解压到服务器合适的目录中

zookeeper配置SASL认证

在3.3.1版本的Kafka中已经集成了zookeeper,我们只需要给zookeeper做好相关sasl的配置即可

  1. 创建配置文件zookeeper_server_jaas.conf
    该文件为zookeeper的SASL用户配置文件,文件名称可自定义,存放路径也可自定义,在这里,我会把该文件存放到Kafka解压后的config目录下,文件内容如下 :
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zkClusterAdmin"
    password="zkClusterAdminPassword"
    user_zkClient="zkClientPassword";
};
  • username和password为zookeeper做为集群部署时使用的用户名和密码
  • user_zkClient表示添加一个叫zkClient的用户,密码为zkClientPassword,这个用户名和密码将用于kafka连接zookeeper时使用
  • user_开头的用户为zookeeper分配给其他客户端连接时使用的用户名和密码
  • 注意有两个“;”号不能少且位置也不能错
  • 注意配置中的“Server”,这个为zookeeper启动时读取配置的默认名称,大小写严格,当然也可以自定义名称,如需使用自定义名称,则需要配合下面第3步使用
  1. 修改zookeeper的启动配置文件 config/zookeeper.properties
    在该文件中添加如下三行内容:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
  • authProvider.1表示开启认证功能,其中的“1”表示开启认证功能的优先级,这个数越小优先级越高,应该不能小于0
  • requireClientAuthScheme表示认证的方式,配置为SASL的认证方式
  1. 修改zookeeper的启动文件bin/zookeeper-server-start.sh
    需要找到该文件中export KAFKA_HEAP_OPTS=这行,在其配置的参数后面添加-Djava.security.auth.login.config=zookeeper_server_jaas.conf,如下所示:
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx2048M -Xms512M -Djava.security.auth.login.config=zookeeper_server_jaas.conf"
fi
...
  • 如果在zookeeper_server_jaas.conf配置文件中使用的配置项名称为自定义名称,例如使用的是zkServer,则还需要添加配置参数-Dzookeeper.sasl.serverconfig=zkServer,完整配置如下所示:
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx2048M -Xms512M -Djava.security.auth.login.config=zookeeper_server_jaas.conf -Dzookeeper.sasl.serverconfig=zkServer"
fi
...
  • -Djava.security.auth.login.config=配置的是你实际zookeeper_server_jaas.conf文件名和相对或绝对路径,
    比如在本文中,我的config和bin是平级目录,zookeeper-server-start.sh在bin目录下,那么正确的配置为-Djava.security.auth.login.config=../config/zookeeper_server_jaas.conf

经过以上三个步骤,zookeeper就已经配置完成了,zookeeper就可以正常启动了:

./zookeeper-server-start.sh ../config/zookeeper.properties
  • zookeeper的启动端口默认为2181,可自行在zookeeper.properties文件中修改

kafka配置SASL认证

  1. 创建配置文件kafka_server_jaas.conf
    该文件为kafkaServer的用户配置和连接zookeeper的用户配置文件,文件名称可自定义,存放路径也可自定义,这里我把文件都放到解压的config目录下,内容如下
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="brokerAdmin"
    password="borkerAdminPassword"
    user_borkerAdmin="borkerAdminPassword"
    user_borkerClient="borkerClientPassword";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="zkClient"
    password="zkClientPassword";
};
  • 在KafkaServer配置项中,username="brokerAdmin"password="borkerAdminPassword"user_borkerAdmin="borkerAdminPassword"这三项不能少且必须配置,在第三项配置user_borkerAdmin="borkerAdminPassword"中"user_"后面必须为username的值, =后面必须为password的值,这项配置非常重要且必须保证配置正确,否则kafka启动时会报鉴权失败且启动不会成功,假如你配置的username="admin", password="abcd",那么第三项则为: user_admin="abcd"

  • user_borkerClient="borkerClientPassword"表示创建一个叫borkerClient的用户,密码为borkerClientPassword,这个用于consumer、producer、config、acl等连接kafka时使用的用户及密码,当然也可以给不同的端创建不同的账号和密码,这儿可以添加多个用户及密码,比如:user_a="aPassword",user_b="bPassword",最后一个用户配置那行记得以“;”结尾

  • username和password是broker的管理账号和密码,同时也是作为broker集群同步时使用的用户名和密码

  • 在Client配置项中,username和password为zookeeper_server_jaas.conf中配置的客户端连接用户名及密码

  • KafkaServer和Client这两个配置项名称暂时不自定义,保持大小写严格;可自定义,只是难得找 如何使用自定义名称的配置

  • 同样注意 KafkaServer和Client配置项中的两个";"号不能少且位置不能变

  1. 修改KafkaServer配置文件 server.properties
    该文件在解压的config目录下,在该文件内容末尾添加如下配置项:
listeners=SASL_PLAINTEXT://yourIp:yourPort
advertised.listeners=SASL_PLAINTEXT://yourHostName:yourPort
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=true
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:brokerAdmin
  • 配置中的authorizer.class.name老版本是使用的kafka.security.auth.SimpleAclAuthorizer这个类,新版本使用的是kafka.security.authorizer.AclAuthorizer这个类

  • 配置中的allow.everyone.if.no.acl.found配置为true,如果设置在false,目前我知道的唯一后果就是我们在kafka_server_jaas.conf中配置的以user_开头的用户就不好使了,需要使用bin/kafka-configs.sh添加用户,再使用bin/kafka-acls.sh配置用户权限才能正常使用,网上对这个配置项的说法是:开启之后,borker节点之间的认证和同步才能正常进行,borker节点之间使用的正是“everyone”这个用户进行同步的,但是如果是这样,那kafka_server_jaas.conf文件中的KafkaServer的username又算个什么事儿呢?没搞明白

  • listeners和advertised.listeners请配置为自己的IP和端口,一般listeners中的yourIp配置为局域网中本机的IP地址,advertised.listeners中的yourHostName配置为你服务器的域名,没有域名公网IP也行,如果没有公网IP就配置为局域网IP,yourPort配置为你的kafka端口

  • super.users=User:后面的用户名“brokerAdmin”为kafka_server_jaas.conf里KafkaServer中的username的值

  1. 修改KafkaServer启动文件 kafka-server-start.sh
    该文件存在于解压的bin目录下,找到文件中export KAFKA_HEAP_OPTS=这行,在其配置参数中添加配置参数-Djava.security.auth.login.config=kafka_server_jaas.conf,如下所示:
...
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx4G -Xms1G -Djava.security.auth.login.config=kafka_server_jaas.conf"
fi
...
  • -Djava.security.auth.login.config=你实际kafka_server_jaas.conf的名称及相对或绝对路径,我的config和bin是平级目录,kafka-server-start.sh在bin目录下,那么正确的配置为- Djava.security.auth.login.config=../config/kafka_server_jaas.conf

到此KafkaServer就已经配置完成了,可以正常启动KafkaServer了:

./kafka-server-start.sh ../config/server.properties

从第4步开始就不再是必须配置的步骤了

  1. 添加kafka_client_jaas.conf文件
    如果想使用kafka-console-consumer.sh或kafka-console-producer.sh测试我们创建的Kafka服务是否正常可用,则需要创建该配置,该配置文件是连接KafkaServer的用户配置文件,文件名称和存放路径均可自定义,我还是把该文件存放到解压的config目录中,内容如下:
KafkaClient {  
    org.apache.kafka.common.security.plain.PlainLoginModule required  
    username="borkerClient"  
    password="borkerClientPassword";  
};
  • 配置中的username和password为kafka_server_jaas.conf中配置的user_开头的用户及对应的密码
  • KafkaClient这个名称不能变,保持大小严格
  • 同样两个“;”不能少且位置不能变
  1. 配置kafka-console-consumer.sh和kafka-console-producer.sh启动文件
    kafka-console-consumer.sh和kafka-console-producer.sh都在解压的bin目录中,依次打开这两个文件,同样找到export KAFKA_HEAP_OPTS=这一行,在其配置参数中添加参数-Djava.security.auth.login.config=kafka_client_jaas.conf,如下所示:
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=kafka_client_jaas.conf"
fi
  1. 配置consumer.properties和producer.properties文件
    consumer.properties和producer.properties在解压的config目录下,依次打开这两个文件,在末尾添加以下内容:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • 注意,在这两个配置文件中,bootstrap.servers=ip:port要修改为你在配置文件server.properties中配置的listeners或advertised.listeners的IP和端口

7.创建配置文件command_config.properties
该文件其实也是配置的连接Kafka的用户配置文件,但是该文件用于kafka-acls.sh,kafka-configs.sh等文件,
同样的名称和路径可自定义,我这里还是把该文件放到解压的config目录中,内容如下:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="borkerClient" password="borkerClientPassword";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
  • 配置中的username和passowrd为kafka_server_jaas.conf中配置的user_开头的用户及对应的密码
  • password="borkerClientPassword"后面的那个“;”不能少
  1. 验证KafkaServer是否正常运行
    创建一个topic,命令为:
./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1 --command-config ../config/command_config.properties
  • --bootstrap-server 后面给KafkaServer的IP和配置的端口,不再是zookeeper的ip和端口,这是新版Kafka的使用方式
  • --command-config 后面给上面创建的command_config.properties文件
  • 以上命令是创建了一个名称叫“test”的topic

开启两个shell窗口,一个用于生产消息,另一个用于消费消息
消费消息这边命令如下:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties

生产消息这边命令如下:

./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config ../config/producer.properties

然后在生产消息这边发送几条消息,在消费消息那边就可以看到生产的消息了。说明kafka配置SASL后服务正常运行

在创建topic命令中,如果不加载认证方式SASL,将会出现认证失败导致无法创建topic,命令如下:

./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1
  • 以上命令,如果KafkaServer没有配置SASL认证,可以正常使用

同样消息者和生产者不添加用户配置信息,也无法生产消息和消费消息:

./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
  • 以上两个命令,如果KafkaServer没有配置SASL认证,可以正常使用

到此,关于Kafka配置SASL认证的所有步骤都已经写完了,看的朋友们,希望你们要清晰地去理解各个配置项的意义和作用,不要搞混也不要搞错,错一步就无法发消息和接收消息,甚至于连一个用于测试的topic都创建不了,我第一次尝试,由于配置错误,连KafkaServer都启动不了


如果不想创建上面第4步的kafka_client_jaas.conf配置文件,这个配置文件当前只用于kafka-console-consumer.sh和kafka-console-producer.sh两个文件,我们只需要在config目录下的consumer.properties和producer.properties文件中,在上面第6步的基础上,再添加一行配置:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="borkerClient" password="borkerClientPassword";即可
在consumer.properties和producer.properties中完整的添加内容如下:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="borkerClient" password="borkerClientPassword";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

就和上面第7步中的command_config.properties文件中的内容一样,通过这样配置之后,我们就不需要修改kafka-console-consumer.sh和kafka-console-producer.sh两个文件了,当然消费消息和生产消息依然要加载各自的properties

# 创建topic指令依然保持一致
./kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic test --partitions 1 --replication-factor 1 --command-config ../config/command_config.properties
# 消费消息
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning --consumer.config ../config/consumer.properties
# 生产消息
./kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config ../config/producer.properties

再记录一些简单的kafka的命令使用

# 以下操作都必须使用kafka的管理员进行添加

# 添加用户
新:./kafka-configs.sh --bootstrap-server ip:port --alter --add-config 'SCRAM-SHA-256=[password=abcd],SCRAM-SHA-512=[password=abcd]' --entity-type users --entity-name usera --command-config ../config/command_config.properties
密码和用户名称根据实际配置的进行修改

# 给指定用户添加指定topic【读和写权限】
新:./kafka-acls.sh --bootstrap-server ip:port -add --allow-principal User:usera  --operation Read --operation Write --topic topicName --command-config ../config/command_config.properties
当topic为*的时候,表示分配给指定用户所有topic的读和写权限,--operation 后面可以给*,表示分配topic的所有权限

# 给用户添加指定topic【消息生产权限】
新:./kafka-acls.sh --bootstrap-server ip:port --add --allow-principal User:usera --producer --topic topicName --command-config ../config/command_config.properties
当topic为*的时候,表示分配给指定用户所有topic的生产者权限 

# 给用户添加指定topic【消息消费权限】
新:./kafka-acls.sh --bootstrap-server 1ip:port --add --allow-principal User:usera --consumer --topic topicName --group group-1 --command-config ../config/command_config.properties
当topic为*的时候,表示分配给指定用户所有topic的消息消费权限,group名称根据该用户实际所在的组名称设置

# 查看指定topic的开放权限信息(主要是查看对哪些用户开放了哪些权限)
./kafka-acls.sh --bootstrap-server ip:port--list --topic topicName --command-config ../config/command_config.properties

# 删除指定topic的指定用户的【写】权限
旧:./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation Write --allow-principal User:userName --allow-host ***.***.***.*** --remove --topic topicName
新:./kafka-acls.sh --bootstrap-server ip:port --operation Write --allow-principal User:user --allow-host ***.***.***.*** --remove --topic topicName --command-config ../config/command_config.properties
当User为*的时候,表示所有用户,当topic为*的时候,表示所有topic

# 删除指定topic的所有权限,当一个topic的所有权限都删除,表示这个topic任何用户都可以操作
旧:./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --topic topicName
新:./kafka-acls.sh --bootstrap-server ip:port --remove --topic topicName --command-config ../config/command_config.properties

# 删除指定topic多个用户的读和写权限 未验证
旧:./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:crawler --allow-principal User:Alice --allow-host ***.***.***.*** --allow-host ***.***.***.*** --operation Read --operation Write --topic topicName
新:./kafka-acls.sh --bootstrap-server ip:port --remove --allow-principal User:crawler --allow-principal User:Alice --allow-host ***.***.***.*** --allow-host ***.***.***.*** --operation Read --operation Write --topic topicName --command-config ../config/command_config.properties
  • 所谓的新老指令,其实就是把老指令中的 --authorizer-properties zookeeper.connect=zookeeperIp:zookeeperPort 替换为 --bootstrap-server kafkaServerIp:kafkaServerPort

  • 如果KafkaServer配置了SASL认证,通过执行 bin目录下各个sh文件的 --help指令,查看有没有command-config选项,如果有该选项,就在该sh命令后面添加 --command-config ../config/command_config.properties参数

  • 目前发现kafka-console-consumer.sh和kafka-console-producer.sh这两个sh文件有自己properties加载选项:
    kafka-console-consumer.sh 的properties加载选项是 --consumer.config ../config/consumer.properties
    kafka-console-producer.sh 的properties加载选项是 --producer.config ../config/producer.properties

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,997评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,603评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,359评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,309评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,346评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,258评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,122评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,970评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,403评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,596评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,769评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,464评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,075评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,705评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,848评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,831评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,678评论 2 354

推荐阅读更多精彩内容