netty踩坑初体验

基于4.1.10.Final
目前为止踩坑最多,踩netty之前,要先踩Java NIO,因为netty还是基于Java NIO的api开发的,事件模型什么的需要有基础,这只是一个初步的研究,毕竟只是出于兴趣,比较好的坑在这里。Java NIO 系列教程

netty in action.png

netty的基本组件与各组件功能,netty核心就不介绍了,网上各种大牛的源码分析,认真看完基本就通了。本文是记录踩坑,不是教学。思路按着这个思维导图来走,实现一下简单的功能。

1.字符串传输。

netty是端对端的传输,最简单的可以使用嵌套字传输,基本功能就是hello word。假定一个场景,有一个服务端一直开着接收字符串,客户端想发送字符串到服务端,怎么做?
首先明确一点,Netty中的消息传递,都必须以字节的形式,以ByeBuff为载体传递。简单的说,就是你想直接写个字符串过去,不行,收到都是乱码,虽然Netty定义的writer的接口参数是Object的,这就是比较坑的地方了。
有了这个分析,就有思路了。
客户端就这么发:

        ByteBuf buffer = Unpooled.copiedBuffer(msg, Charset.forName("UTF-8"));
        ChannelFuture future = ctx.writeAndFlush(buffer);
        future.addListener(f ->  ctx.close());

服务器这么收:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf m = (ByteBuf) msg;
        String message = m.toString(CharsetUtil.UTF_8);
        System.out.println(message);
        ctx.close();
    }

是的可以直接强转,原理不明,不知道哪里进行了装箱。---------- 遗留问题1

但是每次都要这么写是不是有点麻烦?不是有定义好的编码与解码器吗,那么就可以先加一下处理,两边都这么加

                   new ChannelInitializer<SocketChannel>(){
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    }

对的直接放上去,编码与解码不用关心顺序,处理类放在最后就好了。

客户端:

        ChannelFuture future = ctx.writeAndFlush("hello world");
        future.addListener(f ->  ctx.close());

服务器端

   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(msg);
        ctx.close();
    }

对,直接就是收到一个String。至此字符串就可以互相传递了,但是还有问题,netty中在传送字符串的长度有限制,超过1024个字节就截断了,导致接收的信息不完整,ok要这么处理一下。

new ChannelInitializer<SocketChannel>(){
     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
     ch.pipeline().addLast("decoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));
     ch.pipeline().addLast("encoder", new LengthFieldPrepender(4, false));
     ch.pipeline().addLast(new ServerHandler());
 }      

自定义长度, 4个字节32位,足够存了。

2.传递简单对象

有几种方法可以实现对象的传递,这里用的是protocol buffers编解码器进行对象传递。
首先要了解什么是protocol buffers,这东西就相当于xsd对于xml,是一个规则文件,通过.proto文件通过官方提供的工具就可以生成java类,他有两个常用方法

public byte[] toByteArray(){};
T parseFrom(byte[]){};

他提供了序列化方法,直接把类转化为字节数组,再把数据转为java类,十分方便。netty是天生支持这种序列化方式的
服务器端:

         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
        /**
        * 采用Base 128 Varints进行编码,在消息头上加上32个整数,来标注数据的长度。
        */
        ch.pipeline().addLast("protobufVarint32FrameDecoder", new ProtobufVarint32FrameDecoder());
        ch.pipeline().addLast("protobufDecoder", new ProtobufDecoder(AddressBookProtos.AddressBook.getDefaultInstance()));

        /**
         * 对采用Base 128 Varints进行编码的数据解码
         */
        ch.pipeline().addLast("protobufVarint32LengthFieldPrepender", new ProtobufVarint32LengthFieldPrepender());
        ch.pipeline().addLast("protobufEncoder", new ProtobufEncoder());
        ch.pipeline().addLast(new ServerHandler());
        }

增加已经提供了的解码、编码器,在业务处理的handle中可以这么拿数据。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        AddressBookProtos.AddressBook addressBookProtos = (AddressBookProtos.AddressBook)msg;
        List<AddressBookProtos.Person> list = addressBookProtos.getPeopleList();
    }

任然是可以直接强转成目标对象。然后获取里面的成员变量。

客户端:
在管道里加上那4个编码、解码器。然后在业务代码中这样定义数据并且直接塞到ctx中就可以了。其余的根本不用操心,都封装好了,我们只需要关心自己的业务实现。

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        AddressBookProtos.AddressBook pb = AddressBookProtos.AddressBook.newBuilder()
                .addPeople(
                        AddressBookProtos.Person.newBuilder().setEmail("345@qq.com").setId(34).setName("zhangsn")
                )
                .addPeople(
                        AddressBookProtos.Person.newBuilder().setEmail("123@163.com").setId(12).setName("lisi")
                )
                .build();

        ChannelFuture future = ctx.writeAndFlush(pb);
        future.addListener(f ->  ctx.close());
    }

3.使用http协议

