【Spring Cloud Stream】发送和接收消息

【前置文章】

【官方文档】

【本文内容】
本文基于Spring Cloud Stream 3.2.4版本,用RabbitMQ作为消息中间件。

  • 使用Supplier接口发送消息:
    • 新建一个bean返回type为Supplier,方法为hello(),会自动创建exchange=hello-out-0
    • 默认情况下每1秒钟由Spring框架触发1次。
    • 修改默认值:spring.cloud.stream.poller.fixedDelay=5000
    • Reactive风格编程,Supplier<Flux<String>> hello()Flux.just(String),只会触发一次。
    • Reactive风格编程,配合Thread.sleep和Schedulers.elastic(),可实现每1秒发送1次消息。
    • Reactive风格编程,使用@PollableBean轻松实现每1秒发送1次消息。
  • 主动发送消息到一个output
    • 使用StreamBridge发送消息到exchange=fooout-in-0,配合spring.cloud.stream.output-bindings=fooout使用。
    • 使用StreamBridge发送消息到动态destination中,如myDestination,没有该exchange则新建,有则跳过新建。
    • 使用ChannelInterceptor拦截消息,配合@GlobalChannelInterceptor使用。
  • Functional组合spring.cloud.function.definition=toUpperCase|wrapInQuotes
  • 多个input/output参数的Functions:参考官方文档
  • Batch ConsumersConsumer<List<Person>> batchPerson()
  • Batch ProducersFunction<String, List<Message<String>>> batch()
  • Spring IntegrationFlow作为functions:参考官方文档

1. 使用Supplier接口发送消息

基于java8的三个接口:

  • Comsumer相当于接收消息(即消费消息),所以只有input。
  • Function相关于接收消息送处理后再发送给下游,所以有input/output。
  • Supplier则是发送消息给下游,即生产者,配置上只有output。

ComsumerFunction在前置文章中有介绍,本章节介绍Supplier

首先,添加一个Supplier的bean:

    @Bean
    public Supplier<String> hello() {
        return () -> "hello world";
    }

在RabbitMQ console中可以看到创建了exchange:
image.png

并没有queue创建,因为我们的app只需要负责将消息发送给exchange,至于下游的接收,则需要下游app自己处理。

值得注意的是,不管是Function或是Consumer,因为都有input的配置,即消息从上游发送后,当前app中的方法就会被trigger(即方法的触发依赖说消息的到达)。

Supplier作为发送方,需要谁来触发呢?毕竟它没有绑定到任何一个input上。默认情况下,Spring框架会在启动的时候进行触发,并且每秒会触发一次。

为了验证上述结论,我们可以在项目中再额外定义一个Comsumer用来接收上述Supplier发送的消息:

    @Bean
    public Consumer<String> helloConsumer() {
        return str -> {
            log.info("Received: " + str);
        };
    }

在前置文章#4.5中讲到,如果有1个以上的functional bean,那么就需要显示定义出bean的名字,并且我们将Consumer的input和Supplier的output,指到了同一个exchange中:

spring.cloud.function.definition=helloConsumer;hello
spring.cloud.stream.bindings.helloConsumer-in-0.destination=hello
spring.cloud.stream.bindings.hello-out-0.destination=hello

启动程序后,可以看到在Consumer端源源不断的接收到来自Supplier的消息,而且间隔就是1秒:

image.png

默认的参数可以修改,详细看:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_polling_configuration_properties
比如我们加上spring.cloud.stream.poller.fixedDelay=5000,那么Supplier消息就会每隔5秒发送一次。

接下来,我们尝试使用reactive的编程风格来编写Supplier的bean,下面的这个方法,其实只会被调用一次:

    @Bean
    public Supplier<Flux<String>> hello() {
        return () -> Flux.just("hello from reactive style");
    }

这个比较好测试,我们直接在exchange=hello上面手动binding一个queue=my-queue:
image.png

启动程序后发现,只能接收到一条message,就再也没有新的message了。

官方给了reactive想要实现每隔一秒发送消息的示例,这里使用了Schedulers以及让这个Thread sleep 1秒:

@Bean
    public Supplier<Flux<String>> hello() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    // ignore
                }
                return "Hello from Supplier";
            }
        })).subscribeOn(Schedulers.elastic()).share();
    }

