目标:客户端远程调用服务端的一项服务,具体来说,客户端给服务端指定具体的类,方法,和参数信息,服务端用这些信息完成服务,并将调用结果或异常返回给客户端。
这个例子中,客户端想要调用的服务是 HelloService
public interface HelloService {
String sayHello(String name);
}
具体实现为 HelloServiceImpl
/**
* 给参数中的名字返回打招呼
*/
public class HelloServiceImpl implements HelloService{
@Override
public String sayHello(String name) {
return "您好," + name;
}
}
首先,客户端与服务端的这些通讯信息需要载体,我们把他们封装成两个类,客户端给服务端发送的远程调用Request,和服务端返回的Response。从设计考虑,让他们都继承与Message这个类
Message
@Data
public abstract class Message implements Serializable {
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
static {
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
客户端的request类,包含了客户端想要调用的服务的一切信息(全类名、方法名、参数...)
RpcRequestMessage
@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
// 接口的全限定名
private String interfaceName;
// 调用的方法名
private String methodName;
// 方法的返回值类
private Class<?> returnType;
// 方法的参数类型数组
private Class[] parameterTypes;
// 方法的参数值数组
private Object[] parameterValue;
// 一个可以设置sequenceID的构造器
public RpcRequestMessage(int sequenceID, String interfaceName, String methodName,
Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceID);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return Message.RPC_MESSAGE_TYPE_REQUEST;
}
}
服务端Request类,携带服务的返回值或异常值 RpcResponseMessage
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
// 返回值
private Object returnValue;
// 异常值
private Exception exceptionValue;
@Override
public int getMessageType() {
return Message.RPC_MESSAGE_TYPE_RESPONSE;
}
}
好了,我们现在有了信息的载体,如何在客户端和服务端之间进行网络通信呢,这里使用了Netty框架
首先编写服务侧:RpcServer
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class)
.group(boss, worker)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder()) // 处理粘包半包
.addLast(LOGGING_HANDLER) // 日志
.addLast(MESSAGE_CODEC) // 自定义协议 消息编解码
.addLast(RPC_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error",e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
由于本篇旨在实现rpc,netty的基本内容就不再赘述,代码中的一些自定义的handler(处理粘包半包问题的、编解码协议的)就不再一一实现了。服务侧我们只关注一个关键的handler -- RpcRequestMessageHandler
这个handler专门负责处理从客户端来的 RpcRequestMessage
。
处理思路为
- 先在封装的request信息中拿到所需服务的全类名,然后根据全类名拿到服务的实现类
- 拿到所需服务的方法、参数类型、参数值、返回类型
- 使用反射调用方法,拿到返回值或异常,封装成response类,返回客户端
由于没有整合Spring,需要手写一个从接口类拿到实现类的工具,具体为:
首先,在配置文件(application.properties)中,将服务的接口类和实现类绑定
# rpc bean
com.yldog.rpc.HelloService=com.yldog.rpc.HelloServiceImpl
然后一个工厂
public class ServicesFactory {
static Properties properties;
static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
static {
try {
InputStream in = MyConfig.class.getResourceAsStream("/application.properties");
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
// 拿到接口类Class对象
Class<?> interfaceClass = Class.forName(name);
// 拿到接口的实现类Class对象
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 根据接口类获得实现类
public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}
最后就能编写服务端的handler了
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
// 构造一个响应实例
RpcResponseMessage resp = new RpcResponseMessage();
// 设置响应消息的序号 -- 应与请求消息序号一致
resp.setSequenceId(msg.getSequenceId());
try {
// 通过request message拿到接口类,在通过接口类拿到实现类
HelloService service = (HelloService) ServicesFactory.getService(Class.forName(msg.getInterfaceName()));
// 拿到方法
Method method = service.getClass().getDeclaredMethod(msg.getMethodName(), msg.getParameterTypes());
// 在刚才拿到的实现类上调用方法
Object result = method.invoke(service, msg.getParameterValue());
resp.setReturnValue(result);
} catch (Exception e) {
String message = e.getCause().getMessage();
resp.setExceptionValue(new Exception("远程调用异常:" + message));
} finally {
ctx.writeAndFlush(resp);
}
}
}
接着编写客户端
首先要实现客户端给服务端发远程调用请求。
第一步是先要拿到与服务端联络的 SocketChannel
,再使用channel来进行远程调用
@Slf4j
public class RpcClientManager {
private static volatile Channel channel = null;
private static final Object LOCK = new Object();
// 使用DCL保证channel单例
public static Channel getChannel() {
if (channel != null) {
return channel;
}
synchronized (LOCK) {
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}
// 初始化channel,只能初始化一次
private static void initChannel() {
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_HANDLER = new MessageCodecSharable();
RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(worker)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtocolFrameDecoder())
.addLast(LOGGING_HANDLER)
.addLast(MESSAGE_HANDLER)
.addLast(RPC_HANDLER);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
// 初始化channel后不能在这里阻塞住,所以采用异步
// channel.closeFuture().sync();
channel.closeFuture().addListener(future -> { worker.shutdownGracefully(); });
} catch (InterruptedException e) {
log.debug("Client Error", e);
}
}
}
注意:
-
getChannel()
就可以拿到与服务端通信的Channel,由于使用了DLC单例模式,保证了channel的唯一性 - 在处理连接关闭时,不能使用
channel.closeFuture().sync();
否则代码阻塞在了initChannel()
中,要使用异步的处理方式
这样,客户端就能向服务端发送request消息了,具体为:
public static void main(String[] args) {
getChannel().writeAndFlush(new RpcRequestMessage(
1,
"com.yldog.rpc.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"张三"}
));
}
但是这种让用户一个个填写参数的方式,非常的不友好,理想的方式应该是用户像在本地调用方法一般,如
service.sayHello("张三")
,这里的思路是使用代理模式将构建请求消息这一步封装起来,具体为编写一个构造代理类的方法:
// 创建代理类
public static <T> T getProxyService(Class<T> serviceClazz) {
ClassLoader classLoader = serviceClazz.getClassLoader();
Class<?>[] interfaces = {serviceClazz};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
// 1. 将方法调用 转为 消息对象
int sequenceID = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceID,
serviceClazz.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2. 发送消息
getChannel().writeAndFlush(msg);
// 暂时不考虑如何接收返回值
});
return (T) o;
}
这样,用户的使用就变成了:
public static void main(String[] args) {
// getChannel().writeAndFlush(new RpcRequestMessage(
// 1,
// "com.yldog.rpc.HelloService",
// "sayHello",
// String.class,
// new Class[]{String.class},
// new Object[]{"张三"}
// ));
// 以上这种远程调用的方法非常不友好,考虑使用代理封装
HelloService proxyService = getProxyService(HelloService.class);
proxyService.sayHello("张三");
proxyService.sayHello("李四");
proxyService.sayHello("王五");
}
最后,我们要考虑的就是客户端如何接收服务端的返回消息,由于在netty中,调用远程服务的线程与收到返回结果的线程并不是一个线程,接收返回消息的线程一般在 nioEventLoop
中,所以想要在调用服务的线程中获取服务的结果,就涉及到了线程之间异步交换信息,具体实现思路为
- 发起调用的线程在发送了Request请求信息后,准备一个空的Promise,然后等待nio线程将服务端返回的结果放入Promise中
- nio线程得到返回的结果后,将结果放入Promise中,并通知调用的线程,实现异步
首先编写处理服务器响应信息的handler
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
/**
* 一个用来传递线程之间异步结果的容器集合
* 某个线程进行远程调用后,开启一个Promise容器,Nio线程收到远程结果后,将结果放入那个线程的Promise中
*/
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
log.debug("{}", msg);
// 拿到属于这次消息接收的空的promise
Promise<Object> promise = PROMISES.remove(msg.getSequenceId());
// 往promise放结果
if (promise != null) {
if ((msg.getExceptionValue() == null)) { // 若异常值为空,则正常调用
promise.setSuccess(msg.getReturnValue());
} else { // 异常值不为空,则调用异常
promise.setFailure(msg.getExceptionValue());
}
}
}
}
然后完整的客户端代码为
@Slf4j
public class RpcClientManager {
private static volatile Channel channel = null;
private static final Object LOCK = new Object();
public static void main(String[] args) {
// getChannel().writeAndFlush(new RpcRequestMessage(
// 1,
// "com.yldog.rpc.HelloService",
// "sayHello",
// String.class,
// new Class[]{String.class},
// new Object[]{"张三"}
// ));
// 以上这种远程调用的方法非常不友好,考虑使用代理封装
HelloService proxyService = getProxyService(HelloService.class);
proxyService.sayHello("张三");
proxyService.sayHello("李四");
proxyService.sayHello("王五");
}
// 创建代理类
public static <T> T getProxyService(Class<T> serviceClazz) {
ClassLoader classLoader = serviceClazz.getClassLoader();
Class<?>[] interfaces = {serviceClazz};
Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
// 1. 将方法调用转为 消息对象
int sequenceID = SequenceIdGenerator.nextId();
RpcRequestMessage msg = new RpcRequestMessage(
sequenceID,
serviceClazz.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
// 2. 发送消息
getChannel().writeAndFlush(msg);
// 3. 准备一个这次接收消息专用的Promise对象 指定promise异步接收结果的线程
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
// 放入promise集合中,Nio线程取用时,用sequenceId作为key
RpcResponseMessageHandler.PROMISES.put(sequenceID, promise);
// 4. 等待 Nio线程将返回的结果放入 promise 中
promise.await();
// 5. 拿到结果后,判断调用是否正常
if (promise.isSuccess()) {
// 调用成功
return promise.getNow();
} else {
// 调用异常
throw new RuntimeException(promise.cause());
}
});
return (T) o;
}
// 使用DCL保证channel单例
public static Channel getChannel() {...}
// 初始化channel,只能初始化一次
private static void initChannel() {...}
}