flink将用户编写的程序转换为jobGraph进行提交。ProgramDeployer线程类负责将用户程序部署到集群中,它根据ExecutionContext是否包含ClusterId,选择启动一个新集群来部署任务,或者将任务运行在已有的集群中。同时,在部署时需要获取ClusterDescriptor,也就是集群相关的配置信息,flink根据启动的命令行来解析将任务部署在yarn集群还是standalone集群。当然,如果我们不是通过执行flink脚本的方式提交任务,可以根据部署的方式手动构建ClusterDescriptor。我们的任务部署在yarn上,所以使用的是YarnClusterDescriptor。
创建ClusterDescriptor
/**
 * Builds the cluster descriptor used to deploy onto YARN.
 *
 * <p>A new {@link YarnClient} is created, initialised with {@code yarnConfiguration} and
 * started before it is handed to the descriptor.
 *
 * @param configuration Flink configuration passed through to the descriptor
 * @param yarnConfiguration YARN configuration used both for the client and the descriptor
 * @param configurationDirectory Flink configuration directory passed through to the descriptor
 * @return a {@link YarnClusterDescriptor} wired to the started client
 */
private AbstractYarnClusterDescriptor getClusterDescriptor(
        Configuration configuration,
        YarnConfiguration yarnConfiguration,
        String configurationDirectory) {

    final YarnClient startedClient = YarnClient.createYarnClient();
    startedClient.init(yarnConfiguration);
    startedClient.start();

    // NOTE(review): the trailing boolean is presumably the "shared YARN client" flag --
    // confirm against the YarnClusterDescriptor constructor.
    return new YarnClusterDescriptor(
        configuration, yarnConfiguration, configurationDirectory, startedClient, false);
}
集群启动入口
/**
 * Entry point for deployment: obtains a cluster descriptor from the execution context and
 * either submits the job to the cluster named by the context or brings up a new cluster.
 *
 * @param context execution context providing the descriptor, cluster id and class loader
 * @param jobGraph job to deploy
 * @param result receiver for cluster information
 * @throws SqlExecutionException if the descriptor cannot be created/closed ("locate") or the
 *     deployment itself fails ("retrieve or create")
 */
private <T> void deployJob(ExecutionContext<T> context, JobGraph jobGraph, Result<T> result) {
    // The descriptor is closed automatically once deployment has finished or failed.
    try (final ClusterDescriptor<T> clusterDescriptor = context.createClusterDescriptor()) {
        try {
            if (context.getClusterId() != null) {
                // a cluster id is known: submit the job to that existing cluster
                deployJobOnExistingCluster(context.getClusterId(), clusterDescriptor, jobGraph, result);
            } else {
                // no cluster id: start a new cluster for this job
                deployJobOnNewCluster(clusterDescriptor, jobGraph, result, context.getClassLoader());
            }
        } catch (Exception deploymentFailure) {
            throw new SqlExecutionException("Could not retrieve or create a cluster.", deploymentFailure);
        }
    } catch (SqlExecutionException alreadyWrapped) {
        // raised by the inner block; pass it on without wrapping it a second time
        throw alreadyWrapped;
    } catch (Exception descriptorFailure) {
        throw new SqlExecutionException("Could not locate a cluster.", descriptorFailure);
    }
}
创建新的集群
- 通过yarn部署flink集群,并执行jobGraph。
- 根据执行的是query或者update语句,来选择是否阻塞等待执行结果。并将结果存储到executionResultBucket中。
/**
 * Deploys {@code jobGraph} on a newly started cluster and, when {@code awaitJobResult} is set,
 * blocks until the job result is available and stores it in {@code executionResultBucket}.
 *
 * <p>The cluster client is always shut down before returning; a failure during that shutdown
 * is deliberately not propagated so that it cannot mask the outcome of the deployment itself.
 *
 * @param clusterDescriptor descriptor used to start the job cluster
 * @param jobGraph job to run
 * @param result receives the cluster id and the web interface URL
 * @param classLoader class loader used to deserialize the job execution result
 * @throws Exception if deployment fails, or if the awaited job fails
 */
private <T> void deployJobOnNewCluster(
        ClusterDescriptor<T> clusterDescriptor,
        JobGraph jobGraph,
        Result<T> result,
        ClassLoader classLoader) throws Exception {
    ClusterClient<T> clusterClient = null;
    try {
        // Deploy with detached == false.
        // NOTE(review): per the original note, non-detached means the result is reported back
        // when the job ends -- confirm against ClusterDescriptor#deployJobCluster.
        clusterClient = clusterDescriptor.deployJobCluster(context.getClusterSpec(), jobGraph, false);
        // remember the cluster id and the web UI URL
        result.setClusterInformation(clusterClient.getClusterId(), clusterClient.getWebInterfaceURL());
        // only wait for the outcome when the caller asked for it
        if (awaitJobResult) {
            // we need to hard cast for now
            final JobExecutionResult jobResult = ((RestClusterClient<T>) clusterClient)
                .requestJobResult(jobGraph.getJobID())
                .get()
                // Use the class loader handed in by the caller instead of re-reading it from
                // the context; throws an exception if the job failed.
                .toJobExecutionResult(classLoader);
            executionResultBucket.add(jobResult);
        }
    } finally {
        try {
            if (clusterClient != null) {
                clusterClient.shutdown();
            }
        } catch (Exception ignored) {
            // Best-effort cleanup: a shutdown failure must not replace the deployment outcome.
        }
    }
}
Yarn应用部署
/**
 * Deploys a per-job cluster on YARN for the given job graph.
 *
 * @param clusterSpecification resources the cluster is started with
 * @param jobGraph job the cluster will run
 * @param detached forwarded unchanged to {@code deployInternal}
 * @return client for the deployed cluster
 * @throws ClusterDeploymentException wrapping any failure raised during deployment
 */
