Storm(二)官方Tutorial

原文链接Storm Tutorial

本人原创翻译,转载请注明出处

这个教程内容包含如何创建topologies及部署到Storm集群上。Java是主要使用语言,但有的例子使用了Python,主要是为了解释Storm的多语言能力。

前言

本教程使用的例子来自storm-starter project。建议clone该project并照着练习。阅读Setting up a development environmentCreating a new Storm project以使你的计算机具备开始条件。
这两篇文章本人已翻译,请阅Storm(一)打造开发环境&创建一个Storm项目

Storm集群的组件

表面上看Storm集群和Hadoop集群有些像。在Hadoop上运行的是"MapReduce jobs",而在Storm上运行的是topologies。"Jobs" 和 "topologies"大不相同,有一个关键不同就是MapReduce job最终会停止,而topology永不停止(除非被用户kill掉)。

Storm集群有两类节点:master节点和worker节点。master节点上运行着一个守护程序 "Nimbus"(和Hadoop的 "JobTracker"有些像)。Nimbus负责在集群中散布code,给各个机器分配任务以及监控失败的情况。每个worker节点上也运行着一个守护程序"Supervisor"。Supervisor负责接收Nimbus分配的任务,按需启动和停止worker进程。每个worker进程执行了一个topology的子集。一个运行中的topology包含了多个worker,这些worker分布在多个机器上。

所有Nimbus和Supervisors的协调都通过Zookeeper集群进行。此外,Nimbus和Supervisors是立即失败和无状态的(fail-fast and stateless)。所有的状态都保存在Zookeeper或本地硬盘上。这意味着即使通过kill -9 杀死Nimbus和Supervisors,他们也会自动恢复,这个设计给了Storm集群难以置信的稳定性。

Topologies

要利用Storm来进行实时计算,就要创建Topologies。一个topology是一个计算的图,topology 中的每个节点包含处理逻辑,topology 中的每个边(link)指明了数据如何在节点中传递。运行一个topology很简单,首先打包code和依赖到jar文件,然后执行以下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
这样就启动了org.apache.storm.MyTopology类,参数是arg1、arg2,这个类的主函数定义了 topology并提交到Nimbus。由于topology是Thrift structs,而且Nimbus是Thrift service,因此可以用任意的编程语言创建和提交topologies。这里举的是基于JVM语言的最简单的例子,更多信息请阅Running topologies on a production cluster

Streams

Storm 中最核心的抽象概念是"stream"。stream是元组(tuples)的无限序列,Storm提供了一种分布式、可靠的方式来将一个stream转化成一个新的stream。举个例子,你可以将一个tweets stream转化成一个trending topics stream。

Storm提供了"spouts" 和 "bolts"来完成stream的转化。通过实现Spouts和bolts的接口,你可以运行应用相关的逻辑。
spout是stream的来源,举个例子,spout可以从Kestrel队列中读取tuples并生成stream,或者spout也可以通过Twitter API生成一个tweets stream。

bolt可以消费任意数量的输入stream,做一些处理,很可能抛出新的stream。类似将tweets stream转化成trending topics stream这样的复杂转化,常常需要多个步骤,对应着多个bolt。Bolts可以做的事情很多,比如run functions, filter tuples, do streaming aggregations, do streaming joins, talk to databases等等。

spouts和bolts的网络被打包成了"topology",这是你提交到Storm集群执行的最高级别的抽象。 topology是一个stream转化的图,图的节点是spout或bolt,图的边指明了哪个bolt在订阅哪个stream。当一个spout 或 bolt发出一个tuple到一个stream,那么订阅了这个stream的所有bolt都会收到这个tuple。


Storm topology中的每个节点都是并行运行,在你的topology中,你可以指定各个节点的并行程度,Storm会按照你指定的数目在集群中启动相应数量的线程。

topology会永久执行下去,除非你停止它。Storm会自动重新分配失败的任务,此外,Storm保证数据不会丢失,即便机器宕机并且messages are dropped。

Data model

Storm的数据模型是tuple。tuple是一个命名list,字段可以是任意类型的对象。Storm支持所有primitive types, strings, and byte arrays作为tuple的字段值。如果要使用其他类型的对象,只需要实现serializer。
topology的每个节点都需要定义输出的tuple字段。如下的例子中,bolt定义了两个带有 "double"和"triple"字段的tuple。

