11 Provided ChannelHandlers and codecs
Netty ships with codecs and handlers for many common protocols, ready to use out of the box.
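11.1 Securing Netty applications with SSL/TLS
SSL/TLS are security protocols layered on top of other protocols to protect their data. They are used by HTTP-based services, SMTP mail services, and even relational database systems.
To support SSL/TLS, Java provides the javax.net.ssl package, whose SSLContext and SSLEngine classes make encryption and decryption fairly straightforward to implement. Netty builds on this API through SslHandler, which uses an SSLEngine internally to do the actual work. Besides the SSLEngine built into the JDK, Netty can also use OpenSslEngine, which offers better performance. Whichever engine you choose, the API Netty exposes is the same.
Adding SSL/TLS support to an application: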
public class SslChannelInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslChannelInitializer(SslContext context, boolean startTls) {
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.newEngine(ch.alloc());
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
}
}
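For reference, the JDK classes mentioned above can be used on their own. The following is a minimal sketch, not part of the original example: the class name JdkSslEngineSketch is hypothetical, and it assumes the JVM's default SSLContext is acceptable (a real server would configure its own key material).

```java
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// Hypothetical helper: creates a JDK SSLEngine without any Netty classes.
class JdkSslEngineSketch {
    // Returns an engine configured for the server side of the handshake.
    static SSLEngine newServerEngine() {
        try {
            // Assumption: the JVM default context is good enough for illustration.
            SSLContext context = SSLContext.getDefault();
            SSLEngine engine = context.createSSLEngine();
            engine.setUseClientMode(false);
            return engine;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
    }
}
```

An engine like this is what SslHandler wraps; in the Netty example the engine comes from SslContext.newEngine() instead.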
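11.2 Building HTTP/HTTPS applications with Netty
FullHttpRequest/FullHttpResponse: a complete HTTP request/response.
HttpRequest: the first part of an HTTP request, containing the headers.
HttpContent: a chunk of the message body.
LastHttpContent: marks the end of an HTTP request/response.
Adding HTTP support to an application: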
public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("decoder", new HttpResponseDecoder());
pipeline.addLast("encoder", new HttpRequestEncoder());
} else {
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
}
}
}
11.2.2 HTTP message aggregation
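An HTTP request or response can arrive in many parts, which need to be combined into one complete HTTP message. Netty provides an aggregator for this: a message is passed to the next ChannelInboundHandler only once it is a complete HTTP message. The HttpObjectAggregator below limits the aggregated message size to 512 KB.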
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpAggregatorInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
}
}
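11.2.3 HTTP compression
Compressing HTTP messages is strongly recommended to reduce the amount of data transferred, especially for text. Netty provides ChannelHandler implementations for compression and decompression that support the gzip and deflate encodings.
In the request headers, the client can use Accept-Encoding to indicate which compression encodings it supports: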
GET /encrypted-area HTTP/1.1
Host: www.example.com
Accept-Encoding: gzip, deflate
Note, however, that the server is under no obligation to compress the data it sends.
public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean isClient;
public HttpCompressionInitializer(boolean isClient) {
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
pipeline.addLast("decompressor", new HttpContentDecompressor());
} else {
pipeline.addLast("codec", new HttpServerCodec());
pipeline.addLast("compressor", new HttpContentCompressor());
}
}
}
On JDK 6 and earlier, you need to add the JZlib dependency:
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<version>1.1.3</version>
</dependency>
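To get a feel for what the gzip encoding buys for text, here is a small JDK-only sketch using java.util.zip. It is an illustration of the encoding itself, not of how HttpContentCompressor is implemented; the class name GzipSketch is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper: gzip round trip with the JDK only.
class GzipSketch {
    // Compresses the given bytes with gzip.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Restores the original bytes from gzip-compressed input.
    static byte[] decompress(byte[] compressed) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
```

Repetitive text shrinks considerably, while very short payloads can even grow because of the gzip header and trailer.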
11.2.4 Using HTTPS
public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean isClient;
public HttpsCodecInitializer(SslContext context, boolean isClient) {
this.context = context;
this.isClient = isClient;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
SSLEngine engine = context.newEngine(ch.alloc());
pipeline.addFirst("ssl", new SslHandler(engine));
if (isClient) {
pipeline.addLast("codec", new HttpClientCodec());
} else {
pipeline.addLast("codec", new HttpServerCodec());
}
}
}
11.2.5 WebSocket
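WebSocket solves a long-standing problem: how can an HTTP server push messages to the client in real time? AJAX polling and Comet are workarounds, but WebSocket is the better solution because it is more efficient than both. It lets a web page and a remote server communicate in both directions.
To add WebSocket support to an application, add the appropriate client-side or server-side WebSocket ChannelHandler to the ChannelPipeline. This handler processes messages according to their frame type. WebSocketFrames are classified as either data frames or control frames.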
public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec(),
new HttpObjectAggregator(65536),
new WebSocketServerProtocolHandler("/websocket"),
new TextFrameHandler(), new BinaryFrameHandler(),
new ContinuationFrameHandler());
}
public static final class TextFrameHandler extends
SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
TextWebSocketFrame msg) throws Exception {
// Handle text frame
}
}
public static final class BinaryFrameHandler extends
SimpleChannelInboundHandler<BinaryWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
BinaryWebSocketFrame msg) throws Exception {
// Handle binary frame
}
}
public static final class ContinuationFrameHandler extends
SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
@Override
public void channelRead0(ChannelHandlerContext ctx,
ContinuationWebSocketFrame msg) throws Exception {
// Handle continuation frame
}
}
}
11.3 空闲连接和超时
Detecting idle connections and timeouts is essential to freeing resources in a timely manner. This is such a common task that Netty provides several ChannelHandler implementations just for this purpose.
The following example shows how to send a heartbeat to the remote peer once a connection has been idle for 60 seconds; if the heartbeat cannot be sent, the connection is closed.
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
}
public static final class HeartbeatHandler extends
ChannelInboundHandlerAdapter {
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT",
CharsetUtil.ISO_8859_1));
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
super.userEventTriggered(ctx, evt);
}
}
}
}
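When a connection has been idle for 60 seconds, IdleStateHandler fires an IdleStateEvent, which is delivered to userEventTriggered().
ChannelFutureListener.CLOSE_ON_FAILURE closes the connection if sending the heartbeat fails.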
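11.4 Decoding delimited and length-based protocols
11.4.1 Delimited protocols
Type | Description |
---|---|
DelimiterBasedFrameDecoder | Extracts frames using a user-defined delimiter |
LineBasedFrameDecoder | Extracts frames terminated by the line ending \r\n or \n |
Usage example: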
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
// Do something with the data extracted from the frame
}
}
}
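The idea behind line-based framing can be sketched without Netty. The class below is a hypothetical illustration, not LineBasedFrameDecoder's actual implementation: it buffers incoming bytes and emits a frame whenever it sees \n, dropping a preceding \r. Unlike the Netty decoder it does not enforce a maximum frame length.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of line-based framing over a stream of byte chunks.
class LineFramingSketch {
    // Bytes received so far that do not yet form a complete line.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    // Feeds one chunk and returns every line completed by it.
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n') {
                byte[] line = pending.toByteArray();
                int length = line.length;
                if (length > 0 && line[length - 1] == '\r') {
                    length--; // drop the CR of a CRLF line ending
                }
                frames.add(new String(line, 0, length, StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
            }
        }
        return frames;
    }
}
```

A line split across two chunks is only emitted once its terminator arrives, which is the behavior a frame decoder needs on a TCP stream.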
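11.4.2 Length-based protocols
These decoders define frames by length plus content.
Type | Description |
---|---|
FixedLengthFrameDecoder | Extracts frames of a fixed size |
LengthFieldBasedFrameDecoder | Extracts variable-size frames based on a length value encoded in the frame header |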
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//The first 8 bytes of the frame hold the frame length
pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
pipeline.addLast(new FrameHandler());
}
public static final class FrameHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
throws Exception {
// Do something with the frame
}
}
}
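As an illustration of the wire format this decoder expects, here is a JDK-only sketch that writes and reads frames with an 8-byte big-endian length header. The class LengthFramingSketch is hypothetical. One deliberate difference: it returns only the payload, whereas the three-argument LengthFieldBasedFrameDecoder constructor used above does not strip the length field from the emitted frame.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed framing: [8-byte length][payload].
class LengthFramingSketch {
    static final int LENGTH_FIELD_SIZE = 8;

    // Prepends the payload length as a big-endian long.
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(LENGTH_FIELD_SIZE + payload.length)
                .putLong(payload.length)
                .put(payload)
                .array();
    }

    // Extracts the payloads of all complete frames. The buffer is left
    // positioned at the start of the first incomplete frame, if any.
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> payloads = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_SIZE) {
            in.mark();
            long length = in.getLong();
            if (length < 0 || length > Integer.MAX_VALUE) {
                throw new IllegalArgumentException("invalid frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // incomplete frame: rewind and wait for more bytes
                break;
            }
            byte[] payload = new byte[(int) length];
            in.get(payload);
            payloads.add(payload);
        }
        return payloads;
    }
}
```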
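11.5 Writing large amounts of data
Because the network may become saturated, writing large chunks of data efficiently is a particular problem in an asynchronous framework. Write operations are non-blocking, so they return immediately and notify the ChannelFuture on completion, even if not all of the data has been written out to the network. If you keep writing when this happens, you risk running out of memory. For file content, Netty's zero-copy support helps: the file is transferred to the network stack without being copied through application memory.
This is exposed through the FileRegion interface, defined in the Netty API documentation as “a region of a file that is sent via a Channel that supports zero-copy file transfer.”
Usage example: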
FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion(in.getChannel(), 0,
file.length());
channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess()) {
Throwable cause = future.cause();
// Do something
}
}
});
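The example above transfers the file content directly to the network. If the data has to pass through user memory first, for example because the file content must be processed, use ChunkedWriteHandler, which supports writing large amounts of data asynchronously without high memory consumption.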
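The JDK primitive underneath this kind of file transfer is FileChannel.transferTo, which DefaultFileRegion relies on. The sketch below shows that call in isolation; the class TransferToSketch and the helper sendViaTempFile are hypothetical, and it assumes a blocking target channel. Actual zero-copy only happens when the target is something like a socket channel; with the in-memory target used in the helper, the JDK falls back to ordinary copying.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch of file-to-channel transfer with the JDK only.
class TransferToSketch {
    // Sends the whole file to the target and returns the number of bytes sent.
    static long send(Path file, WritableByteChannel target) {
        try (FileChannel source = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = source.size();
            while (position < size) {
                // transferTo may move fewer bytes than requested, so loop until done.
                position += source.transferTo(position, size - position, target);
            }
            return position;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Usage helper: writes data to a temporary file, sends it, and returns what arrived.
    static byte[] sendViaTempFile(byte[] data) {
        try {
            Path tmp = Files.createTempFile("transfer-sketch", ".bin");
            try {
                Files.write(tmp, data);
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                send(tmp, Channels.newChannel(received));
                return received.toByteArray();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```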
11.6 Serializing data
11.6.1 JDK serialization
JDK serialization is slow and produces large byte streams, so it is generally avoided.
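The size overhead is easy to observe. In this sketch (the class JdkSerializationSketch and its Point type are hypothetical), an object carrying two ints, 8 bytes of actual data, is serialized with ObjectOutputStream; the resulting stream also contains a header and a full class descriptor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: round trip through JDK serialization.
class JdkSerializationSketch {
    // A tiny value type holding 8 bytes of payload (two ints).
    static class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    // Serializes the value into a byte array.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return out.toByteArray();
    }

    // Reads one object back from the byte array.
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }
}
```

Comparing serialize(new Point(3, 4)).length with the 8 bytes of field data shows how much of the stream is metadata.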
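11.6.2 Serialization with JBoss Marshalling
JBoss Marshalling performs well: up to three times faster than JDK serialization.
Name | Description |
---|---|
CompatibleMarshallingDecoder, CompatibleMarshallingEncoder | For compatibility with peers that use JDK serialization |
MarshallingDecoder, MarshallingEncoder | For use when both peers use JBoss Marshalling codecs |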
public class MarshallingInitializer extends ChannelInitializer<Channel> {
private final MarshallerProvider marshallerProvider;
private final UnmarshallerProvider unmarshallerProvider;
public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider) {
this.marshallerProvider = marshallerProvider;
this.unmarshallerProvider = unmarshallerProvider;
}
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
pipeline.addLast(new MarshallingEncoder(marshallerProvider));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
@Override
public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable)
throws Exception {
// Do something
}
}
}
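11.6.3 Serialization with Protocol Buffers
Its main feature is that it is cross-language.
Name | Description |
---|---|
ProtobufDecoder, ProtobufEncoder | Decode and encode messages using Protocol Buffers |
ProtobufVarint32FrameDecoder | Splits received data into frames dynamically, based on the Base 128 Varints length field in the message |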
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
private final MessageLite lite;
public ProtoBufInitializer(MessageLite lite) {
this.lite = lite;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//Adds ProtobufVarint32FrameDecoder to break down frames
pipeline.addLast(new ProtobufVarint32FrameDecoder());
pipeline.addLast(new ProtobufEncoder());
pipeline.addLast(new ProtobufDecoder(lite));
pipeline.addLast(new ObjectHandler());
}
public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// Do something with the object
}
}
}
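The Base 128 Varints length prefix mentioned in the table stores 7 bits of the value per byte, least significant group first, and sets the high bit of a byte when more bytes follow. A minimal JDK-only sketch of that encoding (the class Varint32Sketch is hypothetical and is not the Netty or protobuf implementation):

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of Base 128 varint encoding for 32-bit values.
class Varint32Sketch {
    // Encodes the value using 7 bits per byte, low-order group first.
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // high bit set: more bytes follow
            value >>>= 7;
        }
        out.write(value); // last byte has the high bit clear
        return out.toByteArray();
    }

    // Decodes a varint32 from the start of the array.
    static int decode(byte[] bytes) {
        int result = 0;
        for (int i = 0; i < bytes.length && i < 5; i++) {
            result |= (bytes[i] & 0x7F) << (7 * i);
            if ((bytes[i] & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("malformed or truncated varint32");
    }
}
```

For example, a length of 300 is encoded as the two bytes 0xAC 0x02, so short messages pay only one or two bytes of framing overhead.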
12 WebSocket
Definition of the real-time web:
The real-time web is a network web using technologies and practices that enable users to receive information as soon as it is published by its authors, rather than requiring that they or their software check a source periodically for updates.
In other words, users receive a message as soon as the server publishes it, instead of polling the server.
WebSocket example application: a chat room.
Protocol upgrade mechanism:
https://developer.mozilla.org/en-US/docs/HTTP/Protocol_upgrade_mechanism
HTTP provides a special mechanism allowing an already established connection to upgrade to a new, incompatible, protocol.
Protocol upgrades are always requested by the client; there is no mechanism provided for the server to request a protocol change. When the client wishes to upgrade to a new protocol, it does so by sending a normal request of any type to the server (GET, POST, etc.). The request needs to be configured specially to include the upgrade request, however.
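HTTP allows an already established connection to be upgraded to a new, incompatible, non-HTTP protocol; the upgrade is always requested by the client.
To request it, the client adds the Connection: Upgrade and Upgrade headers to an ordinary request.
Through this mechanism, an HTTP/HTTPS connection can be switched to the WebSocket protocol.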
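For WebSocket specifically, RFC 6455 adds one more step to the upgrade: the client sends a Sec-WebSocket-Key header, and the server proves it understood the request by answering 101 Switching Protocols with a Sec-WebSocket-Accept header derived from that key. In Netty this handshake is handled for you by WebSocketServerProtocolHandler; the sketch below only illustrates the computation, and the class name WebSocketAcceptSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical sketch of the Sec-WebSocket-Accept computation from RFC 6455.
class WebSocketAcceptSketch {
    // Fixed GUID defined by RFC 6455.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // accept = base64(SHA-1(key + GUID))
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 is not available", e);
        }
    }
}
```

With the sample key from the RFC, dGhlIHNhbXBsZSBub25jZQ==, the result is s3pPLMBiTxaQ9kYGzzhZRbK+xOo=.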
The chat room has to support both HTTP(S) and WebSocket:
public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private final String wsUri;
private static final File INDEX;
static {
URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
try {
String path = location.toURI() + "index.html";
path = !path.contains("file:") ? path : path.substring(5);
INDEX = new File(path);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to locate index.html", e);
}
}
public HttpRequestHandler(String wsUri) {
this.wsUri = wsUri;
}
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
if (wsUri.equalsIgnoreCase(request.getUri())) {
//If a WebSocket upgrade is requested, increment the reference count (retain) and pass the request to the next ChannelHandler
ctx.fireChannelRead(request.retain());
} else {
if (HttpHeaders.is100ContinueExpected(request)) {
send100Continue(ctx);
}
RandomAccessFile file = new RandomAccessFile(INDEX, "r");
HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
boolean keepAlive = HttpHeaders.isKeepAlive(request);
if (keepAlive) {
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
}
ctx.write(response);
if (ctx.pipeline().get(SslHandler.class) == null) {
ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
} else {
ctx.write(new ChunkedNioFile(file.getChannel()));
}
ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive) {
future.addListener(ChannelFutureListener.CLOSE);
}
}
}
private static void send100Continue(ChannelHandlerContext ctx) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
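If the client requests the URI /ws, HttpRequestHandler calls retain() on the FullHttpRequest and forwards it to the next ChannelInboundHandler by calling fireChannelRead(msg). The call to retain() is needed because SimpleChannelInboundHandler otherwise releases the message automatically once channelRead0() returns. If the client sends the HTTP 1.1 header Expect: 100-continue, HttpRequestHandler replies with a 100 Continue response.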
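12.3.2 Handling WebSocket frames
The WebSocket RFC published by the IETF defines six frame types, and Netty provides an implementation for each of them.
Our chat room uses four of them:
CloseWebSocketFrame
PingWebSocketFrame
PongWebSocketFrame
TextWebSocketFrame
TextWebSocketFrame is the only one we actually need to handle ourselves. In conformity with the WebSocket RFC, Netty provides WebSocketServerProtocolHandler to manage the others.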
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private final ChannelGroup group;
public TextWebSocketFrameHandler(ChannelGroup group) {
this.group = group;
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
//If the event indicates that the handshake was successful,
//removes the HttpRequestHandler from the ChannelPipeline
//because no further HTTP messages will be received.
ctx.pipeline().remove(HttpRequestHandler.class);
group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
group.add(ctx.channel());
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//Increments the reference count of the message and writes it to all connected clients in the ChannelGroup
group.writeAndFlush(msg.retain());
}
}
12.3.3 Initializing the ChannelPipeline
public class ChatServerInitializer extends ChannelInitializer<Channel> {
private final ChannelGroup group;
public ChatServerInitializer(ChannelGroup group) {
this.group = group;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//Decodes bytes to HttpRequest, HttpContent, and LastHttpContent. Encodes HttpResponse, HttpContent, and LastHttpContent to bytes.
pipeline.addLast(new HttpServerCodec());
//Writes the contents of a file.
pipeline.addLast(new ChunkedWriteHandler());
//Aggregates an HttpMessage and its following HttpContents into a single FullHttpRequest or
//FullHttpResponse (depending on whether it’s being used to handle requests or responses).
//With this installed, the next ChannelHandler in the pipeline will receive only full HTTP
//requests.
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//Handles FullHttpRequests (those not sent to a /ws URI).
pipeline.addLast(new HttpRequestHandler("/ws"));
//As required by the WebSocket specification, handles the WebSocket upgrade handshake,
//PingWebSocketFrame s, PongWebSocketFrames, and CloseWebSocketFrames.
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
//Handles TextWebSocketFrames and handshake-completion events
pipeline.addLast(new TextWebSocketFrameHandler(group));
}
}
12.3.4 Bootstrapping
Starting the chat room server:
public class ChatServer {
private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
public ChannelFuture start(InetSocketAddress address) {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group).channel(NioServerSocketChannel.class).childHandler(createInitializer(channelGroup));
ChannelFuture future = bootstrap.bind(address);
future.syncUninterruptibly();
channel = future.channel();
return future;
}
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new ChatServerInitializer(group);
}
public void destroy() {
if (channel != null) {
channel.close();
}
channelGroup.close();
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
final ChatServer endpoint = new ChatServer();
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
Then start the ChatServer:
mvn -PChatServer -Dport=1111 clean package exec:exec
12.4.1 Encrypting the chat traffic
In a real application the chat traffic has to be encrypted. With Netty, this only requires adding a properly configured SslHandler to the ChannelPipeline.
public class SecureChatServerInitializer extends ChatServerInitializer {
private final SslContext context;
public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
super(group);
this.context = context;
}
@Override
protected void initChannel(Channel ch) throws Exception {
super.initChannel(ch);
SSLEngine engine = context.newEngine(ch.alloc());
ch.pipeline().addFirst(new SslHandler(engine));
}
}
The corresponding bootstrap code:
public class SecureChatServer extends ChatServer {
private final SslContext context;
public SecureChatServer(SslContext context) {
this.context = context;
}
@Override
protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
return new SecureChatServerInitializer(group, context);
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Please give port as argument");
System.exit(1);
}
int port = Integer.parseInt(args[0]);
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
final SecureChatServer endpoint = new SecureChatServer(context);
ChannelFuture future = endpoint.start(new InetSocketAddress(port));
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
endpoint.destroy();
}
});
future.channel().closeFuture().syncUninterruptibly();
}
}
With the server started on port 9999, you can connect to the chat server at https://localhost:9999.
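13 Broadcasting events with UDP
UDP, the User Datagram Protocol, is a connectionless protocol commonly used where performance is critical and some packet loss can be tolerated.
The Domain Name System (DNS) is one of the best-known protocols based on UDP.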
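13.1 UDP basics
Connection-oriented TCP manages the establishment of a connection, the ordered and reliable delivery of messages, and the orderly termination of the connection.
Connectionless UDP has no concept of a durable connection; every message (a UDP datagram) is an independent transmission. UDP also lacks TCP's error-correction mechanism, in which the receiver acknowledges each packet with an ACK and the sender retransmits any packet that is not acknowledged.
TCP can be compared to a telephone call: messages flow in order in both directions.
UDP is more like a mass mailing: you cannot know the order in which the messages arrive.
UDP has much less overhead than TCP: it needs no three-way handshake and no message management.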
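13.2 UDP broadcast
All of our examples so far have used unicast, in which a message has exactly one destination. UDP provides additional transmission modes:
- Multicast: transmission to a defined group of hosts.
- Broadcast: transmission to all hosts on a network (or subnet).
The example in this chapter demonstrates UDP broadcast by sending messages to every host on the same network. To do so, it uses the limited broadcast address 255.255.255.255. Messages sent to this address are destined for all hosts on the local network (0.0.0.0) and are never forwarded to other networks by routers.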
13.3 The UDP sample application
This application is similar to the syslog utility on UNIX-like operating systems. See https://linux.cn/article-5023-1.html
13.4 The message POJO: LogEvent
public final class LogEvent {
public static final byte SEPARATOR = (byte) ':';
private final InetSocketAddress source;//the InetSocket Address of the source that sent the LogEvent
private final String logfile;// the name of the log file
private final String msg;//message contents
private final long received;//the time at which the LogEvent was received
public LogEvent(String logfile, String msg) {
this(null, -1, logfile, msg);
}
public LogEvent(InetSocketAddress source, long received, String logfile,
String msg) {
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() {
return source;
}
public String getLogfile() {
return logfile;
}
public String getMsg() {
return msg;
}
public long getReceivedTimestamp() {
return received;
}
}
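13.5 Writing the broadcaster
DatagramPacket is a simple message container that DatagramChannel implementations use to communicate with remote peers. To convert a LogEvent into a DatagramPacket we need an encoder, but there is no need to write one from scratch: we extend MessageToMessageEncoder.
The encoder: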
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext channelHandlerContext,
LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = channelHandlerContext.alloc().buffer(
file.length + msg.length + 1);
buf.writeBytes(file);
buf.writeByte(LogEvent.SEPARATOR);
buf.writeBytes(msg);
out.add(new DatagramPacket(buf, remoteAddress));
}
}
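The datagram payload produced by this encoder is simply the log file name, the separator byte ':', and the message, all UTF-8 encoded. The JDK-only sketch below reproduces that layout and shows how a receiver could split it again. The class LogEventWireFormatSketch is hypothetical, and splitting on the first separator is an assumption that fails if the file path itself contains ':' (as Windows paths do).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the "logfile:msg" datagram payload.
class LogEventWireFormatSketch {
    static final byte SEPARATOR = (byte) ':';

    // Builds the payload: file name, separator, message.
    static byte[] encode(String logfile, String msg) {
        byte[] file = logfile.getBytes(StandardCharsets.UTF_8);
        byte[] body = msg.getBytes(StandardCharsets.UTF_8);
        byte[] out = new byte[file.length + 1 + body.length];
        System.arraycopy(file, 0, out, 0, file.length);
        out[file.length] = SEPARATOR;
        System.arraycopy(body, 0, out, file.length + 1, body.length);
        return out;
    }

    // Splits the payload at the first separator and returns {logfile, msg}.
    static String[] decode(byte[] datagram) {
        int index = -1;
        for (int i = 0; i < datagram.length; i++) {
            if (datagram[i] == SEPARATOR) {
                index = i;
                break;
            }
        }
        if (index < 0) {
            throw new IllegalArgumentException("missing separator");
        }
        String logfile = new String(datagram, 0, index, StandardCharsets.UTF_8);
        String msg = new String(datagram, index + 1,
                datagram.length - index - 1, StandardCharsets.UTF_8);
        return new String[] {logfile, msg};
    }
}
```

Because only the first separator is significant, the message part may itself contain colons.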
The bootstrap class:
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address));
this.file = file;
}
public void run() throws Exception {
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
for (;;) {
long len = file.length();
if (len < pointer) {
// file was reset
pointer = len;
} else if (len > pointer) {
// Content was added
RandomAccessFile raf = new RandomAccessFile(file, "r");
raf.seek(pointer);
String line;
while ((line = raf.readLine()) != null) {
ch.writeAndFlush(new LogEvent(null, -1, file
.getAbsolutePath(), line));
}
pointer = raf.getFilePointer();
raf.close();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new IllegalArgumentException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(
new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1]));
try {
broadcaster.run();
} finally {
broadcaster.stop();
}
}
}
About the Linux tool netcat:
netcat listens on a specified port and prints all data received to standard output.
The following command listens for UDP datagrams on port 9999 and prints whatever it receives.
$ nc -l -u 9999