Storm架构
Storm是一个分布式、可靠的实时计算系统。与Hadoop不同的是,它采用流式的消息处理方法,对于每条消息输入到系统中后就能被立即处理。适用于一些对实时性要求高的场景,比如广告点击在线统计、交易额实时统计等。
一些名词解释
Stream:Storm中被处理的数据流,一条消息称为一个元组。
Spout:Storm连接外部数据源的组件,可以认为Storm的数据源。
Bolt:数据处理组件,Bolt里面封装了处理数据的逻辑。Spout和Bolt是Storm中的两类组件,类似MapReduce中的Map和Reduce。比如可以在Bolt上定义过滤、聚合、join、写数据库等。
-
Stream Group:消息分组策略,定义了Bolt组件以何种方式接收数据。Storm内置了八种消息分组策略,我们也可以通过实现CustomStreamGrouping定义自己的消息分组策略。
- Shuffle grouping:随机分配消息给Bolt task,能够保证每个Bolt task都能分配到相同数据量的元组。
- Fileds grouping:根据字段进行划分,比如按“user-id”字段进行划分,那么相同“user-id”的值会被分配到一个Bolt task中。
- Partial grouping:类似Filed grouping,但是能够保证下游Bolt任务负载均衡。
- All grouping:将每条消息都广播给所有Bolt任务,也就是说每个Bolt处理的数据完全相同。需要小心使用。
- Global grouping:所有消息流数据全部发送到一个Bolt任务中。
- None grouping:不关心分组策略,相当于Shuffle grouping。
- Direct grouping:直接分组,上游指定哪个Bolt任务接受数据。
- Local or shuffle grouping:资源本地化的一种实现方式,如果任务都在同一个进程中,则会发送到该Bolt任务中。入果没有,相当于shuffle grouping。
Topology:由消息分组将Spout和Bolt连接起来的任务拓扑。相当于MapReduce中Map和Reduce组成的任务。
Worker:工作进程,运行在Supervisor节点上,一个Woker进程可以包含一个或多个Executor线程。每个Woker进程上会执行一组Task,比如Storm集群总共有50个Woker,如果Task总数为300,那么每个Worker上面需要执行6个Task。Worker执行topology的一个子集(不会出现一个Worker进程为多个Topology提供服务),一个Topology任务可能由多个Woker进程负责执行。
Executor:执行Spout或Bolt任务的线程,由Worker进程创建。每个Executor线程只会执行一个Topology中的一个Component的task实例(但不一定只执行一个task,可能执行多个task)。
Task:Storm中最小的处理单元,一个Executor中可以运行一个或多个Task。Topology中的Spout和Bolt可以设置并行度,一个并行度对应一个Task。
一个Topology任务启动后,组件(Spout或Bolt)的task数量就已经确定了(就是组件的并行度)。但是我们可以为该组件添加执行线程,也就是Executor(因为有可能一个Executor执行了多个task,为了提高执行效率,可以增加Executor线程。但是需要注意,一个Executor只会为同一个Component的task服务)。默认情况下,task数是等于Executor数的,即一个Executor执行一个task。
Storm架构
Storm集群有两类节点:运行Nimbus守护进程的主节点和运行Supervisor守护进程的工作节点。Nimbus节点用于分配代码、分配计算任务(分配给哪些Supervisor上的哪些Woker)和监控状态(用于故障检测、恢复)。Supervisor节点负责监听工作(监听Nimbus分配的任务)、启动并停止Woker进程。
Worker是运行在Supervisor上的进程,Supervisor收到Nimbus分配的任务后,负责启动Nimbus指定的Woker进程。Woker进程执行Topology的子集,一个Topology任务可能由运行在多台机器上的Worker进程组成。
Woker进程启动一个或多个Executor线程,Executor线程中可以有一个或多个Task。每个Executor都会启动一个消息循环线程,用于接受、处理和发送消息。
Nimbus和Supervisor之间协调工作也是由Zookeeper来完成的。
Nimbus和Supervisor都能快速失败恢复,而且它们都是无状态的,状态信息存储在Zookeeper(元数据)和本地中。当Nimbus或Supervisor挂掉后,可以重新启动并读取状态信息到集群中来正常运行,所以Storm系统具有很高的容错性。
在逻辑上将Storm中消息来源节点称为Spout,消息处理节点称为Bolt,它们通过流分组组成Topology。
Storm中元数据
为了更好的理解Storm的设计,我们可以通过Zookeeper中存储的元数据来理解Storm架构中各个节点之间的关系。
Storm在Zookeeper存储的消息都经都是以/storm开始,所有数据都存储在叶子节点上。下面说一个每个节点数据存储的具体含义:
- /storm/workerbeats/{topology-id}/node-port:拓扑任务所在的Woker进程信息,node和port是Woker所在的主机和端口。里面主要存储了Woker的运行状态和统计信息。Woker进程会定时上报心跳到该节点,Nimbus通过心跳信息来确认Woker进程的存活,对于死掉的Woker,Nimbus会重新调度。统计信息包括该Woker上所有Executor的统计信息,比如发送消息数,接受消息数等,这些信息会显示在UI中。一个Topology任务可能划分到多个Woker(node-port)上。
- /storm/storms/{topolog-id}:存储了拓扑任务的本身信息,比如名字、启动时间、运行状态、使用Woker数、每个组件的并行度等。这个信息在任务注册后,就不会在发生改变。该节点信息,可以帮助Nimbus进行资源分配,因为能够知道哪些Supervisor上面有哪些任务。
- /storm/assignments/{topology-id}:Nimbus为每个Topology任务的分配信息,比如Topology在该Nimbus中的存储目录、被分配给哪些Supervisor、Woker、Executor上。
- /storm/supervisors/{supervisor-id>}:Supervisor节点的注册信息,存储了该节点自身一些统计信息,比如使用了哪些端口等。该Znode是临时节点,当Supervisor下线后,这些信息会自动删除,Nimbus感知到该节点下线后会重新为该Supervisor下的任务分配节点。
- /storm/errors/{topology-id}/{component-id}/{equential-id}:存储运行中每个组件的错误信息,每个组件只会保留最近十条错误信息。
下面一张图讲述了节点直接如何创建、使用这些元数据:
Nimbus、Supervisor、Woker两两之间都需要维持心跳。Nimbus通过/storm/superviors/节点能够知道哪些Supervisor存活。Nimbus通过/storm/workerbeats/节点能够知道哪些Woker存活。Supervisor和Woker通过本地文件维持心跳,他们虽然是两个进程无法直接通信,为什么通过文件维持心跳呢,应该有很多网络通信框架可以吧。
Storm源码
Storm基于Clojure和Java编写的。其中Clojure实现了Nimbus、Supervisor、Woker、Executor以及Task这些基础服务。Java实现了Storm的流处理(组件实现)以及事务Topology。
Storm中还有一些Trident代码,Trident是Storm实时消息处理的更高层抽象。
Storm集群搭建
环境准备
1、Java安装
Storm1.x在Java7和Java8中完成了测试,所以Java版本最好是1.7+。
2、Python安装
Storm1.x在Python2.6.6中完成了测试,理论上3.x也能够运行,但是官方并没有测试。
3、Zookeeper安装
Storm使用Zookeeper进行协调服务,所以需要准备Zookeeper环境,版本官方上面没有指定版本。
Storm安装
下载Storm jar包
http://storm.apache.org/downloads.html
Storm文件配置
Storm配置文件在${STORM_HOME}/conf/storm.yaml中,下面是一些必须的配置项:
#zookeeper集群
storm.zookeeper.servers:
- "192.168.0.1"
- "192.168.0.2"
- "192.168.0.3"
#zookeeper端口
storm.zookeeper.port: 2181
#可作为nimbus的候选主机
nimbus.seeds: ["192.168.0.1"]
#storm数据存储目录,用于存储少量状态信息,比如jar、conf等
storm.local.dir: "/opt/yjz/storm/data"
#suppervisor可以作为woker进程启动的端口,表明该Supervisor最多可以启动四个Worker进程
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
更多配置可以查看https://github.com/apache/storm/blob/v1.2.2/conf/defaults.yaml。随着深入了解,可以优化集群配置。
节点启动
nohub bin/storm nimbus &
nohub bin/storm ui &
nonub bin/storm supervisor &
查看nimbus节点,发现nimbus进程已经启动,core为ui进程。
jps
8134 nimbus
8713 core
10123 Jps
19710 QuorumPeerMain
查看supervisor节点,发现supervisor进程已经启动。
jps
19237 QuorumPeerMain
7610 Supervisor
8975 Jps
查看UI,192.168.0.1:8080。
Storm编程
编程模型
我们上面说过Storm提供了两类组件:Spout和Bolt。所以我们使用Storm编程的对象主要也就是针对Spout和Bolt。
Spout用于定义接受外部数据源数据,并将其转换成Storm内部数据,然后将这些数据发送给Bolt。
Bolt组件定义了数据处理逻辑,也就是我们的业务处理逻辑,比如过滤、聚合、Join等。Bolt组件也可以用于写入外部介质,比如写入Mysql、Redis等。
组件之间传输的数据在Storm中称为元组(Tuple,可以理解为一行数据),Tuple是Storm数据传输的基本单元。Tuple中定义了字段(Field,可以理解一行数据中一个字段),这些Field是有Schema的,Filed可以是byte、short、integer、long、float、double、boolean、string和byte array,除了这些基础类型,我们还可以自定义数据类型(需要自己实现针对自定义类型的序列化)。
Storm 组件
在编写Storm作业时我们会针对Spout和Bolt进行编程实现,下面我们分别看一下Storm为我们提供的组件接口。
Spout组件
Spout是Storm中Topology的生产者,负责读取外部数据,并转换成Tuple。Spout可以设置处理的消息类型为可靠或不可靠。对可靠的消息,Spout会缓存发出去的消息,当该消息在topology中处理失败时,Spout可以重新发送该消息。对于不可靠的消息,Spout一旦发送出该消息后就会将该消息扔掉,所以如果该消息处理失败,那么该消息就会丢失。
Spout可以发射多条消息流Stream,通过使用OutputFieldsDeclarer.declareStream()
方法来定义多个Stream,然后使用SpoutOutputCollector.emit()
方法在发射消息时指定Stream。
Spout中最重要的方法nextTuple()
,将外部数据源数据以tuple形式发送到Topology中的Bolt组件进行处理。需要注意nextTuple()不能阻塞,因为Storm是在一个线程内调用Spout的所有方法。
Spout对于可靠类型消息,还有两个比较重要的方法就是ack和fail。Storm在检测一个tuple被整个Topology执行成功的时候会回调ack,否则调用fail。
下面是Spout组件在Storm中的实现,我们分别来看下这些组件都是什么。
IComponent接口
IComponent接口是所有组件的顶级接口,IComponent中定义了所有组件可能需要用到的方法(其实就是定义了Spout和Bolt组件都用得到的方法)。
- declareOutputFields方法用于声明该拓扑所有流(Stream)的输出模式(Schema),比如声明输出流ID、输出字段(Field)以及输出流是否为直接流。
- getComponentConfiguration方法用于声明该组件的配置,但是它只能覆盖以"topology.*"配置为开头的子集。我们也可以通过TopologyBuilder构建拓扑时,对组件进行进一步配置覆盖。
ISpout接口
ISpout是Spout组件的定义的顶级接口,它定义了Spout组件支持的方法。Storm会在相应的阶段调用ISpout接口中特定的方法,比如启动拓扑时会首先调用open()方法,正常停止拓扑时会调用close()方法(kill -9不会被调用)。
- open方法会在任务在集群工作进程初始化时调用,用于提供Spout执行所需要的环境。比如读取外部数据源的一些初始化配置可以写在这里。Map类型的conf参数是这个Spout的配置,它包含了拓扑与集群配置的合并集。TopologyContext类型的context是该拓扑的上下文,包括拓扑id、组件id、输入输出信息等。SpoutOutputCollector类型的collector参数是Spout的收集器,用于发射tuple。
- close方法当一个ISpout关闭时会被调用,但是并不能保证一定被调用,比如Supervisor被kill -9强制杀死的时候。
- active方法是Spout从失效模式到激活状态时被调用。Spout可以处于失效状态或激活状态,处于失效状态的Spout不会调用nextTuple方法。从失效状态到激活状态调用nextTuple方法前,会调用active方法。
- deactive方法是当Spout失效时被调用,失效状态的Spout不会调用nextTuple方法。
- nextTuple方法是Spout组件最重要的方法,nextTuple用于读取外部数据源数据转换为tuple,并且通过SpoutOutputCollector收集器发射tuple。由于nextTuple、ack、fail方法都是在一个线程内被调用,所以nextTuple方法不应该有阻塞代码。
- ack方法是当Storm确定从该Spout发射出去标识符为msgId的消息被topology完整处理完成时,会调用ack方法,我们可以在这里实现一些逻辑,比如从我们的数据源队列中移除该消息。
- fail方法和ack方法相反,当msgId消息在topology中被处理失败时会被调用。
Spout的可靠性消息类型不需要我们通过fail方法实现,Storm会自动实现。ack和fail方法只是给我们获取消息被处理成功与否的接口。
IRichSpout接口
IRichSpout接口继承了ISpout和IComponent接口,它是我们实现Spout组件的主要接口。它本身没有定义任何方法,所以我们实现Spout组件时候只需要实现ISpout和IComponent接口的方法。
BaseComponent抽象类
BaseComponent抽象类实现实现了IComponent组件的getComponentConfiguration方法,并且返回为空。
它主要的作用就是对于一些Spout组件并不需要进行覆盖配置,这时候通过继承BaseComponent抽象类就不需要实现该方法了。
BaseXxx在Storm组件中,一般都是指一些基础实现。目的就是为了避免我们写代码时候去实现一些我们用不到的方法(我们一般都是置为空)。
BaseRichSpout抽象类
BaseRichSpout继承了IRichSpout接口和BaseComponent接口,它实现了一些我们在实际编写Spout组件时可能用不到的方法。比如close、active、deactive、ack、fail方法(方法体都为空)。这样当我们通过继承BaseRichSpout抽象类来实现Spout组件时候,这些方法我们都可以不需要实现了。
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
}
总结
当我们编写Spout组件时,如果想要实现Spout给我们提供的所有方法,可以直接实现接口IRichSpout接口。如果我们只需要实现Spout组件所必须的方法,可以直接继承BaseRichSpout抽象类。
Bolt组件接口
所有的消息处理逻辑被封装到Bolt中,比如可以用来做过滤、聚合、查询数据库、写入数据等。
Bolt也可以发送多条消息流Stream,使用OutputFieldsDeclarer.declareStream()
方法定义Stream,然后使用OutputCollector.emit()
方法在发射消息时,指定Stream。
Bolt最重要的方法时execute(),它以接受一个tuple,经过逻辑处理后,使用OutputCollector.emit()
发射出0个或多个tuple。Bolt还需要为每个tuple调用ack方法,来通知Storm这个消息被该Bolt task执行完成,从而通知这个tuple的发射者Spout(调用Spout的ack或fail)。
Bolt组件定义的逻辑和Spout组件的类似,下面是Bolt组件的实现方式。
IBolt接口
IBolt接口是Bolt组件的顶级接口,它定义了Bolt组件所需要的方法。IBolt的设计原则是以一个元组作为输入,通过逻辑处理生成零个或多个元组输出。
- prepare方法在拓扑作业在工作进程初始化时调用。和ISpout中的open方法一样,做一些组件初始化的工作。prepare方法同样提供了三个stormConf、context和collector,用途和ISpout中的open方法一样。需要注意Bolt中的collector的类型是OutputCollector。
- execute方法是Bolt组件最重要的方法,接收上游发送的元组,执行业务逻辑,然后通过OutputCollector来向下游发射元组。在执行完execute方法后,我们应该调用ack或fail方法来通知Storm该消息已经被处理(否则Storm一直会等到消息处理超时,才认为该消息处理失败)。
- cleanup方法当IBolt被即将关闭时调用,和ISpout的close方法一样,不保证一定被执行到。
execute中的tuple可以不立即执行,可以等待其它元组到来一起执行。比如做聚合、join等操作。
IRichBolt接口
IRichBolt接口实现了IBolt和IComponent接口,它的作用和ISpout接口一样,这里就不在复述了。
BaseRichBolt抽象方法
BaseRichBolt抽象类和BaseRichSpout抽象方法一样,对一些我们可能用不到的方法,提供默认实现。
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
@Override
public void cleanup() {
}
}
IBasicBolt、BaseBasicBolt
使用IBasicBolt和BaseBasicBolt类来实现Bolt组件,我们可以不需要手动ack,IBasicBolt的execute方法会自动执行Acking机制(仔细看它使用的是BasicOutputCollector在emit后,执行ack方法)。如果我们希望该元组失败,可以显示抛出一个FailedException
异常。
IBasicBolt接口方法和IRichBolt具有相同的方法,只是prepare中没有传递OutputCollector收集器了,而是在execute方法中直接传递了BasicOutputCollector。所以如果我们不需要对该Bolt组件添加配置和获取拓扑上下文对象,可以直接实现BaseBasicBolt抽象类,因为该类提供了prepare和cleanup的默认实现,我们只需要实现execute方法即可。
总结
如果我们不想要手动调用ack,可以继承IBasicBolt或BaseBasickBolt来实现Bolt组件。当然,如果想要显示灵活调用,可以通过继承IRichBolt或BaseRichBolt来实现Bolt组件。
Stream Groupings
我们上面说过Topology是通过Stream Grouping将Spout和Bolt组合而成的。无论Bolt还是Spout我们都可以为其设置并行度,并行度对应着task,而Stream Grouping定义了该Bolt中的所有task以什么形式来接受数据流。Stream Grouping支持的八种流分组方式在上面已经说过了,这里就不说了。
Topology配置
当我编写完Spout和Bolt组件后,需要提供一个主类来设置拓扑。这个主类也是Storm执行Topology任务的入口类。
并行度
在说Topology配置前,我们先理解一下Storm中的并行度。
在Storm中运行Topology任务主要依赖下面三个实体:
- 工作进程(Worker processes)
- 执行线程(Executor)
- 任务(Task)
一个工作进程运行一个Topology的子集,并且每个工作进程只属于一个Topology任务(不会存在一个Woker服务多个Topology),一个Topology任务可能由多台机器的多个Woker组成。
Woker进程中可以启动多个Executor线程,每个Executor线程运行一个或多个Task,但是这些task必须是同一Component中的。也就是每个Executor只能服务于一个Component。
Task是具体执行数据处理的,我们实现的Bolt或Spout组件中每个组件可以启动一个或多个task来执行,以达到提高处理效率的目的。Task数在Topology启动后就不能在改变了,但是我们可以修改执行Task的Executor线程数,来动态调整为该拓扑分配的资源。默认情况下,每个Executor会对应一个task。
配置实例
我们首先需要使用TopologyBuilder来配置拓扑关系,在设置过程中可以添加配置、设置组件执行Executor数量、设置组件task数以及设置Stream Grouping。
public static void main(String[] args) throws Exception{
//设置Component(bolt和spout)之间的拓扑结构
TopologyBuilder builder = new TopologyBuilder();
//添加spout组件
builder.setSpout("word-reader",new WordReaderSpout()).addConfiguration("topology.debug",true);
//添加bolt组件,并设置组件所需task数,这里没有设置Executor数量,默认会为每个task启动一个Executor线程。这里还设置了以shuffleGrouping分组方式接收上游word-reader组件发送的消息
builder.setBolt("word-normalizer",new WordNormalizerBolt()).setNumTasks(2).shuffleGrouping("word-reader");
//设置bolt组件,并且设置了启动Executor数,这里没有设置task数量,默认会为么给Executor分配一个task。这里还设置了以shuffleGrouping分组方式接收上游word-normailizer组件发送的消息
builder.setBolt("word-counter",new WordCounterBolt()).shuffleGrouping("word-normalizer",3);
//运行时与集群配置合并,并通过prepare或open方法发送给所有组件节点
Config conf = new Config();
//设置运行该拓扑需要几个Worker进程,如果没有设置默认为1个。
conf.setNumWorkers(2);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS,10);
conf.put("word-file",args[0]);
conf.setDebug(false);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING,1);
//使用集群方式提交拓扑执行
StormSubmitter.submitTopology("storm-hello-word",conf,builder.createTopology());
}
如果集群中Woker数量被用完(storm.yaml中设置的supervisor.slots.ports),在提交新Topology时会失败。需要等待运行中的Topology释放资源后才可以执行。
更新运行中的Topology的Executor
Storm提供了对运行中Topology任务的Executor热更新,有两种方式可以进行更新。
- 使用Storm web UI中rebalance更新。
- 是用Storm命令行工具CLI进行更新。
比如将myTopology任务改为5个工作进程,组件blue-spout使用3个Executor,组件yello-bolt使用5个bolt。
storm rebalance myTopology -n 5 -e blue-spout=3 -e yello-bolt=5
Storm配置文件
Storm有许多各种各样的配置,有些是系统配置不能通过Topology任务进行修改,而有些配置是支持在Topology任务中进行修改。
默认Storm中的所有配置都在Storm代码库中的default.yaml中,我们可以通过在Nimbus节点或Supervisor节点中的storm.yaml进行覆盖。除了通过storm.yaml进行覆盖修改配置外,我们还可以通过StormSubmitter构建拓扑时来修改配置(传入的Config对象),但是这里只能修改以TOPOLOGY为前缀的配置项。
我们也可以通过Java API修改配置,有两种方式:
- 内部修改:覆盖Spout或Bolt的getComponentConfiguration方法来修改配置。
- 外部修改:在TopologyBuilder中的setSpout或setBolt方法返回的对象调用addConfiguration来覆盖配置。
这些配置的优先级为:
default.yaml < storm.yaml < 配置拓扑添加配置项 < 组件内部添加配置项 < 组件外部添加配置项
Maven开发配置
使用Maven开发Storm作业,首先就需要配置pom.xml文件,包括Jar包引入、打包配置等。
Storm目前版本是1.2.2,Jar包引入:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
这里之所有使用provided模式是因为,Storm会自动从工作节点下载Storm Jar包。
当我们编写完Storm作业后,需要将相关依赖打到一个Jar包内,可以使用assembly插件进行打包。
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.path.to.main.Class</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
其中mainClass就是我们作业的启动主类。
本地模式
Storm为了方便开发测试,提供了本地模式运行Storm Topology,本地模式模拟了集群模式下作业的执行,所以我们在开发调试过程中可以使用本地模式测试开发的Storm作业。
本地模式将Spout和Bolt都运行在一个进程上的多个线程执行,来模拟真实的集群运行情况。对于一些耗时操作,会采用Thread.sleep()方法模拟,所以有时会导致运行速度缓慢。
//只需要创建LocalCluster对象类就可以使用本地模式
LocalCluster localCluster = new LocalCluster();
//提交作业
localCluster.submitTopology();
//停止作业
localCluster.killTopology();
//关闭本地集群模式
localCluster.shutdown();
Local模式提供了一下配置型:
Config.TOPOLOGY_MAX_TASK_PARALLELISM:设置Topology的最大并行度。
Config.TOPOLOGY_DEBUG:开启DEBUG模式,方便作业调试。
Storm核心机制原理
Storm消息可靠性处理
Storm提供了几种不同级别的消息可靠性处理:
- 尽力处理(best effort)
- 至少一次被处理(at least once)
- 恰好处理一次(exactly once,需要借助Trident)
尽力处理属于最低级别保证机制,我们可以不添加任何额外操作,Storm就能帮我们达到。
至少一次被处理(at least once)
Spout发出的消息可能会产生成千上万条消息(经过各种Bolt task处理后就会分散出一条消息),这些消息会组成一颗消息树,其中Spout发出的消息为消息根,Storm会跟踪整棵树的处理情况,如果这颗树中的任一消息处理失败,或者整棵树在规定时间没有被处理完(通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
配置,默认为30s),那么Storm就认为Spout发出的这些消息处理失败了,Spout会重新发送该条消息。
判断一个tuple tree是否被处理完成,有以下两个条件:
- tuple tree不在生长。
- tuple tree中的任何消息都被处理。
使用Storm API来实现
如果想要使用Storm提供的可靠性处理,我们需要做两件事:
- 无论何时,只要在tuple tree中创建了一个新节点,就需要告知Storm。
- 当处理完一个消息后,需要告知Storm中对应tuple tree。
通过上面两个步骤,Storm就可以检测一个tuple tree是否被处理完成,并且会调用消息产生对应Spout的ack和fail方法。
当为tuple tree中指定的节点增加一个新节点时,称为锚定(anchoring)。锚定是在发送消息的同时进行的,具体锚定方式为:把输入消息作为emit方法的第一个参数。这样就告知tuple tree,该节点产生了新节点,只有当新节点也完成时tuple tree才算执行完成。
public class SplitSentence extends BaseRichBolt {
...
public void execute(Tuple tuple) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
_collector.emit(tuple, new Values(word));
}
_collector.ack(tuple);
}
...
Storm支持一个输出消息被锚定在一个或多个输入消息上,比如join、聚合等场景。一个被多重锚定的消息处理失败,会导致与之关联的多个Spout重新发送消息。
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
多重锚定会将锚定的消息添加到多棵tuple tree上,并且有可能打破树结构,从而形成一个DAG图。
如果没有锚定,也就是没有在emit方法的第一个参数指定输入tuple。那么这个节点所产生的子树失败,spout不会重新发送消息。该节点ack完成后,tuple tree就认为被处理完成了。有些场景非常适合这种不需要锚定的消息。
完成锚定后,我们还需要在消息被处理完成后告知tuple tree。我们必须在每个execute方法的后面显示调用OutputCollector的ack或fail方法,来表明该消息在该bolt是否被处理完成(否则会一直等到超时)。
显示调用ack或fail,是除了快速告知tuple tree消息是否被处理完成外,还有一个原因就是防止内存被打满。因为Storm使用内存来跟踪每个元组是否被处理完成,所以如果不调用ack或fail,很容易将内存打满。
Storm提供了IBasicBolt和BaseBasicBolt接口来隐式调用ACK机制,也就是说我们如果使用它们实现Bolt组件,就不需要手动锚定和调用ack/fail方法了。
public class SplitSentence extends BaseBasicBolt {
public void execute(Tuple tuple, BasicOutputCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
针对于多重锚定,IBasicBolt和BaseBasicBolt是无法处理的。需要我们显示完成,也就是需要实现IRichBolt或BaseRichBolt定义Bolt组件。
acker框架
Storm使用Acker框架来跟踪消息是否被成功处理。Acker是Storm中一组特殊的任务,用于跟踪每个Spout发送tuple的DAG。当acker发现DAG中节点都完全被处理完成后,它会向创建该tuple的Spout发送一条消息(成功或失败)。
我们可以使用Config.TOPOLOGY_ACKERS
在拓扑配置中设置Acker数量,默认情况下每个Woker进程会启动一个Acker任务。
当在Topology中创建一个新的元组时,会为每个元组分配一个64bit的随机id(无论spout还是bolt组件)。Acker使用这些id来跟踪tuple tree。每个tuple被创建后tuple tree中的根id都会被复制到这个消息中,当这个消息处理完成后,它会根据根id来找到跟踪这棵树的Acker,并向该Acker发送状态变更信息,比如:该元组已经处理完成,又产生了新元组,需要你跟踪下。
这里有个点需要考虑下,就是每个元组如何知道自己的tuple tree对应着哪个Acker。Storm使用一种哈希算法根据Spout tupel id来确定那个Acker负责该tuple tree,而每个消息都知道根id,因此就知道与哪个Acker通信了。
我们知道了Acker与Spout tuple对应关系,知道了每个tuple tree 元组如何找到对应的Acker与其通信。接下来还需要考虑一点,Acker如何跟踪tuple tree。
我们知道每个tuple tree都有可能有成千上万个节点,如果跟踪每个节点,那么内存很容易就被打满了。Storm采用了一个不同的跟踪策略,每个Spout元组只需要固定数据量的空间(大约20字节),就可以跟踪tuple tree。这种跟踪算法是Storm能够正常工作的关键,也是其重大突破之一。
我们看下Storm是如何做的。Acker为每个Spout元组存储一个消息ID(随机分配的那个ID)到一对值的映射,这对值的第一个元素就是Spout任务ID,第二个元素是64bit数字,称为“ack val”,它是tuple tree中所有消息id的异或结果。ack val代表了整棵树的状态,当这个ack val为0时就代表整棵树已经被处理完成了。
它的异或原理就是,当我们无论创建一个节点还是完成一个节点都使用消息ID来与之异或,这样同一个消息ID一来一回异或结果就为0了。
如上图,ack val最终值为T1T2T3T4T5T1T2T3T4^T5=0。
选择合适的可靠性
Acker任务是轻量级的,所以拓扑中不需要很多Acker任务,我们可以通过Storm UI来查看Acker吞吐量,如果吞吐量很差,可以适当添加Acker任务。
如果我们认为消息可靠性不是必要的(处理失败情况下丢失消息没有关系),我们可以关闭消息可靠性。这样拓扑性能也会提升,因为不跟踪元组树,传输的消息会减半(因为元组树中每个元组都需要发送一条确认信息)。并且每个下游元组中保留更少的数据(不需要存储根ID),从而减少带宽使用。
关闭消息可靠性的方式:
- 将Config.TOPOLOGY_ACKERS设置为0,这样Spout发送元组后,它的ack方法会被立即调用。
- 在Spout中使用SpoutOutputCollector.emit发送消息时不指定消息ID。这样可以对一些特定消息关闭消息可靠性。
- 如果不在意某个消息派生出子消息的可靠性,那么在对应的botl组件中可以不进行锚定(不指定输入tuple)。
学习资料
- Storm官网(http://storm.apache.org/releases/1.2.2/index.html),看了许多Storm书籍,大部分都是直接翻译的Storm官方文档。
- 《从零开始学Storm》对Storm讲解的非常全面,由浅入深,覆盖面比较广。
- 《Storm源码分析》对Storm从源码级别进行了讲解,有些地方原理讲的还是不错的,就是该书讲解的Storm版本比较低。