Storm通信机制
Storm和Flink都是流计算系统中比较成熟的开源实现,在一些流计算平台选型的文章中,两者也常常被拿来重点比较,但是两个平台都在持续更新,一些选型比较和压测结果文章都可能已经过时。而一个系统的核心实现往往比较少改动,因此本文尝试从底层实现的层面来比较两者的异同。
在上一篇文章中,我们自顶向下地了解了Flink的内部通信机制,本文按照同样的结构来说明Storm的通信机制。代码基于tag v1.2.2。
当我们编写一个Storm topology时,也是在描绘一个有向图。图中的边相当于由一系列数据记录组成的数据流,图中的顶点相当于我们对数据流的处理。
先简单介绍Storm的术语使用,并和Flink作对比,方便熟悉其中任意一个系统的同学更快地理解这些术语。术语介绍部分主要参考来源为Storm官方文档Concepts和
Understanding the Parallelism of a Storm Topology
- Spout: Spout是有向运行图中没有上游的节点,是数据流的起点。相当于Flink中的SourceOperator。
- Bolt: 对数据流进行处理的节点,相当于Flink中的TransformationOperator
- Component:Bolt和Spout的统称 ,相当于Flink中的Operator
- Executor: 代表了Component的并行度。每个executor都会有一个工作线程,负责处理用户定义的业务逻辑,一个发送线程,负责把数据发送到下游队列。一个Executor中可以运行同一个component的多个实例。(相当于Flink中的SubTask)
- Task: task代表一个component实例, 同一个Execotor中的task会被串行执行(要区别于并行度)
- Worker: 代表一个进程,一个Work中可以运行多个Executor,相当于Flink中的TaskManager
术语对比:
Storm | Flink |
---|---|
Worker | TaskManager |
Executor | Task |
Spout | SourceOperator |
Bolt | TransformationOperator |
Component | Operator |
Stream | Stream |
Task | SubTask |
? | Chain |
Storm的Tasks和Flink的Chain并不能等价,在storm中多个Task可以运行在一个Executor上,但这些Task指的是属于同一个component的不同实例,两个Task之间是是等价关系而不是上下游关系。Flink中一个Task中可以有多个Subtask , 在Flink中的Chain是指有上下游关系的且满足一定条件的多个Subtask可以成链的方式被当成一个Task处理。
Component间的通信实现
以Storm的WordCount为例,此例用Storm的底层API编写(相对Trident来说的底层),源码可以从Storm官方的Storm-starter模块获取。
Storm的编程模型由Spout,Bolt和Stream组成。在图中split和count为Bolt, 灰色箭头为stream。图中总共有三种不同的component, 其中spout的实现类为RandomSentenceSpout,负责随机地从一个字符串数组中选择句子。split的功能是对句子进行分词。count功能是计算单词的出现次数。功能上与Flink的WordCount例子大同小异。
为了图例简洁和简化模型,在图中这三种executor的并行度分别为2、2、1,和代码中并行度不一致。这里没有开启acker和metric功能, 因此本文没有画出__acker和__metric两种系统实现的bolt,这acker部分会在分布式事务的对比中分析。
Storm没有实现Flink那样的Chain功能,上下游component不会位于同一个线程中,因此Storm的上下游component通信只有两种方式,本地线程通信或远程线程通信。
Storm内部每个executor都会有一个接收队列和一个发送队列,一个工作线程和一个发送线程。每个worker内部都会有一个发送队列,一个接收线程和一个发送线程。Storm中的队列按职能分了三类:分别为executor接收队列,executor发送队列和worker发送队列。这三类队列的消费者以sleep()的方式不断轮询来接收消息,接收消息后的处理结果publish到下游队列。
有很多Storm的技术文章中画出了worker的接收线程和topology.receiver.buffer.size, 事实上在Storm1.0.x中 worker的接收线程已被移除,改为push的方式,在Storm server接收到消息后直接反序列化然后写到各个executor的receive-queue中。
Buffer的读写
Storm实现的生产者消费者模式使用到的缓存队列为LAMX Disruptor中的RingBuffer。LAMX Disruptor号称最快的无锁并发框架。在Storm的使用场景中,flush到RingBuffer时使用的等待策略为TimeoutBlockingWaitStrategy是通过ReentrantLock加锁阻塞的, 且flush到RingBuffer前也会通过锁来避免并发调用publishDirect(ArrayList<Object> objs, boolean block)方法。
Buffer写入
以Bolt->Bolt数据传输为例,Bolt中的tuple发送主要通过OutputCollector实现, 当一个bolt在execute()方法中调用了OutputCollector.emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) 后,它的调用栈如下图所示:
粉色部分为clojure实现,黄色部分是java实现。调用栈的最后调用了java实现的DisruptorQueue.publish(Object obj) 方法。DisruptorQueue是Storm对LAMX disruptor的封装,主要增加了批量发布和超时发布功能。由上一章节可知,tuple会被发布到一个名为executor$id-send-queue的DisruptorQueue中。
DisruptorQueue的发布的代码逻辑比较复杂,主要通过ThreadLocalInserter和Flusher分别实现Tuple的批量发布和超时发布。
批量发布部分主要实现如下:DisruptorQueue的公共方法publish(Object obj)中,先后调用 ThreadLocalInserter的add和flush方法
/**
变量解释:
batcher即为ThreadLocalInserter对象实例
**/
public void publish(Object obj) {
...
batcher.add(obj);
batcher.flush(false);
}
batcher.add(obj)方法的功能是把tuple放进当前批次的缓存中,如果当前批次_currentBatch满了,且当前没有发送失改批次(为了保证顺序性,未发送成功的批次需要先发送),会触发flush到disruptor的ringbuffer。但不保证flush成功,如果因为ringbuffer空间不足flush失败,会把失败的批次放进无界队列_overflow中缓存。注意:add方法不阻塞。
batcher.flush(boolean block)的功能是触发发送失败的批次flush到ringbuffer中。该方法还可能在定时调度的Flusher线程中被调用。注意:block==false时,flush方法不阻塞。
综上,RingBuffer中的发布单元为一个批次大小的tuple(而不是单个tuple),publish方法不会阻塞,_overflow是个无界非阻塞队列。因此,如果下游处理不及时且上游持续生产数据时,可能因为_overflow中缓存的对象过多而发生OOM。Storm提供了两种方式来避免这种情况,留在后续Storm和flink实现对比再讨论。
/**
变量解释:
_currentBatch为ArrayList<Object>对象实例, 用于缓存当前批次的tuple
_overflow为ConcurrentLinkedList<ArrayList<Object>>,无界队列,用于缓存发送失败的tuple batch
_inputBatchSize为当前批次的最大缓存tuple数
**/
public synchronized void add(Object obj) {
...
//如果当前批次已满
if (_currentBatch.size() >= _inputBatchSize) {
boolean flushed = false;
//如果当前批次已满且缓存中没有发送失败的批次
if (_overflow.isEmpty()) {
try {
//发布到disruptor的ringbuffer中,非阻塞,当ringbuffer空间不足时抛出InsufficientCapacityException
publishDirect(_currentBatch, false);
_overflowCount.addAndGet(0 - _currentBatch.size());
_currentBatch.clear();
flushed = true;
} catch (InsufficientCapacityException e) {
//Ignored we will flush later
}
}
//如果当前批次已满 且 (缓存中有发送失败的批次 或 当前批次发送失败)
if (!flushed) {
//把当前批次加入到未发送失败的缓存队列中
_overflow.add(_currentBatch);
_currentBatch = new ArrayList<Object>(_inputBatchSize);
}
}
}
//May be called by a background thread
public void flush(boolean block) {
if (block) {
_flushLock.lock();
} else if (!_flushLock.tryLock()) {
//Someone else if flushing so don't do anything
return;
}
try {
while (!_overflow.isEmpty()) {
publishDirect(_overflow.peek(), block);
_overflowCount.addAndGet(0 - _overflow.poll().size());
}
} catch (InsufficientCapacityException e) {
//Ignored we should not block
} finally {
_flushLock.unlock();
}
}
到这里,看过Flink通信机制的同学应该明白“Flink的反压机制实现得更天然”的说法了。
DisruptorQueue是底层实现,直接暴露给用户的发送数据到下游的接口是output collector。Storm output collector的实现相较Flink混乱,存在两个问题:
- Collector命名比较混乱,例如有的实现类叫XXCollectorImpl,有的又不带Impl后缀 , ISpoutOutputCollector和IOutputCollector是两个完全不同的接口,两者不在同一继承树中,分别实现Spout的数据发送接口和Bolt的数据发送接口。不通过关键字搜索比较难找出全部实现了“tuple发送”功能的代码。
- Storm的collector实现耦合了tuple的发送逻辑和tuple的ack fail逻辑,因为ack/fail逻辑不同而划分了两种主要的OutputCollector , 分别是负责发送Spout tuple的ISpoutOutputCollector、负责发送IRichBolt tuple的IOutputCollector、其它Collector基本上是通过委托模式基于这两个Collector实现的。例如带有自动自动ack/fail tuple功能的IBasicOutputCollector,这个类把tuple发送逻辑委托给OutputCollector,而java实现的OutputCollector最后会委托给由clojure代码executor.clj中实现的IOutputCollector匿名类。
Storm有两个批量处理框架,相关框架的实现类分别以Transactional和Trident开头,Transactional开头的批处理实现已经被标记为废弃,现主要维护Trident的实现。这两个API中提供给用户编程使用的ITridentSpout和ITransactionalSpout 最后都会在Bolt所在的executor中调用,所以批处理编程API中的Spout使用的Collector实际父类或委托类为IOutputCollector。
Buffer读取
RingBuffer的读取和处理逻辑通过com.lmax.disruptor.EventHandler接口实现,executor中的工作线程和发送线程以及worker中的发送线程都分别实现了该接口。以executor工作线程为例,executor工作线程读取event后转换为Tuple, 并调用IBolt.execute(Tuple tuple)接口触发用户实现的业务逻辑。
上图的逻辑在一个轮询间隔为0的无限循环中: 当队列空闲时,cpu空转。
(defnk consume-loop*
[^DisruptorQueue queue handler
:kill-fn (fn [error] (exit-process! 1 "Async loop died!"))]
(async-loop ;;定义一个循环
(fn [] (consume-batch-when-available queue handler) 0) ;;此处返回0, 代表sleep-time
:kill-fn kill-fn ;;接收到kill信号时执行的清理逻辑
:thread-name (.getName queue))) ;;线程名称
;; afn returns amount of time to sleep
(defnk async-loop [afn
:daemon false
:kill-fn (fn [error] (exit-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:factory? false
:start true
:thread-name nil]
...
(let [sleep-time (afn)]
(when-not (nil? sleep-time)
(sleep-secs sleep-time)
(recur)) ;;循环调用
)
...
Storm和Flink对比
- | Storm | Flink |
---|---|---|
队列 | Disruptor | ArrayDeque+synchronized |
队列有无锁 | 有锁,使用ReentrantLock | 有锁,使用synchronized |
队列有无等待 | 等待,默认Condiction.await(timeout) | 等待,使用wait/notify |
缓存 | 有缓存,用ArrayList和ConcurrentLinkedList | 有缓存,用自定义的MemorySegment和ArrayDeque |
缓存大小 | 可配置,默认100条,和message大小无关 | 可配置,默认最小32768 byte,和条数无关,是消息序列化后的大小(消息可以跨多个buffer) |
生产方式 | 多生产者 | 多生产者 |
消费方式 | 单消费者 | 单消费者 |
序列化 | 默认kryo | 自定义 |
序列化的时机 | 远程通信时 | 写入缓存时(因此本地线程通信也会序列化) |
队列数 | 每个工作线程一个接收队列(both spout and bolt),每 | 每个工作线程一个消费队列(source除外) |