模拟一篇文章,计算其中各个单词出现的次数
【Spout】数据源
/**
 * Spout that acts as the data source of the word-count example.
 *
 * <p>It cycles endlessly through a fixed array of sentences and emits one
 * sentence per {@link #nextTuple()} call on the single output field
 * {@code "sentence"}.
 */
public class WordSpout implements IRichSpout {
    private static final long serialVersionUID = 1L;

    /** Static sample data standing in for a real source. */
    private String[] sentences = {
        "my dog has fleas",
        "i like cold beverages",
        "the dog ate my homework",
        "don`t have a cow man",
        "i don`t think i like fleas"
    };

    /** Collector received in {@link #open}; used to emit sentences. */
    private SpoutOutputCollector collector;

    /** Position in {@link #sentences} of the next sentence to emit. */
    private int index = 0;

    /**
     * Stores the collector so that {@link #nextTuple()} can emit through it.
     *
     * @param config    topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used for all later emits
     */
    @Override
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Emits the sentence at the current position, advances the position
     * (wrapping back to the first sentence after the last one) and then
     * waits via {@code Utils.waitForSeconds(1)}.
     */
    public void nextTuple() {
        String sentence = sentences[index];
        this.collector.emit(new Values(sentence));
        // Round-robin over the array: after the last element start again at 0.
        index = (index + 1) % sentences.length;
        Utils.waitForSeconds(1);
    }

    /** Declares the single output field {@code "sentence"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("sentence");
        declarer.declare(outputFields);
    }

    /** No-op: this spout emits without a message id and keeps no pending state. */
    public void ack(Object arg0) {
    }

    /** No-op. */
    public void activate() {
    }

    /** Prints a notice just before the spout stops. */
    public void close() {
        System.out.println("spout停止啦!!!!");
    }

    /** No-op. */
    public void deactivate() {
    }

    /** No-op: nothing is replayed by this spout. */
    public void fail(Object arg0) {
    }

    /** Returns {@code null}, i.e. no component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【bolt1】空格切分
/**
 * Bolt that splits each incoming sentence on single spaces and emits one
 * tuple per word on the output field {@code "word"}.
 *
 * <p>Every emitted word is anchored to the input tuple and the input tuple
 * is acked once all of its words have been emitted, so the bolt also works
 * correctly when the upstream spout tracks tuples with message ids.
 */
public class WordSplitBolt implements IRichBolt {
    // Explicit id, in line with WordSpout in the same example.
    private static final long serialVersionUID = 1L;

    /** Collector received in {@link #prepare}; used to emit and ack. */
    private OutputCollector collector;

    /**
     * Initialization hook: stores the collector for later use.
     *
     * @param config    topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used by {@link #execute(Tuple)}
     */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Reads the {@code "sentence"} field, splits it on spaces and emits each
     * non-empty token as a word.
     *
     * @param tuple input tuple carrying a {@code "sentence"} string field
     */
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        for (String word : sentence.split(" ")) {
            // An empty sentence or consecutive spaces produce "" tokens;
            // those are not words and must not be counted downstream.
            if (word.length() == 0) {
                continue;
            }
            // Anchor to the input so a downstream failure can be traced back.
            this.collector.emit(tuple, new Values(word));
        }
        // Mark the input as fully processed by this bolt.
        this.collector.ack(tuple);
    }

    /** Declares the single output field {@code "word"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /** No-op: the bolt holds no resources. */
    public void cleanup() {
    }

    /** Returns {@code null}, i.e. no component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【bolt2】汇总map
/**
 * Bolt that keeps a running count per word and, for every incoming word,
 * emits the pair {@code ("word", "count")} with the updated total.
 *
 * <p>The output is anchored to the input tuple and the input tuple is acked
 * after the emit, so the bolt also works correctly when the upstream spout
 * tracks tuples with message ids.
 */
public class WordCountBolt implements IRichBolt {
    // Explicit id, in line with WordSpout in the same example.
    private static final long serialVersionUID = 1L;

    /** Collector received in {@link #prepare}; used to emit and ack. */
    private OutputCollector collector;

    /** word -> number of occurrences seen so far by this bolt instance. */
    private Map<String, Long> counts = null;

