Main contents
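You should first be familiar with the basics of the Flink architecture and know what the client, JobManager, and TaskManager are for. This article analyzes how each component starts up from the source-code perspective, to better understand what the client, JobManager, and TaskManager do and how they interact.
We begin with the cluster startup scripts, because they show where, at the very start, to begin reading the Flink source code.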
Cluster startup script: start-cluster
When the cluster starts, the start-cluster script is executed:
$ ./bin/start-cluster.sh
The core code of start-cluster is:
"${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
As shown above, it calls the jobmanager.sh script, which contains:
ENTRYPOINT=standalonesession
"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
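jobmanager.sh in turn calls the flink-daemon.sh script, whose case statement maps the standalonesession entry point to the class to run: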
(standalonesession)
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
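This is the entry class used to start the JobManager.
In other words, jobmanager.sh (through flink-daemon.sh) launches the entry class StandaloneSessionClusterEntrypoint, which is analyzed below.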
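JobManager functional overview
Overview: during startup, the JobManager first initializes a set of services (initializeServices(configuration, pluginManager)), including haServices, blobServer, heartbeatServices, metricRegistry, and archivedExecutionGraphStore. It then starts three major components: WebMonitorEndpoint, ResourceManager, and DispatcherRunner. WebMonitorEndpoint receives requests from clients and forwards each one to the matching handler; for example, a job submission is processed by JobSubmitHandler. ResourceManager allocates and manages resources. The Dispatcher receives submitted jobs and persists them, launches a JobMaster to execute each job, and recovers jobs when the master fails.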
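To make the Dispatcher's responsibilities more concrete, here is a toy Java model of them: persist each submitted job, launch one JobMaster per job, and re-launch JobMasters for the persisted jobs after a failover. All names (ToyDispatcher, ToyJobMaster, recoverJobs) are hypothetical, and an in-memory map stands in for the HA job graph store; the real Dispatcher is asynchronous and far more involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model (hypothetical names) of the Dispatcher duties described above.
final class ToyDispatcher {
    /** Placeholder for a JobMaster that would execute one job. */
    static final class ToyJobMaster {
        final String jobId;
        final String jobGraph;

        ToyJobMaster(String jobId, String jobGraph) {
            this.jobId = jobId;
            this.jobGraph = jobGraph;
        }
    }

    // Stand-in for the HA job graph store; sharing it between Dispatcher instances
    // simulates storage that survives a JobManager failure.
    private final Map<String, String> jobGraphStore;
    private final Map<String, ToyJobMaster> runningJobMasters = new LinkedHashMap<>();

    ToyDispatcher(Map<String, String> jobGraphStore) {
        this.jobGraphStore = jobGraphStore;
    }

    void submitJob(String jobId, String jobGraph) {
        if (runningJobMasters.containsKey(jobId)) {
            throw new IllegalStateException("Job already submitted: " + jobId);
        }
        jobGraphStore.put(jobId, jobGraph);                              // 1. persist the job
        runningJobMasters.put(jobId, new ToyJobMaster(jobId, jobGraph)); // 2. launch a JobMaster
    }

    /** Called when this Dispatcher becomes leader: recover every persisted job. */
    void recoverJobs() {
        for (Map.Entry<String, String> job : jobGraphStore.entrySet()) {
            runningJobMasters.computeIfAbsent(
                    job.getKey(), id -> new ToyJobMaster(id, job.getValue()));
        }
    }

    List<String> runningJobs() {
        return new ArrayList<>(runningJobMasters.keySet());
    }
}
```

Passing the same map to a second ToyDispatcher and calling recoverJobs() simulates a new leader picking up the jobs persisted by the failed one.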
Core classes and methods involved in startup
StandaloneSessionClusterEntrypoint.main()->ClusterEntrypoint.runCluster()
The details:
public static void main(String[] args) {
// startup checks and logging
EnvironmentInformation.logEnvironmentInfo(
LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
// parse the arguments and load the configuration
final EntrypointClusterConfiguration entrypointClusterConfiguration =
ClusterEntrypointUtils.parseParametersOrExit(
args,
new EntrypointClusterConfigurationParserFactory(),
StandaloneSessionClusterEntrypoint.class);
Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
StandaloneSessionClusterEntrypoint entrypoint =
new StandaloneSessionClusterEntrypoint(configuration);
// start the cluster
ClusterEntrypoint.runClusterEntrypoint(entrypoint);
}
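Before runClusterEntrypoint() is called, main() turns the command-line arguments into a Configuration. The sketch below models that step under simplifying assumptions: only a --configDir option and -Dkey=value dynamic properties are handled, a Map<String, String> stands in for Flink's Configuration, and the class EntrypointArgsSketch is hypothetical. It mirrors the rule that dynamic properties take precedence over values loaded from flink-conf.yaml.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of argument parsing and configuration loading in main().
final class EntrypointArgsSketch {
    final String configDir;
    final Map<String, String> dynamicProperties;

    private EntrypointArgsSketch(String configDir, Map<String, String> dynamicProperties) {
        this.configDir = configDir;
        this.dynamicProperties = dynamicProperties;
    }

    static EntrypointArgsSketch parse(String[] args) {
        String configDir = null;
        Map<String, String> dynamic = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            if (arg.equals("--configDir") && i + 1 < args.length) {
                configDir = args[++i];
            } else if (arg.startsWith("-D") && arg.contains("=")) {
                int eq = arg.indexOf('=');
                dynamic.put(arg.substring(2, eq), arg.substring(eq + 1));
            }
        }
        if (configDir == null) {
            // In this sketch we throw; parseParametersOrExit instead prints usage and exits.
            throw new IllegalArgumentException("--configDir is required");
        }
        return new EntrypointArgsSketch(configDir, dynamic);
    }

    /** Overlays dynamic properties on the values read from the config file. */
    Map<String, String> loadConfiguration(Map<String, String> fromFile) {
        Map<String, String> merged = new HashMap<>(fromFile);
        merged.putAll(dynamicProperties);
        return merged;
    }
}
```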
The main method starts the cluster's JobManager by calling ClusterEntrypoint.runClusterEntrypoint(entrypoint),
which ultimately calls ClusterEntrypoint's runCluster() method:
private void runCluster(Configuration configuration, PluginManager pluginManager)
throws Exception {
synchronized (lock) {
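/**
 * Initializes the various services, including:
 * commonRpcService: an RPC service based on an Akka ActorSystem, which starts an actor internally; after the master node starts, it sends itself a "hello" message, which is ultimately handled by this service.
 * ioExecutor: an executor dedicated to I/O operations.
 * haServices: which high-availability implementation is used is read from flink-conf.yaml (e.g. high-availability: zookeeper); typically a ZooKeeperHaServices instance is created, which uses the Curator framework internally.
 * blobServer: transfers large files, such as user job JARs and log files uploaded by TaskManagers.
 * heartbeatServices: provides the heartbeat mechanism used to monitor liveness between components.
 * metricRegistry and archivedExecutionGraphStore are created here as well (both are passed to create() below).
 */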
initializeServices(configuration, pluginManager);
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
final DispatcherResourceManagerComponentFactory
dispatcherResourceManagerComponentFactory =
createDispatcherResourceManagerComponentFactory(configuration);
// create the components
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(
metricRegistry.getMetricQueryServiceRpcService()),
this);
}
}
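Two configuration-driven steps in runCluster() can be sketched in isolation: choosing the HA mode from the high-availability option, which determines how haServices is created, and writing the RPC service's address and port back into the configuration, as the setString/setInteger calls above do. A Map stands in for Configuration. The keys jobmanager.rpc.address and jobmanager.rpc.port are the keys of JobManagerOptions.ADDRESS and JobManagerOptions.PORT; RunClusterConfigSketch and HaMode are hypothetical, and Flink supports more HA options than the two modeled here.

```java
import java.util.Locale;
import java.util.Map;

// Sketch (hypothetical names) of two configuration steps in runCluster().
final class RunClusterConfigSketch {
    /** Only the two HA modes discussed in the text; Flink supports more. */
    enum HaMode { NONE, ZOOKEEPER }

    private RunClusterConfigSketch() {}

    /** Reads "high-availability" (default NONE), case-insensitively. */
    static HaMode haMode(Map<String, String> conf) {
        String value = conf.getOrDefault("high-availability", "NONE").trim().toUpperCase(Locale.ROOT);
        switch (value) {
            case "NONE":
                return HaMode.NONE;
            case "ZOOKEEPER":
                return HaMode.ZOOKEEPER;
            default:
                throw new UnsupportedOperationException("HA mode not modeled in this sketch: " + value);
        }
    }

    /** Mirrors configuration.setString(ADDRESS, ...) and setInteger(PORT, ...) above. */
    static void writeHostInformation(Map<String, String> conf, String rpcAddress, int rpcPort) {
        conf.put("jobmanager.rpc.address", rpcAddress);
        conf.put("jobmanager.rpc.port", Integer.toString(rpcPort));
    }
}
```

Writing the address back matters because components created afterwards read it from the same configuration instead of receiving the RPC service directly.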
Next, DefaultDispatcherResourceManagerComponentFactory.create() is called to create WebMonitorEndpoint, ResourceManager, and the Dispatcher (through DispatcherRunner). The code is as follows:
public DispatcherResourceManagerComponent create(
Configuration configuration,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler)
throws Exception {
LeaderRetrievalService dispatcherLeaderRetrievalService = null;
LeaderRetrievalService resourceManagerRetrievalService = null;
/**
* TODO create the three major components
*/
WebMonitorEndpoint<?> webMonitorEndpoint = null;
ResourceManager<?> resourceManager = null;
DispatcherRunner dispatcherRunner = null;
......
// returns an object of type DispatcherRestEndpoint
webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
// TODO key startup method, analyzed in detail below
webMonitorEndpoint.start();
resourceManager = resourceManagerFactory.createResourceManager(
configuration,
ResourceID.generate(),
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
new ClusterInformation(hostname, blobServer.getPort()),
webMonitorEndpoint.getRestBaseUrl(),
metricRegistry,
hostname,
ioExecutor);
dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
log.debug("Starting ResourceManager.");
resourceManager.start();
In summary, the entrypoint first initializes the various services and then creates the three major components: WebMonitorEndpoint, ResourceManager, and DispatcherRunner.
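Flink's create() wraps these steps in a try/catch that closes whichever components were already created if a later step fails (that part is not shown in the excerpt above). The sketch below models the general start-then-roll-back pattern with hypothetical types; closing in reverse start order is a design choice of this sketch, not necessarily what Flink does (Flink closes the components asynchronously).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Sketch (hypothetical types) of ordered component startup with cleanup on failure.
final class ComponentStartupSketch {
    interface Component {
        String name();
        void start() throws Exception;
        void close();
    }

    private ComponentStartupSketch() {}

    /** Starts components in order; if one fails, closes the started ones in reverse order and rethrows. */
    static void startAll(List<Component> components, List<String> log) throws Exception {
        Deque<Component> started = new ArrayDeque<>();
        try {
            for (Component c : components) {
                c.start();
                log.add("started " + c.name());
                started.push(c);
            }
        } catch (Exception e) {
            while (!started.isEmpty()) {
                Component c = started.pop();
                c.close();
                log.add("closed " + c.name());
            }
            throw e;
        }
    }

    /** Convenience factory for a component that optionally fails on start. */
    static Component simple(String name, boolean failOnStart) {
        return new Component() {
            @Override public String name() { return name; }
            @Override public void start() throws Exception {
                if (failOnStart) {
                    throw new Exception(name + " failed to start");
                }
            }
            @Override public void close() {}
        };
    }
}
```

For example, startAll(List.of(simple("webMonitorEndpoint", false), simple("resourceManager", false)), log) mirrors the order in create(): the REST endpoint is started before the ResourceManager.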
Startup of the WebMonitorEndpoint, ResourceManager, and DispatcherRunner components
WebMonitorEndpoint startup
[Figure: WebMonitorEndpoint class diagram]
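WebMonitorEndpoint calls the start() method it inherits from its parent class RestServerEndpoint. start() does three main things: (1) initializes the handlers, (2) starts the Netty server, and (3) starts leader election.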
/**
* Starts this REST server endpoint.
*
* @throws Exception if we cannot start the RestServerEndpoint
*/
public final void start() throws Exception {
.....
// 1. initialize the handlers by calling DispatcherRestEndpoint's initializeHandlers() method
handlers = initializeHandlers(restAddressFuture);
// 2. create the Netty server bootstrap (ServerBootstrap) used to start the server
bootstrap = new ServerBootstrap();
// 3. start WebMonitorEndpoint leader election
startInternal();
.....
}
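start() is declared final, so RestServerEndpoint fixes the order of the steps while subclasses plug in initializeHandlers() and startInternal(); this is the template method pattern. A minimal sketch of that structure follows. The class names are hypothetical, WebMonitorEndpointSketch stands in for both WebMonitorEndpoint and DispatcherRestEndpoint, and the Netty bootstrap/bind step is replaced by a log entry.

```java
import java.util.ArrayList;
import java.util.List;

// Template-method sketch of RestServerEndpoint.start() (hypothetical class names).
abstract class RestServerEndpointSketch {
    protected final List<String> steps = new ArrayList<>();

    public final void start() throws Exception {
        // 1. let the subclass contribute its handlers
        List<String> handlers = initializeHandlers();
        // 2. stand-in for building the Netty ServerBootstrap and binding the port
        steps.add("bind server with " + handlers.size() + " handler(s)");
        // 3. subclass hook; WebMonitorEndpoint starts leader election here
        startInternal();
    }

    protected abstract List<String> initializeHandlers();

    protected abstract void startInternal() throws Exception;
}

// Stand-in for WebMonitorEndpoint / DispatcherRestEndpoint.
class WebMonitorEndpointSketch extends RestServerEndpointSketch {
    @Override
    protected List<String> initializeHandlers() {
        return List.of("JobSubmitHandler");
    }

    @Override
    protected void startInternal() {
        steps.add("start leader election");
    }
}
```

Making start() final prevents a subclass from reordering the steps, for example starting leader election before the server is listening.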
DispatcherRestEndpoint's initializeHandlers() method is called to initialize the handlers:
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
final CompletableFuture<String> localAddressFuture) {
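/**
* TODO ChannelInboundHandler's channelRead0() method is invoked automatically by Netty.
* Whichever handler is being initialized, it has a handleRequest() method;
* under the hood, channelRead0() ultimately calls handler.handleRequest().
* When we submit a job, the request is received by WebMonitorEndpoint and dispatched to JobSubmitHandler,
* and the request is finally executed by its handleRequest() method.
*/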
List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
// call the method of the parent class WebMonitorEndpoint
super.initializeHandlers(localAddressFuture);
// Add the Dispatcher specific handlers
final Time timeout = restConfiguration.getTimeout();
// handler for job submission requests from users
JobSubmitHandler jobSubmitHandler =
new JobSubmitHandler(
leaderRetriever, timeout, responseHeaders, executor, clusterConfiguration);
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
return handlers;
}
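The list of (RestHandlerSpecification, handler) pairs is essentially a routing table: parent handlers first, then Dispatcher-specific ones such as JobSubmitHandler. The sketch below models how a request is matched to a handler and passed to its handleRequest(), as the comment in initializeHandlers() describes. It simplifies under stated assumptions: routes match on exact HTTP method and URL (Flink's router also supports path parameters), handlers are plain functions, and HandlerRoutingSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Simplified routing model (hypothetical names): each Route stands in for a
// (RestHandlerSpecification, ChannelInboundHandler) pair, and channelRead()
// plays the role of Netty's channelRead0() delegating to handleRequest().
final class HandlerRoutingSketch {
    static final class Route {
        final String method;
        final String url;
        final Function<String, String> handleRequest; // request body -> response body

        Route(String method, String url, Function<String, String> handleRequest) {
            this.method = method;
            this.url = url;
            this.handleRequest = handleRequest;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    /** Like initializeHandlers(): parent handlers first, then Dispatcher-specific ones. */
    HandlerRoutingSketch(List<Route> parentRoutes, List<Route> dispatcherRoutes) {
        routes.addAll(parentRoutes);
        routes.addAll(dispatcherRoutes);
    }

    /** Exact-match lookup; returns a 404 marker when no route matches. */
    String channelRead(String method, String url, String body) {
        for (Route route : routes) {
            if (route.method.equals(method) && route.url.equals(url)) {
                return route.handleRequest.apply(body);
            }
        }
        return "404 Not Found";
    }
}
```

In Flink's REST API a job submission is a POST to /jobs, which is the route JobSubmitHandler serves; registering a Route("POST", "/jobs", ...) models that.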
WebMonitorEndpoint's startInternal() method is then called:
/**
 * Starts leader election.
 */
public void startInternal() throws Exception {
// run leader election
leaderElectionService.start(this);
startExecutionGraphCacheCleanupTask();
if (hasWebUI) {
log.info("Web frontend listening at {}.", getRestBaseUrl());
}
}
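leaderElectionService.start() shows up frequently in the Flink code base and is generally called wherever leader election is needed. With ZooKeeper-based HA it relies mainly on the Curator framework; once leader election succeeds, the service calls back the grantLeadership() method of the contender passed to start(), here the current class.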
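The callback contract can be illustrated with an in-memory sketch: start(contender) registers a contender, the current leader receives grantLeadership(sessionId), and when the leader stops, leadership passes to the next waiting contender, roughly what a Curator leader latch provides on ZooKeeper. The method names grantLeadership and revokeLeadership follow Flink's LeaderContender, but all types here are simplified, hypothetical stand-ins, and a shared object replaces ZooKeeper.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;

// In-memory sketch (hypothetical names) of the leader election callback contract.
final class LeaderElectionSketch {
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    /** Shared election state, playing the role of the ZooKeeper latch path. */
    static final class InMemoryLeaderElection {
        private final Deque<Contender> waiting = new ArrayDeque<>();
        private Contender leader;

        /** Like leaderElectionService.start(this): join the election. */
        synchronized void start(Contender contender) {
            if (leader == null) {
                leader = contender;
                contender.grantLeadership(UUID.randomUUID());
            } else {
                waiting.addLast(contender);
            }
        }

        /** Leaves the election; if the leader leaves, the next contender is granted leadership. */
        synchronized void stop(Contender contender) {
            if (contender == leader) {
                leader.revokeLeadership();
                leader = waiting.pollFirst();
                if (leader != null) {
                    leader.grantLeadership(UUID.randomUUID());
                }
            } else {
                waiting.remove(contender);
            }
        }
    }
}
```

With two JobManagers jm-1 and jm-2 calling start() in that order, jm-1 is granted leadership first; when jm-1 stops, it is revoked and jm-2 is granted leadership with a new session ID.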
To be continued. The next article will analyze how the Dispatcher is created and what it does.