Example 1: Word Count
Every time you start a new project, the first thing to do is draw your topology blueprint.
- Word count topology data flow:
1. Sentence spout:
{ "sentence":"my dog has fleas" }
2. Split sentence bolt:
{ "word" : "my" }
{ "word" : "dog" }
{ "word" : "has" }
{ "word" : "fleas" }
3. Word count bolt:
{ "word" : "dog", "count" : 5 }
4. Report bolt: for now, we will just use the Redis-based reporting code from Udacity's ud381 course.
- Implementing the sentence spout
/**
 * Spout that emits one randomly chosen sentence from a fixed list on every
 * {@link #nextTuple()} call, pausing 100 ms before each emission.
 *
 * <p>Each emitted tuple has a single field, {@code "sentence"}.
 */
public class RandomSentenceSpout extends BaseRichSpout {
    /** Candidate sentences; built once instead of on every nextTuple() call. */
    private static final String[] SENTENCES = new String[]{
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    SpoutOutputCollector _collector;
    Random _rand;

    /**
     * Stores the collector and creates the random generator used to pick sentences.
     * The raw {@code Map} parameter type is kept so the signature matches the
     * overridden Storm method.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /** Pauses 100 ms to limit the emit rate, then emits one random sentence. */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
        // Emitted without a message id.
        _collector.emit(new Values(sentence));
    }

    /** Declares the single output field {@code "sentence"}. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
- Implementing the split sentence bolt
/**
 * Bolt that splits each incoming {@code "sentence"} into words and emits one
 * {@code "word"} tuple per word.
 */
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    /** Keeps the collector for use in {@code execute}. */
    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Splits the sentence on runs of whitespace and emits each non-empty word
     * anchored to the input tuple. Then acks the input tuple.
     *
     * @param tuple input tuple carrying a {@code "sentence"} string field
     */
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        // trim() + "\\s+" prevents empty "words" from leading, trailing or repeated
        // spaces; a blank sentence still yields [""], which the check below skips.
        String[] words = sentence.trim().split("\\s+");
        for (String word : words) {
            if (word.length() > 0) {
                // Anchor each word tuple to the sentence tuple it came from.
                this.collector.emit(tuple, new Values(word));
            }
        }
        // BaseRichBolt does not ack automatically, so ack the input explicitly.
        this.collector.ack(tuple);
    }

    /** Declares the single output field {@code "word"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
- Implementing the word count bolt
/**
 * Bolt that keeps a running count per word and emits {@code ("word", "count")}
 * after every update.
 */
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    // Running totals for the words routed to this bolt instance.
    private HashMap<String, Long> counts = null;

    /** Keeps the collector and starts with an empty count table. */
    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Increments the count for the incoming word and emits the new running total,
     * anchored to the input tuple. Then acks the input tuple.
     *
     * @param tuple input tuple carrying a {@code "word"} string field
     */
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(tuple, new Values(word, count));
        // BaseRichBolt does not ack automatically, so ack the input explicitly.
        this.collector.ack(tuple);
    }

    /** Declares the output fields {@code "word"} and {@code "count"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
- Implementing the report bolt
/**
 * Terminal bolt that records the latest count per word and prints all counts,
 * sorted by word, when the bolt is cleaned up.
 */
public class ReportBolt extends BaseRichBolt {
    // Needed only to ack incoming tuples; this bolt emits nothing.
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    /** Keeps the collector and starts with an empty count table. */
    public void prepare(Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Records the incoming count for its word. Then acks the input tuple.
     *
     * @param tuple input tuple carrying {@code "word"} (String) and {@code "count"} (Long)
     */
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        // Each incoming count is the sender's running total, so overwrite rather than add.
        this.counts.put(word, count);
        // BaseRichBolt does not ack automatically, so ack the input explicitly.
        this.collector.ack(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt does not emit anything
    }

    /**
     * Prints every recorded word and its count, sorted alphabetically.
     *
     * <p>NOTE: Storm does not guarantee that cleanup() runs on a real cluster, so
     * this report is reliable only in local mode.
     */
    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("--------------");
    }
}
- Combining the components into a topology
public class WordCountTopology {
private static final String SENTENCE_SPOUT_ID = "sentence-spout";
private static final String SPLIT_BOLT_ID = "split-bolt";
private static final String COUNT_BOLT_ID = "count-bolt";
private static final String REPORT_BOLT_ID = "report-bolt";
private static final String TOPOLOGY_NAME = "word-count-topology";
public static void main(String[] args) throws Exception {
SentenceSpout spout = new SentenceSpout();
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
WordCountBolt countBolt = new WordCountBolt();
ReportBolt reportBolt = new ReportBolt();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SENTENCE_SPOUT_ID, spout);
// SentenceSpout --> SplitSentenceBolt
builder.setBolt(SPLIT_BOLT_ID, splitBolt)
.shuffleGrouping(SENTENCE_SPOUT_ID);
// SplitSentenceBolt --> WordCountBolt
builder.setBolt(COUNT_BOLT_ID, countBolt)
.fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
// WordCountBolt --> ReportBolt
builder.setBolt(REPORT_BOLT_ID, reportBolt)
.globalGrouping(COUNT_BOLT_ID);
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.
createTopology());
} }
waitForSeconds(10);
cluster.killTopology(TOPOLOGY_NAME);
cluster.shutdown();
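- Output (sample run; these counts were produced by a spout emitting a different set of sentences, such as "my dog has fleas", than the RandomSentenceSpout shown above):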
--- FINAL COUNTS ---
a : 2726
ate : 2722
beverages : 2723
cold : 2723
cow : 2726
dog : 5445
don't : 5444
fleas : 5451
has : 2723
have : 2722
homework : 2722
i : 8175
like : 5449
man : 2722
my : 5445
the : 2727
think : 2722
--------------
Example 2: Trident Topologies
The code looks like this:
public class OutbreakDetectionTopology {
public static StormTopology buildTopology() {
TridentTopology topology = new TridentTopology();
DiagnosisEventSpout spout = new DiagnosisEventSpout();
Stream inputStream = topology.newStream("event", spout);
inputStream
.each(new Fields("event"), new DiseaseFilter()))
.each(new Fields("event"), new CityAssignment(), new Fields("city"))
.each(new Fields("event", "city"), new HourAssignment(), new Fields("hour", "cityDiseaseHour"))
.groupBy(new Fields("cityDiseaseHour"))
.persistentAggregate(new OutbreakTrendFactory(),
new Count(),
new Fields("count"))
.newValuesStream()
// Detect an outbreak
.each(new Fields("cityDiseaseHour", "count"),
new OutbreakDetector(), new Fields("alert"))
// Dispatch the alert
.each(new Fields("alert"),
new DispatchAlert(), new Fields());
}
}
Exercise
Set up
- Install VirtualBox for your operating system: https://www.virtualbox.org/wiki/Downloads
- Install Vagrant
- git clone https://github.com/Udacity/ud381
- vagrant up
- vagrant ssh
- Open another terminal and run vagrant ssh
- Enter the /viz folder and run:
python app.py
(You can build your own report bolt, like the one above, instead of using this.)
- Change your source file and display the word count.
- Try different stream grouping methods.