    /**
     * Stores the collector and creates the (initially empty) count table.
     *
     * @param config    topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used by {@link #execute(Tuple)}
     */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Increments the count of the word carried by {@code tuple} and emits the
     * word together with its new count.
     *
     * @param tuple input tuple carrying a {@code "word"} string field
     */
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long previous = this.counts.get(word);
        // First occurrence has no entry yet, so it starts at 1.
        Long count = (previous == null) ? Long.valueOf(1L) : Long.valueOf(previous.longValue() + 1L);
        this.counts.put(word, count);
        // Anchor to the input so a downstream failure can be traced back.
        this.collector.emit(tuple, new Values(word, count));
        // Mark the input as fully processed by this bolt.
        this.collector.ack(tuple);
    }

    /** Declares the output fields {@code "word"} and {@code "count"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    /** No-op: the bolt holds no resources. */
    public void cleanup() {
    }

    /** Returns {@code null}, i.e. no component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【bolt3】输出结果
/**
 * Terminal bolt that remembers the latest count reported for each word and
 * prints all of them, sorted by word, when the bolt is cleaned up.
 *
 * <p>Each input tuple is acked after it has been recorded. The bolt emits
 * nothing and therefore declares no output fields.
 */
public class WordReportBolt implements IRichBolt {
    // Explicit id, in line with WordSpout in the same example.
    private static final long serialVersionUID = 1L;

    /** Collector received in {@link #prepare}; only used to ack inputs. */
    private OutputCollector collector;

    /** word -> most recent count received for that word. */
    private Map<String, Long> counts = null;

    /**
     * Stores the collector and creates the (initially empty) result table.
     *
     * @param config    topology configuration (unused here)
     * @param context   topology context (unused here)
     * @param collector collector used to ack tuples in {@link #execute(Tuple)}
     */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Records the count carried by {@code tuple}, overwriting any earlier
     * value stored for the same word.
     *
     * @param tuple input tuple with a {@code "word"} string field and a
     *              {@code "count"} long field
     */
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
        // Nothing is emitted here, so the tuple is done once it is recorded.
        this.collector.ack(tuple);
    }

    /** Declares nothing: this bolt has no output stream. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    /**
     * Called when the bolt is stopped: prints every word with its final
     * count in alphabetical order.
     *
     * <p>NOTE(review): per the Storm documentation this hook is only reliably
     * invoked in local mode; confirm before depending on it on a cluster.
     */
    public void cleanup() {
        System.out.println("-----------------FINAL COUNTS----------------");
        List<String> keys = new ArrayList<String>(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("-----------------------------------------------");
    }

    /** Returns {@code null}, i.e. no component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
【topology】逻辑组件
/**
 * Wires the word-count components into a topology and runs it on an
 * in-process local cluster for a fixed time.
 *
 * <p>Data flow: sentence spout -> split bolt (shuffle grouping) -> count
 * bolt (fields grouping on {@code "word"}) -> report bolt (global grouping).
 */
public class WordTopology {
    // Component and topology identifiers.
    private static final String WORD_SPOUT_ID = "word-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    /**
     * Builds the topology, submits it to a local cluster, lets it run and
     * then kills it and shuts the cluster down.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if submitting or killing the topology fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate the components.
        WordSpout spout = new WordSpout();
        WordSplitBolt splitBolt = new WordSplitBolt();
        WordCountBolt countBolt = new WordCountBolt();
        WordReportBolt reportBolt = new WordReportBolt();

        // Build the topology.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(WORD_SPOUT_ID, spout);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 5).shuffleGrouping(WORD_SPOUT_ID);
        // Fields grouping: the same word always reaches the same counter task,
        // so each task's per-word totals are complete.
        builder.setBolt(COUNT_BOLT_ID, countBolt, 5).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // Global grouping routes the whole stream to a single task, so more
        // than one report executor would only add idle instances, each of
        // which prints an empty "FINAL COUNTS" block on cleanup.
        builder.setBolt(REPORT_BOLT_ID, reportBolt, 1).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setDebug(false);

        // 1: run locally.
        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.waitForSeconds(100);
            cluster.killTopology(TOPOLOGY_NAME);
        } finally {
            // Always release the local cluster, even if submit/kill failed.
            cluster.shutdown();
        }

        // 2: run on a real cluster instead (replace the local-cluster part above):
        // StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    }
}
本地运行结果