title: storm不透明分区事务
date: 2017-08-27
categories:
- storm
tags:
- storm
- java
不透明分区事务
一般情况下,storm在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化。这是因为组件对象在提交拓扑时会被序列化,然后分发到worker上执行。
接口说明
IOpaquePartitionedTransactionalSpout<T>:不透明分区事务Spout
--IOpaquePartitionedTransactionalSpout.Coordinator
--isReady():返回true时开启一个新的事务;如果希望每提交一个事务后停顿一段时间,可以在此进行sleep
--IOpaquePartitionedTransactionalSpout.Emitter<X>
-- emitPartitionBatch(TransactionAttempt tx,BatchOutputCollector collector,int partition,X lastPartitionMeta)
-- numPartitions()
不透明分区事务特点
以下X表示元数据
它不区分发新消息还是重发旧消息,全部用emitPartitionBatch搞定。虽然emitPartitionBatch返回的X应该是下一批次供自己使用的(emitPartitionBatch的第4个参数),但是只有一个批次成功以后X才会更新到ZooKeeper中,如果失败重发,emitPartitionBatch读取的X还是旧的。所以这时候自定义的X不需要记录当前批次的开始位置和下一批次的开始位置两个值,只需要记录下一批次开始位置一个值即可,例如:
/**
 * Per-partition metadata for an opaque partitioned transactional spout.
 *
 * <p>Only the start offset of the next batch is stored: if a batch fails and is replayed,
 * the spout is handed the old metadata again, so the start of the current batch never has
 * to be remembered separately.
 */
