温故知新:Spring Integration

一、机制详解

Spring Integration 是基于 Spring 框架构建的轻量级消息传递和企业集成模式实现框架,其核心是消息驱动机制,核心组件包括消息(Message)、消息通道(Channel)、消息处理器(Message Handler)等,通过这些组件的协同工作,实现系统内不同模块或者不同系统之间的集成和交互。下面结合代码,具体分析消息驱动机制的工作原理如下:

二、举个例子

1. 方法调用

调用 getFile 方法,传入远程文件的目录(包含文件名):

@Autowired
private SftpGatewayProvider sftpGatewayProvider;

// ...

File downloadedFile = sftpGatewayProvider.getFile("/remote/path/to/file.txt");

2. 消息转换与发送

@MessagingGateway
public interface SftpGatewayProvider {
    /**
     * 下载指定文件
     *
     * @param directory 远程文件名(包含目录)
     * @return File
     */
    @Gateway(requestChannel = "getChannelProvider")
    File getFile(String directory);
}

由于 SftpGatewayProvider 接口被 @MessagingGateway 注解标记,Spring 会为其生成一个代理实现类。当调用 getFile 方法时,代理类会将方法调用转换为一个消息,消息的有效负载(payload)就是传入的远程文件目录。@Gateway(requestChannel = "getChannelProvider") 注解指定了这个消息要发送到名为 getChannelProvider 的消息通道。

3. SessionFactory 配置

@Slf4j
@Configuration
@RequiredArgsConstructor
public class SftpProviderConfiguration {

    private final SftpProperties sftpProperties;

    /**
     * sftpSessionFactory
     *
     * @return SessionFactory
     */
    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactoryProvider() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        SftpProperties.SftpConfigProperties properties = sftpProperties.getConfig().get("provider");
        factory.setHost(properties.getHost());
       ...
        CachingSessionFactory<SftpClient.DirEntry> cachingSessionFactory = new CachingSessionFactory<>(factory);
        cachingSessionFactory.setPoolSize(64);
        cachingSessionFactory.setSessionWaitTimeout(10000);
        return new CachingSessionFactory<>(factory);
    }


    /**
     * SftpRemoteFileTemplate
     * 于 SftpSessionFactory 创建的 sftp 文件操作模板类
     * 用来创建 SftpOutboundGateway(见 SftpOutboundGateway 构造方法源码)
     *
     * @return SftpRemoteFileTemplate
     */
    @Bean
    public SftpRemoteFileTemplate sftpRemoteFileTemplateProvider() {
        return new SftpRemoteFileTemplate(sftpSessionFactoryProvider());
    }
    @Bean
    @ServiceActivator(inputChannel = "getChannelProvider")
    public MessageHandler getHandlerProvider() {
        SftpOutboundGateway gateway = new SftpOutboundGateway(sftpSessionFactoryProvider(), "get", "payload");
        gateway.setOptions("-P");
        gateway.setLocalDirectory(new File(Constant.PROVIDER_DEFAULT_TEMP_DIR));
        return gateway;
    }
}

SftpProviderConfiguration 类中,sftpSessionFactoryProvider 方法创建并配置了一个 SessionFactory 实例用于创建与 SFTP 服务器的会话,它包含了 SFTP 服务器的连接信息,如主机名、端口、用户名、密码等。在后续的下载操作中,会使用这个 SessionFactory 来建立与 SFTP 服务器的连接。

4. SftpOutboundGateway 处理消息

getHandlerProvider 方法创建了一个 SftpOutboundGateway 实例,并将其注册为处理 getChannelProvider 通道消息的处理器:

