Spring Integration之消息传递讲解

1 Spring Integration

1.1 基础概念

1.1.1 简介

Spring IntegrationSpring 框架的一个重要扩展,其核心目标在于极大地简化企业集成模式的开发过程。它构建了一种基于消息的编程模型,让分布式系统中的系统集成变得更加轻松便捷。

基本概念:

  • 消息:在 Spring Integration 的体系中,消息是信息传递的关键载体。它就像一个装满各种信息的“包裹”,不仅可以包含业务数据,还能携带头部信息、消息标签等内容。消息会沿着特定的通道(Channel)在系统中有序传递。
  • 通道(Channel):通道就像是消息在系统中流动的“高速公路”。Spring Integration 提供了多种不同类型的通道,
    • 直接通道(Direct Channel),它就像一条直达专线,能让消息快速高效地传递;
    • 发布 - 订阅通道(Publish - Subscribe Channel),类似于广播电台,可以将消息同时传递给多个订阅者;
    • 队列通道(Queue Channel),如同排队等待服务的队伍,消息会按照顺序依次进行处理
  • 端点(Endpoint):端点是消息的生产者或者消费者,它们就像接力赛中的运动员,消息从一个端点传递到另一个端点,从而形成一个完整的消息处理流程。
  • 适配器(Adapter):适配器是 Spring Integration 与外部系统或者服务之间的“桥梁”。它能够将外部系统的消息“翻译”成 Spring Integration 能够理解的消息格式,也可以将 Spring Integration 的消息传递给外部系统。
  • 过滤器(Filter):过滤器就像是一个严格的“门卫”,只有满足特定条件的消息才能通过它的“检查”。它在消息的路由、转换等过程中发挥着重要作用。
  • 转换器(Transformer):转换器如同一个神奇的“魔法师”,能够将消息从一种形式转换为另一种形式,以满足系统的不同需求。它可以对数据格式进行转换,也可以修改消息体的内容。

1.1.2 与传统消息中间件的区别

Spring Integration 与传统消息中间件的区别与联系

  • 区别
    • Spring Integration 是框架:Spring Integration 是基于 Spring 构建的一个强大框架,它提供了一整套用于构建企业集成模式的工具和组件,就像一个功能齐全的“工具箱”。
    • 传统消息中间件是产品:传统消息中间件通常是独立的产品,如 RabbitMQ、Apache Kafka、ActiveMQ 等,它们专注于提供可靠的消息传递服务,就像专业的“快递物流公司”。
  • 联系
    • 整合性:Spring Integration 具有强大的整合能力,它可以与传统消息中间件完美集成。通过适配器,Spring Integration 能够与外部消息中间件进行通信,就像一个万能的“接口”,帮助企业集成系统与不同的消息中间件进行对接。
    • 解耦与异步通信:和传统消息中间件一样,Spring Integration 也支持解耦和异步通信的模式。通过消息的发布与订阅,系统组件之间可以实现解耦和松耦合,就像各个部门之间通过邮件进行沟通,彼此独立又能协同工作。
    • 消息传递:Spring Integration 和传统消息中间件都基于消息传递的模型。消息作为信息的载体,在系统中传递,实现不同组件之间的通信,就像信件在不同的收件人之间传递一样。

总体而言,Spring Integration 提供了一种更加轻量级和灵活的方式来实现企业集成,而传统消息中间件更专注于提供可靠的消息传递服务。在实际应用中,我们可以根据具体的需求选择合适的技术和工具。

1.2 定义和配置消息通道

定义消息通道:在 Spring Integration 中,消息通道是消息在系统中传递的关键“管道”。
配置示例:

@Bean
public MessageChannel myChannel() {
    return MessageChannels.direct().get();
}

配置消息通道的类型:Spring Integration 提供了多种不同类型的消息通道,如直接通道(Direct Channel)、发布 - 订阅通道(Publish - Subscribe Channel)、队列通道(Queue Channel)等。我们可以根据实际需求选择合适的通道类型。

@Bean
public MessageChannel directChannel() {
    return MessageChannels.direct().get();
}
@Bean
public MessageChannel publishSubscribeChannel() {
    return MessageChannels.publishSubscribe().get();
}
@Bean
public MessageChannel queueChannel() {
    return MessageChannels.queue().get();
}

消息通道的属性配置:可以通过配置消息通道的一些属性,如容量、过期时间等,来满足具体的需求。

