Storm的分组方式

Storm中内置了7种分组方式

Shuffle grouping

  • 定义: Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
  • 样例
    此样例由Storm的官方提供,通过下面这个例子可以对Shuffle grouping有更直观的认识

    public class ExclamationTopology {
    
    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
    
        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }
    
        @Override
        public void execute(Tuple tuple) {
            System.out.println(tuple.getString(0) + " is from task " + tuple.getSourceTask() + " of Spout/Bolt:" + tuple.getSourceComponent());
    
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
    }
    
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
    
        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
    
        Config conf = new Config();
        conf.setDebug(true);
    
        if (args != null && args.length > 0) {
            conf.setNumWorkers(30);
    
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        }
        else {
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(5000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
    

}
```

本地运行这个样例,会有类似如下的日志打印,从这个打印中可以看到,Bolt exclaim1的数据来自于Spout word的10个task,即task[7-16]

```
jackson is from task 11 of Spout/Bolt:word
mike is from task 8 of Spout/Bolt:word
nathan is from task 12 of Spout/Bolt:word
nathan is from task 16 of Spout/Bolt:word
nathan is from task 13 of Spout/Bolt:word
```

Fields grouping

  • 定义:The stream is partitioned by the fields specified in the grouping. For example, if the stream is grouped by the "user-id" field, tuples with the same "user-id" will always go to the same task, but tuples with different "user-id"'s may go to different tasks.
  • 样例

    对上面的样例稍加改造

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).fieldsGrouping("word", new Fields("word"));
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
    

    从运行的结果中可以看到类似如下的打印,说明相同的字符都来自于同一个task

    mike!!! is from task 2 of Spout/Bolt:exclaim1
    mike!!! is from task 2 of Spout/Bolt:exclaim1
    mike!!! is from task 2 of Spout/Bolt:exclaim1
    mike!!! is from task 2 of Spout/Bolt:exclaim1
    

    或者在execute方法中在加如下的打印System.out.println("Current thread is " + Thread.currentThread().getId() + " to emit " + tuple.getString(0) + "!!!");,可以看到类似如下的打印,所有的mike!!!都是由同一个线程处理的。

    Current thread is 124 to emit mike!!!
    Current thread is 124 to emit mike!!!
    

All grouping

  • 定义:The stream is replicated across all the bolt's tasks. Use this grouping with care.
  • 样例

    对上面的样例稍加改造

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).allGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
    

    从运行的结果中可以看到类似如下的打印,因为Bolt exclaim1的有3个task,所以下面的结果说明了,Bolt exclaim2要从每个task中都取一次

    Current thread is 124 to emit mike!!!
    Current thread is 128 to emit mike!!!
    Current thread is 150 to emit mike!!!
    
    [Thread-18-exclaim1-executor[2 2]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 6 tuple: source: exclaim1:2, stream: default, id: {}, [mike!!!]]
    [Thread-22-exclaim1-executor[3 3]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: exclaim1:3, stream: default, id: {}, [mike!!!]]
    [Thread-44-exclaim1-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 6 tuple: source: exclaim1:4, stream: default, id: {}, [mike!!!]]
    

Global grouping

  • 定义:The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the
    task with the lowest id.
  • 样例

    对上面的样例稍加改造

     builder.setSpout("word", new TestWordSpout(), 10);
     builder.setBolt("exclaim1", new ExclamationBolt(), 3).globalGrouping("word");
     builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
    

    结果中会有类似如下的打印,说明mike!!!都来自于了同一个Bolt

    mike!!! is from task 2 of Spout/Bolt:exclaim1
    mike!!! is from task 2 of Spout/Bolt:exclaim1
    mike!!! is from task 2 of Spout/Bolt:exclaim1
    

None grouping

  • 定义:This grouping specifies that you don't care how the stream is grouped. Currently, none
    groupings are equivalent to shuffle groupings.

Direct grouping

  • 定义:This is a special kind of grouping. A stream grouped this way means that the producer
    of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods.

Local or shuffle grouping

  • 定义: If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a normal shuffle grouping.
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 同学聚会,惯例先吃饭后k歌。吃饭不怕,天天练习。k歌已是不行,半年未进歌厅,连会唱啥歌都得使劲回忆,回忆不起,还得...
    宛如初夏阅读 172评论 0 0
  • 脂肪饮食原则吃低脂肪饮食。 如果过多的食入脂肪类食物,不仅不容易消化吸收,而且过多的脂肪会直接加重病情,尤其是动物...
    cd2016阅读 869评论 0 1
  • 似是而飞阅读 170评论 0 0