背景
笔者之前曾负责过一个数据研究项目,由于项目中存在多目标数据下发的场景,而且源数据存储在MongoDB中,经过多次选型,最终采用了Kafka直接推送的方式。
首先读者必然会问,为什么一定要走外网?通常由于消息队列在于公司内网中,因此对安全性的考虑可以说不需要。
这里是由于源库与目标库不在一个内网环境,因此必然免不了通过外网进行交互,故此才有安全方面的考量。
然后需要交代一下一些其它因素,也就是应用场景:
- 目标库出于安全考虑,不愿意开放外网端口,而主动到外网拉取数据是最好的选择。
- 考虑到后期爬虫的进展,每天的数据量预估是非常大的,因此也是优先选型Kafka。
- 在解析MongoDB时用到了阿里开源的MongoShake,除去库到库直接同步数据外,解析binLog最好的方式就是推入Kafka,这也是唯一支持的消息队列。
与一般推数据的对比
通常推数据,无非就是2种方式,推和拉。在关系型数据库场景,我们一般可以通过时间戳去定时定量推送或拉取数据,这样的场景比比皆是。并且通常我们会架设一个服务或者应用,以接口的方式去处理,无形之中增加了我们维护一个服务的成本,而且当数据量非常大的时候,我们自己开发的程序也很难达到Kafka这样消息中间件的吞吐量。
挑战
试想一下(也就是我考虑过),当暴露在外网的消息队列被心怀恶意之人扫到,他可以做什么?简单的可以灌入大量恶意数据或者窃取数据,遇到高端的甚至可以利用漏洞攻击服务器。在这样的场景下,不如是消息生产者还是消费者,权限和加密已是势在必行了。
以下是笔者通过官网和各种教程踩坑后实践出的Kafka-ACL权限配置傻瓜教程,部分敏感之处做了简化处理,这也是一篇最简单的搭建权限教程,其中Kafka服务只展示了单机的方式,鉴权也仅是关注ACL验证,自然也降低了难度,只要跟着照做就必然能成功。虽然是傻瓜教程,但是其中涉及到的一些内容还是需要略有了解Kafka才可以。
版本
- Kafka version:2.11-2.2.2
- Spring-Boot version:2.2.6.RELEASE
Kafka server stand-alone
-
vim kafka_server_jaas.conf
创建conf文件KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin" user_admin="admin" user_producer="prod" user_consumer="cons"; };
-
vim kafka-server-start.sh
在对应位置加上java安全文件配置if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/vega/kafka_2.11-2.2.2/config/kafka_server_jaas.conf" fi
cp server.properties server_sasl.properties
复制一份配置文件(或者说备份)-
vim server_sasl.properties
编辑配置文件,这里ip需要修改成内网地址,vega.aliyun为外网地址。对于配置文件中没有的参数,就加上listeners=SASL_PLAINTEXT://ip:9093,PLAINTEXT://ip:9092 # 这是kafka集群内部通信用的 advertised.listeners=SASL_PLAINTEXT://vega.aliyun:9093 # 这是注册在zookeeper上供外部调用的 group.initial.rebalance.delay.ms=0 security.inter.broker.protocol=SASL_PLAINTEXT sasl.enabled.mechanisms=PLAIN sasl.mechanism.inter.broker.protocol=PLAIN authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer allow.everyone.if.no.acl.found=true super.users=User:admin
nohup ./bin/kafka-server-start.sh ./config/server_sasl.properties &
用新的配置文件启动kafka。可以先不后台运行,看看是否正常启动,如果正常启动了,基本上就已经成功了一半。-
配置 producer的 topic权限 group权限,其中zookeeper_ip替换为kafka注册的zookeeper的ip,这里配置了producer拥有所有topic和group的写权限。也是符合之前的场景,我们的生产者只需要写数据。
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper_ip:2181 --add --allow-principal User:producer --operation Write --topic=* bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper_ip:2181 --add --allow-principal User:producer --operation Write -group=*
配置 consumer的 topic权限 group权限 只有读的权限
例:topic权限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper_ip:2181 --add --allow-principal User:consumer --operation Read --topic abcd
abcd是对应的topic
例:group权限
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper_ip:2181 --add --allow-principal User:consumer --operation Read -group test
test是对应的group
查看topic已经有的权限:
bin/kafka-acls.sh --list --authorizer-properties zookeeper.connect=zookeeper_ip:2181
至此Kafka服务的ACL权限已经配置完了,接下来直接上SpringBoot集成部分。
SpringBoot集成
-
首先准备2个springboot应用,分别创建一个conf文件
kafka_client_jaas.conf
放在resources/config(当然,名字随你起),其内容如下:producer
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="prod"; };
consumer
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="cons"; };
-
在启动类中加入以下代码,其中获取文件的部分如果是jar包启动的话需要放到一个能加载到的位置,或者解压后运行。
static { System.setProperty("java.security.auth.login.config", Objects.requireNonNull(Application.class.getResource("/config/kafka_client_jaas.conf")).getPath()); }
-
分别编辑
application.yml
为了图省事儿,我把2个配置放一起了,只需要拆开就行了。完成这些步骤后,启动应用,一般来说已经可以正常通信了,但是如果没有配置的话会出现连不上的问题。spring: kafka: bootstrap-servers: kafka_ip:9093 consumer: group-id: test auto-offset-reset: earliest enable-auto-commit: false auto-commit-interval: 100 properties: sasl.mechanism: PLAIN security.protocol: SASL_PLAINTEXT listener: ack-mode: manual concurrency: 1 missing-topics-fatal: false producer: retries: 5 properties: sasl.mechanism: PLAIN security.protocol: SASL_PLAINTEXT