public class DoubleAndTripleBolt extends BaseRichBolt {
    private OutputCollectorBase _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        int val = input.getInteger(0);        
        _collector.emit(input, new Values(val*2, val*3));
        _collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("double", "triple"));
    }    
}

一个简单的topology

看一下storm-starter中ExclamationTopology的定义:

TopologyBuilder builder = new TopologyBuilder();        
builder.setSpout("words", new TestWordSpout(), 10);        
builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("words");
builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");

这个Topology包含一个spout和两个bolt,spout发出words,bolt在输入的string后面加上"!!!"。节点被组织成一条直线:spout发出到第一个bolt,然后到第二个bolt。如果spout发出了tuples ["bob"]和["john"],那么第二个bolt会发出 ["bob!!!!!!"]和["john!!!!!!"]。

setSpout 和 setBolt方法定义了节点,第一个参数是用户定义的ID,第二个参数是处理逻辑对象,第三个参数是并发线程数。

spout的处理逻辑对象实现了IRichSpout接口,bolt的处理逻辑对象实现了IRichBolt接口。最后一个参数是可选的,如果不指定,Storm只分配一个线程。

setBolt返回一个InputDeclarer对象,这个对象定义了bolt的输入。这里组件exclaim1声明了它要读取所有组件words发出的tuples。组件exclaim2声明了它要读取所有组件exclaim1发出的tuples。"shuffle grouping"会被随机的分配到bolt。组件之间有多种分组数据的方式,如果想要组件exclaim2同时读取组件words和exclaim1的tuples,可以像这样定义:

builder.setBolt("exclaim2", new ExclamationBolt(), 5)
            .shuffleGrouping("words")
            .shuffleGrouping("exclaim1");

接下来看看这个topology的spout和bolt的实现。spout负责发出数据到topology。TestWordSpout每隔100ms从["nathan", "mike", "jackson", "golda", "bertels"]中发出随机word作为一个tuple,TestWordSpout的nextTuple()实现如下:

public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

ExclamationBolt的实现如下:

public static class ExclamationBolt implements IRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void cleanup() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

prepare方法给bolt提供了一个OutputCollector,用来从这个bolt发出tuple。Tuples可以在任何时候发出——包括prepare, execute, 或cleanup方法,甚至在另一个线程中异步发出。这里prepare方法只是保存OutputCollector为实例变量,后面execute方法会用到。

execute方法接收一个tuple,ExclamationBolt抓取tuple的第一个字段并在后面加上"!!!"。如果bolt订阅了多个输入源,可以通过Tuple#getSourceComponent方法查询tuple来源。
输入tuple被作为了emit的第一个参数,最后一行ack了输入tuple。这些是Storm可靠性API的一部分,用以保证没有数据丢失,本教程后续会进一步介绍。

cleanup方法在bolt停止的时候调用,在这里应该关闭所有打开的资源。Storm集群不保证一定会调用这个方法:例如,if the machine the task is running on blows up, there's no way to invoke the method. cleanup方法适用于local模式,你可以运行和停止许多topologies,不必担心资源泄露。

declareOutputFields方法声明了ExclamationBolt发出带一个名为"word"字段的1-tuples。

getComponentConfiguration方法允许你配置组件运行的参数。这是一个更高级的主题Configuration

在一个bolt的实现中,像cleanup 和 getComponentConfiguration这样的方法常常不需要。你可以通过继承提供了默认实现的基类来更简洁的实现bolt。例如:

public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }    
}

在local模式下运行ExclamationTopology

在local模式中,Storm完全在一个进程内运行,worker通过线程模拟,主要用于测试和开发场景。当你运行storm-starte中的topologies,他们将在local模式下运行,你能够看到每个组件正在发出的消息。
更多local模式信息请阅Local mode
更多分布式模式信息请阅running topologies in local mode on Local mode

如下是local模式下运行ExclamationTopology的代码:

Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(2);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", conf, builder.createTopology());
Utils.sleep(10000);
cluster.killTopology("test");
cluster.shutdown();

