Flink 源码之TaskManager启动流程

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

启动脚本分析

TaskManager的启动方式为taskmanager.sh start。因此我们如果想要知道程序入口类,必须首先分析这个脚本。

这个脚本比较长,我们重点关注如下片段:

# ...
# 设置ENTRYPOINT变量值为taskexecutor
ENTRYPOINT=taskexecutor
# ...
if [[ $STARTSTOP == "start-foreground" ]]; then
    exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
    if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
        # Start a single TaskManager
        # 我们关注这里,不使用NUMA方式后台启动
        "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
    else
        # Example output from `numactl --show` on an AWS c4.8xlarge:
        # policy: default
        # preferred node: current
        # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
        # cpubind: 0 1
        # nodebind: 0 1
        # membind: 0 1
        read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
        for NODE_ID in "${NODE_LIST[@]:1}"; do
            # Start a TaskManager for each NUMA node
            numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
        done
    fi
fi

通过分析上面的脚本,我们看到如果没有使用start-foreground(前台模式启动),实际上调用的是flink-daemon.sh脚本。此脚本的使用方式和参数示例为:

flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]

为了弄清楚Java的入口类,我们接着分析flink-daemon.sh,发现如下片段:

# ...
STARTSTOP=$1
# 经上面分析可知,通过taskmanager.sh执行,DAEMON的值为taskexecutor
DAEMON=$2
# ...
case $DAEMON in
    (taskexecutor)
        CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    ;;

    (zookeeper)
        CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
    ;;

    (historyserver)
        CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
    ;;

    (standalonesession)
        CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    ;;

    (standalonejob)
        CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    ;;

    (*)
        echo "Unknown daemon '${DAEMON}'. $USAGE."
        exit 1
    ;;
esac
# ...

不难发现,DAEMON变量的值为taskexecutor,实际的入口类为org.apache.flink.runtime.taskexecutor.TaskManagerRunnerTaskManagerRunner就是TaskManager启动的入口类。

下面我们分析TaskManagerRunner的相关源代码。

TaskManagerRunner

TaskManagerRunner是TaskManager在yarn模式和standalone模式下的启动类。

我们查看下main方法:

public static void main(String[] args) throws Exception {
    // startup checks and logging
    // 日志打印环境信息
    EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
    
    // 注册信号处理句柄
    // 在Linux下响应TERM,HUP和INT
    SignalHandler.register(LOG);
    
    // 增加shutdownlook,在JVM关闭之前回调
    // 让JVM关闭延迟5秒钟
    JvmShutdownSafeguard.installAsShutdownHook(LOG);

    // 获取并打印最大打开文件句柄数限制
    long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();

    if (maxOpenFileHandles != -1L) {
        LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
    } else {
        LOG.info("Cannot determine the maximum number of open file descriptors");
    }

    // 启动TaskManager
    runTaskManagerSecurely(args, ResourceID.generate());
}

继续跟踪runTaskManagerSecurely方法,内容如下:

public static void runTaskManagerSecurely(String[] args, ResourceID resourceID) {
    try {
        // 读取flink-conf.yaml和命令行传入的动态参数,作为配置信息
        final Configuration configuration = loadConfiguration(args);

        // 初始化共享文件系统配置(FileSystemFactory),比如HDFS
        FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));

        // 读取安全相关配置,包含flink,JAAS,Hadoop和Zookeeper的安全配置
        SecurityUtils.install(new SecurityConfiguration(configuration));

        // 以安全认证环境下调用runTaskManager
        SecurityUtils.getInstalledContext().runSecured(() -> {
            runTaskManager(configuration, resourceID);
            return null;
        });
    } catch (Throwable t) {
        final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
        LOG.error("TaskManager initialization failed.", strippedThrowable);
        System.exit(STARTUP_FAILURE_RETURN_CODE);
    }
}

该方法载入了Flink的主配置文件,初始化了文件系统和服务安全配置。启动TaskManager的方法在runTaskManager。如下所示:

public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
    // 创建一个TaskManagerRunner,使用随机的resourceId
    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);

    // 调用start
    taskManagerRunner.start();
}

我们需要分析TaskManagerRunner构造函数和start方法。

首先我们分析下构造函数:

