storm之trident的概念及用法(FixedBatchSpout)

转载:https://blog.csdn.net/cjf_wei/article/details/73699805
转载:https://blog.csdn.net/liuxinghao/article/details/68944280

概念

trident是基于Storm进行实时留处理的高级抽象,提供了对实时流4的聚集,投影,过滤等操作,从而大大减少了开发Storm程序的工作量。Trident还提供了针对数据库或则其他持久化存储的有状态的,增量的更新操作的原语。

在storm的0.8版本之后,事务性已经被封装到Trident中。trident提供了一套非常成熟的批处理API来批量处理元组,可以对这些元组执行分组(group by)、连接(join)、聚合(aggregation)、运行函数、运行过滤器等,trident还封装了DRPC功能,同样支持DRPC远程调用。

基本操作

场景:词频统计

FixedBatchSpout

若我们要开发一个对文本中的词频进行统计的程序,使用Storm框架的话我们需要开发三个Storm组件:

1.一个Spout负责收集文本信息并分段,做为sentence字段发送给下游的Bolt

2.一个Bolt将将每段文本粉刺,将分词结果以word字段发送给下游的Bolt

3.一个Bolt对词频进行统计,把统计结果记录在count字段并存储

用trident时,我们使用storm内部定义好的spout----FixedBatchSpout
先看看FixedBatchSpout的创建方法

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),...);

FixedBatchSpout的参数sentence为输出的字段名,数字3表示将发送的元组最多分为3个Batch进行处理,剩余的参数均为spout待发送的内容列表。
FixedBatchSpout继承自IBatchSpout,IBatchSpout是一个非事务Spout,每次发送一个Batch元组。

看看源码

public class FixedBatchSpout implements IBatchSpout {
 
    Fields fields;
    List<Object>[] outputs;
    int maxBatchSize;
     
    public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
        this.fields = fields; // 输出字段
        this.outputs = outputs;  // 保存至本地, 每个对象都是一个List<Object>
        this.maxBatchSize = maxBatchSize; //  该批次最大发射次数,但是不是唯一决定元素
    }
     
    int index = 0;
    boolean cycle = false;
     
    public void setCycle(boolean cycle) {
        this.cycle = cycle;
    }
     
    @Override
    public void open(Map conf, TopologyContext context) {
        index = 0;
    }
    <br>    // trident调用
    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        //Utils.sleep(2000);
        if(index>=outputs.length && cycle) {
            index = 0;  // 超过下标后,让index归零, 继续循环发送
        }
 
       //  在不超过outputs大小的情况下,每次发射一个List<Object>
        for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
            collector.emit(outputs[index]);
        }
    }

强调下:这里maxBatchSize设置为3,表示每次最多发送3个List<Value>,但是设置更大,也不会出错,参见上面的代码注释,它要同时满足不超过数组大小,所以不会越界。

创建Trident实例

TridentTopology topology = new TridentTopology();
TridentState wordCounts = topology.newStream("spout1", spout) 
    .parallelismHint(16)
    .each(new Fields("sentence"),new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count"))
    .parallelismHint(16);

创建TridentTopology对象topology,这个对象就是Trident计算的入口。
TridentState对象在本例中代表了所有单词的统计,用于对外提供给DRPC服务进行查询。
newStream():newStream方法在拓扑中创建一个新的数据流以便从输入源中读取数据
parallelismHint():设置并行处理的数量
each()配合运行函数(或过滤器):
例如:each(new Fields(“sentence”),new Split(), new Fields(“word”)) //对每个输入的sentence”字段”,调用Split()函数进行处理,并生成新的字段word
例如:each(new Fields(“sentence”),new MyFilter()) //对每个输入的sentence”字段”,调用MyFilter()函数进行过滤
groupBy():分组操作,按特定的字段进行分组
persistentAggregate():聚合函数,persistentAggregate实现的是将数据持久到特定的存储介质中
stateQuery():提供对已生成的TridentState对象的查询
过滤器,需实现接口BaseFilter

public class FilterNull extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        for(Object o: tuple) {
            if(o==null) return false;
        }
        return true;
    }
}

