2020-10-22
官方配置:http://kafka.apache.org/documentation/#security_sasl_plain
kafka:类似于一个数据缓冲区,应用程序接收不了大量数据的冲击,依靠kafka在中间做个缓冲,kafka依靠zk达到一个高可用状态
zk:注册中心
1、环境准备
1、zk安装--使用kafka的sasl认证 #zk安装请看上个文档https://www.jianshu.com/p/3b22058ee3fd
2、修改主配置文件、创建zk_jaas文件、声明位置、拷贝kafka下认证文件----如果是集群,所有节点的机器内容要同步修改
1、修改zk的主配置文件---节点同步修改
[root@elastic41 zookeeper]# vim conf/zoo.cfg
#文件中新增以下内容
#kafka加密认证使用
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider ##指定使用kafka的认证机制为plain
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true
2、创建zk_jaas文件、声明位置---节点同步修改
[root@elastic41 zookeeper]# cat >> conf/zk_server_jaas.conf <<EOF
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required ##指定使用kafka的认证机制为plain
username="admin"
password="admin-secret"
user_kafka="kafka-sec"
user_producer="prod-sec";
};
EOF
3、声明jaas位置---节点同步修改
[root@elastic41 zookeeper]# vim bin/zkEnv.sh
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/data/zookeeper/conf/zk_server_jaas.conf" #将这个变量加入到zkEnv.sh脚本中,zkServer.sh和zkClient.sh启动会调用zkEnv.sh
4、拷贝kafka下认证文件到zk/lib中----节点同步修改
[root@elastic41 zookeeper]# cp -a /root/lxt/kafka_2.11-2.3.0/libs/{kafka-clients-2.3.0.jar,lz4-java-1.6.0.jar,slf4j-api-1.7.26.jar,slf4j-log4j12-1.7.26.jar,snappy-java-1.1.7.3.jar} lib/
5、重新启动zk---添加开机启动项
[root@elastic41 zookeeper]# bin/zkServer.sh restart
[root@elastic41 zookeeper]# echo "/data/zookeeper/bin/zkServer.sh start" >> /etc/rc.local
#######################并没有什么卵用的分割线######################
安装包下载:http://kafka.apache.org/downloads 下载二进制包解压就行 #我使用的版本 Scala 2.11- kafka_2.11-2.3.0.tgz
1、kafka在配置文件的config中创建两个文件server_jaas和client_jaas文件---集群节点同步修改
[root@elastic41 kafka_2.11-2.3.0]#cat >> config/kafka_server_jaas.conf <<EOF
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-sec"
user_admin="admin-sec"
user_producer="prod-sec"
user_consumer="cons-sec";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-sec";
};
EOF
--------------------------------------------------分割线----------------------------------------------
[root@elastic42 kafka_2.11-2.3.0]# cat >> config/kafka_client_jaas.conf <<EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="producer"
password="prod-sec";
};
EOF
#######################并没有什么卵用的分割线######################
2、配置 kafka中config/server.properties,分别在3个节点修改配置文件,注意ip和broker.id号id号只要三个节点不一样就可以随意,id号和/tmp/kafka-logs/meta.properties下面的id号是对应的,如果id号改变记得修改或者删除meta文件让他重新生成,影响kafka启动!
[root@elastic42 kafka_2.11-2.3.0]# vim config/server.properties
broker.id=1 #其他节点记得改id号
listeners=SASL_PLAINTEXT://192.168.7.41:9092 #其他节点记得改ip
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:producer;User:consumer
allow.everyone.if.no.acl.found=true #开启acl拒绝除管理员外任何人启动kafka集群
log.dirs=/data/kafka_2.12-2.3.0/kafka-logs
zookeeper.connect=192.168.7.41:2181,192.168.7.42:2181,192.168.7.43:2181
#######################并没有什么卵用的分割线######################
3、添加kafka-opts参数指定jaas文件位置---其他节点要同步!
#在bin/kafka-server-start.sh中指定server_jaas位置
#在 bin/kafka-console-consumer.sh 和 bin/kafka-console-producer.sh中指定client_jaas位置
[root@elastic41 kafka_2.11-2.3.0]# vim bin/kafka-server-start.sh #位置在base_dir=$(dirname $0)下面就行
export KAFKA_OPTS=" -Djava.security.auth.login.config=/root/lxt/kafka_2.11-2.3.0/config/kafka_server_jaas.conf"
[root@elastic41 kafka_2.11-2.3.0]# vim bin/kafka-console-consumer.sh #位置放到if上面 指定client_jaas
export KAFKA_OPTS="-Djava.security.auth.login.config=/root/lxt/kafka_2.11-2.3.0/config/kafka_client_jaas.conf"
[root@elastic41 kafka_2.11-2.3.0]# vim bin/kafka-console-producer.sh #位置放到if上面 指定client_jaas
export KAFKA_OPTS="-Djava.security.auth.login.config=/root/lxt/kafka_2.11-2.3.0/config/kafka_client_jaas.conf"
4、启动kafka--添加开机启动
[root@elastic41 kafka_2.11-2.3.0]#bin/kafka-server-start.sh config/server.properties #前台启动可查看报错之类信息、如果启动没有问题可以加 -daemon直接后台启动--或使用nohup &的方式后台启动
[root@elastic41 ~]# /root/lxt/kafka_2.11-2.3.0/bin/kafka-server-start.sh -daemon /root/lxt/kafka_2.11-2.3.0/config/server.properties
[root@elastic41 ~]#echo "/root/lxt/kafka_2.11-2.3.0/bin/kafka-server-start.sh -daemon /root/lxt/kafka_2.11-2.3.0/config/server.properties" >>/etc/rc.local #这个方式并不太友好,因为他启动依赖zk,如果zk还没起来就执行了这个命令,服务是起不来的
#######################并没有什么卵用的分割线######################
#创建topic
[root@elastic41 kafka_2.11-2.3.0] bin/kafka-topics.sh --create --topic testtopic --replication-factor 3 --partitions 1 --zookeeper localhost:2181
查看topic
[root@elastic41 kafka_2.11-2.3.0] bin/kafka-topics.sh --list --zookeeper localhost:2181
其他节点 产生数据测试
[root@elastic42 kafka_2.11-2.3.0] bin/kafka-console-producer.sh --broker-list 192.168.7.42:9092 --topic testtopic --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
另一个节点 消费者查看数据测试
[root@elastic43 kafka_2.11-2.3.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.7.43:9092 --topic testtopic --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN --from-beginning
#######################并没有什么卵用的分割线######################
报错信息集合
https://blog.csdn.net/zsyoung/article/details/100102551
大屌哥文档
https://blog.csdn.net/qq_31547771/article/details/100585535