storm自定义实现wordcount

storm中的任务

  1. storm中的任务的结构是Topology(拓扑图),这个拓扑图是一个有向无环图(DAG),DAG能够清楚的表达链式的任务,每一个节点都是一个任务,边的方向代表着数据流的方向。如下图


    Paste_Image.png
  2. storm任务中数据流的数据结构是一个个tuple,tuple元组是任意数据结构类型的键值对组合。例如:(k1:v1, k2:v2, k3:v3, ····)
  3. Spout是数据采集器,从数据源采集数据,转成tuple发射到后面的bolt处理
  4. Bolt是数据处理器,可执行数据过滤,分析等操作。

开发流程

  1. 设计Topology图


    Paste_Image.png
  2. 按照Topology图,创建maven项目后,依次写各个任务节点。首先写SentenceSpout节点。
package strom.strom;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class SentenceSpout extends BaseRichSpout {

    // tuple发射器
    private SpoutOutputCollector collector;

    private static final String[] SENTENCES = { "hadoop yarn mapreduce spark", "flume hadoop hive spark",
            "oozie yarn spark storm", "storm yarn mapreduce error", "error flume storm spark" };

    /*
     * 用于指定只针对本组件的一些特殊配置
     */
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /*
     * spout组件的初始化方法 创建这个sentenceSpout组件实例时调用一次
     */
    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        // 用实例变量接收发射器
        this.collector = arg2;
    }

    /*
     * 声明向后面的组件发送tuple的key是什么
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("sentence"));
    }

    /*
     * 1)指定tuple的value值,封装tuple后,并将其发射给后面的组件, 2) 会迭代式的循环调用这个方法
     */
    @Override
    public void nextTuple() {
        // 从数组中随意获取一个值
        String sentence = SENTENCES[new Random().nextInt(SENTENCES.length)];
        // 指定value值并封装为tuple后,把tuple发射给后面的组件
        this.collector.emit(new Values(sentence));
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
  1. 写splitbolt组件
package strom.strom;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;


public class SplitBolt implements IRichBolt {

    // bolt组件中的发射器
    private OutputCollector collector;

    @Override
    public void cleanup() {

    }

    /*
     * 设置key名称
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word"));
    }

    /*
     * 每次接受到前面组件发送的tuple调用一次 ,封装好tuple后发射
     */
    @Override
    public void execute(Tuple input) {
        // 获取key value对后,取出value值
        String values = input.getStringByField("sentence");
        if (values != null && !"".equals(values)) {
            // 按空格分割value
            String[] valuelist = values.split(" ");
            for (String value : valuelist) {
                // 向后面的组件发射封装好的tuple
                this.collector.emit(new Values(value));
            }
        }
    }

    /*
     * bolt组件初始化方法,只会调用一次
     */
    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

}

4.CountBolt组件实现计数逻辑

package strom.strom;
//

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class CountBolt extends BaseRichBolt {

    // 发射器
    private OutputCollector collector;
    // 为了计数
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        this.collector = arg2;
        this.counts = new HashMap<String, Integer>();
    }

    /*
     * 声明key名称,可以同时声明多个
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("word", "count"));
    }

    /*
     * 统计单词
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");

        int count = 1;
        // 如果这个单词已经存在,则取出count再加一
        if (counts.containsKey(word)) {
            count = counts.get(word) + 1;
        }
        counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }
}

5 . PrintBolt组件

package strom.strom;
//

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {

    }

    /*
     * 打印到控制台
     */
    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        int count = input.getIntegerByField("count");
        System.out.println(word + "---->" + count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {

    }

}

6 . WordCountTopology类用来连接这些组件

package strom.strom;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {

    private static final String SPOUT_ID = "sentenceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String PRINT_BOLT = "printBolt";

    public static void main(String[] args) {
        // 构造Topology
        TopologyBuilder builder = new TopologyBuilder();
        // 指定spout
        builder.setSpout(SPOUT_ID, new SentenceSpout());
        // 指定bolt,并指定当有有多个bolt时,数据流发射的分组策略
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(SPOUT_ID);
        // 因为要保证正确的单词计数,同一个单词一定要划分到同一个CountBolt上,所以按照字段值分组
        builder.setBolt(COUNT_BOLT, new CountBolt()).fieldsGrouping(SPLIT_BOLT, new Fields("word"));
        // 全局分组,所有tuple发射到一个printbolt,一般是id最小的那一个
        builder.setBolt(PRINT_BOLT, new PrintBolt()).globalGrouping(COUNT_BOLT);

        Config conf = new Config();

        if (args == null || args.length == 0) {
            // 本地执行
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("wordcount", conf, builder.createTopology());
        } else {
            // 提交到集群上执行
            // 指定使用多少个进程来执行该Topology
            conf.setNumWorkers(1);
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        }
    }
}
  1. 本地执行测试


    Paste_Image.png
  2. 打成jar包后上传到storm集群测试


    Paste_Image.png

    下面的jar包包含着依赖的包,上面的jar包中没有包括,所以我们选择使用下面这个jar包。
    上传到集群上然后执行

$ bin/storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar strom.strom.WordCountTopology wordcount

在UI中查看运行情况


Paste_Image.png

查看运行日志


Paste_Image.png

Paste_Image.png

Paste_Image.png

查看拓扑图


Paste_Image.png

Paste_Image.png

Paste_Image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容

  • Date: Nov 17-24, 2017 1. 目的 积累Storm为主的流式大数据处理平台对实时数据处理的相关...
    一只很努力爬树的猫阅读 2,176评论 0 4
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,114评论 30 60
  • 本文主要介绍storm中的基本概念,从基础上了解strom的体系结构,便于后续编程过程中作为基础指导。主要的概念包...
    看山远兮阅读 1,527评论 0 9
  • 流式计算中,各个中间件产品对计算过程中的角色的抽象都不尽相同,实现方式也是千差万别。本文针对storm中间件在进行...
    一品悟技术_张驰阅读 2,490评论 0 1
  • 他不经意走过她的身旁 微笑的眼底跳动着的却是忧伤 他看起来似乎并不迷惘 他说,只是当回忆想起时 难免有些彷徨 曾经...
    沙华_ad21阅读 139评论 0 3