《netty in action》读书笔记 PART2

11 Provided ChannelHandlers and codecs

Netty 提供了大量通用协议的编解码器和Handler,开箱即用。

11.1 使用SSL/TLS搭建安全的Netty应用

SSL/TLS协议是用于保证数据的安全性,位于其他协议的上层。使用场景包括基于HTTP服务,SMTP邮件服务甚至关系型数据库。

为了支持SSL/TLS,Java提供了javax.net.ssl包,其中的SSLContextSSLEngine类对加解密提供了直观的实现。Netty则是通过SslHandler。除了JDK内置实现的SSLEngine,netty还可以使用性能更好的OpenSslEngine。但是无论选择哪种Engine,Netty提供的API都是一样的。

为应用添加支持SSL/TLS功能:

public class SslChannelInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean startTls;

    public SslChannelInitializer(SslContext context, boolean startTls) {
        this.context = context;
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.newEngine(ch.alloc());
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
    }
}

11.2 使用Netty构建HTTP/HTTPS应用

FullHttpRequest/FullHttpResponse:一个完整的HTTP请求/应答。

HttpRequest:HTTP请求的第一部分,包含header。

HttpContent:报文正文数据。

LastHttpContent:HTTP请求/应答结束符。

为应用添加支持HTTP/HTTPS功能:

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    private final boolean client;

    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        if (client) {
            pipeline.addLast("decoder", new HttpResponseDecoder());
            pipeline.addLast("encoder", new HttpRequestEncoder());
        } else {
            pipeline.addLast("decoder", new HttpRequestDecoder());
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

11.2.2 HTTP message aggregation

HTTP的请求和应答由很多部分组成,我们需要把它们组合成一个完整的HTTP报文。Netty提供了聚合器,当消息是完整的HTTP报文时才会被传递到下一个ChannelInboundHandler

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;

    public HttpAggregatorInitializer(boolean isClient) {
        this.isClient = isClient;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }

        pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
    }
}

11.2.3 HTTP压缩

使用HTTP非常建议对报文压缩减少传输量,特别是对于文本数据。Netty为压缩和解压缩提供了ChannelHandler实现,支持gzip和deflate编码。

HTTP请求标头客户端可以通过Accept-Encoding来指示支持的加密模式:

GET /encrypted-area HTTP/1.1     
Host: www.example.com     
Accept-Encoding: gzip, deflate 

但是, 请注意, 服务器没有义务压缩它发送的数据。

public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;

    public HttpCompressionInitializer(boolean isClient) {
        this.isClient = isClient;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
            pipeline.addLast("decompressor", new HttpContentDecompressor());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
            pipeline.addLast("compressor", new HttpContentCompressor());
        }
    }
}

JDK6及更早的版本,需要引入JZlib依赖。

<dependency>
    <groupId>com.jcraft</groupId>
    <artifactId>jzlib</artifactId>
    <version>1.1.3</version>
</dependency>

11.2.4 使用HTTPS

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean isClient;

    public HttpsCodecInitializer(SslContext context, boolean isClient) {
        this.context = context;
        this.isClient = isClient;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        SSLEngine engine = context.newEngine(ch.alloc());
        pipeline.addFirst("ssl", new SslHandler(engine));
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
    }
}

11.2.5 WebSocket

WebSocket解决了一个长久存在的难题,HTTP服务器如何实时推送消息到客户端?ajax轮询和comet是比较讨巧的做法,但最好的解决方案还是WebSocket,它相比前两种方式更加高效。web页面和远端服务器可以通过WebSocket进行双向通信。

如果需要让我们的应用支持WebSocket,我们需要在ChannelPipeline里添加客户端或者服务器端的WebSocket ChannelHandler即可。这个ChannelHandler会根据frames的类型来处理消息。WebSocketFrame可以被分为data framecontrol frame

