Storm的消息保证机制

Storm提供了三种不同层次的消息保证机制,分别是At Most Once,At Least Once,Exactly Once.消息保证机制依赖于消息是否被完全处理
消息完全处理
每个从Spout(Storm中数据源点)发出的Tuple(Storm中最小的消息单元)可能会生成成千上万个新的Tuple,形成一颗Tuple树,当整颗Tuple树的节点都被成功处理了,我们就说从Spout发出的Tuple被完全处理了。我们可以通过下面的例子来更好的诠释消息是否被完全处理这个概念:

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("sentences",new KafkaSpout(spoutConfig),spoutNum);
            builder.setBolt("split",new SplitSentence(),10).shuffleGrouping("sentences");
            builder.setBolt("count",new WordCount(),20).fieldsGrouping("split",new Fields("word"));

这个Topology从Kafka(一个开源的分布式消息队列)读取信息发往下游,下游的Bolt将收到句子分割成单独的单词,并进行计数。每一个从Spout发送出来的Tuple会衍生出多个新的Tuple,从Spout发送出来的Tuple以及后续衍生出来的Tuple形成一颗Tuple树,下图是一颗Tuple树示例:

Paste_Image.png

上图中所有的Tuple都被成功处理了,我们才认为Spout发出的Tuple被完全处理。如果在一个固定的时间内(这个时间可以配置,默认为30秒),有至少一个Tuple处理失败或者超时,则认为整颗Tuple树处理失败,即从Spout发出的Tuple处理失败

如何实现不同层次的消息保证机制
Storm提供的三种不同消息保证机制中,利用Spout,Bolt以及Acker的组合可以实现At Most Once以及At Least Once语义,Storm在At Least Once的基础上进行了一次封装(Trident),从而实现Exactly Once语义
Storm的消息保证机制中,如果需要实现At Most Once语义,只需要满足下面任何一条即可:

        1.关闭ACK机制,即Acker数目设置为0
        2.Spout不实现可靠性传输
                Spout发送消息是使用不带message ID的API
                不实现fail函数
        3.Bolt不把处理成功或失败的消息发送给Acker

如果需要实现At Least Once 语义,则需要同时保证如下几条:

         1.开启ACK机制,即Acker数目大于0
         2.Spout实现可靠性传输保证
                  Spout发送消息附带message 的ID
                  如果收到Acker的处理失败反馈,需要进行消息重传,即实现fail函数
         3.Bolt在处理成功或失败后需要调用相应的方法通知Acker

实现Exactly Once语义,则需要在At Least Once的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在Storm中使用Trident API实现

下图中,每种消息保证机制中左边的字母表示上游发送的消息,右边的字母表示下游收到的消息。从图中直到,At Most Once中,消息可能会丢失(上游发送了两个A,下游只收到一个A);At Least Once中,消息不会丢失,可能会重复(上游只发送了一个B,下游收到两个B);Exactly Once中,消息不丢失,不重复,因此需要在At Least Once的基础上保存相应的状态,表示上游的哪些消息已经发送到下游 ,防止同一条消息发送多次给下游的情况

Paste_Image.png

Tuple的完全处理需要Spout、Bolt以及Acker(Storm中用来记录某颗Tuple树是否被完全处理的节点)协调完成,如图所示。从Spout发送Tuple到下游,并把相应信息通知给Acker,整颗Tuple树中某个Tuple被成功处理了都会通知Acker,待整颗Tuple树都被完全处理完成之后,Acker将成功处理信息返回给Spout;如果某个Tuple处理失败,或者超时,Acker将会给Spout发送一个处理失败的消息,Spout根据Acker的返回信息以及用户对消息保证机制的选择判断是否需要进行消息重传

Paste_Image.png

不同消息可靠性保证的使用场景
对于Storm提供的三种消息可靠性保证,优缺点以及使用场景如下所示:

  可靠性保证层次        优点        缺点        使用场景
   At Most Once      处理速度快  数据可能丢失  对处理速度要求高,且对数据丢失容忍度高的场景
   At Least Once    数据不会丢失  数据可能重复   不能容忍数据丢失,可以容忍数据重复的场景
   Exactly  Once    数据不会丢失不会重复  处理速度慢  对数据不丢不重性质要求高非常高,且处理速度要求没那么高,比如支付金额

不同层次的可靠性如何实现
如何实现可靠的Spout

    实现可靠的Spout需要在nextTuple函数中发送消息时,调用带msgID的emit方法,然后实现失败消息重传(fail函数),参考如下所示:
      //想实现可靠的Spout,需要实现如下两点
      //1.在nextTuple函数中调用emit函数时需要带一个msgId,用来表示当前的消息(如果消息发送失败会用msgID作为参数回调fail函数)
     //2.自己实现fail函数,进行重发(注意,在storm中没有msgID和消息的对应关系,需要自己维护)

  public void nextTuple(){
            collector.emit(new Values(curNum+" ",round+" "),curNum+":"+round);
    }
  @Override
    public void fail(Object msgId){
              String tmp = (String)msgId;
              String[] args = tmp.split(":");
              collector.emit(new Values(args[0],args[1]),msgId);
      }

如何实现可靠的Bolt
Storm提供了两种不同类型的Bolt,分别是BaseRichBolt和BaseBasicBolt,都可以实现可靠性消息传递,不过BaseRichBolt需要自己做很多周边的事情(建立anchor树,以及手动ACL/FAIL,通知Acker),使用场景更广泛,而BaseBasicBolt则由Storm帮忙实现很多周边的事情,实现起来方便简单,但是使用场景单一。如果使用这两个Bolt实现(不)可靠的消息传递如下所示:

      //BaseRichBolt实现不可靠消息传递
      public class SplitSentence extends BaseRichBolt{
                  OutputCollector _collector;
                  
                  public void prepare(Map conf,TopologyContext context,OutputCollector collector){
                  _collector  = collector;
                  }
                  
                  public void execute(Tuple tuple){
                              String sentence = tuple.getString(0);
                              for(String word:sentence.split(" ")){
                                              _collector.emit(new Values(word));
                              }
                              _collector.ack(tuple);
                      }

                  public void declareOutputFields(OutputFieldsDeclarer declarer){
                           declarer.declare(new Fields("word"));
                  }
        }

      //BaseRichBolt实现可靠的Bolt
      public class SplitSentence extends BaseRichBolt{
                           OutputCollector _collector;
                            
                          public void prepare(Map conf,TopologyContext context,OutputCollector collector){
                                    _collector = collector;
                          }
                          
                          public void execute(Tuple tuple){
                                      String sentence  =tuple.getString(0);
                                      for(String word:sentence.split(" ")){
                                                _collector.emit(tuple,new Values(word));
                                      }
                                    _collector.ack(tuple);
                      }

                      public void declareOutputFields(OutputFieldsDeclarer declarer){
                                  declarer.declare(new Fields("word"));
                      }
          }

    下面的示例会可以建立Multi-anchoring
          List<Tuple> anchors = new ArrayList<Tuple>();
          anchors.add(tuple1);
          anchors.add(tuple2);
          _collector.emit(anchors,new Values(1,2,3));
            
          public class SplitSentence extends BaseBasicBolt{
                        public void execute(Tuple tuple,BasicOutputCollector collector){ 
                             String sentence = tuple.getString(0);
                              for(String word:sentence.split(" ")){
                                          collector.emit(new Values(word));
                              }
                          }

                          public void declareOutputFields(OutputFieldsDeclarer declarer){
                                declarer.declare(new Fields("word"));
                      }
                }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354

推荐阅读更多精彩内容