49 Spring Cloud Stream 入门案例

准备工作

案例中通过rabbitMQ作为消息中间件,完成SpringCloud Stream的案例。需要自行安装

消息生产者

(1)创建工程引入依赖

   <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

(2)定义bingding

发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:

public interface Source {
    String OUTPUT = "output";
    
    @Output("output")
    MessageChannel output();
}

这就接口声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生 产者。

(3)配置application.yml

spring:
 cloud:
   stream:
     bindings:
       output:
         destination: muziwk-default
         contentType: text/plain

  • contentType:用于指定消息的类型。具体可以参考 spring cloud stream docs
  • destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 muziwk-default 的所有消息队列中。

(4)测试发送消息

@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
    @Autowired
    @Qualifier("output")
    MessageChannel output;
    @Override
    public void run(String... strings) throws Exception {
        //发送MQ消息
        output.send(MessageBuilder.withPayload("hello world").build());
   }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
   }
}

消息消费者

(1)创建工程引入依赖

<dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

(2)定义bingding

同发送消息一致,在Spring Cloud Stream中接受消息,需要定义一个接口,如下是内置的一个接口。

public interface Sink {
    String INPUT = "input";
    @Input("input")
    SubscribableChannel input();
}

注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。 这就接口声明了一个 binding 命名为 “input” 。

(3)配置application.yml

spring:
 cloud:
   stream:
     bindings:
       input:
         destination: muziwk-default 

destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 muziwk-default

(4) 测试

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    // 监听 binding 为 Sink.INPUT 的消息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("监听收到:" + message.getPayload());
   }
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
   }
}


  • 定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 @StreamListener(Processor.INPUT),方法参数为 Message 。
  • 所有发送 exchange 为“muziwk-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方 法。
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容