32 storm 单词计数

上一篇 简单看 storm, 主要简单讲解了storm 的集群架构、核心概念、并行度、流分组,本篇利用 storm 结合代码进行单词计数,讲解从代码层面理解入门storm。

直接开撸代码

单词计数简单的实现逻辑:

  • 构造一个 Spout,为下游 Bolt 作业提供数据源
  • 构造一个 Bolt,处理上游流向数据,进行单词切分
  • 构造一个 Bolt,处理上游 Bolt ,进行单词计数
  • 将 Spout 、Bolt 组装起来,构建成一个拓扑(Topology)
  • 将 Topology 提交到 storm 集群,等待结果

创建名为 storm-wordcount 的普通 maven project

  • 在 pom.xml 引入相关类库依赖
  <dependencies>
     <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
             <version>1.1.0</version>
                 <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>commons-collections</groupId>
          <artifactId>commons-collections</artifactId>
          <version>3.2.1</version>
        </dependency>
        <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>22.0</version>
        </dependency>
  </dependencies>
  • 加入 plugin ,打包
<build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>test/main/java</testSourceDirectory>
        
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.sf</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.dsa</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                                <exclude>META-INF/*.rsa</exclude>
                                <exclude>META-INF/*.EC</exclude>
                                <exclude>META-INF/*.ec</exclude>
                                <exclude>META-INF/MSFTSIG.SF</exclude>
                                <exclude>META-INF/MSFTSIG.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    
          <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>exec-maven-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
              <execution>
                <goals>
                  <goal>exec</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <executable>java</executable>
              <includeProjectDependencies>true</includeProjectDependencies>
              <includePluginDependencies>false</includePluginDependencies>
              <classpathScope>compile</classpathScope>
              <mainClass></mainClass>
            </configuration>
          </plugin>
        </plugins>
    </build>

新建一个 WordCountTopology 类

  • 在 WordCountTopology 中编写一个 RandomSentenceSpout 静态内部类,继承实现 BaseRichSpout 抽象类
/**
     * 
     * 编写spout ,继承一个基类,负责从数据源获取数据
     * @author bill
     * @date 2017年9月16日 下午8:21:46
     */
    public static class RandomSentenceSpout extends BaseRichSpout{
        
        private static final long serialVersionUID = 6102239192526611945L;

        private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
        
        private SpoutOutputCollector collector;
        private Random random;

        /**
         * 当一个Task被初始化的时候会调用此open方法,
         * 一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.random = new Random();
        }

        /**
         * 这个spout类,之前说过,最终会运行在task中,某个worker进程的某个executor线程内部的某个task中
         * 那个task会负责去不断的无限循环调用nextTuple()方法
         * 只要的话呢,无限循环调用,可以不断发射最新的数据出去,形成一个数据流
         */
        public void nextTuple() {
            String[] sentences = new String[]{
                     "I used to watch her from my kitchen widow"
                    , "she seemed so small as she muscled her way through the crowd of boys on the playground"
                    , "The school was across the street from our home and I would often watch the kids as they played during recess"
                    , "A sea of children, and yet tome"
                    , "she stood out from them all"};
            String sentence = sentences[random.nextInt(sentences.length)];
            LOGGER.info(" ★★★  发射 sentence 数据 > {}", sentence);  
            // 这个values,你可以认为就是构建一个tuple,tuple是最小的数据单位,无限个tuple组成的流就是一个stream,通过 emit 发送数据到下游bolt tuple
            this.collector.emit(new Values(sentence));
        }

        /**
         * 用于声明当前Spout的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
         * 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游
 bolt 中 execute 接收数据 key 
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }
  • 在 WordCountTopology 中编写一个 SplitSentenceBolt 静态内部类,继承实现 BaseRichBolt 抽象类,用于处理上游 Spout 发送过来的数据,这里做句子单词切分
/**
     * 
     * 编写一个bolt,用于切分每个单词,同时把单词发送出去
     * @author bill
     * @date 2017年9月16日 下午8:27:45
     */
    public static class SplitSentenceBolt extends BaseRichBolt{
        
        private static final long serialVersionUID = -4758047349803579486L;
        
        private OutputCollector collector;

        /**
         * 当一个Task被初始化的时候会调用此prepare方法,对于bolt来说,第一个方法,就是prepare方法
         * OutputCollector,这个也是Bolt的这个tuple的发射器,一般都会在此方法中对发送Tuple的对象OutputCollector初始化
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        /**
         * 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的
         * 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
         * 切分单词
         */
        public void execute(Tuple input) {
            // 接收上游数据
            String sentence = input.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for(String word : words){
                //发射数据
                this.collector.emit(new Values(word));
            }
        }

        /**
         * 用于声明当前bolt的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
         * 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游 bolt 中 execute 接收数据 key 
         * 定义发射出去的tuple,每个field的名称
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }   
    }
  • 在 WordCountTopology 中编写一个 WordCountBolt 静态内部类,继承实现 BaseRichBolt 抽象类,用于处理上游 Bolt 发送过来的数据,这里做单词计数
/**
     * 
     * 单词次数统计bolt
     * @author bill
     * @date 2017年9月16日 下午8:35:00
     */
    public static class WordCountBolt extends BaseRichBolt{
        
        private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);

        private static final long serialVersionUID = -7114915627898482737L;
        
        private OutputCollector collector;
        
        Map<String,Long> countMap = Maps.newConcurrentMap();
        
        /**
         * 当一个Task被初始化的时候会调用此prepare方法,对于bolt来说,第一个方法,就是prepare方法
         * OutputCollector,这个也是Bolt的这个tuple的发射器,一般都会在此方法中对发送Tuple的对象OutputCollector初始化
         */
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        /**
         * 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的
         * 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
         * 统计单词
         */
        public void execute(Tuple input) {
            // 接收上游数据
            String word = input.getStringByField("word");
            Long count = countMap.get(word);
            if(null == count){
                count = 0L;
            }
            count ++;
            countMap.put(word, count);
            LOGGER.info(" ★★★  单词计数[{}] 出现的次数:{}", word, count); 
            //发射数据
            this.collector.emit(new Values(word,count));
        }
        
        /**
         * 用于声明当前bolt的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
         * 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游 bolt 中 execute 接收数据 key 
         * 定义发射出去的tuple,每个field的名称
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }
    }
  • 构造拓扑(Topology),提交 storm 集群执行(**注: storm 提供了本地模拟集群,可以直接在代码编辑器中编译执行

在 WordCountTopology 中编写main 执行方法,代码如下:

public static void main(String[] args) {
        //去将spout和bolts组合起来,构建成一个拓扑
        TopologyBuilder builder = new TopologyBuilder();
        
        // 第一个参数的意思,就是给这个spout设置一个名字
        // 第二个参数的意思,就是创建一个spout的对象
        // 第三个参数的意思,就是设置spout的executor有几个
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        builder.setBolt("SplitSentence", new SplitSentenceBolt(), 5)
        //为bolt 设置 几个task
        .setNumTasks(10)
        //设置流分组策略
        .shuffleGrouping("RandomSentence");
        
        // fieldsGrouping 这个很重要,就是说,相同的单词,从SplitSentenceSpout发射出来时,一定会进入到下游的指定的同一个task中
        // 只有这样子,才能准确的统计出每个单词的数量
        // 比如你有个单词,hello,下游task1接收到3个hello,task2接收到2个hello
        // 通过fieldsGrouping 可以将 5个hello,全都进入一个task
        builder.setBolt("wordCount", new WordCountBolt(), 10)
        //为bolt 设置 几个task
        .setNumTasks(20)
        //设置流分组策略
        .fieldsGrouping("SplitSentence", new Fields("word"));
        
        // 运行配置项
        Config config = new Config();
        
        //说明是在命令行执行,打算提交到storm集群上去
        if(args != null && args.length > 0){
            /** 
             *  要想提高storm的并行度可以从三个方面来改造
             *  worker(进程)>executor(线程)>task(实例)
             *  增加work进程,增加executor线程,增加task实例
             *  对应 supervisor.slots.port 中配置个数
             *  这里可以动态设置使用个数
             *  最好一台机器上的一个topology只使用一个worker,主要原因时减少了worker之间的数据传输
             *  
             *  注意:如果worker使用完的话再提交topology就不会执行,因为没有可用的worker,只能处于等待状态,把之前运行的topology停止一个之后这个就会继续执行了
             */
            config.setNumWorkers(3);
            try {
                // 将Topolog提交集群
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }else{
            // 说明是在eclipse里面本地运行
            
            // 用本地模式运行1个拓扑时,用来限制生成的线程的数量
            config.setMaxTaskParallelism(20);
            
            // 将Topolog提交本地集群
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCountTopology", config, builder.createTopology());
            
            // 为了测试模拟等待
            Utils.sleep(60000);
            // 执行完毕,关闭cluster
            cluster.shutdown();
        }
    }
  • 运行main 方法,查看效果
运行效果

总结:本次demo,讲解了如何从代码层面理解storm 的集群架构、核心概念、并行度、流分组,结合上一篇文章,同时展现了Spout 到 Bolt,Bolt 到 Bolt 的通信

以上就是本章内容,如有不对的地方,请多多指教,谢谢!

为了方便有需要的人,本系列全部软件都在 https://pan.baidu.com/s/1qYsJZfY

下章预告:主要讲解 storm 集群搭建

代码地址附上:https://github.com/bill5/cache-project/tree/master/storm-wordcount

作者:逐暗者 (转载请注明出处)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容