首先,通过创建LocalCluster对象定义了一个进程内的集群。提交topologies到这个虚拟集群和提交到分布式集群是完全一样的操作。submitTopology方法的第一个参数是topology的名字,第二个参数是topology的配置,第三个参数是topology对象。

这里topology的配置很常见:
TOPOLOGY_WORKERS (由setNumWorkers设置) 指定了你想分配多少进程来执行这个topology。 topology中的每个组件将以线程的形式执行,线程的数量由setBolt 和 setSpout配置。
TOPOLOGY_DEBUG (由setDebug设置) 当设置为true时, Storm记录组件发出的每个消息。

更多配置信息请阅the Javadoc for Config.

Stream groupings

stream grouping用于描述组件之间如何发送tuple。在集群中,spouts 和 bolts总是并行执行任务,在执行任务层面上,一个topology看起来如下图所示:


当Bolt A的一个任务发出tuple到Bolt B时,应该发到Bolt B的哪个任务呢?
stream groupings就是用来解决这个问题。在深入了解不同种类的stream groupings之前,我们先看看storm-starter中的另一个topology。WordCountTopology从spout中读取句子,WordCountBolt统计单词出现的次数。

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("sentences", new RandomSentenceSpout(), 5);        
builder.setBolt("split", new SplitSentence(), 8)
        .shuffleGrouping("sentences");
builder.setBolt("count", new WordCount(), 12)
        .fieldsGrouping("split", new Fields("word"));

SplitSentence发出句子中的单词作为tuple,WordCount以map的形式存储单词出现的次数,每次WordCount收到单词,就更新map并发出新的单词数目。

最简单的grouping方式是"shuffle grouping",也就是随机发送tuple给任务。"fields grouping"是一个更有趣的grouping方式,这里用在了SplitSentence bolt 和 WordCount bolt之间。对WordCount bolt来说,相同的单词应该发送到相同的任务,否则多个任务都会收到同样的单词,由于每个任务的信息都不完全,他们可能会发出错误的单词数目。fields grouping用字段的子集来分组,字段值相同的tuple被发送到相同的任务。

Fields groupings是实现streaming joins 和 streaming aggregations,它的底层实现利用了mod hashing。还有一些其他的分组方式,请阅Concepts

使用其他编程语言定义Bolts

Bolts可以使用任意语言定义,用JVM-based语言以外定义的Bolts以子进程的方式运行,Storm以stdin/stdout之上的JSON格式消息与子进程通信。通信协议用到了一个100行左右的适配器库,支持Ruby, Python, Fancy。

WordCountTopology的SplitSentence定义如下:

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

SplitSentence重写了ShellBolt,声明它使用python,参数是splitsentence.py。splitsentence.py实现如下:

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        words = tup.values[0].split(" ")
        for word in words:
          storm.emit([word])

SplitSentenceBolt().run()

更多多语言信息,请阅Using non-JVM languages with Storm

确保消息被处理

这部分属于Storm's reliability API的内容:Storm如何保证spout发出的消息都能被处理,请阅Guaranteeing Message Processing

Transactional topologies

Storm保证每个消息都至少被处理一次。一个常见的问题是:Storm会不会overcount?Storm提供了一种机制,确保消息只被传递一次。transactional topologies

Distributed RPC

除了本教程已经展示的功能,Storm还可以做许多事。其中最有趣的应用之一是Distributed RPC

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容

  • Date: Nov 17-24, 2017 1. 目的 积累Storm为主的流式大数据处理平台对实时数据处理的相关...
    一只很努力爬树的猫阅读 2,145评论 0 4
  • 参考文章: Apache Storm 官方文档中文版 storm Tutorial 的解读 + 个人理解 官方文档...
    louisliaoxh阅读 1,123评论 0 1
  • Storm入门系列之一:storm核心概念及特性 本文的将介绍一些 storm 入门的基础知识,包括 storm ...
    zhaif阅读 3,019评论 0 17
  • 什么是实时流计算? 主要的处理模式可以分为:流处理,批处理 流处理是直接处理,有时也分为在线,离线,近线(st...
    Bloo_m阅读 5,038评论 1 1
  • 是什么时候开始,我喜欢上了写点文字,表达自己的心情。这个日子大概要追述到青春期的时候,那时候的我们正走...
    周周writing阅读 509评论 1 1