Netty 是一个基于 NIO 的网络框架。它与原生 Java NIO 的不同之处在于:原生的 NIO 比较难用,而通过 Netty 可以不必了解底层的编写细节,就能快速实现网络编程,更快地实现我们需要的业务(虽然我还是推荐多了解底层)。
Netty 的使用比较广泛,比如游戏的数据包传输,还有 RPC 调用(例如 Dubbo)。我个人倾向于先从 demo 入手去了解一样东西,因此决定写一个简单的 RPC 调用来了解 Netty,show code!
先新建一个 Maven 项目,在里面创建三个 module,分别是 start_interface、start_rpc_consumer 和 start_rpc_provider。在 start_interface 的 pom.xml 中加入依赖
<!-- Netty "all-in-one" artifact; supplies the io.netty.* classes imported by the RPC classes in this post. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
然后新建RPCClient作为客户端的使用
package com.example.start_interface;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Client side of the demo RPC: builds JDK dynamic proxies whose calls are packed into an
 * {@link RPCParam}, sent over one shared Netty connection, and answered by the server.
 *
 * <p>The connection and its handler are held in static fields, so every proxy created by this
 * class shares the first handler that connected successfully. The handler keeps a single
 * pending request, so proxies must not be invoked concurrently.
 */
public class RPCClient {
    /** Pool on which the handler's blocking {@code call()} is executed. */
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    /** Handler of the shared connection; stays {@code null} until a connect has succeeded. */
    private static RPCClientHandler client;

    /**
     * Creates a proxy for {@code serviceClass} that forwards every invocation to the remote
     * method named by {@code providerName}.
     *
     * @param serviceClass     interface the proxy implements
     * @param providerName     remote target in the form {@code className#methodName}
     * @param rpcClientHandler handler installed in the pipeline on the first connect
     * @param ip               server host
     * @param port             server port
     * @return a proxy instance implementing {@code serviceClass}
     * @throws IllegalArgumentException if {@code providerName} has no {@code #methodName} part
     */
    public Object createProxy(final Class<?> serviceClass, final String providerName
            , final RPCClientHandler rpcClientHandler, String ip, int port) {
        // The target never changes per proxy: parse it once and fail fast on a malformed name
        // instead of throwing ArrayIndexOutOfBoundsException on the first remote call.
        final String[] target = providerName.split("#");
        if (target.length < 2) {
            throw new IllegalArgumentException(
                    "providerName must look like 'className#methodName' but was: " + providerName);
        }
        final String className = target[0];
        final String methodName = target[1];

        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class<?>[]{serviceClass}, (proxy, method, args) -> {
                    if (client == null) {
                        initClient(rpcClientHandler, ip, port);
                    }
                    // The JDK passes null (not an empty array) for methods without parameters.
                    final Object[] callArgs = args == null ? new Object[0] : args;
                    final Class<?>[] declaredTypes = method.getParameterTypes();

                    RPCParam rpcParam = new RPCParam();
                    rpcParam.setClazzName(className);
                    rpcParam.setMethodName(methodName);
                    rpcParam.initParams(callArgs.length);
                    for (int i = 0; i < callArgs.length; i++) {
                        rpcParam.getParams()[i] = callArgs[i];
                        // Runtime type as before; a null argument has none, so fall back to the
                        // type declared on the invoked interface method.
                        rpcParam.getParamTypes()[i] =
                                callArgs[i] != null ? callArgs[i].getClass() : declaredTypes[i];
                    }
                    // Hand the request to the handler, then block until the reply arrives.
                    client.setRpcParam(rpcParam);
                    return executor.submit(client).get();
                });
    }

    /**
     * Connects to the server and publishes {@code rpcClientHandler} as the shared handler.
     *
     * <p>The static {@code client} field is only assigned after the connect succeeded, so a
     * failed attempt can be retried by the next proxy call; the event loop group of a failed
     * attempt is shut down instead of being leaked.
     *
     * @throws IllegalStateException if the calling thread is interrupted while connecting
     */
    private static void initClient(RPCClientHandler rpcClientHandler, String ip, int port) {
        EventLoopGroup group = new NioEventLoopGroup();
        boolean connected = false;
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ObjectEncoder());
                            p.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            p.addLast(rpcClientHandler);
                        }
                    });
            b.connect(ip, port).sync();
            client = rpcClientHandler;
            connected = true;
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to " + ip + ":" + port, e);
        } finally {
            if (!connected) {
                group.shutdownGracefully();
            }
        }
    }
}
以及他的Handler
package com.example.start_interface;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
/**
 * Client-side channel handler that doubles as a {@link Callable}: {@link #call()} writes the
 * current {@link RPCParam} to the channel and blocks until {@link #channelRead} delivers the
 * server's reply.
 *
 * <p>Only one request/response pair is tracked at a time (single {@code rpcParam} and
 * {@code result} fields), so an instance must not be used for concurrent calls.
 */
public class RPCClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
    /** Set on the event loop thread, read by the calling thread, hence volatile. */
    private volatile ChannelHandlerContext context;
    /** Reply of the most recent request. */
    private Object result;
    /** True once the reply to the request currently in flight has been stored in {@code result}. */
    private boolean responded;
    /** Request to send on the next {@link #call()}. */
    private RPCParam rpcParam;

    /** Remembers the context so that {@link #call()} can write to the channel later. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        context = ctx;
    }

    /**
     * Stores the server's reply and wakes the thread waiting in {@link #call()}.
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
        result = msg;
        responded = true;
        notify();
    }

    /**
     * Sends the current request and waits for its reply.
     *
     * @return the object received from the server
     * @throws InterruptedException  if the waiting thread is interrupted
     * @throws IllegalStateException if the channel has not become active yet
     */
    @Override
    public synchronized Object call() throws InterruptedException {
        ChannelHandlerContext ctx = context;
        if (ctx == null) {
            throw new IllegalStateException("Channel is not active yet; cannot send the request");
        }
        // Forget the previous reply so a stale value can never be returned for this request.
        responded = false;
        result = null;
        ctx.writeAndFlush(rpcParam);
        // Object.wait() may wake up spuriously, so always re-check the condition.
        while (!responded) {
            wait();
        }
        return result;
    }

    public RPCParam getRpcParam() {
        return rpcParam;
    }

    public void setRpcParam(RPCParam rpcParam) {
        this.rpcParam = rpcParam;
    }
}
以及RPCServer
package com.example.start_interface;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Server side of the demo RPC: binds a Netty server whose child channels decode incoming
 * Java-serialized objects and pass them to a new {@link RPCServerHandler}.
 */
public class RPCServer {
    /**
     * Binds the RPC server to {@code hostName:port} and returns once the bind has completed.
     *
     * <p>If the bind fails or the thread is interrupted, the event loop group is shut down
     * rather than leaked. An interruption is reported on stderr and the thread's interrupt
     * flag is restored.
     *
     * @param hostName local address to bind to
     * @param port     local port to bind to
     */
    public static void startServer(String hostName, int port) {
        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new ObjectEncoder());
                            // NOTE(review): unbounded Java deserialization of whatever the peer
                            // sends; only acceptable for a trusted-network demo.
                            p.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                            p.addLast(new RPCServerHandler());
                        }
                    });
            bootstrap.bind(hostName, port).sync();
            started = true;
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            if (!started) {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
}
以及RPCServerHandler
package com.example.start_interface;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
/**
 * Server-side handler: for every received {@link RPCParam} it instantiates the named class,
 * reflectively invokes the named method with the supplied arguments, and writes the return
 * value back to the client.
 *
 * <p>NOTE(review): class and method names come straight from the network, so any peer can
 * instantiate and invoke any class on the classpath that has a no-arg constructor. Restrict
 * this to a whitelist of registered services before using it outside a demo.
 */
public class RPCServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * Executes the requested call and sends the result back.
     *
     * <p>Reflection failures are printed to stderr and no reply is written, so the client
     * receives nothing for that request.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg.toString());
        if (!(msg instanceof RPCParam)) {
            // Anything else is not part of the protocol; do not fail with ClassCastException.
            System.err.println("Ignoring unexpected message of type " + msg.getClass().getName());
            return;
        }
        RPCParam rpcParam = (RPCParam) msg;
        try {
            Class<?> clazz = Class.forName(rpcParam.getClazzName());
            // getDeclaredConstructor().newInstance() replaces the deprecated Class.newInstance(),
            // which rethrows checked constructor exceptions without declaring them.
            Object target = clazz.getDeclaredConstructor().newInstance();
            Method method = clazz.getDeclaredMethod(rpcParam.getMethodName(), rpcParam.getParamTypes());
            Object result = method.invoke(target, rpcParam.getParams());
            // NOTE(review): a null result (e.g. from a void method) presumably cannot be
            // written as a message; confirm and wrap results in a response type if needed.
            ctx.writeAndFlush(result);
        } catch (ReflectiveOperationException e) {
            // Covers ClassNotFound, NoSuchMethod, Instantiation, IllegalAccess and
            // InvocationTarget exceptions, which were all handled identically.
            e.printStackTrace();
        }
    }
}
其中为了获取参数,我们需要自己写一个param,来方便传输得到方法和参数
package com.example.start_interface;
import java.io.Serializable;
/**
 * Serializable request object of the demo RPC: names the target class and method and carries
 * the call arguments together with their types.
 */
public class RPCParam implements Serializable {
    /** Name of the class to invoke on the provider side. */
    private String clazzName;
    /** Name of the method to invoke. */
    private String methodName;
    /** Argument values, index-aligned with {@link #paramTypes}. */
    private Object[] params;
    /** Argument types, index-aligned with {@link #params}. */
    private Class<?>[] paramTypes;

    /**
     * Allocates both the argument and the argument-type array with {@code size} empty slots,
     * replacing any arrays set before.
     *
     * @param size number of arguments of the call
     */
    public void initParams(int size) {
        this.params = new Object[size];
        this.paramTypes = new Class<?>[size];
    }

    /** @return the target class name */
    public String getClazzName() {
        return this.clazzName;
    }

    /** @param clazzName the target class name */
    public void setClazzName(String clazzName) {
        this.clazzName = clazzName;
    }

    /** @return the target method name */
    public String getMethodName() {
        return this.methodName;
    }

    /** @param methodName the target method name */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the argument array itself (not a copy) */
    public Object[] getParams() {
        return this.params;
    }

    /** @param params the argument array to use */
    public void setParams(Object[] params) {
        this.params = params;
    }

    /** @return the argument-type array itself (not a copy) */
    public Class<?>[] getParamTypes() {
        return this.paramTypes;
    }

    /** @param paramTypes the argument-type array to use */
    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }
}
其实返回结果感觉也是应该自己写一个了,但是因为是一个练手demo,就算了,然后再加一个interface
package com.example.start_interface;
/**
 * Service contract shared by the RPC consumer and provider modules.
 */
public interface UserService {

    /**
     * Hello-world style call.
     *
     * @param word text supplied by the caller
     * @return the provider's reply as a string
     */
    String sayHello(String word);

    /**
     * Sample call taking a boxed integer argument.
     *
     * @param i value supplied by the caller
     * @return the provider's reply as a string
     */
    String test(Integer i);
}
这时的整体框架算是搭建好了,这时候开始测试一下
接着在 consumer 模块(start_rpc_consumer)里实现调用
package com.example.start_rpc_consumer;
import com.example.start_interface.RPCClient;
import com.example.start_interface.RPCClientHandler;
import com.example.start_interface.UserService;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Consumer demo: starts the Spring Boot application, creates a {@link UserService} proxy
 * through {@link RPCClient} and then calls the remote {@code test} method in an endless loop.
 */
@SpringBootApplication
public class StartRpcConsumerApplication {
    /** Remote target handed to the proxy, written as {@code className#methodName#}. */
    public static final String providerName = "com.example.start_rpc_provider.UserServiceImpl#test#";

    /**
     * Boots Spring, then issues one RPC call roughly every second, printing a marker before
     * and after each call together with the reply. Never returns normally.
     */
    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(StartRpcConsumerApplication.class, args);

        final RPCClient rpcClient = new RPCClient();
        final Object proxy = rpcClient.createProxy(
                UserService.class, providerName, new RPCClientHandler(), "localhost", 8888);
        final UserService userService = (UserService) proxy;

        while (true) {
            System.out.println("ok???");
            Thread.sleep(1000);
            final String reply = userService.test(12);
            System.out.println(reply);
            System.out.println("ok");
        }
    }
}
在provider包实现impl的类以及开启rpc服务
package com.example.start_rpc_provider;
import com.example.start_interface.UserService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Provider-side implementation of {@link UserService}, instantiated reflectively by the RPC
 * server for each incoming call.
 */
public class UserServiceImpl implements UserService {

    /**
     * Prints the confirmation message to stdout and returns the same text.
     *
     * @param word text supplied by the caller
     * @return the confirmation message including {@code word}
     */
    @Override
    public String sayHello(String word) {
        final String message = "调用成功--参数:" + word;
        System.out.println(message);
        return message;
    }

    /**
     * @param i value supplied by the caller
     * @return {@code "canshu"} followed by the string form of {@code i}
     */
    @Override
    public String test(Integer i) {
        return "canshu".concat(String.valueOf(i));
    }
}
package com.example.start_rpc_provider;
import com.example.start_interface.RPCServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Provider demo: starts the Spring Boot application and then the Netty RPC server.
 */
@SpringBootApplication
public class StartRpcProviderApplication {
    /** Address the RPC server binds to. */
    private static final String RPC_HOST = "localhost";
    /** Port the RPC server listens on; the consumer connects to the same port. */
    private static final int RPC_PORT = 8888;

    /** Boots Spring first, then binds the RPC server. */
    public static void main(String[] args) {
        SpringApplication.run(StartRpcProviderApplication.class, args);
        RPCServer.startServer(RPC_HOST, RPC_PORT);
    }
}
以下是输出结果
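其实在这个 demo 里面学到了不少知识,除了粘包拆包以及 Netty 的调用,还有动态代理以及反射机制。其中比较有意思的是,这个实现其实并不支持基本数据类型的参数,只支持对象类型(例如 Integer):客户端是用实参的运行时类型(getClass())来确定参数类型的,而基本类型的实参在经过代理时会被自动装箱,服务端按装箱类型去查找方法就找不到声明为基本类型的方法。我在网上看到的比较有意思的解决方法有两种:1. 约定以后的参数都使用对象类型,不使用基本数据类型;2. 写一个工具对基本数据类型进行装箱处理。这个就以后有机会再玩玩啦。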
这个是根据之前的一篇博客进行改写的,但是忘了那个网址了,希望有知道的童鞋可以告诉我然后我再加上出处