示例简介
这是一个简单的 Storm 示例，拓扑结构由一个 Spout 和一个 Bolt 组成。Spout 负责从 Redis 中读取经纬度点(Coordinate)数据，并从这些点中随机挑选一个放入 tuple 中发射给 Bolt。Bolt 负责计算当前点与上一个接收到的点之间的距离，并将结果存储到 Redis 的另一个列表(list)中。
Maven依赖
Demo 使用的 Storm 版本为 0.9.6，同时使用 Jedis 直接与 Redis 交互。在 pom 文件中添加如下依赖：
<!-- Storm core API. "provided" scope: compiled against but not packaged into the
     topology jar, because the Storm cluster supplies it at runtime.
     NOTE(review): running PointTopology in local mode (LocalCluster) from an IDE or
     a Maven exec plugin may require this jar on the runtime classpath; confirm. -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.6</version>
<scope>provided</scope>
</dependency>
<!-- Jedis: Redis client used by the spout (PointReader) and the bolt (PointHandle). -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
创建Spout文件
Spout 的核心方法是 public void nextTuple()。Storm 会循环调用该方法，Spout 在其中通过 collector 将 tuple 发射(emit)到 Stream 中。
/**
 * Emits one randomly chosen point from the loaded snapshot, at most every ~100 ms.
 * Emits nothing when no points were loaded, because rand.nextInt(0) would throw.
 */
public void nextTuple() {
    // Throttle: without this pause the spout would emit as fast as Storm polls it.
    Utils.sleep(100);
    if (points == null || points.length == 0) {
        return;
    }
    int index = rand.nextInt(points.length);
    String coor = points[index];
    collector.emit(new Values(coor));
}
完整的代码如下:
package wech.storm_starter_demo;
import java.util.List;
import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import redis.clients.jedis.Jedis;
/**
 * Spout that snapshots the Redis list {@code "points"} when it is opened and then emits
 * one randomly chosen entry from that snapshot on each {@link #nextTuple()} call.
 *
 * <p>Each tuple has a single field, {@code "point"}, holding the raw string read from
 * Redis. Tuples are emitted without a message id. Storm therefore does not track them
 * and will not invoke {@link #ack(Object)} or {@link #fail(Object)} for them.
 */
public class PointReader implements IRichSpout {

    private static final long serialVersionUID = 1L;

    /** Redis list holding the candidate coordinate strings. */
    private static final String POINTS_KEY = "points";

    private SpoutOutputCollector collector;
    private Random rand;
    private Jedis jedis;
    /** Points loaded in {@link #open}; an empty array (never {null}) if the list was empty. */
    private String[] points;

    /**
     * Always returns {@code false}.
     * NOTE(review): nothing visible here calls this; it looks like a leftover from an
     * older Storm spout API. Confirm before removing.
     */
    public boolean isDistributed() {
        return false;
    }

    /**
     * Stores the collector, opens a Redis connection using Jedis' no-argument
     * constructor, and loads the whole {@code "points"} list into memory once.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.rand = new Random();
        this.jedis = new Jedis();
        List<String> list = jedis.lrange(POINTS_KEY, 0, -1);
        // new String[0] lets toArray size the result exactly. The former new String[1]
        // produced {null} for an empty list, which caused a null point to be emitted.
        this.points = (list == null) ? new String[0] : list.toArray(new String[0]);
    }

    /** Logs the acknowledged message id (only reached for tuples emitted with an id). */
    public void ack(Object object) {
        System.out.println("OK" + object);
    }

    /** No-op. */
    public void activate() {
    }

    /** Releases the Redis connection opened in {@link #open}. */
    public void close() {
        if (jedis != null) {
            jedis.close();
            jedis = null;
        }
    }

    /** No-op. */
    public void deactivate() {
    }

    /** Logs the failed message id (only reached for tuples emitted with an id). */
    public void fail(Object object) {
        System.out.println("Fail" + object);
    }

    /**
     * Emits one randomly chosen point, at most every ~100 ms. Emits nothing when no
     * points were loaded, because rand.nextInt(0) would throw.
     */
    public void nextTuple() {
        // Throttle: without this pause the spout would emit as fast as Storm polls it.
        Utils.sleep(100);
        if (points == null || points.length == 0) {
            return;
        }
        int index = rand.nextInt(points.length);
        String coor = points[index];
        collector.emit(new Values(coor));
    }

    /** Declares the single output field {@code "point"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("point"));
    }

    /** Returns {@code null}: this spout has no component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
创建Bolt
Bolt 主要负责接收 Stream 中的 tuple 并进行处理，然后选择将结果传递给下一个 Bolt，或者通知 Spout 处理完成(或失败)。代码如下：
package wech.storm_starter_demo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
/**
 * Bolt that computes the great-circle (haversine) distance between each incoming point
 * and the previous point this bolt instance received. It prints the result and appends
 * it to the Redis list {@code "distancekey"}.
 *
 * <p>Points are expected as {@code "lon,lat"} strings in degrees. Each bolt instance
 * keeps its own "previous point", so with several parallel instances the pairs are
 * formed per instance.
 */
public class PointHandle implements IRichBolt {

    private static final long serialVersionUID = 1L;

    /** Redis list that receives one formatted result line per computed distance. */
    private static final String DISTANCE_KEY = "distancekey";

    /** Sphere radius: 6378137 m, the WGS-84 equatorial radius; distances are in metres. */
    private static final double R = 6378137.0;

    private OutputCollector collector;
    private Jedis jedis;
    /** Last successfully parsed point, or "" before the first one arrives. */
    private String lastPoint;

    /** Releases the Redis connection opened in {@link #prepare}. */
    public void cleanup() {
        if (jedis != null) {
            jedis.close();
            jedis = null;
        }
    }

    /**
     * Parses a {@code "lon,lat"} string.
     *
     * @param pointStr comma-separated coordinate pair; extra fields are ignored
     * @return {@code {lon, lat}}
     * @throws IllegalArgumentException if the input is null, has fewer than two fields,
     *     or a field is not a number ({@link NumberFormatException} is a subclass)
     */
    public Double[] splitPoint(String pointStr) {
        if (pointStr == null) {
            throw new IllegalArgumentException("Expected \"lon,lat\" but got: null");
        }
        String[] parts = pointStr.split(",");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected \"lon,lat\" but got: " + pointStr);
        }
        return new Double[]{Double.parseDouble(parts[0]), Double.parseDouble(parts[1])};
    }

    /**
     * Great-circle distance between two points using the haversine formula.
     *
     * @param lon1 longitude of the first point, degrees
     * @param lat1 latitude of the first point, degrees
     * @param lon2 longitude of the second point, degrees
     * @param lat2 latitude of the second point, degrees
     * @return distance in the units of {@link #R} (metres)
     */
    public Double distance(Double lon1, Double lat1, Double lon2, Double lat2) {
        double phi1 = lat1 * Math.PI / 180.0;
        double phi2 = lat2 * Math.PI / 180.0;
        double dPhi = phi1 - phi2;
        double dLambda = (lon1 - lon2) * Math.PI / 180.0;
        double sinHalfDPhi = Math.sin(dPhi / 2.0);
        double sinHalfDLambda = Math.sin(dLambda / 2.0);
        double h = sinHalfDPhi * sinHalfDPhi
                + Math.cos(phi1) * Math.cos(phi2) * sinHalfDLambda * sinHalfDLambda;
        // Rounding can push sqrt(h) slightly above 1 for near-antipodal points;
        // clamp so that asin does not return NaN.
        return 2 * R * Math.asin(Math.min(1.0, Math.sqrt(h)));
    }

    /**
     * Parses the incoming point. When a previous point exists, it records the distance
     * between the two points. The current point then becomes the previous point, and
     * the tuple is acked. Malformed points are failed and do not replace the previous
     * point.
     */
    public void execute(Tuple input) {
        String currentPoint = input.getString(0);
        Double[] currentCoor;
        try {
            currentCoor = splitPoint(currentPoint);
        } catch (IllegalArgumentException e) {
            System.out.println("Malformed point, failing tuple: " + currentPoint);
            collector.fail(input);
            return;
        }

        if (!lastPoint.isEmpty()) {
            // lastPoint is only ever assigned a point that parsed successfully.
            Double[] lastCoor = splitPoint(lastPoint);
            Double distance = distance(lastCoor[0], lastCoor[1], currentCoor[0], currentCoor[1]);
            String result = lastPoint + " -> " + currentPoint + " = " + distance;
            System.out.println(result);
            jedis.rpush(DISTANCE_KEY, result);
        }

        // Advance so that the next distance is measured from this point,
        // not from the very first point received.
        lastPoint = currentPoint;
        collector.ack(input);
    }

    /** Stores the collector, opens a Redis connection and resets the previous point. */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.jedis = new Jedis();
        this.lastPoint = "";
    }

    /** Declares the field {@code "word"}; note that this bolt does not emit any tuples. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /** Returns {@code null}: this bolt has no component-specific configuration. */
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
创建Topology
创建一个 Topology 结构，并将 Spout 和 Bolt 添加进去。代码如下：
package wech.storm_starter_demo;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
/**
 * Wires {@link PointReader} to {@link PointHandle} and runs the topology.
 *
 * <p>If a command-line argument is given, it is used as the topology name and the
 * topology is submitted to the cluster. Otherwise the topology runs in an in-process
 * {@code LocalCluster} for 10 seconds and is then shut down.
 */
public class PointTopology {

    public static void main(String[] args)
            throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
        TopologyBuilder builder = new TopologyBuilder();
        // Parallelism hints: 5 spout instances and 8 bolt instances.
        builder.setSpout("spout", new PointReader(), 5);
        // Shuffle grouping distributes points randomly across the bolt instances.
        builder.setBolt("bolt", new PointHandle(), 8).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            try {
                cluster.submitTopology("point-handle", conf, builder.createTopology());
                Thread.sleep(10000);
            } finally {
                // Always stop the local cluster, even if submission or the wait fails.
                cluster.shutdown();
            }
        }
    }
}