不透明分区事务


title: storm不透明分区事务
date: 2017-08-27
categories:
- storm
tags:

  • storm
  • java

不透明分区事务

一般情况下storm是在构造函数中对基本数据类型和可序列化的对象进行赋值和实例化,在prepare()方法中对不可序列化的对象进行实例化

接口说明

IOpaquePartitionedTransactionalSpout<T>:不透明分区事务Spout
--IOpaquePartitionedTransactionalSpout.Coordinator
--isReady():为true开启一个新的事务,每提交一个事务停顿一段时间,可以在此进行sleep
--IOpaquePartitionedTransactionalSpout.Emitter<X>
-- emitPartitionBatch(TransactionAttempt tx,BatchOutputCollector collector,int partition,X lastPartitionMeta)
-- numPartitions()

不透明分区事务特点

以下X表示元数据
它不区分发新消息还是重发旧消息,全部用emitPartitionBatch搞定。虽然emitPartitionBatch返回的X应该是下一批次供自己使用的(emitPartitionBatch的第4个参数),但是只有一个批次成功以后X才会更新到ZooKeeper中,如果失败重发,emitPartitionBatch读取的X还是旧的。所以这时候自定义的X不需要记录当前批次的开始位置和下一批次的开始位置两个值,只需要记录下一批次开始位置一个值即可,例如:
public class BatchMeta{
public long nextOffset; //下一批次的偏移量
}

实例

制造一批数据,对数据按照时间进行统计