public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
    this.configuration = checkNotNull(configuration);
    this.resourceId = checkNotNull(resourceId);

    // 获取Akka超时时间
    timeout = AkkaUtils.getTimeoutAsTime(configuration);

    // 创建一个task manager的线程池
    // corePoolSize和机器CPU数量一致
    this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
        Hardware.getNumberCPUCores(),
        new ExecutorThreadFactory("taskmanager-future"));

    // 创建高可用服务
    // 负责选举JobManager和ResourceManager和获取leader信息
    highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
        configuration,
        executor,
        HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);

    // 创建RPC服务,和其他Flink进程互相通信的时候使用
    // 用于连接一个RpcEndpoint,连接成功之后返回一个RpcGateway
    rpcService = createRpcService(configuration, highAvailabilityServices);

    // 创建心跳服务
    HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

    // 创建监控指标注册
    // 用于记录所有的metrics,连接MetricGroup和MetricReporter
    metricRegistry = new MetricRegistryImpl(
        MetricRegistryConfiguration.fromConfiguration(configuration),
        ReporterSetup.fromConfiguration(configuration));

    // 开启metrics查询服务
    // 以key-value方式返回Flink中已注册的metrics
    final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);

    // 创建BlobCache服务
    blobCacheService = new BlobCacheService(
        configuration, highAvailabilityServices.createBlobStore(), null
    );
    
    // 创建外部资源信息Provider
    final ExternalResourceInfoProvider externalResourceInfoProvider =
    ExternalResourceUtils.createStaticExternalResourceInfoProvider(
        ExternalResourceUtils.getExternalResourceAmountMap(configuration),
        ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));

    // 启动task manager
    // 稍后分析
    taskManager = startTaskManager(
        this.configuration,
        this.resourceId,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        metricRegistry,
        blobCacheService,
        false,
        this);

    this.terminationFuture = new CompletableFuture<>();
    this.shutdown = false;

    MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}

在分析了层层封装之后,终于找到startTaskManager方法。该方法创建出一个TaskExecutor对象,如下所示:

public static TaskExecutor startTaskManager(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        ExternalResourceInfoProvider externalResourceInfoProvider,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    checkNotNull(configuration);
    checkNotNull(resourceID);
    checkNotNull(rpcService);
    checkNotNull(highAvailabilityServices);

    LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());

    // 获取外部访问地址
    String externalAddress = rpcService.getAddress();

    // 获取task executor资源详情
    // 包含CPU核数,task堆内存,tsk堆外内存和managed内存
    final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);

    // 获取Task Manager服务配置
    TaskManagerServicesConfiguration taskManagerServicesConfiguration =
        TaskManagerServicesConfiguration.fromConfiguration(
            configuration,
            resourceID,
            externalAddress,
            localCommunicationOnly,
            taskExecutorResourceSpec);

    // 创建task manager的MetricGroup
    Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
        metricRegistry,
        externalAddress,
        resourceID,
        taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());

    // 创建用于IO任务的线程池
    final ExecutorService ioExecutor = Executors.newFixedThreadPool(
        taskManagerServicesConfiguration.getNumIoThreads(),
        new ExecutorThreadFactory("flink-taskexecutor-io"));

    // 创建Task Manager服务,是其他多种资源或服务的容器
    // 它涉及的服务也非常多,稍后我们单独分析
    TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
        taskManagerServicesConfiguration,
        blobCacheService.getPermanentBlobService(),
        taskManagerMetricGroup.f1,
        ioExecutor,
        fatalErrorHandler);

    // 创建task manager的配置
    TaskManagerConfiguration taskManagerConfiguration =
        TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);

    // 获取metrics查询服务的地址
    String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();

    // 创建task executor
    return new TaskExecutor(
        rpcService,
        taskManagerConfiguration,
        highAvailabilityServices,
        taskManagerServices,
        externalResourceInfoProvider,
        heartbeatServices,
        taskManagerMetricGroup.f0,
        metricQueryServiceAddress,
        blobCacheService,
        fatalErrorHandler,
        new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
        createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
}

到这里为止,一个完整的TaskExecutor创建完毕。创建的过程涉及到了很多相关服务器的初始化,稍后在文末以脑图形式为大家总结。

TaskManagerServices的fromConfiguration方法

TaskManagerServices是一系列TaskManager服务的容器,包含内存控制器,IO控制器,Shuffle环境等。各个服务的用途计划在后续博客中介绍。

这里我们重点关注它的fromConfiguration方法。如下所示:

public static TaskManagerServices fromConfiguration(
        TaskManagerServicesConfiguration taskManagerServicesConfiguration,
        MetricGroup taskManagerMetricGroup,
        Executor taskIOExecutor) throws Exception {

    // pre-start checks
    // 检查temp dir(yarn模式为local_dirs,standalone模式为java.io.tmpdir)目录是否存在,如果不存在会创建文件夹
    // 是否temp dir是否是目录,是否可写入
    checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());

    // 创建TaskEventDispatcher
    // 任务事件派发器,用于消费任务向生产任务发送TaskEvent
    final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();

    // 创建异步IO管理器
    // 根据配置的tmp dir的个数,创建对应数量的读写线程,负责异步读写数据
    // start the I/O manager, it will create some temp directories.
    final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());

    // 创建ShuffleEnvironment
    // 负责在本地提供一个shuffle环境,使用memory segment作为数据存储
    // 可以创建数据写入端ResultPartitionWriter和数据消费端InputGate
    final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
        taskManagerServicesConfiguration,
        taskEventDispatcher,
        taskManagerMetricGroup);
    // 启动ShuffleManager
    // 返回TaskManager的数据端口(taskmanager.data.port)
    final int dataPort = shuffleEnvironment.start();

    // 创建key value状态存储服务
    final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
    kvStateService.start();

    // 封装task manager连接信息到TaskManagerLocation
    final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
        taskManagerServicesConfiguration.getResourceID(),
        taskManagerServicesConfiguration.getTaskManagerAddress(),
        dataPort);

    // 创建广播变量管理器
    final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();

    // 创建TaskSlotTable,维护task和slot的分配关系
    final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
        taskManagerServicesConfiguration.getNumberOfSlots(),
        taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
        taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
        taskManagerServicesConfiguration.getPageSize());

    // 创建JobManagerTable,维护JobId和JobManager的对应关系
    final JobManagerTable jobManagerTable = new JobManagerTable();

    // 创建Job leader服务。Job leader是领导一个job的job manager。
    // 一旦某个job manager获得leader角色,或者失去leader状态,会通知JobLeaderListener,位于TaskExecutor.java中
    final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());

    // 读取本地状态保存根路径
    // taskmanager.state.local.root-dirs
    final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();

    final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];

    // 创建目录
    for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
        stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
    }

    // 创建任务状态管理器
    final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
        taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
        stateRootDirectoryFiles,
        taskIOExecutor);

    // 构建TaskManagerServices
    return new TaskManagerServices(
        taskManagerLocation,
        taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
        ioManager,
        shuffleEnvironment,
        kvStateService,
        broadcastVariableManager,
        taskSlotTable,
        jobManagerTable,
        jobLeaderService,
        taskStateManager,
        taskEventDispatcher);
}

TaskManager的启动

以上只是相关服务的创建逻辑,服务启动的逻辑位于start方法中。

TaskManagerRunnerstart方法如下所示:

public void start() throws Exception {  
    // 调用TaskExecutor的start方法
    // 即TaskExecutor父类RpcEndpoint的start方法
    taskManager.start();
}

RpcEndpoint启动的时候会调用生命周期的onStart方法。
TaskExecutoronStart方法:

@Override
public void onStart() throws Exception {
    try {
        // 启动TaskExecutor服务
        startTaskExecutorServices();
    } catch (Exception e) {
        final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
        onFatalError(exception);
        throw exception;
    }

    // 启动超时检测
    startRegistrationTimeout();
}

启动TaskExecutor服务的方法startTaskExecutorServices代码如下:

private void startTaskExecutorServices() throws Exception {
    try {
        // start by connecting to the ResourceManager
        // 启动leader变更监听服务,如果leader变更会通知ResourceManagerLeaderListener
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

        // tell the task slot table who's responsible for the task slot actions
        // 启动task slot table,参数的SlotAction负责定义释放slot(freeSlot)和slot超时(timeoutSlot)逻辑
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

        // start the job leader service
        // 启动job leader服务
        // 管理某个job的task manager是这个job的job leader
        // job manager如果有变更,会通知JobLeaderListenerImpl
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

        // 创建文件缓存
        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}

以上是TaskManager完整的创建和启动流程。

附录

TaskExecutor中的重要服务脑图

这些服务的作用在本人后续博客中计划陆续分析。


TaskExecutor
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容

  • 一、前言 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处理引擎,是当前实时处理...
    大菜鸟_阅读 1,688评论 0 0
  • Flink可以选择的部署方式有: Local、Standalone(资源利用率低)、Yarn、Mesos、Dock...
    临时_01e2阅读 1,099评论 0 0
  • 最近在基于flink做实时计算平台,准备写博客记录一些日常的工作。本篇主要是记一下如何调试Flink的源码,以St...
    hihl阅读 4,872评论 0 2
  • Checkpoint在分布式流处理框架的准确性具有重要意义。Flink实现了基于Chandy-Lamport算法的...
    WestC阅读 2,598评论 0 5
  • 这将是一套完整详细且持续更新的、长期维护的 原创 Flink系列教程、文档,其中会包含各种商用实例详解、Flin...
    Z尽际阅读 1,634评论 2 3