Storm Trident(一)官方Tutorial

原文链接:http://storm.apache.org/releases/1.0.2/Trident-tutorial.html

本人原创翻译,转载请注明出处

Trident是基于Storm做实时计算的高等级的抽象。它允许你无缝集成高吞吐量(每秒100万级别的消息)、无状态流处理、低延时的分布式查询。 如果你熟悉Pig或Cascading等高级别的批处理工具,就会很熟悉Trident的概念——Trident有joins, aggregations, grouping, functions, and filters。除此以外,Trident原生支持任何数据库或持久化存储之上的有状态的、增加的处理。Trident有一致的、仅一次的语义,所以很容易推出Trident topologies。(最后这句不是很理解,请参考原文)。

示例

这个例子会做两件事:
1.从输入句子流中计算单词的数量
2.实现查询:给出单词列表,返回单词数量之和

这个例子从下面的数据源中读取无限的句子流:

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);

下面是做单词计数的代码:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

我们一行一行的看代码。第一行,TridentTopology对象被创建,暴露出构成Trident计算的接口。TridentTopology 有一个方法newStream,读取输入,创建一个新的流。在这里,输入源就是之前定义的FixedBatchSpout。输入源也可以是Kestrel或Kafka这样的消息队列。对每个输入源,Trident跟踪一个较小数量的状态(关于已消费消息的元数据),状态信息存放在zookeeper中。zookeeper的节点根据字符串"spout1"来确定状态元数据的存放位置。

Trident以小批量tuples的形式处理流,例如,输入的句子流可能会被拆分成batchs,像这样:

batched-stream.png

batchs的大小与输入流的大小挂钩,输入流吞吐量大,batchs就会大。

Trident提供了成熟的API来处理这些小型batchs。API和Pig或Cascading的很像:可以做group by's, joins, aggregations, 运行functions, 运行filters等等。当然,孤立的处理每个小型batch没什么意义,所以Trident提供了多个batchs聚合及持久化的函数——包括内存, Memcached, Cassandra,等各种存储。最后,Trident有一流的查询实时状态源的函数,状态可以被Trident 更新(就像这个例子),或者也可能是独立的状态源。

回到例子,spout发出"sentence"流,topology 定义的下一行启用了Split函数,把句子转成单词。 Split定义:

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

topology剩下的部分计算单词数量并持久化存储。首先以"word"字段分组,然后每个分组以Count聚合器持久化聚合。persistentAggregate 函数知道如何存储聚合的结果。这个例子中,单词计数保存在内存中,但是也可以轻易的切换到Memcached, Cassandra等其他持久化。例如切换到Memcached:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
MemcachedState.transactional()

"serverLocations"是Memcached集群的host/port列表。

Trident 很酷的一点是它有完全的容错和“仅一次”处理机制。这会使你的实时处理更易理解(reason about)。当失败和重传发生的时候,Trident 保存的状态使它不会为同一个数据多次更新数据库。

persistentAggregate把流传输给TridentState 对象,这个例子中,TridentState 代表了所有的单词计数,我们将使用它来进行分布式查询。

topology 的下一个部分实现了单词计数的低延时分布式查询。输入是以空格分隔的单词组,输出这些单词的数量总和。这些查询像普通RPC调用的一样执行,不过是以分布式形式。例如:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"

像这种简单查询的延时大概10ms。

topology 的分布式查询实现如下:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容