Storm用来实时处理数据,特点:低延迟、高可用、分布式、可扩展、数据不丢失。提供简单容易理解的接口,便于开发。本文讲述在Strom平台开发一个WordCount的步骤
主要内容:
- 1.引入依赖
- 2.编写代码
- 3.提交测试
1.引入依赖
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.1</version>
</dependency>
2.编写代码
2.1.ReadFileSpolt
每隔500ms产生一条"i love you"发送给下一个bolt,其中指定键为"biaobai"
public class ReadFileSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
/**
* conf 应用程序能够读取的配置文件
* context 应用程序的上下文
* collector Spout输出的数据丢给SpoutOutputCollector
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//1、Kafka 连接 / MYSQL 连接 /Redis 连接
//TODO
//2、将SpoutOutputCollector复制给成员变量
this.collector = collector;
}
/**
* storm框架有个while循环,一直在nextTuple
*/
@Override
public void nextTuple() {
// 发送数据,使用collector.emit方法
collector.emit(new Values("i love u"));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("biaobai"));
}
}
2.2.SentenceSplitBolt.java
接收键为"biaobai"的数据并切分单词,统计每个单词的数量发送给下一个bolt,其中键为"word"、"num"
public class SentenceSplitBolt extends BaseRichBolt {
private OutputCollector collector;
/**
* 初始化方法
* Map stormConf 应用能够得到的配置文件
* TopologyContext context 上下文 一般没有什么用
* OutputCollector collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
//todo 连接数据 连接redis 连接hdfs
}
/**
* 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
*/
@Override
public void execute(Tuple input) {
// String sentence = input.getString(0);
// 底层先通过 biaobai 这个字段在map中找到对应的index角标值,然后再valus中获取对应数据。
String sentence = input.getStringByField("biaobai");
// TODO 切割
String[] strings = sentence.split(" ");
for (String word : strings) {
// TODO 输出数据
collector.emit(new Values(word, 1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明 输出的是什么字段
declarer.declare(new Fields("word", "num"));
}
}
2.3.WordCountBolt
利用HashMap统计缓存单词的总数量
public class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private Map<String, Integer> wordCountMap;
/**
* 初始化方法
* Map stormConf 应用能够得到的配置文件
* TopologyContext context 上下文 一般没有什么用
* OutputCollector collector 数据收集器
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
//todo 连接数据 连接redis 连接hdfs
wordCountMap = new HashMap<String, Integer>();
}
/**
* 有个while不停的调用execute方法,每次调用都会发一个数据进行来。
*/
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer num = input.getIntegerByField("num");
// 先判断这个单词是否出现过
if (wordCountMap.containsKey(word)) {
Integer oldNum = wordCountMap.get(word);
wordCountMap.put(word, oldNum + num);
} else {
wordCountMap.put(word, num);
}
System.out.println("=======================");
System.out.println(wordCountMap);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 声明 输出的是什么字段
declarer.declare(new Fields("fenshou"));
}
}
2.4.WordCountTopology
将Spolt、Bolt组装成Topology
public class WordCountTopology {
public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
//1、创建一个job(topology)
TopologyBuilder topologyBuilder = new TopologyBuilder();
//2、设置job的详细内容
topologyBuilder.setSpout("ReadFileSpout",new ReadFileSpout(),1);
topologyBuilder.setBolt("SentenceSplitBolt",new SentenceSplitBolt(),1).shuffleGrouping("ReadFileSpout");
topologyBuilder.setBolt("WordCountBolt",new WordCountBolt(),1).shuffleGrouping("SentenceSplitBolt");
//准备配置项
Config config = new Config();
config.setDebug(false);
//3、提交job
//提交由两种方式:一种本地运行模式、一种集群运行模式。
if (args != null && args.length > 0) {
//运行集群模式
config.setNumWorkers(1);
StormSubmitter.submitTopology(args[0],config,topologyBuilder.createTopology());
} else {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordcount", config, topologyBuilder.createTopology());
}
}
}
在本地测试直接run这个Topology就可以看到结果了
本地结果
3.提交到集群测试
storm jar storm-1.0-SNAPSHOT-jar-with-dependencies.jar me.jinkun.storm.wc.WordCountTopology wordcount
集群中的wordcount