@Bean
public MessageChannel myChannel() {
    return MessageChannels.direct().capacity(10).get();
}

1.3 消息端点

1.3.1 消息端点作用和类型

消息端点作用:消息端点是消息的生产者或者消费者,它定义了消息的处理逻辑。消息从一个端点流向另一个端点,形成一个完整的消息处理流程。
消息端点的类型:

  • 过滤器(Filter):用于过滤消息,只有满足特定条件的消息才能通过。它就像一个“筛子”,筛选出符合要求的消息。
  • 转换器(Transformer):用于将消息从一种形式转换为另一种形式。它就像一个“变形金刚”,将消息变成不同的形态。
  • 分发器(Dispatcher):用于将消息分发给不同的子通道,根据条件进行消息路由。它就像一个“交通指挥员”,根据不同的规则将消息引导到不同的方向。
  • 服务激活器(Service Activator):用于将消息传递给特定的服务进行处理。它就像一个“调度员”,将消息分配给合适的服务进行处理。
  • 消息处理器(Message Handler):用于处理消息,可以是一个 Java 方法、表达式、脚本等。它就像一个“工人”,负责对消息进行具体的处理。
  • 消息源(Message Source):用于产生消息的端点,例如文件输入、JDBC 查询等。它就像消息的“源头”,不断地产生新的消息。
  • 通道适配器(Channel Adapter):用于将外部系统的消息转换为 Spring Integration 的消息格式。它就像一个“翻译官”,帮助不同系统之间进行消息的“交流”。

配置消息端点:

@ServiceActivator(inputChannel = "myChannel")
public void processMessage(Message<String> message) {
    // 处理消息的逻辑
}

通过合理定义和配置消息通道以及消息端点,我们可以构建出灵活、可扩展的消息传递系统,实现消息在系统中的高效流动和处理。

1.3.2 消息处理器的使用方法

消息处理器是 Spring Integration 中用于处理消息的核心组件,它可以是一个 Java 方法、表达式、脚本等。以下是消息处理器的使用方法:

@ServiceActivator(inputChannel = "inputChannel")
public void handleMessage(String message) {
    // 处理消息的逻辑
    System.out.println("Received Message: " + message);
}

handleMessage 方法是一个消息处理器,通过 @ServiceActivator 注解将其与名为 inputChannel 的输入通道关联起来。当消息被发送到该通道时,该方法会被调用来处理消息。

表达式处理器:

<int:service-activator input-channel="inputChannel" expression="@myService.process(#payload)">
    <int:poller fixed-rate="1000"/>
</int:service-activator>

在上述配置中,expression 属性定义了一个表达式,指定了消息处理的逻辑。这个表达式将调用名为 process 的方法,#payload 表示消息的载荷。

1.3.3 适配器与外部系统集成

适配器用于将外部系统的消息与 Spring Integration 进行集成,使得外部系统的消息能够在 Spring Integration 中流通。

1.3.3.1 文件适配器

<int-file:inbound-channel-adapter id="filesIn"
                                 channel="inputChannel"
                                 directory="file:${java.io.tmpdir}/input">
    <int:poller fixed-rate="5000"/>
</int-file:inbound-channel-adapter>

上述配置使用文件适配器(<int-file:inbound-channel-adapter>)来监听指定目录中的文件,并将文件内容发送到名为 inputChannel 的通道。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.messaging.MessageChannel;

import java.io.File;

@Configuration
public class FileIntegrationConfig {
    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }   
    @Bean
    @InboundChannelAdapter(
        channel = "inputChannel",
        poller = @Poller(fixedRate = "5000")
    )
    public FileReadingMessageSource filesIn() {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(new File(System.getProperty("java.io.tmpdir") + "/input"));
        source.setFilter(new AcceptOnceFileListFilter<>()); // 避免重复读取
        return source;
    }
}

1.3.3.2 JDBC 适配器

<int-jdbc:inbound-channel-adapter id="jdbcInboundAdapter"
                                  query="SELECT * FROM my_table"
                                  channel="inputChannel">
    <int:poller fixed-rate="10000"/>
</int-jdbc:inbound-channel-adapter>

上述配置中,JDBC 适配器(<int-jdbc:inbound-channel-adapter>)从数据库执行查询,并将结果发送到 inputChannel 通道。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;

import javax.sql.DataSource;

