Storm架构与编程模型分析

这里使用[wordcount]程序来进行分析,其中主类如下

public class WordCountTopologyDriver {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {

        //1、创建topologyBuilder,设置spout和bolt
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        //设置spout   传参:id,使用的Spout类,并发度
        topologyBuilder.setSpout("myspout",new MySpout(),2);

        //设置Bolt    传参:id,使用的Bolt类,并发度
        //设置分组策略    随机分 参数为spout的id
        //mybolt1与myspout跟进id进行连接,怎么连接?取决于分组策略,shuffleGrouping会对myspout进行分组
        //五个task(也就是五个executor或者说五个线程)
        topologyBuilder.setBolt("mybolt1",new SplitBolt(),2).shuffleGrouping("myspout");
        //设置分组策略    按字段分 参数为上一阶段的bolt的id
        //注:如果字段与mybolt里面声明的不一致会出现backtype.storm.generated.InvalidTopologyException: null
        topologyBuilder.setBolt("mybolt2",new CountBolt(),4).fieldsGrouping("mybolt1",new Fields("word"));

        //2、创建Config,指定分配的worker的数量
        Config config = new Config();
        config.setNumWorkers(2);

        //提交任务,可以使用storm集群来提交也可以使用本地模式来提交(便于调试)
//        StormSubmitter.submitTopology("wordcountsubmit",config,topologyBuilder.createTopology());
        //使用本地模式提交
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcountsubmit",config,topologyBuilder.createTopology());
    }
}

设置了myspout的并发度为2,mybolt1的并发度为4,mybolt2的并发度为2,worker的数量为2。
分析图

image.png

流程分析:
这里有4台物理机,storm1-4分别对应nimbus和3个supervisor。
1、当客户端提交任务时,nimbus接收到任务,开始做任务分配,它会找到空闲的supervisor比如找的是storm2和storm3两台机器,并进行任务分配;
2、根据前面的介绍,这里一个需要8个task,而有2个worker,那么每个worker就分配了4个task。worker1和worker2各分配一个mySpout和splitBolt,从自己跑的wordcount程序能看到多个task在worker上分配一般是轮询的,所以可以认为worker1上分配了myBolt2的0和2task,worker2上分配了myBolt2的1和3task(或者反过来)。
3、任务分配好了选择了对应的端口(worker),找出了Supervisor。那么将通过zookeeper将任务分发下去,让2个Supervisor开始跑程序。
4、两个mySpout获取数据源,将数据随机发送到自己worker上的bolt1和另外一台的worker上的bolt(因为设置了随机发射数据shuffleGrouping)。
5、两台机器上的bolt1接着会向bolt2发送数据,发送规则为为指定字段(单词)hash取模比bolt2的数量(这样保证相同的单词肯定会发送到某个bolt2中,而不会很多bolt2中有相同的单词,如果是这样某个单词的统计就不完整了最后还需要统一去统计,所以必须用fieldsGrouping策略发射数据)。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 一、Storm简介 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数...
    达微阅读 4,398评论 0 3
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,240评论 30 60
  • Strom集群结构是有一个主节点(nimbus)和多个工作节点(supervisor)组成的主从结构,主节点通过配...
    看山远兮阅读 7,978评论 0 7
  • Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的数据流,像Hadoop...
    timothyue1阅读 3,870评论 0 0
  • 才看到今天是世界读书日,现在能够静下心来读书的人又有多少。 我发现几乎每天都有什么什么日,然而并没有多少人关心,而...
    烟花瞬间阅读 2,920评论 0 3

友情链接更多精彩内容