public ClusterClient<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {

    // this is required because the slots are allocated lazily
    jobGraph.setAllowQueuedScheduling(true);

    // name the application is started under
    final String applicationName = "Flink per-job cluster";
    try {
        return deployInternal(
            clusterSpecification,
            applicationName,
            // entry point class of the application master
            getYarnJobClusterEntrypoint(),
            jobGraph,
            detached);
    } catch (Exception cause) {
        throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", cause);
    }
}
通过调用AbstractYarnClusterDescriptor#deployInternal方法来启动Flink应用程序。其中主要流程有:
- 内存相关配置验证。
- 对 cutoff ratio 参数检查,也就是 container 内预留给其他 JVM 开销使用的内存比例(取值范围 0-1)。
- 对 minCutoff参数检查,minCutoff(默认600M)要小于taskManagerMemoryMB。
- 非堆内存大小验证。非堆内存要小于taskManagerMemoryMB-minCutoff-networkReservedMemory。
/**
 * Sanity-checks the memory settings of a cluster specification.
 *
 * <p>The heap size computed at the end is discarded; the call is made only so that the memory
 * checks performed while computing it are triggered.
 *
 * @param clusterSpecification specification whose TaskManager memory is checked
 */
private void validateClusterSpecification(ClusterSpecification clusterSpecification) {
    // TaskManager memory configured for Flink, in MB
    final long tmMemoryMb = clusterSpecification.getTaskManagerMemoryMB();

    // Memory inside the container set aside for everything other than the Flink heap.
    // NOTE(review): the original note gives the rule as min(memory * 0.25, 600 MB), but its own
    // example (1024 MB -> 600 MB) matches max(...) -- confirm in
    // ContaineredTaskManagerParameters#calculateCutoffMB.
    final long cutoffMb = ContaineredTaskManagerParameters.calculateCutoffMB(flinkConfiguration, tmMemoryMb);

    // derive the heap size from what is left after the cutoff
    final long remainingMb = tmMemoryMb - cutoffMb;
    TaskManagerServices.calculateHeapSizeMB(remainingMb, flinkConfiguration);
}
/**
 * Computes the heap size, in megabytes, left over from the given JVM memory budget.
 *
 * <p>Network buffer memory is subtracted first. When {@code TaskManagerOptions.MEMORY_OFF_HEAP}
 * is enabled the managed memory is subtracted as well, after checking that it is smaller than
 * the remaining budget.
 *
 * @param totalJavaMemorySizeMB memory available to the JVM, in megabytes
 * @param config configuration the memory options are read from
 * @return heap size in megabytes
 */
public static long calculateHeapSizeMB(long totalJavaMemorySizeMB, Configuration config) {
    // megabytes -> bytes
    final long totalBytes = megabytesToBytes(totalJavaMemorySizeMB);
    // memory reserved for network buffers (64 MB in the original author's worked example)
    final long networkBytes = getReservedNetworkMemory(config, totalBytes);
    // original author's worked example: 424 MB - 64 MB = 360 MB
    final long heapAndManagedBytes = totalBytes - networkBytes;

    if (!config.getBoolean(TaskManagerOptions.MEMORY_OFF_HEAP)) {
        // on-heap mode: everything that is left counts as heap
        return bytesToMegabytes(heapAndManagedBytes);
    }

    // off-heap mode: managed memory must fit into what is left and is taken out of the heap
    final long managedBytes = getManagedMemoryFromHeapAndManaged(config, heapAndManagedBytes);
    final boolean managedFits = managedBytes < heapAndManagedBytes;
    ConfigurationParserUtils.checkConfigParameter(
        managedFits,
        managedBytes,
        TaskManagerOptions.MANAGED_MEMORY_SIZE.key(),
        "Managed memory size too large for " + (networkBytes >> 20) +
            " MB network buffer memory and a total of " + totalJavaMemorySizeMB +
            " MB JVM memory");
    return bytesToMegabytes(heapAndManagedBytes - managedBytes);
}
- flink依赖、申请core验证。
- Flink lib、config相关路径检查
- Yarn配置文件路径检查
- 启动JM(AM)时申请的核心数,不能超过所有运行中NM里单个节点的最大核心数。
- 启动TM时每个container申请的核心数(默认等于slot个数),不能超过所有运行中NM里单个节点的最大核心数。
/**
 * Verifies that everything needed for a deployment is in place.
 *
 * <p>Checks, in order: a positive TaskManager count, the Flink jar path, the configuration
 * directory, the Flink configuration object, and that neither the application master nor a
 * container asks for more virtual cores than the largest running node offers. A missing
 * Hadoop/YARN configuration environment variable only produces a warning.
 *
 * @param clusterSpecification specification of the cluster to deploy
 * @throws YarnDeploymentException if a prerequisite is missing or the node reports cannot be
 *     fetched
 * @throws IllegalConfigurationException if more virtual cores are requested than the largest
 *     running node provides
 */