创建DRPC流

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word")) // 6
       .groupBy(new Fields("word")) // 7
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count")) // 8
       .each(new Fields("count"), new FilterNull()) // 9
       .aggregate(new Fields("count"), new Sum(), new Fields("sum")); // 10

使用同一个TridentTopology对象来创建DRPC流,命名该函数为words,函数名与DRPCClient执行时的第一个参数名称相同。每个DRPC请求都是一个小的批量作业,将请求的单个tuple作为输入,tuple中包含名为args的字段,args中包含了客户端的请求参数。在这个例子中,请求参数就是“cat dog the man”。

行8,stateQuery方法用于查询第一部分生成的TridentState对象。MapGet将被执行,根据输入的单词,查询单词的数量。因为DRPC流的分组方式与TridentState分组方式相同(都是通过word字段),所以每个单词查询会被自动路由到该单词的TridentState对象的分区。

完整代码

//TridentWordCount.java
package org.apache.storm.starter.trident;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.LocalDRPC;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.operation.builtin.FilterNull;
import org.apache.storm.trident.operation.builtin.MapGet;
import org.apache.storm.trident.operation.builtin.Sum;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;

public class TridentWordCount {

    public static class Split extends BaseFunction {
    @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }

    public static StormTopology buildTopology(LocalDRPC drpc) {
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("the cow jumped over the moon"),
            new Values("the man went to the store and bought some candy"), new Values("four score and seven years ago"),
            new Values("how many apples can you eat"), new Values("to be or not to be the person"));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        TridentState wordCounts = topology.newStream("spout1", spout) //newStream方法在拓扑中创建一个新的数据流以便从输入源(FixedBatchSpout)中读取数据
            .parallelismHint(16)
            .each(new Fields("sentence"),new Split(), new Fields("word")) //each(),对每个输入的sentence"字段",调用Split()函数进行处理
            .groupBy(new Fields("word"))
            .persistentAggregate(new MemoryMapState.Factory(),new Count(), new Fields("count"))
            .parallelismHint(16);

        topology.newDRPCStream("words", drpc) //以words作为函数名,对应于drpc.execute("words", "cat the dog jumped")中的words名
            .each(new Fields("args"), new Split(), new Fields("word"))//对于输入参数args,使用Split()方法进行切分,并以word作为字段发送
            .groupBy(new Fields("word"))//对word字段进行重新分区,保证相同的字段落入同一个分区
            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
            .each(new Fields("count"),new FilterNull())//使用FilterNull()方法过滤count字段的数据(过滤没有统计到的单词)
            .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        if (args.length == 0) {
            LocalDRPC drpc = new LocalDRPC();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
            for (int i = 0; i < 100; i++) {
 //通过drpc调用words方法查询cat/the/dog/jumped在wordcounts的结果中
//查询并返回个数,并把个数过滤再相加,最后结果是 6
                System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
                Thread.sleep(1000);
            }
        } else {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
        }
    }
}

运行

将代码打成jar包,提交storm拓扑,可以通过storm日志 ./storm/logs/workers-artifacts/* 观察实时的统计;
也可以通过DRPC,远程查询相关字段的统计
DRPC客户端 查询

import backtype.storm.utils.DRPCClient;

public class TestDRPC {

    public static void main(String[] args) throws Exception{
        // TODO Auto-generated method stub
        DRPCClient client = new DRPCClient("localhost",3772);//3772是drpc对外默认的服务端口
        System.out.println("DRPC result:" + client.execute("words", "the man storm"));
    }
}

storm会切分查询列表”the man storm”(each(new Fields(“args”), new Split(), new Fields(“word”))),并通过stateQuery进行查询

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351