public class WebSocketServerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new HttpServerCodec(),
                new HttpObjectAggregator(65536),
                new WebSocketServerProtocolHandler("/websocket"),
                new TextFrameHandler(), new BinaryFrameHandler(),
                new ContinuationFrameHandler());
    }

    public static final class TextFrameHandler extends
            SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
                TextWebSocketFrame msg) throws Exception {
            // Handle text frame
        }
    }

    public static final class BinaryFrameHandler extends
            SimpleChannelInboundHandler<BinaryWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
                BinaryWebSocketFrame msg) throws Exception {
            // Handle binary frame
        }
    }

    public static final class ContinuationFrameHandler extends
            SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
                ContinuationWebSocketFrame msg) throws Exception {
            // Handle continuation frame
        }
    }
}

11.3 空闲连接和超时

检测空闲连接和超时,对于及时释放资源至关重要。

Detecting idle connections and timeouts is essential to freeing resources in a timely manner. This is such a common task that Netty provides several Channel- Handler implementations just for this purpose

例子11.7展示了,当连接空闲超过60s,会使用心跳机制通知给对方,如果没有应答,连接就会被关闭。

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends
            ChannelStateHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled
                .unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT",
                        CharsetUtil.ISO_8859_1));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(
                        ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

如果连接空闲60s,IdleStateHandler会调用userEventTriggered(),入参是IdleStateEvent

ChannelFutureListener.CLOSE_ON_FAILURE,如果发送失败,就会关闭连接。

11.4 基于分隔符(delimited)和基于长度(length-based)解码

11.4.1 基于分隔符的解码

类型 描述
DelimiterBasedFrameDecoder 用户可以自定义分隔符
LineBasedFrameDecoder 以换行符\r\n或者\n作为分隔符

用法示例:

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends
            SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                throws Exception {
            // Do something with the data extracted from the frame
        }
    }
}

11.4.2 length-based解码器

根据长度+内容来编解码

类型 描述
FixedLengthFrameDecoder 按照固定长度解码
LengthFieldBasedFrameDecoder frame的头部代表消息长度,可变的,按照这个头部的值解码
固定长度解码
头部指定长度解码
public class LengthBasedInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
//frame的前8个字节是frame length
        pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8));
        pipeline.addLast(new FrameHandler());
    }

    public static final class FrameHandler extends
            SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                throws Exception {
            // Do something with the frame
        }
    }
}

11.5 写入大量数据

由于网络饱和的可能性,在异步框架中有效地编写大块数据是一个特殊问题。由于写操作是非阻塞的,因此即使所有数据都没有写出到网络,它们也会在完成时返并通知ChannelFuture。 发生这种情况时,如果您不停止写入,则可能会出现内存不足的风险。解决这个问题,要用Netty的零拷贝(zero-copy)。把文件内容传输到网络堆栈,zero-copy帮我们省了拷贝的操作。

interface FileRegion , defined in the Netty API documentation as “a region of a file that is sent via a Channel that supports zero-copy file transfer.”

用法示例:

        FileInputStream in = new FileInputStream(file);
        FileRegion region = new DefaultFileRegion(in.getChannel(), 0,
                file.length());
        channel.writeAndFlush(region).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future)
                    throws Exception {
                if (!future.isSuccess()) {
                    Throwable cause = future.cause();
                    // Do something
                }
            }
        });

上面的例子是直接把文件数据传输到网络,如果我们需要对文件内容进行操作,我们可以使用ChunkedWriteHandlerChunkedWriteHandler支持异步写入大量数据,同时不会产生大的内存消耗。

11.6 数据序列化

11.6.1 JDK序列化

效率低,码流大,一般不用这个。

11.6.2 使用JBoss Marshalling序列化

性能好,比JDK序列化快3倍。

名称 描述
CompatibleMarshallingDecoder,CompatibleMarshallingEncoder 能够兼容JDK序列化
MarshallingDecoder,MarshallingEncoder 适合双方均使用JBoss Marshalling codecs
public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable)
                throws Exception {
            // Do something
        }
    }
}

11.6.3 使用Protocol Buffers序列化

特点:跨语言(cross-language)

名称 描述
ProtobufDecoderProtobufEncoder 使用Protocol Buffers来解编码数据
ProtobufVarint32FrameDecoder 根据消息中Base 128 Varints编码过的长度域动态分割消息
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
//Adds ProtobufVarint32FrameDecoder to break down frames
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new ProtobufDecoder(lite));
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}

12.WebSocket

read-time的web服务定义:

The real-time web is a network web using technologies and practices that enable users to receive information as soon as it is published by its authors, rather than requiring that they or their software check a source periodically for updates.

用户能够在服务器发布的时候,立刻得到消息,而不是轮询服务器。

WebSocket应用实例,聊天室:


协议升级机制(Protocol upgrade mechanism):

https://developer.mozilla.org/en-US/docs/HTTP/Protocol_upgrade_mechanism

HTTP provides a special mechanism allowing an already established connection to upgrade to a new, incompatible, protocol.

Protocol upgrades are always requested by the client; there is no mechanism provided for the server to request a protocol change. When the client wishes to upgrade to a new protocol, it does so by sending a normal request of any type to the server (GET, POST, etc.). The request needs to be configured specially to include the upgrade request, however.

HTTP允许一个已经建立的连接升级到一个全新的,不兼容的非HTTP协议,由客户端发起。

发起方式:


通过协议升级机制,可以将HTTP/HTTPS转换WebSocket协议。

聊天室既要支持HTTP(s),还要支持WebSocket


public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final String wsUri;
    private static final File INDEX;
    static {
        URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();
        try {
            String path = location.toURI() + "index.html";
            path = !path.contains("file:") ? path : path.substring(5);
            INDEX = new File(path);
        } catch (URISyntaxException e) {
            throw new IllegalStateException("Unable to locate index.html", e);
        }
    }

    public HttpRequestHandler(String wsUri) {
        this.wsUri = wsUri;
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (wsUri.equalsIgnoreCase(request.getUri())) {
    //如果是WebSocket,增加引用计数器的值(retain),并且把它传递到下一个ChannelHandler
            ctx.fireChannelRead(request.retain());
        } else {
            if (HttpHeaders.is100ContinueExpected(request)) {
                send100Continue(ctx);
            }
            RandomAccessFile file = new RandomAccessFile(INDEX, "r");
            HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
            response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");
            boolean keepAlive = HttpHeaders.isKeepAlive(request);
            if (keepAlive) {
                response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());
                response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);
            if (ctx.pipeline().get(SslHandler.class) == null) {
                ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));
            } else {
                ctx.write(new ChunkedNioFile(file.getChannel()));
            }
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            if (!keepAlive) {
                future.addListener(ChannelFutureListener.CLOSE);
            }
        }
    }

    private static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
  1. 如果客户端请求是/ws,HttpRequestHandler会调用FullHttpRequestretain(),并通过调用fireChannelRead(msg)转发到下一个ChannelInboundHandlerretain()是需要的,不然SimpleChannelInboundHandler会默认自动释放掉资源。

  2. 如果是客户端请求头是HTTP 1.1 Expect:100-continueHttpRequestHandler应答100 continue

12.3.2 处理WebSocket frames

ITEF定义的WebSocket RFC标准,定义了6种frames,netty对于它们有对应的实现。

我们的聊天室会用到4种frames:

  • CloseWebSocketFrame
  • PingWebSocketFrame
  • PongWebSocketFrame
  • TextWebSocketFrame

TextWebSocketFrame是我们唯一一个我们真正需要处理的。为了和WebSocket RFC 保持一致,Netty提供了WebSocketServerProtocolHandler来管理其他的Frame.

public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private final ChannelGroup group;

    public TextWebSocketFrameHandler(ChannelGroup group) {
        this.group = group;
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
//If the event indicates that the handshake was successful, 
//removes the HttpRequestHandler from the ChannelPipeline 
//because no further HTTP messages will be received.
            ctx.pipeline().remove(HttpRequestHandler.class);
            group.writeAndFlush(new TextWebSocketFrame("Client " + ctx.channel() + " joined"));
            group.add(ctx.channel());
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//Increments the reference count of the message and writes it to all connected clients in the ChannelGroup
        group.writeAndFlush(msg.retain());
    }
}

