Apache Flink源码解析 (七)Flink RPC的底层实现

Prerequisites


  • Flink的RPC服务是基于Akka Remote实现的。一个简单的Akka Remoting ActorSystem的配置如下:
    akka {
      actor {
        provider = remote
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 2552
        }
     }
    }
    
    • 从这份配置文件可以看出,要建立一个ActorSystem,首先需要提供ActorSystem运行的机器的地址和端口。

  • 参考Akka Remoting的文档,获取远程节点的Actor有两条途径。
    • 第一条,通过actorSelection(path),在这儿需要知道远程节点的地址。获取到了ActorSelection就已经可以发送消息过去,也可以通过回信获取这个Actor的ActorRef。
    • 第二条,通过配置,配置文件如下。通过这种方式,远程系统的daemon会被请求建立这个Actor,ActorRef可以直接通过system.actorOf(new Props(...)获取。
    akka {
      actor {
        deployment {
          /sampleActor {
            remote = "akka.tcp://sampleActorSystem@127.0.0.1:2553"
          }
        }
      }
    }
    

定义RPC协议


  • RPC协议是客户端和服务端的通信接口。如下所示定义了一个BaseGateway的通信接口。
      public interface BaseGateway extends RpcGateway {
          CompletableFuture<Integer> foobar();
      }
    

  • 在Flink中,RPC协议的定义通过实现RpcGateway.
    public interface RpcGateway {
        /**
         * Returns the fully qualified address under which the associated rpc endpoint is reachable.
         *
         * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
         */
        String getAddress();
    
        /**
         * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
         *
         * @return Fully qualified hostname under which the associated rpc endpoint is reachable
         */
        String getHostname();
    }
    

  • 这个接口需要实现两个方法,分别是getAddress和getHostname。原因如下:
    • 如上文所述,想要通过ActorSystem获取远程Actor,必须要有地址。而在Flink中,例如Yarn这种模式下,JobMaster会先建立ActorSystem,这时TaskExecutor的Container都还没有分配,自然无法在配置中指定远程Actor的地址,所以一个远程节点提供自己的地址是必须的。

实现RPC协议


  • Flink的RPC协议一般定义为一个Java接口,服务端需要实现这个接口。如下是上面定义的BaseGateway的实现。
      public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {
    
          private final int foobarValue;
    
          protected BaseEndpoint(RpcService rpcService, int foobarValue) {
              super(rpcService);
    
              this.foobarValue = foobarValue;
          }
    
          @Override
          public CompletableFuture<Integer> foobar() {
              return CompletableFuture.completedFuture(foobarValue);
          }
    
          @Override
          public CompletableFuture<Void> postStop() {
              return CompletableFuture.completedFuture(null);
          }
      }
    

    • RpcEndpoint是rpc请求的接收端的基类。RpcEndpoint是通过RpcService来启动的。

构造并启动RpcService


  • RpcService会在每一个ClusterEntrypoint(JobMaster)和TaskManagerRunner(TaskExecutor)启动的过程中被初始化并启动。
  • RpcService主要负责启动RpcEndpoint(也就是服务端),连接到远程的RpcEndpoint并提供一个代理(也就是客户端)。
  • 此外,为了防止状态的concurrent modification,RpcEndpoint上所有的Rpc调用都只会运行在主线程上,RpcService提供了运行在其它线程的方法。

构造并启动RpcEndpoint(服务端)

  • 每一个RpcEndpoint在初始化阶段会通过该节点的RpcService的startServer方法来初始化服务。
    • 在该方法中创建了一个Akka的Actor,这个Actor也是Rpc调用的实际接收者,Rpc的请求会在客户端被封装成RpcInvocation对象以Akka消息的形式发送。
      @Override
      public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
          ...
          // 新建一个包含了rpcEndpoint的AkkaRpcActor,负责接收封装成Akka消息的rpc请求
              akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
          ...
              actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
          ...
      }
    
    
    • 接下来生成一个本地的InvocationHandler,用于将调用转换成消息发送到相应的RpcEndpoint(具体细节在下一节发送Rpc请求会详细介绍)
          ...
          // 获取这个Endpoint的所有Gateway,也就是所有RPC协议的接口
          Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
          ...
          // 新建一个InvocationHandler,用于将rpc请求包装成LocalRpcInvocation消息并发送给RpcServer(本地)
    
          final InvocationHandler akkaInvocationHandler;
          ...
              akkaInvocationHandler = new AkkaInvocationHandler(akkaAddress, hostname, actorRef, timeout, maximumFramesize, terminationFuture);
          ...
    
    • 通过Rpc接口和InvocationHandler构造一个代理对象,这个代理对象存在RpcEndpoint的RpcServer变量中,是给RpcEndpoint所在的JVM本地调用使用
          // 生成一个包含这些接口的代理,将调用转发到InvocationHandler
          @SuppressWarnings("unchecked")
          RpcServer server = (RpcServer) Proxy.newProxyInstance(classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler);
    
          return server;
      }
    

  • 启动RpcEndpoint
    • 实际上就是启动构造阶段生成的RpcServer的start方法,这个方法由AkkaInvocationHandler实现,实际上就是向绑定的RpcEndpoint的Actor发送一条START消息,通知它服务已启动。

