Storm介绍之安装部署及API

安装:
1.下载并解压缩Zookeeper

           官网地址:http://hadoop.apache.org/zookeeper/releases.html
           解压缩步骤此处省略....

2.修改zookeeper的配置文件

          把zookeeper对应的conf目录下的zoo-sample.cfg重命名为zoo.cfg,配置工作目录和端口号
         # The number of milliseconds of each 
              ticktickTime=2000
         # The number of ticks that the initial
         # synchronization phase can take
            initLimit=10
         # The number of ticks that can pass between# sending a request and getting an acknowledgement
          syncLimit=5
         # the directory where the snapshot is stored.
         # do not use /tmp for storage, /tmp here is just
         # example sakes.
            dataDir=/usr/local/dev/zookeeper-3.4.9/data
         # the port at which the clients will connect
            clientPort=2181
            server.id_num1=hostname1:2888:3888
            server.id_num2=hostname2:2888:3888
     注意:集群模式下分别在datadir目录下创建文件myid,其中的内容为id_num,例如:
            echo id_num1 > myid

3.下载并压缩storm

          官网地址:http://storm.apache.org/downloads.html
          解压缩步骤此处省略.....

4.修改storm配置文件

          修改conf/storm.yaml,conf/storm.yaml中的配置选项将覆盖conf/defaults.yaml
        1):storm.zookeeper.servers:storm集群中使用的zookeeper集群的地址
            storm.zookeeper.servers:
                -"host_ip"    (此处填写zookeeper集群的主机名或Ip,多个用逗号分隔)
            nimbus.host:"host_ip"  (此处填下nimbus进程的主机,即主节点ip)
            storm.local.dir:"/dest/to/path"  (此处填写storm存储目录)
            supervisor.slots.ports:
                -  "host_ip:port"    (此处填写从节点的端口号,可随意,当采用单机模式的时候,需要写不同的端口号,具体的个数根据从节点的个数来定)

5.启动zookeeper

注意:集群中的每台机器都要启动,且启动命令一致
  进入zookeeper的安装目录
  #bin/zkServer.sh start

6.启动storm nimbus

进入storm安装目录
 #bin/storm nimbus

7.启动storm supervisor

  注意:如果是单机模式,即启动一次即可,如果是集群模式,需要每台都要启动,命令一致    
    进入storm安装目录
     #bin/storm supervisor

8.启动storm ui

进入storm安装目录
#bin/storm ui

然后访问localhost:8080(或者主节点主机名:8080)就会看到storm的基本信息,到此,storm的安装部署已经成功

接下来进入storm的API,首先先要了解storm中的顶层接口IComponent
storm中Spout和Bolt都是其Component(部件的意思),所以storm定义了一个名叫IComponent的接口,全家普如下:

Paste_Image.png

注意:

绿色部分是我们常用的类,红色部分是与事务有关的

BaseComponent是Storm提供的"偷懒"的类,它及其子类,或多或少实现了接口的部分方法,这样我们在使用的时候,不用自己每次都写所有的方法,值得一提的是:像BaseXXX的类,它所实现的方法,都是空的,直接返回null,如果继承这样的类,需要自己重写方法。下面介绍Spout和Bolt组件相关的Api

Spout
首先看一下总体图:

Paste_Image.png

从图中很明显的看出Spout最顶层抽象的是ISpout接口,简单介绍一样接口中的方法:

Paste_Image.png

open():初始化动作,可以在该Spout初始化的时候做一些动作,传递上下文等

close():该Spout关闭之前执行,但不能得到保证一定可以执行.Spout是作为Task运行在Worker中的,在Cluster模式下,supervisor会直接kill -9 worker的进程,这样它就无法执行了.而在本地模式下,如果是发送停止命令,是可以保证close方法的执行的.

activate()和deactivate():一个Spout可以被暂时激活和关闭,这两个方法可以在对应的时刻调用执行

nextTuple():用来发射数据,Spout中最核心的部分,一些具体的需求可以在该方法中实现

ack():一个Tuple会有唯一一个id,当该Tuple被成功处理,会执行该方法

fail():与ack()方法同理,当Tuple处理失败会调用该方法

总结:

通常情况下(shell和事务除外),实现一个Spout,可以直接实现IRichSpout,如果不想写多余的代码,可以继承BaseRichSpout

Bolt
同样,首先看下总体图:

Paste_Image.png

可以看出为什么IBasicBolt没有继承IBolt?
我们先看下IBolt的方法:

Paste_Image.png

我们需要知道的是IBolt继承了java.io.Serializable,我们在nimbus上提交了Topology后,创建出来的Bolt会序列化发送到具体执行的Worker上,Worker在执行该Bolt时,会首先调用prepare方法传入当前执行的上下文

execute(Tuple):接收一个Tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail方法(表示失败)来反馈结果

cleanup():同Ispout的close方法,不能保证其一定被执行

好了,现在可以回答为什么IBasicBolt没有继承IBolt这个问题了,Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中反馈结果了,storm内部会自动反馈结果

总结:

通常情况下实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理反馈结果,可以实现IBasicBolt接口或继承BaseBasicBolt,它实际上是自己做掉了prepare方法和collector.emit.ack(inputTuple)方法.

OK,介绍完了简单的方法,下面写一个简单的Demo,加深一下对Spout和Bolt的理解

简单需求:对名称加后缀并转换成大写

RandomWordSpout.java

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomWordSpout extends BaseRichSpout{

          private SpoutOutputCollector  collector;
          
          //模拟一些数据
          String[] str = {"hello","word","you","how","are"};
          

        //初始化方法,在spout组件实例化时调用一次
          @Override
          public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                  this.collector = collector;
          }

          @Override
          public void nextTuple{
                 //随机挑选出一个名称
                  Random random = new Random();
                  int index = random.nextInt(str.length);
                
                  //获取名称
                  String name = str[index];

                  //将名称进行封装成tuple,发送消息给下一个组件
                  collector.emit(new Vaules(name));
            }

        //声明本spout组件发送出去的tuple中的数据的字段名
           @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
                  declarer.declare(new Fields("orignname"));
              }
}

UpperBolt.java

      import backtype.storm.topology.BasicOutputCollector;
      import backtype.storm.topology.OutputFieldsDeclarer;
      import backtype.storm.topology.base.BaseBasicBolt;
      import backtype.storm.tuple.Fields;
      import backtype.storm.tuple.Tuple;
      import backtype.storm.tuple.Values;
    
      public class UpperBolt extends BaseBasicBolt{

              //业务处理逻辑
               @Override
              public void execute(Tuple tuple, BasicOutputCollector collector) {
                      //先获取到上一个组件传递过来的数据,数据在tuple里面
                      String godName = tuple.getString(0);
    
                       //将名称转换成大写
                      String godName_upper = godName.toUpperCase();
    
                      //将转换完成的商品名发送出去
                      collector.emit(new Values(godName_upper));
            }



             //声明该bolt组件要发出去的tuple的字段
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                    declarer.declare(new Fields("uppername"));
              }
    }

SuffixBolt.java

    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class SuffixBolt extends BaseBasicBolt{
          FileWriter fileWriter = null;
        
          //在bolt组件运行过程中只会被调用一次
            @Override
          public void prepare(Map stormConf, TopologyContext context) {  
                  try {
                      fileWriter = new   FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
                      } catch (IOException e) {
                              throw new RuntimeException(e);
                      }
}

        //该bolt组件的核心处理逻辑
        //每收到一个tuple消息,就会被调用一次
            @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            //先拿到上一个组件发送过来的名称
              String upper_name = tuple.getString(0);
              String suffix_name = upper_name + "_itisok";

          //为上一个组件发送过来的商品名称添加后缀
              try {
                  fileWriter.write(suffix_name);
                  fileWriter.write("\n");
                  fileWriter.flush();
              } catch (IOException e) {
                  throw new RuntimeException(e);
              }
        }
}

TopoMain.java

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;

  /**
   * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
 * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
   */
    public class TopoMain {
        public static void main(String[] args) throws Exception {   
              TopologyBuilder builder = new TopologyBuilder();

            //将我们的spout组件设置到topology中去 
            //parallelism_hint :4  表示用4个excutor来执行这个组件
            //setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task

              builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
    
          //将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
          //.shuffleGrouping("randomspout")包含两层含义:
          //1、upperbolt组件接收的tuple消息一定来自于randomspout组件
          //2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping

              builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
    
          //将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息

              builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
    
          //用builder来创建一个topology
               StormTopology demotop = builder.createTopology();

          //配置一些topology在集群中运行时的参数
               Config conf = new Config();
         //这里设置的是整个demotop所占用的槽位数,也就是worker的数量
               conf.setNumWorkers(4);
              conf.setDebug(true);
              conf.setNumAckers(0);
    
        //将这个topology提交给storm集群运行
             StormSubmitter.submitTopology("demotopo", conf, demotop);  
}

}

最后将工程打包,在集群上运行
#storm jar jar_name.jar class_name args0 ....

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容

  • Date: Nov 17-24, 2017 1. 目的 积累Storm为主的流式大数据处理平台对实时数据处理的相关...
    一只很努力爬树的猫阅读 2,165评论 0 4
  • 目录 场景假设 调优步骤和方法 Storm 的部分特性 Storm 并行度 Storm 消息机制 Storm UI...
    mtide阅读 17,099评论 30 60
  • 一、Storm是什么 Storm是一个免费并开源的分布式实时计算系统。利用Storm可以很容易做到可靠地处理无限的...
    Graceleeman阅读 3,021评论 0 6
  • 原文链接Storm Tutorial 本人原创翻译,转载请注明出处 这个教程内容包含如何创建topologies及...
    quiterr阅读 1,615评论 0 6
  • 推酷诚意满满的设计周刊《设计匠艺》, 下面是内容列表,干货多多,也可以移步到官网进一步阅读。 产品之道 译文|如何...
    推酷阅读 217评论 0 1