1.api介绍
生成Topology
Map conf = new HashMp();
//topology所有自定义的配置均放入这个Map
TopologyBuilder builder = new TopologyBuilder();
//创建topology的生成器
int spoutParal = get("spout.parallel", 1);
//获取spout的并发设置
SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
new SequenceSpout(), spoutParal);
//创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格
int boltParal = get("bolt.parallel", 1);
//获取bolt的并发设置
BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
//创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,
//shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME),
//表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,
//即每个spout随机轮询发送tuple到下一级bolt中
int ackerParal = get("acker.parallel", 1);
Config.setNumAckers(conf, ackerParal);
//设置表示acker的并发数
int workerNum = get("worker.num", 10);
conf.put(Config.TOPOLOGY_WORKERS, workerNum);
//表示整个topology将使用几个worker
conf.put(Config.STORM_CLUSTER_MODE, "distributed");
//设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行
StormSubmitter.submitTopology(streamName, conf,
builder.createTopology());
//提交topology
IRichSpout
IRichSpout 为最简单的Spout接口
IRichSpout{
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
}
@Override
public void close() {
}
@Override
public void activate() {
}
@Override
public void deactivate() {
}
@Override
public void nextTuple() {
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
其中注意:
=>spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个=>task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
=>open是当task起来后执行的初始化动作
=>close是当task被shutdown后执行的动作
=>activate 是当task被激活时,触发的动作
=>deactivate 是task被deactive时,触发的动作
=>nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
=>ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
=>fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
=>declareOutputFields, 定义spout发送数据,每个字段的含义
=>getComponentConfiguration 获取本spout的component 配置
Bolt
IRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
其中注意:
=>bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
=>bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
=>prepare是当task起来后执行的初始化动作
=>cleanup是当task被shutdown后执行的动作
=>execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
=>declareOutputFields, 定义bolt发送数据,每个字段的含义
=>getComponentConfiguration 获取本bolt的component 配置
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client</artifactId>
<version>0.9.3.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.jstorm</groupId>
<artifactId>jstorm-client-extension</artifactId>
<version>0.9.3.1</version>
<scope>provided</scope>
</dependency>
打包
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>storm.starter.SequenceTopology</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
提交jar
xxxx.jar 为打包后的jar
com.alibaba.xxxx.xx 为入口类,即提交任务的类
parameter即为提交参数
jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter
2.Ack原理
Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每一个Tuple的Tuple树(因为一个tuple通过spout发出了,经过每一个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。Acker的跟踪算法是Storm的主要突破之一,对任意大的一个Tuple树,它只需要恒定的20字节就可以进行跟踪。
Acker跟踪算法的原理:acker对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0,然后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设每个发射出去的Tuple都被ack了,那么最后ack-val的值就一定是0。Acker就根据ack-val是否为0来判断是否完全处理,如果为0则认为已完全处理。
要实现ack机制:
1,spout发射tuple的时候指定messageId
2,spout要重写BaseRichSpout的fail和ack方法
3,spout对发射的tuple进行缓存(否则spout的fail方法收到acker发来的messsageId,spout也无法获取到发送失败的数据进行重发),看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg貌似需要自己cache,然后用这个msgId去查询,太坑爹了3,spout根据messageId对于ack的tuple则从缓存队列中删除,对于fail的tuple可以选择重发。
4,设置acker数至少大于0;Config.setNumAckers(conf, ackerParal);
阿里自己的Jstorm会提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
这样更合理一些, 可以直接取得系统cache的msg values
ack机制即,spout发送的每一条消息,在规定的时间内,spout收到Acker的ack响应,即认为该tuple 被后续bolt成功处理
在规定的时间内(默认是30秒),没有收到Acker的ack响应tuple,就触发fail动作,即认为该tuple处理失败,timeout时间可以通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS来设定。
l或者收到Acker发送的fail响应tuple,也认为失败,触发fail动作
注意,我开始以为如果继承BaseBasicBolt那么程序抛出异常,也会让spout进行重发,但是我错了,程序直接异常停止了
这里我以分布式程序入门案例worldcount为例子吧。
问题:
有没有想过,如果该tuple的众多子tuple中,某一个子tuple处理
failed了,但是另外的子tuple仍然会继续执行,如果子tuple都是执
行数据存储操作,那么就算整个消息失败,那些生成的子tuple还
是会成功执行而不会回滚的。
(1)关于Storm如何处理重复的tuple问题
有人问到Storm 是怎么处理重复的tuple?
因为Storm 要保证tuple 的可靠处理,当tuple 处理失败或者超时的时候,spout 会fail并重新发送该tuple,那么就会有tuple 重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。不过也有一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后
续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用MySQL、MemCached 或者Redis 根据逻辑主键来去重。
(3)使用bloom filter 做过滤,简单高效。
(2)关于Storm的ack和fail问题
在学习storm的过程中,有不少人对storm的Spout组件中的ack及fail相关的问题存在困惑,这里做一个简要的概述。
Storm保证每一个数据都得到有效处理,这是如何保证的呢?正是ack及fail机制确保数据都得到处理的保证,但是storm只是提供给我们一个接口,而具体的方法得由我们自己来实现。例如在spout下一个拓扑节点的bolt上,我们定义某种情况下为数据处理失败,则调用fail,则我们可以在fail方法中进行数据重发,这样就保证了数据都得到了处理。其实,通过读storm的源码,里面有讲到,有些类(BaseBasicBolt?)是会自动调用ack和fail的,不需要我们程序员去ack和fail,但是其他Bolt就没有这种功能了。