title: storm批处理事物原理
date: 2017-08-20
categoties:
- storm
tags: - storm
- java
storm批处理事物原理
对于容错机制,storm通过一个系统级别的组件acker,结合xor校验机制判断一个tuple是否发送成功,进而spout可以重发该tuple,保证一个tuple在出错的情况下至少被重发一次。
但是在需要精确统计tuple的数量如销售额场景时,希望每个tuple被且仅被处理一次,storm 0.7.0引入了transactional topology,它保证每个tuple被且仅被处理一次,这样我们就可以实现一种非常准确,且高度容错方式来实现计数类应用。逐个处理单个tuple,增加很多开销,如写库,输出结果频率过高。
事物处理单个tuple效率比较低,因此storm中引入了batch处理。事物可确保该批次要么全部处理成功,如果有处理失败的则全部不计,storm会对失败的批次重新发送,且确保每个batch有且仅被处理一次。
事物机制原理:
对于只处理一次的需要,从原理上来讲,需要在发送tuple的时候带上事物id:txid,在需要事物处理的时候,根据该txid是否以前已经处理成功来决定是否进行处理,当然需要把txid和处理结果一起做保存。并且需要保障顺序性,在当前请求txid提交之前,所有比自己低txid请求都已经提交。
在事物batch处理中,一批tuple赋予一个txid,为了提高batch之间处理的并行度,storm采用pipeline(管道)处理模型,这样多个事物可以并行执行,但是commit的是按严格顺序的。
storm事物处理中,把一个batch的计算分成两个阶段processing和commit阶段:
processing阶段:多个batch可以并行计算;
commiting阶段:batch之间强制按照顺序进行提交。
事物topo:
processing阶段:多个batch可以并行计算,比如说bolt2是普通的batchbolt(实现了IBatchBolt),那么多个batch在bolt2的task之间可以并行执行。
commiting阶段:batch之间强制按照顺序进行提交,比如bolt3实现IBatchBolt并且标记需要事物处理(实现了ICommitter接口,或者通过TransactionalTopologyBuilder的setCommitterBolt方法把BatchBolt添加到topology里面),那么storm认为可以提交batch的时候调用finishbatch,在finishBatch做txid的比较以及状态保存工作。
使用transactional topologles的时候,storm会为你做下面的事情:
管理状态:storm把所有实现transactional topologies所必须的状态保存在zookeeper里面,包括当前transaction id以及每个batch的一些元数据;
协调事务:storm帮你管理所有事情,如帮你决定在任何一个时间点是该processing还是该committing。
错误检测:storm利用acking框架来高效地检测什么时候一个batch被成功处理了,被成功提交了,或者失败了。Storm然后会相应地replay对应的batch。你不需要自己手动做任何acking或者anchoring(emit的时候发生的动作)。
内置的批处理api:storm在普通bolt之上包装了一层API来提供对tuple的批处理支持。storm管理所有的协调工作,包括决定什么时候一个bolt接收到一个特定transaction的所有tuple,storm同时也会自动清理每个transaction所产生的中间数据。
事物性的spout需要实现ITransactionalSpout,这个接口包含两个内部类接口类Coordinator和Emmiter。在topology运行的时候,事务性的spout内部包含一个子topology。
这里面有两种类型的tuple,一种是事务性的tuple,一种是batch中的tuple;
coordinator开启一个事务准备发射一个batch时候,进入一个事务的processing阶段,会发射一个事务性tuple(transactionAttempt & metadata)到“batch emit”流
emitter以all grouping(广播)的方式订阅coordinator的“batch emit”流,负责为每个batch实际发射tuple,发送的tuple都必须以transactionAttempt作为第一个field,storm根据这个field来判断tuple属于哪一个batch。
coordinator只有一个,emmitter根据并行度可以有多个实例</br>
<font color="#4590a3" size = "3px">transactionAttempt包含两个值:一个transaction id,一个attempt id。
transaction id的作用就是我们上面说的对每个batch中的tuple是唯一的,而不管这个batch replay多少次都是一样的。
attemp id是对于每个batch唯一的一个id,但对于同一个batch,它replay之后的attempt id和replay之前的就不一样了。
我们可以把attempt id理解成replay-times,storm利用这个id来区别一个batch发送的tuple的不同版本。
metadata(元数据)中包含当前事务可以从哪个point进行重放数据,存放到zookeeper中的,spout可以通过Kryo从zookeeper中序列化和反序列化该元数据。
</font>
事务bolt:
BaseTransactionalBolt:
处理batch在一起的tuples,对于每一个tuple调用execute方法,而整个batch处理(processing)完成的时候调用finishBatch方法。如果BatchBolt被标记成committer,则只能在commit阶段调用finishBatch方法。一个batch的commit阶段由storm保证只在前一个batch成功提交之后才会执行。并且它会重试直到topology里面的所有bolt在commit完成提交。那么如何直到batch的processing完成了,也就是bolt是否接受处理了batch里面所有的tuple:在bolt内部,有一个CoordinatedBolt的模型。
CoordinateBolt:
每个CoordinateBolt记录两个值:有哪些task给我发送了tuple(根据topology的grouping信息);我给哪些task发送信息(同样根据grouping信息)。
等所有的tuple都发送完成之后,coordinatorBolt通过另外一个特殊的stream以emitDirect的方式告诉所有它发送过tuple的task,它发送了多少tuple给这个task。下游task会将这个数字和自己已经接收到的tuple数量做对比,如果相等,则说明处理完了所有的tuple。
下游CoordinateBolt会重复上面的步骤,通知其下游。
![](/Users/shuaidong/Documents/Yu Writer Libraries/Default/storm/pic/coordinateBolt.png)
案例
事务一般只适合做汇总型的统计
定义元数据
import java.io.Serializable;
public class MyMdata implements Serializable {
//必须实现序列化接口
private static final long serialVersionUID = -2894782092366251691L;
private long beginPoint;//事务开始位置
private long num;//bitch的tuple个数
public static long getSerialVersionUID() {
return serialVersionUID;
}
public long getBeginPoint() {
return beginPoint;
}
public void setBeginPoint(long beginPoint) {
this.beginPoint = beginPoint;
}
public long getNum() {
return num;
}
public void setNum(long num) {
this.num = num;
}
}
定义事务spout
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.tuple.Fields;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
public class MyTxSpout implements ITransactionalSpout<MyMdata> {
Map<Long,String> dbMap = new HashMap<>();
public MyTxSpout() {
Random _rand = new Random();
String[] hosts={"www.haoyidao.com"};
String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
StringBuffer stringBuffer = new StringBuffer();
for (long i=0;i<100;i++){
dbMap.put(i,hosts[0] + "\t" + session_id[_rand.nextInt(5)]+"\t"+ _rand.nextInt(8));
}
}
//getCoordinator方法,告诉Storm用来协调生成批次的类
@Override
public Coordinator<MyMdata> getCoordinator(Map map, TopologyContext topologyContext) {
return new MyCoordinator();
}
//getEmitter,负责读取批次并把它们分发到拓扑中的数据流组
@Override
public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
return new MyEmitter(dbMap);
}
//定义类型
// 最后,就像之前做过的,需要声明要分发的域。
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("tx","log"));
}
//获得配置文件
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
定义协调者告诉storm如何协调生成批次,即实现Coordinator,代码如下
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.utils.Utils;
import java.math.BigInteger;
public class MyCoordinator implements ITransactionalSpout.Coordinator<MyMdata> {
private static int BATCH_NUM = 10;
//bigInteger为事务txid myMdata上一个元数据
@Override
public MyMdata initializeTransaction(BigInteger bigInteger, MyMdata myMdata) {
long beginPoint;
if (myMdata==null){
beginPoint = 0;
}else {
beginPoint = myMdata.getBeginPoint()+myMdata.getNum();
}
MyMdata myMdata1 = new MyMdata();
myMdata1.setBeginPoint(beginPoint);
myMdata1.setNum(BATCH_NUM);
System.out.println("启动一个事务:"+myMdata1.toString());
return myMdata1;
}
@Override
public boolean isReady() {
//没启动之后让其等待2秒在执行下一个事务
// Utils.sleep(2000);
System.out.println("myready start 执行。。。");
return true;
}
@Override
public void close() {
}
}
实现实际执行批次的类emitter,将事务中元数据提交到topology中
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.transactional.ITransactionalSpout;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Values;
import java.math.BigInteger;
import java.util.Map;
public class MyEmitter implements ITransactionalSpout.Emitter<MyMdata> {
//TransactionAttempt 事务的标示,同一个批次被strom重发了,txid是相同的,但是TransactionAttempt是不同的
private Map<Long,String> dbMap;
public MyEmitter(Map map) {
this.dbMap =map;
}
@Override
public void emitBatch(TransactionAttempt transactionAttempt, MyMdata myMdata, BatchOutputCollector batchOutputCollector) {
long beginPoint = myMdata.getBeginPoint();
long num = myMdata.getNum();
for (long i=beginPoint;i<beginPoint+num;i++){
if (dbMap.get(i)==null){
continue;
}
batchOutputCollector.emit(new Values(transactionAttempt,dbMap.get(i)));
}
}
@Override
public void cleanupBefore(BigInteger bigInteger) {
}
@Override
public void close() {
}
}
定义blot,实现对每一个批次的处理、统计
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Iterator;
import java.util.Map;
public class MytransactionBlot extends BaseTransactionalBolt{
private int count = 0;
BatchOutputCollector batchOutputCollector;
TransactionAttempt transactionAttempt;
//初始化
@Override
public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
this.batchOutputCollector = batchOutputCollector;
this.transactionAttempt = transactionAttempt;
}
//从emit接到每一行处理,同一个批次处理完成后交给finishBatch
@Override
public void execute(Tuple tuple) {
TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
System.out.println("MytransactionBlot TransactionAttempt id:"+tx.getAttemptId()+" txid:"+tx.getTransactionId().toString());
String log = String.valueOf(tuple.getValue(1));
Iterator iterator = tuple.getFields().iterator();
while (iterator.hasNext()){
System.out.println("filds字段值:"+iterator.next());
}
if (log!=null&&log.length()>0){
count++;
}
}
//同一个批次处理完成后做一个处理
@Override
public void finishBatch() {
// System.out.println("prepare method getAttemptId"+transactionAttempt.getAttemptId()+"txid:"+transactionAttempt.getTransactionId().toString());
batchOutputCollector.emit(new Values(transactionAttempt,count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("tx","count"));
}
}
定义一个统一统计的类commit,对上一级多线程进行统计,在构建topo时这个bolt必须为单线程
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
public class MyCommitter extends BaseTransactionalBolt implements ICommitter,Serializable {
private static final long serialVersionUID = 1136043849412072523L;
public static Map<String,DbValue> dbValueMap = new HashMap<>();
public static final String GLOBAL_KEY = "GLOBAL_KEY";
int sum = 0;
TransactionAttempt transactionAttempt;
@Override
public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
this.transactionAttempt = transactionAttempt;
System.out.println("MyTransactionBolt prepare getAttemptId:"+transactionAttempt.getAttemptId()+"getTransactionId:"+transactionAttempt.getTransactionId());
}
//execute会从emit中得到每一行进行处理,同一个批次处理完成交给finishBatch
@Override
public void execute(Tuple tuple) {
TransactionAttempt tx = (TransactionAttempt) tuple.getValue(0);
String log = tuple.getString(1);
System.out.println("execut bolt TransactionAttempt id:"+tx.getAttemptId()+"TransactionId:"+tx.getTransactionId());
if (log!=null&&log.length()>0){
sum++;
}
// sum += tuple.getInteger(1);
}
@Override
public void finishBatch() {
DbValue value = dbValueMap.get(GLOBAL_KEY);
if (value == null|| !value.txid.equals(transactionAttempt.getTransactionId())){
//更新数据库
DbValue newValue = new DbValue();
newValue.txid = this.transactionAttempt.getTransactionId();
if (value ==null){
newValue.count = sum;
}else {
newValue.count = value.count+sum;
}
dbValueMap.put(GLOBAL_KEY,newValue);
}else {
}
System.out.println("total===============:"+dbValueMap.get(GLOBAL_KEY).count);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
public static class DbValue{
BigInteger txid;
int count = 0;
}
}
定义main方法,进行提交到storm中
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.transactional.TransactionalTopologyBuilder;
import java.util.HashMap;
import java.util.Map;
public class MyTopo {
public static void main(String[] args) {
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutId",new MyTxSpout(),1);
builder.setBolt("bolt1",new MytransactionBlot(),3).shuffleGrouping("spoutId");
builder.setBolt("committer",new MyCommitter(),1).shuffleGrouping("bolt1");
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS,4);
// System.setProperty("storm.jar","/Users/shuaidong/Downloads/StromLearning/transaction/target/Demo03.jar");
if (args!=null&&args.length>0){
try {
StormSubmitter.submitTopology(args[0],conf,builder.buildTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
}else {
try {
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("myTxToPo",conf,builder.buildTopology());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}