SpringCloud 进阶: 消息驱动(入门)Spring Cloud Stream【Greenwich.SR3】

 我的博客:程序员笑笑生,欢迎浏览博客!

 上一章 SpringCloud 基础教程(十二)-Zipkin分布式链路追踪系统搭建当中,我们使用Zipkin搭建完整的实时数据追踪系统。本章开始我们将进入Spring Cloud的更高阶的内容部分,首先从消息驱动Spring Cloud Stream开始。

前言

 消息驱动,顾明思议,在企业级应用中,消息中间件经常用于处理非同步场景、消息通知、应用解耦等。常用的有RabbitMq、kafka、Redis等消息队列等。Spring Cloud Stream是一个构建事件消息驱动的微服务框架,提供了一个灵活的编程模型。并基于Spring的基础之上,支持发布-订阅模型、消费者分组、数据分片等功能。

一、Stream 应用模型

file
  • Middleware: 消息中间件,如RabbitMq等
  • Binder:可以认为是适配器,用来将Stream与中间连接起来的,不同的Binder对应不同的中间件,需要我们配置
  • Application Core:由Stream封装的消息机制,很少情况下自定义开发
  • inputs:输入,可以自定义开发
  • outputs:输出,可以自定义开发

接下来快速开始,主要就是针对以上几个组件进行不同的配置。

二、快速开始

 接下来,我们以RabbitMQ为例(消息队列的环境搭建整这里不做过多的介绍,本章以Stream为主),新建2个Maven工程,分别当做消息生产者(server-receiver)、消息生产者(server-sender),在2个项目中引入Stream依赖和Stream对RabbitMq的依赖,在生产者单独的添加web的依赖,为了能够通过HTTP调用发送信息:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
       <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
</dependency>

2.1 server-receiver消费者

 启动主类:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
/**
 * @EnableBinding 表示告诉当前应用,增加消息通道的监听功能
 * 监听Sink类中名为input的输入通道:
 */
@SpringBootApplication
@EnableBinding(Sink.class)
public class ReceiverApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReceiverApplication.class, args);
    }
    /**
     * 监听rabbitmq的消息,具体什么队列,什么topic,通过配置信息application获取
     *
     * @param msg
     */
    @StreamListener(Sink.INPUT)
    public void reader(String msg) {
        System.out.print("receiver {}:" + msg);
    }
}

application.yml配置:

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: mytopic
            binder: defaultRabbit
      binders:
         defaultRabbit:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8081

 具体配置详解,spring.cloud.stream为前缀:

bindings配置:

  • input:表示channelName,这里为什么是input,是因为启动类中@EnableBinding(Sink.class)注解当中配置Sink接口,该接口中默认定义了channelName的名称,当然我们也可以自己写Sink接口
  • destination:消息中间件的Topic
  • binder:当前bingding绑定的对应的适配器,该实例表示适配rabbitmq,名称默认为defaultRabbit,可以自定义,接着需要配置该名称对应的类型,环境信息等

binders配置:

  • defaultRabbit:binder配置的适配器的名称,和spring.cloud.stream.bindings.input.binder值一样
  • environment:表示当前binder对应的配置信息

2.2 生产者server-sender

 SenderApplication启动类,添加@EnableBinding注解:

import com.microservice.stream.controller.SenderSource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
/**
 * @EnableBinding(SenderSource.class) 表示监听Stream通道功能
 * 
 * SenderSource为自定义的通道接口
 * 
 */
@SpringBootApplication
@EnableBinding(SenderSource.class)
public class SenderApplication {
   
    public static void main(String[] args) {
        SpringApplication.run(SenderApplication.class,args);

    }
}

 自定义SenderSource接口,参考org.springframework.cloud.stream.messaging.Source,将channel的名称改成和消费者的Sink的channel名称一样。

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SenderSource {
    /**
     * Name of the output channel.
     */
    String OUTPUT = "input";
    /**
     * @return output channel
     */
    @Output(SenderSource.OUTPUT)
    MessageChannel output();
}

 编写控制器,通过HTTP发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SenderController {
    @Autowired
    SenderSource source;
    
    @RequestMapping("/send")
    public String sender(String msg) {
        source.output().send(MessageBuilder.withPayload(msg).build());
        return "ok";
    }

}

 applicaiton.yml配置,配置和消费者的配置一样

spring:
  cloud:
    stream:
      bindings:
         input:
            destination: mytopic
            binder: defaultRabbit
      binders:
         defaultRabbit:
             type: rabbit
             environment:
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672
server:
  port: 8081                     