@Configuration
public class JdbcIntegrationConfig {
    @Bean
    public MessageChannel inputChannel() {
        return new DirectChannel();
    }
    @Bean
    @InboundChannelAdapter(
        channel = "inputChannel",
        poller = @Poller(fixedRate = "10000")
    )
    public JdbcPollingChannelAdapter jdbcInboundAdapter(DataSource dataSource) {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM my_table");
        return adapter;
    }
}

1.3.3.3 HTTP 适配器

<int-http:inbound-channel-adapter id="httpInboundAdapter"
                                  channel="inputChannel"
                                  path="/receiveMessage"
                                  request-mapper="requestMapping">
    <int:poller fixed-rate="10000"/>
</int-http:inbound-channel-adapter>

上述配置使用 HTTP 适配器(<int-http:inbound-channel-adapter>)监听指定路径的 HTTP 请求,并将请求的消息发送到 inputChannel 通道。

@Bean
public IntegrationFlow httpInboundFlow() {
    return IntegrationFlows
            .from(Http.inboundGateway("/receiveMessage")
                    .requestMapping(r -> r.methods(HttpMethod.POST))
                    .requestPayloadType(String.class))
            .channel("inputChannel")
            .get();
}

1.3.4 消息的格式转换与处理

消息转换是 Spring Integration 中常见的操作,用于将消息从一种格式或结构转换为另一种格式或结构,以满足系统的需求。以下是消息转换的实际应用场景和示例:

JSON 到对象的转换:

@Transformer(inputChannel = "jsonInputChannel", outputChannel = "objectOutputChannel")
public MyObject convertJsonToObject(String jsonString) {
    // 使用 Jackson 库将 JSON 字符串转换为 Java 对象
    return objectMapper.readValue(jsonString, MyObject.class);
}

在上述代码中,@Transformer 注解表示这是一个消息转换器,它将 jsonInputChannel 通道的 JSON 消息转换为 Java 对象,并将结果发送到 objectOutputChannel 通道。

对象到 JSON 的转换:

@Transformer(inputChannel = "objectInputChannel", outputChannel = "jsonOutputChannel")
public String convertObjectToJson(MyObject myObject) {
    // 使用 Jackson 库将 Java 对象转换为 JSON 字符串
    return objectMapper.writeValueAsString(myObject);
}

在这个例子中,消息转换器将 objectInputChannel 通道的 Java 对象转换为 JSON 字符串,并将结果发送到 jsonOutputChannel 通道。

1.3.5 路由器的作用和应用场景

路由器用于根据消息的内容或特征将消息路由到不同的通道,实现消息在系统中的分发。以下是路由器的实际应用场景和示例:

1.3.5.1 内容路由器

<int:router input-channel="inputChannel" expression="payload.type">
    <int:mapping value="A" channel="channelA"/>
    <int:mapping value="B" channel="channelB"/>
    <int:mapping value="C" channel="channelC"/>
</int:router>

在上述配置中,内容路由器(<int:router>)根据消息的 type 属性的值将消息路由到不同的通道。如果消息的 type 是 "A",则路由到 channelA;如果是 "B",则路由到 channelB,以此类推。

@Bean
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.type");
    router.setChannelMapping("A", "channelA");
    router.setChannelMapping("B", "channelB");
    router.setChannelMapping("C", "channelC");
    router.setDefaultOutputChannelName("defaultChannel"); // 可选:未匹配时默认通道
    router.setInputChannelName("inputChannel");
    return router;
}

或者

@Bean
public IntegrationFlow routingFlow() {
    return IntegrationFlows.from("inputChannel")
            .<MyPayloadType, String>route(p -> p.getType(), mapping -> mapping
                    .channelMapping("A", "channelA")
                    .channelMapping("B", "channelB")
                    .channelMapping("C", "channelC")
                    .defaultOutputChannel("defaultChannel"))
            .get();
}

1.3.5.2 筛选器路由器

<int:router input-channel="inputChannel">
    <int:mapping value="payload.type == 'A'" channel="channelA"/>
    <int:mapping value="payload.type == 'B'" channel="channelB"/>
    <int:mapping value="payload.type == 'C'" channel="channelC"/>
</int:router>

在这个例子中,路由器根据筛选条件将消息路由到不同的通道。只有满足条件的消息才会被路由到相应的通道。

