Springboot集成Kafka

一、书写背景:

最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。

二、前期准备:

1. 安装JDK,具体版本可以根据项目实际情况选择,目前使用最多的为jdk8

2.安装Zookeeper,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8

3.安装Kafka ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1

4.安装Kafka Manage  (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7

三、具体实现:

1.pom.xml加依赖:

<parent>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-parent</artifactId>

<version>2.3.12.RELEASE</version>

<relativePath/> <!-- lookup parent from repository -->

</parent>

<dependencies>

<!--springboot web依赖 -->

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-web</artifactId>

</dependency>

<!--kafka依赖 -->

<dependency>

<groupId>org.springframework.kafka</groupId>

<artifactId>spring-kafka</artifactId>

</dependency>

</dependencies>

2.application.yml做配置

spring:

  kafka:

    bootstrap-servers: 127.0.0.1:9092

    producer: #生产者

      # 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序

      #比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)

      retries: 1

      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。模式时16k

      batch-size: 16384 #16k

      # 设置生产者内存缓冲区的大小

      buffer-memory: 33554432

      acks: 1

      # 键的序列化方式

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      # 值的序列化方式

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:

      #group-id: default-group

      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效

      auto-commit-interval: 1S

      enable-auto-commit: false

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:

      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)

      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录

      auto-offset-reset: earliest

      # 键的反序列化方式

      key-deserializer: org.apache.kafka.common.serialization.StringSerializer

      # 值的反序列化方式

      value-deserializer: org.apache.kafka.common.serialization.StringSerializer

    listener:

      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交

      # RECORD

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

      # BATCH

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

      # TIME

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

      # COUNT

      # TIME | COUNT 有一个条件满足时提交

      # COUNT_TIME

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

      # MANUAL

      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种

      # MANUAL_IMMEDIATE

      ack-mode: manual_immediate

      # 在侦听器容器中运行的线程数

      concurrency: 5

3.创建topic:

3.1 通过命令创建,如下:

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

 -- 127.0.0.1:2181  zookeeper 服务器地址

--  replication-factor partitions 副本数量

--partitions partition数量

3.2 通过Kafka Manager创建(推荐使用),操作如下:


3.2.1 新建Cluster

点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:

输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)


注意:如果没有在 Kafka 中配置过 JMX_PORT,千万不要选择第一个复选框。Enable JMX Polling如果选择了该复选框,Kafka-manager 可能会无法启动。




3.2.2 新建主题


新建topic操作


查看新创建的topic

4.编代码:

4.1 定义消息生产者 API

package com.charlie.cloudconsumer.service.impl.kafka;

import com.charlie.cloudconsumer.common.utils.JSON;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API

*/

@Component

@Slf4j

public class QueueProducer {

@Autowired

    private KafkaTemplatekafkaTemplate;

public void sendQueue(String topic,Object msgContent) {

String obj2String =JSON.toJSONString(msgContent);

log.info("kafka service 准备发送消息为:{}",obj2String);

//发送消息

        ListenableFuture>future =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);

future.addCallback(new ListenableFutureCallback>() {

//消息发送成功

            @Override

            public void onSuccess(SendResult stringObjectSendResult) {

log.info("[kafka service-生产成功]topic:{},结果{}",topic, stringObjectSendResult.toString());

}

//消息发送失败

            @Override

            public void onFailure(Throwable throwable) {

//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理

                log.info("[kafka service-生产失败]topic:{},失败原因{}",topic, throwable.getMessage());

}

});

}

}

4.2 定义消息处理业务类

package com.charlie.cloudconsumer.service.impl.kafka;

import org.apache.commons.lang3.ObjectUtils;

import org.springframework.stereotype.Component;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : 消费端实际的业务处理对象

*/

@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册

public class QueueDataProcess {

public boolean doExec(Object obj) {

// todu 具体的业务逻辑

        if (ObjectUtils.isNotEmpty(obj)) {

return true;

}else {

return false;

}

}

}

4.3 定义消费者

package com.charlie.cloudconsumer.service.impl.kafka;

import com.charlie.cloudconsumer.common.utils.JSON;

import com.charlie.cloudconsumer.model.Order;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.exception.ExceptionUtils;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.support.Acknowledgment;

import org.springframework.kafka.support.KafkaHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Component;

import java.util.Optional;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息消费端,负责消费特定topic消息

*/

@Component

@Slf4j

@SuppressWarnings("all")

public class QueueConsumer {

@Autowired

    private QueueDataProcess queueDataProcess;

/**

*

*/

    @KafkaListener(topics ="test", groupId ="consumer")

public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {

Optional message =Optional.ofNullable(record.value());

if (message.isPresent()) {

try {

Object msg =message.get();

log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);

boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));

if (res) {

ack.acknowledge();

}

}catch (Exception ex) {

log.error("[kafka-消费异常] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));

}

}

}

}

4.4 定义控制器

package com.charlie.cloudconsumer.controller;

import com.alibaba.fastjson.JSON;

import com.charlie.cloudconsumer.common.utils.AjaxResult;

import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;

import com.charlie.cloudconsumer.model.Order;

import com.charlie.cloudconsumer.service.impl.kafka.QueueProducer;

import org.apache.commons.lang3.ObjectUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.RestController;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息

*/

@RestController

@RequestMapping(value ="/kafka",produces =MediaType.APPLICATION_JSON_VALUE)

public class KafkaController {

    @Autowired

    private QueueProducer queueProducer;

    @RequestMapping(value = "/send",method = RequestMethod.POST)

    public  AjaxResult<?> sendMsg(@RequestBody Order order) {

        AjaxResult<?> ajaxResult= null;

        if (ObjectUtils.isNotEmpty(order)) {

          queueProducer.sendQueue("test",order);

            ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");

        } else {

            ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");

        }

        return ajaxResult;

    }

}

四、效果展示:

1 postman模拟用户发起操作


发送参数

2. springboot项目处理情况



3. postman端数据返回情况



以上的代码是经过本人运行过的,希望对看到的您有所帮助!在实际生产过程中的更多问题请查阅:Kafka问题总结及性能优化最佳实践

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,869评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,716评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,223评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,047评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,089评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,839评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,516评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,410评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,920评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,052评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,179评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,868评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,522评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,070评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,186评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,487评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,162评论 2 356

推荐阅读更多精彩内容