网络通信模块是分布式系统的底层基础,支撑上层分布式环境下复杂的进程间通信。远程过程调用(RPC,Remote Procedure Call)是一种常见的分布式网络通信协议。RPC 允许运行与一台机器的程序调用另一台机器的子程序,同时将网络细节屏蔽起来,大大简化了分布式程序的开发。
Hadoop RPC 简介
Hadoop 实现了自己的 RPC 通信协议,是上层分布式子系统(HDFS、MapReduce、HBase 等)公用的网络通信模块。Hadoop RPC 具有以下特点:
- 透明性。所有RPC框架的基本特性,对用户屏蔽了网络通信过程。
- 高性能。Hadoop 各个子系统均采用 Master/Slave 架构,Master 作为一个 RPC Server,负责处理所有 Slave 发送的请求,需要能够高效的处理多个并发 RPC 请求。
- 可控性。JDK 自带的 RPC 框架(RMI)过于重量级,用户可控之处太少,如:网络连接、超时和缓存等难以修改。因此 Hadoop 实现了轻量级的可控性更强的 RPC 框架。
Hadoop RPC 架构
Hadoop RPC 与其他 RPC 框架一样主要由四个部分组成:序列化层、函数调用层、网络传输层、服务端处理框架。
- 序列化层,将结构化数据转换为字节流,便于通过网络传输或进行持久化。
- 函数调用层,定位要调用的函数并执行函数。Hadoop RPC 采用 Java 反射和动态代理实现函数调用。
- 网络传输层,描述了 Client 和 Server 之间消息传输的方式。Hadoop RPC 采用基于 TCP/IP 的 Socket 机制。
- 服务端处理框架,可以抽象为网络I/O模型,直接决定了服务器端的并发处理能力。常见的有:阻塞式I/O、非阻塞式I/O、事件驱动I/O等,Hadoop RPC 采用基于 Reactor 设计模式的事件驱动I/O模型。
Hadoop RPC 总体架构如图,自下而上分为两层:第一层是基于 Java NIO 实现的 Client/Server 通信模型;第二层是供上层应用调用的 RPC 接口。
Hadoop RPC 使用
首先定义 RPC 协议,RPC 协议是客户端和服务端的通信接口,定义了服务器端对外提供的服务接口。
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
// 版本号,默认情况下不通版本号的 Client 和 Server 不能通信
public static final long versionId = 1L;
String echo(String value) throws IOException;
int add(int v1, int v2) throwd IOException;
}
实现 RPC 协议,Hadoop RPC 协议通常是一个 Java 接口。
public static class ClientProtocolImpl implemenets ClientProtocol {
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
return ClientProtocol.versionId;
}
public String echo(String value) throws IOException {
return value;
}
public int add(int v1, int v2) throwd IOException {
return v1 + v2;
}
}
构造并启动 RPC Server,使用 getServer()
方法构造 RPC Server,并启动
// serverHost 和 serverPort 表示服务器的 host 和端口
// numHandlers 表示服务器端处理请求的线程数
server = RPC.getServer(new ClientProtocolImpl(), serverHost, serverPort,
numHandlers, false, conf);
server.start();
构造 RPC CLient,并发送 RPC 请求。使用 getProxy()
方法构造客户端代理,通过代理对象调用远程服务器端方法
proxy = (ClientProtocol) RPC.getProxy(ClientProtocol.class, new ClientProtocolImpl(),
ClientProtocol.versionId, addr, conf);
int result = proxy.add(5, 6);
String echoResult = proxy.echo("hello");
经过上面四个步骤,便利用 Hadoop RPC 构建了一个简单的 Client/Server 网络模型。
深入理解 Hadoop RPC
Hadoop RPC 主要由三个类组成:RPC、Client、Server,分别对应接口、客户端实现、服务器端实现。
ipc.RPC
RPC 类是对底层客户机/服务器网络模型的封装,以便为开发人员提供方便简洁的编程接口。
RPC 类定义了一个内部类 RPC.Server,继承 Server 抽象,并利用反射机制实现了 call 接口。RPC 类包含一个 ClientCache 类型的成员根据用户提供的 SocketFactory 缓存 Client(重用 Client)。
与本地执行反射调用不通的是,RPC 函数调用时(执行 invock 方法),需要将函数调用信息(函数名、参数列表等)打包成可序列化对象 Invocation,通过网络发送给服务器端,服务器端接收到后,根据这些信息在利用反射机制完成函数调用。
ipc.Client
Client 类主要完成的功能是发送远程过程调用信息,并接收执行结果。Client 类内部有两个重要的类:Call 和 Connection。
Call 封装了一个 RPC 请求,包含唯一标识Id、函数调用信息、函数返回信息、错误信息、执行完成标识。Connection 封装了 Client 与每个 Server 之前的连接信息。包括通信连接唯一标识RemoteId、Socket、网络输入输出数据流、RPC请求等。
ipc.Server
Hadoop 采用 Master/Slave 结构,Master(NameNode、JobTracker)是整个系统的单点,是系统的性能和扩展瓶颈之一。Master 通过 ipc.Server 接收并处理所有 Slave 发送的请求,这要求 ipc.Server 将高并发和可扩展性作为设计目标。
ipc.Server 采用了多种提高并发处理能力的技术,包括:线程池、事件驱动和 Reactor 设计模式等。均采用 JDK 自带的库实现。下面着重介绍 Reactor 设计模式如何提高整体性能。
Reactor 是并发编程中一种基于事件驱动的设计模式,具有以下两个特点:
- 通过派发、分离I/O操作事件提高系统的并发性能
- 提高粗粒度的并发控制,单线程实现,避免复杂的同步处理
典型的 Reactor 实现原理图如下:
主要包括以下几个角色:
- Reactor,IO事件的派发者
- Acceptor,接收来自 Client 的连接,建立与 Client 对应的 Handler,并向 Reactor 注册 Handler
- Handler,与 Client 通信的实体,实现业务的处理,内部会进一步划分为:read、decode、compute、encode、send 等过程,
- Reader/Sender,为了加速处理速度,通过构建线程池,存放数据处理线程,数据读出后在线程池中等待后续处理即可。因此一般会分离 Handler 的读和写的过程,非别注册为读和写事件由 Reader 和 Sender 处理。
ipc.Server 实现了一个典型的 Reactor 模式,整体架构基本与上述一致。ipc.Server 被划分为三个阶段:接收请求、处理请求和返回结果
接收请求
接收来自各个客户端的请求,封装为 Call 对象,放入共享队列(callQueue)。其中 Listener 负责监听请求,整个 Server 只有一个 Listener。一旦有新的请求到达,会轮询的方式从线程池中选择一个 Reader 处理。Selector 对象负责监听相关事件。
处理请求
从共享队列中获取 Call 对象,执行对应的函数调用,由多个 Handler 并行完成。Handler 会尝试将结果返回给客户端,但是考虑到某些函数调用返回的结果很大或网络慢等原因,可能很难一次性将结果发送到客户端,Handler 会尝试将后续发送任务交给 Responder 处理。
返回结果
Server 端只有一个 Responder,负责处理 Handler 的结果返回给客户端,Selector 对象负责监听相关事件。
Hadoop RPC 参数
Hadoop RPC 提供了一些可配置参数:
参数 | 说明 |
---|---|
ipc.server.read.threadpool.size | Reader 线程数 默认值:1 |
ipc.server.handler.queue.size | 每个 Handler 对应最大 Call 数量,会影响 Call 队列长度 默认值:100 |
mapred.job.tracker.handler.count dfs.namenode.service.handler.count |
JobTracker 和 NameNode 中 Handler 数量 默认值:10 |
ipc.client.connect.max.retries | 客户端最大重试次数,间隔1s 默认值:10 |
《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》