前面的文章,我们使用的是jdk自带的队列,实现了服务的吞吐量增加,但是我们知道的是,jdk的队列时基于内存的,即当请求量很大的时候,大量的请求缓存在内存当中,对于内存的要求还是很大的,不是很适合并发量很大的业务场景。尤其是在电商的场景,都会通过消息队列的削峰,解耦,从而提高系统的吞吐量,保证稳定性。所以我们接下来,继续对系统进行改进,引入kafka,进一步对于稳定性进行完善。
关于kafka的安装,介绍,集成,请参考文章开头给出的链接内的具体内容。
一、引入Kafka
引入依赖:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.10.RELEASE</version>
</dependency>
添加配置:
spring:
kafka:
bootstrap-servers: 172.16.3.29:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
# #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
# batch-size: 16384
# # 设置生产者内存缓冲区的大小。
# buffer-memory: 33554432
# # 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# # 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
# acks: 1
consumer:
group-id: test
# # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
# auto-commit-interval: 1S
# # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
# auto-offset-reset: earliest
# # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
# enable-auto-commit: false
# # 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# # 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 150
listener:
# # 在侦听器容器中运行的线程数。
# concurrency: 5
# #listner负责ack,每调用一次,就立即commit
# ack-mode: manual_immediate
missing-topics-fatal: false
二、测试服务改造
2.1 消息生产者
提供一个简单的生产者工具类,只有简单的发送消息一个方法,参数是 topic 和 message。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* kafka生产者
*
* @author weirx
* @date 2021/02/03 14:22
**/
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* kafka消息发送
* @param
* @author weirx
* @return void
* @date: 2021/2/3
*/
public void send(String topic,String message){
kafkaTemplate.send(topic,message);
}
}
2.2 下单接口改造
引入KafkaProducer,下单时替换http请求成kafka推送,伪代码如下:
@Autowired
private KafkaProducer kafkaProducer;
kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));
2.3 支付接口改造
在前面一篇文章当中,支付接口是作为下单接口的回调接口被调用的,其实是一个完全同步的接口,易出现问题,如阻塞,请求失败、超时等。
所以此处我们也将其改造成kafka异步消费,消费者工具类如下所示:
@Slf4j
@Component
public class KafkaConsumer {
@Autowired
private TradingServiceImpl tradingService;
@KafkaListener(topics = {"rob-necessities-trading"})
public void consumer(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
String orderId = message.toString();
log.info("支付开始时间***********************:{},订单id: {}", LocalDateTime.now(), orderId);
tradingService.pay(Long.valueOf(orderId));
log.info("支付完成时间/////////////////////////:{},订单id: {}", LocalDateTime.now(), orderId);
}
}
}
三、订单服务改造
2.1 支付回调改造
前面的支付方式,我们是在订单完成后通过http接口形式,现在改用kafka,所以我们需要提供一个kafka消息生产者,将消息同送到测试服务:
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* kafka消息发送
* @param
* @author weirx
* @return void
* @date: 2021/2/3
*/
public void send(String topic,String message){
kafkaTemplate.send(topic,message);
}
}
2.2 下单接口改造
接口下单命令的方式不再是等待http请求调用了,此处变成监听kafka,提供消费者如下:
@Slf4j
@Component
public class KafkaConsumer {
@Autowired
private OrderService orderService;
@KafkaListener(topics = {"rob-necessities-order"})
public void consumer(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
JSONObject jsonObject = JSONObject.parseObject(message.toString());
OrderDTO orderDTO = jsonObject.toJavaObject(OrderDTO.class);
orderService.saveOrder(orderDTO);
}
}
}
因为我们已经使用了kafka作为并发时流量缓冲的组件,就不在需要我们前面自己添加进来的队列了,所以改造后的下单接口如下所示:
@Autowired
private KafkaProducer kafkaProducer;
@Override
public Result saveOrder(OrderDTO orderDTO) {
// 下单实现
Result result = this.saveOrderImpl(orderDTO);
String orderId = JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id");
kafkaProducer.send("rob-necessities-trading", orderId);
return Result.success("下单成功");
}
如上所示,具体的订单业务逻辑没有变化。
四、测试
kafka是单节点的,在其他服务器上,因为我本地没有内存了。
全部完成时间大概是23秒,时间上有些许增加,但是整体吞吐量跟以前绝对不是一个量级的了。
另外,我们在调用支付接口的时候也可以通过kafka的形式,但是本文不做修改了。
作者:我犟不过你
链接:https://juejin.cn/post/7068450775743070244