@Bean
public IntegrationFlow filterRoutingFlow() {
    return IntegrationFlows.from("inputChannel")
            .<MyPayloadType, Boolean>route(p -> {
                if ("A".equals(p.getType())) return "channelA";
                if ("B".equals(p.getType())) return "channelB";
                if ("C".equals(p.getType())) return "channelC";
                return "defaultChannel";
            }, router -> router
                .channelMapping("channelA", "channelA")
                .channelMapping("channelB", "channelB")
                .channelMapping("channelC", "channelC")
                .defaultOutputChannel("defaultChannel")
            )
            .get();
}

1.4 集成模式与设计模式

1.4.1 常见的集成模式

Spring Integration 提供了许多常见的集成模式,这些模式能够帮助开发人员构建可靠、可扩展的消息驱动系统。以下是一些常见的集成模式:

  • 消息通道(Message Channel):它定义了消息在系统中传递的路径,是消息传递的重要媒介,就像城市中的道路,消息沿着它在系统中流动。
  • 消息端点(Message Endpoint):定义了消息的生产者或者消费者,可以是服务激活器、消息处理器等。它就像道路上的车站,负责消息的发送和接收。
  • 消息适配器(Message Adapter):用于将外部系统的消息转换为 Spring Integration 的消息格式,实现系统与外部系统的集成。它就像一个翻译官,帮助不同语言的系统进行交流。
  • 消息网关(Message Gateway):提供了对系统的入口,允许外部系统通过网关发送消息到系统中,或者从系统中获取消息。它就像系统的大门,控制着消息的进出。
  • 消息转换器(Message Transformer):用于对消息的格式进行转换,将消息从一种表示形式转换为另一种,以满足系统的需求。它就像一个变形金刚,能把消息变成不同的样子。
  • 消息过滤器(Message Filter):用于过滤消息,只有满足特定条件的消息才能通过,实现对消息的筛选。它就像一个筛子,把不符合要求的消息过滤掉。
  • 消息路由器(Message Router):根据消息的内容、属性或条件将消息路由到不同的通道,实现消息的分发。它就像一个交通指挥员,根据不同的规则将消息引导到不同的方向。
  • 聚合器(Aggregator):将多个相关的消息合并为一个消息,通常用于处理分散的消息片段。它就像一个拼图高手,把分散的消息碎片拼成完整的消息。
  • 分裂器(Splitter):将一个消息拆分为多个消息,通常用于处理大块的消息内容。它就像一个切割工人,把大的消息切割成小块。
  • 定时器(Timer):定期发送消息,用于实现定时任务或者轮询外部系统。它就像一个闹钟,定时提醒系统执行相应的操作。

1.4.2 设计模式构&建消息驱动

在构建消息驱动的系统时,我们可以借鉴一些设计模式来提高系统的可维护性、可扩展性和可测试性。
以下是一些常用的设计模式,特别是在消息驱动系统中的应用:

  • 发布 - 订阅模式(Publish - Subscribe Pattern):在消息驱动系统中,通过使用发布 - 订阅模式可以实现消息的广播,允许多个组件订阅并接收相同的消息。它就像一个广播电台,向多个听众同时发送消息。
  • 观察者模式(Observer Pattern):观察者模式可以用于实现消息的订阅和通知机制,在消息产生时通知所有的观察者。它就像一个新闻发布系统,当有新闻发布时,会通知所有订阅的用户。
  • 策略模式(Strategy Pattern):策略模式可用于实现灵活的消息处理策略,根据不同的需求选择不同的消息处理算法。它就像一个工具箱,根据不同的任务选择不同的工具。
  • 装饰者模式(Decorator Pattern):装饰者模式可用于动态地添加消息处理逻辑,如消息转换器、消息过滤器等。它就像给消息穿上不同的衣服,增加不同的功能。
  • 责任链模式(Chain of Responsibility Pattern):责任链模式可用于实现消息处理管道,每个处理器负责处理特定类型的消息,形成一个处理链。它就像一个流水线,每个工人负责完成特定的工序。
  • 命令模式(Command Pattern):命令模式可以将消息封装为命令对象,以支持撤销、重做等操作。
  • 工厂模式(Factory Pattern):工厂模式可用于创建消息适配器、消息处理器等组件,提供一种灵活的对象创建方式。

1.5 流程和通道拦截的实现方法