2.3 启动接受者和消费者,发送消息

 首先启动消费者,看启动日志,我们看到程序声明了一个名称为:mytopic.anonymous.88A97a5vQ9Ox07GnNBlKYQ的队列,并且绑定了mytopic 主题,创建了一个连上rabbit的连接:

file

我们看看rabbit的web页面队列列表中就有了新增了一个队列,并且绑定了mytopic主题:

file

然后再启动生产者server-sender,在启动日志中我们也看到了应用创建了到对应的消息队列的连接:

file

接下来我们通过HTTP发送信息:http://localhost:8081/send/?msg=test,在服务消费者的日志中,监听到了对应的消息:

file

通过以上的简单的实例,我们体验了Spring Cloud Stream在提供消息驱动服务方面非常的方便。

三、代码分析

3.1 @EnableBinding注解

 @EnableBinding表示告诉应用增加了通道监听功能,可以是一个或者多个,可以传入Sink和Source类,Sink和Souce可以自定义

3.2 Sink和Soure

 我们首先看看Sink类和Source类,

Sink

/**
 * Bindable interface with one input channel.
 */
public interface Sink {

    /**
     * Input channel name.
     */
    String INPUT = "input";

    /**
     * @return input channel.
     */
    @Input(Sink.INPUT)
    SubscribableChannel input();

}

Source

package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {

    /**
     * Name of the output channel.
     */
    String OUTPUT = "output";

    /**
     * @return output channel
     */
    @Output(Source.OUTPUT)
    MessageChannel output();

}

 Sink和Source一样是一个接口类型, INPUT、OUTPUT表示channel名称,@Input、@Output表示注入参数为对应的channel名称,在我们的上文中我们自定义了SenderSource类型的接口,为了和Sink的channel名称一样。

  • Sink:单一的输入通道
  • Source:单一的输出通道

 Spring会为每一个标注了@Output,@Input的管道接口生成一个实现类

3.3 Spring-messaging的抽象

 在Sink和source的接口中,我们注意到了MessageChannel、SubscribableChannel类,在spring框架中,spring-message模块对消息处理的抽象类:

对象 说明
Message 消息
MessageHandler 处理消息的单元
MessageChannel 发送/接受传输消息的信道,单项的
SubscribableChannel 继承MessageChannel,传送消息到所有的订阅者
ExecutorSubscribableChannel 继承SubscribableChannel,异步线程池传输消息

四、配置文件

  配置文件的格式如下,<>表示我们可以自定义:

spring:
  cloud:
    stream:
      bindings:
         <channel-name>:   #channel名称
            destination: mytopic  # 发布-订阅模型的消息主题topic
            binder: defaultRabbit  #binder(适配器的名称)
      binders:
         <binder-name>:  # 根据binder配置一样的名称
             type: rabbit  # 中间件的类型
             environment:  # 中间件实例的环境
                spring:
                 rabbitmq:
                     host: localhost
                     port: 5672

 我们可以定义多个binding,分别为binding绑定相同或不同的Binder

总结

 本章我们初步的介绍了Spring Cloud Stream,通过对Steam的应用模型一节通过消息生产者和消费者模型实现了简单的发布-订阅的模型,对Stream有了一些了解,Steram的功能远不止于此。在后期的介绍中,我还将继续深入的介绍Stream更多的内容。

file
file
file

SpringCloud基础教程(一)-微服务与SpringCloud

SpringCloud基础教程(二)-服务发现 Eureka

SpringCloud基础教程(三)-Eureka进阶

SpringCloud 基础教程(四)-配置中心入门

SpringCloud基础教程(五)-配置中心热生效和高可用

SpringCloud 基础教程(六)-负载均衡Ribbon

SpringCloud 基础教程(七)-Feign声明式服务调用

SpringCloud 基础教程(八)-Hystrix熔断器(上)

SpringCloud 基础教程(九)-Hystrix服务监控(下)

SpringCloud 基础教程(十)-Zull服务网关

SpringCloud 基础教程(十一)- Sleuth 调用链追踪简介

SpringCloud 基础教程(十二)-Zipkin 分布式链路追踪系统搭建

SpringCloud 进阶: 消息驱动(入门) Spring Cloud Stream【Greenwich.SR3】

更多精彩内容,请期待...

本文由博客一文多发平台 OpenWrite 发布!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,383评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,522评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,852评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,621评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,741评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,929评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,076评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,803评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,265评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,582评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,716评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,395评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,039评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,027评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,488评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,612评论 2 350

推荐阅读更多精彩内容