import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
/**
 * Demonstrates Trident stream joins (created by tangchunsong, 2018/7/11).
 *
 * Conclusions:
 * - {@code topology.join} joins the tuples of one batch from each of the two
 *   streams; once those two batches are joined they are done, and the next
 *   batch of each stream is joined.
 * - Joining two batches is an inner join: a result is emitted only when the
 *   keys match.
 */
public class StreamJoinMain {
public static StormTopology buildTopology() {
FixedBatchSpout spout1 = new FixedBatchSpout(new Fields("key", "value1"), 3, new Values("a", "1"),
new Values("b", "2"), new Values("a", "3"), new Values("a", "4"));
spout1.setCycle(true);//Spout是否循环发送
FixedBatchSpout spout2 = new FixedBatchSpout(new Fields("key", "value2"), 3, new Values("a", "1"),
new Values("b", "2"), new Values("a", "3"), new Values("a", "5"), new Values("a", "6"));
spout2.setCycle(true);//Spout是否循环发送
TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream("spout1", spout1).parallelismHint(2);
Stream stream2 = topology.newStream("spout2", spout2).parallelismHint(2);
topology.join(stream1, new Fields("key"), stream2, new Fields("key"), new Fields("key", "value1", "value2"))
.peek(new Consumer() {
public void accept(TridentTuple input) {
System.out.println(input.toString());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
return topology.build();
}
public static void main(String[] args) {
Config conf = new Config();
conf.setMaxSpoutPending(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCounter", conf, buildTopology());
}
}