storm_Trident

<h2>简介:</h2>
Trident 是 Storm 的一种高度抽象的实时计算模型,它可以将高吞吐量(每秒百万级)数据输入、有状态的流式处理与低延时的分布式查询无缝结合起来。如果你了解 Pig 或者 Cascading 这样的高级批处理工具,你就会发现他们和 Trident 的概念非常相似。Trident 同样有联结(join)、聚合(aggregation)、分组(grouping)、函数(function)以及过滤器(filter)这些功能。Trident 为数据库或者其他持久化存储上层的状态化、增量式处理提供了基础原语。由于 Trident 有着一致的、恰好一次的语义,因此推断出 Trident 拓扑的状态也是一件很容易的事。
<b>Trident 流程图:</b>

Paste_Image.png

就按照流程图来讲吧:
<h2>Trident Spouts</h2>
查看官方demo中代码:
<pre>
TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid", new MyRichSpout());
</pre>
查看newStream()方法源代码:
<pre>
//上节中BaseRichSpout类就是实现了IRichSpout
public Stream newStream(String txId, IRichSpout spout) {
return newStream(txId, new RichSpoutBatchExecutor(spout));
}
//非事务型 spout,每次会输出一个 batch 的 tuple.接下来的demo会用到
public Stream newStream(String txId, IBatchSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}
//这是最常用的 API,支持事务型和模糊事务型的语义实现。不过一般会根据需要使用它的某个已有的实现,而不是直接实现该接口。
public Stream newStream(String txId, ITridentSpout spout) {
Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
return addNode(n);
}
//可以从分布式数据源(比如一个集群或者 Kafka 服务器)读取数据的事务型 spout。
public Stream newStream(String txId, IPartitionedTridentSpout spout) {
return newStream(txId, new PartitionedTridentSpoutExecutor(spout));
}
//可以从分布式数据源读取数据的模糊事务型 spout。
public Stream newStream(String txId, IOpaquePartitionedTridentSpout spout) {
return newStream(txId, new OpaquePartitionedTridentSpoutExecutor(spout));
}
</pre>
<h2>Trident Bolts</h2>
主要有 5 类操作:
<ol>
<li>针对每个小分区(partition)的本地操作,这类操作不会产生网络数据传输</li>
<li>针对一个数据流的重新分区操作,这类操作不会改变数据流中的内容,但是会产生一定的网络传输</li>
<li>通过网络数据传输进行的聚合操作</li>
<li>针对数据流的分组操作</li>
<li>融合与联结操作</li>
</ul>
<h4>本地分区操作</h4>
<b>函数:</b>
函数负责接收一个输入域的集合并选择输出或者不输出 tuple。输出 tuple 的域会被添加到原始数据流的输入域中。如果一个函数不输出 tuple,那么原始的输入 tuple 就会被直接过滤掉。否则,每个输出 tuple 都会复制一份输入 tuple 。假设你有下面这样的函数:
<pre>
public class Split extends BaseFunction {

@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
    for(String word: tuple.getString(0).split(" ")) {
        if(word.length() > 0) {
            collector.emit(new Values(word));
        }
    }
}

}

Paste_Image.png

</pre>

上图是源码中提供的一个Split函数用于按空格进行分割,分割完成以后继续延续原来的输出。
<b>过滤器</b>
过滤器负责判断输入的 tuple 是否需要保留,直接改变stream 的内容:
<pre>

public class FilterNull extends BaseFilter {
@Override
public boolean isKeep(TridentTuple tuple) {
for(Object o: tuple) {
if(o==null) return false;
}
return true;
}
}

Paste_Image.png

</pre>
上图就是一个判断tuple是否为空的filter,如果为false的则不继续留在stream流中
<b>partitionAggregate</b>
会在一批 tuple 的每个分区上执行一个指定的功能操作。与上面的函数不同,由 partitionAggregate
发送出的 tuple 会将输入 tuple 的域替换。以下面这段代码为例:
官方给出的代码:
<pre>mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))</pre>
假设stream 中tuple的内容如下:
<pre>
Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]
</pre>
执行上面的分区聚合的结果为:
<pre>
Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]
</pre>
解释一下上面的函数功能为:分区聚合内容对分区中的内容求和即sum()然后输出key为sum的bolt流。
查看sum函数源代码:
<pre>
public class Sum implements CombinerAggregator<Number> {
@Override
public Number init(TridentTuple tuple) {
return (Number) tuple.getValue(0);
}
@Override
public Number combine(Number val1, Number val2) {
return Numbers.add(val1, val2);
}
@Override
public Number zero() {
return 0;
}
}

