在Storm(五)第一个集群Topology中用如下的命令行向集群提交了Topology:
storm jar cluster-topology-1.0-SNAPSHOT.jar com.quiterr.ExclamationTopology
但是在项目中需要动态自动化提交Topology,也就是通过代码来提交Topology。
这里仍然以ExclamationTopology为例来说明,不过要对ExclamationTopology做一些改造,要把ExclamationTopology中的Bolt单独提出来做一个项目(或模块),创建Topology相关的代码做成另一个项目。
一、Topology项目(后续以项目名dynamic-submit-classes来叙述)
这个项目用来放Topology自身相关的东西,包括Spout、Bolt等,实际项目中可以很复杂,但是在这个例子中很简单,因为Spout是Storm核心库提供的,而Bolt只有一个。
(一)pom文件
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
极简单,没什么好说的。
(二)ExclamationBolt
这个项目只有一个类ExclamationBolt,从ExclamationTopology里边copy出来的。
public class ExclamationBolt extends BaseRichBolt {
OutputCollector _collector;
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
_collector.ack(tuple);
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
(三)安装到本地
使用mvn install
命令安装到本地,因为下一个项目要用到本项目。
(四)部署到Storm服务器
把生成的jar包放到Storm服务器上,这里放的路径是/home/app/test
本项目源代码:https://github.com/quiterr/storm-test/tree/master/dynamic-submit-classes
二、创建Topology项目(后续以项目名dynamic-submit来叙述)
用Spring Boot做一个RESTful API,专门用来接收创建Topology的请求,创建完毕后向集群提交Topology。
(一)pom文件
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>1.5.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--配置处理器,会在编辑配置文件的时候智能提示-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!--自己实现的topology所在的库-->
<dependency>
<groupId>com.quiterr</groupId>
<artifactId>dynamic-submit-classes</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>1.5.3.RELEASE</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
除了Spring Boot相关的依赖,就是上一个项目的依赖。
(二)SubmitController
SubmitController接收一个topologyName参数,然后创建topology,最后提交到集群。代码中有一行指明了topology所在的位置:System.setProperty("storm.jar", "/home/app/test/dynamic-submit-classes-1.0-SNAPSHOT.jar");
@RestController
public class SubmitController {
@RequestMapping(value = "/topology/create", method = RequestMethod.POST)
private boolean create(@RequestBody String topologyName){
Config conf = new Config();
conf.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 10);
builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
System.setProperty("storm.jar", "/home/app/test/dynamic-submit-classes-1.0-SNAPSHOT.jar");
try {
StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
return true;
}
}
本项目源代码:https://github.com/quiterr/storm-test/tree/master/dynamic-submit
三、测试和说明
把dynamic-submit-1.0-SNAPSHOT.jar和dynamic-submit-classes-1.0-SNAPSHOT.jar都放到Storm服务器上后,用java -jar dynamic-submit-1.0-SNAPSHOT.jar
启动第一个项目,然后用一个RESTful的测试客户端发请求即可。
有个问题:topology被提交到哪里了?
在Storm基础(三)配置这篇文章里提到过,如果没有定义storm.yml文件,就会使用Storm核心库中自带的default.xml文件,默认情况下topology就会提交到本地,这就是一定要把程序放到Storm所在的服务器上的原因。
四、用代码远程提交
能不能不把topology传到服务器上,就在本地提交呢?
理论上是可以的。
1.把dynamic-submit-classes-1.0-SNAPSHOT.jar放到dynamic-submit工程目录下。
2.StormSubmitter类中设置环境变量:System.setProperty("storm.jar", "dynamic-submit-classes-1.0-SNAPSHOT.jar");
3.dynamic-submit加一个storm.yml配置文件,内容像这样:
########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
- "172.19.3.147"
- "172.19.3.148"
- "172.19.3.149"
nimbus.seeds: ["172.19.3.147"]
storm.local.dir: "/data/storm_local"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
- 6704
- 6705
- 6706
- 6707