Yarn之状态机分析

Yarn之状态机分析

前言

hadoop2.x.x版本的底层实现中作了很多优化:用状态机对各种对象生命周期和状态转移进行管理;采用事件机制避免线程同步与阻塞。主要分析下yarn中状态机的实现机制

一、事件

YARN中的很多组件之间进行通信,主要借助于事件。为了可读性、可维护性及可扩展性,YARN中的事件由事件名称和事件类型组成。比如JobImpl处理的事件名称为JobEvent,而事件类型为JobEventType.

二、状态

YARN中的很多组件之间进行通信,主要借助于事件。为了可读性、可维护性及可扩展性,YARN中的事件由事件名称和事件类型组成。比如JobImpl处理的事件名称为JobEvent,而事件类型为JobEventType,如下所示

public enum JobStateInternal {
  NEW,
  SETUP,
  INITED,
  RUNNING,
  COMMITTING,
  SUCCEEDED,
  FAIL_WAIT,
  FAIL_ABORT,
  FAILED,
  KILL_WAIT,
  KILL_ABORT,
  KILLED,
  ERROR,
  REBOOT
}

我们看到JobImpl的内部状态包括新建(NEW)、初始化(INITED)、运行中(RUNNING)、提交中(COMMITTING)、成功(SUCCEEDED)、失败(FAILED)等

三、转化

我们已经了解了事件与状态的基本实现与概念,那么事件与状态有什么关系?一个对象当前处于状态state0,当对象接收到事件Event后,将引发转换动作transition,最终当前对象的状态过渡到state1.

1)Yarn中与状态转化相关的类图如下

image.png

2)SingleArcTransition

@Public
@Evolving
public interface SingleArcTransition<OPERAND, EVENT> {
  /**
   * Transition hook.
   * 
   * @param operand the entity attached to the FSM, whose internal 
   *                state may change.
   * @param event causal event
   */
  public void transition(OPERAND operand, EVENT event);

}

由于SingleArcTransition的具体实现类只负责接收到事件后的具体操作或行为,并没有包含状态相关的信息,所以在状态机执行状态过渡时,并不是直接调用SingleArcTransition具体实现类的transition方法,而是由接口Transition定义(见代码清单3)真正的转态过渡(包括行为和状态改变)。

  • Transition接口
private interface Transition<OPERAND, STATE extends Enum<STATE>,
          EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
    STATE doTransition(OPERAND operand, STATE oldState,
                       EVENT event, EVENTTYPE eventType);
  }

  • SingleInternalArc接口

SingleInternalArc作为Transition接口的实现类,在代理SingleArcTransition的同时,负责状态变换

private class SingleInternalArc
                    implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

    private STATE postState;
    private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

    SingleInternalArc(STATE postState,
        SingleArcTransition<OPERAND, EVENT> hook) {
      this.postState = postState;
      this.hook = hook;
    }

    @Override
    public STATE doTransition(OPERAND operand, STATE oldState,
                              EVENT event, EVENTTYPE eventType) {
      if (hook != null) {
        hook.transition(operand, event);
      }
      return postState;
    }
  }

3)MultipleArcTransition

@Public
@Evolving
public interface MultipleArcTransition
        <OPERAND, EVENT, STATE extends Enum<STATE>> {

  /**
   * Transition hook.
   * @return the postState. Post state must be one of the 
   *                      valid post states registered in StateMachine.
   * @param operand the entity attached to the FSM, whose internal 
   *                state may change.
   * @param event causal event
   */
  public STATE transition(OPERAND operand, EVENT event);

}

由于MultipleArcTransition的具体实现类只负责接收到事件后的具体操作或行为,并没有包含状态相关的信息,所以在状态机执行状态过渡时,并不是直接调用MultipleArcTransition具体实现类的transition方法,而是通过代理类MultipleInternalArc。MultipleInternalArc也实现了Transition接口,并在代理 MultipleArcTransition 的转换行为的同时,负责状态变换.

  • MultipleInternalArc
private class MultipleInternalArc
              implements Transition<OPERAND, STATE, EVENTTYPE, EVENT>{

    // Fields
    private Set<STATE> validPostStates;
    private MultipleArcTransition<OPERAND, EVENT, STATE> hook;  // transition hook

    MultipleInternalArc(Set<STATE> postStates,
                   MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
      this.validPostStates = postStates;
      this.hook = hook;
    }

    @Override
    public STATE doTransition(OPERAND operand, STATE oldState,
                              EVENT event, EVENTTYPE eventType)
        throws InvalidStateTransitonException {
      STATE postState = hook.transition(operand, event);

      if (!validPostStates.contains(postState)) {
        throw new InvalidStateTransitonException(oldState, eventType);
      }
      return postState;
    }
  }