private void isReadyForDeployment(ClusterSpecification clusterSpecification) throws YarnDeploymentException {

    if (clusterSpecification.getNumberTaskManagers() <= 0) {
        throw new YarnDeploymentException("Taskmanager count must be positive");
    }
    if (this.flinkJarPath == null) {
        throw new YarnDeploymentException("The Flink jar path is null");
    }
    if (this.configurationDirectory == null) {
        throw new YarnDeploymentException("Configuration directory not set");
    }
    if (this.flinkConfiguration == null) {
        throw new YarnDeploymentException("Flink configuration object has not been set");
    }

    // Largest virtual-core capability among all RUNNING nodes; 0 when no node is reported.
    final int maxNodeVcores;
    try {
        maxNodeVcores = yarnClient.getNodeReports(NodeState.RUNNING)
            .stream()
            .mapToInt(nodeReport -> nodeReport.getCapability().getVirtualCores())
            .max()
            .orElse(0);
    } catch (Exception cause) {
        throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", cause);
    }

    // application master: requested cores must fit on the largest node
    final int amVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
    if (amVcores > maxNodeVcores) {
        final String amMessage = String.format(
            "The number of requested virtual cores for application master %d" +
                " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
            amVcores, maxNodeVcores);
        throw new IllegalConfigurationException(amMessage);
    }

    // containers: yarn.containers.vcores, falling back to the number of slots per TaskManager
    final int containerVcores = flinkConfiguration.getInteger(
        YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
    if (containerVcores > maxNodeVcores) {
        final String containerMessage = String.format(
            "The number of requested virtual cores per node %d" +
                " exceeds the maximum number of virtual cores %d available in the Yarn Cluster." +
                " Please note that the number of virtual cores is set to the number of task slots by default" +
                " unless configured in the Flink config with '%s.'",
            containerVcores, maxNodeVcores, YarnConfigOptions.VCORES.key());
        throw new IllegalConfigurationException(containerMessage);
    }

    // warn (but do not fail) when neither Hadoop environment variable is set
    final String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    final String yarnConfDir = System.getenv("YARN_CONF_DIR");
    if (hadoopConfDir == null && yarnConfDir == null) {
        LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. " +
            "The Flink YARN Client needs one of these to be set to properly load the Hadoop " +
            "configuration for accessing YARN.");
    }
}
- 任务绑定的队列是否存在检查
/**
 * Warns when the queue configured for this deployment is not among the queues known to YARN.
 *
 * <p>The check is purely advisory: nothing is thrown, and any error raised while talking to
 * YARN is logged and otherwise ignored.
 *
 * @param yarnClient client used to list the queues
 */
private void checkYarnQueues(YarnClient yarnClient) {
    try {
        final List<QueueInfo> knownQueues = yarnClient.getAllQueues();

        // only check when YARN reports queues and a queue was requested for this session
        if (knownQueues.isEmpty() || this.yarnQueue == null) {
            LOG.debug("The YARN cluster does not have any queues configured");
            return;
        }

        for (QueueInfo candidate : knownQueues) {
            if (candidate.getQueueName().equals(this.yarnQueue)) {
                // the requested queue exists, nothing to report
                return;
            }
        }

        // not found: list what is available (each name is followed by ", ")
        final StringBuilder available = new StringBuilder();
        for (QueueInfo candidate : knownQueues) {
            available.append(candidate.getQueueName()).append(", ");
        }
        LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. " +
            "Available queues: " + available);
    } catch (Throwable failure) {
        LOG.warn("Error while getting queue information from YARN: " + failure.getMessage());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Error details", failure);
        }
    }
}
- 动态参数绑定
- 启动时绑定的动态参数填充到flinkConfiguration,例如:-D env.java.opts=-DappName=foobar
// ------------------ Add dynamic properties to local flinkConfiguraton ------
// Every decoded dynamic property is written into the local Flink configuration as a string
// entry under its own key.
Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
dynProperties.forEach((key, value) -> flinkConfiguration.setString(key, value));
- Yarn内存资源检查
- 获取集群允许调度的最大、最小核数和内存<memory:, vCores:>。
- 根据YarnClient获取运行状态的节点,并计算出集群总的剩余内存totalFreeMemory、每个NM剩余内存nodeManagersFree、NM中剩余最大内存containerLimit。
- JM、TM内存若低于Yarn允许调度的最小内存,则按该最小值计算;若超过Yarn允许调度的最大内存,则抛出异常;若超过containerLimit,或申请的总内存超过totalFreeMemory,则只打印警告,不会终止部署。
- 返回flink运行配置ClusterSpecification。
// ------------------ Check if the YARN ClusterClient has the requested resources --------------
// Excerpt from the deployment routine: creates the YARN application, then validates the
// requested cluster specification against what YARN reports. On either failure path the
// session is torn down via failSessionDuringDeployment before the exception is thrown.
// Create application via yarnClient
final YarnClientApplication yarnApplication = yarnClient.createApplication();
final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
// largest resource capability reported for the new application
Resource maxRes = appResponse.getMaximumResourceCapability();
// collect the memory that is currently free in the cluster
final ClusterResourceDescription freeClusterMem;
try {
freeClusterMem = getCurrentFreeClusterResources(yarnClient);
} catch (YarnException | IOException e) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
}
// smallest allocation (in MB) the YARN scheduler is configured with; 0 when the key is not set
final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
// specification after validation; used for the rest of the deployment
final ClusterSpecification validClusterSpecification;
try {
validClusterSpecification = validateClusterResources(
clusterSpecification,
yarnMinAllocationMB,
maxRes,
freeClusterMem);
} catch (YarnDeploymentException yde) {
failSessionDuringDeployment(yarnClient, yarnApplication);
throw yde;
}
/**
 * Validates the requested memory against the limits and the free resources reported by YARN.
 *
 * <p>JobManager and TaskManager memory below {@code yarnMinAllocationMB} is raised to that
 * value. Memory above the maximum resource capability is rejected with an exception; every
 * other shortfall (total free memory, largest container, per-node fit) only logs a warning.
 *
 * @param clusterSpecification requested specification
 * @param yarnMinAllocationMB minimum allocation of the YARN scheduler, in MB
 * @param maximumResourceCapability largest resource YARN grants
 * @param freeClusterResources snapshot of the free memory in the cluster
 * @return specification carrying the (possibly raised) memory values and the original
 *     TaskManager and slot counts
 * @throws YarnDeploymentException if JobManager or TaskManager memory exceeds the maximum
 */
