YARN State Machine: Principles and Usage

2023-11-02

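Why is a state machine still needed when events are already processed asynchronously?

  • It records the object's current state.
  • When an event arrives, the handler can easily decide what to do based on that state.
    For example, when the user sends a KILL event:
    If the Job is still initializing, killing the launched process is enough.
    If the Job is running and was submitted to YARN, the launched process must be killed and the application running on YARN must also be killed with yarn application -kill.
    If the Job has already finished, nothing needs to be done and the state must not be changed to killed.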
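
The KILL example above can be sketched as a decision keyed on the current state. This is only an illustration and not YARN code: `JobState`, `KillDemo`, and the action strings are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical job states for illustration; these are not the YARN enums.
enum JobState { INITIALIZING, RUNNING_ON_YARN, FINISHED, KILLED }

class KillDemo {
  // Returns the actions a KILL event triggers in the given state.
  static List<String> actionsOnKill(JobState state) {
    List<String> actions = new ArrayList<>();
    switch (state) {
      case INITIALIZING:
        actions.add("kill local process");
        break;
      case RUNNING_ON_YARN:
        actions.add("kill local process");
        actions.add("yarn application -kill <appId>");
        break;
      default:
        // FINISHED or KILLED: nothing left to kill.
        break;
    }
    return actions;
  }

  // Returns the state after the KILL event has been handled.
  static JobState stateAfterKill(JobState state) {
    // A finished job keeps its final state; everything else ends up KILLED.
    return (state == JobState.FINISHED) ? JobState.FINISHED : JobState.KILLED;
  }

  public static void main(String[] args) {
    for (JobState s : JobState.values()) {
      System.out.println(s + " -> " + actionsOnKill(s)
          + ", next=" + stateAfterKill(s));
    }
  }
}
```

Without the stored state, the handler would have no way to tell these three cases apart when the same KILL event arrives.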

States

Every component in YARN has its own set of states. Take the ResourceManager's RMAppImpl as an example:

public enum RMAppState {
  NEW,
  NEW_SAVING,
  SUBMITTED,
  ACCEPTED,
  RUNNING,
  FINAL_SAVING,
  FINISHING,
  FINISHED,
  FAILED,
  KILLING,
  KILLED
}

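An object cannot be in more than one state at a time.
An object can move from one state to another, and a transition may also leave it in the state it was already in.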

RMAppEventType

public enum RMAppEventType {
  // Source: ClientRMService
  START,
  RECOVER,
  KILL,
  MOVE, // Move app to a new queue

  // Source: Scheduler and RMAppManager
  APP_REJECTED,

  // Source: Scheduler
  APP_ACCEPTED,

  // Source: RMAppAttempt
  ATTEMPT_REGISTERED,
  ATTEMPT_UNREGISTERED,
  ATTEMPT_FINISHED, // Will send the final state
  ATTEMPT_FAILED,
  ATTEMPT_KILLED,
  NODE_UPDATE,

  // Source: Container and ResourceTracker
  APP_RUNNING_ON_NODE,

  // Source: RMStateStore
  APP_NEW_SAVED,
  APP_UPDATE_SAVED,
}

RMAppEvent

public class RMAppEvent extends AbstractEvent<RMAppEventType>{

  private final ApplicationId appId;
  private final String diagnosticMsg;

  public RMAppEvent(ApplicationId appId, RMAppEventType type) {
    this(appId, type, "");
  }

  public RMAppEvent(ApplicationId appId, RMAppEventType type,
      String diagnostic) {
    super(type);
    this.appId = appId;
    this.diagnosticMsg = diagnostic;
  }

  public ApplicationId getApplicationId() {
    return this.appId;
  }

  public String getDiagnosticMsg() {
    return this.diagnosticMsg;
  }
}

StateMachineFactory Example

private static final StateMachineFactory<RMAppImpl,
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent> stateMachineFactory
                               = new StateMachineFactory<RMAppImpl, 
                                           RMAppState,
                                           RMAppEventType,
                                           RMAppEvent>(RMAppState.NEW)


     // Transitions from NEW state
    .addTransition(RMAppState.NEW, RMAppState.NEW,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())
    .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())
    .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
        new AppKilledTransition())
    .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(new AppRejectedTransition(),
          RMAppState.FAILED))

    // Transitions from NEW_SAVING state
    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.KILL,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.NEW_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
          new FinalSavingTransition(new AppRejectedTransition(),
            RMAppState.FAILED))
    .addTransition(RMAppState.NEW_SAVING, RMAppState.NEW_SAVING,
        RMAppEventType.MOVE, new RMAppMoveTransition())

     // Transitions from SUBMITTED state
    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.SUBMITTED,
        RMAppEventType.MOVE, new RMAppMoveTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
        RMAppEventType.APP_REJECTED,
        new FinalSavingTransition(
          new AppRejectedTransition(), RMAppState.FAILED))
    .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
        RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
    .addTransition(RMAppState.SUBMITTED, RMAppState.FINAL_SAVING,
        RMAppEventType.KILL,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))

     // Transitions from ACCEPTED state
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
        RMAppEventType.MOVE, new RMAppMoveTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.RUNNING,
        RMAppEventType.ATTEMPT_REGISTERED)
    .addTransition(RMAppState.ACCEPTED,
        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
        // ACCEPTED state is possible to receive ATTEMPT_FAILED/ATTEMPT_FINISHED
        // event because RMAppRecoveredTransition is returning ACCEPTED state
        // directly and waiting for the previous AM to exit.
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.ACCEPTED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_FINISHED,
        new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
        RMAppEventType.KILL, new KillAttemptTransition())
    .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_KILLED,
        new FinalSavingTransition(new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())

     // Transitions from RUNNING state
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
        RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
        RMAppEventType.MOVE, new RMAppMoveTransition())
    .addTransition(RMAppState.RUNNING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_UNREGISTERED,
        new FinalSavingTransition(
          new AttemptUnregisteredTransition(),
          RMAppState.FINISHING, RMAppState.FINISHED))
    .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
      // UnManagedAM directly jumps to finished
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.RUNNING, RMAppState.RUNNING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.RUNNING,
        EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.ACCEPTED))
    .addTransition(RMAppState.RUNNING, RMAppState.KILLING,
        RMAppEventType.KILL, new KillAttemptTransition())

     // Transitions from FINAL_SAVING state
    .addTransition(RMAppState.FINAL_SAVING,
      EnumSet.of(RMAppState.FINISHING, RMAppState.FAILED,
        RMAppState.KILLED, RMAppState.FINISHED), RMAppEventType.APP_UPDATE_SAVED,
        new FinalStateSavedTransition())
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_FINISHED,
        new AttemptFinishedAtFinalSavingTransition())
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    // ignorable transitions
    .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
          RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))

     // Transitions from FINISHING state
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    // ignorable transitions
    .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
      EnumSet.of(RMAppEventType.NODE_UPDATE,
        // ignore Kill/Move as we have already saved the final Finished state
        // in state store.
        RMAppEventType.KILL, RMAppEventType.MOVE))

     // Transitions from KILLING state
    .addTransition(RMAppState.KILLING, RMAppState.KILLING, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_KILLED,
        new FinalSavingTransition(
          new AppKilledTransition(), RMAppState.KILLED))
    .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
        RMAppEventType.ATTEMPT_UNREGISTERED,
        new FinalSavingTransition(
          new AttemptUnregisteredTransition(),
          RMAppState.FINISHING, RMAppState.FINISHED))
    .addTransition(RMAppState.KILLING, RMAppState.FINISHED,
      // UnManagedAM directly jumps to finished
        RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
    .addTransition(RMAppState.KILLING,
        EnumSet.of(RMAppState.FINAL_SAVING),
        RMAppEventType.ATTEMPT_FAILED,
        new AttemptFailedTransition(RMAppState.KILLING))

    .addTransition(RMAppState.KILLING, RMAppState.KILLING,
        EnumSet.of(
            RMAppEventType.NODE_UPDATE,
            RMAppEventType.ATTEMPT_REGISTERED,
            RMAppEventType.APP_UPDATE_SAVED,
            RMAppEventType.KILL, RMAppEventType.MOVE))

     // Transitions from FINISHED state
     // ignorable transitions
    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
        EnumSet.of(
            RMAppEventType.NODE_UPDATE,
            RMAppEventType.ATTEMPT_UNREGISTERED,
            RMAppEventType.ATTEMPT_FINISHED,
            RMAppEventType.KILL, RMAppEventType.MOVE))

     // Transitions from FAILED state
     // ignorable transitions
    .addTransition(RMAppState.FAILED, RMAppState.FAILED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(RMAppState.FAILED, RMAppState.FAILED,
        EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE,
            RMAppEventType.MOVE))

     // Transitions from KILLED state
     // ignorable transitions
    .addTransition(RMAppState.KILLED, RMAppState.KILLED, 
        RMAppEventType.APP_RUNNING_ON_NODE,
        new AppRunningOnNodeTransition())
    .addTransition(
        RMAppState.KILLED,
        RMAppState.KILLED,
        EnumSet.of(RMAppEventType.APP_ACCEPTED,
            RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
            RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
            RMAppEventType.NODE_UPDATE, RMAppEventType.MOVE))

     .installTopology();

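Transitions

We have now seen the basic concepts and implementation of events and states. How do the two relate? A state is a static property of an object, while an event is how the object communicates with the outside world. An object that only has state and never changes is inert; it interacts with its surroundings only once it reacts to incoming messages, and reacting changes it internally. Concretely: an object is currently in state state0; when it receives an event Event, a transition action is triggered, and the object ends up in state state1.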

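Single-arc transitions

A concrete SingleArcTransition implementation only defines the action to perform when an event arrives; it carries no state information. For that reason the state machine does not call the transition method of the SingleArcTransition implementation directly when it performs a state change. The real state transition (both the action and the state change) is defined by the Transition interface, which SingleInternalArc implements further down.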

    .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
        RMAppEventType.START, new RMAppNewlySavingTransition())

  private static final class RMAppNewlySavingTransition extends RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {

      // If recovery is enabled then store the application information in a
      // non-blocking call so make sure that RM has stored the information
      // needed to restart the AM after RM restart without further client
      // communication
      LOG.info("Storing application with id " + app.applicationId);
      app.rmContext.getStateStore().storeNewApplication(app);
    }
  }

  private static class RMAppTransition implements
      SingleArcTransition<RMAppImpl, RMAppEvent> {
    public void transition(RMAppImpl app, RMAppEvent event) {
    };

  }

public interface SingleArcTransition<OPERAND, EVENT> {
  /**
   * Transition hook.
   * 
   * @param operand the entity attached to the FSM, whose internal 
   *                state may change.
   * @param event causal event
   */
  public void transition(OPERAND operand, EVENT event);

}

StateMachineFactory.addTransition

addTransition first creates a SingleInternalArc object and then wraps it in an ApplicableSingleOrMultipleTransition, which is passed to a new StateMachineFactory.

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          addTransition(STATE preState, STATE postState,
                        EVENTTYPE eventType,
                        SingleArcTransition<OPERAND, EVENT> hook){
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
        (this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
           (preState, eventType, new SingleInternalArc(postState, hook)));
  }

SingleInternalArc

private class SingleInternalArc
                    implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {

    private STATE postState;
    private SingleArcTransition<OPERAND, EVENT> hook; // transition hook

    SingleInternalArc(STATE postState,
        SingleArcTransition<OPERAND, EVENT> hook) {
      this.postState = postState;
      this.hook = hook;
    }

    @Override
    public STATE doTransition(OPERAND operand, STATE oldState,
                              EVENT event, EVENTTYPE eventType) {
      if (hook != null) {
        hook.transition(operand, event);
      }
      return postState;
    }
  }
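
To make the split between "action" and "resulting state" easier to see, here is a stripped-down sketch of the same idea. It is not the Hadoop implementation: `Hook`, `Arc`, `FixedArc`, and `SingleArcDemo` are hypothetical names, and the operand is just a `StringBuilder` used as a log.

```java
// Hypothetical stand-in for SingleArcTransition: behaviour only, no state.
interface Hook<O, E> {
  void transition(O operand, E event);
}

// Hypothetical stand-in for Transition: behaviour plus the resulting state.
interface Arc<O, S, E> {
  S doTransition(O operand, S oldState, E event);
}

// Pairs a hook with a fixed post-state, in the spirit of SingleInternalArc.
final class FixedArc<O, S, E> implements Arc<O, S, E> {
  private final S postState;
  private final Hook<O, E> hook; // may be null for a pure state change

  FixedArc(S postState, Hook<O, E> hook) {
    this.postState = postState;
    this.hook = hook;
  }

  @Override
  public S doTransition(O operand, S oldState, E event) {
    if (hook != null) {
      hook.transition(operand, event); // run the side effect first
    }
    return postState; // the post-state never depends on what the hook did
  }
}

class SingleArcDemo {
  enum State { NEW, NEW_SAVING }

  // Fires one arc from NEW on a "START" event and returns the new state.
  static State run(StringBuilder log) {
    Arc<StringBuilder, State, String> arc =
        new FixedArc<StringBuilder, State, String>(State.NEW_SAVING,
            (op, ev) -> op.append("stored on ").append(ev));
    return arc.doTransition(log, State.NEW, "START");
  }

  public static void main(String[] args) {
    StringBuilder log = new StringBuilder();
    System.out.println(run(log) + " / " + log);
  }
}
```

Because the hook returns nothing, a single-arc transition can only ever end in the one post-state it was registered with.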

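Multi-arc transitions

A multi-arc transition in YARN defines the action performed when the finite state machine (FSM) moves to one of several valid post-states that were registered with the state machine.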

addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
            RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
            RMAppState.KILLED, RMAppState.FINAL_SAVING),
        RMAppEventType.RECOVER, new RMAppRecoveredTransition())

The RMAppRecoveredTransition.transition method returns the state the object is in after the transition.

private static final class RMAppRecoveredTransition implements
      MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {

    @Override
    public RMAppState transition(RMAppImpl app, RMAppEvent event) {

      RMAppRecoverEvent recoverEvent = (RMAppRecoverEvent) event;
      app.recover(recoverEvent.getRMState());
      // The app has completed.
      if (app.recoveredFinalState != null) {
        app.recoverAppAttempts();
        new FinalTransition(app.recoveredFinalState).transition(app, event);
        return app.recoveredFinalState;
      }

      if (UserGroupInformation.isSecurityEnabled()) {
        // asynchronously renew delegation token on recovery.
        try {
          app.rmContext.getDelegationTokenRenewer()
              .addApplicationAsyncDuringRecovery(app.getApplicationId(),
                  app.parseCredentials(),
                  app.submissionContext.getCancelTokensWhenComplete(),
                  app.getUser());
        } catch (Exception e) {
          String msg = "Failed to fetch user credentials from application:"
              + e.getMessage();
          app.diagnostics.append(msg);
          LOG.error(msg, e);
        }
      }

      // No existent attempts means the attempt associated with this app was not
      // started or started but not yet saved.
      if (app.attempts.isEmpty()) {
        app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
          app.submissionContext.getQueue(), app.user,
          app.submissionContext.getReservationID()));
        return RMAppState.SUBMITTED;
      }

      // Add application to scheduler synchronously to guarantee scheduler
      // knows applications before AM or NM re-registers.
      app.scheduler.handle(new AppAddedSchedulerEvent(app.applicationId,
        app.submissionContext.getQueue(), app.user, true,
          app.submissionContext.getReservationID()));

      // recover attempts
      app.recoverAppAttempts();

      // Last attempt is in final state, return ACCEPTED waiting for last
      // RMAppAttempt to send finished or failed event back.
      if (app.currentAttempt != null
          && (app.currentAttempt.getState() == RMAppAttemptState.KILLED
              || app.currentAttempt.getState() == RMAppAttemptState.FINISHED
              || (app.currentAttempt.getState() == RMAppAttemptState.FAILED
                  && app.getNumFailedAppAttempts() == app.maxAppAttempts))) {
        return RMAppState.ACCEPTED;
      }

      // YARN-1507 is saving the application state after the application is
      // accepted. So after YARN-1507, an app is saved meaning it is accepted.
      // Thus we return ACCECPTED state on recovery.
      return RMAppState.ACCEPTED;
    }
  }

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          addTransition(STATE preState, Set<STATE> postStates,
                        EVENTTYPE eventType,
                        MultipleArcTransition<OPERAND, EVENT, STATE> hook){
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
        (this,
         new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
           (preState, eventType, new MultipleInternalArc(postStates, hook)));
  }

  private class MultipleInternalArc
              implements Transition<OPERAND, STATE, EVENTTYPE, EVENT>{

    // Fields
    private Set<STATE> validPostStates;
    private MultipleArcTransition<OPERAND, EVENT, STATE> hook;  // transition hook

    MultipleInternalArc(Set<STATE> postStates,
                   MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
      this.validPostStates = postStates;
      this.hook = hook;
    }

    @Override
    public STATE doTransition(OPERAND operand, STATE oldState,
                              EVENT event, EVENTTYPE eventType)
        throws InvalidStateTransitonException {
      STATE postState = hook.transition(operand, event);

      if (!validPostStates.contains(postState)) {
        throw new InvalidStateTransitonException(oldState, eventType);
      }
      return postState;
    }
  }
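
The multi-arc pattern above boils down to "let the hook choose the post-state, then check that choice against the registered set". A minimal sketch under that reading follows. `ChoosingHook` and `MultiArcDemo` are hypothetical names, the recovery rule is a toy, and `IllegalStateException` stands in for Hadoop's `InvalidStateTransitonException`.

```java
import java.util.EnumSet;
import java.util.Set;

class MultiArcDemo {
  enum State { NEW, SUBMITTED, ACCEPTED, RUNNING }

  // Hypothetical stand-in for MultipleArcTransition: the hook picks the state.
  interface ChoosingHook<O, E> {
    State transition(O operand, E event);
  }

  // Runs the hook, then validates its answer against the registered states.
  static <O, E> State doTransition(Set<State> validPostStates,
      ChoosingHook<O, E> hook, O operand, E event) {
    State postState = hook.transition(operand, event);
    if (!validPostStates.contains(postState)) {
      throw new IllegalStateException("unregistered post-state: " + postState);
    }
    return postState;
  }

  // Toy recovery rule: no saved attempts -> SUBMITTED, otherwise ACCEPTED.
  static State recover(int savedAttempts) {
    ChoosingHook<Integer, String> hook =
        (attempts, ev) -> attempts == 0 ? State.SUBMITTED : State.ACCEPTED;
    return doTransition(EnumSet.of(State.SUBMITTED, State.ACCEPTED),
        hook, savedAttempts, "RECOVER");
  }

  // Shows that a hook returning a state outside the set is rejected.
  static boolean rejectsUnregistered() {
    try {
      doTransition(EnumSet.of(State.SUBMITTED),
          (String op, String ev) -> State.RUNNING, "app", "RECOVER");
      return false;
    } catch (IllegalStateException expected) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(recover(0));
    System.out.println(recover(2));
    System.out.println(rejectsUnregistered());
  }
}
```

The validation step is what keeps a buggy hook from moving the object into a state the topology never declared.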

One preState, one postState, multiple events

This form is expanded into one single-event addTransition call per event.

addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
        EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
          RMAppEventType.APP_NEW_SAVED, RMAppEventType.MOVE))
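
The expansion can be pictured as a loop over the event set. The sketch below is an assumption-laden simplification, not the Hadoop code: `MultiEventDemo` is a hypothetical class, it mutates itself instead of returning a new factory on every call, and hooks are left out.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class MultiEventDemo {
  enum State { FINAL_SAVING }
  enum EventType { NODE_UPDATE, KILL, APP_NEW_SAVED, MOVE, APP_UPDATE_SAVED }

  // event type -> post-state for one fixed pre-state (hooks omitted)
  private final Map<EventType, State> arcs = new HashMap<>();

  // Single-event registration.
  MultiEventDemo addTransition(State post, EventType type) {
    arcs.put(type, post);
    return this;
  }

  // Set-of-events registration: one single-event call per element.
  MultiEventDemo addTransition(State post, Set<EventType> types) {
    for (EventType type : types) {
      addTransition(post, type);
    }
    return this;
  }

  int registered() {
    return arcs.size();
  }

  boolean handles(EventType type) {
    return arcs.containsKey(type);
  }

  public static void main(String[] args) {
    MultiEventDemo d = new MultiEventDemo().addTransition(State.FINAL_SAVING,
        EnumSet.of(EventType.NODE_UPDATE, EventType.KILL,
            EventType.APP_NEW_SAVED, EventType.MOVE));
    System.out.println(d.registered() + " arcs registered");
  }
}
```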

StateMachineFactory.installTopology

  public StateMachineFactory
             <OPERAND, STATE, EVENTTYPE, EVENT>
          installTopology() {
    return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
  }

StateMachineFactory

 private StateMachineFactory
      (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
       boolean optimized) {
    this.defaultInitialState = that.defaultInitialState;
    this.transitionsListNode = that.transitionsListNode;
    this.optimized = optimized;
    if (optimized) {
      makeStateMachineTable();
    } else {
      stateMachineTable = null;
    }
  }

makeStateMachineTable

private void makeStateMachineTable() {
    Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
      new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();

    Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
      prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();

    prototype.put(defaultInitialState, null);

    // I use EnumMap here because it'll be faster and denser.  I would
    //  expect most of the states to have at least one transition.
    stateMachineTable
       = new EnumMap<STATE, Map<EVENTTYPE,
                           Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);

    for (TransitionsListNode cursor = transitionsListNode;
         cursor != null;
         cursor = cursor.next) {
      stack.push(cursor.transition);
    }

    while (!stack.isEmpty()) {
      stack.pop().apply(this);
    }
  }
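
The stack in makeStateMachineTable only makes sense if the linked list holds the newest registration at its head, which is the assumption made here (the list-building code is not shown in this article). Under that assumption, pushing while walking the list and then popping replays the transitions in the order they were registered. `ReplayOrderDemo` and `Node` are hypothetical names for this sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class ReplayOrderDemo {
  // Hypothetical singly linked node; the newest registration sits at the head.
  static final class Node {
    final String transition;
    final Node next;

    Node(String transition, Node next) {
      this.transition = transition;
      this.next = next;
    }
  }

  // Walks the list head to tail, then pops to recover registration order.
  static List<String> replay(Node head) {
    Deque<String> stack = new ArrayDeque<>();
    for (Node cursor = head; cursor != null; cursor = cursor.next) {
      stack.push(cursor.transition);
    }
    List<String> applied = new ArrayList<>();
    while (!stack.isEmpty()) {
      applied.add(stack.pop());
    }
    return applied;
  }

  // Registers three transitions by prepending, then replays them.
  static List<String> demo() {
    Node head = null;
    for (String t : new String[] {"first", "second", "third"}) {
      head = new Node(t, head); // prepend, so "third" ends up at the head
    }
    return replay(head);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Applying in registration order also means that, if the same (state, event type) pair were registered twice, the later registration would be the one left in the table.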

ApplicableSingleOrMultipleTransition

static private class ApplicableSingleOrMultipleTransition
             <OPERAND, STATE extends Enum<STATE>,
              EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
          implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
    final STATE preState;
    final EVENTTYPE eventType;
    final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;

    ApplicableSingleOrMultipleTransition
        (STATE preState, EVENTTYPE eventType,
         Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
      this.preState = preState;
      this.eventType = eventType;
      this.transition = transition;
    }

    @Override
    public void apply
             (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
      Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
        = subject.stateMachineTable.get(preState);
      if (transitionMap == null) {
        // I use HashMap here because I would expect most EVENTTYPE's to not
        //  apply out of a particular state, so FSM sizes would be 
        //  quadratic if I use EnumMap's here as I do at the top level.
        transitionMap = new HashMap<EVENTTYPE,
          Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
        subject.stateMachineTable.put(preState, transitionMap);
      }
      transitionMap.put(eventType, transition);
    }
  }

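The addTransition calls on StateMachineFactory do not build the actual transition table; the table is only built by the final installTopology call.
From stateMachineTable.put(preState, transitionMap); we can see that stateMachineTable is indexed first by state, which yields a transitionMap. From transitionMap.put(eventType, transition); we can see that the registered transition is then looked up by the current event type.
The final state machine instance is only created when the RMAppImpl constructor calls stateMachineFactory.make(this);.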

private final StateMachine<RMAppState, RMAppEventType, RMAppEvent> stateMachine;
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
      Configuration config, String name, String user, String queue,
      ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
      ApplicationMasterService masterService, long submitTime,
      String applicationType, Set<String> applicationTags, 
      ResourceRequest amReq) {
    // ... (remaining field initialization omitted)
    this.stateMachine = stateMachineFactory.make(this);
  }
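
The relationship between the shared table and the per-object state machine can be sketched as follows. This is a simplified model, not the Hadoop classes: `TableLookupDemo` and `Machine` are hypothetical, hooks are omitted so that each arc is just a post-state, and `IllegalStateException` stands in for `InvalidStateTransitonException`.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

class TableLookupDemo {
  enum State { NEW, NEW_SAVING, SUBMITTED }
  enum EventType { START, APP_NEW_SAVED }

  // Built once and shared: state -> (event type -> post-state).
  static final Map<State, Map<EventType, State>> TABLE =
      new EnumMap<>(State.class);
  static {
    TABLE.computeIfAbsent(State.NEW, s -> new HashMap<>())
        .put(EventType.START, State.NEW_SAVING);
    TABLE.computeIfAbsent(State.NEW_SAVING, s -> new HashMap<>())
        .put(EventType.APP_NEW_SAVED, State.SUBMITTED);
  }

  // Per-object instance: only the current state is private to it.
  static final class Machine {
    private State current = State.NEW;

    State getCurrentState() {
      return current;
    }

    // Two-level lookup: first by current state, then by event type.
    State doTransition(EventType type) {
      Map<EventType, State> byEvent = TABLE.get(current);
      State next = (byEvent == null) ? null : byEvent.get(type);
      if (next == null) {
        throw new IllegalStateException(
            "Invalid event: " + type + " at " + current);
      }
      current = next;
      return current;
    }
  }

  // Plays the role of stateMachineFactory.make(operand) in this sketch.
  static Machine make() {
    return new Machine();
  }

  public static void main(String[] args) {
    Machine a = make();
    Machine b = make();
    a.doTransition(EventType.START);
    System.out.println(a.getCurrentState() + " " + b.getCurrentState());
  }
}
```

The design choice to share one table is what makes it cheap to create a state machine per application: each instance carries little more than its current state.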

RMAppImpl.handle

The class that owns the state machine implements the EventHandler<RMAppEvent> interface, and its handle method calls doTransition on the stateMachine directly.

@Override
  public void handle(RMAppEvent event) {

    this.writeLock.lock();

    try {
      ApplicationId appID = event.getApplicationId();
      LOG.debug("Processing event for " + appID + " of type "
          + event.getType());
      final RMAppState oldState = getState();
      try {
        /* keep the master in sync with the state machine */
        this.stateMachine.doTransition(event.getType(), event);
      } catch (InvalidStateTransitonException e) {
        LOG.error("Can't handle this event at current state", e);
        /* TODO fail the application on the failed transition */
      }

      if (oldState != getState()) {
        LOG.info(appID + " State change from " + oldState + " to "
            + getState() + " on event=" + event.getType());
      }
    } finally {
      this.writeLock.unlock();
    }
  }
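
The shape of handle (take the write lock, attempt the transition, swallow an invalid transition, release the lock) can be reproduced in a small self-contained sketch. Everything here is hypothetical except the use of `java.util.concurrent.locks.ReentrantReadWriteLock`: `LockedHandlerDemo`, its two-state toy machine, and the `rejected` counter that replaces the error log.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockedHandlerDemo {
  enum State { ACCEPTED, RUNNING }
  enum EventType { ATTEMPT_REGISTERED, START }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private State state = State.ACCEPTED;
  private int rejected = 0;

  // Toy transition function standing in for stateMachine.doTransition.
  private static State next(State s, EventType t) {
    if (s == State.ACCEPTED && t == EventType.ATTEMPT_REGISTERED) {
      return State.RUNNING;
    }
    throw new IllegalStateException("Invalid event: " + t + " at " + s);
  }

  // Events are applied one at a time under the write lock.
  void handle(EventType type) {
    lock.writeLock().lock();
    try {
      try {
        state = next(state, type);
      } catch (IllegalStateException e) {
        rejected++; // the state is left untouched, as in the logged error case
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Readers take the read lock so they never observe a half-applied event.
  State getState() {
    lock.readLock().lock();
    try {
      return state;
    } finally {
      lock.readLock().unlock();
    }
  }

  int rejectedCount() {
    lock.readLock().lock();
    try {
      return rejected;
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    LockedHandlerDemo app = new LockedHandlerDemo();
    app.handle(EventType.START);
    app.handle(EventType.ATTEMPT_REGISTERED);
    System.out.println(app.getState() + ", rejected=" + app.rejectedCount());
  }
}
```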

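Caveats

Keep the following points in mind:

  • If object1 sends an event event1 to object2, is object1 still around when event1 is processed, and does a state change that happened in the meantime affect the handling? The following ResourceManager log shows such a case: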
2018-01-30 14:12:41,349 ERROR org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: Can't handle this event at current state
org.apache.hadoop.yarn.state.InvalidStateTransitonException: Invalid event: CONTAINER_ALLOCATED at ALLOCATED_SAVING
        at org.apache.hadoop.yarn.state.StateMachineFactory.doTransition(StateMachineFactory.java:305)
        at org.apache.hadoop.yarn.state.StateMachineFactory.access$300(StateMachineFactory.java:46)
        at org.apache.hadoop.yarn.state.StateMachineFactory$InternalStateMachine.doTransition(StateMachineFactory.java:448)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.handle(RMAppAttemptImpl.java:808)
        at org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl.handle(RMAppAttemptImpl.java:108)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationAttemptEventDispatcher.handle(ResourceManager.java:803)
        at org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$ApplicationAttemptEventDispatcher.handle(ResourceManager.java:784)
        at org.apache.hadoop.yarn.event.AsyncDispatcher.dispatch(AsyncDispatcher.java:184)
        at org.apache.hadoop.yarn.event.AsyncDispatcher$1.run(AsyncDispatcher.java:110)
        at java.lang.Thread.run(Thread.java:745)

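This happens because containers are only requested in two situations: when the scheduler has accepted the Application and a Container is allocated for the ApplicationMaster, or when the ApplicationMaster is running and requests containers for the application. No containers are requested in any other state.
When the Application is in the RUNNING state, the ApplicationMaster keeps calling the allocate method to request resources from the ResourceManager. The ResourceManager does not allocate resources directly in that call; the scheduler records the requested resources in the application's AppSchedulingInfo instance and returns.
The ResourceManager allocates resources when a NodeManager sends a heartbeat. The heartbeat includes the list of running Containers, the list of completed Containers, the number of currently free vcores, the unallocated memory, and similar information.
When the ResourceManager receives a NodeManager heartbeat, it passes the event on for scheduling. If an application has outstanding resource requests and this NodeManager has free resources, a Container is allocated on that NodeManager. Allocating a Container means sending the container information as an RMAppAttemptEventType.CONTAINER_ALLOCATED event to the application's RMAppAttemptImpl object. RMAppAttemptImpl stores the Container information from the event in a list, and the next time the ApplicationMaster calls allocate, it picks up the resources allocated to the Application.
Problem:
Suppose the ApplicationMaster of an RMAppAttempt is running on a server and keeps calling allocate to request resources. If the ApplicationMaster suddenly fails, or the NodeManager restarts, the NodeManager reports the Container failure to the ResourceManager. On receiving this report, the ResourceManager initializes a new RMAppAttemptImpl object. If a NodeManager heartbeat arrives while the RMAppAttemptImpl is in neither the SCHEDULED nor the RUNNING state, the ResourceManager still allocates a container to that RMAppAttemptImpl, that is, it sends an RMAppAttemptEventType.CONTAINER_ALLOCATED event. The current state has no transition registered for this event, so the error is raised.

  • If object1 sends an event event1 to object2, object2 may no longer be the same object by the time the event is processed.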
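
One way to tolerate late events, which the RMAppImpl factory above already uses in its "ignorable transitions" entries, is to register a self-loop for the event in the states where it can arrive late. The sketch below only illustrates that technique on a toy table. It does not claim to be how YARN resolved this particular error, and `StaleEventDemo` is a hypothetical class that reuses two state names and one event name from the log above.

```java
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

class StaleEventDemo {
  enum State { SCHEDULED, ALLOCATED_SAVING }
  enum EventType { CONTAINER_ALLOCATED }

  private final Map<State, Map<EventType, State>> table =
      new EnumMap<>(State.class);
  private State current;

  StaleEventDemo(State initial) {
    this.current = initial;
  }

  // Registers an arc; a self-loop (pre == post) makes the event ignorable.
  StaleEventDemo addTransition(State pre, State post, EventType type) {
    table.computeIfAbsent(pre, s -> new HashMap<>()).put(type, post);
    return this;
  }

  // Returns true if a transition was registered for the event, false if not.
  boolean handle(EventType type) {
    State next = table
        .getOrDefault(current, Collections.<EventType, State>emptyMap())
        .get(type);
    if (next == null) {
      return false; // this is where the real code throws and logs the error
    }
    current = next;
    return true;
  }

  State state() {
    return current;
  }

  public static void main(String[] args) {
    StaleEventDemo strict = new StaleEventDemo(State.ALLOCATED_SAVING)
        .addTransition(State.SCHEDULED, State.ALLOCATED_SAVING,
            EventType.CONTAINER_ALLOCATED);
    StaleEventDemo tolerant = new StaleEventDemo(State.ALLOCATED_SAVING)
        .addTransition(State.SCHEDULED, State.ALLOCATED_SAVING,
            EventType.CONTAINER_ALLOCATED)
        .addTransition(State.ALLOCATED_SAVING, State.ALLOCATED_SAVING,
            EventType.CONTAINER_ALLOCATED);
    System.out.println("strict accepted: "
        + strict.handle(EventType.CONTAINER_ALLOCATED));
    System.out.println("tolerant accepted: "
        + tolerant.handle(EventType.CONTAINER_ALLOCATED));
  }
}
```

Whether silently ignoring the event is safe depends on what the event carries; an ignored container allocation, for example, may still need to be released by whoever sent it.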