4)ApplicableTransition

为了将所有状态机中的状态过渡与状态建立起映射关系,YARN中提供了ApplicableTransition接口用于将SingleInternalArc和 MultipleInternalArc 添加到状态机的拓扑表中,提高在检索状态对应的过渡实现时的性能,ApplicableTransition的实现类为ApplicableSingleOrMultipleTransition类,其apply方法用于代理SingleInternalArc和MultipleInternalArc ,将它们添加到状态拓扑表中。

  • ApplicableTransition接口定义
private interface ApplicableTransition
             <OPERAND, STATE extends Enum<STATE>,
              EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
    void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
  }

  • ApplicableSingleOrMultipleTransition
static private class ApplicableSingleOrMultipleTransition
             <OPERAND, STATE extends Enum<STATE>,
              EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
          implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
    final STATE preState;
    final EVENTTYPE eventType;
    final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

    ApplicableSingleOrMultipleTransition
        (STATE preState, EVENTTYPE eventType,
         Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
      this.preState = preState;
      this.eventType = eventType;
      this.transition = transition;
    }

    @Override
    public void apply
             (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
      Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
        = subject.stateMachineTable.get(preState);
      if (transitionMap == null) {
        // I use HashMap here because I would expect most EVENTTYPE's to not
        //  apply out of a particular state, so FSM sizes would be 
        //  quadratic if I use EnumMap's here as I do at the top level.
        transitionMap = new HashMap<EVENTTYPE,
          Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
        subject.stateMachineTable.put(preState, transitionMap);
      }
      transitionMap.put(eventType, transition);
    }
  }

可以看到ApplicableSingleOrMultipleTransition的apply方法就是为构建状态拓扑表

四、状态机

YARN中状态机的实现类是StateMachineFactory,它主要包含4个属性信息:

  • transitionsListNode:就是将状态机的一个个过渡的ApplicableTransition实现串联为一个列表,每个节点包含一个ApplicableTransition实现及指向下一个节点的引用
private class TransitionsListNode {
    final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
    final TransitionsListNode next;

    TransitionsListNode
        (ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
        TransitionsListNode next) {
      this.transition = transition;
      this.next = next;
    }
  }

transitionsListNode形成的过渡列表节点如下:

image.png
  • stateMachineTable:状态拓扑表,为了提高检索状态对应的过渡map而冗余的数据结构,此结构在optimized为真时,通过对transitionsListNode链表进行处理产生
image.png
  • defaultInitialState:对象创建时,内部有限状态机的默认初始状态。比如:JobImpl的内部状态机默认初始状态是JobStateInternal.NEW。

  • optimized:布尔类型,用于标记当前状态机是否需要优化性能,即构建状态拓扑表stateMachineTable。

  • 公共构造器

    StateMachineFactory 的公有构造器只有一个。

    public StateMachineFactory(STATE defaultInitialState) {
      this.transitionsListNode = null;
      this.defaultInitialState = defaultInitialState;
      this.optimized = false;
      this.stateMachineTable = null;
    }
    
    
  • 私有构造器

    StateMachineFactory 的 私有构造器有两个,其中代码清单11中的构造器在addTransition方法中使用。
    从其实现看出,此构造器的主要作用是构建transitionsListNode链表。

    private StateMachineFactory
        (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
         ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) {
      this.defaultInitialState = that.defaultInitialState;
      this.transitionsListNode 
          = new TransitionsListNode(t, that.transitionsListNode);
      this.optimized = false;
      this.stateMachineTable = null;
    }
    
    

    下面的构造器则用于installTopology方法

    private StateMachineFactory
       (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
        boolean optimized) {
     this.defaultInitialState = that.defaultInitialState;
     this.transitionsListNode = that.transitionsListNode;
     this.optimized = optimized;
     if (optimized) {
       makeStateMachineTable();
     } else {
       stateMachineTable = null;
     }
    }
    
    

    构造器当optimized参数为true时,调用了makeStateMachineTable方法,makeStateMachineTable的实现如下:

    private void makeStateMachineTable() {
      Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
        new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();
    
      Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
        prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();
    
      prototype.put(defaultInitialState, null);
    
      // I use EnumMap here because it'll be faster and denser.  I would
      //  expect most of the states to have at least one transition.
      stateMachineTable
         = new EnumMap<STATE, Map<EVENTTYPE,
                             Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);
    
      for (TransitionsListNode cursor = transitionsListNode;
           cursor != null;
           cursor = cursor.next) {
        stack.push(cursor.transition);
      }
    
      while (!stack.isEmpty()) {
        stack.pop().apply(this);
      }
    }
    
    

    1.创建堆栈stack,用于将transitionsListNode链表中各个节点持有的ApplicableSingleOrMultipleTransition压入栈中;

    2.创建状态拓扑表stateMachineTable,并在此拓扑表中插入一个额外的默认初始状态defaultInitialState与null的映射;

    3.迭代访问transitionsListNode链表,并将各个节点持有的ApplicableSingleOrMultipleTransition压入栈中;

    4.依次弹出栈顶的ApplicableSingleOrMultipleTransition,并应用其apply方法(已在前面小节介绍),持续不断的构建状态拓扑表stateMachineTable。

