package com.csylh;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
/**
* Description:使用Storm实现累加求和操作
*
* @author: 留歌36
* Date:2018/9/3 16:50
*/
public class LocalSumStormTopology {
/**
* Spout需要继承BaseRichSpout
* 数据源需要产生数据并发射到Bolt
*/
public static class DataSourceSpout extends BaseRichSpout{
//定义一个发射器
private SpoutOutputCollector collector;
/**
* 初始化方法 只是会被调用一次
* @param conf 配置参数
* @param context 上下文:相当于一个框 可以从里面获取许多东西
* @param collector 数据发射器
*/
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//将传入的collector发射器 对私有变量 进行赋初值
this.collector = collector;
}
int number = 0;
/**
* 用于产生数据
* 生产中肯定是从消息队列中获取数据
* 这个方法是一个死循环
*/
@Override
public void nextTuple() {
//发送方式,调用上面定义的数据发射器
this.collector.emit(new Values(number++));
System.out.println("Spout==》发送的数据:" + number);
//每隔1s中发射一次,防止数据产生太快
Utils.sleep(1000);
}
/**
* 声明输出字段
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("num"));
}
}
/**
* Bolt需要继承BaseRichBolt
* 用于接收数据并对数据进行处理
*/
public static class SumBolt extends BaseRichBolt{
/**
* 初始化方法 ,会被执行一次
* @param stormConf
* @param context
* @param collector 这里的数据发射器,由于业务逻辑中没有没有必要进行放下发的操作,所以就是没有必要进行new一个
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
int sum = 0;
/**
* 也是一个死循环 ,职责: 获取Spout发射过来的数据
* @param input
*/
@Override
public void execute(Tuple input) {
//Bolt中获取值可以通过index获取
// 也可以根据上一个环节中定义的filed的名称获取(***推荐)
Integer value = input.getIntegerByField("num");
sum += value;
System.out.println("Bolt :Sum = ["+ sum + "]");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("DataSourceSpout",new DataSourceSpout());
builder.setBolt("SumBolt",new SumBolt()).shuffleGrouping("DataSourceSpout");
//创建一个本地的Storm集群 ,本地模式运行,不需要搭建Storm集群
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("LocalSumStormTopology",new Config(),builder.createTopology());
}
}
使用Storm实现累加求和操作
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 对列表里某个类中的某个属性进行求和操作非常常见。在Java 8诞生前,常用for循环手动累加,往往要写上三四行代码...
- 数据文件如下: 求和cat data|awk '{sum+=$1} END {print "Sum = ", su...
- 一个简单的实现tableView来求金额总和的demo, 图例如下: Github上代码:[https://git...