public class BatchMeta {
    /** Offset at which the next batch starts. */
    public long nextOffset = 0L;
}
实例
制造一批数据,并对数据按照时间进行统计
定义不透明分区事务的spout,在构造函数中制造数据源
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
 * Opaque partitioned transactional spout used by this example.
 *
 * <p>The constructor generates a fake data source, {@link #Partion_Data_Base}, with
 * {@link #PART_COUNT} partitions. Each partition maps a record offset (0, 1, 2, ...) to a
 * tab-separated log line {@code host \t sessionId \t accessTime}. Emitted tuples have the
 * fields {@code ("tx", "today", "log")}.
 *
 * <p>NOTE(review): the data lives in a static field that is filled by the constructor. Static
 * fields are not serialized with the spout instance, so the emitter only sees the data when it
 * runs in the same JVM that called the constructor (presumably local mode only). Confirm
 * before deploying to a cluster.
 */
public class MyOpaquePtTxSpout implements IOpaquePartitionedTransactionalSpout<MyMdata> {
    private static final long serialVersionUID = -1889920861418121463L;

    /** Fake data source: partition index -> (record offset -> log line). */
    public static Map<Integer,Map<Long,String>> Partion_Data_Base = new HashMap<Integer, Map<Long, String>>();

    /** Number of partitions generated by the constructor and reported by the emitter. */
    public static int PART_COUNT = 5;

    /** Number of records generated for each partition. */
    private static final long RECORDS_PER_PARTITION = 30;

    /** Builds the fake data source described in the class comment. */
    public MyOpaquePtTxSpout() {
        Random _rand = new Random();
        String[] hosts = {"www.haoyidao.com"};
        String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
        for (int i = 0; i < PART_COUNT; i++) {
            Map<Long,String> map = new HashMap<Long, String>();
            for (long j = 0; j < RECORDS_PER_PARTITION; j++) {
                // host \t random session id \t random access time
                map.put(j, hosts[0] + "\t" + session_id[_rand.nextInt(session_id.length)]
                        + "\t" + time[_rand.nextInt(time.length)]);
            }
            Partion_Data_Base.put(i, map);
        }
        System.err.println("MtPtTxSpout start.....");
    }

    public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
        return new MyEmitter();
    }

    public Coordinator getCoordinator(Map map, TopologyContext topologyContext) {
        return new MyCoordinator();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("tx","today","log"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Coordinator that always starts a new transaction, pausing one second before each one. */
    public class MyCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator{
        public boolean isReady() {
            Utils.sleep(1000);
            return true;
        }

        public void close() {
        }
    }

    /** Number of records requested per batch. */
    public static final long BITCHNUM = 10;

    /** Emitter that reads consecutive offset ranges out of {@link #Partion_Data_Base}. */
    public class MyEmitter implements IOpaquePartitionedTransactionalSpout.Emitter<MyMdata>{

        /**
         * Emits the next batch for one partition.
         *
         * <p>In the opaque model, a failed batch is replayed with the metadata of the last
         * successful batch. So the new batch always starts right after the range described by
         * {@code myMdata}, and the batch itself is re-read on replay.
         *
         * @param transactionAttempt attempt id stamped into every emitted tuple
         * @param batchOutputCollector collector that receives the tuples
         * @param i partition index, used as the key into {@link #Partion_Data_Base}
         * @param myMdata metadata returned for this partition's previous batch, or {@code null}
         *                for the first batch
         * @return metadata (start offset and requested size) of the batch just emitted
         */
        public MyMdata emitPartitionBatch(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int i, MyMdata myMdata) {
            long beginPoint = (myMdata == null) ? 0 : myMdata.getBeginPoint() + myMdata.getNum();
            MyMdata myMdata1 = new MyMdata();
            myMdata1.setBeginPoint(beginPoint);
            myMdata1.setNum(BITCHNUM);
            System.err.println("启动一个事务:"+myMdata1.toString());

            Map<Long,String> batchMap = Partion_Data_Base.get(i);
            if (batchMap == null) {
                // No data for this partition in this JVM: emit an empty batch instead of throwing an NPE.
                return myMdata1;
            }
            for (long j = myMdata1.getBeginPoint(); j < myMdata1.getBeginPoint() + myMdata1.getNum(); j++) {
                // Offsets run 0..size-1; once j reaches size the partition is exhausted.
                if (j >= batchMap.size()) {
                    break;
                }
                batchOutputCollector.emit(new Values(transactionAttempt, "2017-08-09", batchMap.get(j)));
            }
            return myMdata1;
        }

        /** Reports the same partition count the constructor generated. */
        public int numPartitions() {
            return PART_COUNT;
        }

        public void close() {
        }
    }
}
定义bolt
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.coordination.IBatchBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;
/**
 * Batch bolt that counts the log records of one batch per date and emits
 * {@code ("tx", "today", "count")} for each date when the batch finishes.
 *
 * <p>The count map is instance state: each instance is prepared with the
 * {@link TransactionAttempt} of the batch it handles, so it must count only that batch.
 * The downstream committer adds these per-batch counts to its running total.
 * (A static map would make every batch re-emit the cumulative total, causing double counting.)
 */
public class Mybolt implements IBatchBolt<TransactionAttempt> {
    /** Counts for the current batch, keyed by date. */
    private Map<String,Integer> countMap = new HashMap<String, Integer>();
    private BatchOutputCollector batchOutputCollector;
    private TransactionAttempt transactionAttempt;
    /** Date of the most recently counted record. */
    String today;
    /** Running count for {@link #today} within the current batch. */
    Integer count;

    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
        this.batchOutputCollector = batchOutputCollector;
        this.transactionAttempt = transactionAttempt;
    }

    /**
     * Counts one record. The upstream spout declares the fields {@code ("tx", "today", "log")},
     * so the log line is read by field name rather than by position.
     */
    public void execute(Tuple tuple) {
        String log = tuple.getStringByField("log");
        if (log == null) {
            // No record behind this tuple; do not count it.
            return;
        }
        // NOTE(review): the date is fixed for this example rather than parsed from the log.
        today = "2014-01-07";
        count = countMap.get(today);
        if (count == null){
            count = 0;
        }
        count++;
        countMap.put(today,count);
    }

    /** Emits one {@code (tx, date, count)} tuple per date counted in this batch. */
    public void finishBatch() {
        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
            System.err.println(this.transactionAttempt.toString()+"---"+entry.getKey()+"----"+entry.getValue());
            this.batchOutputCollector.emit(new Values(this.transactionAttempt, entry.getKey(), entry.getValue()));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("tx","today","count"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
定义提交报告的commit
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
/**
 * Committer that adds each batch's per-date counts into a simulated database,
 * {@link #dbValueMap}, using opaque-transaction semantics.
 *
 * <p>Each stored {@link DbValue} keeps the txid that last wrote it, the current count and the
 * count before that write ({@code pre_cout}). When the same txid is committed again (a replay),
 * the batch may contain different tuples than the first attempt. The new count is therefore
 * rebuilt from {@code pre_cout} instead of being skipped or added twice.
 */
public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{
    private static final long serialVersionUID = 1550322821850864035L;
    /** Simulated database: date -> stored value. Static so it outlives individual batches. */
    public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
    /** Counts received in the current batch, keyed by date. */
    Map<String,Integer> countMap = new HashMap<String, Integer>();
    TransactionAttempt transactionAttempt;
    BatchOutputCollector collector;
    /** Date carried by the most recently received tuple. */
    String today = null;

    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
        this.transactionAttempt = transactionAttempt;
        this.collector = batchOutputCollector;
    }

    /** Accumulates {@code (tx, today, count)} tuples; tuples with a null date or count are ignored. */
    public void execute(Tuple tuple) {
        today = tuple.getString(1);
        Integer count = tuple.getInteger(2);
        transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (today!=null&&count!=null){
            Integer batchCount = countMap.get(today);
            if (batchCount==null){
                batchCount = 0;
            }
            batchCount = batchCount+count;
            countMap.put(today,batchCount);
        }
    }

    /** Writes every date counted in this batch to {@link #dbValueMap} and prints the new total. */
    public void finishBatch() {
        if (countMap.isEmpty()) {
            return;
        }
        BigInteger txid = transactionAttempt.getTransactionId();
        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
            String date = entry.getKey();
            int batchCount = entry.getValue();
            DbValue stored = dbValueMap.get(date);

            DbValue newValue = new DbValue();
            newValue.txid = txid;
            newValue.dataStr = date;
            if (stored == null) {
                newValue.pre_cout = 0;
            } else if (txid.equals(stored.txid)) {
                // Replay of the transaction that last wrote this row: redo it on top of the
                // value that existed before that write.
                newValue.pre_cout = stored.pre_cout;
            } else {
                newValue.pre_cout = stored.count;
            }
            newValue.count = newValue.pre_cout + batchCount;
            dbValueMap.put(date, newValue);
            System.out.println("total==========:"+newValue.count);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /** One row of the simulated database. */
    public static class DbValue{
        /** Transaction id that last wrote this row. */
        BigInteger txid;
        /** Count after the last write. */
        int count = 0;
        /** Date this row counts. */
        String dataStr;
        /** Count before the last write; used to redo a replayed transaction. */
        int pre_cout;
    }
}
定义topo结构(注:下面贴出的代码与上面的MyCommitbolt完全相同,疑为误贴,拓扑定义的代码缺失)
import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
/**
 * Committer that adds each batch's per-date counts into a simulated database,
 * {@link #dbValueMap}, using opaque-transaction semantics.
 *
 * <p>Each stored {@link DbValue} keeps the txid that last wrote it, the current count and the
 * count before that write ({@code pre_cout}). When the same txid is committed again (a replay),
 * the batch may contain different tuples than the first attempt. The new count is therefore
 * rebuilt from {@code pre_cout} instead of being skipped or added twice.
 */
public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{
    private static final long serialVersionUID = 1550322821850864035L;
    /** Simulated database: date -> stored value. Static so it outlives individual batches. */
    public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
    /** Counts received in the current batch, keyed by date. */
    Map<String,Integer> countMap = new HashMap<String, Integer>();
    TransactionAttempt transactionAttempt;
    BatchOutputCollector collector;
    /** Date carried by the most recently received tuple. */
    String today = null;

    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
        this.transactionAttempt = transactionAttempt;
        this.collector = batchOutputCollector;
    }

    /** Accumulates {@code (tx, today, count)} tuples; tuples with a null date or count are ignored. */
    public void execute(Tuple tuple) {
        today = tuple.getString(1);
        Integer count = tuple.getInteger(2);
        transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (today!=null&&count!=null){
            Integer batchCount = countMap.get(today);
            if (batchCount==null){
                batchCount = 0;
            }
            batchCount = batchCount+count;
            countMap.put(today,batchCount);
        }
    }

    /** Writes every date counted in this batch to {@link #dbValueMap} and prints the new total. */
    public void finishBatch() {
        if (countMap.isEmpty()) {
            return;
        }
        BigInteger txid = transactionAttempt.getTransactionId();
        for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
            String date = entry.getKey();
            int batchCount = entry.getValue();
            DbValue stored = dbValueMap.get(date);

            DbValue newValue = new DbValue();
            newValue.txid = txid;
            newValue.dataStr = date;
            if (stored == null) {
                newValue.pre_cout = 0;
            } else if (txid.equals(stored.txid)) {
                // Replay of the transaction that last wrote this row: redo it on top of the
                // value that existed before that write.
                newValue.pre_cout = stored.pre_cout;
            } else {
                newValue.pre_cout = stored.count;
            }
            newValue.count = newValue.pre_cout + batchCount;
            dbValueMap.put(date, newValue);
            System.out.println("total==========:"+newValue.count);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    /** One row of the simulated database. */
    public static class DbValue{
        /** Transaction id that last wrote this row. */
        BigInteger txid;
        /** Count after the last write. */
        int count = 0;
        /** Date this row counts. */
        String dataStr;
        /** Count before the last write; used to redo a replayed transaction. */
        int pre_cout;
    }
}