前言
SpringBoot中集成Kafka,主要目的干啥呢,当然消息推送啦。不同系统之间,自身系统不同组件之间消息通信的一种方式,也可以是使用MQ。
使用消息系统的目的主要就是为了解耦、异步通信、消峰处理。
消息系统三大优点
解耦
怎么理解呢,比如我是A系统,我要现在要给B、C两个系统发送消息,如果不用消息系统,直接调用,就相当于A系统跟B、C系统强耦合到一起了,如果后面还有D、E......等系统怎么办呢,我总不能挨着挨着一个一个写吧,这样代码耦合太高了,而且我还得考虑别人收到没有,处理成功失败等等情况。那我使用消息系统不就解决这个问题了嘛,我直管向Kafka推送消息,我才不管你谁来消费呢,你想消费你就去消费Kafka消息。这不就解耦了嘛。
异步
系统A调用其他系统接口的时候,需要一直等待其他系统处理完成它的业务逻辑后,返回处理结果,我才能继续处理我的业务逻辑,但是它的业务逻辑我其实不关心,我没必要等啊。
那用异步不就好了吗,我将消息发给Kafka,你其他系统去消费就行了,我监听你返回的处理结果就行了,我发送完后就可以继续做其他操作了,不用一直等着。
消峰
比如电商在秒杀的时候,用户量暴增,但是它又只是一小段时间的爆发量。如果不处理,那服务器不得直接挂了。怎么办呢?那我们用Kafka啊,来了请求我就推送消息到Kafka,然后呢,消费设置一秒消费多少就好,然后等高峰过去,用户量回归正常,积累的消息也会慢慢的被消费完。这样咱不是又能愉快的玩耍了嘛。
Kafka术语
Producer:生产者,消息的生产者,负责推送消息到Kafka队列
Consumer:消费者,负责消费Kafka队列中的消息
Consumer group:用来实现消息广播(发给多个Consumer)或单播(发给单个Consumer)的手段
Offset:kafka存储消息的偏移量,可以理解为下标用来控制消息的消费位置
Broker:Kafka服务节点,一个kafka服务器就是一个Broker,一个集群由多个Broker组成,一个 Broker下可以有多个Topic
Topic:消息的类别、标题,可以理解为是一个消息的队列
Partition:属于Topic的子集,消息分区,一个Topic可以有多个partition,一个partition在物理上对 应了一个文件夹;partition中的所有消息都会分配一个offset
Kafka消息模式
点对点消息传递模式:
一个消息推送出去,只能被消费一次,任何一个消费者消费了该消息后,其他消费者都不能继续消费该消息
发布-订阅模式:
消息发布到队列,所有订阅了topic的的消费者都可以消费topic里的所有消息。
一、SpringBoot整合Kafka
1.1 pom文件中添加maven引用
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.7</version>
</dependency>
1.2 yml文件中增加kafka生产者的配置
spring:
kafka:
# 指定kafka server的地址,集群配多个,中间,逗号隔开
bootstrap-servers:
- 127.0.0.1:9092
- 127.0.0.1:9093
- 127.0.0.1:9094
# kafka生产者配置
producer:
# 写入失败时,重试次数。当leader失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
retries: 0
# 每次批量发送消息的数量,produce积累到一定数据,一次发数据量
# 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。
# 这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置(16K)
batch-size: 16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
# #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
buffer-memory: 33554432
#默认情况下消息是不压缩的,此参数可指定采用何种算法压缩消息,可取值:none,snappy,gzip,lz4。snappy压缩算法由Google研发,
#这种算法在性能和压缩比取得比较好的平衡;相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
#如果不开启压缩,可设置为none(默认值),比较大的消息可开启
compressionType: none
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,
# 无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后
# 立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,
# 这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
acks: all
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 连接超时时间
properties:
request.timeout.ms: 30000
###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生产者配置】###########
# 重试次数
spring.kafka.producer.retries=0
# 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延时
spring.kafka.producer.properties.linger.ms=0
# 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
# linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
# 生产端缓冲区大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定义分区器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
###########【初始化消费者配置】###########
# 默认的消费组ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自动提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延时(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
#spring.kafka.consumer.auto-commit-interval=100
# 当kafka中没有初始offset或offset超出范围时将自动重置offset
# earliest:重置为分区中最小的offset;
# latest:重置为分区中最新的offset(消费分区中新产生的数据);
# none:只要有一个分区不存在已提交的offset,就抛出异常;
spring.kafka.consumer.auto-offset-reset=latest
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消费请求超时时间
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化类
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消费端监听的topic不存在时,项目启动会报错(关掉)
spring.kafka.listener.missing-topics-fatal=false
# 设置批量消费
# spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
# spring.kafka.consumer.max-poll-records=50
1.3 配置生产者Producer
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import static com.alibaba.fastjson.JSON.toJSONString;
/**
* @Author: huangyibo
* @Date: 2022/7/2 17:28
* @Description: kafka生产者类
*/
@Component
@Slf4j
@SuppressWarnings({"unused"})
public class ProducerUtils {
private static final String PUSH_MSG_LOG = "准备发送消息为:{}";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private KafkaAdminClient kafkaAdminClient;
/**
* 如果没有topic,则创建一个
* @param topicName :
* @param partitionNum :
* @param replicaNum :
* @return org.apache.kafka.clients.admin.CreateTopicsResult
*/
public Boolean createTopic(String topicName, int partitionNum, int replicaNum){
KafkaFuture<Set<String>> topics = kafkaAdminClient.listTopics().names();
try {
if (topics.get().contains(topicName)) {
return true;
}
NewTopic newTopic = new NewTopic(topicName, partitionNum, (short) replicaNum);
kafkaAdminClient.createTopics(Collections.singleton(newTopic));
return true;
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
return false;
}
}
/**
* 传入topic名称,json格式字符串的消息,生产者进行发送
* @param topicName : topic名称
* @param jsonStr : 消息json字符串
* @return boolean : 推送是否成功
*/
public boolean sendMessage(String topicName, String jsonStr) {
createTopic(topicName, 5, 5);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,json格式字符串数组的消息,生产者进行发送
* @param topicName : topic名称
* @param jsonStrs : 消息json字符串数组
* @return boolean : 推送是否成功
*/
public Boolean[] sendMessage(String topicName, String[] jsonStrs) {
createTopic(topicName, 5, 5);
int msgLength = jsonStrs.length;
Boolean[] success = new Boolean[msgLength];
for (int i = 0; i < msgLength; i++) {
String jsonStr = jsonStrs[i];
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,消息对象,生产者进行发送
* @param topicName : topic名称
* @param obj : 消息对象
* @return boolean : 推送是否成功
*/
public boolean sendMessage(String topicName, Object obj) {
createTopic(topicName, 5, 5);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,消息对象数组,生产者进行发送
* @param topicName : topic名称
* @param list : 消息对象数组
* @return boolean : 推送是否成功
*/
public Boolean[] sendMessage(String topicName, List<Object> list) {
createTopic(topicName, 5, 5);
Boolean[] success = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,json格式字符串的消息,生产者进行发送
* @param topicName : topic名称
* @param key : 消息key
* @param jsonStr : 消息json字符串
* @return boolean : 推送是否成功
*/
public boolean sendMessage(String topicName, String key, String jsonStr) {
createTopic(topicName, 5, 5);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,json格式字符串数组的消息,生产者进行发送
* @param topicName : topic名称
* @param key : 消息key
* @param jsonStrs : 消息json字符串数组
* @return boolean : 推送是否成功
*/
public Boolean[] sendMessage(String topicName, String key, String[] jsonStrs) {
createTopic(topicName, 5, 5);
int msgLength = jsonStrs.length;
Boolean[] success = new Boolean[msgLength];
for (int i = 0; i < msgLength; i++) {
String jsonStr = jsonStrs[i];
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,消息对象,生产者进行发送
* @param topicName : topic名称
* @param key : 消息key
* @param obj : 消息对象
* @return boolean : 推送是否成功
*/
public boolean sendMessage(String topicName, String key, Object obj) {
createTopic(topicName, 5, 5);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,消息对象数组,生产者进行发送
* @param topicName : topic名称
* @param key : 消息key
* @param list : 消息对象数组
* @return boolean : 推送是否成功
*/
public Boolean[] sendMessage(String topicName, String key, List<Object> list) {
createTopic(topicName, 5, 5);
Boolean[] success = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,json格式字符串的消息,生产者进行发送
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param jsonStr : 消息json字符串
* @return boolean : 推送是否成功
*/
public boolean sendMessage(String topicName, int partition, String key, String jsonStr) {
createTopic(topicName, 5, 5);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
partition, key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,json格式字符串数组的消息,生产者进行发送
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param jsonStrs : 消息json字符串数组
* @return boolean : 推送是否成功
*/
public Boolean[] sendMessage(String topicName, int partition, String key, String[] jsonStrs) {
createTopic(topicName, 5, 5);
int msgLength = jsonStrs.length;
Boolean[] success = new Boolean[msgLength];
for (int i = 0; i < msgLength; i++) {
String jsonStr = jsonStrs[i];
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
partition, key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 传入topic名称,消息对象,生产者进行发送
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param obj : 消息对象
* @return boolean : 推送是否成功
*/
public boolean sendMessage(String topicName, int partition, String key, Object obj) {
createTopic(topicName, 5, 5);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(topicName,
partition, key, jsonStr));
return dealSendResult(future);
}
/**
* 传入topic名称,消息对象数组,生产者进行发送
* @param topicName : topic名称
* @param partition : 消息发送分区
* @param key : 消息key
* @param list : 消息对象数组
* @return boolean : 推送是否成功
*/
public Boolean[] sendMessage(String topicName, int partition, String key, List<Object> list) {
createTopic(topicName, 5, 5);
Boolean[] success = new Boolean[list.size()];
for (int i = 0; i < list.size(); i++) {
Object obj = list.get(i);
String jsonStr = toJSONString(obj);
log.info(PUSH_MSG_LOG, jsonStr);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(new ProducerRecord<>(
topicName, partition, key, jsonStr));
success[i] = dealSendResult(future);
}
return success;
}
/**
* 处理消息推送结果
* kafkaTemplate提供了一个回调方法addCallback,
* 我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理
* @param future :
* @return boolean
*/
private boolean dealSendResult(ListenableFuture<SendResult<String, Object>> future) {
final boolean[] success = {false};
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理
log.info("生产者 发送消息失败 exMessage:{}", throwable.getMessage());
success[0] = false;
}
@Override
public void onSuccess(SendResult<String, Object> result) {
//成功的处理
log.info("生产者 发送消息成功, topic:{}, partition:{}, offset:{}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
success[0] = true;
}
});
return success[0];
}
}
此处生产者是异步发送消息,不用等待消息发送完成。
Producer是一个接口,声明了同步send和异步send两个重要方法。
ProducerRecord 消息实体类,每条消息由(topic,key,value,timestamp)四元组封装。一条消息key可以为空和timestamp可以设置当前时间为默认值。
1.3.1 带回调的生产者
kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法,
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
1.3.2、自定义分区器
我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:
- ① 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
- ② 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
- ③ patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;
※ 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区。
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 自定义分区规则(这里假设全部发到0号分区)
// ......
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名
# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner
1.3.3、kafka事务提交
如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务。
@GetMapping("/kafka/transaction")
public void sendMessage7(){
// 声明事务:后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations -> {
operations.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
});
// 不声明事务:后面报错但前面消息已经发送成功了
kafkaTemplate.send("topic1","test executeInTransaction");
throw new RuntimeException("fail");
}
1.4 消费者
1.4.1、指定topic、partition、offset消费
前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供。
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* @Param [record]
* @return void
**/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1", partitions = { "0" }),
@TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}
属性解释:
- ① id:消费者ID;
- ② groupId:消费组ID;
- ③ topics:监听的topic,可监听多个;
- ④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。
上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。
注意:topics和topicPartitions不能同时使用;
1.4.2、批量消费
设置application.prpertise开启批量消费即可,
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
接收消息时用List来接收,监听代码如下,
@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
System.out.println(">>>批量消费一次,records.size()="+records.size());
for (ConsumerRecord<?, ?> record : records) {
System.out.println(record.value());
}
}
1.4.3、ConsumerAwareListenerErrorHandler 异常处理器
通过异常处理器,我们可以处理consumer在消费时发生的异常。
新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
return (message, exception, consumer) -> {
System.out.println("消费异常:"+message.getPayload());
return null;
};
}
// 将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
throw new Exception("简单消费-模拟异常");
}
// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}
执行看一下效果
1.4.4、消息过滤器
消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。
@Component
public class KafkaConsumer {
@Autowired
ConsumerFactory consumerFactory;
// 消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// 被过滤的消息将被丢弃
factory.setAckDiscarded(true);
// 消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}
// 消息过滤监听
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?, ?> record) {
System.out.println(record.value());
}
}
上面实现了一个"过滤奇数、接收偶数"的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数
1.4.5、消息转发
在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下:
/**
* @Title 消息转发
* @Description 从topic1接收到的消息经过处理后转发到topic2
* @Param [record]
* @return void
**/
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
return record.value()+"-forward message";
}
1.4.6、定时启动、停止监听器
默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:
- ① 禁止监听器自启动;
- ② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动。
@EnableScheduling
@Component
public class CronTimer {
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private ConsumerFactory consumerFactory;
// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}
// 监听器
@KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
// 定时启动监听器
@Scheduled(cron = "0 42 11 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}
// 定时停止监听器
@Scheduled(cron = "0 45 11 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}
启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作
11:42分监听器启动开始工作,消费消息
11:45分监听器停止工作,
1.4.7 配置消费者Consumer
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Optional;
/**
* @Author: huangyibo
* @Date: 2022/7/2 17:32
* @Description: kafka消费者类
*/
@Component
@Slf4j
public class ConsumerUtils {
@Autowired
private RedisUtils redisUtils;
@Bean
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(5);
factory.getContainerProperties().setPollTimeout(1000);
factory.setBatchListener(true);//设置为批量消费,每个批次数量在Kafka配置参数中设置
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置手动提交ackMode
return factory;
}
/**
* 单条的消费kafka消息
* @param record : 消息记录
* @param ack : ack回调确认
* @return void :
*/
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
@TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"0" ,"2" ,"4"}),
}, groupId = KafkaConstants.TOPIC_GROUP1)
public void topicTest(ConsumerRecord<String, String> record, Acknowledgment ack) {
Optional<String> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
ack.acknowledge();//手动提交offset
}
}
/**
* 批量的消费kafka消息,要配合containerFactory使用,配置的bean见batchFactory
* @param records : 消息记录列表
* @param ack : ack回调确认
* @return void :
*/
@Transactional(rollbackOn = Exception.class)
@KafkaListener(topics = KafkaConstants.TOPIC_TEST, topicPartitions = {
@TopicPartition(topic = KafkaConstants.TOPIC_TEST, partitions = {"1", "3"}),
}, groupId = KafkaConstants.TOPIC_GROUP2, containerFactory="batchFactory")
public void topicTest2(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
try {
for (ConsumerRecord<String, String> record : records) {
//取到消息后,先查询缓存中是否已经存在,存在表示不需要再次处理
//如果消息不存在,业务处理完成后将消息存入redis;如果消息存在直接跳过;这样可防止重复消费
boolean isExists = redisUtils.hasKey(record.topic() + record.partition() + record.key());
if (!isExists) {
Optional<String> message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test1 消费了: Topic:" + record.topic() + ",key:" + record.key() + ",Message:" + msg);
}
redisUtils.set(record.topic() + record.partition() + record.key(), record.value());
}
}
ack.acknowledge();//手动提交offset
}catch (Exception e){
log.error(e.getMessage());
throw e;
}
}
}
消费者,写了两种模式,批量获取消息与单条获取消息,获取消息的时候,指定topic,partition。通过KafkaListener监听来消费消息。
采用Kafka提供的StringSerializer和StringDeserializer进行序列化和反序列化,因为此种序列化方式无法序列化实体类。
1.5 使用
接下来当然就是使用了,直接定义一个接口,然后访问接口,在接口中调用Producer发送消息,就可在Consumer消费者中监听获取到消息。
@ApiOperation(value = "测试1", notes = "test2")
@GetMapping(value = "/test2")
public ResponseVo test2(String value, String key, Integer partition){
if(StringUtils.isBlank(value)){
return new ResponseVo.Builder().error().message("请从传入发送的消息!").build();
}
Message message = new Message.Builder().id(UuidUtil.getUuid32()).msg(value).sendTime(DateUtils.nowDate()).build();
String str = JSONObject.toJSONString(message);
if(StringUtils.isNotBlank(key)){
if(partition != null){
producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, partition, key, str);
}else{
producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, key, str);
}
}else {
producerUtils.sendMessage(KafkaConstants.TOPIC_TEST, str);
}
return null;
}
1.6 测试验证
打开swagger页面,找到接口,录入参数:
点击execute执行测试。结果如下图:
二、采用自定义序列化和反序列化器进行实体类的序列化和反序列化
和内置的StringSerializer字符串序列化一样,如果要自定义序列化方式,需要实现接口Serializer。假设每个字段按照下图所示的方式自定义序列化:
2.1、创建User实体类
public class User implements Serializable {
private Long id;
private String name;
private Integer age;
/**
* transient 关键字修饰的字段不会被序列化
*/
private transient String desc;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "User{" +
"id=" + id +
", name='" + name + '\'' +
", age=" + age +
", desc='" + desc + '\'' +
'}';
}
}
2.2、创建User序列化器
public class UserSerializable implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String topic, User user) {
System.out.println("topic : " + topic + ", user : " + user);
byte[] dataArray = null;
ByteArrayOutputStream outputStream = null;
ObjectOutputStream objectOutputStream = null;
try {
outputStream = new ByteArrayOutputStream();
objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(user);
dataArray = outputStream.toByteArray();
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
if(outputStream != null){
try {
outputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(objectOutputStream != null){
try {
objectOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return dataArray;
}
@Override
public void close() {
}
}
2.3、创建User反序列化器
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public User deserialize(String topic, byte[] bytes) {
User user = null;
ByteArrayInputStream inputStream = null;
ObjectInputStream objectInputStream = null;
try {
inputStream = new ByteArrayInputStream(bytes);
objectInputStream = new ObjectInputStream(inputStream);
user = (User)objectInputStream.readObject();
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
if(inputStream != null){
try {
inputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(objectInputStream != null){
try {
objectInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return user;
}
@Override
public void close() {
}
}
2.4、修改application-dev.properties配置
A、修改生产者配置的value-serializer
# 指定生产者消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.yibo.springbootkafkademo.Serializable.UserSerializable
B、修改消费者配置的value-deserializer
# 指定消费者消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.yibo.springbootkafkademo.Serializable.UserDeserializer
2.5、生产者向kafka发送消息
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@PostMapping("/user/save")
public boolean saveUser(@RequestBody User user){
kafkaTemplate.send("userTopic",user);
return true;
}
}
6、消费者监听topic=userTopic的消息
@Component
public class ConsumerListener {
@KafkaListener(topics = "userTopic")
public void onMessage(User user){
//insertIntoDb(buffer);//这里为插入数据库代码
System.out.println(user);
}
}
总结
可以看到,自定义Serializer和Deserializer非常痛苦,还有很多类型不支持,非常脆弱。复杂类型的支持更是一件痛苦的事情,不同版本之间的兼容性问题更是一个极大的挑战。由于Serializer和Deserializer影响到上下游系统,导致牵一发而动全身。自定义序列化&反序列化实现不是能力的体现,而是逗比的体现。所以强烈不建议自定义实现序列化&反序列化,推荐直接使用StringSerializer和StringDeserializer,然后使用json作为标准的数据传输格式。站在巨人的肩膀上,事半功倍。
参考:
https://blog.csdn.net/liu649983697/article/details/119456475
https://blog.csdn.net/yuanlong122716/article/details/105160545