protected ClusterSpecification validateClusterResources(
        ClusterSpecification clusterSpecification,
        int yarnMinAllocationMB,
        Resource maximumResourceCapability,
        ClusterResourceDescription freeClusterResources) throws YarnDeploymentException {

    final int tmCount = clusterSpecification.getNumberTaskManagers();
    final int requestedJmMb = clusterSpecification.getMasterMemoryMB();
    final int requestedTmMb = clusterSpecification.getTaskManagerMemoryMB();

    if (requestedJmMb < yarnMinAllocationMB || requestedTmMb < yarnMinAllocationMB) {
        LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. "
            + "The value of 'yarn.scheduler.minimum-allocation-mb' is '" + yarnMinAllocationMB + "'. Please increase the memory size." +
            "YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances " +
            "you requested will start.");
    }

    // raise both values to the minimum allocation so that the checks below use what YARN accounts for
    final int jmMb = Math.max(requestedJmMb, yarnMinAllocationMB);
    final int tmMb = Math.max(requestedTmMb, yarnMinAllocationMB);

    // ----------------- hard limits: these abort the deployment
    final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n";
    if (jmMb > maximumResourceCapability.getMemory()) {
        throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\n"
            + "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + jmMb + "MB. " + note);
    }
    if (tmMb > maximumResourceCapability.getMemory()) {
        throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\n"
            + "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + tmMb + "MB. " + note);
    }

    // ----------------- soft limits: these only warn
    final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are " +
        "connecting from the beginning because the resources are currently not available in the cluster. " +
        "The allocation might take more time than usual because the Flink YARN client needs to wait until " +
        "the resources become available.";

    final int totalRequiredMb = jmMb + tmMb * tmCount;
    if (freeClusterResources.totalFreeMemory < totalRequiredMb) {
        LOG.warn("This YARN session requires " + totalRequiredMb + "MB of memory in the cluster. "
            + "There are currently only " + freeClusterResources.totalFreeMemory + "MB available." + noteRsc);
    }
    if (tmMb > freeClusterResources.containerLimit) {
        LOG.warn("The requested amount of memory for the TaskManagers (" + tmMb + "MB) is more than "
            + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
    }
    if (jmMb > freeClusterResources.containerLimit) {
        LOG.warn("The requested amount of memory for the JobManager (" + jmMb + "MB) is more than "
            + "the largest possible YARN container: " + freeClusterResources.containerLimit + noteRsc);
    }

    // ----------------- check if the requested containers fit into the cluster.
    // work on a copy so that the reported snapshot itself is left untouched
    final int[] remainingPerNode = freeClusterResources.nodeManagersFree.clone();

    // the JobManager is placed first
    final boolean jmPlaced = allocateResource(remainingPerNode, jmMb);
    if (!jmPlaced) {
        LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. " +
            "The JobManager requires " + jmMb + "MB. NodeManagers available: " +
            Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
    }

    // then one placement attempt per TaskManager
    for (int placed = 0; placed < tmCount; placed++) {
        final boolean tmPlaced = allocateResource(remainingPerNode, tmMb);
        if (!tmPlaced) {
            LOG.warn("There is not enough memory available in the YARN cluster. " +
                "The TaskManager(s) require " + tmMb + "MB each. " +
                "NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree) + "\n" +
                "After allocating the JobManager (" + jmMb + "MB) and (" + placed + "/" + tmCount + ") TaskManagers, " +
                "the following NodeManagers are available: " + Arrays.toString(remainingPerNode) + noteRsc);
        }
    }

    final ClusterSpecification.ClusterSpecificationBuilder builder =
        new ClusterSpecification.ClusterSpecificationBuilder();
    return builder
        .setMasterMemoryMB(jmMb)
        .setTaskManagerMemoryMB(tmMb)
        .setNumberTaskManagers(clusterSpecification.getNumberTaskManagers())
        .setSlotsPerTaskManager(clusterSpecification.getSlotsPerTaskManager())
        .createClusterSpecification();
}
- 启动AM
Yarn启动AppMaster也就是启动Flink JobManager是最为核心的步骤,它主要包含以下步骤:
- 根据配置文件初始化文件系统,在hdfs下创建/user/admin/.flink/application_xx/临时目录,任务正常停止会删除。
- 上传环境变量配置的文件、命令行指定的文件、日志文件、jobGraph依赖的jar。
- 创建并填充ContainerLaunchContext,主要是启动JM的命令行,入口类为org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint,堆内存大小为cutoff后的大小,如果JM内存配置为1024M,xmx设置为424M。
- 创建并填充ApplicationSubmissionContext,包括ContainerLaunchContext的 LocalResource、启动时的环境参数。
- 阻塞等待,直到获取应用成功部署状态。
启动JM的命令行:
"$JAVA_HOME/bin/java -Xmx424m -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+CMSIncrementalMode -XX:+CMSIncrementalPacing org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err"
学习点:
// The statements below are separate excerpts shown for study; they are not consecutive in
// the method they were taken from.
tmpConfigurationFile.deleteOnExit(); // delete the temporary file when the JVM exits
// register a thread that the JVM runs when it shuts down
// add a hook to clean up in case deployment fails (per the original note: deletes yarnFilesDir)
Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
// once the deployment has succeeded the hook is unregistered again
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
// Call site: start the application master with the validated cluster specification and keep
// the application report it returns.
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
/**
 * Uploads everything the application master needs, submits the application to YARN and
 * blocks until YARN reports it as RUNNING.
 *
 * <p>Main steps, in order: initialize the file systems; collect and upload ship files, user
 * jars, the Flink jar, a freshly written flink-conf.yaml and (if present) the serialized job
 * graph; assemble the class path; build the AM container launch context and its environment;
 * fill in the application submission context; submit; poll the application state.
 *
 * @param configuration Flink configuration; it is modified here (HA cluster id, slots,
 *     TaskManager heap memory) and then written out for upload
 * @param applicationName default application name, used when no custom name is set
 * @param yarnClusterEntrypoint entry point passed to the AM container setup
 * @param jobGraph job graph to ship, or {@code null} when this is not a per-job submission
 * @param yarnClient client used to submit and to poll the application
 * @param yarnApplication the YARN application created for this deployment
 * @param clusterSpecification memory, slot and TaskManager settings of the cluster
 * @return the application report observed when the application reached RUNNING
 * @throws Exception if uploading fails, the report cannot be fetched, or the application
 *     ends up FAILED, FINISHED or KILLED during deployment
 */
public ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplication yarnApplication,
ClusterSpecification clusterSpecification) throws Exception {
// ------------------ Initialize the file systems -------------------------
org.apache.flink.core.fs.FileSystem.initialize(
configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));
// initialize file system
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
final FileSystem fs = FileSystem.get(yarnConfiguration);
final Path homeDir = fs.getHomeDirectory(); // base directory on the target file system that local files are copied under, e.g. /user/admin/
// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
fs.getScheme().startsWith("file")) {
LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
+ "The Flink YARN client needs to store its files in a distributed file system");
}
//===========ApplicationSubmissionContext
ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
for (File file : shipFiles) {
systemShipFiles.add(file.getAbsoluteFile()); // per the original note: files attached on the command line
}
// ship the Logback configuration file from the configuration directory, if it exists
//check if there is a logback or log4j file
File logbackFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
final boolean hasLogback = logbackFile.exists();
if (hasLogback) {
systemShipFiles.add(logbackFile);
}
// ship the Log4j configuration file from the configuration directory, if it exists
File log4jFile = new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
final boolean hasLog4j = log4jFile.exists();
if (hasLog4j) {
systemShipFiles.add(log4jFile);
if (hasLogback) {
// both logging configurations are present --> only a warning is logged, both files are shipped
LOG.warn("The configuration directory ('" + configurationDirectory + "') contains both LOG4J and " +
"Logback configuration files. Please delete or rename one of them.");
}
}
// NOTE(review): per the original note this adds the jars found via the FLINK_PLUGINS_DIR and
// FLINK_LIB_DIR environment variables -- confirm in addEnvironmentFoldersToShipFiles
addEnvironmentFoldersToShipFiles(systemShipFiles);
// start filling in the ApplicationSubmissionContext
// Set-up ApplicationSubmissionContext for the application
final ApplicationId appId = appContext.getApplicationId();
// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
// if no ZooKeeper namespace was supplied on the command line, fall back to the configuration
String zkNamespace = getZookeeperNamespace();
// no user specified cli argument for namespace?
if (zkNamespace == null || zkNamespace.isEmpty()) {
// namespace defined in config? else use applicationId as default.
// HA cluster id read from the configuration; the application id is passed as the fallback value
zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
setZookeeperNamespace(zkNamespace);
}
configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
// activate re-execution of failed applications
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
// NOTE(review): per the original note this configures the failure interval -- confirm in activateHighAvailabilitySupport
activateHighAvailabilitySupport(appContext);
} else {
// set number of application retries to 1 in the default case
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
1));
}
// jars of the user program that have to be uploaded
final Set<File> userJarFiles = (jobGraph == null)
// not per-job submission
? Collections.emptySet()
// add user code jars from the provided JobGraph
: jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
// local resource map for Yarn
final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
// list of remote paths (after upload)
final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
// ship list that enables reuse of resources for task manager containers
StringBuilder envShipFileList = new StringBuilder();
// the upload helper receives 'paths' and 'localResources' so that it can record what it uploaded
// upload and register ship files
List<String> systemClassPaths = uploadAndRegisterFiles(
systemShipFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
final List<String> userClassPaths = uploadAndRegisterFiles(
userJarFiles,
fs,
homeDir,
appId,
paths,
localResources,
envShipFileList);
// ORDER: user entries are merged into the system entries and sorted together with them
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
systemClassPaths.addAll(userClassPaths);
}
// normalize classpath by sorting
Collections.sort(systemClassPaths);
Collections.sort(userClassPaths);
// classpath assembler
StringBuilder classPathBuilder = new StringBuilder();
// FIRST: user entries are placed ahead of the system entries
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
for (String classPath : systemClassPaths) {
classPathBuilder.append(classPath).append(File.pathSeparator);
}
// Setup jar for ApplicationMaster
Path remotePathJar = setupSingleLocalResource(
"flink.jar",
fs,
appId,
flinkJarPath,
localResources,
homeDir,
"");
// set the right configuration values for the TaskManager
configuration.setInteger(
TaskManagerOptions.NUM_TASK_SLOTS,
clusterSpecification.getSlotsPerTaskManager());
configuration.setString(
TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY,
clusterSpecification.getTaskManagerMemoryMB() + "m");
// Create a local temporary file whose name starts with "<appId>-flink-conf.yaml". Because no
// directory is passed to createTempFile, it is created in the JVM's default temporary-file
// directory (java.io.tmpdir), not in the directory the command was launched from.
// Upload the flink configuration
// write out configuration file
File tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
// delete the temporary file when the JVM exits
tmpConfigurationFile.deleteOnExit();
BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);
// register the temporary file written above as the local resource "flink-conf.yaml"
Path remotePathConf = setupSingleLocalResource(
"flink-conf.yaml",
fs,
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
paths.add(remotePathJar);
classPathBuilder.append("flink.jar").append(File.pathSeparator);
paths.add(remotePathConf);
classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
// LAST: user entries are placed after flink.jar and flink-conf.yaml
if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
for (String userClassPath : userClassPaths) {
classPathBuilder.append(userClassPath).append(File.pathSeparator);
}
}
// serialize the job graph and register it as a local resource under the key "job.graph"
// write job graph to tmp file and add it to local resource
// TODO: server use user main method to generate job graph
if (jobGraph != null) {
try {
File fp = File.createTempFile(appId.toString(), null);
fp.deleteOnExit();
try (FileOutputStream output = new FileOutputStream(fp);
ObjectOutputStream obOutput = new ObjectOutputStream(output);){
obOutput.writeObject(jobGraph);
}
final String jobGraphFilename = "job.graph";
// record the file name in the Flink configuration under JOB_GRAPH_FILE_PATH
flinkConfiguration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);
Path pathFromYarnURL = setupSingleLocalResource(
jobGraphFilename,
fs,
appId,
new Path(fp.toURI()),
localResources,
homeDir,
"");
paths.add(pathFromYarnURL);
classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
} catch (Exception e) {
LOG.warn("Add job graph to local resource fail");
throw e;
}
}
final Path yarnFilesDir = getYarnFilesDir(appId);
// owner gets full access, group and others get none
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
fs.setPermission(yarnFilesDir, permission); // set permission for path.
//To support Yarn Secure Integration Test Scenario
//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
//and KRB5 configuration files. We are adding these files as container local resources for the container
//applications (JM/TMs) to have proper secure cluster setup
Path remoteKrb5Path = null;
Path remoteYarnSiteXmlPath = null;
boolean hasKrb5 = false;
if (System.getenv("IN_TESTS") != null) {
File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
Path yarnSitePath = new Path(f.getAbsolutePath());
remoteYarnSiteXmlPath = setupSingleLocalResource(
Utils.YARN_SITE_FILE_NAME,
fs,
appId,
yarnSitePath,
localResources,
homeDir,
"");
String krb5Config = System.getProperty("java.security.krb5.conf");
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
Path krb5ConfPath = new Path(krb5.getAbsolutePath());
remoteKrb5Path = setupSingleLocalResource(
Utils.KRB5_FILE_NAME,
fs,
appId,
krb5ConfPath,
localResources,
homeDir,
"");
hasKrb5 = true;
}
}
// setup security tokens
Path remotePathKeytab = null;
String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
if (keytab != null) {
LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
remotePathKeytab = setupSingleLocalResource(
Utils.KEYTAB_FILE_NAME,
fs,
appId,
new Path(keytab),
localResources,
homeDir,
"");
}
// Create the AM container launch context, which carries the java command that starts the AM.
// Original author's example: a 1024 MB JobManager gets -Xmx424m (1024 - 600 cutoff).
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasLogback,
hasLog4j,
hasKrb5,
clusterSpecification.getMasterMemoryMB());
if (UserGroupInformation.isSecurityEnabled()) {
// set HDFS delegation tokens when security is enabled
LOG.info("Adding delegation token to the AM container..");
Utils.setTokensFor(amContainer, paths, yarnConfiguration);
}
// attach all local resources collected above to the AM container
amContainer.setLocalResources(localResources);
fs.close();
// build the environment (including the class path) the AM is started with
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables (selected by the containerized-master env prefix)
appMasterEnv.putAll(Utils.getEnvironmentVariables(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
// set Flink on YARN internal configuration values
appMasterEnv.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(clusterSpecification.getNumberTaskManagers()));
appMasterEnv.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(clusterSpecification.getTaskManagerMemoryMB()));
appMasterEnv.put(YarnConfigKeys.FLINK_JAR_PATH, remotePathJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, homeDir.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, envShipFileList.toString());
appMasterEnv.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(clusterSpecification.getSlotsPerTaskManager()));
appMasterEnv.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(detached));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, yarnFilesDir.toUri().toString());
// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
if (remotePathKeytab != null) {
appMasterEnv.put(YarnConfigKeys.KEYTAB_PATH, remotePathKeytab.toString());
String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
}
//To support Yarn Secure Integration Test Scenario
if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
if (dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}
// add the class path entries defined by the YARN configuration to appMasterEnv
// set classpath from YARN configuration
Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
amContainer.setEnvironment(appMasterEnv);
// resources requested for the ApplicationMaster
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));// yarn.appmaster.vcores (1 by default according to the original note)
final String customApplicationName = customName != null ? customName : applicationName;
// application name, application type, AM container spec, resources and (optionally) queue
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
// node label of the application (per the original note: used for label-based scheduling)
setApplicationNodeLabel(appContext);
// application tags (possibly several)
setApplicationTags(appContext);
// Shutdown hook: it is registered with the JVM before submission and unregistered again at
// the end of this method, so it only fires if the JVM shuts down while deployment is pending.
// add a hook to clean up in case deployment fails (per the original note: deletes yarnFilesDir)
Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
LOG.info("Submitting application master " + appId);
yarnClient.submitApplication(appContext);
LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
// poll every 250 ms until the application is RUNNING (or has ended, which is an error)
loop: while (true) {
try {
report = yarnClient.getApplicationReport(appId);
} catch (IOException e) {
throw new YarnDeploymentException("Failed to deploy the cluster.", e);
}
YarnApplicationState appState = report.getYarnApplicationState();
LOG.debug("Application State: {}", appState);
switch(appState) {
case FAILED:
case FINISHED:
case KILLED:
throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
+ appState + " during deployment. \n" +
"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
"If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
"yarn logs -applicationId " + appId);
//break ..
case RUNNING:
LOG.info("YARN application has been deployed successfully.");
break loop;
default:
// log state changes once; after 60 seconds the hint below is logged on every poll
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Thread.sleep(250);
}
// print the application id for user to cancel themselves.
if (isDetachedMode()) {
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop " +
"it:\nyarn application -kill " + appId + "\nPlease also note that the " +
"temporary files of the YARN session in the home directory will not be removed.");
}
// deployment succeeded: unregister the shutdown hook
// since deployment was successful, remove the hook
ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
return report;
}
上传到HDFS中的文件,红色表示序列化的JobGraph,local Resource中的key为job.graph。
- 填充运行时配置,返回集群客户端。
// Excerpt: take host and RPC port from the application report, write them into the Flink
// configuration, and build the cluster client from it.
String host = report.getHost();
int port = report.getRpcPort();
// Correctly initialize the Flink config
// the same host/port pair is used for both the JobManager address and the REST address
flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
flinkConfiguration.setInteger(JobManagerOptions.PORT, port);
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);
// the Flink cluster is deployed in YARN. Represent cluster
return createYarnClusterClient(
this,
validClusterSpecification.getNumberTaskManagers(),
validClusterSpecification.getSlotsPerTaskManager(),
report,
flinkConfiguration,
true);
/**
 * Creates the client used to communicate with the deployed YARN application.
 *
 * <p>This implementation only reads {@code flinkConfiguration} and the application id
 * carried by {@code report}; the remaining parameters are not used here.
 *
 * @param descriptor the cluster descriptor (unused by this implementation)
 * @param numberTaskManagers number of task managers (unused by this implementation)
 * @param slotsPerTaskManager slots per task manager (unused by this implementation)
 * @param report YARN application report that supplies the application id
 * @param flinkConfiguration configuration handed to the client
 * @param perJobCluster per-job flag (unused by this implementation)
 * @return a REST based cluster client identified by the YARN application id
 * @throws Exception if the client cannot be created
 */
