1 Spring Integration
1.1 基础概念
1.1.1 简介
Spring Integration
是 Spring
框架的一个重要扩展,其核心目标在于极大地简化企业集成模式的开发过程。它构建了一种基于消息的编程模型,让分布式系统中的系统集成变得更加轻松便捷。
基本概念:
- 消息:在
Spring Integration
的体系中,消息是信息传递的关键载体。它就像一个装满各种信息的“包裹”,不仅可以包含业务数据,还能携带头部信息、消息标签等内容。消息会沿着特定的通道(Channel
)在系统中有序传递。 - 通道(
Channel
):通道就像是消息在系统中流动的“高速公路”。Spring Integration
提供了多种不同类型的通道,- 直接通道(
Direct Channel
),它就像一条直达专线,能让消息快速高效地传递; - 发布 - 订阅通道(
Publish - Subscribe Channel
),类似于广播电台,可以将消息同时传递给多个订阅者; - 队列通道(
Queue Channel
),如同排队等待服务的队伍,消息会按照顺序依次进行处理
- 直接通道(
- 端点(
Endpoint
):端点是消息的生产者或者消费者,它们就像接力赛中的运动员,消息从一个端点传递到另一个端点,从而形成一个完整的消息处理流程。 - 适配器(
Adapter
):适配器是Spring Integration
与外部系统或者服务之间的“桥梁”。它能够将外部系统的消息“翻译”成Spring Integration
能够理解的消息格式,也可以将Spring Integration
的消息传递给外部系统。 - 过滤器(
Filter
):过滤器就像是一个严格的“门卫”,只有满足特定条件的消息才能通过它的“检查”。它在消息的路由、转换等过程中发挥着重要作用。 - 转换器(
Transformer
):转换器如同一个神奇的“魔法师”,能够将消息从一种形式转换为另一种形式,以满足系统的不同需求。它可以对数据格式进行转换,也可以修改消息体的内容。
1.1.2 与传统消息中间件的区别
Spring Integration
与传统消息中间件的区别与联系
- 区别
-
Spring Integration
是框架:Spring Integration
是基于Spring
构建的一个强大框架,它提供了一整套用于构建企业集成模式的工具和组件,就像一个功能齐全的“工具箱”。 - 传统消息中间件是产品:传统消息中间件通常是独立的产品,如
RabbitMQ、Apache Kafka、ActiveMQ
等,它们专注于提供可靠的消息传递服务,就像专业的“快递物流公司”。
-
- 联系
- 整合性:
Spring Integration
具有强大的整合能力,它可以与传统消息中间件完美集成。通过适配器,Spring Integration
能够与外部消息中间件进行通信,就像一个万能的“接口”,帮助企业集成系统与不同的消息中间件进行对接。 - 解耦与异步通信:和传统消息中间件一样,
Spring Integration
也支持解耦和异步通信的模式。通过消息的发布与订阅,系统组件之间可以实现解耦和松耦合,就像各个部门之间通过邮件进行沟通,彼此独立又能协同工作。 - 消息传递:
Spring Integration
和传统消息中间件都基于消息传递的模型。消息作为信息的载体,在系统中传递,实现不同组件之间的通信,就像信件在不同的收件人之间传递一样。
- 整合性:
总体而言,Spring Integration
提供了一种更加轻量级和灵活的方式来实现企业集成,而传统消息中间件更专注于提供可靠的消息传递服务。在实际应用中,我们可以根据具体的需求选择合适的技术和工具。
1.2 定义和配置消息通道
定义消息通道:在 Spring Integration
中,消息通道是消息在系统中传递的关键“管道”。
配置示例:
@Bean
public MessageChannel myChannel() {
return MessageChannels.direct().get();
}
配置消息通道的类型:Spring Integration
提供了多种不同类型的消息通道,如直接通道(Direct Channel
)、发布 - 订阅通道(Publish - Subscribe Channel
)、队列通道(Queue Channel
)等。我们可以根据实际需求选择合适的通道类型。
@Bean
public MessageChannel directChannel() {
return MessageChannels.direct().get();
}
@Bean
public MessageChannel publishSubscribeChannel() {
return MessageChannels.publishSubscribe().get();
}
@Bean
public MessageChannel queueChannel() {
return MessageChannels.queue().get();
}
消息通道的属性配置:可以通过配置消息通道的一些属性,如容量、过期时间等,来满足具体的需求。
@Bean
public MessageChannel myChannel() {
return MessageChannels.direct().capacity(10).get();
}
1.3 消息端点
1.3.1 消息端点作用和类型
消息端点作用:消息端点是消息的生产者
或者消费者
,它定义了消息的处理逻辑。消息从一个端点流向另一个端点,形成一个完整的消息处理流程。
消息端点的类型:
- 过滤器(
Filter
):用于过滤消息,只有满足特定条件的消息才能通过。它就像一个“筛子”,筛选出符合要求的消息。 - 转换器(
Transformer
):用于将消息从一种形式转换为另一种形式。它就像一个“变形金刚”,将消息变成不同的形态。 - 分发器(
Dispatcher
):用于将消息分发给不同的子通道,根据条件进行消息路由。它就像一个“交通指挥员”,根据不同的规则将消息引导到不同的方向。 - 服务激活器(
Service Activator
):用于将消息传递给特定的服务进行处理。它就像一个“调度员”,将消息分配给合适的服务进行处理。 - 消息处理器(
Message Handler
):用于处理消息,可以是一个 Java 方法、表达式、脚本等。它就像一个“工人”,负责对消息进行具体的处理。 - 消息源(
Message Source
):用于产生消息的端点,例如文件输入、JDBC 查询等。它就像消息的“源头”,不断地产生新的消息。 - 通道适配器(
Channel Adapter
):用于将外部系统的消息转换为Spring Integration
的消息格式。它就像一个“翻译官”,帮助不同系统之间进行消息的“交流”。
配置消息端点:
@ServiceActivator(inputChannel = "myChannel")
public void processMessage(Message<String> message) {
// 处理消息的逻辑
}
通过合理定义和配置消息通道以及消息端点,我们可以构建出灵活、可扩展的消息传递系统,实现消息在系统中的高效流动和处理。
1.3.2 消息处理器的使用方法
消息处理器是 Spring Integration
中用于处理消息的核心组件,它可以是一个 Java 方法、表达式、脚本等。以下是消息处理器的使用方法:
@ServiceActivator(inputChannel = "inputChannel")
public void handleMessage(String message) {
// 处理消息的逻辑
System.out.println("Received Message: " + message);
}
handleMessage
方法是一个消息处理器,通过 @ServiceActivator
注解将其与名为 inputChannel
的输入通道关联起来。当消息被发送到该通道时,该方法会被调用来处理消息。
表达式处理器:
<int:service-activator input-channel="inputChannel" expression="@myService.process(#payload)">
<int:poller fixed-rate="1000"/>
</int:service-activator>
在上述配置中,expression
属性定义了一个表达式,指定了消息处理的逻辑。这个表达式将调用名为 process
的方法,#payload
表示消息的载荷。
1.3.3 适配器与外部系统集成
适配器用于将外部系统的消息与 Spring Integration
进行集成,使得外部系统的消息能够在 Spring Integration
中流通。
1.3.3.1 文件适配器
<int-file:inbound-channel-adapter id="filesIn"
channel="inputChannel"
directory="file:${java.io.tmpdir}/input">
<int:poller fixed-rate="5000"/>
</int-file:inbound-channel-adapter>
上述配置使用文件适配器(<int-file:inbound-channel-adapter>
)来监听指定目录中的文件,并将文件内容发送到名为 inputChannel
的通道。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.messaging.MessageChannel;
import java.io.File;
@Configuration
public class FileIntegrationConfig {
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(
channel = "inputChannel",
poller = @Poller(fixedRate = "5000")
)
public FileReadingMessageSource filesIn() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(System.getProperty("java.io.tmpdir") + "/input"));
source.setFilter(new AcceptOnceFileListFilter<>()); // 避免重复读取
return source;
}
}
1.3.3.2 JDBC 适配器
<int-jdbc:inbound-channel-adapter id="jdbcInboundAdapter"
query="SELECT * FROM my_table"
channel="inputChannel">
<int:poller fixed-rate="10000"/>
</int-jdbc:inbound-channel-adapter>
上述配置中,JDBC 适配器(<int-jdbc:inbound-channel-adapter>
)从数据库执行查询,并将结果发送到 inputChannel 通道。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;
import javax.sql.DataSource;
@Configuration
public class JdbcIntegrationConfig {
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(
channel = "inputChannel",
poller = @Poller(fixedRate = "10000")
)
public JdbcPollingChannelAdapter jdbcInboundAdapter(DataSource dataSource) {
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM my_table");
return adapter;
}
}
1.3.3.3 HTTP 适配器
<int-http:inbound-channel-adapter id="httpInboundAdapter"
channel="inputChannel"
path="/receiveMessage"
request-mapper="requestMapping">
<int:poller fixed-rate="10000"/>
</int-http:inbound-channel-adapter>
上述配置使用 HTTP 适配器(<int-http:inbound-channel-adapter>
)监听指定路径的 HTTP 请求,并将请求的消息发送到 inputChannel 通道。
@Bean
public IntegrationFlow httpInboundFlow() {
return IntegrationFlows
.from(Http.inboundGateway("/receiveMessage")
.requestMapping(r -> r.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.channel("inputChannel")
.get();
}
1.3.4 消息的格式转换与处理
消息转换是 Spring Integration
中常见的操作,用于将消息从一种格式或结构转换为另一种格式或结构,以满足系统的需求。以下是消息转换的实际应用场景和示例:
JSON 到对象的转换:
@Transformer(inputChannel = "jsonInputChannel", outputChannel = "objectOutputChannel")
public MyObject convertJsonToObject(String jsonString) {
// 使用 Jackson 库将 JSON 字符串转换为 Java 对象
return objectMapper.readValue(jsonString, MyObject.class);
}
在上述代码中,@Transformer
注解表示这是一个消息转换器,它将 jsonInputChannel
通道的 JSON 消息转换为 Java 对象,并将结果发送到 objectOutputChannel
通道。
对象到 JSON 的转换:
@Transformer(inputChannel = "objectInputChannel", outputChannel = "jsonOutputChannel")
public String convertObjectToJson(MyObject myObject) {
// 使用 Jackson 库将 Java 对象转换为 JSON 字符串
return objectMapper.writeValueAsString(myObject);
}
在这个例子中,消息转换器将 objectInputChannel
通道的 Java 对象转换为 JSON 字符串,并将结果发送到 jsonOutputChannel 通道。
1.3.5 路由器的作用和应用场景
路由器用于根据消息的内容或特征将消息路由到不同的通道,实现消息在系统中的分发。以下是路由器的实际应用场景和示例:
1.3.5.1 内容路由器
<int:router input-channel="inputChannel" expression="payload.type">
<int:mapping value="A" channel="channelA"/>
<int:mapping value="B" channel="channelB"/>
<int:mapping value="C" channel="channelC"/>
</int:router>
在上述配置中,内容路由器(<int:router>
)根据消息的 type
属性的值将消息路由到不同的通道。如果消息的 type 是 "A",则路由到 channelA;如果是 "B",则路由到 channelB,以此类推。
@Bean
public ExpressionEvaluatingRouter router() {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter("payload.type");
router.setChannelMapping("A", "channelA");
router.setChannelMapping("B", "channelB");
router.setChannelMapping("C", "channelC");
router.setDefaultOutputChannelName("defaultChannel"); // 可选:未匹配时默认通道
router.setInputChannelName("inputChannel");
return router;
}
或者
@Bean
public IntegrationFlow routingFlow() {
return IntegrationFlows.from("inputChannel")
.<MyPayloadType, String>route(p -> p.getType(), mapping -> mapping
.channelMapping("A", "channelA")
.channelMapping("B", "channelB")
.channelMapping("C", "channelC")
.defaultOutputChannel("defaultChannel"))
.get();
}
1.3.5.2 筛选器路由器
<int:router input-channel="inputChannel">
<int:mapping value="payload.type == 'A'" channel="channelA"/>
<int:mapping value="payload.type == 'B'" channel="channelB"/>
<int:mapping value="payload.type == 'C'" channel="channelC"/>
</int:router>
在这个例子中,路由器根据筛选条件将消息路由到不同的通道。只有满足条件的消息才会被路由到相应的通道。
@Bean
public IntegrationFlow filterRoutingFlow() {
return IntegrationFlows.from("inputChannel")
.<MyPayloadType, Boolean>route(p -> {
if ("A".equals(p.getType())) return "channelA";
if ("B".equals(p.getType())) return "channelB";
if ("C".equals(p.getType())) return "channelC";
return "defaultChannel";
}, router -> router
.channelMapping("channelA", "channelA")
.channelMapping("channelB", "channelB")
.channelMapping("channelC", "channelC")
.defaultOutputChannel("defaultChannel")
)
.get();
}
1.4 集成模式与设计模式
1.4.1 常见的集成模式
Spring Integration
提供了许多常见的集成模式,这些模式能够帮助开发人员构建可靠、可扩展的消息驱动系统。以下是一些常见的集成模式:
- 消息通道(
Message Channel
):它定义了消息在系统中传递的路径,是消息传递的重要媒介,就像城市中的道路,消息沿着它在系统中流动。 - 消息端点(
Message Endpoint
):定义了消息的生产者或者消费者,可以是服务激活器、消息处理器等。它就像道路上的车站,负责消息的发送和接收。 - 消息适配器(
Message Adapter
):用于将外部系统的消息转换为Spring Integration
的消息格式,实现系统与外部系统的集成。它就像一个翻译官,帮助不同语言的系统进行交流。 - 消息网关(
Message Gateway
):提供了对系统的入口,允许外部系统通过网关发送消息到系统中,或者从系统中获取消息。它就像系统的大门,控制着消息的进出。 - 消息转换器(
Message Transformer
):用于对消息的格式进行转换,将消息从一种表示形式转换为另一种,以满足系统的需求。它就像一个变形金刚,能把消息变成不同的样子。 - 消息过滤器(
Message Filter
):用于过滤消息,只有满足特定条件的消息才能通过,实现对消息的筛选。它就像一个筛子,把不符合要求的消息过滤掉。 - 消息路由器(
Message Router
):根据消息的内容、属性或条件将消息路由到不同的通道,实现消息的分发。它就像一个交通指挥员,根据不同的规则将消息引导到不同的方向。 - 聚合器(
Aggregator
):将多个相关的消息合并为一个消息,通常用于处理分散的消息片段。它就像一个拼图高手,把分散的消息碎片拼成完整的消息。 - 分裂器(
Splitter
):将一个消息拆分为多个消息,通常用于处理大块的消息内容。它就像一个切割工人,把大的消息切割成小块。 - 定时器(
Timer
):定期发送消息,用于实现定时任务或者轮询外部系统。它就像一个闹钟,定时提醒系统执行相应的操作。
1.4.2 设计模式构&建消息驱动
在构建消息驱动的系统时,我们可以借鉴一些设计模式来提高系统的可维护性、可扩展性和可测试性。
以下是一些常用的设计模式,特别是在消息驱动系统中的应用:
- 发布 - 订阅模式(
Publish - Subscribe Pattern
):在消息驱动系统中,通过使用发布 - 订阅模式可以实现消息的广播,允许多个组件订阅并接收相同的消息。它就像一个广播电台,向多个听众同时发送消息。 - 观察者模式(
Observer Pattern
):观察者模式可以用于实现消息的订阅和通知机制,在消息产生时通知所有的观察者。它就像一个新闻发布系统,当有新闻发布时,会通知所有订阅的用户。 - 策略模式(
Strategy Pattern
):策略模式可用于实现灵活的消息处理策略,根据不同的需求选择不同的消息处理算法。它就像一个工具箱,根据不同的任务选择不同的工具。 - 装饰者模式(
Decorator Pattern
):装饰者模式可用于动态地添加消息处理逻辑,如消息转换器、消息过滤器等。它就像给消息穿上不同的衣服,增加不同的功能。 - 责任链模式(
Chain of Responsibility Pattern
):责任链模式可用于实现消息处理管道,每个处理器负责处理特定类型的消息,形成一个处理链。它就像一个流水线,每个工人负责完成特定的工序。 - 命令模式(
Command Pattern
):命令模式可以将消息封装为命令对象,以支持撤销、重做等操作。 - 工厂模式(
Factory Pattern
):工厂模式可用于创建消息适配器、消息处理器等组件,提供一种灵活的对象创建方式。
1.5 流程和通道拦截的实现方法
在Spring Integration
中,可以通过拦截器(Interceptor
)来对消息通道和流程进行拦截和处理。拦截器允许在消息在通道中传递和处理的过程中执行自定义逻辑。
1.5.1 通道拦截
在通道级别,可以使用通道拦截器来对消息通道的发送和接收进行拦截。
<int:channel id="myChannel">
<int:interceptors>
<int:wire-tap channel="logChannel"/>
</int:interceptors>
</int:channel>
上述配置中,<int:wire-tap>
是一个通道拦截器,将通道上的所有消息发送到logChannel通道,以便记录日志或进行其他操作。Spring Integration
提供了一些内置的拦截器,如WireTap
、LoggingHandler
等,用于实现常见的拦截需求。
@Bean
public MessageChannel myChannel() {
DirectChannel channel = new DirectChannel();
channel.addInterceptor(new WireTap(logChannel())); // 添加 wire-tap 拦截器
return channel;
}
@Bean
public MessageChannel logChannel() {
return new DirectChannel(); // 用于接收拷贝消息
}
1.5.2 流程拦截
在流程级别,可以使用<int:advice>
和<int:expression-advice>
等元素来添加拦截器。
<int:service-activator input-channel="inputChannel" output-channel="outputChannel">
<int:advice-chain>
<int:expression-advice expression="payload.toUpperCase()"/>
</int:advice-chain>
</int:service-activator>
在上述配置中,<int:expression-advice>
是一个流程拦截器,它使用SpEL表达式将消息内容转换为大写。
@Bean
public MessageChannel inputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel outputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "inputChannel", outputChannel = "outputChannel", adviceChain = "toUpperCaseAdvice")
public MessageHandler serviceActivator() {
return message -> {
// 这个逻辑可以为空,因为表达式 advice 已经修改 payload
System.out.println("最终处理后的消息: " + message.getPayload());
};
}
@Bean(name = "toUpperCaseAdvice")
public Advice toUpperCaseAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setBeforeExpression("payload.toUpperCase()");
return advice;
}
1.5.3 自定义拦截器
可以通过实现ChannelInterceptor
接口或扩展ChannelInterceptorAdapter
类来创建自定义的通道拦截器
。同样,通过实现Advice
接口或扩展AbstractRequestHandlerAdvice
类可以创建自定义的流程拦截器
。
public class CustomChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// 在消息发送之前执行的逻辑
return message;
}
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
// 在消息发送完成后执行的逻辑
}
// 其他方法省略
}
<int:service-activator input-channel="inputChannel" output-channel="outputChannel">
<int:advice-chain>
<bean class="com.example.CustomExpressionAdvice"/>
</int:advice-chain>
</int:service-activator>
上述配置中,使用了自定义的流程拦截器CustomExpressionAdvice
,该类需实现Advice接口。
通过应用内置或自定义的拦截器,可以在消息处理的不同阶段执行自定义的逻辑,如日志记录、性能监控、消息转换等。
2 Spring Integration实战
传统订单处理流程往往涉及多个手动步骤,容易导致延迟和错误。为了提高电商平台的运作效率,客户那边要求我们开发一个自动化订单处理系统,从订单创建到支付、库存检查和发货全流程自动化处理,通过消息触发相关的业务逻辑,减少人为失误。
2.1 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
2.2 启动类Application
@SpringBootApplication
@IntegrationComponentScan
public class OrderProcessingApplication {
public static void main(String[] args) {
SpringApplication.run(OrderProcessingApplication.class, args);
}
}
2.3 配置消息通道
/**
* 配置消息通道
*/
@Configuration
publicclass IntegrationConfig {
/**
* 定义订单创建的消息通道
* @return DirectChannel 实例
*/
@Bean
public MessageChannel orderCreatedChannel() {
returnnew DirectChannel();
}
/**
* 定义支付处理的消息通道
* @return DirectChannel 实例
*/
@Bean
public MessageChannel paymentProcessedChannel() {
returnnew DirectChannel();
}
/**
* 定义库存检查的消息通道
* @return DirectChannel 实例
*/
@Bean
public MessageChannel inventoryCheckedChannel() {
returnnew DirectChannel();
}
/**
* 定义发货调度的消息通道
* @return DirectChannel 实例
*/
@Bean
public MessageChannel shipmentScheduledChannel() {
returnnew DirectChannel();
}
}
2.4 Controller
@RestController
@RequestMapping("/orders")
public class OrderController {
private final OrderService orderService;
@Autowired
public OrderController(OrderService orderService) {
this.orderService = orderService;
}
/**
* 创建订单的API端点
* @param order 订单对象
* @return 成功消息
*/
@PostMapping
public String createOrder(@RequestBody Order order) {
orderService.createOrder(order);
return "Order created successfully";
}
}
2.5 订单服务
/**
* 订单服务类,负责创建订单并将订单信息发送到相应的消息通道
*/
@Service
public class OrderService {
private final OrderGateway gateway;
@Autowired
public OrderService(OrderGateway gateway) {
this.gateway = gateway;
}
/**
* 创建订单并触发订单创建流程
* @param order 订单对象
*/
public void createOrder(Order order) {
System.out.println("Creating order: " + order.getOrderId());
// 将订单发送到orderCreatedChannel消息通道
gateway.processOrder(order);
}
}
2.6 支付处理服务
/**
* 支付处理服务类,监听订单创建消息通道,处理支付逻辑
*/
@Component
public class PaymentService {
private final OrderGateway gateway;
@Autowired
public PaymentService(OrderGateway gateway) {
this.gateway = gateway;
}
/**
* 处理订单创建消息,模拟支付处理
* @param order 订单对象
*/
@ServiceActivator(inputChannel = "orderCreatedChannel")
public void handleOrderCreation(@Payload Order order) {
System.out.println("Handling order creation for: " + order.getOrderId());
// 模拟支付处理
System.out.println("Processing payment for order: " + order.getOrderId());
// 假设支付成功
gateway.processPayment(order);
}
}
2.7 库存检查服务
/**
* 库存检查服务类,监听支付处理消息通道,检查库存并决定是否发货
*/
@Component
public class InventoryService {
private final OrderGateway gateway;
@Autowired
public InventoryService(OrderGateway gateway) {
this.gateway = gateway;
}
/**
* 处理支付处理消息,检查库存
* @param order 订单对象
*/
@ServiceActivator(inputChannel = "paymentProcessedChannel")
public void checkInventory(@Payload Order order) {
System.out.println("Checking inventory for product: " + order.getProductId());
// 模拟库存检查
boolean isInStock = true; // 假设库存充足
if (isInStock) {
System.out.println("Product is in stock.");
gateway.scheduleShipment(order);
} else {
System.out.println("Product is out of stock.");
// 通知用户的逻辑
}
}
}
```
## 2.8 发货调度服务
```java
/**
* 发货调度服务类,监听发货调度消息通道,安排发货
*/
@Component
public class ShipmentService {
/**
* 处理发货调度消息,模拟发货
* @param order 订单对象
*/
@ServiceActivator(inputChannel = "shipmentScheduledChannel")
public void scheduleShipment(@Payload Order order) {
System.out.println("Scheduling shipment for order: " + order.getOrderId());
// 模拟发货调度
System.out.println("Shipment scheduled for order: " + order.getOrderId());
}
}
```
## 2.9 订单处理相关的消息网关接口
```java
/**
* 定义订单处理相关的消息网关接口
*/
public interface OrderGateway {
/**
* 将订单发送到orderCreatedChannel消息通道
* @param order 订单对象
*/
@Gateway(requestChannel = "orderCreatedChannel")
void processOrder(Order order);
/**
* 将订单发送到paymentProcessedChannel消息通道
* @param order 订单对象
*/
@Gateway(requestChannel = "paymentProcessedChannel")
void processPayment(Order order);
/**
* 将订单发送到inventoryCheckedChannel消息通道
* @param order 订单对象
*/
@Gateway(requestChannel = "inventoryCheckedChannel")
void checkInventory(Order order);
/**
* 将订单发送到shipmentScheduledChannel消息通道
* @param order 订单对象
*/
@Gateway(requestChannel = "shipmentScheduledChannel")
void scheduleShipment(Order order);
}
```