Reactive Spring实战 -- 响应式Kafka交互

本文分享如何使用KRaft部署Kafka集群,以及Spring中如何实现Kafka响应式交互。

KRaft

我们知道,Kafka使用Zookeeper负责为kafka存储broker,Consumer Group等元数据,并使用Zookeeper完成broker选主等操作。
虽然使用Zookeeper简化了Kafka的工作,但这也使Kafka的部署和运维更复杂。

Kafka 2.8.0开始移除了Zookeeper,并使用Kafka內部的仲裁(Quorum)控制器來取代ZooKeeper,官方称这个控制器为 "Kafka Raft metadata mode",即KRaft mode。从此用户可以在不需要Zookeeper的情况下部署Kafka集群,这使Fafka更加简单,轻量级。
使用KRaft模式后,用户只需要专注于维护Kafka集群即可。

注意:由于该功能改动较大,目前Kafka2.8版本提供的KRaft模式是一个测试版本,不推荐在生产环境使用。相信Kafka后续版本很快会提供生产可用的kraft版本。

下面介绍一下如果使用Kafka部署kafka集群。
这里使用3台机器部署3个Kafka节点,使用的Kafka版本为2.8.0。

  1. 生成ClusterId以及配置文件。
    (1)使用kafka-storage.sh生成ClusterId。
$ ./bin/kafka-storage.sh random-uuid
dPqzXBF9R62RFACGSg5c-Q

(2)使用ClusterId生成配置文件

$ ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
Formatting /tmp/kraft-combined-logs

注意:只需要在生成一个ClusterId,并使用该ClusterId在所有机器上生成配置文件,即集群中所有节点使用的ClusterId需相同。

  1. 修改配置文件
    脚本生成的配置文件只能用于单个Kafka节点,如果在部署Kafka集群,需要对配置文件进行一下修改。

(1)修改config/kraft/server.properties(稍后使用该配置启动kafka)

process.roles=broker,controller 
node.id=1
listeners=PLAINTEXT://172.17.0.2:9092,CONTROLLER://172.17.0.2:9093
advertised.listeners=PLAINTEXT://172.17.0.2:9092
controller.quorum.voters=1@172.17.0.2:9093,2@172.17.0.3:9093,3@172.17.0.4:9093

process.roles指定了该节点角色,有以下取值

  • broker: 这台机器将仅仅当作一个broker
  • controller: 作为Raft quorum的控制器节点
  • broker,controller: 包含以上两者的功能

一个集群中不同节点的node.id需要不同。
controller.quorum.voters需要配置集群中所有的controller节点,配置格式为<nodeid style="margin: 0px; padding: 0px;">@<ip style="margin: 0px; padding: 0px;">:<port style="margin: 0px; padding: 0px;">。</port></ip></nodeid>

(2)
kafka-storage.sh脚本生成的配置,默认将kafka数据存放在/tmp/kraft-combined-logs/,
我们还需要/tmp/kraft-combined-logs/meta.properties配置中的node.id,使其与server.properties配置中保持一起。

node.id=1

  1. 启动kafka
    使用kafka-server-start.sh脚本启动Kafka节点
$ ./bin/kafka-server-start.sh ./config/kraft/server.properties

下面测试一下该kafka集群

  1. 创建主题
$ ./bin/kafka-topics.sh --create --partitions 3 --replication-factor 3 --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 

  1. 生产消息
$ ./bin/kafka-console-producer.sh --broker-list 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1

  1. 消费消息
$ ./bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092 --topic topic1 --from-beginning

这部分命令的使用与低版本的Kafka保持一致。

Kafka的功能暂时还不完善,这是展示一个简单的部署示例。
Kafka文档:https://github.com/apache/kafka/blob/trunk/config/kraft/README.md

Spring中可以使用Spring-Kafka、Spring-Cloud-Stream两个框架实现kafka响应式交互。
下面分别看一下这两个框架的使用。

Spring-Kafka

  1. 添加引用
    添加spring-kafka引用
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.5.8.RELEASE</version>
</dependency>

  1. 准备配置文件,内容如下
spring.kafka.producer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.bootstrap-servers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.LongDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.group-id=warehouse-consumers
spring.kafka.consumer.properties.spring.json.trusted.packages=*

分别是生产者和消费者对应的配置,很简单。

  1. 发送消息
    Spring-Kakfa中可以使用ReactiveKafkaProducerTemplate发送消息。
    首先,我们需要创建一个ReactiveKafkaProducerTemplate实例。(目前SpringBoot会自动创建KafkaTemplate实例,但不会创建ReactiveKafkaProducerTemplate实例)。
@Configuration
public class KafkaConfig {
    @Autowired
    private KafkaProperties properties;

    @Bean
    public ReactiveKafkaProducerTemplate reactiveKafkaProducerTemplate() {
        SenderOptions options = SenderOptions.create(properties.getProducer().buildProperties());
        ReactiveKafkaProducerTemplate template = new ReactiveKafkaProducerTemplate(options);
        return template;
    }
}

KafkaProperties实例由SpringBoot自动创建,读取上面配置文件中对应的配置。

接下来,就可以使用ReactiveKafkaProducerTemplate发送消息了

    @Autowired
    private ReactiveKafkaProducerTemplate template;

    public static final String WAREHOUSE_TOPIC = "warehouse";
    public Mono<Boolean> add(Warehouse warehouse) {
        Mono<SenderResult<Void>> resultMono = template.send(WAREHOUSE_TOPIC, warehouse.getId(), warehouse);
        return resultMono.flatMap(rs -> {
            if(rs.exception() != null) {
                logger.error("send kafka error", rs.exception());
                return Mono.just(false);
            }
            return Mono.just(true);
        });
    }

