SSL/TSL支持
为了支持ssl/tls,java提供了javax.net.ssl包,netty通过一个SslHandler的ChannelHandler实现利用该api,其中SslHandler在内部使用SSLEngine完成实际的工作
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext sslContext;
private final boolean startTls;
public SslChannelInitializer(SslContext context,boolean startTls) {
this.sslContext = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel channel) throws Exception {
SSLEngine engine = sslContext.newEngine(channel.alloc());
channel.pipeline().addFirst(new SslHandler(engine,startTls));
}
}
HTTP支持
http是基于请求/响应模式的,netty提供了多种编码器和解码器用来简化对这个协议的使用
public class HttpPiplineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPiplineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel channel) throws Exception {
if(client) { //客户端
channel.pipeline().addLast("decoder",new HttpResponseDecoder());
channel.pipeline().addLast("encoder",new HttpRequestEncoder());
} else { //服务器
channel.pipeline().addLast("decoder",new HttpRequestDecoder());
channel.pipeline().addLast("encoder",new HttpResponseEncoder());
}
}
}
聚合http消息
public class HttpPiplineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPiplineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel channel) throws Exception {
if(client) { //客户端
channel.pipeline().addLast("codec",new HttpClientCodec());
} else { //服务器
channel.pipeline().addLast("codec",new HttpServerCodec());
}
}
}
netty为http压缩和解压提供了ChannelHandler实现,同时支持gzip和deflate编码,客户端可通过header告知服务器支持的压缩模式
Accept-Encoding: gzip, deflate
public class HttpPiplineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPiplineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel channel) throws Exception {
if(client) { //客户端
channel.pipeline().addLast("codec",new HttpClientCodec());
//解压数据
channel.pipeline().addLast("decompressor",new HttpContentDecompressor());
} else { //服务器
channel.pipeline().addLast("codec",new HttpServerCodec());
//压缩数据
channel.pipeline().addLast("compressor",new HttpContentCompressor());
}
}
}
空闲连接及超时
IdleStateHandler:当连接空闲时间太长,将会触发一个IdelStateEvent事件
ReadTimeoutHandler:如果在指定时间内没有收到任何的入站数据,则抛出一个ReadTimeoutException并关闭对应的Channel,可以通过重写ChannleHandler的exceptionCaugh()方法检测ReaderTimeOutException
WriteTimeoutHandler:指定时间内没有任何的入站数据,操作方式同上
IdleStateHandler测试,如果60秒之内没有接受或发送任何数据,则关闭连接
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(
new IdleStateHandler(0,0,60, TimeUnit.SECONDS)
).addLast(new HearbeatHandler());
}
public static final class HearbeatHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("heartbeat", CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
//发送心跳信息,在发送失败时关闭连接
if(evt instanceof IdleStateEvent) {
ctx.writeAndFlush(HEARTBEAT.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
//不是Idle事件则向下一个handler传递事件
super.userEventTriggered(ctx,evt);
}
}
}
}
基于分隔符和基于长度的协议
基于分隔符的协议
DelimiterBasedFrameDecoder:使用任何用户提供的分隔符来提取帧的通用解码器
LineBasedFrameDecoder:提取由行尾符(\n或\r\n)分割的帧的解码器
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new LineBasedFrameDecoder(60 * 1024))
.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
//传入了每个帧的内容,自定义处理
}
}
}
基于长度的协议
FixedLengthFrameDecoder:提供在构造函数时指定的定长帧
LengthFiledBaseFrameDecoder:根据编码进帧头部中的长度值提取帧,该字段的偏移量及长度在构造函数中指定
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel channel) throws Exception {
//将帧长度编码到起始的前八个字节
channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(64*1024,0,8))
.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
//传入了每个帧的内容,自定义处理
}
}
}
序列化数据
JDK序列化
CompatibleObjectDecoder:和使用JDK序列化的非基于Netty的远程节点进行互操作的解码器
CompatibleObjectEncoder:和使用JDK序列化的非基于Netty的远程节点进行互操作的编码器
JBoss Marshalling序列化
最多比JDK序列化快3倍,而且更加紧凑
CompatibleMarshallingDecoder/Encoder:与只使用JDK序列化的远程节点兼容
MarshallingDecoder/Encoder:适用于使用JBoss Marshalling的节点,这些类必须一起使用
使用时向pipline中添加Handler即可
Protocol Buffers序列化
Google开源的数据交换格式,具有许多编程语言绑定,很适合跨语言项目的通信
ProtobufDecoder/Encoder:编解码
ProtobufVarint32FrameDecoder:根据消息中的“Base 128 Varints”整型长度字段值动态的分割所收到的ByteBuf
ProtobufVarint32LengthFieldPrepender:向ByteBuf前追加一个“Base 128 Varints”整型的长度字段值