那么,有没有一种办法,可以用reactive的编程风格来编写Supplier并且用更为简单的方式实现每隔1秒发送一次消息呢?(为什么要执着于每隔多久发送一次,这里的use case可以是我们需要源源不断的从一个数据源中主动拉取数据,处理好之后再发送给下游)。

答案是有的,可以使用@PollableBean,以下的代码也会每隔1秒钟发送一次消息:

    @PollableBean
    public Supplier<Flux<String>> hello() {
        return () -> Flux.just("hello");
    }

2. 主动发送消息到一个output

很多时候我们并不能使用Supplier来处理消息,主要原因是有些驱动并不能在启动时就设定好,而是有可能是通过REST api来发送一个消息,对于这种use case,Spring Cloud Stream提供了两种方式:

2.1 方式1:使用StreamBridge发送消息到output
@RestController
public class StreamBridgeController {
    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("fooout")
    public String delegateToSupplier() {
        streamBridge.send("fooout-out-0", "hello from streamBridge");
        return "success";
    }
}

这时候就用到了上面#4.6的配置了,即没有绑定到具体的function的时候,需要配置spring.cloud.stream.input/output-binings属性

spring.cloud.stream.output-bindings=fooout

启动项目后,自动创建exchange:
image.png

为了测试我们手动创建一个queue=my-queue,并绑定到这个exchange上:
image.png

调用API:http://localhost:8080/fooout后,可以接收到消息:

image.png

在这里我们自动装载了一个StreamBridge bean,使得可以发送消息到指定的output binding上。另外streamBridge.send(..)发送的消息是一个Object对象,即不仅可以发送String,还可以发送一个对象作为message,默认的conversion是json格式,比如我们尝试发送一个Student对象:

        streamBridge.send("fooout-out-0", new Student(1, "test"));
image.png
2.2 方式2:使用StreamBridge发送到动态的destination中

我们尝试发送消息到exchange=myDestination中,但不同于上个例子,这里的myDestination没有在application.properties中配置,因此,这样的exchange name会被当作是动态的destination(目的地)。

如果exchange=myDestination不存在,则会先创建出来,如果存在则跳过创建。

    @GetMapping("dynamic")
    public String dynamicDestination() {
        streamBridge.send("myDestination", "hello dynamic destination.");
        return "success";
    }

动态新增的exchange:
image.png
2.3 StreamBridge.send(...)其它方法:
  • boolean send(String bindingName, Object data, MimeType outputContentType):默认的content type是application/json,但在send的时候可以指定content type。
  • boolean send(String bindingName, @Nullable String binderName, Object data):Spring Cloud Stream支持多个binder的case,比如从Kafka收消息再发送到RabbitMQ。通过StreamBridge发送时,可以指定相应的binderName。
2.4 使用拦截器拦截StreamBridge

StreamBridge的send方法内部用的是MessageChannel来建立output binding,MessageChannel位于spring-messaging.jar中,即这是Spring messaging中的代码,Jms用的也是这套,并不是Spring Cloud Stream独有。

关于MessageChannel的Interceptor,这篇文章更为详细:https://dzone.com/articles/spring-cloud-stream-channel-interceptor

MessageChannel可以用ChannelInterceptor来拦截。这个接口有6个方法,基本涵盖了接收消息的各个阶段:

  • preSend(...)
  • postSend(...)
  • afterSendCompletion(...)
  • preReceive(...)
  • postReceive(...)
  • afterReceiveCompletion(...)

先新建一个ChannelInterceptor的bean,Spring Cloud Stream并不会注入这个interceptor,除非这个bean加上@GlobalChannelInterceptor(patterns = "*")

    @Bean
    @GlobalChannelInterceptor(patterns = "*")
    public ChannelInterceptor customInterceptor() {
        return new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                String bytes = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
                log.info("interceptor message: {}", bytes);
                return message;
            }
        };
    }

【测试】
访问上述#2.1的地址:http://localhost:8080/fooout,控制台日志:

2023-01-24 12:23:57.308 INFO 25053 --- [nio-8080-exec-1] com.config.MessagInterceptor : interceptor message: {"id":1,"name":"test"}

访问上述#2.2的地址:http://localhost:8080/dynamic,控制台日志:

2023-01-24 12:24:45.136 INFO 25053 --- [nio-8080-exec-2] com.config.MessagInterceptor : interceptor message: hello dynamic destination.

上述的@GlobalChannelInterceptor(patterns = "*")会拦截所有的binding,如果只想拦截#2.1的fooout-out-0的binding,那么可以把patterns改成:@GlobalChannelInterceptor(patterns = "fooout-*")