@Bean
@ServiceActivator(inputChannel = "getChannelProvider")
public MessageHandler getHandlerProvider() {
    SftpOutboundGateway gateway = new SftpOutboundGateway(sftpSessionFactoryProvider(), "get", "payload");
    gateway.setOptions("-P");
    gateway.setLocalDirectory(new File(Constant.PROVIDER_DEFAULT_TEMP_DIR));
    return gateway;
}
  • SftpOutboundGateway 接收到 getChannelProvider 通道的消息后,会根据消息的有效负载(远程文件目录)和配置的 SessionFactory 建立与 SFTP 服务器的连接。
  • "get" 表示要执行的 SFTP 命令,即下载文件。
  • "-P"get 命令的选项,通常表示在下载过程中保留文件的权限和时间戳。
  • setLocalDirectory 方法指定了下载文件的本地目录,这里使用 Constant.PROVIDER_DEFAULT_TEMP_DIR 作为本地临时目录。

5. 文件下载与结果返回

SftpOutboundGateway 执行 get 命令,从 SFTP 服务器下载指定的文件到本地目录。下载完成后,会将下载的文件作为响应消息发送回消息通道,最终返回到调用 getFile 方法的地方。

总结

当调用 getFile 方法时,实际发生的流程如下:
1、方法调用转换:Spring 为 SftpGatewayProvider 接口生成的代理类会将 getFile 方法的调用转换为一个消息,消息的有效负载是传入的 directory 参数。
2、消息发送:消息被发送到 getChannelProvider 消息通道。
3、SFTP 适配器处理:配置在 getChannelProvider 通道上的 SFTP 出站通道适配器接收到消息后,会根据消息中的 directory 参数,连接到 SFTP 服务器,下载指定的文件。
4、结果返回:下载完成后,SFTP 适配器会将下载的文件作为响应消息发送回消息通道,最终返回到调用 getFile 方法的地方。


Spring Integration其他适用场景

这种消息驱动集成机制具有松耦合、可扩展性强等优点,因此适用于多种场景,以下是一些典型的应用场景:

  1. 文件传输与同步:就像上述代码中的 SFTP 文件下载场景,系统需要从远程服务器获取文件。此外,还可以用于文件上传、不同服务器之间的文件同步等场景,通过配置不同的消息处理器和通道,可以实现多种文件传输协议(如 FTP、FTPS 等)的集成。
  2. 系统集成:当企业中有多个不同的系统需要进行数据交互时,Spring Integration 可以作为中间层,将不同系统的接口进行封装和转换,实现系统之间的数据传输和业务流程整合。例如,将电商系统与物流系统集成,实现订单信息的同步和物流状态的更新。
  3. 数据处理与转换:在数据处理流程中,可能需要对数据进行清洗、转换、聚合等操作。可以通过消息驱动机制将不同的数据处理组件连接起来,每个组件作为一个消息处理器,负责完成特定的数据处理任务。例如,从数据库中读取数据,进行格式转换后发送到消息队列,供其他系统消费。
  4. 消息队列集成:与各种消息队列(如 RabbitMQ、Kafka 等)进行集成,实现异步消息处理。系统可以将消息发送到消息队列中,由其他消费者异步处理,提高系统的并发处理能力和可靠性。例如,在电商系统中,用户下单后将订单消息发送到消息队列,由库存系统、支付系统等异步处理。
  5. 定时任务与调度:结合 Spring 的定时任务功能,通过消息驱动机制实现定时数据处理和任务调度。例如,每天定时从远程服务器下载数据,进行数据分析和报表生成。

参考:
1、Spring Integration 简介

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 去年年底,据国际互联网工程任务组( IETF )消息,HTTP-over-QUIC 实验性协议将被重命名为 HTT...
    abel_cao阅读 372评论 0 1
  • Spring Web MVC Spring Web MVC 是包含在 Spring 框架中的 Web 框架,建立于...
    Hsinwong阅读 22,604评论 1 92
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,079评论 19 139
  • 第一讲 大数据的基本特征是什么? 数据规模巨大(Volume) 传统数据 GB->TB大数据 TB->PB 数据类...
    zzhy在学习阅读 2,262评论 0 1
  • (一) Web 技术演变 静态页面阶段页面以HTML文件的形式存储于服务器中,浏览器请求直接返回该HTML文件。 ...
    MarcusMa阅读 287评论 0 1