storm Trident编程的分组策略

Trident编程中的=数据分组策略演示

代码

public class StrategyTopology {
    
    public static class WriteFunction extends BaseFunction {
        private static final Log log = LogFactory.getLog(WriteBolt.class);
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // 获取上一个组件所声明的Filed
            String text = tuple.getStringByField("sub");
            //打印结果
            System.out.println(text);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    public static StormTopology buildTopology(){
        TridentTopology topology = new TridentTopology();
        //设定数据源
        @SuppressWarnings("unchecked")
        FixedBatchSpout spout = new FixedBatchSpout(
                new Fields("sub"), //声明输出的域字段为“sub”
                4,                  //设置逼出来大小为4
                //设置数据源内容
                new Values("java"),
                new Values("python"),
                new Values("php"),
                new Values("c++"),
                new Values("ruby")
                );
        //指定是否循环
        spout.setCycle(true);
        //指定输入源spout
        Stream inputStream = topology.newStream("spout", spout);
        /**
         * 要实现sqout - bolt的模式 在trident里使用each来完成
         * each方法参数:
         *      1,输入源参数名称
         *      2,需要流转执行的function对象(就是bolt):new WriteFunction(),此function要求自己编写类
         *      3,指定function对象里的输出参数名称,没有则不在继续流向
         * */
        inputStream.
        //随机分组:shuffle
        shuffle().
        //分区分组:partitionBy
        //partitionBy(new Fields("sub")).
        //全局分组:global
        //global().
        //广播分组:broadcast
        //broadcast().
        each(new Fields("sub"), new WriteFunction(),new Fields()).parallelismHint(4);//parallelismHint设置并行度
        return topology.build();
    }

    
    public static void main(String[] args) throws Exception {
        Config conf = new Config();
        conf.setNumWorkers(2);
        conf.setMaxSpoutPending(20);
        if(args.length == 0){
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("trident-filter", conf, buildTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }else{
            StormSubmitter.submitTopology(args[0], conf, buildTopology());
        }
    }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 声明 本文首发于个人技术博客,转载请注明出处,本文链接:http://qifuguang.me/2015/11/2...
    winwill2012阅读 2,220评论 1 15
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 173,523评论 25 708
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,767评论 18 399
  • 朋友的姐姐毕业两年了,现在在当地的一家大型医院工作,用医疗仪器给患者做身体检查。医院的福利看似不错,说出去也体面,...
    颜浸阅读 2,258评论 4 23
  • 日更第5天,写完了知识管理,来聊聊跑步那些事儿吧。 半年前,我还奉行着“能坐着就不站着,能躺着就不坐着”的人生信条...
    小中医左左阅读 377评论 1 4