网上的相关资料较多,只说几个需要注意的地方:
1. 关于kafka自带的zk
新版本kafka的发行版中自带了zookeeper,开启sasl过程会更方便(不用拷贝相关的jar文件),网上有人说到最好不要用内置的zookeeper,但没有确切证据,个人认为应该是推荐使用,需要注意的是,kafka自带的zookeeper和标准的zookeeper使用上不太一样,相关资料相对较少,但基本配置基本没有多大变化。
2.zookeeper.properties或者zoo.cfg的配置信息增加如下信息
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
3.zk_server_jaas.conf配置文件
这个配置文件在zookeeper启动参数中指定,内容是存储用户的信息:
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
说明:
username="admin", password="admin-secret"
用于zookeeper服务节点内部通讯使用。
user_admin="admin-secret"
是创建一个名称为admin的账号,密码为admin-secret
,相当于客户端接入的账号,后面kafka连接zookeeper时使用这个账号。
4.zookeeper启动命令
export KAFKA_OPTS=" -Djava.security.auth.login.config=conf/zk_server_jaas.conf "
$kafka_root/bin/zookeeper-server-start.sh conf/zookeeper.properties
zk_server_jaas.conf
是第三步中配置的文件,KAFKA_OPTS变量会在zookeeper-server-start.sh
中使用,这个只有kafka自带的zookeeper才能识别,标准的zookeeper需要修改启动脚本。
5.kafka配置文件server.properies
listeners=SASL_PLAINTEXT://localhost:9092
advertised.listeners=SASL_PLAINTEXT://localhost:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
security.inter.broker.protocol= SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
super.users=User:admin
需要注意的是,这一段网上内容比较乱。advertised.listeners
的内容需要是listeners
的子集,sasl.mechanism.inter.broker.protocol
是客户端接入的时候的加密算法。
6.kafka的用户配置文件kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
第一段KafkaServer是指明服务器端各节点通讯,及kafka的客户端登录的账号密码,第二段Client是指的kafka连接zookeeper时所使用的sasl用户密码。网上有人说Client端无用,经验证是有用的。
7.kafka启动
启动命令行要指定第5\6步的配置信息
export KAFKA_OPTS=" -Djava.security.auth.login.config=kafka_server_jaas.conf"
$kafka_root/bin/kafka-server-start.sh server.properties
8.客户端接入
python上以以confluent_kafka为例,构建客户端对象时的信息:
conf = {'bootstrap.servers':'localhost:9092',
'group.id': uuid.uuid1(),
'session.timeout.ms':6000,
'security.protocol':'SASL_PLAINTEXT',
'sasl.mechanism':'PLAIN',
'sasl.username':'admin',
'sasl.password':'admin-secret',
'default.topic.config': {
'auto.offset.reset':'earliest'
}
其中sasl.username':'admin
为kafka_server_jaas.conf
的KafkaServer段所指定的用户名及密码,sasl.mechanism':'PLAIN'
为kafka配置文件server.properies
中所指定的加密方法。
上述过程已经过实际验证,如有疏漏请指正!