Spring Integration中,可以通过拦截器(Interceptor)来对消息通道和流程进行拦截和处理。拦截器允许在消息在通道中传递和处理的过程中执行自定义逻辑。

1.5.1 通道拦截

在通道级别,可以使用通道拦截器来对消息通道的发送和接收进行拦截。

<int:channel id="myChannel">
    <int:interceptors>
        <int:wire-tap channel="logChannel"/>
    </int:interceptors>
</int:channel>

上述配置中,<int:wire-tap>是一个通道拦截器,将通道上的所有消息发送到logChannel通道,以便记录日志或进行其他操作。Spring Integration提供了一些内置的拦截器,如WireTapLoggingHandler等,用于实现常见的拦截需求。

@Bean
public MessageChannel myChannel() {
    DirectChannel channel = new DirectChannel();
    channel.addInterceptor(new WireTap(logChannel())); // 添加 wire-tap 拦截器
    return channel;
}

@Bean
public MessageChannel logChannel() {
    return new DirectChannel(); // 用于接收拷贝消息
}

1.5.2 流程拦截

在流程级别,可以使用<int:advice><int:expression-advice>等元素来添加拦截器。

<int:service-activator input-channel="inputChannel" output-channel="outputChannel">
    <int:advice-chain>
        <int:expression-advice expression="payload.toUpperCase()"/>
    </int:advice-chain>
</int:service-activator>

在上述配置中,<int:expression-advice>是一个流程拦截器,它使用SpEL表达式将消息内容转换为大写。

@Bean
public MessageChannel inputChannel() {
    return new DirectChannel();
}

@Bean
public MessageChannel outputChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "inputChannel", outputChannel = "outputChannel", adviceChain = "toUpperCaseAdvice")
public MessageHandler serviceActivator() {
    return message -> {
        // 这个逻辑可以为空,因为表达式 advice 已经修改 payload
        System.out.println("最终处理后的消息: " + message.getPayload());
    };
}

@Bean(name = "toUpperCaseAdvice")
public Advice toUpperCaseAdvice() {
    ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
    advice.setBeforeExpression("payload.toUpperCase()");
    return advice;
}

1.5.3 自定义拦截器

可以通过实现ChannelInterceptor接口或扩展ChannelInterceptorAdapter类来创建自定义的通道拦截器。同样,通过实现Advice接口或扩展AbstractRequestHandlerAdvice类可以创建自定义的流程拦截器

public class CustomChannelInterceptor implements ChannelInterceptor {
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        // 在消息发送之前执行的逻辑
        return message;
    }

    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        // 在消息发送完成后执行的逻辑
    }

    // 其他方法省略
}
<int:service-activator input-channel="inputChannel" output-channel="outputChannel">
    <int:advice-chain>
        <bean class="com.example.CustomExpressionAdvice"/>
    </int:advice-chain>
</int:service-activator>

上述配置中,使用了自定义的流程拦截器CustomExpressionAdvice,该类需实现Advice接口。

通过应用内置或自定义的拦截器,可以在消息处理的不同阶段执行自定义的逻辑,如日志记录、性能监控、消息转换等。

2 Spring Integration实战

传统订单处理流程往往涉及多个手动步骤,容易导致延迟和错误。为了提高电商平台的运作效率,客户那边要求我们开发一个自动化订单处理系统,从订单创建到支付、库存检查和发货全流程自动化处理,通过消息触发相关的业务逻辑,减少人为失误。

2.1 添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

2.2 启动类Application

@SpringBootApplication
@IntegrationComponentScan
public class OrderProcessingApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderProcessingApplication.class, args);
    }
}

2.3 配置消息通道


/**
 * 配置消息通道
 */
@Configuration
publicclass IntegrationConfig {

    /**
     * 定义订单创建的消息通道
     * @return DirectChannel 实例
     */
    @Bean
    public MessageChannel orderCreatedChannel() {
        returnnew DirectChannel();
    }

    /**
     * 定义支付处理的消息通道
     * @return DirectChannel 实例
     */
    @Bean
    public MessageChannel paymentProcessedChannel() {
        returnnew DirectChannel();
    }

    /**
     * 定义库存检查的消息通道
     * @return DirectChannel 实例
     */
    @Bean
    public MessageChannel inventoryCheckedChannel() {
        returnnew DirectChannel();
    }

    /**
     * 定义发货调度的消息通道
     * @return DirectChannel 实例
     */
    @Bean
    public MessageChannel shipmentScheduledChannel() {
        returnnew DirectChannel();
    }
}