五、状态机的构建

为了简化叙述,本节以JobImpl中状态机的构建为例。由于JobImpl的状态机预设的(调用addTransition方法)加入的ApplicableSingleOrMultipleTransition非常多,我们节选其中的2个作为典型进行分析。最后还会分析installTopology方法的实现。

protected static final
    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> 
       stateMachineFactory
     = new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
              (JobStateInternal.NEW)

          // Transitions from NEW state
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_DIAGNOSTIC_UPDATE,
              DIAGNOSTIC_UPDATE_TRANSITION)
          .addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
              JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
          .addTransition
              (JobStateInternal.NEW,
              EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
              JobEventType.JOB_INIT,
              new InitTransition())
          // 省略其它addTransition调用
          // create the topology tables
          .installTopology();

构建JobImpl的状态机的步骤如下:

1.调用 StateMachineFactory 构造器创建一个初始的状态机

2.调用addTransition(STATE preState, STATE postState, EVENTTYPE eventType, SingleArcTransition<OPERAND, EVENT> hook)方法添加单弧过渡。从其实现(见代码清单15)可以知道addTransition方法将SingleArcTransition封装为SingleInternalArc,然后将SingleInternalArc封装为ApplicableSingleOrMultipleTransition,最后调用之前说的第一个私有构造器构建transitionsListNode链表

3.调用addTransition(STATE preState, Set<STATE> postStates, EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook)方法添加多弧过渡。从其实现(见代码清单16)可以知道addTransition方法将MultipleArcTransition封装为MultipleInternalArc,然后将MultipleInternalArc封装为ApplicableSingleOrMultipleTransition,最后调用之前说的第一个私有构造器构建transitionsListNode链表;

4.最后调用installTopology方法,其实现见代码清单17。installTopology正是在使用之前说的第二个私有构造器构建状态拓扑表stateMachineTable

public StateMachineFactory
           <OPERAND, STATE, EVENTTYPE, EVENT>
        addTransition(STATE preState, STATE postState,
                      EVENTTYPE eventType,
                      SingleArcTransition<OPERAND, EVENT> hook){
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
      (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
         (preState, eventType, new SingleInternalArc(postState, hook)));
}

public StateMachineFactory
           <OPERAND, STATE, EVENTTYPE, EVENT>
        addTransition(STATE preState, Set<STATE> postStates,
                      EVENTTYPE eventType,
                      MultipleArcTransition<OPERAND, EVENT, STATE> hook){
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
      (this,
       new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
         (preState, eventType, new MultipleInternalArc(postStates, hook)));
}

public StateMachineFactory
           <OPERAND, STATE, EVENTTYPE, EVENT>
        installTopology() {
  return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}

Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
    new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

  Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
    prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

  prototype.put(defaultInitialState, null);

  // I use EnumMap here because it'll be faster and denser.  I would
  //  expect most of the states to have at least one transition.
  stateMachineTable
     = new EnumMap<STATE, Map<EVENTTYPE,
                         Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

  for (TransitionsListNode cursor = transitionsListNode;
       cursor != null;
       cursor = cursor.next) {
    stack.push(cursor.transition);
  }

  //最终在这里,状态路由表都存放到一个StateMachineFactory中了
  while (!stack.isEmpty()) {
    stack.pop().apply(this);
  }

六、例子

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容