使用kafka提升你的订单接口吞吐量

前面的文章,我们使用的是jdk自带的队列,实现了服务的吞吐量增加,但是我们知道的是,jdk的队列时基于内存的,即当请求量很大的时候,大量的请求缓存在内存当中,对于内存的要求还是很大的,不是很适合并发量很大的业务场景。尤其是在电商的场景,都会通过消息队列的削峰,解耦,从而提高系统的吞吐量,保证稳定性。所以我们接下来,继续对系统进行改进,引入kafka,进一步对于稳定性进行完善。
关于kafka的安装,介绍,集成,请参考文章开头给出的链接内的具体内容。

一、引入Kafka

引入依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.10.RELEASE</version>
</dependency>

添加配置:

spring:
  kafka:
    bootstrap-servers: 172.16.3.29:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      #      batch-size: 16384
      #      # 设置生产者内存缓冲区的大小。
      #      buffer-memory: 33554432
      #      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
    #      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
    #      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
    #      acks: 1
    consumer:
      group-id: test
      #      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      #      auto-commit-interval: 1S
      #      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      #      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      #      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      #      auto-offset-reset: earliest
      #      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      #      enable-auto-commit: false
      #      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 150
    listener:
      #      # 在侦听器容器中运行的线程数。
      #      concurrency: 5
      #      #listner负责ack,每调用一次,就立即commit
      #      ack-mode: manual_immediate
      missing-topics-fatal: false

二、测试服务改造

2.1 消息生产者

提供一个简单的生产者工具类,只有简单的发送消息一个方法,参数是 topic 和 message。
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * kafka生产者
 *
 * @author weirx
 * @date 2021/02/03 14:22
 **/
@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * kafka消息发送
     * @param
     * @author weirx
     * @return void
     * @date: 2021/2/3
     */
    public void send(String topic,String message){
        kafkaTemplate.send(topic,message);
    }
}

2.2 下单接口改造

引入KafkaProducer,下单时替换http请求成kafka推送,伪代码如下:

@Autowired
private KafkaProducer kafkaProducer;


kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));

2.3 支付接口改造

在前面一篇文章当中,支付接口是作为下单接口的回调接口被调用的,其实是一个完全同步的接口,易出现问题,如阻塞,请求失败、超时等。
所以此处我们也将其改造成kafka异步消费,消费者工具类如下所示:

@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private TradingServiceImpl tradingService;

    @KafkaListener(topics = {"rob-necessities-trading"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            String orderId = message.toString();
            log.info("支付开始时间***********************:{},订单id: {}", LocalDateTime.now(), orderId);
            tradingService.pay(Long.valueOf(orderId));
            log.info("支付完成时间/////////////////////////:{},订单id: {}", LocalDateTime.now(), orderId);
        }
    }
}

三、订单服务改造

2.1 支付回调改造

前面的支付方式,我们是在订单完成后通过http接口形式,现在改用kafka,所以我们需要提供一个kafka消息生产者,将消息同送到测试服务:

@Component
public class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * kafka消息发送
     * @param
     * @author weirx
     * @return void
     * @date: 2021/2/3
     */
    public void send(String topic,String message){
        kafkaTemplate.send(topic,message);
    }
}

2.2 下单接口改造

接口下单命令的方式不再是等待http请求调用了,此处变成监听kafka,提供消费者如下:

@Slf4j
@Component
public class KafkaConsumer {

    @Autowired
    private OrderService orderService;

    @KafkaListener(topics = {"rob-necessities-order"})
    public void consumer(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("----------------- record =" + record);
            log.info("------------------ message =" + message);
            JSONObject jsonObject = JSONObject.parseObject(message.toString());
            OrderDTO orderDTO = jsonObject.toJavaObject(OrderDTO.class);
            orderService.saveOrder(orderDTO);
        }
    }
}

因为我们已经使用了kafka作为并发时流量缓冲的组件,就不在需要我们前面自己添加进来的队列了,所以改造后的下单接口如下所示:

@Autowired
private KafkaProducer kafkaProducer;

@Override
public Result saveOrder(OrderDTO orderDTO) {

    // 下单实现
    Result result = this.saveOrderImpl(orderDTO);
    String orderId = JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id");
    kafkaProducer.send("rob-necessities-trading", orderId);
    return Result.success("下单成功");
}

如上所示,具体的订单业务逻辑没有变化。

四、测试

kafka是单节点的,在其他服务器上,因为我本地没有内存了。
全部完成时间大概是23秒,时间上有些许增加,但是整体吞吐量跟以前绝对不是一个量级的了。
另外,我们在调用支付接口的时候也可以通过kafka的形式,但是本文不做修改了。

作者:我犟不过你
链接:https://juejin.cn/post/7068450775743070244

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353

推荐阅读更多精彩内容