使用Spring Request-Reply实现基于Kafka的同步请求响应

大家提到Kafka时第一印象就是它是一个快速的异步消息处理系统,不同于通常tomcat之类应用服务器和前端之间的请求/响应方式请求,客户端发出一个请求,必然会等到一个响应,这种方式对Kafka来说并不自然,Kafka是一种事件驱动方式,事件激活然后响应,这种方式对很多人接受起来不方便,为了实现请求 - 响应模型,开发人员必须在消息的生产者记录中构建相关ID系统,并将其与消息的消费者记录中的ID进行匹配,找到那个请求ID再使用Kafka的一个队列进行回复。

下图是本案例的演示架构图,这个案例是以同步行为返回两个数字总和的结果。

客户端  --->请求---> RESTcontroll ---> Spring replying kafka 模板 -->Kafka的请求主题 -->Spring Kafka监听器

  |                                                                                        |

  |<----- 响应 <----RESTcontroll <-- Spring replying kafka 模板 <-- Kafka的响应主题<---------|


下面我们开始看看开发这个演示步骤:

设置Springboot启动类

首先需要在pom.xml引入Spring kafka模板:

  <dependency>

        <groupId>org.springframework.kafka</groupId>

        <artifactId>spring-kafka</artifactId>

    </dependency>

代码如下:

@SpringBootApplication

public class RequestReplyKafkaApplication {

  public static void main(String[] args) {

    SpringApplication.run(RequestReplyKafkaApplication.class, args);

  }

}

设置Spring ReplyingKafkaTemplate

我们需要在Springboot配置类的KafkaConfig对Spring kafka模板进行配置:

@Configuration

public class KafkaConfig {

在这个配置类中,我们需要配置核心的ReplyingKafkaTemplate类,这个类继承了 KafkaTemplate 提供请求/响应的的行为;还有一个生产者工厂(参见 ProducerFactory 下面的代码)和 KafkaMessageListenerContainer。这是最基本的设置,因为请求响应模型需要对应到消息生产者和消费者的行为。

// 这是核心的ReplyingKafkaTemplate

@Bean

public ReplyingKafkaTemplate<String, Model, Model> replyKafkaTemplate(ProducerFactory<String, Model> pf, KafkaMessageListenerContainer<String, Model> container) {

  return new ReplyingKafkaTemplate<>(pf, container);

}

// 配件:监听器容器Listener Container to be set up in ReplyingKafkaTemplate

@Bean

public KafkaMessageListenerContainer<String, Model> replyContainer(ConsumerFactory<String, Model> cf) {

  ContainerProperties containerProperties = new ContainerProperties(requestReplyTopic);

  return new KafkaMessageListenerContainer<>(cf, containerProperties);

}

// 配件:生产者工厂Default Producer Factory to be used in ReplyingKafkaTemplate

@Bean

public ProducerFactory<String,Model> producerFactory() {

  return new DefaultKafkaProducerFactory<>(producerConfigs());

}

// 配件:kafka生产者的Kafka配置Standard KafkaProducer settings - specifying brokerand serializer

@Bean

public Map<String, Object> producerConfigs() {

  Map<String, Object> props = new HashMap<>();

  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,

            bootstrapServers);

  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,

            StringSerializer.class);

  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

  return props;

}

设置spring-Kafka的监听器

这与通常创建的Kafka消费者相同。唯一的变化是额外是在工厂中设置ReplyTemplate,这是必须的,因为消费者需要将计算结果放入到Kafka的响应主题。

//消费者工厂 Default Consumer Factory

@Bean

public ConsumerFactory<String, Model> consumerFactory() {

  return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new JsonDeserializer<>(Model.class));

}

// 并发监听器容器Concurrent Listner container factory

@Bean

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Model>> kafkaListenerContainerFactory() {

  ConcurrentKafkaListenerContainerFactory<String, Model> factory = new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());

  // NOTE - set up of reply template 设置响应模板

  factory.setReplyTemplate(kafkaTemplate());

  return factory;

}

// Standard KafkaTemplate

@Bean

public KafkaTemplate<String, Model> kafkaTemplate() {

  return new KafkaTemplate<>(producerFactory());

}

编写我们的kafka消费者

