netty的核心组件
Channel
channel是入站或出站数据的载体,它可以被打开或关闭,连接或断开连接
回调
回调就是一个方法,指向已被提供给另外一个方法的方法的引用,netty使用了回调来处理事件,当一个回调被触发时,相关的事件可以被一个interfaceChannelHandler的实现处理
ChannelFuture
netty提供了自己的实现的ChannelFuture,区别于JDK内置的Future,不需要手动检查完成,或者阻塞等待,可以理解为回调的精致版本
事件和ChannelHandler
netty使用不同的事件来通知状态的改变或者是操作的状态,可以基于已发生的事件来触发适当的动作
简单应用
编写server handler
@ChannelHandler.Sharable //在pipeline中可以被多个channel共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("server receive: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将未决消息冲刷到远程节点,并且关闭该channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close(); //关闭该channel
}
}
编写server类
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
final EchoServerHandler echoServerHandler = new EchoServerHandler();
//创建eventloopgroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建server引导类
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class) //指定使用Nio传输通道
.localAddress(new InetSocketAddress("localhost",port)) //设置套接字地址
//添加serverhandler到子channel的pipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(echoServerHandler);
}
});
//异步绑定服务器,调用sync方法组侧等待直到绑定完成
ChannelFuture f = b.bind().sync();
//获取channel的closeFuture,并且阻塞当前线程直到完成
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
//在端口8888上启用监听
new EchoServer(8888).start();
}
}
编写client channel handler
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* channel为活跃状态时,发送一条消息
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("channel active...",CharsetUtil.UTF_8));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
System.out.println(byteBuf.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
client 类
private final int port;
private final String host;
public EchoClient(String host,int port) {
this.port = port;
this.host = host;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端引导类
Bootstrap b = new Bootstrap();
//eventloop处理客户端事件
b.group(group)
//适用于Nio传输的channel
.channel(NioSocketChannel.class)
//远端服务器地址
.remoteAddress(new InetSocketAddress(host,port))
//向channel的pipeline添加一个handler实例
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
//连接到远程节点,阻塞等待直到连接完成
ChannelFuture f = b.connect().sync();
//阻塞,直到channel关闭
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new EchoClient("localhost",8888).start();
}
}
运行结果
...
10:35:32.937 [nioEventLoopGroup-2-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@5b90a3bd
server receive: channel active...