服务端
EchoServerHandler - 消息处理
@ChannelHandler.Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws FileNotFoundException {
//ReferenceCountUtil.release(msg);
ByteBuf buf = (ByteBuf)msg;
System.out.println("Server recevied:"+ buf.toString(Charset.forName("UTF-8")));
ctx.write(buf);
}
final ByteBuf byteBuffer = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("hello world!\r\n", CharsetUtil.UTF_8));
@Override
public void channelActive(ChannelHandlerContext ctx){
ChannelFuture cf= ctx.writeAndFlush(byteBuffer.duplicate());
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("Write successful!");
}else{
System.out.println("Write Error!");
future.cause().printStackTrace();
}
}
});
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE); //将未决消息冲刷到远程节点且关闭该Channel
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
EchoServer- 启动程序
public class EchoServer {
public static void main(String[] args) throws InterruptedException {
EchoServer echoNettyServer = new EchoServer();
echoNettyServer.start(9981);
}
public void start(int port) throws InterruptedException {
final EchoServerHandler serverHandler = new EchoServerHandler();
final ByteBuf byteBuffer = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("HI!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new NioEventLoopGroup(); //非阻塞方式
try{
ServerBootstrap b = new ServerBootstrap(); //创建ServerBootstrap
b.group(group)//NioSctpChannel
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}//每个已接收的连接都调用它
});
ChannelFuture f = b.bind().sync(); //异步绑定服务器,调用sync()方法阻塞等待直接绑定完成
f.channel().closeFuture().sync(); //获取Channel的CloseFuture,并且阻塞当前线程直接它完成
}finally {
group.shutdownGracefully().sync();
}
}
}
客户端
EchoClientHandler --消息处理
/**
* SimpleChannelInboundHandler:
* 1. 当channnelRead0方法完成时,已传入的消息并且已处理完它了。当方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuffer的内存引用
* 2. 而服务端Handler,需要将传入的消息回传给发送者,而write()是异步操作,直接ChannelRead()方法返回后可能仍然没有完成。
*/
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 接收到消息,服务器发送的消息可能会被分块接收(并非一次性全部接收)
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client received:"+msg.toString(CharsetUtil.UTF_8));
}
@Override
public void channelActive(ChannelHandlerContext ctx){
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!!", CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
cause.printStackTrace();
ctx.close();
}
}
EchoClient - 启动程序
public class EchoClient {
public static void main(String[] args) throws InterruptedException {
String host = "localhost";
int port = 9981;
new EchoClient().start(host,port);
}
public void start(String host,int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
try {
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host,port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler())
;
}
});
ChannelFuture f = b.connect().sync(); //连接到远程节点,阻塞等待直接连接完成
f.channel().closeFuture().sync();//阻塞,直接Channel关闭
f.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()){
System.out.println("Connection established!");
}else{
System.out.println("Connection attempt failed!");
future.cause().printStackTrace();;
}
}
}
);
}finally {
group.shutdownGracefully().sync();
}
}
}