转述:Guaranteeing Message Processing
storm保证从spout发出的消息被完整处理,本文解释storm是怎么做到的,作为使用者应该注意些什么。
什么叫消息完整处理
消息在storm里是由tuple表示的,从spouts发出的一个tuple经过bolts处理会产生多个对应的tuple,这种关系构成一个树。消息完整处理指的就是由这个消息触发整个topo路径上所有节点都被遍历处理完。如果树🌲上某个节点在超时时间内未完成处理就认为这个消息处理失败或者说位完整处理。此处说的超时时间通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS设定,默认30s
消息处理的过程
先来看看storm spouts的接口
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
storm通过调用nextTuple来取一条消息(一个tuple), 在此之前spout通过open调用准备好消息流,从接口定义中可以看到是一个collector,并且每一个tuple带一个message id用来跟踪这条消息。之后storm将tuples发给bolts消费并全程跟踪由这个tuple触发的topo树处理情况,如果处理完成会调用ack, 反之超时未处理完则调用fail
如何使用storm api达到消息可靠处理
使用者需要做两件事来配合storm完成消息可靠处理
a. 告知storm topo树的上下节点关系,在emit消息时第一个参数设定为input tuple即可,这样storm在知道消息处理失败时可以沿路replay,这个步骤也叫做下锚anchoring
b. 调用ack或fail告诉storm一条消息已处理完毕,调用fail比等待timeout更快使storm知道要去replay
有些场景需要下多个锚,很简单在emit调用中第一个参数设定为input tuple的list就行了,此时的topo就不再是树结构而是有向无环图了。
storm提供了BaseBasicBolt抽象类来帮助简化使用者完成以上两步工作,使用者只需继承这个类就省了很多事了。但需要注意的是,对应聚类和交这种操作需要等待操作完毕才能ack tuple的场景BaseBasicBolt并不适用。
此处可以看出storm保证消息处理至少一次,如果要达到消息仅处理一次则需要适用事务性topo
storm如何实现消息可靠处理
一个topo会有几个acker任务来跟踪topo的运行状况,Config.TOPOLOGY_ACKER_EXECUTOR设定acker的数量,acker接收spouts/bolts发来的ids并进行异或运算,当结果为0时认为处理完毕,否则replay消息
不需要可靠处理该怎么做
有些应用消息丢失并不是什么大事,那么你可以有三种方式去掉ack的步骤来提升性能
a. 将Config.TOPOLOGY_ACKERS设定为0,storm会在emit消息后立即调用ack
b. 在SpoutOutputCollector.emit方法中去掉message id来关闭tracking
c. unanchor tuples