创建消息Producer服务,配置消息主题
这里新建了一个controller,controller里面有一个接口,这个接口是用来生产消息的,然后最后面是一个方法,这个方法用来消费消息
package com.example.streamsample;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.function.Consumer;
@RestController
@RequestMapping
@Configuration
@Slf4j
public class Controller {
private final StreamBridge streamBridge;
@Autowired
public Controller(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
@GetMapping("/streamTest")
public void streamTest(String msg){
streamBridge.send("mytopic", msg); // 向kafka发送消息
}
// 消费者,bean名称必须与配置文件中的bindingName一致
@Bean
public Consumer<String> receiver() {
return msg -> log.info("receive msg:{}", msg);
}
}
在stream 3.0之后,就如之前帖子讲的,我们可以使用方法来直接消费消息:
// 消费者,bean名称必须与配置文件中的bindingName一致
@Bean
public Consumer<String> receiver() {
return msg -> log.info("receive msg:{}", msg);
}
而如上,topic就默认为 receiver-in-0*****
而在下面的配置中就是把这个信道绑定到mytopic,这样的话,生产者发送到的主题就跟消费的主题一致了。
spring.cloud.stream.bindings.receiver-in-0.destination: mytopic
启动多个Consumer节点测试消息广播
然后我们可以以不同的端口号来启动两次这个服务
注意把这个并行打开就可以同时启动两次这个服务了。
RabbitMQ界面查看广播组(Exchanges)
然后我们可以调用postman来生产一条消息,然后分别到两个服务的控制台看,发现都接受到了这个topic发出的消息了: