Storm HelloWorld

示例简介


一个简单的Storm的示例,拓扑结构由一个Spout和一个Bolt组成,Spout负责从Redis中读取经纬度点(Coordinate)的数据,并且在这些点中随机挑选一个放入tuple中发射给BoltBolt负责处理当前的点与上一个接收到的点之间的距离,并存储到Redis中的另一张表中。

Maven依赖


Demo使用的Storm版本为0.9.6,同时使用了Jedis来直接与Redis交互。在pom文件中添加如下依赖:

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.6</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>2.8.1</version>
</dependency>

创建Spout文件


Spout创建时的核心方法是 public void nextTuple()方法,Storm在这里将信息发送到collectorStream中。

public void nextTuple() {
  Utils.sleep(100);
  Integer r = rand.nextInt(points.length);
  String coor = points[r];
  collector.emit(new Values(coor));
}

完整的代码如下:

package wech.storm_starter_demo;

import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import redis.clients.jedis.Jedis;

public class PointReader implements IRichSpout {

/**
*
*/
private static final long serialVersionUID = 1L;
private SpoutOutputCollector collector;
private Random rand;
private Jedis jedis;
private String[] points;
private boolean complete = false;
private static String POINTS_KEY = "points";

public boolean isDistributed() {
return false;
}

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
this.rand = new Random();
this.jedis = new Jedis();
List<String>list = jedis.lrange(POINTS_KEY, 0, -1);
this.points = list.toArray(new String[1]);
}

public void ack(Object object) {
System.out.println("OK" + object);
}

public void activate() {
// TODO Auto-generated method stub

}

public void close() {
// TODO Auto-generated method stub

}

public void deactivate() {
// TODO Auto-generated method stub

}

public void fail(Object object) {
System.out.println("Fail" + object);

}

public void nextTuple() {
Utils.sleep(100);
Integer r = rand.nextInt(points.length);
String coor = points[r];
collector.emit(new Values(coor));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("point"));
}

public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

}

创建Bolt


bolt主要负责接收stream中的tuple信息并且进行处理,然后选择传递到下一个bolt或者通知Spout处理完成(失败)。代码如下:

package wech.storm_starter_demo;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;

public class PointHandle implements IRichBolt {
private OutputCollector collector;
private Jedis jedis;
private static String DISTANCE_KEY = "distancekey";
private String lastPoint;
private static Double R = 6378137.0;
public void cleanup() {
// TODO Auto-generated method stub

}

public Double[] splitPoint(String pointStr) {
String[] temp = pointStr.split(",");
Double[] fin = new Double[]{Double.parseDouble(temp[0]) , Double.parseDouble(temp[1])};
return fin;
}

public Double distance(Double lon1, Double lat1, Double lon2, Double lat2) {
Double a;
Double b;
lat1 = lat1 * Math.PI / 180.0;
lat2 = lat2 * Math.PI / 180.0;
a = lat1 - lat2;
b = (lon1 - lon2) * Math.PI / 180.0;
Double distance;
Double sa2,sb2;
sa2 = Math.sin(a / 2.0);
sb2 = Math.sin(b / 2.0);
distance = 2 * R * Math.asin(Math.sqrt(sa2 * sa2 + Math.cos(lat1)
* Math.cos(lat2) * sb2 * sb2));
return distance;

}

public void execute(Tuple input) {
String currentPoint = input.getString(0);
if (lastPoint.isEmpty()) {
lastPoint = currentPoint;
return;
} else {
Double[] lastCoor = this.splitPoint(lastPoint);
Double[] currentCoor = this.splitPoint(currentPoint);
Double distance = this.distance(lastCoor[0], lastCoor[1], currentCoor[0], currentCoor[1]);
String result = lastPoint + " -> " + currentPoint + " = " + distance;
System.out.println(result);
jedis.rpush(DISTANCE_KEY, result);
}

collector.ack(input);
}

public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
this.collector = collector;
jedis = new Jedis();
lastPoint = "";
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));

}

public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}

}

创建Topology


创建一个Topology结构,将SpoutBolt添加进去。代码如下:

package wech.storm_starter_demo;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class PointTopology {

public static void main(String[] args) throws InterruptedException, AlreadyAliveException, InvalidTopologyException {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new PointReader(),5);
builder.setBolt("bolt", new PointHandle(),8).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(false);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);

StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("point-handle", conf, builder.createTopology());

Thread.sleep(10000);

cluster.shutdown();
};
}

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容