Spring Cloud入门教程(十):消息总线(Bus)

Spring Cloud入门教程系列:

本人和同事撰写的《Spring Cloud微服务架构开发实战》一书也在京东、当当等书店上架,大家可以点击这里前往购买,多谢大家支持和捧场!


在我们开始讲Spring Cloud Bus之前来看另外一个IT术语:ESB(Enterprise Service Bus)。ESB在维基百科中是这样描述的:

企业服务总线(Enterprise Service Bus,ESB)的概念是從服務導向架構(Service Oriented Architecture, SOA)發展而來。SOA描述了一种IT基础设施的应用集成模型;其中的软构件集是以一种定义清晰的层次化结构来相互耦合。一个ESB是一个预先组装的SOA实现,它包含了实现SOA分层目标所必需的基础功能部件。

在企业计算领域,企业服务总线是指由中间件基础设施产品技术实现的、 通过事件驱动和基于XML消息引擎,为更复杂的面向服务的架构提供的软件架构的构造物。企业服务总线通常在企业消息系统上提供一个抽象层,使得集成架构师能够不用编码而是利用消息的价值完成集成工作。

企业服务总线提供可靠消息传输,服务接入,协议转换,数据格式转换,基于内容的路由等功能,屏蔽了服务的物理位置,协议和数据格式。

其中,最重要的一句就是:企业服务总线通常在企业消息系统上提供一个抽象层,使得集成架构师能够不用编码而是利用消息的价值完成集成工作。 通俗一点来讲就是企业服务总线是架构在消息中间件之上的另外一个抽象层,使得我们可以不用关心消息相关的处理就可以完成业务逻辑的处理。

到这里你是不是有点突然明白Spring Cloud Bus 和 Spring Cloud Stream之间的关系了,刚开始接触这两个组件时,大部分都会迷惑到底这两者有什么区别?它们又有什么联系?Stream通过对消息中间件进行抽象封装,提供一个统一的接口供我们发送和监听消息,而Bus则是在Stream基础之上再次进行抽象封装,使得我们可以在不用理解消息发送、监听等概念的基础上使用消息来完成业务逻辑的处理。

那么Spring Cloud Bus是如何为我们实现的呢?一句话概括就是事件机制。

1. Spring的事件机制

在Spring框架中有一个事件机制,该机制是一个观察者模式的实现。观察者模式建立一种对象与对象之间的依赖关系,当一个对象(称之为:观察目标)发生改变时将自动通知其它对象(称之为:观察者),这些观察者将做出相应的反应。一个观察目标可以对应多个观察者,而且这些观察者之间没有相互联系,可以根据需要增加和删除观察者,使得系统更易于扩展。通过Spring事件机制可以达到如下目的:

  • 应用模块之间的解耦;
  • 对同一种事件可以根据需要定义多种处理方式;
  • 对主线应用不干扰,是一个极佳的开闭原则(OCP)实践。

当我们在应用中引入事件机制时需要借助Spring中以下接口或抽象类:

  • ApplicationEventPublisher: 这是一个接口,用来发布一个事件;
  • ApplicationEvent: 这是一个抽象类,用来定义一个事件;
  • ApplicationListener<E extends ApplicationEvent>: 这是一个接口,实现事件的监听。

其中Spring应用的上下文ApplicationContext默认是实现了ApplicationEventPublisher接口,因此在发布事件时我们可以直接使用ApplicationContext.publishEvent()方法来发送。

一个典型的Spring事件发送与监听代码如下。

1.1 定义事件

比如,我们定义一个用户事件:

/**
 * 用户事件
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class UserEvent extends ApplicationEvent {
    /** 消息类型:更新用户,值为: {@value} */
    public static final String ET_UPDATE = "update";

    // ========================================================================
    // fields =================================================================
    private String action;
    private User user;

    // ========================================================================
    // constructor ============================================================
    public UserEvent(User user) {
        super(user);
        this.user = user;
    }

    public UserEvent(User user, String action) {
        super(user);
        this.action = action;
        this.user = user;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("action", this.getAction())
                .add("user", this.getUser()).toString();
    }

    // ==================================================================
    // setter/getter ====================================================
    public String getAction() {
        return action;
    }
    public void setAction(String action) {
        this.action = action;
    }

    public User getUser() {
        return user;
    }
    public void setUser(User user) {
        this.user = user;
    }
}

1.2 定义监听

我们定义一个用户事件监听器,当用户变更时做相应处理:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
 * 用户事件监听
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Component
public class UserEventListener implements ApplicationListener<UserEvent> {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Override
    public void onApplicationEvent(UserEvent userEvent) {
        this.logger.debug("收到用户事件:{} ", userEvent);
        // TODO: 实现具体的业务处理
    }
}

用户事件监听比较简单,只需要实现ApplicationListener接口,进行相应处理即可。

1.3 发送消息

发送消息比较简单,我们也可以直接在Event中实现,比如我们将上面UserEvent更改为如下:

/**
 * 用户事件
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class UserEvent extends ApplicationEvent {
     // 省略了之前的代码
    /**
     * 发布事件
     */
    public void fire() {
        ApplicationContext context = ApplicationContextHolder.getApplicationContext();
        if(null != context) {
            logger.debug("发布事件:{}", this);
            context.publishEvent(this);            
        }else{
            logger.warn("无法获取到当前Spring上下文信息,不能够发布事件");
        }
    }
}

那么我们就可以在需要的地方通过下面的代码来发布事件了:

new UserEvent(user, UserEvent.ET_UPDATE).fire();

2. Spring Cloud Bus机制

我们上面了解了Spring的事件机制,那么Spring Cloud Bus又是如何将事件机制和Stream结合在一起的呢?总起来说机制如下:

  1. 在需要发布或者监听事件的应用中增加@RemoteApplicationEventScan注解,通过该注解就可以启动Stream中所说的消息通道的绑定;
  2. 对于事件发布,则需要继承ApplicationEvent的扩展类 -- RemoteApplicationEvent,当通过ApplicationContext.publishEvent()发布此种类型的事件时,Spring Cloud Bus就会对所要发布的事件进行包装,形成一个我们所熟知的消息,然后通过默认的springCloudBus消息通道发送到消息中间件;
  3. 对于事件监听者则不需要进行任何变更,仍旧按照上面的方式就可以实现消息的监听。但,需要注意的一点就是在消费的微服务工程中也必须定义第2步所定义的事件,并且需要保障全类名一致(如果不一致,则需要做一点工作)。

嗯,就是这么简单。通过Bus我们就可以像编写单体架构应用一样进行开发,而不需要关系什么消息中间件、主题、消息、通道呀等等一大堆概念。

你也行在怀疑,是不是这么简单呀。那好,让我们来看看是不是很容易就可以实现Stream中示例。

3. 重构Spring Cloud Stream中的示例

3.1 重构商品微服务

3.1.1 增加对Bus的依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

3.1.2 构建商品事件

我们将原来商品配置变更所发送的消息更改为一个事件,代码如下:

package io.twostepsfromjava.cloud.bus;


import com.google.common.base.MoreObjects;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;

/**
 * 商品事件
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class ProductEvent extends RemoteApplicationEvent {
    /** 消息类型:更新商品,值为: {@value} */
    public static final String ET_UPDATE = "update";
    /** 消息类型:删除商品,值为: {@value} */
    public static final String ET_DELETE = "delete";

    // ========================================================================
    // fields =================================================================
    private String action;
    private String itemCode;

    // ========================================================================
    // constructor ============================================================
    public ProductEvent() {
        super();
    }

    public ProductEvent(Object source, String originService, String destinationService, String action, String itemCode) {
        super(source, originService, destinationService);
        this.action = action;
        this.itemCode = itemCode;
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("action", this.getAction())
                .add("itemCode", this.getItemCode()).toString();
    }

    // ==================================================================
    // setter/getter ====================================================
    public String getAction() {
        return action;
    }
    public void setAction(String action) {
        this.action = action;
    }

    public String getItemCode() {
        return itemCode;
    }
    public void setItemCode(String itemCode) {
        this.itemCode = itemCode;
    }
}

这里和之前事件构建函数不同的是:在构建一个事件时需要指定originServicedestinationService。对于事件发布者来说originService就是自己,而destinationService则是指将事件发布到那些微服务实例。destinationService配置的格式为:{serviceId}:{appContextId},在配置时serviceIdappContextId可以使用通配符,如果这两个变量都使用通配符的话(*:**),则事件将发布到所有的微服务实例。如只省略appContextId,则事件只会发布给指定微服务的所有实例,如:userservice:**,则只会将事件发布给userservice微服务。

3.1.3 实现事件发布

我们将商品微服务中商品变更中的代码修改为如下:

package io.twostepsfromjava.cloud.product.service;

import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.product.dto.ProductDto;
import io.twostepsfromjava.cloud.product.mq.ProductMsg;
import io.twostepsfromjava.cloud.product.util.ApplicationContextHolder;
import io.twostepsfromjava.cloud.product.util.RemoteApplicationEventPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;


/**
 * 商品服务
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Service
public class ProductService {
    protected Logger logger = LoggerFactory.getLogger(ProductService.class);

    private List<ProductDto> productList;
    
    @Autowired
    public ProductService() {
        this.productList = this.buildProducts();
    }
    
     // 省略了不相干的代码
    
    /**
     * 保存或更新商品信息
     * @param productDto
     * @return
     */
    public ProductDto save(ProductDto productDto) {
        // TODO: 实现商品保存处理
        for (ProductDto sourceProductDto : this.productList) {
            if (sourceProductDto.getItemCode().equalsIgnoreCase(productDto.getItemCode())) {
                sourceProductDto.setName(sourceProductDto.getName() + "-new");
                sourceProductDto.setPrice(sourceProductDto.getPrice() + 100);
                productDto = sourceProductDto;
                break;
            }
        }

        // 发送商品消息
        // this.sendMsg(ProductMsg.MA_UPDATE, productDto.getItemCode());
        // 发布商品变更消息
        this.fireEvent(ProductEvent.ET_UPDATE, productDto);

        return productDto;
    }
     
     // 这里已不再使用该方法
    protected void sendMsg(String msgAction, String itemCode) {
        ProductMsg productMsg = new ProductMsg(msgAction, itemCode);
        this.logger.debug("发送商品消息:{} ", productMsg);

        // 发送消息
        // this.source.output().send(MessageBuilder.withPayload(productMsg).build());
    }

    protected void fireEvent(String eventAction, ProductDto productDto) {
        ProductEvent productEvent = new ProductEvent(productDto,
                ApplicationContextHolder.getApplicationContext().getId(), "*:**",
                eventAction, productDto.getItemCode());

        // 发布事件
        RemoteApplicationEventPublisher.publishEvent(productEvent);
    }
}

