原文链接:http://storm.apache.org/releases/1.0.2/Trident-tutorial.html
本人原创翻译,转载请注明出处
Trident是基于Storm做实时计算的高等级的抽象。它允许你无缝集成高吞吐量(每秒100万级别的消息)、无状态流处理、低延时的分布式查询。 如果你熟悉Pig或Cascading等高级别的批处理工具,就会很熟悉Trident的概念——Trident有joins, aggregations, grouping, functions, and filters。除此以外,Trident原生支持任何数据库或持久化存储之上的有状态的、增加的处理。Trident有一致的、仅一次的语义,所以很容易推出Trident topologies。(最后这句不是很理解,请参考原文)。
示例
这个例子会做两件事:
1.从输入句子流中计算单词的数量
2.实现查询:给出单词列表,返回单词数量之和
这个例子从下面的数据源中读取无限的句子流:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
下面是做单词计数的代码:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(6);
我们一行一行的看代码。第一行,TridentTopology对象被创建,暴露出构成Trident计算的接口。TridentTopology 有一个方法newStream,读取输入,创建一个新的流。在这里,输入源就是之前定义的FixedBatchSpout。输入源也可以是Kestrel或Kafka这样的消息队列。对每个输入源,Trident跟踪一个较小数量的状态(关于已消费消息的元数据),状态信息存放在zookeeper中。zookeeper的节点根据字符串"spout1"来确定状态元数据的存放位置。
Trident以小批量tuples的形式处理流,例如,输入的句子流可能会被拆分成batchs,像这样:
batchs的大小与输入流的大小挂钩,输入流吞吐量大,batchs就会大。
Trident提供了成熟的API来处理这些小型batchs。API和Pig或Cascading的很像:可以做group by's, joins, aggregations, 运行functions, 运行filters等等。当然,孤立的处理每个小型batch没什么意义,所以Trident提供了多个batchs聚合及持久化的函数——包括内存, Memcached, Cassandra,等各种存储。最后,Trident有一流的查询实时状态源的函数,状态可以被Trident 更新(就像这个例子),或者也可能是独立的状态源。
回到例子,spout发出"sentence"流,topology 定义的下一行启用了Split函数,把句子转成单词。 Split定义:
public class Split extends BaseFunction {
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for(String word: sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
topology剩下的部分计算单词数量并持久化存储。首先以"word"字段分组,然后每个分组以Count聚合器持久化聚合。persistentAggregate 函数知道如何存储聚合的结果。这个例子中,单词计数保存在内存中,但是也可以轻易的切换到Memcached, Cassandra等其他持久化。例如切换到Memcached:
.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))
MemcachedState.transactional()
"serverLocations"是Memcached集群的host/port列表。
Trident 很酷的一点是它有完全的容错和“仅一次”处理机制。这会使你的实时处理更易理解(reason about)。当失败和重传发生的时候,Trident 保存的状态使它不会为同一个数据多次更新数据库。
persistentAggregate把流传输给TridentState 对象,这个例子中,TridentState 代表了所有的单词计数,我们将使用它来进行分布式查询。
topology 的下一个部分实现了单词计数的低延时分布式查询。输入是以空格分隔的单词组,输出这些单词的数量总和。这些查询像普通RPC调用的一样执行,不过是以分布式形式。例如:
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"
像这种简单查询的延时大概10ms。
topology 的分布式查询实现如下:
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));