本文基于 incubator-livy 0.4.0-incubating
从Livy Rest Api的介绍中我们可以知道,livy 共有两种 job,分别是 session 和 batch。然而,在源码实现中,session 和 batch 都是 Session 的子类,rest api 中的 session 对应源码中的 InteractivateSession
;rest api 中的 batch 对应源码中的 BatchSession
。在之后关于 livy 的所有文章中,session 或 batch 对应 rest api 中的含义,InteractivateSession
和 BatchSession
及 Session
都对应代码中的含义。
session 和 batch 的创建过程也很不相同,batch 的创建以对应的 spark app 启动为终点;而 session 除了要启动相应的 spark app,还要能支持共享 sparkContext 来接受一个个 statements 的提交及运行,我将 session 的创建分为两个大步骤:
- client 端:运行在 LivyServer 中,接受 request 直到启动 spark app(注意,这里虽然叫 client 端,但是运行在 LivyServer 中的)
- server 端:session 对应的 spark app driver 的启动
这篇文章主要讲讲 client 端
都做了些什么
一:整体流程
一图胜千言,上图就是创建一个 session 在 client 端的主要流程,我们将以注释的方式来说明那些没那么重要或复杂的流程,而核心的流程都在下文中分小节进行剖析。
二:启动 session 对应的 spark app
接下来直捣黄龙,直接到第 (8) 步 ContextLauncher#startDriver
看看 session 对应的 spark app 是如何启动的。ContextLauncher#startDriver
可以分为两个大步骤:
- 启动 spark app
- 等待 SparkSubmit 退出
2.2:启动 spark app
如上图,startDriver 无非就是 new 了一个 SparkLauncher
对象,进行了配置、资源、mainClass 等设置,然后调用 launch()
方法拿到了 SparkSubmit 进程的 对应的 Process 对象 process。
可以看到,session 对应的 spark app 的 mainClass 为 org.apache.livy.rsc.driver.RSCDriverBootstrapper
2.3:等待 SparkSubmit 退出
SparkLauncher#launch()
返回的进程是 SparkSubmit 进程,再返回 process 后,会 new 一个 ContextLauncher.ChildProcess
对象,在过程中会新启动一个线程来一直等待 SparkSubmit 进程退出,该线程中的逻辑如下:若 SparkSubmit 非正常退出(exitCode != 0),表示 Spark App 启动失败,会抛异常
public void run() {
try {
int exitCode = child.waitFor();
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
fail(new IOException(String.format("Child process exited with code %d.", exitCode)));
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
Thread.interrupted();
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
}
}
三:与 driver 建立连接
我们知道,session 最大的特点就是可以共享 SparkContext,让用户提交的多个代码片段都能跑在一个 SparkContext 上,这有两个好处:
- 大大加速任务的启动速度:我们知道,在 yarn 上启动一个 app 是比较耗时的,一般都需要 20s 左右;而使用 session,除了启动 session 也需要相当的耗时外,之后提交的代码片段都将立即执行
- 共享 RDD、table:持久化的 RDD、table 都可以被之后的代码片段使用,这在不同用户需要在相同的 RDD、table 上做计算的场景非常有用
而共享 SparkContext 就需要 client 与 driver 之间建立起连接,能让 client 向 driver 发送代码片段、查询运行状态、获取运行结果等
3.1:client 传递其 RpcServer 信息给 driver
时序图中的第 (5) 步:RSCClientFactory#createClient
,在该调用中创建了一个 org.apache.livy.rsc.rpc.RpcServer
(后文简称 RpcServer)对象赋值给成员 server。该 server 会在 driver 启动时被 driver 中的 rpc client 连接并告知 driver 中的 RpcServer 的信息,以便之后 client 端可以通过该信息向 driver 中的 RpcServer 发起连接及请求。由于 driver 可能被 yarn 调度到任何一个节点启动,所以无法由 LivyServer 主动与 driver 建立连接,而是预先在 client 端建立好 RpcServer 等待 driver 来连接。
另外,RpcServer 与 rpc client 是通过一个由 RpcServer 自身生成的 secret 进行匹配的。要能让 driver 连接到该 RpcServer,还需要知道 LivyServer 的 host 和 port,这这些信息都是通过 conf 传给 driver 的,在 ContextLauncher
构造函数中实现:
// 生成 client id
this.clientId = UUID.randomUUID().toString();
// 由server 生成 secret
this.secret = factory.getServer().createSecret();
...
conf.set(LAUNCHER_ADDRESS, factory.getServer().getAddress());
conf.set(LAUNCHER_PORT, factory.getServer().getPort());
conf.set(CLIENT_ID, clientId);
conf.set(CLIENT_SECRET, secret);
这些配置最终也将作为启动 driver 的 conf 的一部分传给 driver,这样 driver 在启动后就知道 client 中的 RpcServer 的地址和 secret 了
3.2:driver 连接 client 并传递其 RpcServer 信息
该过程在 RSCDriver#initializeServer
中实现,是 seesion driver 的初始化步骤
3.3:client 接收 driver rpcServer 地址信息并连接
在 client 传递其 RpcServer 信息给 driver
之前已经为 RSCClientFactory
对象的成员 server: RpcServer
注册了 client 以及相应 client 成功连接的处理函数:
final RegistrationHandler handler = new RegistrationHandler();
factory.getServer().registerClient(clientId, secret, handler);
这里的 clientId、secret 即 3.1 小节中传递给 driver 的。Registration
类用来处理 driver 端的 rpcClient 连接到 server 时的处理逻辑,即:
private void handle(ChannelHandlerContext ctx, RemoteDriverAddress msg) {
ContextInfo info = new ContextInfo(msg.host, msg.port, clientId, secret);
if (promise.trySuccess(info)) {
timeout.cancel(true);
}
}
参数 RemoteDriverAddress msg
即在 3.2 小节中 driver 中的 rpcClient 发送给 server 的 driver 中 rpcServer 的地址(包括 address、host),之后再结合 clientId、serrect 来构造 ContextInfo info
来触发 promise.trySuccess(info)
,info 表名了 driver 中 rpcServer 的地址已经发起连接需要的 clientId、secret,这与 3.2 小节中 driver 中的 rpcServer 注册的 client 信息相符。
在创建 RSCClient 对象时会在 promise 上 add 相应的 listener,promise.trySuccess(info)
会触发 onSuccess(ContextInfo info)
进而调用 connectToContext(info)
:
Utils.addListener(this.contextInfoPromise, new FutureListener<ContextInfo>() {
@Override
public void onSuccess(ContextInfo info) throws Exception {
connectToContext(info);
...
}
@Override
public void onFailure(Throwable error) {
connectionError(error);
...
}
})
在 connectToContext(info)
方法中会使用拿到的 driver 端 rpcServer 的连接信息发起连接得到 driverRpc,即用于向 driver 端 rpcServer 发送 rpc 调用的 client,这是 RSCClient 的成员,之后 RSCClient 和 driver 之间的通信都通过 driverRpc 来进行。
四:Session 的创建与初始化
在与 driver 建立连接之后,会使用 rscClient、livyConf 等信息来创建 InteractiveSession 对象并进行初始化,流程如上。初始化过程汇总,比较关键的步骤是将 session 信息存储到 state store 中以便livy server 挂掉后能进行 recovery;再就是向 driver 发送一个空的 PingJob 来确定 driver 的状态是否 ok,若 PingJob 成功执行,则说明 driver 状态 ok,将 session 置为 running 状态;若出错或失败,则说明 driver 出了一些问题,则将 session 的状态置为 error。
在成功完成 session 的创建及初始化后,会将 session 添加到 SessionManager 中进行统一管理。SessionManager 的主要职责包括:
- 持有所有 sessions
- 清理过期 session
- 从 state store 中恢复 sessions