其中RemoteApplicationEventPublisher的源码如下:

package io.twostepsfromjava.cloud.product.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
import org.springframework.context.ApplicationContext;


/**
 * 远程事件发布者
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
public class RemoteApplicationEventPublisher {
    protected static Logger logger = LoggerFactory.getLogger(RemoteApplicationEventPublisher.class);

    /**
     * 发布一个事件
     * @param event
     */
    public static void publishEvent(RemoteApplicationEvent event){
        ApplicationContext context = ApplicationContextHolder.getApplicationContext();
        if(null != context) {
            context.publishEvent(event);
            logger.debug("已发布事件:{}", event);
        }else{
            logger.warn("无法获取到当前Spring上下文信息,不能够发布事件");
        }
    }
}

3.1.4 开启远程消息扫描

最后,修改微服务启动类,添加@RemoteApplicationEventScan注解:

package io.twostepsfromjava.cloud;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * TwoStepsFromJava Cloud -- ProductDto Service 服务器
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@EnableDiscoveryClient
@RemoteApplicationEventScan
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

注意: 这里再次声明,远程事件必须定义在@RemoteApplicationEventScan注解所注解类的子包中,否则无法实现远程事件发布。

到这里我们的商品微服务重构就完成了。下面接着对Mall-Web微服务进行修改。

3.2 重构Mall-Web微服务

3.2.1 增加对Bus依赖

和商品微服务一样,就不重复了。

3.2.2 拷贝ProductEvent到本项目

呃,这个就不描述了。

3.2.3 实现事件监听处理

这个代码非常简单,不多说,具体如下:

package io.twostepsfromjava.cloud.web.mall.service;

import io.twostepsfromjava.cloud.bus.ProductEvent;
import io.twostepsfromjava.cloud.web.mall.dto.ProductDto;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;


/**
 * 远程事件监听
 *
 * @author CD826(CD826Dong@gmail.com)
 * @since 1.0.0
 */
@Component
public class ProductEventListener implements ApplicationListener<ProductEvent> {
    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    protected ProductService productService;

    @Override
    public void onApplicationEvent(ProductEvent productEvent) {
        if (ProductEvent.ET_UPDATE.equalsIgnoreCase(productEvent.getAction())) {
            this.logger.debug("Web微服务收到商品变更事件,商品货号: {}", productEvent.getItemCode());
            // 重新获取该商品信息
            ProductDto productDto = this.productService.loadByItemCode(productEvent.getItemCode());
            if (null != productDto)
                this.logger.debug("重新获取到的商品信息为:{}", productDto);
            else
                this.logger.debug("货号为:{} 的商品不存在", productEvent.getItemCode());
        } else if (ProductEvent.ET_DELETE.equalsIgnoreCase(productEvent.getAction())) {
            this.logger.debug("Web微服务收到商品删除事件,所要删除商品货号为: {}", productEvent.getItemCode());
        } else {
            this.logger.debug("Web微服务收到未知商品事件: {}", productEvent);
        }
    }
}

3.2.3 开启远程消息扫描

和商品微服务一样,不论是事件的发布还是事件的监听都需要开启远程消息扫描。直接在微服务引导类中增加@RemoteApplicationEventScan注解即可。

3.3 测试

我们的重构到此就全部完成了,下面依次分别启动:

  1. Kafka服务器;
  2. 服务治理服务器: Service-discovery;
  3. 商品微服务: Product-Service;
  4. Mall-Web微服务。

然后,使用Postman访问原来的消息测试端点: http://localhost:2100/products/item-2。在商品微服务的控制台,可以看到类似下面输出:

商品微服务控制台输出

从输出日志中可以看到商品事件已经发布出去。如果这个时候我们查看Mall-Web微服务的控制台,可以看到下图的输出:

Mall-Web微服务控制台输出

从日志输出中可以看到Mall-Web微服务已经能够正确接收到商品变更事件,并进行相应的处理。

3.4 小结

从重构后的代码来说的确使用Bus会更容易理解,也更容易上手。这对于当使用场合比较简单会非常好,比如:广播。典型的应用就是Config中的配置刷新,当在项目中同时引入了Config和Bus时,就可以通过/bus/refresh端点实现配置更改的广播,从而让相应的微服务重新加载配置数据。

当然,Bus简便性的另外一层含义就是不够灵活,因此具体是在项目中使用Bug还是直接使用Stream就看你的需要了,总起来一句就是:够用就好。

你可以到这里下载本篇的代码。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容