《netty in action》读书笔记 PART1

6. ChannelHandler and ChannelPipeline

6.1 The ChannelHandler family

6.1.1 Channel的生命周期

ChannelUnregistered

已创建,但是还没有被注册到EventLoop上。

ChannelRegistered

已创建,并且已经注册到EventLoop。

ChannelActive

连接上远程主机。

ChannelActive

没有连接到远程主机。

Channel状态的变化会触发相应的事件。

6.1.2 ChannelHandler的生命周期

handlerAdd

添加handler

handlerRemove

删除handler

exceptionCaught

发生异常

ChannelHandler有两个重要的子接口:ChannelInboundHandlerChannelOutboundHandler

6.1.3 ChannelInboundHandler接口

接受到数据或者Channel的状态发生改变会调用ChannelInboundHandler中的方法。注意,当ChannelInboundHandler中的channelRead()方法被overwrite,需要对ByteBuf实例持有的资源进行显示释放。

public class DiscardHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ReferenceCountUtil.release(msg);
}
}

可以使用SimpleChannelInboundHandler,它会自动释放资源,无需人工干预:

@Sharable
public class SimpleDiscardHandler
extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
Object msg) {
// No need to do anything special
}
}

6.1.4 ChannelOutboundHandler接口

它一个比较强大的功能是延迟执行。

CHANNELPROMISE VS. CHANNELFUTURE

CHANNELPROMISE是CHANNELFUTURE的子接口,CHANNELFUTURE是不可写的,CHANNELPROMISE是可写的(例如setSuccess(),setFailure()方法)

6.1.5 ChannelHandler adapters

关系图

6.1.6 资源管理

要注意ChannelInboundHandler.channelRead()或者ChannelOutboundHandler.write()要释放相应的资源,否则会产生内存泄漏。netty使用引用计数法来管理内存资源。可以使用netty提供的ResourceLeakDetector来发现潜在的内存泄漏问题。

java -Dio.netty.leakDetectionLevel=ADVANCED

leakDetectionLevel可以为DISABLED、SIMPLE(默认)、ADVANCED和PARANOID。

6.2 ChannelPipeline接口

ChannelPipeline可以看成由ChannelHandler组成的链表,I/O事件会在ChannelPipeline上传播。每个新Channel会绑定一个新ChannelPipeline,两者是一对一关系。


pipeline中的事件传播

事件传播的时候,会判断ChannelHandler的类型(implements Inbound还是OutBound的接口)和事件传播的方向是否一致,不一致跳过。

6.2.1 ChannelPipeline修改

ChannelPipeline中ChannelHandler可以动态地被添加、删除或者替换。


ChannelPipeline中操作ChannelHandler

6.2.2 Firing events

会调用ChannelPipeline中下一个ChannelHandler里的方法。


代码示例:


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

public class HttpServer {
    
    public static void main(String[] args) throws InterruptedException {
        
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        
        
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup,workGroup)
                     .channel(NioServerSocketChannel.class)
                     .childHandler(new ChannelInitializer<Channel>() {
    
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new MyHandler());
                            ch.pipeline().addLast(new MyHandler2());
                        }
                         
                    });
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

}

class MyHandler extends SimpleChannelInboundHandler<String>{

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("in MyHandler1 , messageReceived invoked");
        for(int i = 0;i < 10 ; i++) {
            ctx.fireChannelInactive();//调用fireChannelInactive 10次
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("in MyHandler1 , channelInactive invoked");
    }
}

class MyHandler2 extends SimpleChannelInboundHandler<String>{
    @Override
    protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("in MyHandler2 ,messageReceived invoked");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("in MyHandler2 , channelInactive invoked");
    }
}

输出:

控制台输出

6.3 ChannelHandlerContext接口

ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之间的联系,无论何时,添加一个ChannelHandler到ChannelPipeline就会创建一个ChannelHandlerContext。ChannelHandlerContext的主要功能是和所在ChannelPipeline的其他ChannelHandler交互。

ChannelHandlerContext有很多方法,大部分方法在Channel和ChannelPipeline里都出现过,但是这里有一个非常大的区别,调用Channel和ChannelPipeline里的方法,会在整个pipeline里传播(从头到尾),而ChannelHandlerContext里同名的方法,是从当前ChannelHandler开始传播。

6.3.1 Using ChannelHandlerContext

概念关系图

6.3.2 ChannelHandler和ChannelHandlerContext的高级用法。

  1. ChannelHandlerContext的pipeline()方法可以获取ChannelPipeline的引用,这样我们可以通过这个引用操作ChannelHandler,实现动态协议
  2. 可以把ChannelHandlerContext的引用缓存起来,在ChannelHandler方法外面用,甚至在一个不同的线程里使用。下面提供了一个示例。
引用缓存实例
  1. 可以将一个ChannelHandler实例可能会被添加到不同的ChannelPipeline里,但是需要使用@Sharable注解,此外还需注意的是,这个Sharable的ChannelHandler需要是线程安全的。

为什么需要@Sharable的ChannelHandler,一个需求就是通过这个@Sharable来统计多个Channel的数据。

6.4 异常处理

6.4.1 Inbound异常处理

Inbound异常处理

由于exception默认会从触发异常的ChannelHandler继续向后流动,所以图中的这种处理逻辑,我们一般放在最后ChannelPipeline的末尾。这样就可以确保,无论是哪个ChannelHandler触发异常,都能够被捕获并处理。如果不对异常做捕获处理操作,netty会打印异常未被捕获的日志。

6.4.2 outbound异常处理

进行outbound操作,要想知道结果(正常完成还是发生异常),需要这样做:

  1. 每个outbound操作都会返回一个ChannelFuture。添加到ChannelFuture上的监听器会收到成功或者错误通知。

  2. ChannelOutboundHandler中的方法绝大多数都会ChannelPromise类型的参数。ChannelPromise也可以添加监听来接受异步通知。ChannelPromise是可写的,可以通过它的setSucess()方法或者setFailure(Throwable cause)立即发布通知。

如果ChannelOutboundHandler自己抛出异常,netty会通知添加到ChannelPromise上的监听器。

7. EventLoop and threading model

7.1 Threading model overview

JDK早期版本多线程编程的方式是create新线程再start。JDK5推出了Executor API,它的线程池技术通过缓存和重用大大提高了性能。

  1. 有任务(Runnable实现)的时候,从线程池里挑选出一个空闲线程,把任务submit给它。
  2. 任务执行完毕了,线程变成空闲,回到线程池,等待下一次挑选使用。
线程池技术

线程池不能解决上下文切换开销的问题,上下文的开销在heavy load下会很大。

7.2 EventLoop接口

EventLoop是一个用来处理事件的任务,基本思想如下图所示:

image.png

EventLoop接口的API分为两类:concurrent和networking。

  1. concurrent
    基于java.util.concurrent包,提供thread executors
  2. networking
    io.netty.channel继承了EventLoop接口,提供了和Channel事件交互的能力。

7.2.1 Netty 4中I/O事件的处理

7.3.1 JDK 任务调度API

JDK5之前,任务调度只能用java.util.Timer,Timer就是一个后台线程,有很多限制:

  1. 如果执行多个定时任务,一个任务发生异常没有捕获,整个Timer线程会挂掉(其他所有任务都会down掉)
  2. 假如某个任务的执行时间过长,超过一些任务的间隔时间,会导致这些任务执行推迟。

JDK后续推出了java.util.concurrent,其中定义的ScheduleExecutorService克服了这些缺陷。

ScheduledExecutorService executor =Executors.newScheduledThreadPool(10);
ScheduledFuture<?> future = executor.schedule(
  new Runnable() {
  @Override
  public void run() {
  System.out.println("60 seconds later");
}
}, 60, TimeUnit.SECONDS);
//to do
executor.shutdown();

尽管ScheduledExecutorSevice挺好用的,但是在负载大的时候有较大的性能耗费,netty进行了优化。

7.3.2 使用EventLoop进行任务调度

ScheduledExecutorService也有一些限制,例如会创建额外创建一些线程来管理线程池,这在任务调度非常激烈的情况下,会成为性能的瓶颈。netty没有直接使用ScheduledExecutorService,使用了继承于ScheduledExecutorService,自己实现的EventLoop

Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().schedule(
  new Runnable() {
  @Override
  public void run() {
    System.out.println("60 seconds later");
  }
}, 60, TimeUnit.SECONDS);

重复定时执行:

Channel ch = ...
ScheduledFuture<?> future = ch.eventLoop().scheduleAtFixedRate(
  new Runnable() {
  @Override
  public void run() {
    System.out.println("Run every 60 seconds");
  }
}, 60, 60, TimeUnit.Seconds);

7.4 实现细节

7.4.1 线程管理

netty线程模型的优越之处是在于它会确定当前执行线程的身份,再进行相应操作。如果当前执行线程被绑定到当前的ChannelEventLoop,会被直接执行,否则会被放到EventLoop的队列里,每个EventLoop有自己单独的队列。

EventLoop 执行逻辑

Never put a long-running task in the execution queue, because it will block any other task from executing on the same thread.” If you must make blocking calls or execute long-running tasks, we advise the use of a dedicated EventExecutor.

7.4.2 EventLoop/Thread分配

EventLoopGroup包含了EventLoopsChannelsEventLoops创建方式取决于使用哪种I/O.

异步I/O

异步I/O仅仅使用少量的EventLoops,这些EventLoops被很多的Channels共享,这样就可以用最少的线程接受很多的Channels,而不是一个线程一个Channel

阻塞I/O

共同点:每个Channel的I/O事件只会被一个线程处理。

8. Bootstrapping

bootstrapping an application is the process of configuring it to run

8.1 Bootstrap classes

Namely, a server devotes a parent channel to accepting connections from clients and
creating child channels for conversing with them, whereas a client will most likely
require only a single, non-parent channel for all network interactions. (As we’ll see, this
applies also to connectionless transports such as UDP , because they don’t require a
channel for each connection.)

server需要一个parent channel来接受客户端连接,需要创建多个child channels来应答客户端。

client只需要一个单独的channel,不需要parent channel。

服务端处理使用ServerBootstrap,客户端使用Bootstrap

Why are the bootstrap classes Cloneable?
You’ll sometimes need to create multiple channels that have similar or identical settings. To support this pattern without requiring a new bootstrap instance to be created and configured for each channel, AbstractBootstrap has been marked Cloneable . Calling clone() on an already configured bootstrap will return another bootstrap instance that’s immediately usable. Note that this creates only a shallow copy of the bootstrap’s EventLoopGroup , so the latter will be shared among all of the cloned channels. This is acceptable, as the cloned channels are often short-lived, a typical case being a channel created to make an HTTP request.

8.2 Bootstrapping clients and connectionless protocols

Bootstrap主要用来给客户端和使用面向无连接的应用创建Channels

Bootstraping a client:

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
    @Override
    protected void channeRead0(
    ChannelHandlerContext channelHandlerContext,
    ByteBuf byteBuf) throws Exception {
        System.out.println("Received data");
    }
} );
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture)throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Connection established");
        } else {
            System.err.println("Connection attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
} );

8.2.2 Channel和EventLoopGroup的兼容性

you can’t mix components having different
prefixes, such as NioEventLoopGroup and OioSocketChannel . The following listing
shows an attempt to do just that.

ChannelEventLoopGroup的前缀要一样。否则会抛出IllegalStateException

8.3 Bootstraping servers

ServerBootstrap类

A ServerBootstrap creating a ServerChannel on bind() , and the ServerChannel managing a number of child Channels.

相比 Bootstrap类,增加了childHandler(),childAttr(),childOption()方法。ServerChannel来创建许许多多的子Channel,代表接受的连接。ServerBootstrap提供了这些方法来简化对子Channel的配置。

NioEventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx,ByteBuf byteBuf) throw Exception {
    System.out.println("Received data");
}
} );

ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Server bound");
        } else {
            System.err.println("Bound attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
} );

8.4 Bootstrapping clients from a Channel

Suppose your server is processing a client request that requires it to act as a client to
a third system. This can happen when an application, such as a proxy server, has to
integrate with an organization’s existing systems, such as web services or databases. In
such cases you’ll need to bootstrap a client Channel from a ServerChannel

作为服务端接受连接,同时又作为客户端,请求远程服务器(类似于proxy),最容易想到的办法是再创建一个客户端的Bootstrap,但是这样需要另外一个EventLoop来处理客户端角色的Channel,发生在服务端Channel和客户端Channel之间数据交换引起的上文切换也会带来额外的性能损耗。

最好的办法是创建的客户端Channel和服务端Channel共享同一个EventLoop:

    ServerBootstrap bootstrap = new ServerBootstrap();
//Sets the EventLoopGroups that provide EventLoops for processing Channel events
        bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup()).channel(NioServerSocketChannel.class)
                .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                    ChannelFuture connectFuture;

                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  //Creates a Bootstrap to connect to remote host
                        Bootstrap bootstrap = new Bootstrap();
                        bootstrap.channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
                                System.out.println("Received data");
                            }
                        });
//Uses the same EventLoop as the one assigned to the accepted channel
                        bootstrap.group(ctx.channel().eventLoop());
                        connectFuture = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
                    }

                    @Override
                    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
                            throws Exception {
                        if (connectFuture.isDone()) {
// do something with the data
//When the connection is complete performs some data operation (such as proxying)   
                        }
                    }
                });
        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    System.out.println("Server bound");
                } else {
                    System.err.println("Bind attempt failed");
                    channelFuture.cause().printStackTrace();
                }
            }
        });

8.5 Adding multiple ChannelHandlers during a bootstrap

bootstrap的时候,如何添加多个ChannelHandler?

netty提供了ChannelInboundHandlerAdapter的特殊子类ChannelInitializer:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter

ChannelInitializer提供了initChannel()可以轻松添加ChannelHandlersChannelPipeline

protected abstract void initChannel(C ch) throws Exception;

一旦Channel注册到EventLoop,我们实现的initChannel()就会被调用。当initChannel()返回的时候,ChannelInitializer实例会把自己从ChannelPipeline中删除。

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                 .channel(NioServerSocketChannel.class)
                 .childHandler(new ChannelInitializerImpl());

        ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
        future.sync();

对应ChannelInitializerImpl的实现:

final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new HttpClientCodec());
        pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
    }
}

8.6 Using Netty ChannelOptions and attributes

不需要我们手工配置每个Channel,netty提供了option()方法来把ChannelOptions应用到bootstrapChannelOptions中的配置会自动地应用到所有Channel

Netty的Channelbootstrap类,提供了AttributeMap抽象集合和AttributeKey<T>泛型类,用来insert和retrieve属性值。使用这些工具,我们可以安全地把任意类型的数据和Channel关联起来。

Attribute的一个使用场景是,服务端应用需要追踪用户和Channels的关系。可以把用户的ID作为一个属性存到Channel里。这样就可以实现根据ID来路由消息和Channel不活跃自动关闭等功能。

final AttributeKey<Integer> id = new AttributeKey<Integer>("ID");
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
        .handler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                Integer idValue = ctx.channel().attr(id).get();
                // do something with the idValue
            }

            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf)
                    throws Exception {
                System.out.println("Received data");
            }
        });
bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
bootstrap.attr(id, 123456);
ChannelFuture future = bootstrap.connect(new InetSocketAddress("www.manning.com", 80));
future.syncUninterruptibly();

8.7 Bootstrapping DatagramChannels

之前的bootstrap示例代码都是基于TCP-based的SocketChannelbootstrap也可以配置为无连接协议。

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(new OioEventLoopGroup()).channel(OioDatagramChannel.class)
        .handler(new SimpleChannelInboundHandler<DatagramPacket>() {
            @Override
            public void channelRead0(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
                // Do something with the packet
            }
        });
ChannelFuture future = bootstrap.bind(new InetSocketAddress(0));
future.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.out.println("Channel bound");
        } else {
            System.err.println("Bind attempt failed");
            channelFuture.cause().printStackTrace();
        }
    }
});

8.8 Shutdown

Alternatively, you can call Channel.close() explicitly on all active channels before calling EventLoopGroup.shutdownGracefully() . But in all cases, remember to shut down the EventLoopGroup itself.

EventLoopGroup.shutdownGracefully(),它的返回值是一个future,这也是一个异步操作。

EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class);
...
Future<?> future = group.shutdownGracefully();
// block until the group has shutdown
future.syncUninterruptibly();

9 Unit testing

Netty提供了embedded transport来测试ChannelHandlers,embedded transportEmbeddedChannel (一种特殊的Channel实现) 的特色功能,可以简单地实现在pipeline中传播事件。

我们可以写入inbound或者outbound数据到EmbeddedChannel,然后检查是否有东西传输到ChannelPipeline的末尾。我们还可以确定消息是否被编解码,是否有ChannelHandler被触发。

Inbound data会被ChannelInboundHandlers处理,代表着从远程主机读取的数据。

outbound data会被ChannelOutboundHandlers处理,代表将要发送到远程主机的数据。

相关API:

图9.1展示了数据在EmbededChannel的流动情况。我们可以:

  1. 使用writeOutbound(),写入消息到Channel,让消息以outbound方向在pipeline中传递。后续,我们可以使用readOutbound()读取处理过后的数据,判断结果是否与预期一致。

  2. 使用writeInbound(),写入消息到Channel,让消息以inbound方向在pipeline中传递。后续,我们可以使用readInbound()读取处理过后的数据,判断结果是否与预期一致。

9.2 Testing ChannelHandlers with EmbeddedChannel

9.2.1 Testing inbound messages

图9.2 展示了一个简单的ByteToMessageDecoder实现。如果有足够的数据,这个Decoder会产生固定大小的frame。如果没有足够的数据,没有达到这个固定的size值,它会等待接下来的数据,继续判断能否接着产生frame。

具体代码实现如下:

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                    "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }
}

那么如何进行单元测试呢,测试代码如下:

public class FixedLengthFrameDecoderTest {
    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        // write bytes
        assertTrue(channel.writeInbound(input.retain()));
        assertTrue(channel.finish());
        // read messages
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        assertNull(channel.readInbound());
        buf.release();
    }

    @Test
    public void testFramesDecoded2() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        assertFalse(channel.writeInbound(input.readBytes(2)));
        assertTrue(channel.writeInbound(input.readBytes(7)));
        assertTrue(channel.finish());
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(3), read);
        read.release();
        assertNull(channel.readInbound());
        buf.release();
    }
}

9.2.2 Testing outbound messages

我们需要测试一个编码器:AbsIntegerEncoder,它是Netty的MessageToMessageEncode的一个实现,功能是将整数取绝对值。

我们的流程如下:

  1. EmbeddedChannel会将一个四字节负数按照outbound方向写入Channel

  2. 编码器会从到来的ByteBuf读取每个负数,调用Math.abs()获得绝对值。

  3. 编码器将绝对值写入到ChannelHandlerPipe

编码器代码实现:

public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
            ByteBuf in, List<Object> out) throws Exception {
        while (in.readableBytes() >= 4) {
            int value = Math.abs(in.readInt());
            out.add(value);
        }
    }
}

怎么测试?请看下文:

public class AbsIntegerEncoderTest {
    @Test
    public void testEncoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 1; i < 10; i++) {
            buf.writeInt(i * -1);
        }
        EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
        assertTrue(channel.writeOutbound(buf));
        assertTrue(channel.finish());
        // read bytes
        for (int i = 1; i < 10; i++) {
            assertEquals(i, channel.readOutbound());
        }
        assertNull(channel.readOutbound());
    }
}

9.3 Testing exception handling

为了测试异常处理,我们有如下的示例。
为防止资源耗尽,当我们读取到的数据多于某个数值,我们会抛出一个TooLongFrameException


在图9.4中,最大frame的大小为3字节,当一个frame的字节数大于3,它会被忽略,并且会抛出TooLongFrameException,其他的pipeline里的其他ChannelHandlers要么覆写exceptionCaught()进行捕获处理,要么会忽略这个异常。

解码器代码:

public class FrameChunkDecoder extends ByteToMessageDecoder {
    private final int maxFrameSize;

    public FrameChunkDecoder(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        int readableBytes = in.readableBytes();
        if (readableBytes > maxFrameSize) {
            // discard the bytes
            in.clear();
            throw new TooLongFrameException();
        }
        ByteBuf buf = in.readBytes(readableBytes);
        out.add(buf);
    }
}

如何测试,请看:

public class FrameChunkDecoderTest {
    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
        assertTrue(channel.writeInbound(input.readBytes(2)));
        try {
            channel.writeInbound(input.readBytes(4));
            Assert.fail();
        } catch (TooLongFrameException e) {
            // expected exception
        }
        assertTrue(channel.writeInbound(input.readBytes(3)));
        assertTrue(channel.finish());
        // Read frames
        ByteBuf read = (ByteBuf) channel.readInbound();
        assertEquals(buf.readSlice(2), read);
        read.release();
        read = (ByteBuf) channel.readInbound();
        assertEquals(buf.skipBytes(4).readSlice(3), read);
        read.release();
        buf.release();
    }
}

10.The codec framework

encoder,将outbound消息转换成易于传输的方式(大部分是字节流)。
decoder,将inbound网络字节流转回成应用程序消息格式。

10.2 Decoders

两种场景需要使用到Decoders:

  1. 将字节流解码成消息--ByteToMessageDecoderReplayingDecoder
  2. 将一种消息类型解码成另一种类型--MessageToMessageDecoder

10.2.1 ByteToMessageDecoder抽象类

功能: 将字节流解码成消息或者另一种字节流。

使用示例ToIntegerDecoder

每次从ByteBuf读取四个字节,解码成int,添加到List里。当没有更多的数据添加到List,List里的内容会传递到下一个ChannelInboundHandler

public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.readableBytes() >= 4) {
            out.add(in.readInt());
        }
    }
}

编解码框架里,消息处理完了,会自动调用ReferenceCountUtil.release(message),资源会自动释放。

Reference counting in codecs
As we mentioned in chapters 5 and 6, reference counting requires special attention. In the case of encoders and decoders, the procedure is quite simple: once a mes- sage has been encoded or decoded, it will automatically be released by a call to ReferenceCountUtil.release(message) . If you need to keep a reference for later use you can call ReferenceCountUtil.retain(message) . This increments the reference count, preventing the message from being released.

10.2.2 ReplayingDecoder抽象类

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder

ReplayingDecoder继承于ByteToMessageDecoder,特点是我们不再需要调用readableBytes(),省了判断数据是否足够的逻辑。

注意:

  1. 不是所有的ByteBuf的操作都被支持。如果不支持会抛出UnsupportedOperationException异常。

  2. ReplayingDecoder会比ByteToMessageDecoder稍慢。

ToIntegerDecoder2:

public class ToIntegerDecoder2 extends ReplayingDecoder<Void> {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(in.readInt());
    }
}

更多的解码工具可以在io.netty.handler.codec下找到。

  1. io.netty.handler.codec.LineBasedFrameDecoder,通过换行符(\n或者\r\n)来解析消息。

  2. io.netty.handler.codec.http.HttpObjectDecoder,解析HTTP数据。

10.2.3 MessageToMessageDecoder抽象类

消息格式互相转换,如把一种类型的POJO转换成另外一种。

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter

API差不多

示例:IntegerToStringDecoder

public class IntegerToStringDecoder extends MessageToMessageDecoder<Integer> {
    @Override
    public void decode(ChannelHandlerContext ctx, Integer msg, List<Object> out) throws Exception {
        out.add(String.valueOf(msg));
    }
}

一个更贴切详细的例子是io.netty.handler.codec.http.HttpObjectAggregator

TooLongFrameException防止资源耗尽:

public class SafeByteToMessageDecoder extends ByteToMessageDecoder {
    private static final int MAX_FRAME_SIZE = 1024;

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int readable = in.readableBytes();
        if (readable > MAX_FRAME_SIZE) {
            in.skipBytes(readable);
            throw new TooLongFrameException("Frame too big!");
        }
        // do something
    }
}

10.3 Encoders

与解码器类似,Encoders分为两种:

  1. 将消息编码成字节流。
  2. 将一种消息编码成另一种格式的消息。

10.3.1 MessageToByteEncoder抽象类

示例ShortToByteEncoder