12.3.3 初始化ChannelPipeline

public class ChatServerInitializer extends ChannelInitializer<Channel> {
    private final ChannelGroup group;

    public ChatServerInitializer(ChannelGroup group) {
        this.group = group;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
//Decodes bytes to HttpRequest , HttpContent , and LastHttpContent . Encodes HttpRequest , HttpContent , and LastHttpContent to bytes.
        pipeline.addLast(new HttpServerCodec());

//Writes the contents of a file.
        pipeline.addLast(new ChunkedWriteHandler());

//ggregates an HttpMessage and its following HttpContent s into a single FullHttpRequest or 
//FullHttpResponse (depending on whether it’s being used to handle requests or responses). 
//With this installed, the next ChannelHandler in the pipeline will receive only full HTTP 
//requests.
        pipeline.addLast(new HttpObjectAggregator(64 * 1024));

//Handles FullHttpRequests (those not sent to a /ws URI).
        pipeline.addLast(new HttpRequestHandler("/ws"));

//As required by the WebSocket specification, handles the WebSocket upgrade handshake, 
//PingWebSocketFrame s, PongWebSocketFrames, and CloseWebSocketFrames.
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

//Handles TextWebSocketFrames and handshakecompletion events
        pipeline.addLast(new TextWebSocketFrameHandler(group));
    }
}

12.3.4 bootstrapping

把聊天室server启动起来:

public class ChatServer {
    private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
    private final EventLoopGroup group = new NioEventLoopGroup();
    private Channel channel;

    public ChannelFuture start(InetSocketAddress address) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group).channel(NioServerSocketChannel.class).childHandler(createInitializer(channelGroup));
        ChannelFuture future = bootstrap.bind(address);
        future.syncUninterruptibly();
        channel = future.channel();
        return future;
    }

    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new ChatServerInitializer(group);
    }

    public void destroy() {
        if (channel != null) {
            channel.close();
        }
        channelGroup.close();
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        final ChatServer endpoint = new ChatServer();
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

然后启动ChatServer:
mvn -PChatServer -Dport=1111 clean package exec:exec

测试

12.4.1 把聊天内容加密

在真实应用场景下,聊天内容需要加密以保证安全,对于Netty,只需要添加SslHandlerChannelPipeline并配置好就可以轻松完成。

public class SecureChatServerInitializer extends ChatServerInitializer {
    private final SslContext context;

    public SecureChatServerInitializer(ChannelGroup group, SslContext context) {
        super(group);
        this.context = context;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        super.initChannel(ch);
        SSLEngine engine = context.newEngine(ch.alloc());
        ch.pipeline().addFirst(new SslHandler(engine));
    }
}

相应地,启动代码:

public class SecureChatServer extends ChatServer {
    private final SslContext context;

    public SecureChatServer(SslContext context) {
        this.context = context;
    }

    @Override
    protected ChannelInitializer<Channel> createInitializer(ChannelGroup group) {
        return new SecureChatServerInitializer(group, context);
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("Please give port as argument");
            System.exit(1);
        }
        int port = Integer.parseInt(args[0]);
        SelfSignedCertificate cert = new SelfSignedCertificate();
        SslContext context = SslContext.newServerContext(cert.certificate(), cert.privateKey());
        final SecureChatServer endpoint = new SecureChatServer(context);
        ChannelFuture future = endpoint.start(new InetSocketAddress(port));
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                endpoint.destroy();
            }
        });
        future.channel().closeFuture().syncUninterruptibly();
    }
}

启动之后,你就可以访问https://localhost:9999来连接聊天服务器了。

13 使用UDP广播事件

UDP,User Datagram Protocol,是面向无连接的协议,常用于性能至上,丢包可以容忍的场景。

Domain Name Service(DNS)是基于UDP最知名的协议之一。

13.1 UDP基础

面向连接的TCP协议管理着连接的建立、消息的有序性和可靠性、连接的有序终止。