构造Rpc客户端

  • Rpc的客户端实际上是一个代理对象,构造这个代理对象,需要提供实现的接口和InvocationHandler,在Flink中有AkkaInvocationHandler的实现。
  • 在构造RpcEndpoint的过程中实际上已经生成了一个供本地使用的Rpc客户端。并且通过RpcEndpoint的getSelfGateway方法可以直接获取这个代理对象。
  • 而在远程调用时,则通过RpcService的connect方法获取远程RpcEndpoint的客户端(也是一个代理)。connect方法需要提供Actor的地址。(至于地址是如何获得的,可以通过LeaderRetrievalService,在这个部分不多做介绍。)
    <C extends RpcGateway> CompletableFuture<C> connect(
        String address,
        Class<C> clazz);
    
    • 首先通过地址获取ActorSelection,在Prerequisite中也介绍了这是连接远程Actor的方法之一
    private <C extends RpcGateway> CompletableFuture<C> connectInternal(
            final String address,
            final Class<C> clazz,
            Function<ActorRef, InvocationHandler> invocationHandlerFactory) {
        ...
        // 通过地址获取ActorSelection, 并获取ActorRef引用
        final ActorSelection actorSel = actorSystem.actorSelection(address);
    
    • 通过ActorSelection获取ActorRef并发送握手消息
        final Future<ActorIdentity> identify = Patterns
            .ask(actorSel, new Identify(42), timeout.toMilliseconds())
            .<ActorIdentity>mapTo(ClassTag$.MODULE$.<ActorIdentity>apply(ActorIdentity.class));
        final CompletableFuture<ActorIdentity> identifyFuture = FutureUtils.toJava(identify);
        final CompletableFuture<ActorRef> actorRefFuture = identifyFuture.thenApply(
        ...
        // 发送handshake消息
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture = actorRefFuture.thenCompose(
            (ActorRef actorRef) -> FutureUtils.toJava(
                Patterns
                    .ask(actorRef, new RemoteHandshakeMessage(clazz, getVersion()), timeout.toMilliseconds())
                    .<HandshakeSuccessMessage>mapTo(ClassTag$.MODULE$.<HandshakeSuccessMessage>apply(HandshakeSuccessMessage.class))));
    
    • 最后根据ActorRef,通过InvocationHandlerFactory生成AkkaInvocationHandler并构造代理
        // 根据ActorRef引用生成InvocationHandler
        return actorRefFuture.thenCombineAsync(
            handshakeFuture,
            (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                InvocationHandler invocationHandler = invocationHandlerFactory.apply(actorRef);
                ClassLoader classLoader = getClass().getClassLoader();
                @SuppressWarnings("unchecked")
                C proxy = (C) Proxy.newProxyInstance(
                    classLoader,
                    new Class<?>[]{clazz},
                    invocationHandler);
    
                return proxy;
            },
            actorSystem.dispatcher());
    }
    
    • 而invocationHandlerFactory.apply方法如下
    Tuple2<String, String> addressHostname = extractAddressHostname(actorRef);
    return new AkkaInvocationHandler(addressHostname.f0, addressHostname.f1, actorRef, timeout, maximumFramesize, null);
    

发送Rpc请求

  • 上文中客户端会提供代理对象,而代理对象会调用AkkaInvocationHandler的invoke方法并传入Rpc调用的方法和参数信息。
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
  • 而在AkkaInvocationHandler对该方法的实现中,会判断方法属于哪个类,如果是Rpc方法的话就会调用invokeRpc方法。

    • 首先将方法封装成一个RpcInvocation,它有两种实现,一种是本地的LocalRpcInvocation,不需要序列化,另一种是远程的RemoteRpcInvocation。根据当前AkkaInvocationHandler和对应的RpcEndpoint是否在同一个JVM中来判断生成哪一个。
      private Object invokeRpc(Method method, Object[] args) throws Exception {
          ...
          final RpcInvocation rpcInvocation = createRpcInvocationMessage(methodName, parameterTypes, args);
      
    • 根据返回类型判断使用tell还是ask的形式发送akka消息
          Class<?> returnType = method.getReturnType();
          final Object result;
          if (Objects.equals(returnType, Void.TYPE)) {
              tell(rpcInvocation);
              result = null;
          } else if (Objects.equals(returnType, CompletableFuture.class)) {
              // execute an asynchronous call
              result = ask(rpcInvocation, futureTimeout);
          } else {
              // execute a synchronous call
              CompletableFuture<?> futureResult = ask(rpcInvocation, futureTimeout);
              result = futureResult.get(futureTimeout.getSize(), futureTimeout.getUnit());
          }
          return result;
      }
      

Rpc请求的处理


  • 首先Rpc消息是通过RpcEndpoint所绑定的Actor的ActorRef发送的,所以接收到消息的就是RpcEndpoint构造期间生成的AkkRpcActor
    akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture, getVersion());
    actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
    

  • AkkaRpcActor接收到的消息总共有三种
    • 一种是握手消息,如上文所述,在客户端构造时会通过ActorSelection发送过来。收到消息后会检查接口,版本,如果一致就返回成功
    • 第二种是启停消息。例如在RpcEndpoint调用start方法后,就会向自身发送一条Processing.START消息,来转换当前Actor的状态为STARTED。STOP也类似。并且只有在Actor状态为STARTED时才会处理Rpc请求
    • 第三种就是Rpc请求消息,通过解析RpcInvocation获取方法名和参数类型,并从RpcEndpoint类中找到Method对象,并通过反射调用该方法。如果有返回结果,会以Akka消息的形式发送回sender。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,642评论 18 139
  • 持久化 当我们在集群系统中,一台机器向另一台机器发送一段数据,负责接收的机器在接收数据前突然宕机,就会造成数据丢失...
    mango_knight阅读 4,525评论 0 4
  • Spark Network 模块分析 为什么用Netty通信框架代替Akka 一直以来,基于Akka实现的RPC通...
    Alex90阅读 2,782评论 0 3
  • 今天分布式应用、云计算、微服务大行其道,作为其技术基石之一的 RPC 你了解多少?一篇 RPC 的技术总结文章,数...
    零一间阅读 1,888评论 1 46
  • 知识分类 分为,显性,隐性知识 第2种分为,常识,经验性知识,神话传说,科学知识,科学,艺术,宗教 第3种分为,事...
    duduwa阅读 170评论 0 0