protected ClusterClient<ApplicationId> createYarnClusterClient(
        AbstractYarnClusterDescriptor descriptor,
        int numberTaskManagers,
        int slotsPerTaskManager,
        ApplicationReport report,
        Configuration flinkConfiguration,
        boolean perJobCluster) throws Exception {
    final ApplicationId applicationId = report.getApplicationId();
    return new RestClusterClient<>(flinkConfiguration, applicationId);
}
AM运行时获取JobGraph
生产环境使用yarn per-job模式执行,也就是一个yarn应用只执行一个jobGraph,看下yarn per-job模式的执行流程。
jobGraph的执行是由Dispatcher发起的。在Dispatcher创建的时候,会先从LocalResource中提取该JobGraph,并把它传递给SubmittedJobGraphStore的实现类SingleJobSubmittedJobGraphStore;当Dispatcher启动并成为Leader后,会从SingleJobSubmittedJobGraphStore里面恢复JobGraph,执行调度。具体流程:
- 集群启动时,创建Dispatcher和ResourceManager的工厂类。传递JobGraphRetriever的实现类FileJobGraphRetriever,表示从LocalResource文件中提取JobGraph。
ClusterEntrypoint#runCluster
// Obtain the factory for the Dispatcher / ResourceManager component from the cluster configuration.
final DispatcherResourceManagerComponentFactory<?> dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
/**
 * Builds the component factory used by the job cluster entrypoint.
 *
 * @param configuration configuration from which the file based job graph retriever is created
 * @return a job dispatcher/resource manager component factory backed by the YARN resource
 *         manager factory and a file based job graph retriever
 */