Paste_Image.png

CombinerAggregator类只提供了sum和count求和函数
</pre>
Storm 有三个用于定义聚合器的接口:CombinerAggregator、ReducerAggregator
、 Aggregator。ReducerAggregator
<b>融合(Merge)与联结(join)</b>
Trident API 的最后一部分是联结不同的数据流的操作。联结数据流最简单的方式就是将所有的数据流融合到一个流中。你可以使用 TridentTopology 的 merge 方法实现该操作,比如这样:
<pre>
topology.merge(stream1, stream2, stream3);
</pre>
Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。

联结数据流的另外一种方法是使用 join。像 SQL 那样的标准 join 操作只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每个从 spout 中输出的小 batch。

下面是两个流的 join 操作的示例,其中一个流含有 [“key”, “val1″, “val2″] 域,另外一个流含有 [“x”, “val1″] 域:
<pre>
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
</pre>
上面的例子会使用 “key” 和 “x” 作为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,因为输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:

join 域的列表。在这个例子里,输出的 “key” 域与 stream1 的 “key” 域以及 stream2 的 “x” 域对应。
来自所有流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,“ a” 和 “b” 域与 stream1 的 “val1” 和 “val2” 域对应;而 “c” 域则与 stream2 的 “val1” 域相对应。
在对不同的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每个 spout 发送出的 tuple。

