这里使用[wordcount]程序来进行分析,其中主类如下
public class WordCountTopologyDriver {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//1、创建topologyBuilder,设置spout和bolt
TopologyBuilder topologyBuilder = new TopologyBuilder();
//设置spout 传参:id,使用的Spout类,并发度
topologyBuilder.setSpout("myspout",new MySpout(),2);
//设置Bolt 传参:id,使用的Bolt类,并发度
//设置分组策略 随机分 参数为spout的id
//mybolt1与myspout跟进id进行连接,怎么连接?取决于分组策略,shuffleGrouping会对myspout进行分组
//五个task(也就是五个executor或者说五个线程)
topologyBuilder.setBolt("mybolt1",new SplitBolt(),2).shuffleGrouping("myspout");
//设置分组策略 按字段分 参数为上一阶段的bolt的id
//注:如果字段与mybolt里面声明的不一致会出现backtype.storm.generated.InvalidTopologyException: null
topologyBuilder.setBolt("mybolt2",new CountBolt(),4).fieldsGrouping("mybolt1",new Fields("word"));
//2、创建Config,指定分配的worker的数量
Config config = new Config();
config.setNumWorkers(2);
//提交任务,可以使用storm集群来提交也可以使用本地模式来提交(便于调试)
// StormSubmitter.submitTopology("wordcountsubmit",config,topologyBuilder.createTopology());
//使用本地模式提交
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcountsubmit",config,topologyBuilder.createTopology());
}
}
设置了myspout的并发度为2,mybolt1的并发度为4,mybolt2的并发度为2,worker的数量为2。
分析图
流程分析:
这里有4台物理机,storm1-4分别对应nimbus和3个supervisor。
1、当客户端提交任务时,nimbus接收到任务,开始做任务分配,它会找到空闲的supervisor比如找的是storm2和storm3两台机器,并进行任务分配;
2、根据前面的介绍,这里一个需要8个task,而有2个worker,那么每个worker就分配了4个task。worker1和worker2各分配一个mySpout和splitBolt,从自己跑的wordcount程序能看到多个task在worker上分配一般是轮询的,所以可以认为worker1上分配了myBolt2的0和2task,worker2上分配了myBolt2的1和3task(或者反过来)。
3、任务分配好了选择了对应的端口(worker),找出了Supervisor。那么将通过zookeeper将任务分发下去,让2个Supervisor开始跑程序。
4、两个mySpout获取数据源,将数据随机发送到自己worker上的bolt1和另外一台的worker上的bolt(因为设置了随机发射数据shuffleGrouping)。
5、两台机器上的bolt1接着会向bolt2发送数据,发送规则为为指定字段(单词)hash取模比bolt2的数量(这样保证相同的单词肯定会发送到某个bolt2中,而不会很多bolt2中有相同的单词,如果是这样某个单词的统计就不完整了最后还需要统一去统计,所以必须用fieldsGrouping策略发射数据)。