ReactiveKafkaProducerTemplate#send方法返回一个Mono(这是Spring Reactor中的核心对象),Mono中携带了SenderResult,SenderResult中的RecordMetadata、exception存储该记录的元数据(包括offset、timestamp等信息)以及发送操作的异常。

  1. 消费消息
    Spring-Kafka使用ReactiveKafkaConsumerTemplate消费消息。
@Service
public class WarehouseConsumer {
    @Autowired
    private KafkaProperties properties;

    @PostConstruct
    public void consumer() {
        ReceiverOptions<Long, Warehouse> options = ReceiverOptions.create(properties.getConsumer().buildProperties());
        options = options.subscription(Collections.singleton(WarehouseService.WAREHOUSE_TOPIC));
        new ReactiveKafkaConsumerTemplate(options)
                .receiveAutoAck()
                .subscribe(record -> {
                    logger.info("Warehouse Record:" + record);
                });
    }
}

这里与之前使用@KafkaListener注解实现的消息监听者不同,不过也非常简单,分为两个步骤:
(1)ReceiverOptions#subscription方法将ReceiverOptions关联到kafka主题
(2)创建ReactiveKafkaConsumerTemplate,并注册subscribe的回调函数消费消息。
提示:receiveAutoAck方法会自动提交消费组offset。

Spring-Cloud-Stream

Spring-Cloud-Stream是Spring提供的用于构建消息驱动微服务的框架。
它为不同的消息中间件产品提供一种灵活的,统一的编程模型,可以屏蔽底层不同消息组件的差异,目前支持RabbitMQ、Kafka、RocketMQ等消息组件。

这里简单展示Spring-Cloud-Stream中实现Kafka响应式交互的示例,不深入介绍Spring-Cloud-Stream的应用。

  1. 引入spring-cloud-starter-stream-kafka的引用
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>

  1. 添加配置
spring.cloud.stream.kafka.binder.brokers=172.17.0.2:9092,172.17.0.3:9092,172.17.0.4:9092
spring.cloud.stream.bindings.warehouse2-out-0.contentType=application/json
spring.cloud.stream.bindings.warehouse2-out-0.destination=warehouse2
# 消息格式
spring.cloud.stream.bindings.warehouse3-in-0.contentType=application/json
# 消息目的地,可以理解为Kafka主题
spring.cloud.stream.bindings.warehouse3-in-0.destination=warehouse2
# 定义消费者消费组,可以理解为Kafka消费组
spring.cloud.stream.bindings.warehouse3-in-0.group=warehouse2-consumers
# 映射方法名
spring.cloud.function.definition=warehouse2;warehouse3

Spring-Cloud-Stream 3.1版本之后,@EnableBinding、@Output等StreamApi注解都标记为废弃,并提供了一种更简洁的函数式编程模型。
该版本后,用户不需要使用注解,只要在配置文件中指定需要绑定的方法,Spring-Cloud-Stream会为用户将这些方法与底层消息组件绑定,用户可以直接调用这些方法发送消息,或者接收到消息时Spring-Cloud-Stream会调用这些方法消费消息。

通过以下格式定义输入、输出函数的相关属性:
输出(发送消息):<functionName> + -out- + <index>
输入(消费消息):<functionName> + -in- + <index>
对于典型的单个输入/输出函数,index始终为0,因此它仅与具有多个输入和输出参数的函数相关。
Spring-Cloud-Stream支持具有多个输入(函数参数)/输出(函数返回值)的函数。

spring.cloud.function.definition配置指定需要绑定的方法名,不添加该配置,Spring-Cloud-Stream会自动尝试绑定返回类型为Supplier/Function/Consumer的方法,但是使用该配置可以避免Spring-Cloud-Stream绑定混淆。

  1. 发送消息
    用户可以编写一个返回类型为Supplier的方法,并定时发送消息
    @PollableBean
    public Supplier<Flux<Warehouse>> warehouse2() {
        Warehouse warehouse = new Warehouse();
        warehouse.setId(333L);
        warehouse.setName("天下第一仓");
        warehouse.setLabel("一级仓");

        logger.info("Supplier Add : {}", warehouse);
        return () -> Flux.just(warehouse);
    }

定义该方法后,Spring-Cloud-Stream每秒调用一次该方法,生成Warehouse实例,并发送到Kafka。
(这里方法名warehouse3已经配置在spring.cloud.function.definition中。)

通常场景下,应用并不需要定时发送消息,而是由业务场景触发发送消息操作, 如Rest接口,
这时可以使用StreamBridge接口

    @Autowired
    private StreamBridge streamBridge;

    public boolean add2(Warehouse warehouse) {
        return streamBridge.send("warehouse2-out-0", warehouse);
    }

暂时未发现StreamBridge如何实现响应式交互。

  1. 消费消息
    应用要消费消息,只需要定义一个返回类型为Function/Consumer的方法即可。如下
    @Bean
    public Function<Flux<Warehouse>, Mono<Void>> warehouse3() {
        Logger logger = LoggerFactory.getLogger("WarehouseFunction");
        return flux -> flux.doOnNext(data -> {
            logger.info("Warehouse Data: {}", data);
        }).then();
    }

注意:方法名与<functionName> + -out- + <index>/<functionName> + -in- + <index>
spring.cloud.function.definition中的配置需要保持一致,以免出错。

SpringCloudStream文档:https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html

文章完整代码:https://gitee.com/binecy/bin-springreactive/tree/master/warehouse-service

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容