protected DispatcherResourceManagerComponentFactory<?> createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    // The JobGraph is later extracted from the uploaded file through this retriever.
    final FileJobGraphRetriever jobGraphRetriever = FileJobGraphRetriever.createFrom(configuration);
    return new JobDispatcherResourceManagerComponentFactory(
            YarnResourceManagerFactory.INSTANCE,
            jobGraphRetriever);
}
- 创建MiniDispatcher反序列化JobGraph
/**
 * Creates the {@link MiniDispatcher} for a job cluster.
 *
 * <p>The job graph is obtained through {@code jobGraphRetriever} and the execution mode is
 * parsed from the {@code EXECUTION_MODE} entry of the configuration; both are handed to the
 * dispatcher together with the supplied services.
 *
 * @return the dispatcher configured with the retrieved job graph and execution mode
 * @throws Exception if the job graph cannot be retrieved or the dispatcher cannot be created
 */
@Override
public MiniDispatcher createDispatcher(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        JobManagerMetricGroup jobManagerMetricGroup,
        @Nullable String metricQueryServicePath,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        FatalErrorHandler fatalErrorHandler,
        HistoryServerArchivist historyServerArchivist) throws Exception {

    // Fetch the job graph first, then resolve the execution mode from the configuration.
    final JobGraph retrievedJobGraph = jobGraphRetriever.retrieveJobGraph(configuration);
    final ClusterEntrypoint.ExecutionMode mode =
            ClusterEntrypoint.ExecutionMode.valueOf(configuration.getString(EXECUTION_MODE));

    return new MiniDispatcher(
            rpcService,
            getEndpointId(),
            configuration,
            highAvailabilityServices,
            resourceManagerGatewayRetriever,
            blobServer,
            heartbeatServices,
            jobManagerMetricGroup,
            metricQueryServicePath,
            archivedExecutionGraphStore,
            DefaultJobManagerRunnerFactory.INSTANCE,
            fatalErrorHandler,
            historyServerArchivist,
            retrievedJobGraph,
            mode);
}
- 将该jobGraph存储到SingleJobSubmittedJobGraphStore
/**
 * Creates a dispatcher for a single, pre-supplied job graph.
 *
 * <p>The given {@code jobGraph} is wrapped in a {@link SingleJobSubmittedJobGraphStore}, which is
 * passed to the superclass constructor together with the other services. The execution mode is
 * kept on this instance and a fresh job termination future is created.
 *
 * @param jobGraph the job graph wrapped into the single-job store
 * @param executionMode the execution mode; passed through {@code checkNotNull}, so it is
 *                      expected to be non-null
 * @throws Exception declared by this constructor and presumably raised by the superclass
 *                   constructor -- confirm against the parent class
 */