2.4 Controller

@RestController
@RequestMapping("/orders")
public class OrderController {

    private final OrderService orderService;

    @Autowired
    public OrderController(OrderService orderService) {
        this.orderService = orderService;
    }

    /**
     * 创建订单的API端点
     * @param order 订单对象
     * @return 成功消息
     */
    @PostMapping
    public String createOrder(@RequestBody Order order) {
        orderService.createOrder(order);
        return "Order created successfully";
    }
}

2.5 订单服务

/**
 * 订单服务类,负责创建订单并将订单信息发送到相应的消息通道
 */
@Service
public class OrderService {

    private final OrderGateway gateway;

    @Autowired
    public OrderService(OrderGateway gateway) {
        this.gateway = gateway;
    }

    /**
     * 创建订单并触发订单创建流程
     * @param order 订单对象
     */
    public void createOrder(Order order) {
        System.out.println("Creating order: " + order.getOrderId());
        // 将订单发送到orderCreatedChannel消息通道
        gateway.processOrder(order);
    }
}

2.6 支付处理服务

/**
 * 支付处理服务类,监听订单创建消息通道,处理支付逻辑
 */
@Component
public class PaymentService {

    private final OrderGateway gateway;

    @Autowired
    public PaymentService(OrderGateway gateway) {
        this.gateway = gateway;
    }

    /**
     * 处理订单创建消息,模拟支付处理
     * @param order 订单对象
     */
    @ServiceActivator(inputChannel = "orderCreatedChannel")
    public void handleOrderCreation(@Payload Order order) {
        System.out.println("Handling order creation for: " + order.getOrderId());
        // 模拟支付处理
        System.out.println("Processing payment for order: " + order.getOrderId());
        // 假设支付成功
        gateway.processPayment(order);
    }
}

2.7 库存检查服务

/**
 * 库存检查服务类,监听支付处理消息通道,检查库存并决定是否发货
 */
@Component
public class InventoryService {

    private final OrderGateway gateway;

    @Autowired
    public InventoryService(OrderGateway gateway) {
        this.gateway = gateway;
    }

    /**
     * 处理支付处理消息,检查库存
     * @param order 订单对象
     */
    @ServiceActivator(inputChannel = "paymentProcessedChannel")
    public void checkInventory(@Payload Order order) {
        System.out.println("Checking inventory for product: " + order.getProductId());
        // 模拟库存检查
        boolean isInStock = true; // 假设库存充足
        if (isInStock) {
            System.out.println("Product is in stock.");
            gateway.scheduleShipment(order);
        } else {
            System.out.println("Product is out of stock.");
            // 通知用户的逻辑
        }
    }
}
```
## 2.8 发货调度服务
```java
/**
 * 发货调度服务类,监听发货调度消息通道,安排发货
 */
@Component
public class ShipmentService {

    /**
     * 处理发货调度消息,模拟发货
     * @param order 订单对象
     */
    @ServiceActivator(inputChannel = "shipmentScheduledChannel")
    public void scheduleShipment(@Payload Order order) {
        System.out.println("Scheduling shipment for order: " + order.getOrderId());
        // 模拟发货调度
        System.out.println("Shipment scheduled for order: " + order.getOrderId());
    }
}
```
## 2.9 订单处理相关的消息网关接口
```java
/**
 * 定义订单处理相关的消息网关接口
 */
public interface OrderGateway {

    /**
     * 将订单发送到orderCreatedChannel消息通道
     * @param order 订单对象
     */
    @Gateway(requestChannel = "orderCreatedChannel")
    void processOrder(Order order);

    /**
     * 将订单发送到paymentProcessedChannel消息通道
     * @param order 订单对象
     */
    @Gateway(requestChannel = "paymentProcessedChannel")
    void processPayment(Order order);

    /**
     * 将订单发送到inventoryCheckedChannel消息通道
     * @param order 订单对象
     */
    @Gateway(requestChannel = "inventoryCheckedChannel")
    void checkInventory(Order order);

    /**
     * 将订单发送到shipmentScheduledChannel消息通道
     * @param order 订单对象
     */
    @Gateway(requestChannel = "shipmentScheduledChannel")
    void scheduleShipment(Order order);
}
```
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容