前置知识:
前置知识1:Actor的创建
Props代表的是一个不可变的配置类,创建Actor时,必须要用到;它由两部分构成
- 创建的Actor类:akkaRpcActorType,它是Class类型
- 调用上述Class的构造方法使用的参数:剩余的参数
比如,如下代码用来创建持有通信实体TaskExecutor引用的Actor
Props.create(AkkaRpcActor.class,
TaskExecutor实例,
actorTerminationFuture,
getVersion(),
configuration.getMaximumFramesize())
看看AkkaRpcActor的构造方法,剩余参数与构造方法入参一一对应上了
AkkaRpcActor(
final T rpcEndpoint,
final CompletableFuture<Boolean> terminationFuture,
final int version,
final long maximumFramesize)
每个Actor在创建时都会返回一个ActorRef,用来与Actor通信。
前置知识2:flink使用的Actor
- flink中所有的通信实体,都需要继承RpcEndpoint抽象类;flink中常见的组件,如JobMaster、Dispatcher、ResourceManager、TaskExecutor等都直接或间接继承自RpcEndpoint。在flink使用的Actor有如下几种:
- SupervisorActor:用来创建AkkaRpcActor、FencedAkkaRpcActor,它们用来接收对应的ActorRef发送的消息
- AkkaRpcActor:RpcEndpoint使用的actor
- FencedAkkaRpcActor: FencedRpcEndpoint使用的actor,相较于RpcEndpoint增加了防护令牌,FencedAkkaRpcActor继承自 AkkaRpcActor
前置知识3:动态代理
RPC过程
-
那么RpcEndpoint与Actor的关系是怎样的?
如上图,每个Actor都持有RpcEndpoint的引用,当Actor接收到具体RPC消息后,会调用底层的RpcEndpoint实现类来干活。整个RPC流程如下:
- 构建代理$Proxy,它是发送消息的入口,这个过程等会再描述
- 用户调用$Proxy的方法干活,实际会调用AkkaInvocationHandler的invoke方法,handler就能获取到对应的方法签名与传参,将<方法签名,传参>封装为消息,通过ActorRef发送给AkkaActor
- 利用反射获取底层的RpcEndpoint实现类的Class对象,通过<方法签名,传参Class对象>就可以获取具体的实现,然后执行即可
- 如果需要返回,则返回结果给ActorRef
- 那Actor会处理什么信息?每个Actor都会重写AbstractActor的
createReceive
方法,当该actor接收到信息时,会根据消息的类型调用相应的方法来进行处理,比如SupervisorActor的实现:
@Override
public Receive createReceive() {
return receiveBuilder()
// 偏函数,当消息的类型是StartAkkaRpcActor,则使用createStartAkkaRpcActorMessage方法来处理
.match(StartAkkaRpcActor.class, this::createStartAkkaRpcActorMessage)
// 否则,对于其它的任意类型,都使用handleUnknownMessage方法来处理
.matchAny(this::handleUnknownMessage)
.build();
}
问题1:对于本地的RPC,$Proxy怎么构建?
最典型的场景莫过于JobMaster的SlotPoolImpl向ResourceManager请求资源了。整个过程如下
- 在JobMaster创建时,会传入ResourceManager的actor地址targetAddress
- 当JobMaster请求资源时,AkkaRpcService会通过targetAddress创建连接ResourceManager的ActorRef
- 然后通过该ActorRef就可以构建FencedAkkaInvocationHandler,然后构建代理,由于代理也实现了ResourceManagerGateway,该代理就被强转为大家看到的ResourceManagerGateway。当在看代码的时候,发现以GateWay结尾(通常都实现了FencedRpcGateway或RpcGateway接口)的基本都是上图中的$Proxy,它们是发送消息给Actor的入口
- SlotPoolImpl就可以通过ResourceManagerGateway与ResourceManager通信了
问题2:对于远程的RPC,$Proxy怎么构建?
最典型的场景莫过于TaskExecutor(即文档中说的TaskManager,用来提供slot与执行任务的,它也是RpcEndpoint)启动时调用自己的onStart方法向ResourceManager注册。TaskExecutor持有了ResourceManager的地址,过程和上面一致
问题2:消息怎么返回?返回给谁?
ActorRef就是客户端,也是返回给它
// rpc,就像在本地调用一样
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
// rpcEndpoint就是目标通信实体的ActorRef
protected CompletableFuture<?> ask(Object message, Time timeout) {
return FutureUtils.toJava(
Patterns.ask(rpcEndpoint, message, timeout.toMilliseconds()));
}