channel首先起到作用就是作为一个传输Message的管道。
在Spring Integration 中,实现了各种各样的channel,各自给管道赋予了不同的特性,满足不同的使用需求。
1、顶级接口
(1)MessageChannel
MessageChannel 是Spring Integration消息通道的顶级接口:
在该接口中,没有提供从channel中接收消息的方法,这个是因为spring integration的channel有两个不同的机制来处理接收消息,polling 、subscription,分别有两个子接口来表示。
(2)PollableChannel
PollableChannel 具备轮询获得消息的能力,这种channel要求消息消费者或者框架去周期性的检查是否有可用的消息到达
(3)SubscribableChannel
SubscribableChannel 发送消息给订阅了MessageHanlder的订阅者
2.常用的channel
在Spring integration中,默认的channel是SubscribanleChannels,默认的传输方式是同步的
(1)DirectChannel,Spring Integration默认的消息通道,它允许将消息发送给为一个订阅者,然后阻碍发送直到消息被接收
默认的Channel,传输方式都是同步的方式,下例中的三个service-activator都是同步调用的,都是由一个线程来运行的
(2)QueueChannel,允许消息接收者轮询获得消息,用一个队列(queue)接收消息,队列的容量大小可配置
在默认的channel中,三个动作同步的方式顺序执行,需要等待一起完成。有时候为了更快的返回结果,可以将部分动作另起线程处理。如上例中emailConfirme的动作可以延后异步执行,此时可以用QueueChannel来实现。
(3)PublishSubscribeChannel,允许广播消息给所有订阅者
在上例中,如果需要将“订购”的消息通知多个处理单元,不光发给“结账”模块。这时候我们就需要一个PublishSubscribeChannel,但由于PublishSubscribeChannel不支持缓存(queue),我们创建一个PublishSubscribeChannel来实现发布功能,再使用一个bridge连接原来的queueChannel,将消息发布出去
(4)PriorityChannel,可按照优先级将数据存储到队列。在业务场景中,有时候需要按照优先级来将通道中的数据进行排序,这个时候我们需要PriorityChannel
3.怎样去选择合适的Channel来实现我们的业务
基本上是从以下几点考虑的
sharing context,是否共享上下文
Atomic boundaries,原子性
Buffering messages,是否缓存消息
Blocking and nonblocking operations,当大数据量的时候,选择什么策略处理消息
Consumption mode,消息的消费模式,点对点还是发布-订阅模式?
4.MessageDispatcher
当一个channel被多个消费者订阅,而一个消息过来时,消费者如何分配这个消息,此时就涉及消息如何分发的策略了。
Spring Integration提供了两个dispatcher的实现:
UnicastingDispatcher:消息只发送给其中一个消费者
BoradcastingDispatcher:消息广播给全部的消费者
UnicastingDispatcher将消息分发给一个消费者,这里涉及了一个分发策略,这里引入了一个LoadBlancingStartegy,来确定消费者
目前这个接口的唯一实现是RoundRobinLoadBalancingStrategy,顺次循环获取消费者
5.ChannelInterceptor
Spring Integration提供了一个便利:当消息发送到channel或者从channel获取的时间点,我们可以获得一个切入点,来对当前的发送或者获取动作执行操作
举个书中的例子
我们定义了一个bean (auditInterceptor),该bean的实现类实现了preSend方法,在channel的配置中使用<interceptors>引入使用该bean。当消息要发送到chargedBookings时,这个bean就可以先获得这个消息做一些操作。
另一个例子,spring Integration自己实现的,应对监控场景
wire-tap节点引入一个interceptor,它也实现了presend方法,会将发送过来的消息也给发送到momitoringChannel中
再一个例子
这个例子实现了只有“ChargedBooking”类型的Message才被接收。定义了一个拦截器,这个拦截器需要实现presend方法,在方法中判断消息的类型是否是“ChargedBooking”,这个连接器使用内定义的MessageSelectingInterceptor来实现,由于需要判断类型是否正确,在初始化这个MessageSelectingInterceptor时,给他设置了一个selector(PayloadTypeSelector)来实现目的。
MessageSelectingInterceptor-》PayloadTypeSelector-》具体类型,