最后的结果查询你可以使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。然后就可以使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。
我想还是上个demo吧,要不然都要睡过去啦:
<pre>
public class Print extends BaseFilter {
//分区索引号从0开始标示
private int partitionIndex;
//总的分区数
private int numPartitions;
@Override
public void prepare(Map conf, TridentOperationContext context) {
//获取当前分区以及总的分区数
this.partitionIndex = context.getPartitionIndex();
this.numPartitions = context.numPartitions();
}
//过滤条件,其实这边就是用来打印输出,对最后的tuple元数据没有任何改变
@Override
public boolean isKeep(TridentTuple tuple) { System.err.println(String.format("Partition idx: %s out of %s partitions got %s/%s", partitionIndex, numPartitions, tuple.get(0).toString(),tuple.get(1).toString()));
return true;
}
//构造StormTopology
public static StormTopology buildTopology(LocalDRPC drpc) {
//构造一个固定的batch数的spout,这个类代码上面有大概分析过
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 5,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"), new Values("how many apples can you eat"),
new Values("to be or not to be the person"));
//循环发送数据
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
//TridentState对象最终代表了所有的单词的数量。我们会使用这个TridentState对象来实现在计算过程中的进行分布式查询。
TridentState wordCounts = topology.newStream("testSpout", spout)
//对每个tuple内容用空格来分隔,然后通过相同的字符串来分组
.each(new Fields("sentence"), new Split(), new Fields("word")).groupBy(new Fields("word"))
//persistentAggregate函数使用三个并行度(三个线程)对源源不断发送过来数据流做一个总的聚合,对出现的次数累加,然后加结果缓存在当前节点的内存中
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")).parallelismHint(3);
topology.newDRPCStream("print", drpc)
.stateQuery(wordCounts, new TupleCollectionGet(), new Fields("word", "count"))
.each(new Fields("word", "count"), new Print());
return topology.build();
}
public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
Config conf = new Config();
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
for (int i = 0; i < 10; i++) {
drpc.execute("print", "");
Thread.sleep(1000);
}
cluster.deactivate("wordCounter"); cluster.killTopology("wordCounter");
}
}
</pre>
在main方法控制台输出为:
<pre>
Partition idx: 1 out of 3 partitions got the/20
Partition idx: 1 out of 3 partitions got or/4
Partition idx: 1 out of 3 partitions got score/4
Partition idx: 1 out of 3 partitions got moon/4
Partition idx: 1 out of 3 partitions got four/4
Partition idx: 1 out of 3 partitions got over/4
Partition idx: 1 out of 3 partitions got bought/4
Partition idx: 1 out of 3 partitions got can/4
Partition idx: 0 out of 3 partitions got went/8
Partition idx: 0 out of 3 partitions got candy/8
Partition idx: 0 out of 3 partitions got seven/8
Partition idx: 0 out of 3 partitions got jumped/8
Partition idx: 0 out of 3 partitions got ago/8
Partition idx: 0 out of 3 partitions got store/8
Partition idx: 0 out of 3 partitions got cow/8
Partition idx: 0 out of 3 partitions got many/8
Partition idx: 0 out of 3 partitions got years/8
Partition idx: 0 out of 3 partitions got eat/8
Partition idx: 0 out of 3 partitions got person/8
Partition idx: 0 out of 3 partitions got to/24
Partition idx: 0 out of 3 partitions got apples/8
Partition idx: 2 out of 3 partitions got be/24
Partition idx: 2 out of 3 partitions got not/12
Partition idx: 2 out of 3 partitions got some/12
Partition idx: 2 out of 3 partitions got and/24
Partition idx: 2 out of 3 partitions got man/12
Partition idx: 2 out of 3 partitions got how/12
Partition idx: 2 out of 3 partitions got you/12
Partition idx: 2 out of 3 partitions got be/32
Partition idx: 2 out of 3 partitions got not/16
Partition idx: 2 out of 3 partitions got some/16
Partition idx: 2 out of 3 partitions got and/32
Partition idx: 2 out of 3 partitions got man/16
Partition idx: 2 out of 3 partitions got how/16
Partition idx: 2 out of 3 partitions got you/16
Partition idx: 2 out of 3 partitions got be/38
Partition idx: 2 out of 3 partitions got not/19
Partition idx: 2 out of 3 partitions got some/19
Partition idx: 2 out of 3 partitions got and/38
Partition idx: 2 out of 3 partitions got man/19
Partition idx: 2 out of 3 partitions got how/19
Partition idx: 2 out of 3 partitions got you/19
Partition idx: 0 out of 3 partitions got went/23
Partition idx: 0 out of 3 partitions got candy/23
Partition idx: 0 out of 3 partitions got seven/23
Partition idx: 0 out of 3 partitions got jumped/23
Partition idx: 0 out of 3 partitions got ago/23
Partition idx: 0 out of 3 partitions got store/23
Partition idx: 0 out of 3 partitions got cow/23
Partition idx: 0 out of 3 partitions got many/23
Partition idx: 0 out of 3 partitions got years/23
Partition idx: 0 out of 3 partitions got eat/23
Partition idx: 0 out of 3 partitions got person/23
Partition idx: 0 out of 3 partitions got to/69
Partition idx: 0 out of 3 partitions got apples/23
Partition idx: 0 out of 3 partitions got went/25
Partition idx: 0 out of 3 partitions got candy/25
Partition idx: 0 out of 3 partitions got seven/25
Partition idx: 0 out of 3 partitions got jumped/25
Partition idx: 0 out of 3 partitions got ago/25
Partition idx: 0 out of 3 partitions got store/25
Partition idx: 0 out of 3 partitions got cow/25
Partition idx: 0 out of 3 partitions got many/25
Partition idx: 0 out of 3 partitions got years/25
Partition idx: 0 out of 3 partitions got eat/25
Partition idx: 0 out of 3 partitions got person/25
Partition idx: 0 out of 3 partitions got to/75
Partition idx: 0 out of 3 partitions got apples/25
Partition idx: 2 out of 3 partitions got be/56
Partition idx: 2 out of 3 partitions got not/28
Partition idx: 2 out of 3 partitions got some/28
Partition idx: 2 out of 3 partitions got and/56
Partition idx: 2 out of 3 partitions got man/28
Partition idx: 2 out of 3 partitions got how/28
Partition idx: 2 out of 3 partitions got you/28
</pre>
不过对于TridentState 中的数据在分布式存储的环境如何存取的?
<pre>
DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("print", "cat dog the man");
</pre>
整合在我们自己的代码中就需要这么使用了:
<pre>
topology.newDRPCStream("print", drpc)
.stateQuery(wordCounts, new TupleCollectionGet(), new Fields("word", "count"))
.each(new Fields("word", "count"), new Print());
</pre>
查看newDRPCStream源码:
<pre>
public Stream newDRPCStream(String function, ILocalDRPC server) {
DRPCSpout spout;
if(server==null) {
spout = new DRPCSpout(function);
} else {
spout = new DRPCSpout(function, server);
}
return newDRPCStream(spout);
}

Paste_Image.png

发现是一个比较简单的spout
</pre>

最后在main方法中执行execute,就这么跑起来了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容