转载:https://blog.csdn.net/cjf_wei/article/details/73699805
转载:https://blog.csdn.net/liuxinghao/article/details/68944280
概念
trident是基于Storm进行实时留处理的高级抽象,提供了对实时流4的聚集,投影,过滤等操作,从而大大减少了开发Storm程序的工作量。Trident还提供了针对数据库或则其他持久化存储的有状态的,增量的更新操作的原语。
在storm的0.8版本之后,事务性已经被封装到Trident中。trident提供了一套非常成熟的批处理API来批量处理元组,可以对这些元组执行分组(group by)、连接(join)、聚合(aggregation)、运行函数、运行过滤器等,trident还封装了DRPC功能,同样支持DRPC远程调用。
基本操作
场景:词频统计
FixedBatchSpout
若我们要开发一个对文本中的词频进行统计的程序,使用Storm框架的话我们需要开发三个Storm组件:
1.一个Spout负责收集文本信息并分段,做为sentence字段发送给下游的Bolt
2.一个Bolt将将每段文本粉刺,将分词结果以word字段发送给下游的Bolt
3.一个Bolt对词频进行统计,把统计结果记录在count字段并存储
用trident时,我们使用storm内部定义好的spout----FixedBatchSpout
先看看FixedBatchSpout的创建方法
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),...);
FixedBatchSpout的参数sentence为输出的字段名,数字3表示将发送的元组最多分为3个Batch进行处理,剩余的参数均为spout待发送的内容列表。
FixedBatchSpout继承自IBatchSpout,IBatchSpout是一个非事务Spout,每次发送一个Batch元组。
看看源码
public class FixedBatchSpout implements IBatchSpout {
Fields fields;
List<Object>[] outputs;
int maxBatchSize;
public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
this.fields = fields; // 输出字段
this.outputs = outputs; // 保存至本地, 每个对象都是一个List<Object>
this.maxBatchSize = maxBatchSize; // 该批次最大发射次数,但是不是唯一决定元素
}
int index = 0;
boolean cycle = false;
public void setCycle(boolean cycle) {
this.cycle = cycle;
}
@Override
public void open(Map conf, TopologyContext context) {
index = 0;
}
<br> // trident调用
@Override
public void emitBatch(long batchId, TridentCollector collector) {
//Utils.sleep(2000);
if(index>=outputs.length && cycle) {
index = 0; // 超过下标后,让index归零, 继续循环发送
}
// 在不超过outputs大小的情况下,每次发射一个List<Object>
for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
collector.emit(outputs[index]);
}
}
强调下:这里maxBatchSize设置为3,表示每次最多发送3个List<Value>,但是设置更大,也不会出错,参见上面的代码注释,它要同时满足不超过数组大小,所以不会越界。
创建Trident实例
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout)
.parallelismHint(16)
.each(new Fields("sentence"),new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count"))
.parallelismHint(16);
创建TridentTopology对象topology,这个对象就是Trident计算的入口。
TridentState对象在本例中代表了所有单词的统计,用于对外提供给DRPC服务进行查询。
newStream():newStream方法在拓扑中创建一个新的数据流以便从输入源中读取数据
parallelismHint():设置并行处理的数量
each()配合运行函数(或过滤器):
例如:each(new Fields(“sentence”),new Split(), new Fields(“word”)) //对每个输入的sentence”字段”,调用Split()函数进行处理,并生成新的字段word
例如:each(new Fields(“sentence”),new MyFilter()) //对每个输入的sentence”字段”,调用MyFilter()函数进行过滤
groupBy():分组操作,按特定的字段进行分组
persistentAggregate():聚合函数,persistentAggregate实现的是将数据持久到特定的存储介质中
stateQuery():提供对已生成的TridentState对象的查询
过滤器,需实现接口BaseFilter
public class FilterNull extends BaseFilter {
@Override
public boolean isKeep(TridentTuple tuple) {
for(Object o: tuple) {
if(o==null) return false;
}
return true;
}
}
创建DRPC流
topology.newDRPCStream("words")
.each(new Fields("args"), new Split(), new Fields("word")) // 6
.groupBy(new Fields("word")) // 7
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) // 8
.each(new Fields("count"), new FilterNull()) // 9
.aggregate(new Fields("count"), new Sum(), new Fields("sum")); // 10
使用同一个TridentTopology对象来创建DRPC流,命名该函数为words,函数名与DRPCClient执行时的第一个参数名称相同。每个DRPC请求都是一个小的批量作业,将请求的单个tuple作为输入,tuple中包含名为args的字段,args中包含了客户端的请求参数。在这个例子中,请求参数就是“cat dog the man”。
行8,stateQuery方法用于查询第一部分生成的TridentState对象。MapGet将被执行,根据输入的单词,查询单词的数量。因为DRPC流的分组方式与TridentState分组方式相同(都是通过word字段),所以每个单词查询会被自动路由到该单词的TridentState对象的分区。
完整代码
//TridentWordCount.java
package org.apache.storm.starter.trident;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
public class TridentWordCount {
public static class Split extends BaseFunction {
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
String sentence = tuple.getString(0);
for (String word : sentence.split(" ")) {
collector.emit(new Values(word));
}
}
}
public static StormTopology buildTopology(LocalDRPC drpc) {
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
new Values("how many apples can you eat"), new Values("to be or not to be the person"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout) //newStream方法在拓扑中创建一个新的数据流以便从输入源(FixedBatchSpout)中读取数据
.parallelismHint(16)
.each(new Fields("sentence"),new Split(), new Fields("word")) //each(),对每个输入的sentence"字段",调用Split()函数进行处理
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count"))
.parallelismHint(16);
topology.newDRPCStream("words", drpc) //以words作为函数名,对应于drpc.execute("words", "cat the dog jumped")中的words名
.each(new Fields("args"), new Split(), new Fields("word"))//对于输入参数args,使用Split()方法进行切分,并以word作为字段发送
.groupBy(new Fields("word"))//对word字段进行重新分区,保证相同的字段落入同一个分区
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"),new FilterNull())//使用FilterNull()方法过滤count字段的数据(过滤没有统计到的单词)
.aggregate(new Fields("count"), new Sum(), new Fields("sum"));
return topology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setMaxSpoutPending(20);
if (args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
for (int i = 0; i < 100; i++) {
//通过drpc调用words方法查询cat/the/dog/jumped在wordcounts的结果中
//查询并返回个数,并把个数过滤再相加,最后结果是 6
System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
Thread.sleep(1000);
}
} else {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
}
}
}
运行
将代码打成jar包,提交storm拓扑,可以通过storm日志 ./storm/logs/workers-artifacts/* 观察实时的统计;
也可以通过DRPC,远程查询相关字段的统计
DRPC客户端 查询
import backtype.storm.utils.DRPCClient;
public class TestDRPC {
public static void main(String[] args) throws Exception{
// TODO Auto-generated method stub
DRPCClient client = new DRPCClient("localhost",3772);//3772是drpc对外默认的服务端口
System.out.println("DRPC result:" + client.execute("words", "the man storm"));
}
}
storm会切分查询列表”the man storm”(each(new Fields(“args”), new Split(), new Fields(“word”))),并通过stateQuery进行查询