一、机制详解
Spring Integration 是基于 Spring 框架构建的轻量级消息传递和企业集成模式实现框架,其核心是消息驱动机制,核心组件包括消息(Message)、消息通道(Channel)、消息处理器(Message Handler)等,通过这些组件的协同工作,实现系统内不同模块或者不同系统之间的集成和交互。下面结合代码,具体分析消息驱动机制的工作原理如下:
二、举个例子
1. 方法调用
调用 getFile
方法,传入远程文件的目录(包含文件名):
@Autowired
private SftpGatewayProvider sftpGatewayProvider;
// ...
File downloadedFile = sftpGatewayProvider.getFile("/remote/path/to/file.txt");
2. 消息转换与发送
@MessagingGateway
public interface SftpGatewayProvider {
/**
* 下载指定文件
*
* @param directory 远程文件名(包含目录)
* @return File
*/
@Gateway(requestChannel = "getChannelProvider")
File getFile(String directory);
}
由于 SftpGatewayProvider
接口被 @MessagingGateway
注解标记,Spring 会为其生成一个代理实现类。当调用 getFile
方法时,代理类会将方法调用转换为一个消息,消息的有效负载(payload
)就是传入的远程文件目录。@Gateway(requestChannel = "getChannelProvider")
注解指定了这个消息要发送到名为 getChannelProvider
的消息通道。
3. SessionFactory
配置
@Slf4j
@Configuration
@RequiredArgsConstructor
public class SftpProviderConfiguration {
private final SftpProperties sftpProperties;
/**
* sftpSessionFactory
*
* @return SessionFactory
*/
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactoryProvider() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
SftpProperties.SftpConfigProperties properties = sftpProperties.getConfig().get("provider");
factory.setHost(properties.getHost());
...
CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory = new CachingSessionFactory<>(factory);
cachingSessionFactory.setPoolSize(64);
cachingSessionFactory.setSessionWaitTimeout(10000);
return new CachingSessionFactory<>(factory);
}
/**
* SftpRemoteFileTemplate
* 于 SftpSessionFactory 创建的 sftp 文件操作模板类
* 用来创建 SftpOutboundGateway(见 SftpOutboundGateway 构造方法源码)
*
* @return SftpRemoteFileTemplate
*/
@Bean
public SftpRemoteFileTemplate sftpRemoteFileTemplateProvider() {
return new SftpRemoteFileTemplate(sftpSessionFactoryProvider());
}
@Bean
@ServiceActivator(inputChannel = "getChannelProvider")
public MessageHandler getHandlerProvider() {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpSessionFactoryProvider(), "get", "payload");
gateway.setOptions("-P");
gateway.setLocalDirectory(new File(Constant.PROVIDER_DEFAULT_TEMP_DIR));
return gateway;
}
}
在 SftpProviderConfiguration
类中,sftpSessionFactoryProvider
方法创建并配置了一个 SessionFactory
实例用于创建与 SFTP 服务器的会话,它包含了 SFTP 服务器的连接信息,如主机名、端口、用户名、密码等。在后续的下载操作中,会使用这个 SessionFactory
来建立与 SFTP 服务器的连接。
4. SftpOutboundGateway
处理消息
getHandlerProvider
方法创建了一个 SftpOutboundGateway
实例,并将其注册为处理 getChannelProvider
通道消息的处理器:
@Bean
@ServiceActivator(inputChannel = "getChannelProvider")
public MessageHandler getHandlerProvider() {
SftpOutboundGateway gateway = new SftpOutboundGateway(sftpSessionFactoryProvider(), "get", "payload");
gateway.setOptions("-P");
gateway.setLocalDirectory(new File(Constant.PROVIDER_DEFAULT_TEMP_DIR));
return gateway;
}
-
SftpOutboundGateway
接收到getChannelProvider
通道的消息后,会根据消息的有效负载(远程文件目录)和配置的SessionFactory
建立与 SFTP 服务器的连接。 -
"get"
表示要执行的 SFTP 命令,即下载文件。 -
"-P"
是get
命令的选项,通常表示在下载过程中保留文件的权限和时间戳。 -
setLocalDirectory
方法指定了下载文件的本地目录,这里使用Constant.PROVIDER_DEFAULT_TEMP_DIR
作为本地临时目录。
5. 文件下载与结果返回
SftpOutboundGateway
执行 get
命令,从 SFTP 服务器下载指定的文件到本地目录。下载完成后,会将下载的文件作为响应消息发送回消息通道,最终返回到调用 getFile
方法的地方。
总结
当调用 getFile 方法时,实际发生的流程如下:
1、方法调用转换:Spring 为 SftpGatewayProvider 接口生成的代理类会将 getFile 方法的调用转换为一个消息,消息的有效负载是传入的 directory 参数。
2、消息发送:消息被发送到 getChannelProvider 消息通道。
3、SFTP 适配器处理:配置在 getChannelProvider 通道上的 SFTP 出站通道适配器接收到消息后,会根据消息中的 directory 参数,连接到 SFTP 服务器,下载指定的文件。
4、结果返回:下载完成后,SFTP 适配器会将下载的文件作为响应消息发送回消息通道,最终返回到调用 getFile 方法的地方。
Spring Integration其他适用场景
这种消息驱动集成机制具有松耦合、可扩展性强等优点,因此适用于多种场景,以下是一些典型的应用场景:
- 文件传输与同步:就像上述代码中的 SFTP 文件下载场景,系统需要从远程服务器获取文件。此外,还可以用于文件上传、不同服务器之间的文件同步等场景,通过配置不同的消息处理器和通道,可以实现多种文件传输协议(如 FTP、FTPS 等)的集成。
- 系统集成:当企业中有多个不同的系统需要进行数据交互时,Spring Integration 可以作为中间层,将不同系统的接口进行封装和转换,实现系统之间的数据传输和业务流程整合。例如,将电商系统与物流系统集成,实现订单信息的同步和物流状态的更新。
- 数据处理与转换:在数据处理流程中,可能需要对数据进行清洗、转换、聚合等操作。可以通过消息驱动机制将不同的数据处理组件连接起来,每个组件作为一个消息处理器,负责完成特定的数据处理任务。例如,从数据库中读取数据,进行格式转换后发送到消息队列,供其他系统消费。
- 消息队列集成:与各种消息队列(如 RabbitMQ、Kafka 等)进行集成,实现异步消息处理。系统可以将消息发送到消息队列中,由其他消费者异步处理,提高系统的并发处理能力和可靠性。例如,在电商系统中,用户下单后将订单消息发送到消息队列,由库存系统、支付系统等异步处理。
- 定时任务与调度:结合 Spring 的定时任务功能,通过消息驱动机制实现定时数据处理和任务调度。例如,每天定时从远程服务器下载数据,进行数据分析和报表生成。