3. Functional组合

使用functional编程还可以将一系列的functional组合起来使用。例如有两个Function bean:

    @Bean
    public Function<String, String> toUpperCase() {
        return s -> s.toUpperCase();
    }

    @Bean
    public Function<String, String> wrapInQuotes() {
        return s -> "\"" + s + "\"";
    }

超过1个以上的functional我们需要配置spring.cloud.function.definition,在前置文章#4.5中有介绍,可以使用;隔开。

但如果我们用|隔开,就表示functional的组合,即消息从toUpperCase接收,处理(这个方法的处理结果是把消息变成大写),然后再发送给下游,下游是wrapInQuotes方法(这个方法是把消息加上双引号),然后再发送给下游:

spring.cloud.function.definition=toUpperCase|wrapInQuotes

【测试】

image.png

我们测试的开始端是RabbitMQ console --> 在exchange=toUpperCasewrapInQuotes-in-0中发送消息 --> 消息被toUpperCase()接收 --> 转大写后交给下一个functional处理(这里也是通过发消息),即wrapInQuotes() --> 给消息加上双引号 --> 发送消息给toUpperCasewrapInQuotes-out-0 --> 为了测试方便,我们新建一个my-queue,来接收消息。

首先发送消息:
image.png

再看到消息拦截器preSend方法发送的消息:
image.png

最后my-queue收到的消息,可以看到转成了大写后又加了括号:
image.png

如果觉得名字太长,可以按前置文章的#4.2章,可以先给binding标下name:

spring.cloud.function.definition=toUpperCase|wrapInQuotes
spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=input
spring.cloud.stream.bindings.input.destination=my-input

这样原先的exchange=toUpperCasewrapInQuotes-in-0就会变为my-input

4. 多个input/output参数的Functions

为什么会有这种需求?

  • 大数据:你的数据来源可能不止一个地方。
  • 数据聚合:有时候需要我们将来自不同stream的数据进行聚合。

具体查看官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_functions_with_multiple_input_and_output_arguments

5. Batch Consumers/Producers

5.1 Batch Consumers

具体查看官方文档:

图来自文章,虽然文章用的是中间件是solace,但图表达的很好:

image.png

首先,定义一个bean为Consumer的方法,可以批量接收Person,并打印出来:

    @Bean
    public Consumer<List<Person>> batchPerson() {
        return persons -> {
            log.info("Received " + persons.size());
            persons.forEach(p -> log.info("received: " + p));
        };
    }

需要相应的配置:

spring.cloud.stream.bindings.batchPerson-in-0.consumer.batch-mode=true

spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.receive-timeout=200

【测试】
为了方便测试,写了测试代码:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("batch-consumer")
    public String batch() {
        rabbitTemplate.convertAndSend("batchPerson-in-0", "#", "{\"name\":\"aaa\"}");
        rabbitTemplate.convertAndSend("batchPerson-in-0", "#", "{\"name\":\"bbb\"}");
        return "success";
    }

浏览器调用http://localhost:8080/batch-consumer后,日志打印:

image.png

5.2 Batch Producers
image.png

首先,创建producer bean,该方法会接收1个字符串,然后再分发出4条消息:

@Bean
    public Function<String, List<Message<String>>> batch() {
        return p -> {
            List<Message<String>> list = new ArrayList<>();
            list.add(MessageBuilder.withPayload(p + ":1").build());
            list.add(MessageBuilder.withPayload(p + ":2").build());
            list.add(MessageBuilder.withPayload(p + ":3").build());
            list.add(MessageBuilder.withPayload(p + ":4").build());
            return list;
        };
    }

【测试】
为了方便测试,我们写一个Consumer来接收上述批量Producer的结果:

    @Bean
    public Consumer<String> batchConsumer() {
        return str -> log.info("received: " + str);
    }

超过1个functional时,需要显性指定名字,以及我们把batchConsumer的in指定为上述batch producer的下游:

spring.cloud.function.definition=batch;batchConsumer
spring.cloud.stream.bindings.batchConsumer-in-0.destination=batch-out-0

在RabbitMQ console,exchange=batch-in-0中发布1条消息:

image.png

batchConsumer端收到了4条消息:
image.png

6. Spring IntegrationFlow作为functions

Spring Cloud Stream和Spring Integration集成,详细参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_spring_integration_flow_as_functions

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容