这是过去创建的Kafka消费者一样。唯一的变化是附加了@SendTo注释,此注释用于在响应主题上返回业务结果。

@KafkaListener(topics = "${kafka.topic.request-topic}")

@SendTo

public Model listen(Model request) throws InterruptedException {

  int sum = request.getFirstNumber() + request.getSecondNumber();

  request.setAdditionalProperty("sum", sum);

  return request;

}

这个消费者用于业务计算,把客户端通过请求传入的两个数字进行相加,然后返回这个请求,通过@SendTo发送到Kafka的响应主题。

总结服务

现在,让我们将所有这些都结合在一起放在RESTcontroller,步骤分为几步,先创建生产者记录,并在记录头部中设置接受响应的Kafka主题,这样

把请求和响应在Kafka那里对应起来,然后通过模板发布消息到Kafka,再通过future.get()堵塞等待Kafka的响应主题发送响应结果过来。这时再

打印结果记录中的头部信息,会看到Spring自动生成相关ID。

@ResponseBody

@PostMapping(value="/sum",produces=MediaType.APPLICATION_JSON_VALUE,consumes=MediaType.APPLICATION_JSON_VALUE)

public  Model  sum(@RequestBody  Model  request)throws InterruptedException,ExecutionException {

  //创建生产者记录

  ProducerRecord<String,Model>  record  = new ProducerRecord<String,Model>(requestTopic,request);

  //在记录头部中设置响应主题

  record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, requestReplyTopic.getBytes()));

  //发布到kafka主题中

  RequestReplyFuture<String, Model, Model> sendAndReceive = kafkaTemplate.sendAndReceive(record);

  //确认生产者是否成功生产

  SendResult<String, Model> sendResult = sendAndReceive.getSendFuture().get();


  //打印结果记录中所有头部信息 会看到Spring自动生成的相关ID,这个ID是由消费端@SendTo 注释返回的值。

sendResult.getProducerRecord().headers().forEach(header -> System.out.println(header.key() + ":" + header.value().toString()));


  //获取消费者记录

  ConsumerRecord<String, Model> consumerRecord = sendAndReceive.get();


  //返回消费者结果

  return consumerRecord.value();

}

并发消费者

即使你要创建请求主题在三个分区中,三个并发的消费者的响应仍然合并到一个Kafka响应主题,这样,Spring侦听器的容器能够完成匹配相关ID的繁重工作。

整个请求/响应的模型是一致的。

现在我们可以再修改启动类如下:

@ComponentScan(basePackages = {

        "com.gauravg.config",

        "com.gauravg.consumer",

        "com.gauravg.controller",

        "com.gauravg.model"

    })

@SpringBootApplication

public class RequestReplyKafkaApplication {

  public static void main(String[] args) {

    SpringApplication.run(RequestReplyKafkaApplication.class, args);

  }

}

post数据:

{

  "firstNumber": "111",

  "secondNumber": "2222"

}

返回结果是:

{

    "firstNumber": 111,

    "secondNumber": 2222,

    "sum": 2333

}

在控制台输出记录头部信息:

kafka_replyTopic:[B@1f59b198

kafka_correlationId:[B@356a7326

__TypeId__:[B@1a9111f

可见,Spring自动生成聚合ID(correlationId),无需我们自己手工比对了。

欢迎工作一到五年的Java工程师朋友们加入Java架构开发: 855835163

群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,367评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,959评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,750评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,226评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,252评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,975评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,592评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,497评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,027评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,147评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,274评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,953评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,623评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,143评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,260评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,607评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,271评论 2 358

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,678评论 18 139
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong阅读 22,423评论 1 92
  • 1.1 spring IoC容器和beans的简介 Spring 框架的最核心基础的功能是IoC(控制反转)容器,...
    simoscode阅读 6,721评论 2 22
  • 小柳 受得住守不住 都不过一颗心 笑也好泪也罢 也不过这一生 2016年12月24日 于成都
    小柳姐姐阅读 67评论 0 0
  • 是不是因为有愧于心 所以抱得太紧 会不会自己都不相信 才要强调真实 越心虚越解释 关于你和她公开的秘密 没什么了不...
    请别说对不起阅读 168评论 0 0