通过SSL/TLS 保护Netty 应用程序
Netty 通过一个名为SslHandler 的ChannelHandler实现利用javax.net.ssl下的SSLContext和SSLEngine的api,其中SslHandler 在内部使用SSLEngine 来完成实际的工作。
public class SslChannelInitializer extends ChannelInitializer<Channel>{
private final SslContext context; // 传入要使用的SslContext
private final boolean startTls; // 如果设置为true,第一个写入的消息将不会被加密(客户端应该设置为true)
public SslChannelInitializer(SslContext context,boolean startTls) {
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
// 对于每个SslHandler 实例,都使用Channel 的ByteBuf-Allocator 从SslContext 获取一个新的SSLEngine
SSLEngine engine = context.newEngine(ch.alloc());
// 将SslHandler 作为第一个ChannelHandler 添加到ChannelPipeline 中
ch.pipeline().addFirst("ssl",new SslHandler(engine, startTls));
}
}
构建基于Netty 的HTTP/HTTPS 应用程序
- HTTP 解码器、编码器和编解码器
添加HTTP 支持例子,这样你就知道有多简单了:
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) { // 如果是客户端
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
} else {
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
}
}
}
聚合HTTP 消息
由于HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器,它可以将多个消息部分合并为FullHttpRequest 或者FullHttpResponse 消息。
/**
* 自动聚合HTTP 的消息片段
*/
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregatorInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
pipeline.addLast("aggregator",new HttpObjectAggregator(512 * 1024));
}
}
-
HTTP 压缩
Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码。
/**
* 自动压缩HTTP 消息
*/
public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpCompressionInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
// 如果是客户端,则添加HttpContentDecompressor 以处理来自服务器的压缩内容.
pipeline.addLast("decompressor",new HttpContentDecompressor());
} else {
pipeline.addLast("codec", new HttpServerCodec());
// 如果是服务器,则添加HttpContentCompressor来压缩数据(如果客户端支持它)
pipeline.addLast("compressor",new HttpContentCompressor());
}
}
}
- 使用HTTPS
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean isClient;
public HttpsCodecInitializer(SslContext context, boolean isClient) {
this.context = context;
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = context.newEngine(ch.alloc());
pipeline.addFirst("ssl", new SslHandler(engine));
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
}
}
-
WebSocket
WebSocket为网页和远程服务器之间的双向通信提供了一种替代HTTP轮询的方案。
要想向你的应用程序中添加对于WebSocket 的支持,你需要将适当的客户端或者服务器WebSocket ChannelHandler 添加到ChannelPipeline 中。这个类将处理由WebSocket 定义的称为帧的特殊消息类型。如表所示,WebSocketFrame 可以被归类为数据帧或者控制帧。
public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(
new HttpServerCodec(),
new HttpObjectAggregator(65536), // 为握手提供聚合的HttpRequest
new WebSocketServerProtocolHandler("/websocket"), // 如果被请求的端点是"/websocket",则处理该升级握手
new TextFrameHandler(), // TextFrameHandler 处理TextWebSocketFrame
new BinaryFrameHandler(),
new ContinuationFrameHandler());
}
public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,BinaryWebSocketFrame msg) throws Exception {
// Handle binary frame
}
}
public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,ContinuationWebSocketFrame msg) throws Exception {
// Handle continuation frame
}
}
}
要想为WebSocket 添加安全性,只需要将SslHandler 作为第一个ChannelHandler 添加到ChannelPipeline 中。
空闲的连接和超时
以下代码展示了当使用通常的发送心跳消息到远程节点的方法时,如果在60 秒之内没有接收或者发送任何的数据,我们将如何得到通知;如果没有响应,则连接会被关闭.
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler 将在被触发时发送一个IdleStateEvent 事件
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE =Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx,Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { // 发送心跳消息,并在发送失败时关闭该连接
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else { // 不是IdleStateEvent事件,所以将它传递给下一个ChannelInboundHandler
super.userEventTriggered(ctx, evt);
}
}
}
}
解码基于分隔符的协议和基于长度的协议
-
基于分隔符的协议
基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(通常被称为帧)的开头或者结尾。由RFC文档正式定义的许多协议(如SMTP、POP3、IMAP以及Telnet)都是这样的.
处理由行尾符分隔的帧的例子:
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx,ByteBuf msg) throws Exception {
// Do something with the data extracted from the frame
}
}
}
作为示例,我们将使用下面的协议规范:
1.传入数据流是一系列的帧,每个帧都由换行符(\n)分隔;
2.每个帧都由一系列的元素组成,每个元素都由单个空格字符分隔;
3.一个帧的内容代表一个命令,定义为一个命令名称后跟着数目可变的参数。
我们用于这个协议的自定义解码器将定义以下类:
1.Cmd—将帧(命令)的内容存储在ByteBuf 中,一个ByteBuf 用于名称,另一个用于参数;
2.CmdDecoder—从被重写了的decode()方法中获取一行字符串,并从它的内容构建一个Cmd 的实例;
3.CmdHandler —从CmdDecoder 获取解码的Cmd 对象,并对它进行一些处理;
4.CmdHandlerInitializer —为了简便起见,我们将会把前面的这些类定义为专门的ChannelInitializer 的嵌套类,其将会把这些ChannelInboundHandler 安装到ChannelPipeline 中。
public class CmdHandlerInitializer extends ChannelInitializer<Channel> {
final byte SPACE = (byte)' ';
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CmdDecoder(64 * 1024)); // 添加CmdDecoder 以提取Cmd 对象,并将它转发给下一个ChannelInboundHandler
pipeline.addLast(new CmdHandler()); // 添加CmdHandler 以接收和处理Cmd 对象
}
public static final class Cmd {
private final ByteBuf name;
private final ByteBuf args;
public Cmd(ByteBuf name, ByteBuf args) {
this.name = name;
this.args = args;
}
public ByteBuf name() {
return name;
}
public ByteBuf args() {
return args;
}
}
public static final class CmdDecoder extends LineBasedFrameDecoder {
public CmdDecoder(int maxLength) {
super(maxLength);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
ByteBuf frame = (ByteBuf) super.decode(ctx, buffer);
if (frame == null) {
return null;
}
// 查找第一个空格字符的索引。前面是命令名称,接着是参数
int index = frame.indexOf(frame.readerIndex(),frame.writerIndex(), SPACE);
// 使用包含有命令名称和参数的切片创建新的Cmd 对象
return new Cmd(frame.slice(frame.readerIndex(), index),frame.slice(index + 1, frame.writerIndex()));
}
}
public static final class CmdHandler extends SimpleChannelInboundHandler<Cmd> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception {
// Do something with the command(获取Cmd对象进一步操作)
}
}
}
- 基于长度的协议
你将经常会遇到被编码到消息头部的帧大小不是固定值的协议。为了处理这种变长帧,你可以使用LengthFieldBasedFrameDecoder,它将从头部字段确定帧长,然后从数据流中提取指定的字节数。
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx,ByteBuf msg) throws Exception {
// Do something with the frame
}
}
}
-
写大型数据
通过支持零拷贝的文件传输的Channel 来发送的文件区域.
FileInputStream in = new FileInputStream(file);
// 以该文件的完整长度创建一个新的DefaultFileRegion
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
// 发送该DefaultFileRegion,并注册一个ChannelFutureListener
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause(); // 处理失败
// Do something
}
}
});
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
private final File file;
private final SslContext sslCtx;
public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
this.file = file;
this.sslCtx = sslCtx;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new WriteStreamHandler()); // 一旦连接建立,WriteStreamHandler就开始写文件数据
}
public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx)throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
}
}
}
序列化数据
- JDK 序列化
-
使用JBoss Marshalling 进行序列化
比JDK序列化最多快3 倍,而且也更加紧凑。
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,MarshallerProvider marshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
pipeline.addLast(new MarshallingEncoder(marshallerProvider));
pipeline.addLast(new ObjectHandler()); // 添加ObjectHandler,以处理普通的实现了Serializable 接口的POJO
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext,Serializable serializable) throws Exception {
// Do something
}
}
}
-
通过Protocol Buffers 序列化
Protocol Buffers 以一种紧凑而高效的方式对结构化的数据进行编码以及解码。它具有许多的编程语言绑定,使得它很适合跨语言的项目。(由Google公司开发的、现在已经开源的数据交换格式。)
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
private final MessageLite lite;
public ProtoBufInitializer(MessageLite lite) {
this.lite = lite;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufEncoder()); ①
pipeline.addLast(new ProtobufDecoder(lite));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Do something with the object
}
}
}