面向无连接的UDP,没有持续连接这样的概念,UDP每个消息(UDP数据报)都是独立的。此外,UDP也没有像TCP那样的纠错机制(TCP中接收方收到了会发ACK确认已收到,如果发送方没收到接收方的ACK则会重发。)

TCP可以类比成电话,消息有序地双向传递。
UDP类似于群发邮件,不会知道它们到达的顺序。

UDP比TCP快很多。UDP不需要三次握手,也不需要对消息进行管理。

13.2 UDP广播

目前我们的例子都是unicast,即单播,即消息的目的地是唯一的,UDP提供了额外的传播模式:

  • Multicast:多播, 传播到定义的目标主机集合。
  • Broadcast :广播,传播到网络(或者子网)上所有主机

本章的例子会演示UDP广播模式的使用,传播消息到同网络的所有主机。为了达到这个目的,我们使用需要广播地址255.255.255.255,发到这个地址的消息的目的地会是本地网络(0.0.0.0)上的所有主机,并且不会被路由器转发到其他网络。

13.3 UDP样例应用

这个应用类似于UNIX-like 操作系统中的syslog工具。https://linux.cn/article-5023-1.html

13.4 The message POJO: LogEvent

public final class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;//the InetSocket Address of the source that sent the LogEvent
    private final String logfile;// the name of the log file
    private final String msg;//message contents
    private final long received;//the time at which the LogEvent was received

    public LogEvent(String logfile, String msg) {
        this(null, -1, logfile, msg);
    }

    public LogEvent(InetSocketAddress source, long received, String logfile,
            String msg) {
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }

    public InetSocketAddress getSource() {
        return source;
    }

    public String getLogfile() {
        return logfile;
    }

    public String getMsg() {
        return msg;
    }

    public long getReceivedTimestamp() {
        return received;
    }
}

13.5 Writing the broadcaster

DatagramPacket是一个简单的消息容器,DatagramChannel的实现类们使用DatagramPacket来和远程主机通信。为了把LogEvent转换成DatagramPacket,我们需要编码器,但是我们没有必要从头开始写,我们需要继承MessageToMessageEncoder

编码器代码:

public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
    private final InetSocketAddress remoteAddress;

    public LogEventEncoder(InetSocketAddress remoteAddress) {
        this.remoteAddress = remoteAddress;
    }

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext,
            LogEvent logEvent, List<Object> out) throws Exception {
        byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
        byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = channelHandlerContext.alloc().buffer(
                file.length + msg.length + 1);
        buf.writeBytes(file);
        buf.writeByte(LogEvent.SEPARATOR);
        buf.writeBytes(msg);
        out.add(new DatagramPacket(buf, remoteAddress));
    }
}

启动类:

public class LogEventBroadcaster {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    private final File file;

    public LogEventBroadcaster(InetSocketAddress address, File file) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler(new LogEventEncoder(address));
        this.file = file;
    }

    public void run() throws Exception {
        Channel ch = bootstrap.bind(0).sync().channel();
        long pointer = 0;
        for (;;) {
            long len = file.length();
            if (len < pointer) {
                // file was reset
                pointer = len;
            } else if (len > pointer) {
                // Content was added
                RandomAccessFile raf = new RandomAccessFile(file, "r");
                raf.seek(pointer);
                String line;
                while ((line = raf.readLine()) != null) {
                    ch.writeAndFlush(new LogEvent(null, -1, file
                            .getAbsolutePath(), line));
                }
                pointer = raf.getFilePointer();
                raf.close();
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.interrupted();
                break;
            }
        }
    }

    public void stop() {
        group.shutdownGracefully();
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            throw new IllegalArgumentException();
        }
        LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                new InetSocketAddress("255.255.255.255",
                        Integer.parseInt(args[0])), new File(args[1]));
        try {
            broadcaster.run();
        } finally {
            broadcaster.stop();
        }
    }
}

linux 工具netcat介绍:

netcat listens on a specified port and prints all data received to standard output

下面的命令就会开启一个9999端口,并且打印它接收到的消息。
$ nc -l -u 9999

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容