前言
本文是 storm 入门第一篇,因为 Storm 的本地模式体验极其简单,
故而我希望第一篇我们先来体验一下 Storm,而不是其他分布式技术那样,
开门就是架构,简介....
1 Storm初体验之本地运行
1.1 下载 Storm Jar 包
这里我们直接用 Maven 管理,直接在我们项目的 pom.xml 文件下加入:
<!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
</dependency>
1.2 创建 Topology
什么是 Topology ?
Storm 为了方便编程,将 Storm 的程序封装成一个个的 Topology,这个Topology 也就是我们本文的重点 编程模型。(其实质是一个 DAG 有向无环图)-
Topology 是怎么样的?
Topology 包含以下几个结构:- 数据 Tuple:在 Storm 中,所有的数据都是以 Tuple 的形式进行传输的
- 数据发送者 Spout:这个数据发送只是相对 Storm 本身来说的,Spout 一般从指定的外部数据源读取数据封装成 Tuple,进行数据的发送。
- 数据处理组件 Bolt:Spout 的数据会发送到 Bolt,Bolt 就是用来做数据处理的组件,为了提高效率,一般 Bolt 只会处理一些单一的功能,然后会将数据继续往下一个 Bolt发送,形成一个 Bolt 链。
- Stream Grouping:在数据从 Spout 到 Bolt 或者 从 Bolt 到 Bolt 的时候可以指定数据的流向规则,这个规则就是 Stream Grouping。
- Stream 数据流:从 Spout 发出,到 Bolt 处理完形成的数据通道就是一个数据流,一个Spout 可以发送多个数据流。
-
Topology 如何创建?
好了,现在我们正式开始编码吧...- 创建 Spout:
//继承自 BaseRichSpout static class MySpout extends BaseRichSpout { private Map map; private TopologyContext topologyContext; private SpoutOutputCollector spoutOutputCollector; //初始化函数 public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.map = map; this.topologyContext = topologyContext; this.spoutOutputCollector = spoutOutputCollector; } //模拟的外部数据 String[] outData = new String[]{"张三", "李四", "王五"}; String[] outData2 = new String[]{"12", "13", "22"}; /** * 我们可以在这里来模拟从外部获取数据并发送到 bolt * 该函数会在 storm 运行期间被循环调用 */ public void nextTuple() { String name = outData[(int) (Math.random() * 3)]; String age = outData2[(int) (Math.random() * 3)]; //将数据封装到 Tuple 里面 Values v = new Values(name,age); // 将数据发送出去 spoutOutputCollector.emit(v); //休眠一下,便于观察 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 数据声明 * 发送的数据,通过这里的声明 告诉下游,我这个数据是什么 * 相当于 表中的字段名 */ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { //这里我们发送的是 姓名 和 年龄,主要顺序不能乱 outputFieldsDeclarer.declare(new Fields("name","age")); } }
- 创建 Bolt
//继承自 BaseRichBolt static class MyBolt extends BaseRichBolt { private Map stormConf; private TopologyContext context; private OutputCollector collector; //初始化函数 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.stormConf = stormConf; this.context = context; this.collector = collector; } // 处理数据 public void execute(Tuple input) { //获取上游发送的 name 字段 String name = input.getStringByField("name"); //获取上游发送的 age 字段 String age = input.getStringByField("age"); //这里我们简单的打印一下就好 System.out.println(name + "****"+age); // 如果你还要继续往下发送 那么:collector.emit() 就可以 } public void declareOutputFields(OutputFieldsDeclarer declarer) { //和 Spout 一样, 如果你还要继续往下发送数据, // 那么你就要在这里声明的发送的数据是什么 // 我们这里不往下游发送,所以可以不用写 } }
- 创建 Topology
上面我们创建了 Spout 和 Bolt,那么将他们组合起来就是我们的 Topology 了
//在主程序里面进行组装提交 public static void main(String[] args) { // Topology 的构建者 TopologyBuilder builder = new TopologyBuilder(); // 设置 Spout ,并为其命名为 textSpout builder.setSpout("textSpout", new MySpout(), 3); //设置 Bolt,并为其命名为 MyBolt, builder.setBolt("MyBolt", new MyBolt(), 3) // 设置其 Stream Grouping 为 shuffleGrouping,并且是从 textSpout 接受数据 .shuffleGrouping("textSpout"); // 创建Topology StormTopology topology = builder.createTopology() // 创建一个 本地集群 LocalCluster localCluster = new LocalCluster(); Config map = new Config(); map.setNumAckers(1); //将 topology 提交到集群运行 localCluster.submitTopology("test", map, topology ); }
如果没有意外,此时运行本程序,你应该就能正常启动 Storm了,然后你在控制台就可以看到如下:
有没有感觉很简单?和我们平时写的本地代码基本没什么区别...
Storm 计算模型
上面我们已经体验过 Storm 的本地模式了,虽然我们的代码极其简陋,但是最少让我们了解了 Storm 的编程模型到底是怎么样的了!再怎么复杂的东西,我们也可以从上面这个简陋的代码一步步衍生出来,下面我们看一下下面这幅图
从上面我们可以看到:
一个水龙头代表一个 Spout,一个闪电代表一个 Bolt,
Spout 和 Bolt 通过 数据Tuple 的通道建立起了一条条数据流。
该图可以很好的说明 Storm 的工作模式,
通过 Spout 和 Bolt 可以构建起各种数据流以满足我们的业务需求。