public class ShortToByteEncoder extends MessageToByteEncoder<Short> {
    @Override
    public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out) throws Exception {
        out.writeShort(msg);
    }
}

更具体的应用实践可以参见io.netty.handler.codec.http.websocketx.WebSocket08FrameEncoder

10.4 编解码抽象类

既能encode,又能decode,二合一。

10.4.1 ByteToMessageCodec抽象类

Any request/response protocol could be a good candidate for using the ByteToMessageCodec . For example, in an SMTP implementation, the codec would read incoming bytes and decode them to a custom message type, say SmtpRequest . On the receiving side, when a response is created, an SmtpResponse will be produced, which will be encoded back to bytes for transmission.

10.4.2 MessageToMessageCodec抽象类

public abstract class MessageToMessageCodec<INBOUND_IN,OUTBOUND_IN>
public class WebSocketConvertHandler extends MessageToMessageCodec<WebSocketFrame, WebSocketConvertHandler.MyWebSocketFrame> {
    @Override
    protected void encode(ChannelHandlerContext ctx,
        WebSocketConvertHandler.MyWebSocketFrame msg, List<Object> out)
        throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();

        switch (msg.getType()) {
        case BINARY:
            out.add(new BinaryWebSocketFrame(payload));

            break;

        case TEXT:
            out.add(new TextWebSocketFrame(payload));

            break;

        case CLOSE:
            out.add(new CloseWebSocketFrame(true, 0, payload));

            break;

        case CONTINUATION:
            out.add(new ContinuationWebSocketFrame(payload));

            break;

        case PONG:
            out.add(new PongWebSocketFrame(payload));

            break;

        case PING:
            out.add(new PingWebSocketFrame(payload));

            break;

        default:
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, WebSocketFrame msg,
        List<Object> out) throws Exception {
        ByteBuf payload = msg.getData().duplicate().retain();

        if (msg instanceof BinaryWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.BINARY,
                    payload));
        } else if (msg instanceof CloseWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.CLOSE,
                    payload));
        } else if (msg instanceof PingWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PING,
                    payload));
        } else if (msg instanceof PongWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.PONG,
                    payload));
        } else if (msg instanceof TextWebSocketFrame) {
            out.add(new MyWebSocketFrame(MyWebSocketFrame.FrameType.TEXT,
                    payload));
        } else if (msg instanceof ContinuationWebSocketFrame) {
            out.add(new MyWebSocketFrame(
                    MyWebSocketFrame.FrameType.CONTINUATION, payload));
        } else {
            throw new IllegalStateException("Unsupported websocket msg " + msg);
        }
    }

    public static final class MyWebSocketFrame {
        private final FrameType type;
        private final ByteBuf data;

        public WebSocketFrame(FrameType type, ByteBuf data) {
            this.type = type;
            this.data = data;
        }

        public FrameType getType() {
            return type;
        }

        public ByteBuf getData() {
            return data;
        }
        public enum FrameType {BINARY,
            CLOSE,
            PING,
            PONG,
            TEXT,
            CONTINUATION;
        }
    }
}

10.4.3 CombinedChannelDuplexHandler类

将编码器解码器放在一块影响代码的重用性。CombinedChannelDuplexHandler可以解决这个问题。我们可以使用它而不直接使用codec抽象类。

方法签名:

public class CombinedChannelDuplexHandler <I extends ChannelInboundHandler, O extends ChannelOutboundHandler>

下面是一个使用范例:
解码器例子ByteToCharDecoder
功能是一次读取2个字节,解码成char写到List

public class ByteToCharDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
        while (in.readableBytes() >= 2) {
            out.add(in.readChar());
        }
    }
}

编码器例子CharToByteEncoder

public class CharToByteEncoder extends MessageToByteEncoder<Character> {
    @Override
    public void encode(ChannelHandlerContext ctx, Character msg, ByteBuf out)
        throws Exception {
        out.writeChar(msg);
    }
}

是时候combine了:

public class CombinedByteCharCodec extends CombinedChannelDuplexHandler<ByteToCharDecoder, CharToByteEncoder> {
    public CombinedByteCharCodec() {
        super(new ByteToCharDecoder(), new CharToByteEncoder());
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容