Flink源码阅读(十)--- Flink 心跳机制

1. 背景

心跳机制通过定期向对方发送请求方式用于检测客户端或者服务端是否存活的一种机制,常见的心跳检测有两种:
   1. socket 套接字 SO_KEEPALIVE 本身带有的心跳机制,定期向对方发送心跳包,对方在收到心跳包后会自动回复;
   2. 应用自身实现心跳机制,同样也是使用定期发送请求的方式
Flink实现的是第二种方案。

Flink引擎中,RM(ResourceManager)、JM(JobMaster)、TaskExecutor之间存在相互检测的心跳机制。

  1. RM会主动发送请求探测JobMaster、TaskExecutor是否存活。
  2. JobMaster主动发送请求探测TaskExecutor是否存活,以便进行任务重启或者失败处理。

2. 心跳注册

Job运行 这篇文章介绍了作业启动步骤,咱们这次重新梳理下,看下flink心跳机制是怎么生效的。

2.1 JobMaster 与 ResourceManager 心跳交互

这里主要看下JobMaster#startJobMasterServices方法,

     private void startJobMasterServices() throws Exception {
        startHeartbeatServices();

        // start the slot pool make sure the slot pool now accepts messages for this leader
        slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

        // TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
        // try to reconnect to previously known leader
        reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

        // job is ready to go, try to establish connection with resource manager
        //   - activate leader retrieval for the resource manager
        //   - on notification of the leader, the connection will be established and
        //     the slot pool will start requesting slots
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
2.1.1 心跳服务启动:startHeartbeatServices
  • 在JobMaster中,主要有两个心跳服务,taskManagerHeartbeatManager(heartbeatServices.createHeartbeatManagerSender)和 resourceManagerHeartbeatManager(heartbeatServices.createHeartbeatManager)
  • taskManagerHeartbeatManager主要是探测注册到JM的所有TM心跳是否正常,主动向所有的TM发心跳请求;resourceManagerHeartbeatManager主要是与RM进行心跳交互,接受RM发来的心跳请求。
2.1.2 信息注册

调用链:

JobMaster#reconnectToResourceManager --> JobMaster#tryConnectToResourceManager -->  JobMaster#connectToResourceManager --> RegisteredRpcConnection#start 

   - 把JM注册到RM
   - 当JM注册到RM成功之后,JM会记录RM的监控信息,也就是JobMaster的resourceManagerHeartbeatManager对象中,监控target加入RM。由于都是RM主动往JM发心跳检测,所以在JobMaster的resourceManagerHeartbeatManager中,不需要实现requestHeartbeat(主动往其他target sender heartbeat时用到),实现了receiveHeartbeat用于接受RM发来的心跳请求。
   - 向RM注册JM之后,RM中同样会添加JM的监控信息,在jobManagerHeartbeatManager对象监控target中,加入jobManagerResourceId。由于都是RM主动往JM发心跳检测,所以在RM的jobManagerHeartbeatManager中,只需要实现requestHeartbeat(用于往JM发心跳请求),不需要实现receiveHeartbeat。

2.2 TaskManager 与 ResourceManager 、JobMaster 心跳交互

TaskManager的申请入口是 JobMaster#resetAndStartScheduler,也就是在作业部署重启过程中申请的。

2.2.1 TaskManager 申请启动
TM申请

TM申请隐藏的比较深,我们给出调用链

JobMaster#resetAndStartScheduler --> JobMaster#startScheduling --> SchedulerBase#startScheduling --> DefaultScheduler#startSchedulingInternal --> PipelinedRegionSchedulingStrategy#startScheduling --> PipelinedRegionSchedulingStrategy#maybeScheduleRegions --> PipelinedRegionSchedulingStrategy#maybeScheduleRegion --> DefaultScheduler#allocateSlotsAndDeploy  --> DefaultScheduler#allocateSlots --> SlotSharingExecutionSlotAllocator#allocateSlotsFor --> SlotSharingExecutionSlotAllocator#getOrAllocateSharedSlot --> PhysicalSlotProviderImpl#allocatePhysicalSlot --> PhysicalSlotProviderImpl#requestNewSlot --> SlotPoolImpl#requestNewAllocatedSlot --> SlotPoolImpl#requestNewAllocatedSlotInternal --> SlotPoolImpl#requestSlotFromResourceManager --> ResourceManager#requestSlot --> SlotManagerImpl#registerSlotRequest --> SlotManagerImpl#internalRequestSlot --> SlotManagerImpl#fulfillPendingSlotRequestWithPendingTaskManagerSlot --> SlotManagerImpl#allocateResource --> ResourceActionsImpl#allocateResource --> ActiveResourceManager#startNewWorker -->  ActiveResourceManager#requestNewWorker --> YarnResourceManagerDriver#requestResource --> AMRMClientAsyncImpl#addContainerRequest

在作业部署过程中,如果freeSlots没有可用slot,那会向RM申请资源,申请调用链就是上面这个图。最终会把addContainerRequest加入一个map中,采用异步方式进行资源申请。

TM启动

资源申请下来之后,会回调 AMRMClientAsync#onContainersAllocated --> YarnResourceManagerDriver#onContainersAllocated,这个方法异步启动TM(拓展:这里会通过YARN Node Manager client来异步启动TM)。

接下来看下TM具体的启动流程,这里会涉及到怎么往RM,JM注册的东西。入口是TaskExecutor#onStart方法,主要看下startTaskExecutorServices

    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            // resourceManagerLeaderRetriever其实是EmbeddedLeaderService的实现,A simple leader election service, which selects a leader among contenders and notifies listeners.
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

            // start the job leader service
            jobLeaderService.start(
                    getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache =
                    new FileCache(
                            taskManagerConfiguration.getTmpDirectories(),
                            blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }
2.2.2 TaskManager 与 ResourceManager 心跳交互

看下ResourceManagerLeaderListener,当leader被选举出来之后,会去调用notifyLeaderAddress方法,这里会调用notifyOfNewResourceManagerLeader,最终会调用 TaskExecutor#connectToResourceManager --> RegisteredRpcConnection#start方法

    public void start() {
        checkState(!closed, "The RPC connection is already closed");
        checkState(
                !isConnected() && pendingRegistration == null,
                "The RPC connection is already started");

        final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();

        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            newRegistration.startRegistration();
        } else {
            // concurrent start operation
            newRegistration.cancel();
        }
    }

和注册JM类似,执行的都是RegisteredRpcConnection#start方法,只不过invokeRegistration、onRegistrationSuccess具体实现有区别。

  • 步骤1: 注册TM( invokeRegistration )
       1. JM对应的是registerJobManager。
       2. TM对应的是registerTaskExecutor
         2.1 ResourceManager#registerTaskExecutor
            > 如果这个TM以前被注册过(taskExecutorResourceId在taskExecutors map中存在),那会把这个TM上的slot都释放掉(也就是把TM的slots从RM对象中remove)
            > RM中加入这个TM的心跳信息,也就是RM的taskManagerHeartbeatManager target中接入该TM,用于监控TM是否存活。由于都是RM主动往TM发心跳检测,所以在RM的taskManagerHeartbeatManager对象中,只需要实现requestHeartbeat用于向TM发心跳请求,不需要实现receiveHeartbeat。

  • 步骤2: 注册TM成功 ( onRegistrationSuccess )
       1. JM对应的是 JobMaster#establishResourceManagerConnection 方法:
          1.1 JM的resourceManagerHeartbeatManager target中加入RM,用于与RM进行心跳交互
       2. TM对应的是 TaskExecutor#establishResourceManagerConnection 方法:
          2.1 TM的resourceManagerHeartbeatManager target中加入RM,用于与RM进行心跳交互。由于都是RM主动往TM发心跳检测,所以在TM的resourceManagerHeartbeatManager对象中,只需要实现receiveHeartbeat用于接受RM发来的心跳请求,不需要实现requestHeartbeat。
          2.2 ResourceManager#sendSlotReport --> SlotManagerImpl#registerTaskManager: 在SlotManager中注册一个新TM,这样的话,TM的slots就可以被感知到,并且可以被分配。

2.2.3 TaskManager 与 JobMaster 心跳交互

看下 TaskExecutor#startTaskExecutorServices --> jobLeaderService.start --> JobLeaderListenerImpl#jobManagerGainedLeadership --> TaskExecutor#establishJobManagerConnection 方法

  1. 首先判断 job 的 JobMaster 是否存在
       1.1 若不存在,继续
       1.2 若存在并且JM与新的JobMasterId不同,说明JM有重启,那会调用disconnectJobManagerConnection把job下的所有task都fail掉。
  2. 在TM中加入JM的心跳监控,用来与JM进行心跳交互。
       也就是在 TM 的 jobManagerHeartbeatManager target 中加入jobManagerResourceID,由于都是JM往TM主动发送请求,因此只需要实现 receiveHeartbeat 用于接受JM发来的心跳请求,不需要实现 requestHeartbeat 。
  3. 把 Job 中所有没有分配 task 的 slot 通知给 JobMaster

3. 心跳超时处理

HeartbeatListener 是心跳超时处理总接口,RM、JM、TM都有对应的实现类,分别看下心跳超时处理方式

3.1 在RM中,检查到JM心跳超时

会调用 ResourceManager#closeJobManagerConnection 方法
      1. RM 心跳检测服务,不再检测这个JM,也就是从 jobManagerHeartbeatManager 对象中把该 JM 从 target remove掉。
      2. 如果 resourceManagerAddress != null 并且 resourceManagerId 没变,JM 会重新去连接RM(JM把RM的心跳监控信息保存;slotPoolImpl与RM建立连接并且根据slot请求队列waitingForResourceManager向RM申请资源)

3.2 在RM中,检查到TM心跳超时

会调用 ResourceManager#closeTaskManagerConnection 方法
      1. RM 心跳检测服务,不再检测这个TM,也就是从 taskManagerHeartbeatManager 对象中把该 TM 从 target remove掉。
      2. 把这个TM中所有的slot remove掉
         2.1 从SlotManagerImpl的freeSlots中把这些slot remove掉
         2.2 并且把pending状态slot request 采用completeExceptionally方式结束掉;
         2.3 如果slot已经分配了task,把slot上的task fail掉,并且把TaskSlotState状态置为RELEASING,把分配的内存释放掉。
         2.4 如果TM对应的RPC endpoint还是started状态的,TM会重新去连接RM(TM把RM的心跳监控信息保存;RM也会把这个TM保存到taskExecutors map中,也就是把TM注册到RM)

总结如下:
   关闭TaskManager和ResourceManager的连接,TaskManager会尝试重新注册到ResourceManager,ResourceManager在关闭TaskManager连接时,会同时向SlotManager发起unregister,此时SlotManager(真正干活的是SlotPool)会释放掉对应的所有slot,并按照AllocationID通知JobMaster去fail对应分配的task;超过最大注册时间还未注册成功,则退出TaskManager进程。

3.3 在JM中,检查到TM心跳超时

会调用 JobMaster#disconnectTaskManager 方法
      1. JM 心跳检测服务,不再检测这个TM,也就是从 taskManagerHeartbeatManager 对象中把该 TM 从 target remove掉。
      2. 把该TM上所有slot remove掉
         2.1 从SlotPool的allocatedSlots以及availableSlots对象中把slot remove。
         2.2 如果slot已经分配了task,把slot上的task fail掉,并且把TaskSlotState状态置为RELEASING,把分配的内存释放掉。
      3. TM直接fail掉该TM上该Job所有的task,并和JM断掉连接。
      4. TM重新连接JM,JM会向RM申请TM,申请之后JM进行重新调度。

总结如下:
   TaskManager直接fail该TM该Job所有的Task,JobManager标记Task失败,也会进行fail,并进行重新调度。

4. 总结

4.1 心跳注册

  1. JobMaster心跳注册
    在startJobMasterServices方法中,会完成向RM注册,并且把RM的心跳监控信息保存起来。同时在JM向RM注册的时候,RM同样会把JM的心跳监控信息保存起来,以后可以向JM发心跳请求。
  2. TaskManager心跳注册
    2.1 TaskManager的申请入口是:JobMaster#resetAndStartScheduler,也就是在作业部署重启过程中申请的。
    2.2 TaskManager的启动入口是:TaskExecutor#startTaskExecutorServices。
       2.2.1 TM首先会向RM注册,注册成功之后,会把RM的心跳监控信息保存起来。同时在TM向RM注册的时候,RM同样会把TM的心跳监控信息保存起来,以后可以向RM发心跳请求。
       2.2.2 TM向RM注册之后,还会向JM注册。TM与JM建立连接之后,会把JM的心跳监控信息保存起来。同时JM会把TM的心跳监控信息保存起来,以后可以向TM发心跳请求。

4.2 心跳超时处理

  1. 在RM中,检查到JM心跳超时:JM 会重新去连接RM
  2. 在RM中,检查到TM心跳超时
       关闭TaskManager和ResourceManager的连接,TaskManager会尝试重新注册到ResourceManager,ResourceManager在关闭TaskManager连接时,会同时向SlotManager发起unregister,此时SlotManager(真正干活的是SlotPool)会释放掉对应的所有slot,并按照AllocationID通知JobMaster去fail对应分配的task;超过最大注册时间还未注册成功,则退出TaskManager进程。
  3. 在JM中,检查到TM心跳超时
       TaskManager直接fail该TM该Job所有的Task,JobManager标记Task失败,也会进行fail,并进行重新调度。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容