上一篇 简单看 storm, 主要简单讲解了storm 的集群架构、核心概念、并行度、流分组,本篇利用 storm 结合代码进行单词计数,讲解从代码层面理解入门storm。
直接开撸代码
单词计数简单的实现逻辑:
- 构造一个 Spout,为下游 Bolt 作业提供数据源
- 构造一个 Bolt,处理上游流向数据,进行单词切分
- 构造一个 Bolt,处理上游 Bolt ,进行单词计数
- 将 Spout 、Bolt 组装起来,构建成一个拓扑(Topology)
- 将 Topology 提交到 storm 集群,等待结果
创建名为 storm-wordcount 的普通 maven project
- 在 pom.xml 引入相关类库依赖
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
</dependencies>
- 加入 plugin ,打包
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>test/main/java</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass></mainClass>
</configuration>
</plugin>
</plugins>
</build>
新建一个 WordCountTopology 类
- 在 WordCountTopology 中编写一个 RandomSentenceSpout 静态内部类,继承实现 BaseRichSpout 抽象类
/**
*
* 编写spout ,继承一个基类,负责从数据源获取数据
* @author bill
* @date 2017年9月16日 下午8:21:46
*/
public static class RandomSentenceSpout extends BaseRichSpout{
private static final long serialVersionUID = 6102239192526611945L;
private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
private SpoutOutputCollector collector;
private Random random;
/**
* 当一个Task被初始化的时候会调用此open方法,
* 一般都会在此方法中对发送Tuple的对象SpoutOutputCollector和配置对象TopologyContext初始化
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.random = new Random();
}
/**
* 这个spout类,之前说过,最终会运行在task中,某个worker进程的某个executor线程内部的某个task中
* 那个task会负责去不断的无限循环调用nextTuple()方法
* 只要的话呢,无限循环调用,可以不断发射最新的数据出去,形成一个数据流
*/
public void nextTuple() {
String[] sentences = new String[]{
"I used to watch her from my kitchen widow"
, "she seemed so small as she muscled her way through the crowd of boys on the playground"
, "The school was across the street from our home and I would often watch the kids as they played during recess"
, "A sea of children, and yet tome"
, "she stood out from them all"};
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info(" ★★★ 发射 sentence 数据 > {}", sentence);
// 这个values,你可以认为就是构建一个tuple,tuple是最小的数据单位,无限个tuple组成的流就是一个stream,通过 emit 发送数据到下游bolt tuple
this.collector.emit(new Values(sentence));
}
/**
* 用于声明当前Spout的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
* 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游
bolt 中 execute 接收数据 key
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
- 在 WordCountTopology 中编写一个 SplitSentenceBolt 静态内部类,继承实现 BaseRichBolt 抽象类,用于处理上游 Spout 发送过来的数据,这里做句子单词切分
/**
*
* 编写一个bolt,用于切分每个单词,同时把单词发送出去
* @author bill
* @date 2017年9月16日 下午8:27:45
*/
public static class SplitSentenceBolt extends BaseRichBolt{
private static final long serialVersionUID = -4758047349803579486L;
private OutputCollector collector;
/**
* 当一个Task被初始化的时候会调用此prepare方法,对于bolt来说,第一个方法,就是prepare方法
* OutputCollector,这个也是Bolt的这个tuple的发射器,一般都会在此方法中对发送Tuple的对象OutputCollector初始化
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的
* 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
* 切分单词
*/
public void execute(Tuple input) {
// 接收上游数据
String sentence = input.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words){
//发射数据
this.collector.emit(new Values(word));
}
}
/**
* 用于声明当前bolt的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
* 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游 bolt 中 execute 接收数据 key
* 定义发射出去的tuple,每个field的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
- 在 WordCountTopology 中编写一个 WordCountBolt 静态内部类,继承实现 BaseRichBolt 抽象类,用于处理上游 Bolt 发送过来的数据,这里做单词计数
/**
*
* 单词次数统计bolt
* @author bill
* @date 2017年9月16日 下午8:35:00
*/
public static class WordCountBolt extends BaseRichBolt{
private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);
private static final long serialVersionUID = -7114915627898482737L;
private OutputCollector collector;
Map<String,Long> countMap = Maps.newConcurrentMap();
/**
* 当一个Task被初始化的时候会调用此prepare方法,对于bolt来说,第一个方法,就是prepare方法
* OutputCollector,这个也是Bolt的这个tuple的发射器,一般都会在此方法中对发送Tuple的对象OutputCollector初始化
*/
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* 这是Bolt中最关键的一个方法,对于Tuple的处理都可以放到此方法中进行。具体的发送也是通过emit方法来完成的
* 就是说,每次接收到一条数据后,就会交给这个executor方法来执行
* 统计单词
*/
public void execute(Tuple input) {
// 接收上游数据
String word = input.getStringByField("word");
Long count = countMap.get(word);
if(null == count){
count = 0L;
}
count ++;
countMap.put(word, count);
LOGGER.info(" ★★★ 单词计数[{}] 出现的次数:{}", word, count);
//发射数据
this.collector.emit(new Values(word,count));
}
/**
* 用于声明当前bolt的Tuple发送流的域名字。Stream流的定义是通过OutputFieldsDeclare.declareStream方法完成的
* 通俗点说法:就是这个方法是定义一个你发射出去的每个tuple中的每个field的名称是什么,作为下游 bolt 中 execute 接收数据 key
* 定义发射出去的tuple,每个field的名称
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
- 构造拓扑(Topology),提交 storm 集群执行(**注: storm 提供了本地模拟集群,可以直接在代码编辑器中编译执行)
在 WordCountTopology 中编写main 执行方法,代码如下:
public static void main(String[] args) {
//去将spout和bolts组合起来,构建成一个拓扑
TopologyBuilder builder = new TopologyBuilder();
// 第一个参数的意思,就是给这个spout设置一个名字
// 第二个参数的意思,就是创建一个spout的对象
// 第三个参数的意思,就是设置spout的executor有几个
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentenceBolt(), 5)
//为bolt 设置 几个task
.setNumTasks(10)
//设置流分组策略
.shuffleGrouping("RandomSentence");
// fieldsGrouping 这个很重要,就是说,相同的单词,从SplitSentenceSpout发射出来时,一定会进入到下游的指定的同一个task中
// 只有这样子,才能准确的统计出每个单词的数量
// 比如你有个单词,hello,下游task1接收到3个hello,task2接收到2个hello
// 通过fieldsGrouping 可以将 5个hello,全都进入一个task
builder.setBolt("wordCount", new WordCountBolt(), 10)
//为bolt 设置 几个task
.setNumTasks(20)
//设置流分组策略
.fieldsGrouping("SplitSentence", new Fields("word"));
// 运行配置项
Config config = new Config();
//说明是在命令行执行,打算提交到storm集群上去
if(args != null && args.length > 0){
/**
* 要想提高storm的并行度可以从三个方面来改造
* worker(进程)>executor(线程)>task(实例)
* 增加work进程,增加executor线程,增加task实例
* 对应 supervisor.slots.port 中配置个数
* 这里可以动态设置使用个数
* 最好一台机器上的一个topology只使用一个worker,主要原因时减少了worker之间的数据传输
*
* 注意:如果worker使用完的话再提交topology就不会执行,因为没有可用的worker,只能处于等待状态,把之前运行的topology停止一个之后这个就会继续执行了
*/
config.setNumWorkers(3);
try {
// 将Topolog提交集群
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
}else{
// 说明是在eclipse里面本地运行
// 用本地模式运行1个拓扑时,用来限制生成的线程的数量
config.setMaxTaskParallelism(20);
// 将Topolog提交本地集群
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("wordCountTopology", config, builder.createTopology());
// 为了测试模拟等待
Utils.sleep(60000);
// 执行完毕,关闭cluster
cluster.shutdown();
}
}
- 运行main 方法,查看效果
总结:本次demo,讲解了如何从代码层面理解storm 的集群架构、核心概念、并行度、流分组,结合上一篇文章,同时展现了Spout 到 Bolt,Bolt 到 Bolt 的通信
以上就是本章内容,如有不对的地方,请多多指教,谢谢!
为了方便有需要的人,本系列全部软件都在 https://pan.baidu.com/s/1qYsJZfY
下章预告:主要讲解 storm 集群搭建
代码地址附上:https://github.com/bill5/cache-project/tree/master/storm-wordcount
作者:逐暗者 (转载请注明出处)