public MiniDispatcher(
RpcService rpcService,
String endpointId,
Configuration configuration,
HighAvailabilityServices highAvailabilityServices,
GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
JobManagerMetricGroup jobManagerMetricGroup,
@Nullable String metricQueryServicePath,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
JobManagerRunnerFactory jobManagerRunnerFactory,
FatalErrorHandler fatalErrorHandler,
HistoryServerArchivist historyServerArchivist,
JobGraph jobGraph,
JobClusterEntrypoint.ExecutionMode executionMode) throws Exception {
super(
rpcService,
endpointId,
configuration,
highAvailabilityServices,
// The only job graph of this cluster is wrapped in a single-job store here.
new SingleJobSubmittedJobGraphStore(jobGraph),
resourceManagerGatewayRetriever,
blobServer,
heartbeatServices,
jobManagerMetricGroup,
metricQueryServicePath,
archivedExecutionGraphStore,
jobManagerRunnerFactory,
fatalErrorHandler,
historyServerArchivist);
this.executionMode = checkNotNull(executionMode);
// Starts out as a new, not-yet-completed future.
this.jobTerminationFuture = new CompletableFuture<>();
}
- Dispatcher成为Leader后,进行任务恢复,从submittedJobGraphStore拿到传递的JobGraph后开始部署。
/**
 * Callback invoked when this dispatcher is granted leadership.
 *
 * <p>The work is scheduled through {@code runAsyncWithoutFencing}: jobs are recovered once the
 * previous recovery operation has finished, then leadership is tentatively accepted and the
 * recovered jobs are run, and finally the leadership is either confirmed or the recovered job
 * graphs are released again. Any failure along the chain is reported as a fatal error.
 *
 * @param newLeaderSessionID the leader session id (fencing token) that was granted
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
    runAsyncWithoutFencing(
        () -> {
            log.info("Dispatcher {} was granted leadership with fencing token {}", getAddress(), newLeaderSessionID);

            // Step 1: after the previous recovery operation, recover the persisted jobs.
            final CompletableFuture<Collection<JobGraph>> jobsFuture = recoveryOperation.thenApplyAsync(
                FunctionUtils.uncheckedFunction(ignored -> recoverJobs()),
                getRpcService().getExecutor());

            // Step 2: try to accept the leadership and run the recovered jobs.
            final CompletableFuture<Boolean> acceptedFuture = jobsFuture.thenComposeAsync(
                (Collection<JobGraph> jobs) -> tryAcceptLeadershipAndRunJobs(newLeaderSessionID, jobs),
                getUnfencedMainThreadExecutor());

            // Step 3: confirm the leadership, or release the recovered job graphs if it was not accepted.
            final CompletableFuture<Void> leadershipFuture = acceptedFuture.thenCombineAsync(
                jobsFuture,
                BiFunctionWithException.unchecked((Boolean accepted, Collection<JobGraph> jobs) -> {
                    if (!accepted) {
                        for (JobGraph job : jobs) {
                            submittedJobGraphStore.releaseJobGraph(job.getJobID());
                        }
                    } else {
                        leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
                    }
                    return null;
                }),
                getRpcService().getExecutor());

            // Step 4: escalate any failure of the chain as a fatal error.
            leadershipFuture.whenComplete(
                (Void ignored, Throwable failure) -> {
                    if (failure == null) {
                        return;
                    }
                    onFatalError(
                        new DispatcherException(
                            String.format("Failed to take leadership with session id %s.", newLeaderSessionID),
                            ExceptionUtils.stripCompletionException(failure)));
                });

            // The next leadership grant chains its recovery onto this operation.
            recoveryOperation = leadershipFuture;
        });
}
/**
 * Recovers every job graph whose id is listed in the submitted job graph store.
 *
 * <p>If the recovery fails, each listed job graph is released on a best-effort basis.
 * Exceptions thrown while releasing are attached to the original failure as suppressed
 * exceptions, and the original failure is rethrown.
 *
 * @return the recovered job graphs
 * @throws Exception if the job ids cannot be listed or a job graph cannot be recovered
 */
@VisibleForTesting
Collection<JobGraph> recoverJobs() throws Exception {
    log.info("Recovering all persisted jobs.");
    final Collection<JobID> persistedJobIds = submittedJobGraphStore.getJobIds();

    try {
        return recoverJobGraphs(persistedJobIds);
    } catch (Exception recoveryFailure) {
        // Recovery failed: release every listed job graph before propagating the failure.
        for (JobID persistedJobId : persistedJobIds) {
            try {
                submittedJobGraphStore.releaseJobGraph(persistedJobId);
            } catch (Exception releaseFailure) {
                recoveryFailure.addSuppressed(releaseFailure);
            }
        }
        throw recoveryFailure;
    }
}
/**
 * Recovers the job graph for each of the given job ids, in iteration order.
 *
 * @param jobIds ids of the jobs to recover
 * @return the recovered job graphs, one per id
 * @throws FlinkJobNotFoundException if no job graph is found for one of the ids
 * @throws Exception if recovering a job graph fails
 */
@Nonnull
private Collection<JobGraph> recoverJobGraphs(Collection<JobID> jobIds) throws Exception {
    final List<JobGraph> recovered = new ArrayList<>(jobIds.size());

    for (JobID id : jobIds) {
        final JobGraph graph = recoverJob(id);
        if (graph != null) {
            recovered.add(graph);
        } else {
            // A listed id without a job graph aborts the whole recovery.
            throw new FlinkJobNotFoundException(id);
        }
    }

    return recovered;
}