Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
启动脚本分析
TaskManager的启动方式为taskmanager.sh start
。因此我们如果想要知道程序入口类,必须首先分析这个脚本。
这个脚本比较长,我们重点关注如下片段:
# ...
# 设置ENTRYPOINT变量值为taskexecutor
ENTRYPOINT=taskexecutor
# ...
if [[ $STARTSTOP == "start-foreground" ]]; then
exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
else
if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
# Start a single TaskManager
# 我们关注这里,不使用NUMA方式后台启动
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
else
# Example output from `numactl --show` on an AWS c4.8xlarge:
# policy: default
# preferred node: current
# physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
# cpubind: 0 1
# nodebind: 0 1
# membind: 0 1
read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
for NODE_ID in "${NODE_LIST[@]:1}"; do
# Start a TaskManager for each NUMA node
numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
done
fi
fi
通过分析上面的脚本,我们看到如果没有使用start-foreground
(前台模式启动),实际上调用的是flink-daemon.sh
脚本。此脚本的使用方式和参数示例为:
flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]
为了弄清楚Java的入口类,我们接着分析flink-daemon.sh
,发现如下片段:
# ...
STARTSTOP=$1
# 经上面分析可知,通过taskmanager.sh执行,DAEMON的值为taskexecutor
DAEMON=$2
# ...
case $DAEMON in
(taskexecutor)
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
;;
(zookeeper)
CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
;;
(historyserver)
CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
;;
(standalonesession)
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
;;
(standalonejob)
CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;
(*)
echo "Unknown daemon '${DAEMON}'. $USAGE."
exit 1
;;
esac
# ...
不难发现,DAEMON变量的值为taskexecutor
,实际的入口类为org.apache.flink.runtime.taskexecutor.TaskManagerRunner
。TaskManagerRunner
就是TaskManager启动的入口类。
下面我们分析TaskManagerRunner
的相关源代码。
TaskManagerRunner
TaskManagerRunner是TaskManager在yarn模式和standalone模式下的启动类。
我们查看下main
方法:
public static void main(String[] args) throws Exception {
// startup checks and logging
// 日志打印环境信息
EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
// 注册信号处理句柄
// 在Linux下响应TERM,HUP和INT
SignalHandler.register(LOG);
// 增加shutdownlook,在JVM关闭之前回调
// 让JVM关闭延迟5秒钟
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// 获取并打印最大打开文件句柄数限制
long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
if (maxOpenFileHandles != -1L) {
LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
} else {
LOG.info("Cannot determine the maximum number of open file descriptors");
}
// 启动TaskManager
runTaskManagerSecurely(args, ResourceID.generate());
}
继续跟踪runTaskManagerSecurely
方法,内容如下:
public static void runTaskManagerSecurely(String[] args, ResourceID resourceID) {
try {
// 读取flink-conf.yaml和命令行传入的动态参数,作为配置信息
final Configuration configuration = loadConfiguration(args);
// 初始化共享文件系统配置(FileSystemFactory),比如HDFS
FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
// 读取安全相关配置,包含flink,JAAS,Hadoop和Zookeeper的安全配置
SecurityUtils.install(new SecurityConfiguration(configuration));
// 以安全认证环境下调用runTaskManager
SecurityUtils.getInstalledContext().runSecured(() -> {
runTaskManager(configuration, resourceID);
return null;
});
} catch (Throwable t) {
final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("TaskManager initialization failed.", strippedThrowable);
System.exit(STARTUP_FAILURE_RETURN_CODE);
}
}
该方法载入了Flink的主配置文件,初始化了文件系统和服务安全配置。启动TaskManager的方法在runTaskManager
。如下所示:
public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
// 创建一个TaskManagerRunner,使用随机的resourceId
final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId);
// 调用start
taskManagerRunner.start();
}
我们需要分析TaskManagerRunner
构造函数和start
方法。
首先我们分析下构造函数:
public TaskManagerRunner(Configuration configuration, ResourceID resourceId) throws Exception {
this.configuration = checkNotNull(configuration);
this.resourceId = checkNotNull(resourceId);
// 获取Akka超时时间
timeout = AkkaUtils.getTimeoutAsTime(configuration);
// 创建一个task manager的线程池
// corePoolSize和机器CPU数量一致
this.executor = java.util.concurrent.Executors.newScheduledThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("taskmanager-future"));
// 创建高可用服务
// 负责选举JobManager和ResourceManager和获取leader信息
highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
executor,
HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
// 创建RPC服务,和其他Flink进程互相通信的时候使用
// 用于连接一个RpcEndpoint,连接成功之后返回一个RpcGateway
rpcService = createRpcService(configuration, highAvailabilityServices);
// 创建心跳服务
HeartbeatServices heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// 创建监控指标注册
// 用于记录所有的metrics,连接MetricGroup和MetricReporter
metricRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(configuration),
ReporterSetup.fromConfiguration(configuration));
// 开启metrics查询服务
// 以key-value方式返回Flink中已注册的metrics
final RpcService metricQueryServiceRpcService = MetricUtils.startMetricsRpcService(configuration, rpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId);
// 创建BlobCache服务
blobCacheService = new BlobCacheService(
configuration, highAvailabilityServices.createBlobStore(), null
);
// 创建外部资源信息Provider
final ExternalResourceInfoProvider externalResourceInfoProvider =
ExternalResourceUtils.createStaticExternalResourceInfoProvider(
ExternalResourceUtils.getExternalResourceAmountMap(configuration),
ExternalResourceUtils.externalResourceDriversFromConfig(configuration, pluginManager));
// 启动task manager
// 稍后分析
taskManager = startTaskManager(
this.configuration,
this.resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
metricRegistry,
blobCacheService,
false,
this);
this.terminationFuture = new CompletableFuture<>();
this.shutdown = false;
MemoryLogger.startIfConfigured(LOG, configuration, terminationFuture);
}
在分析了层层封装之后,终于找到startTaskManager
方法。该方法创建出一个TaskExecutor
对象,如下所示:
public static TaskExecutor startTaskManager(
Configuration configuration,
ResourceID resourceID,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
BlobCacheService blobCacheService,
boolean localCommunicationOnly,
ExternalResourceInfoProvider externalResourceInfoProvider,
FatalErrorHandler fatalErrorHandler) throws Exception {
checkNotNull(configuration);
checkNotNull(resourceID);
checkNotNull(rpcService);
checkNotNull(highAvailabilityServices);
LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());
// 获取外部访问地址
String externalAddress = rpcService.getAddress();
// 获取task executor资源详情
// 包含CPU核数,task堆内存,tsk堆外内存和managed内存
final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
// 获取Task Manager服务配置
TaskManagerServicesConfiguration taskManagerServicesConfiguration =
TaskManagerServicesConfiguration.fromConfiguration(
configuration,
resourceID,
externalAddress,
localCommunicationOnly,
taskExecutorResourceSpec);
// 创建task manager的MetricGroup
Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
metricRegistry,
externalAddress,
resourceID,
taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
// 创建用于IO任务的线程池
final ExecutorService ioExecutor = Executors.newFixedThreadPool(
taskManagerServicesConfiguration.getNumIoThreads(),
new ExecutorThreadFactory("flink-taskexecutor-io"));
// 创建Task Manager服务,是其他多种资源或服务的容器
// 它涉及的服务也非常多,稍后我们单独分析
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
taskManagerServicesConfiguration,
blobCacheService.getPermanentBlobService(),
taskManagerMetricGroup.f1,
ioExecutor,
fatalErrorHandler);
// 创建task manager的配置
TaskManagerConfiguration taskManagerConfiguration =
TaskManagerConfiguration.fromConfiguration(configuration, taskExecutorResourceSpec, externalAddress);
// 获取metrics查询服务的地址
String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
// 创建task executor
return new TaskExecutor(
rpcService,
taskManagerConfiguration,
highAvailabilityServices,
taskManagerServices,
externalResourceInfoProvider,
heartbeatServices,
taskManagerMetricGroup.f0,
metricQueryServiceAddress,
blobCacheService,
fatalErrorHandler,
new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));
}
到这里为止,一个完整的TaskExecutor
创建完毕。创建的过程涉及到了很多相关服务器的初始化,稍后在文末以脑图形式为大家总结。
TaskManagerServices的fromConfiguration方法
TaskManagerServices
是一系列TaskManager服务的容器,包含内存控制器,IO控制器,Shuffle环境等。各个服务的用途计划在后续博客中介绍。
这里我们重点关注它的fromConfiguration
方法。如下所示:
public static TaskManagerServices fromConfiguration(
TaskManagerServicesConfiguration taskManagerServicesConfiguration,
MetricGroup taskManagerMetricGroup,
Executor taskIOExecutor) throws Exception {
// pre-start checks
// 检查temp dir(yarn模式为local_dirs,standalone模式为java.io.tmpdir)目录是否存在,如果不存在会创建文件夹
// 是否temp dir是否是目录,是否可写入
checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
// 创建TaskEventDispatcher
// 任务事件派发器,用于消费任务向生产任务发送TaskEvent
final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
// 创建异步IO管理器
// 根据配置的tmp dir的个数,创建对应数量的读写线程,负责异步读写数据
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
// 创建ShuffleEnvironment
// 负责在本地提供一个shuffle环境,使用memory segment作为数据存储
// 可以创建数据写入端ResultPartitionWriter和数据消费端InputGate
final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
taskManagerServicesConfiguration,
taskEventDispatcher,
taskManagerMetricGroup);
// 启动ShuffleManager
// 返回TaskManager的数据端口(taskmanager.data.port)
final int dataPort = shuffleEnvironment.start();
// 创建key value状态存储服务
final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
kvStateService.start();
// 封装task manager连接信息到TaskManagerLocation
final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
taskManagerServicesConfiguration.getResourceID(),
taskManagerServicesConfiguration.getTaskManagerAddress(),
dataPort);
// 创建广播变量管理器
final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
// 创建TaskSlotTable,维护task和slot的分配关系
final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
taskManagerServicesConfiguration.getNumberOfSlots(),
taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
taskManagerServicesConfiguration.getPageSize());
// 创建JobManagerTable,维护JobId和JobManager的对应关系
final JobManagerTable jobManagerTable = new JobManagerTable();
// 创建Job leader服务。Job leader是领导一个job的job manager。
// 一旦某个job manager获得leader角色,或者失去leader状态,会通知JobLeaderListener,位于TaskExecutor.java中
final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
// 读取本地状态保存根路径
// taskmanager.state.local.root-dirs
final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
// 创建目录
for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
}
// 创建任务状态管理器
final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
stateRootDirectoryFiles,
taskIOExecutor);
// 构建TaskManagerServices
return new TaskManagerServices(
taskManagerLocation,
taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
ioManager,
shuffleEnvironment,
kvStateService,
broadcastVariableManager,
taskSlotTable,
jobManagerTable,
jobLeaderService,
taskStateManager,
taskEventDispatcher);
}
TaskManager的启动
以上只是相关服务的创建逻辑,服务启动的逻辑位于start
方法中。
TaskManagerRunner
的start
方法如下所示:
public void start() throws Exception {
// 调用TaskExecutor的start方法
// 即TaskExecutor父类RpcEndpoint的start方法
taskManager.start();
}
RpcEndpoint启动的时候会调用生命周期的onStart
方法。
TaskExecutor
的onStart
方法:
@Override
public void onStart() throws Exception {
try {
// 启动TaskExecutor服务
startTaskExecutorServices();
} catch (Exception e) {
final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
onFatalError(exception);
throw exception;
}
// 启动超时检测
startRegistrationTimeout();
}
启动TaskExecutor服务的方法startTaskExecutorServices
代码如下:
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
// 启动leader变更监听服务,如果leader变更会通知ResourceManagerLeaderListener
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
// 启动task slot table,参数的SlotAction负责定义释放slot(freeSlot)和slot超时(timeoutSlot)逻辑
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
// 启动job leader服务
// 管理某个job的task manager是这个job的job leader
// job manager如果有变更,会通知JobLeaderListenerImpl
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
// 创建文件缓存
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
以上是TaskManager完整的创建和启动流程。
附录
TaskExecutor中的重要服务脑图
这些服务的作用在本人后续博客中计划陆续分析。