Trident编程中的=数据分组策略演示
代码
public class StrategyTopology {
public static class WriteFunction extends BaseFunction {
private static final Log log = LogFactory.getLog(WriteBolt.class);
@Override
public void execute(TridentTuple tuple, TridentCollector collector) {
// 获取上一个组件所声明的Filed
String text = tuple.getStringByField("sub");
//打印结果
System.out.println(text);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static StormTopology buildTopology(){
TridentTopology topology = new TridentTopology();
//设定数据源
@SuppressWarnings("unchecked")
FixedBatchSpout spout = new FixedBatchSpout(
new Fields("sub"), //声明输出的域字段为“sub”
4, //设置逼出来大小为4
//设置数据源内容
new Values("java"),
new Values("python"),
new Values("php"),
new Values("c++"),
new Values("ruby")
);
//指定是否循环
spout.setCycle(true);
//指定输入源spout
Stream inputStream = topology.newStream("spout", spout);
/**
* 要实现sqout - bolt的模式 在trident里使用each来完成
* each方法参数:
* 1,输入源参数名称
* 2,需要流转执行的function对象(就是bolt):new WriteFunction(),此function要求自己编写类
* 3,指定function对象里的输出参数名称,没有则不在继续流向
* */
inputStream.
//随机分组:shuffle
shuffle().
//分区分组:partitionBy
//partitionBy(new Fields("sub")).
//全局分组:global
//global().
//广播分组:broadcast
//broadcast().
each(new Fields("sub"), new WriteFunction(),new Fields()).parallelismHint(4);//parallelismHint设置并行度
return topology.build();
}
public static void main(String[] args) throws Exception {
Config conf = new Config();
conf.setNumWorkers(2);
conf.setMaxSpoutPending(20);
if(args.length == 0){
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("trident-filter", conf, buildTopology());
Thread.sleep(10000);
cluster.shutdown();
}else{
StormSubmitter.submitTopology(args[0], conf, buildTopology());
}
}
}