定义不透明分区事务的spout,在构造函数中制造数据源

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Opaque partitioned transactional spout demo.
 *
 * Builds an in-memory "partitioned message source" in the constructor
 * (basic/serializable state is initialized in the constructor, per the
 * article's note) and replays it in fixed-size batches per partition.
 */
public class MyOpaquePtTxSpout implements IOpaquePartitionedTransactionalSpout<MyMdata> {


    private static final long serialVersionUID = -1889920861418121463L;

    // Simulated partitioned source: partition id -> (offset -> log line).
    // NOTE(review): name kept as-is for compatibility; "Partition_Data_Base" was intended.
    public static Map<Integer,Map<Long,String>> Partion_Data_Base = new HashMap<Integer, Map<Long, String>>();
    public static int PART_COUNT = 5;

    // Messages emitted per partition per batch.
    // NOTE(review): name kept as-is for compatibility; "BATCH_NUM" was intended.
    public static final long BITCHNUM = 10;

    /**
     * Fills each of the PART_COUNT partitions with 30 tab-separated log lines
     * of the form "host \t session_id \t time".
     */
    public MyOpaquePtTxSpout() {

        Random _rand = new Random();
        String[] hosts={"www.haoyidao.com"};
        String[] session_id =  { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
                "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

        String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53",
                "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };

        for (int i=0;i<PART_COUNT;i++){
            Map<Long,String> map = new HashMap<Long, String>();
            for (long j=0;j<30;j++){
                // BUGFIX: the original appended the raw random index instead of
                // the time string, leaving the `time` array unused.
                map.put(j, hosts[0]+"\t"+session_id[_rand.nextInt(5)]+"\t"+time[_rand.nextInt(8)]);
            }
            Partion_Data_Base.put(i,map);
        }

        System.err.println("MtPtTxSpout start.....");

    }

    public Emitter<MyMdata> getEmitter(Map map, TopologyContext topologyContext) {
        return new MyEmitter();
    }

    public Coordinator getCoordinator(Map map, TopologyContext topologyContext) {
        return new MyCoordinator();
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Fields consumed downstream: transaction attempt, date string, log line.
        outputFieldsDeclarer.declare(new Fields("tx","today","log"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /** Coordinator: throttles batch creation by sleeping between transactions. */
    public class MyCoordinator implements IOpaquePartitionedTransactionalSpout.Coordinator{

        public boolean isReady() {
            // Pause between transactions; returning true starts a new one.
            Utils.sleep(1000);
            return true;
        }

        public void close() {

        }
    }

    public class MyEmitter implements IOpaquePartitionedTransactionalSpout.Emitter<MyMdata>{

        /**
         * Emits one batch for partition {@code i}. For an opaque spout the
         * incoming {@code myMdata} is the metadata of the last *successfully
         * committed* batch (or null for the first batch), so on replay the
         * same starting offset is recomputed from it.
         *
         * @return metadata recording this batch's start offset and size; it is
         *         persisted only after the batch commits.
         */
        public MyMdata emitPartitionBatch(TransactionAttempt transactionAttempt, BatchOutputCollector batchOutputCollector, int i, MyMdata myMdata) {

            // Resume right after the last committed batch.
            long beginPoint = 0;
            if (myMdata == null){
                beginPoint = 0;
            }else {
                beginPoint = myMdata.getBeginPoint()+myMdata.getNum();
            }
            MyMdata myMdata1 = new MyMdata();
            myMdata1.setBeginPoint(beginPoint);
            myMdata1.setNum(BITCHNUM);
            System.err.println("启动一个事务:"+myMdata1.toString());

            //进行发射数据 (emit this batch's messages)
            Map<Long,String> batchMap = Partion_Data_Base.get(i);
            for (long j = myMdata1.getBeginPoint(); j < myMdata1.getBeginPoint()+myMdata1.getNum(); j++){
                // BUGFIX: original guard was `batchMap.size() < j`, which let
                // j == size through and emitted a null message; valid offsets
                // are 0 .. size-1.
                if (j >= batchMap.size()){
                    break;
                }
                batchOutputCollector.emit(new Values(transactionAttempt,"2017-08-09",batchMap.get(j)));
            }

            return myMdata1;
        }

        public int numPartitions() {
            // Must match the number of partitions created in the constructor.
            return PART_COUNT;
        }

        public void close() {

        }
    }
}

定义bolt

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.coordination.IBatchBolt;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

/**
 * Per-batch counting bolt: counts the tuples seen for a (hard-coded) day and
 * emits the running total once per batch in {@link #finishBatch()}.
 */
public class Mybolt implements IBatchBolt<TransactionAttempt> {

    // Running totals per day.
    // NOTE(review): static, so the count accumulates for the worker's whole
    // lifetime rather than per batch — confirm this is the intended demo behavior.
    private static  Map<String,Integer> countMap = new HashMap<String, Integer>();
    private BatchOutputCollector batchOutputCollector;
    private TransactionAttempt transactionAttempt;
    DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
    String today;
    Integer count;



    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {
        this.batchOutputCollector = batchOutputCollector;
        this.transactionAttempt = transactionAttempt;

    }

    public void execute(Tuple tuple) {
        // BUGFIX: upstream declares fields ("tx","today","log"); the log line
        // is index 2 — the original read index 1 (the date field) here.
        String log = (String) tuple.getValue(2);
        // The demo hard-codes the day instead of parsing it from `log`.
        today = "2014-01-07";
        count = countMap.get(today);
        if (count == null){
            count = 0;
        }
        count++;
        countMap.put(today,count);

    }

    public void finishBatch() {
        // Emit one summary tuple per batch for the committing bolt.
        System.err.println(this.transactionAttempt.toString()+"---"+today+"----"+count);
        this.batchOutputCollector.emit(new Values(this.transactionAttempt,today,count));

    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        outputFieldsDeclarer.declare(new Fields("tx","today","count"));

    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

定义提交报告的commit

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

/**
 * Committing bolt: folds each batch's per-day count into a simulated database
 * ({@code dbValueMap}), using the stored transaction id to skip batches that
 * were already applied (opaque-transaction update pattern).
 */
public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{

    private static final long serialVersionUID = 1550322821850864035L;

    /** Simulated database: date string -> last committed value. */
    public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
    // Counts received from upstream bolts for the current batch.
    Map<String,Integer> countMap = new HashMap<String, Integer>();
    TransactionAttempt transactionAttempt;
    BatchOutputCollector collector;
    String today = null;


    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {

        this.transactionAttempt = transactionAttempt;
        this.collector = batchOutputCollector;
    }

    public void execute(Tuple tuple) {
        today = tuple.getString(1);
        Integer count = tuple.getInteger(2);
        transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (today == null || count == null) {
            return;
        }
        // Accumulate this batch's partial counts per day.
        Integer sum = countMap.get(today);
        countMap.put(today, (sum == null ? 0 : sum) + count);

    }

    public void finishBatch() {
        if (countMap.isEmpty()) {
            return;
        }
        DbValue stored = dbValueMap.get(today);

        // If the stored value already carries this transaction's id, the batch
        // was applied before the previous attempt failed — do not apply twice.
        boolean alreadyApplied =
                stored != null && stored.txid.equals(transactionAttempt.getTransactionId());

        if (!alreadyApplied) {
            DbValue updated = new DbValue();
            updated.txid = transactionAttempt.getTransactionId();
            updated.dataStr = today;
            int batchTotal = countMap.get(today);
            if (stored == null) {
                updated.count = batchTotal;
                updated.pre_cout = 0;
            } else {
                // Keep the previous total so a replay can roll back to it.
                updated.count = stored.count + batchTotal;
                updated.pre_cout = stored.count;
            }
            dbValueMap.put(today, updated);
        }

        System.out.println("total==========:"+dbValueMap.get(today).count);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    /** Value stored per day: committed total, previous total, and the txid that wrote it. */
    public static class DbValue{
        BigInteger txid;
        int count = 0;
        String dataStr;
        int pre_cout;
    }
}

定义topo结构(注:以下代码与上文的 MyCommitbolt 完全重复,疑为原文粘贴错误,实际的 topology 构建代码缺失)

import org.apache.storm.coordination.BatchOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseTransactionalBolt;
import org.apache.storm.transactional.ICommitter;
import org.apache.storm.transactional.TransactionAttempt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

// NOTE(review): this section is headed "定义topo结构" (topology definition) but the
// code below is a byte-for-byte duplicate of the MyCommitbolt class shown earlier;
// the actual TransactionalTopologyBuilder code appears to be missing from the source.
//
// Committing bolt: folds each batch's per-day count into the simulated database
// (dbValueMap), skipping batches whose transaction id was already applied.
public class MyCommitbolt extends BaseTransactionalBolt implements ICommitter{

    private static final long serialVersionUID = 1550322821850864035L;

    // Simulated database: date string -> last committed value.
    public static Map<String,DbValue> dbValueMap = new HashMap<String, DbValue>();
    // Counts received from upstream bolts for the current batch.
    Map<String,Integer> countMap = new HashMap<String, Integer>();
    TransactionAttempt transactionAttempt;
    BatchOutputCollector collector;
    String today = null;


    public void prepare(Map map, TopologyContext topologyContext, BatchOutputCollector batchOutputCollector, TransactionAttempt transactionAttempt) {

        this.transactionAttempt = transactionAttempt;
        this.collector = batchOutputCollector;
    }

    public void execute(Tuple tuple) {
        today = tuple.getString(1);
        Integer count = tuple.getInteger(2);
        transactionAttempt = (TransactionAttempt) tuple.getValue(0);
        if (today!=null&&count!=null){
            // Accumulate this batch's partial counts per day.
            Integer batchCount = countMap.get(today);
            if (batchCount==null){
                batchCount = 0;
            }
            batchCount = batchCount+count;
            countMap.put(today,batchCount);
        }

    }

    public void finishBatch() {
        if (countMap.size()>0){
            DbValue dbValue = dbValueMap.get(today);

            DbValue newValue;

            // Apply only if no value exists yet or the stored value was written
            // by a different transaction (otherwise this batch was already applied).
            if (dbValue==null||!dbValue.txid.equals(transactionAttempt.getTransactionId())){
                newValue = new DbValue();
                newValue.txid = transactionAttempt.getTransactionId();
                newValue.dataStr = today;
                if (dbValue==null){
                    newValue.count = countMap.get(today);
                    newValue.pre_cout = 0;
                }else {
                    // Keep the previous total so a replay can roll back to it.
                    newValue.count = dbValue.count+countMap.get(today);
                    newValue.pre_cout = dbValue.count;
                }

                dbValueMap.put(today,newValue);
            }

            System.out.println("total==========:"+dbValueMap.get(today).count);

        }


    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    // Value stored per day: committed total, previous total, and the txid that wrote it.
    public static class DbValue{
        BigInteger txid;
        int count = 0;
        String dataStr;
        int pre_cout;

    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容