设计思路
RPC:远程过程调用,像是调用本地代码一样来调用远程的服务。所以一个简单的RPC至少包括两个角色:
- 服务提供方
- 服务调用方
服务调用方像调用本地代码一样调用远程服务,自然得有远程服务的接口信息。而服务提供方需要来实现接口提供服务返回到调用方。这样问题就变成了:
- 服务调用方如何通过服务接口将请求给到服务提供方?
- 服务提供方如何将数据回传到服务提供方?
先看第一个问题,因为服务在远程,我们可以告诉远程服务我们调用的是哪个接口,用的哪些参数就可以了。自然我们会想到使用动态代理,使用代理增强来接口方法,达到传输接口名参数的目的。服务端接收到这些数据后通过反射来执行。
Netty的简单描述
Netty是基于nio的网络编程框架。java nio由来已久,不过用的不多,原因在于使用原生的nio进行网络编程上手难度有些大。Netty对原生nio做了封装,原生的nio的核心组件selector来管理通道,而Netty则为Reactor。一个Reactor负责接收客户端的请求,同时也负责将不同的客户端请求放入到Channel中,这个便是Netty Reactor的单线程模型。最常用的还是主从模型:在多线程的基础上,让处理客户端的线程也变为多线程,一组线程池接收请求,一组线程池处理IO。做到了前面多个服务员,后面多个厨子的模式。
Reactor模型最为核心的是两个线程池,Netty使用 NioEventLoopGroup 来初始化线程池。一个 NioEventLoopGroup 下包含多个 NioEventLoop,NioEventLoop封装了selector,用于注册niochannel,每个 NioChannel 都绑定有一个自己的 ChannelPipeline。
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理。BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来,通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。
另外一个比较重要的类便是ChannelHandler,ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具体的业务逻辑。
代码实现
服务端代码:
public class NettyRPCServer {
private static final int PORT = 9090;
public void run(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap boot = new ServerBootstrap();
boot.group(bossGroup, workerGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG, 128).
childOption(ChannelOption.SO_KEEPALIVE, true).
localAddress(PORT).
childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//使用netty内置的object编解码器
//编码器
pipeline.addLast("encoder", new ObjectEncoder());
//解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
pipeline.addLast(new InvokeHandler());
}
});
ChannelFuture channelFuture = boot.bind(PORT).sync();
System.out.println("server is ready");
channelFuture.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyRPCServer().run();
}
}
InvokeHandler来实现反射调用:
/**
* 服务端业务处理类
*/
public class InvokeHandler extends ChannelInboundHandlerAdapter {
private static final String SERVER_PATH = "com.ucal.dc.nio.netty.rpc.server";
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo cl = (ClassInfo) msg;
Class impl = Class.forName(getImplClassName(cl));
Method method = impl.getDeclaredMethod(cl.getMethod(), cl.getType());
Object result = method.invoke(impl.newInstance(),cl.getObjects());
//写入到通道中
ctx.writeAndFlush(result);
}
//通过接口找到实现类
private String getImplClassName(ClassInfo classInfo) throws Exception {
int i = classInfo.getClassName().lastIndexOf(".");
String interfaceName = classInfo.getClassName().substring(i);
Class service = Class.forName(SERVER_PATH + interfaceName);
//使用Reflections框架
Reflections reflections = new Reflections(SERVER_PATH);
Set<Class> types = reflections.getSubTypesOf(service);
if(types.size() == 0){
throw new RuntimeException("未能找到服务实现类");
}else if(types.size() > 1){
throw new RuntimeException("找到多个服务实现类,未能明确使用哪一个");
}else{
Class c = types.iterator().next();
return c.getName();//得到实现类的名字
}
}
}
客户端代理类(使用ClassInfo类来封装数据):
/**
* 客户端代理类
*/
public class NettyRPCProxy {
private static final int PORT = 9090;
//根据接口创建动态代理对象
public static Object create(Class target){
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
//封装ClassInfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethod(method.getName());
classInfo.setObjects(objects);
classInfo.setType(method.getParameterTypes());
//开始使用netty发送数据
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
ResultHandler resultHandler = new ResultHandler();
try {
bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
//编码器
pipeline.addLast("encoder", new ObjectEncoder());
//解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
//客户端业务处理类
pipeline.addLast(resultHandler);
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
channelFuture.channel().writeAndFlush(classInfo).sync();
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
return resultHandler.response;
}
});
}
}
ClassInfo:
/**
* 用于封装类消息,用于数据传输
*/
public class ClassInfo implements Serializable {
private static final long serialVersionUID = 9159351154097982580L;
private String className; //类名
private String method; //方法名
private Class<?>[] type; //参数类型
private Object[] objects;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public Class<?>[] getType() {
return type;
}
public void setType(Class<?>[] type) {
this.type = type;
}
public Object[] getObjects() {
return objects;
}
public void setObjects(Object[] objects) {
this.objects = objects;
}
}
ResultHandler便是客户端接收到消息的Handler:
/**
* 客户端接收消息处理器
*/
public class ResultHandler extends ChannelInboundHandlerAdapter {
public Object response;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
在客户端有这么一个远程服务接口:
/**
* 服务接口
*/
public interface HelloNetty {
String hello();
}
而服务端有这么一个接口的实现类:
public class HelloNettyImpl implements HelloNetty {
@Override
public String hello() {
return "hello netty";
}
}
开始测试:先启动服务端,在客户端使用NettyRPCProxy创建HelloNetty的代理对象,在调用代理对应的接口方法的时候,实际上就是将类信息封装成ClassInfo,通过netty传输到服务端NettyRPCServer。
public class TestNettyRPC {
public static void main(String[] args) {
HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
System.out.println(hn.hello());
}
}
下面增加一个带参的调用:
public interface HelloRPC {
String hello(String name);
}
实现类:
public class HelloRPCImpl implements HelloRPC {
@Override
public String hello(String name) {
return "hello "+name;
}
}
测试一下:
public class TestNettyRPC {
public static void main(String[] args) {
HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
System.out.println(hn.hello());
HelloRPC hr = (HelloRPC)NettyRPCProxy.create(HelloRPC.class);
System.out.println(hr.hello("susan"));
}
}
输出:
