本文将学习如何使用java创建Storm拓扑
Storm集群的组件
Storm集群类似于Hadoop集群,只不过 Hadoop 上运行"MapReduce jobs", Storm 上运行"topologies"。
两者最大的差别是,MapReducejobs 最终是完成的,而 topologies 是一直处理消息(或直到你杀死它)。
集群 | 任务名称 | 任务时效性 |
---|---|---|
Storm | topologies(拓扑) | 一直处理消息(或直到你杀死它) |
Hadoop | MapReduce jobs | 最终是完成的 |
Storm集群上有两种节点:master 和 worker 节点
- master:
运行一个名为 Nimbus 的守护进程,
负责在集群周围分发代码,
为机器分配任务以及监控故障。
(类似 Hadoop 的 JobTracker) - worker:
运行一个名为 Supervisor 的守护进程,
负责监听、并根据需要启动、停止 "Nimbus" 分配给其的任务。
每个工作进程都执行拓扑的子集。 运行拓扑由分布在许多计算机上的许多工作进程组成。
Nimbus 和 Supervisors 之间的协调是通过 Zookeeper 实现的。
此外,Nimbus 守护程序和 Supervisors 守护程序是 fail-fast 和 stateless;
所有状态都保存在Zookeeper或本地磁盘上。这意味着你可以通过 kill -9
杀死 Nimbus 或者 Supervisors ,但是它们会像没事一样重新开始。
这种设计使Storm集群非常稳定。
Topologies
要想在 Storm 上进行实时计算,你需要创建一个 topologies 。
topologies 是一个计算图,topologies中的每个节点包含计算逻辑,并且通过节点之间的连接定义了数据在节点之间的流动方向。
运行拓扑很简单。首先,将所有代码和依赖项打包到一个jar中。然后,运行如下命令:
storm jar all-my-code.jar org.apache.storm.MyTopology arg1 arg2
额 这个命令没啥好解释的....
Streams
Stream 是一个无限的元组序列,是 Storm 抽象的核心。
Storm 提供了以分布式和可靠的方式进行 Stream 传换的原语。
比如将微博 Stream 转换为转换热门主题 Stream。
Storm 为进行 Stream 转换提供 spouts 和 bolt 两个基本原语。
- spouts
spouts 是 Stream 的来源。
例如,spout可以读取Kestrel队列中的元组并将其作为 Stream 发出。或者 spout 可以连接到Twitter API并发出推文流。 - bolt
bolt会消耗任意数量的输入流,进行一些处理,并可能发出新的流。
像从推文流计算趋势主题流之类的复杂流转换,需要多个步骤,因此需要多个 bolt 。
bolt 可以执行任何操作,包括运行函数,过滤元组,进行流聚合,进行流连接,与数据库对话等等。
spout 和 bolt 网络被打包成一个 topology ,这是提交给 Storm 集群执行的顶级抽象。
拓扑是流转换的图形,其中每个节点都是一个 spout 或 bolt 。
图中的表示哪些 bolt 订阅了哪些流。
当一个 spout 或 bolt 向一个流发出一个元组时,它会将元组发送给订阅该流的每个 bolt 。
拓扑中节点之间的链接指示应如何传递元组。
如上图,Spout A 和Bolt B 之间有链接,Spout A 到 Bolt C 之间有链接,以及从 Bolt B 到 Bolt C 之间有链接。
那么每次 Spout A 发出一个元组时,它都会将元组发送给 Bolt B 和 Bolt C .所有 Bolt B 的输出元组也将送给 Bolt C.
Storm拓扑中的每个节点并行执行。
在拓扑中,你可以为每个节点指定所需的并行度,Storm将在集群中生成该数量的线程以执行。
拓扑会一直执行(或直到你杀死它)。
Storm会自动重新分配失败的任务。
此外,Storm保证不会丢失数据,即使计算机出现故障并且消息丢失也是如此。
Data model
Storm使用元组作为其数据模型。
元组是一个命名的值列表,元组中的字段可以是任何类型的对象。
Storm支持所有原始类型,字符串和字节数组作为元组字段值。
要使用其他类型的对象,需要为该类型实现一个序列化程序。
拓扑中的每个节点都必须声明它发出的元组的输出字段。
例如下面代码中的bolt 声明它发出2元组,字段为 "double" 和 "triple"
package com.aaa.test;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author lillcol
* 2019/7/18-11:46
*/
public class DoubleAndTripleBolt extends BaseRichBolt {
private OutputCollector _collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
@Override
public void execute(Tuple input) {
int val = input.getInteger(0);
_collector.emit(input, new Values(val*2, val*3));
_collector.ack(input);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("double", "triple"));//声明["double", "triple"]组件的输出字段
}
}
一个简单的拓扑(A simple topology)
如何实现一个简单的拓扑?
本地 idea测试
sbt构建
// libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"
定义一个Spout,此处采用随机数
package com.test.storm;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
/**
* @author lillcol
* 2019/7/18-12:03
*/
public class TestWordSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void nextTuple() {
//Spouts负责向拓扑中发送新消息
Utils.sleep(100);
//每隔100ms就会从列表中随机选一个单词发出
final String[] words = new String[]{"hellow", "lillcol", "study", "storm"};
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
collector.emit(new Values(word));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
定义Bolt,功能接收到的信息追加"levelUp!"
package com.test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
/**
* @author lillcol
* 2019/7/18-12:04
*/
public class ExclamationBolt extends BaseRichBolt {
OutputCollector collector;
//prepare方法为 Bolt 提供了一个OutputCollector用于从 Bolt 中发出元组 。
//元组可以随时的从prepare,execute,cleanup,甚至在另一个线程中异步发出。
//当前prepare实现只是将OutputCollector作为实例变量保存,以便稍后在execute方法中使用。
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector =collector;
}
//execute方法从一个Bolt的输入接收一个元组。
//此execute取数组的第一个字段并发出追加字符串“levelUp!” 得字符串。
//如果您实现了一个订阅多个输入源的bolt,您可以通过使用Tuple#getSourceComponent方法找出Tuple来自哪个组件。
@Override
public void execute(Tuple input) {
String sourceComponent = input.getSourceComponent();
//输入元组作为第一个参数传递emit
collector.emit(input, new Values(input.getString(0) + "levelUp!"));
System.out.println(input.getString(0));
// 输入元组在最后一行被激活。这些是Storm的可靠性API的一部分,用于保证不会丢失数据
collector.ack(input);
}
//declareOutputFields方法声明ExclamationBolt发出1元组,其中一个字段称为“word”。
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
//如果implements IRichBol
//还需要重写下面两个方法
//当Bolt被关闭时调用cleanup方法,并且应该清除所有打开的资源。
//无法保证在集群上调用此方法:例如,如果任务正在运行的计算机爆炸,则无法调用该方法。
@Override
public void cleanup() {
}
//getComponentConfiguration方法允许配置此组件运行方式的各个方面
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
//但是一般情况下我们不需要这两个方法,所以我们可以通过继承BaseRichBolt来定义Bolt
定义调用类
package com.test.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.shade.org.apache.jute.Utils;
import org.apache.storm.topology.TopologyBuilder;
/**
* @author lillcol
* 2019/7/18-12:03
*/
public class SimpleTopology {
public static void main(String[] args) throws Exception {
SimpleTopology topology = new SimpleTopology();
topology.runLocal(60);
}
public void runLocal(int waitSeconds) throws Exception {
TopologyBuilder topologyBuilder = new TopologyBuilder();
//第一个参数是给Spout一个id "words",
//第二个参数是要调用的Spout类
//第三个参数是节点所需的并行度,是可选的。它指示应在群集中执行该组件的线程数。如果省略它,Storm将只为该节点分配一个线程。
topologyBuilder.setSpout("words", new TestWordSpout(), 1);
//Bolt的参数与Spout
//只是要通过shuffleGrouping 指定数据来源"words")
//“shuffle grouping”意味着元组应该从输入任务随机分配到bolt的任务中。
topologyBuilder.setBolt("DoubleAndTripleBolt1", new ExclamationBolt(), 1)
.shuffleGrouping("words");
//一个Bolt可以接收多个数据来源,是要多次调用shuffleGrouping即可
topologyBuilder.setBolt("DoubleAndTripleBolt2", new ExclamationBolt(), 1)
.shuffleGrouping("DoubleAndTripleBolt1")
.shuffleGrouping("words");
//loacl 测试
Config config = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word_count", config, topologyBuilder.createTopology());
org.apache.storm.utils.Utils.sleep(1000*10);
cluster.killTopology("word_count");
cluster.shutdown();
}
}
运行结果:
study
study
studylevelUp!
study
study
studylevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
hellow
hellow
hellowlevelUp!
hellow
hellow
hellowlevelUp!
lillcol
lillcol
lillcollevelUp!
. . .
异常
可能出现异常1:
java.lang.NoClassDefFoundError: org/apache/storm/topology/IRichSpout
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.storm.topology.IRichSpout
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
Error: A JNI error has occurred, please check your installation and try again
这个是因为在sbt构建的时候 % "provided" 意思是已提供相关jar,但是我们idea测试的时候并没有相关jar
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0" % "provided"
所以不能用上面的语句,改成下面的即可
libraryDependencies += "org.apache.storm" % "storm-core" % "2.0.0"
maven 对应着改就可以了
可能出现异常2:
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.log4j.LogManager.getLogger(LogManager.java:44)
at org.slf4j.impl.Log4jLoggerFactory.getLogger(Log4jLoggerFactory.java:64)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:358)
at org.slf4j.LoggerFactory.getLogger(LoggerFactory.java:383)
at org.apache.storm.LocalCluster.<clinit>(LocalCluster.java:128)
at com.test.storm.SimpleTopology.runLocal(SimpleTopology.java:28)
at com.test.storm.SimpleTopology.main(SimpleTopology.java:16)
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49)
... 7 more
错误报的很明显
log4j-over-slf4j.jar AND slf4j-log4j12.jar 冲突了
我的解决办法是在测试的时候随便删掉一个,但是生产的时候在可能冲突的依赖中把它去掉
Storm 的 的hellow word(word count)
//定义Spout WordReader
package com.test.storm;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;
/**
* @author lillcol
* 2019/7/19-9:17
*/
public class WordReader extends BaseRichSpout {
private SpoutOutputCollector collector;
private FileReader fileReader;
private boolean completed = false;
/**
* open方法,接收三个参数:
* 第一个是创建Topology的配置,
* 第二个是所有的Topology数据
* 第三个是用来把Spout的数据发射给bolt
**/
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
try {
//获取创建Topology时指定的要读取的文件路径
this.fileReader = new FileReader(conf.get("wordsFile").toString());
} catch (FileNotFoundException e) {
throw new RuntimeException("Error reading file ["
+ conf.get("wordFile") + "]");
}
//初始化发射器
this.collector = collector;
}
/**
* nextTuple是Spout最主要的方法:
* 在这里我们读取文本文件,并把它的每一行发射出去(给bolt)
* 这个方法会不断被调用,为了降低它对CPU的消耗,当任务完成时让它sleep一下
**/
@Override
public void nextTuple() {
//如果要看到tail效果,去掉这个 if (completed) 语句块,我测试的时候不去掉看不到效果只有报错
if (completed) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return;
}
String str;
BufferedReader bufferedReader = new BufferedReader(fileReader);
try {
while ((str = bufferedReader.readLine()) != null) {
//发送一行
collector.emit(new Values(str), str);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
completed = true;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
//定义Bolt WordSplit 实现切割
package com.test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* @author lillcol
* 2019/7/19-9:38
*/
public class WordSplit implements IRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* execute是bolt中最重要的方法:
* 当接收到一个tuple时,此方法被调用
* 这个方法的作用就是把接收到的每一行切分成单个单词,并把单词发送出去(给下一个bolt处理)
**/
@Override
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split(",| |\\|");
for (String word : words) {
word = word.trim();
if (!word.isEmpty()) {
List a = new ArrayList();
a.add(input);
collector.emit(a, new Values(word));
}
}
collector.ack(input);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
//定义Bolt WordCounter 实现统计
package com.test.storm;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
* @author lillcol
* 2019/7/19-10:01
*/
public class WordCounter implements IRichBolt {
Integer id;
String name;
Map<String, Integer> counters;
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.counters = new HashMap<String, Integer>();
this.collector = collector;
this.name = context.getThisComponentId();
this.id = context.getThisTaskId();
}
@Override
public void execute(Tuple input) {
String str = input.getString(0);
if (!counters.containsKey(str)) {
counters.put(str, 1);
} else {
counters.put(str, counters.get(str) + 1);
}
//如果要看到tail效果,应该在这里打印统计信息
// System.out.println(str+":"+counters.get(str) );
collector.ack(input);
}
//这里只是最后一次打印,要tail效果不应该在这里打印统计信息。
@Override
public void cleanup() {
System.out.println("--Word Counter [" + name + "-" + id + "] --");
for (Map.Entry<String, Integer> entry : counters.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
}
counters.clear();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
//定义主类
package com.test.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
/**
* @author lillcol
* 2019/7/19-10:33
*/
public class WordCountTopology {
public static void main(String[] args) throws Exception {
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("wordReader",new WordReader(),1);
topologyBuilder.setBolt("WordSplit",new WordSplit(),1)
.shuffleGrouping("wordReader");
topologyBuilder.setBolt("",new WordCounter(),2)
.shuffleGrouping("WordSplit");
//配置
Config config = new Config();
config.put("wordsFile","D:\\stromFile");
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//创建一个本地模式cluster
LocalCluster localCluster = new LocalCluster();
//提交Topology
localCluster.submitTopology("WordCountTopology",config,topologyBuilder.createTopology());
Thread.sleep(2000);//这个时间要控制好,太短看不到效果
localCluster.shutdown();
}
}
//输出结果
11:35:16.845 [SLOT_1024] INFO o.a.s.e.ExecutorShutdown - Shutting down executor :[2, 2]
11:35:16.845 [Thread-37--executor[2, 2]] INFO o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-2] --
Thread[SLOT_1027:73
40673ms:1
11:34:31.865:1
30724ms:1
11:34:23.065:1
11:34:27.365:1
. . .
11:35:16.846 [SLOT_1024] INFO o.a.s.e.ExecutorShutdown - Shut down executor :[2, 2]
11:35:16.846 [SLOT_1024] INFO o.a.s.e.ExecutorShutdown - Shutting down executor :[1, 1]
11:35:16.846 [Thread-38--executor[1, 1]] INFO o.a.s.u.Utils - Async loop interrupted!
--Word Counter [-1] --
Thread[SLOT_1027:87
11:34:31.465:1
29524ms:1
26024ms:1
. . .