Netty对http协议有自己的抽象,把一个FullHttpRequest抽象成了HttpRequest、HttpContent、LastHttpContent。生成一个http request也有点不同。例子演示了,客户端发送http请求,服务端接收并发送http响应到客户,客户端接收响应之后断开连接。
服务端:

            new ChannelInitializer<SocketChannel>(){
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
               /*
               * https部分
                File certificate = new File("/Users/public.crt"); // 证书
                File privateKey = new File("/Users/private.pem"); // 私钥
                final SslContext context = SslContextBuilder.forServer(certificate, privateKey).build();
                SSLEngine engine = context.newEngine(ch.alloc());
                ch.pipeline().addLast(new SslHandler(engine));
                */

//                            ch.pipeline().addLast("decoder", new HttpRequestDecoder());
//                            ch.pipeline().addLast("encoder", new HttpResponseEncoder());
                ch.pipeline().addLast(new HttpServerCodec()); //等于上面那两个
                ch.pipeline().addLast(new HttpObjectAggregator(512 * 1024)); //聚合把头部和content聚合在了一起
                ch.pipeline().addLast(new HttpServiceHandle());
            }
        }

如果不使用聚合,那么在接收的时候会多次触发read方法,第一次接收HttpRequest,之后接收HttpContent内容。使用聚合HttpServerCodec之后,接收的参数即有HttpRequest也有HttpContent。

客户端发送与接收:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        URI uri = new URI("http://127.0.0.1:8889");
        String msg = "Message from client";
        DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
                uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));

        // 构建http请求
        request.headers().set(HttpHeaders.Names.HOST, "127.0.0.1");
        request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
        request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
        // 发送http请求
        ctx.writeAndFlush(request);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HttpResponse) {
            HttpResponse response = (HttpResponse) msg;
            System.out.println("CONTENT_TYPE:" + response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
        }
        if(msg instanceof HttpContent) {
            HttpContent content = (HttpContent)msg;
            ByteBuf buf = content.content();
            System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
            buf.release();
        }
    }

服务端接收:

    private HttpRequest request;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //http 如果请求没有聚合,则分段传输过来
        if (msg instanceof HttpRequest) {
            request = (HttpRequest) msg;
            String uri = request.uri();
            System.out.println("Uri:" + uri);
        }

        if (msg instanceof HttpContent) {
            HttpContent content = (HttpContent) msg;
            ByteBuf buf = content.content();
            System.out.println(buf.toString(io.netty.util.CharsetUtil.UTF_8));
            buf.release();

            String res = "response from server";
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1,
                    OK, Unpooled.wrappedBuffer(res.getBytes("UTF-8")));
            response.headers().set(CONTENT_TYPE, "text/plain");
            response.headers().set(CONTENT_LENGTH, response.content().readableBytes());
            if (HttpHeaders.isKeepAlive(request)) {
                response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
            }
            ctx.write(response);
            ctx.flush();
        }
    }

4.心跳

这个东西应该是与HTTP长连接或者是websocket一起的,这里独立出来了。
服务端:

                    {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(10, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(new WebSocketServiceHandle());
                        }
                    }

handle,如果10秒内没有触发读,那么就会触发userEventTriggered方法。

    int dealTime = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
        String ip = socketAddress.getHostName() + ":" + socketAddress.getPort();

        ByteBuf byteBuf = (ByteBuf)msg;
        String message = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println(ip + ":" + message);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (dealTime == 2){
            System.out.println("关喽");
            ctx.channel().close();
        }
        dealTime++;
        String recall = "are you alive?" ;
        ByteBuf buffer = Unpooled.copiedBuffer(recall, Charset.forName("UTF-8"));
        ctx.writeAndFlush(buffer);
        super.userEventTriggered(ctx, evt);
    }

客户端:就是一个简单的应该,连接上之后什么也不干,干等10秒,等待服务端发来询问。

     @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf)msg;
        String message = byteBuf.toString(CharsetUtil.UTF_8);
        System.out.println("message from service: " + message);

        String recall = "hello service i am alive";
        ByteBuf buffer = Unpooled.copiedBuffer(recall, Charset.forName("UTF-8"));
        ctx.writeAndFlush(buffer);
    }

文件读写、websocket、最终demo。。咕咕咕

相关连接:
[netty]--最通用TCP黏包解决方案:LengthFieldBasedFrameDecoder和LengthFieldPrepender
Protocol Buffer的基本使用(六)
Protocol Buffer 语法(syntax)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,656评论 18 139
  • Netty的简单介绍 Netty 是一个 NIO client-server(客户端服务器)框架,使用 Netty...
    AI乔治阅读 8,407评论 1 101
  • 计算机网络概述 网络编程的实质就是两个(或多个)设备(例如计算机)之间的数据传输。 按照计算机网络的定义,通过一定...
    蛋炒饭_By阅读 1,224评论 0 10
  • github链接:https://github.com/LantaoYu/SeqGAN论文及appendix里有很...
    yingtaomj阅读 1,738评论 0 2
  • 妍最后一次见他是在他母亲的葬礼上。 妍看他悲痛的表情,心中愧疚。 林南生,对不起。 林南生只是...
    柯家妍阅读 435评论 0 0