【准备】
java环境
python环境
zookeeper环境
storm环境
上代码:
一般的storm程序大概分为三个组件(spout,bolt,topology),当然可以有多个spout,或者多个bolt。
spout:
数据源,它是storm要解析数据的入口
bolt:
数据业务处理组件(可以理解为数据在bolt中流动并接受业务处理)
topology:
拓扑。它的编写主要实现数据流向顺序,组装spout和bolt。一个拓扑打包了一个实时处理程序的逻辑
【Spout】(spout的实现方式主要有两种:继承 BaseRichSpout 实现 IRichSpout )
/**
*
* @author mis
*
* 创建spout类PWSpout,数据源
* 实现方式一:继承BaseRichSpout
* 实现方式二:实现IRichSpout
*/
public class PWSpout extends BaseRichSpout {
private static final long serialVersionUID = 1l;
// 数据源是往出写数据的
private SpoutOutputCollector collector;
// 制造数据源
private static final Map<Integer, String> map = new HashMap<Integer, String>();
//模拟数据
static {
map.put(0, "java");
map.put(1, "php");
map.put(2, "groovy");
map.put(3, "python");
map.put(4, "ruby");
}
//初始化方法
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// 对spout进行初始化
this.collector = collector;
}
// 轮询方法,发射数据
public void nextTuple() {
// 随机发送一个单词
final Random r = new Random();
int num = r.nextInt(5);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {}
// 发射数据
this.collector.emit(new Values(map.get(num)));// 解决发射声明,发射内容
}
// 指明
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// 进行声明
declarer.declare(new Fields("print"));// 发往数据的id,解决下一个节点去哪取发射的数据
}
}
【Bolt】(bolt的实现方式主要有两种:继承 BaseBasicBolt 实现 IRichBolt)
【bolt1组件】
/**
*
* @author mis
* 创建的一个Bolt:PrintBolt接收数据并计算或者发射给下一个节点
*
* 实现方式一:继承BaseBasicBolt
* 实现方式二:实现IRichBolt
*/
public class PrintBolt extends BaseBasicBolt {
private static final Log log = LogFactory.getLog(PrintBolt.class);
private static final long serialVersionUID = 1l;
//接收数据进行业务操作操作
public void execute(Tuple input, BasicOutputCollector collector) {
//获取上一个组件所声明的Field,这里的id与上一个spout发送数据的id要对应才能获取spout发送的数据
String print = input.getStringByField("print");
//发送给下一个bolt
collector.emit(new Values(print));
}
//声明数据id
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("write")); //可以理解为这个bolt将数据发向id为write的bolt组件
}
}
【bolt2组件】
/**
*
* @author mis 创建的一个Bolt:WriteBolt接收数据并计算或者发射给下一个节点
*
* 实现方式一:继承BaseBasicBolt
* 实现方式二:IRichBolt
*/
public class WriteBolt extends BaseBasicBolt {
private static final long serialVersionUID = 1l;
private static final Log log = LogFactory.getLog(WriteBolt.class);
private FileWriter writer;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// 获取上一个组件所声明的Filed
String text = input.getStringByField("write");
String sysname = System.getProperty("os.name");
try {
if (writer == null) {
if(sysname.equals("Windows 10") || sysname.equals("Windows 8.1") || sysname.equals("Windows 7")){
writer = new FileWriter("D:\\storm_helloword\\" + this);
}else if (System.getProperty("os.name").equals("Linux")) {
writer = new FileWriter("/usr/local/temp/stormData/" + this);
}
}
log.info("【write】:写入文件");
writer.write(text);
writer.write("\n");
writer.flush();
} catch (Exception e) {}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//这里数据处理结束的话就不用发送至下一个bolt组件
}
}
【Topology】实时处理程序的逻辑
/**
*
* @author mis
* 组件逻辑,可以理解为各个组件执行顺序的规定类
*/
public class PWTopology1 {
public static void main(String[] args) throws Exception {
Config cfg = new Config();
//cfg.setNumWorkers(2);
cfg.setDebug(true);
//组件执行顺序逻辑
TopologyBuilder builder = new TopologyBuilder();
//先创造spout数据源PWSpout
builder.setSpout("spout", new PWSpout());
//在创建bolt组件PrintBolt,并接受分组id为“spout”的数据
builder.setBolt("print-bolt", new PrintBolt()).shuffleGrouping("spout");
//在创建bolt组件WriteBolt,并接受分组id为“print-bolt”的数据
builder.setBolt("write-bolt", new WriteBolt()).shuffleGrouping("print-bolt");
//1 本地模式
LocalCluster cluster = new LocalCluster();
//提交分析
cluster.submitTopology("top1", cfg, builder.createTopology());
//10000毫秒后停止实时计算
Thread.sleep(10000);
cluster.killTopology("top1");
cluster.shutdown();
//2集群模式(任务id以参数的形式传入)
//StormSubmitter.submitTopology(args[0], cfg,builder.createTopology());
}
}
方案一
方案二
方案三
以上三种方案代表不同的组件处理逻辑
【本地运行】
【集群打包运行】
打包上传jar
启动相关程序
./zkServer.sh start
storm nimbus &
storm supervisor &
storm ui &
storm jar /root/Desktop/stormHelloword.jar storm01.topology.PWTopology1 top2
jar包地址 指定执行主类 传入任务id
【storm的jar终端提交命令】
启动:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】
关闭:storm kill 【拓扑名称】
更详细的任务信息