基于Netty的Android端长连接设计

协议定制与数据序列化

1、长连接这里我们肯定是基于TCP的,而TCP协议其实默认已经支持长连接,但是socket连接存在随时断开的情况,这就需要有比较好的协议保障连接状态的检测。
2、定制数据序列化格式,建议使用protobuf或者thrift而不是htttp中常用的json,可以减少序列化与反序列化的开销。当然如果用一些其他的协议,你可能需要自己实现encoder decoder了,TCP是流,上层协议对TCP的流是要做分包粘包处理的,注意好对handler中channelRead和channelReadComplete的方法的复写。

基于Netty 设计的客户端架构

1、我们会需要设计一个客户端,就像netty的官方demo中做的那样,定义好bootstrap和nioEventLoopGroup。注意NioEventLoopGroup是可以复用的,线程池复用对客户端比较重要,在断线重连的时候会排上用场。
我以采用webSocket协议为例

        mClientHandler = new ClientHandler(sURI); //客户端收到分包处理完的数据,然后开始分发
        mMessageHandler = new MessageHandler(mHashMap, mBussinessCodeHelper); // 真正处理业务代码的handler
        bootstrap.group(mWorkGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .remoteAddress(sURI.getHost(), sURI.getPort());
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                pipeline.addLast(new TbbLoggerHandler());
                pipeline.addLast(new IdleStateHandler(200, 180, 0, TimeUnit.SECONDS)); //读超时与写超时检测的handler, 读超时200s比写超时时间长一些,发生读超时的时候直接断开重连了。
                pipeline.addLast(new HttpClientCodec());
                pipeline.addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH));
                pipeline.addLast(mTbbClientHandler);
                pipeline.addLast(mTbbMessageHandler);
            }
        });

        try {
            mChannel = bootstrap.connect().sync().channel();
            mChannel.closeFuture().sync(); // 会阻塞
            XGLog.logger_d(mChannel);
        } catch (Exception e) {
            XGLog.logger_d("exception " + e);
            e.printStackTrace();
        } finally {
            XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
            if (!TextUtils.isEmpty(mToken)) {
                mWorkGroup.schedule(new Runnable() {
                    @Override
                    public void run() {
                        connect();  // 断线重连,这里简单处理,就是断了以后每隔2s 尝试连接一次,其实为了省电需要限制次数并倍增间隔时间的
                    }
                }, 2, TimeUnit.SECONDS);
            }
        }

2、设计好你的handler, netty框架的运用精髓基本都在handler当中,包括处理流解包然后处理业务最后发送数据,几乎全可以包含在handler当中,客户端主动发送数据依赖于channel,简单点讲就是channel 的 writeAndFlush,向缓冲区写数据并刷新缓冲区,刷新的操作其实就是发送数据了,socket的操作本质上都抽象成IO动作。一个简单的handler的例子,不一定能正常运行,只是作为例子,最为关键的几个方法

(1) channelRead0(ChannelHandlerContext ctx, Object msg)
处理解包后的数据,也可以分发数据包给下个handler
(2) channelActivie(ChannelHandlerContext ctx)
通道建立了,这个时候相当于tcp握手了
(3) channelInActive(ChannelHandlerContext ctx)
tcp断开连接
(4) excepitonCaught(ChannelHandlerContext ctx, Throwable cause)
异常处理,最好要处理,不处理也别忘了吧throwable发给下handler,这个一定得做
(5) userEventTriggered(final ChannelHandlerContext ctx, Object evt)
处理一些自定义的事件,包括读超时写超时这样的事件,充分体现了netty事件驱动的特点

@Sharable
public class ClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final int BLOCKING_QUEUE_SIZE = 1 << 12;
    private static final Queue<MCProtocolPB.MCProtocol> mQueue = new LinkedList<>();
    private static final long IDLE_TIME = (long) (5 * 1e9);
 
    /**
     * 用于 WebSocket 的握手
     */
    private WebSocketClientHandshaker mHandshaker;
    /**
     *
     */
    private ChannelPromise mChannelPromise;
    private final PingWebSocketFrame mPingWebSocketFrame = new PingWebSocketFrame();
    private final CloseWebSocketFrame mCloseWebSocketFrame = new CloseWebSocketFrame();
    private ChannelHandlerContext mChannelHandlerContext;
  

    /**
     * 唯一的构造类
     *
     * @param uri WebSocket uri
     */
    public ClientHandler(URI uri) {
        mHandshaker = WebSocketClientHandshakerFactory.newHandshaker(uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders());
    }


    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, final Object msg) throws Exception {
        if (!mHandshaker.isHandshakeComplete()) {
            try {
                mHandshaker.finishHandshake(ctx.channel(), (FullHttpResponse) msg);
                mChannelPromise.setSuccess();
                while (!mQueue.isEmpty()) {
                    ctx.writeAndFlush(mQueue.poll());
                }
                ctx.fireUserEventTriggered(Event.CONNECTED); //发送websocket协议连接正式建立的事件
            } catch (WebSocketHandshakeException e) {
                mChannelPromise.setFailure(e);
            }
        }
     
        if (msg instanceof WebSocketFrame) {
            ctx.fireChannelRead(((WebSocketFrame) msg).retain());
        }


    }

    /**
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        mChannelHandlerContext = ctx;
        mHandshaker.handshake(ctx.channel());
        ctx.writeAndFlush(mPingWebSocketFrame.retain());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        ctx.fireUserEventTriggered(Event.DISCONNECTED);
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        XGLog.logger_e("channel unregistered");

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        XGLog.logger_e(cause.toString());
        super.exceptionCaught(ctx, cause);
        if (!mChannelPromise.isDone()) {
            mChannelPromise.setFailure(cause);
        }


        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        super.handlerRemoved(ctx);
        XGLog.logger_d("handler removed");
    }

    /**
     * 
     * 
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        super.handlerAdded(ctx);
        XGLog.logger_i("handler added");
        mChannelPromise = ctx.newPromise();
    }

    /**
     * 端口闲时 发送心跳包 处理的方法
     * 
     */
    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {

        if (evt instanceof IdleStateEvent) {
            final IdleStateEvent event = (IdleStateEvent) evt;
            ctx.executor().execute(new Runnable() {
                @Override
                public void run() {
                    handleIdleEvent(ctx, event);
                }
            });
            super.userEventTriggered(ctx, evt);
        } else if (Event.REQUEST_TIME_OUT.equals(evt)) {
            XGLog.logger_i("REQUEST triggered already");
        }
    }
  
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

    /**
     * 处理{@link IdleStateEvent}
     *
     * @param ctx
     * @param event
     */
    private void handleIdleEvent(final ChannelHandlerContext ctx, IdleStateEvent event) {
        IdleState state = event.state();
        if (IdleState.READER_IDLE.equals(state)) {
            XGLog.logger_e("READ IDLE");
        } else if (IdleState.WRITER_IDLE.equals(state)) {
            XGLog.logger_e("WRITE IDLE");
         ctx.writeAndFlush(mPingWebSocketFrame.retain()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        } else if (IdleState.ALL_IDLE.equals(state)) {
            XGLog.logger_e("ALL IDLE");
        }
    }


    long ticksInNanos() {
        return System.nanoTime();
    }
}

3、考虑好你的断线重连的情况,建议每次客户端发送数据后,服务端都给回包,如果链路长时间空闲,那么触发写超时事件,发送心跳包给服务端,其实也可以反过来服务端给客户端发数据,然后如果还发生读超时事件,相当于对方没有给回包,那么断开连接,尝试重连。

public class MyLoggerHandler extends LoggingHandler {
    private static final long IDLE_TIME = (long) (9.9 * 1e9);
    private long mLastWriteTime = -1;
    private ScheduledFuture mScheduledFuture;

    public MyLoggerHandler() {
        super(LogLevel.INFO);
    }
   
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        XGLog.logger_i("read message " + msg);
        long current = ticksInNanos();
        long delta = Math.abs(current - mLastWriteTime);
        if (delta < IDLE_TIME) {
            if (mScheduledFuture != null) {
                mScheduledFuture.cancel(false);
            }
        }
    }

    @Override
    public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        XGLog.logger_i("TbbLoggerHandler write message ");
        mScheduledFuture = ctx.executor().schedule(new Runnable() {
            @Override
            public void run() {
                long current = ticksInNanos();
                long delta = Math.abs(current - mLastWriteTime);
                XGLog.logger_i("current " + current + " last " + mLastWriteTime + " delta " + delta);
                if (delta > IDLE_TIME) {
                    ctx.close();
                }
            }
        }, 10, TimeUnit.SECONDS);  // 10s 内没有收到服务端回执,断线重连
        mLastWriteTime = ticksInNanos();

    }

    long ticksInNanos() {
        return System.nanoTime();
    }
}

4、如果客户端主动发起请求,那么通过我们的Client的channel引用,可以向服务端发送数据。
5、由于netty可以主动发起事件,在netty里处理完了数据如果要更新UI或者数据库,那么你需要设计一个简单的适配层,通过事件机制来触发事情就会变得简单。

针对网络波动情况的处理

1、如果发生可以主动检测到的链路断开的情况,一定会触发channelRemoved,然后channel会变成inActive,然后那个connect().sync()也就不再阻塞了,然后往下走,我们的代码中其实已经可以主动间隔2s去重连了。NioEventLoopGroup.exectue()类似于jdk的线程池,可以定时触发一个事件。

 try {
            mChannel = bootstrap.connect().sync().channel();
            mChannel.closeFuture().sync(); // 会阻塞
            XGLog.logger_d(mChannel);
        } catch (Exception e) {
            XGLog.logger_d("exception " + e);
            e.printStackTrace();
        } finally {
            XGLog.logger_d("workerGroup shall shutdown " + TextUtils.isEmpty(mToken));
            if (!TextUtils.isEmpty(mToken)) {
                mWorkGroup.schedule(new Runnable() {
                    @Override
                    public void run() {
                        connect();  // 断线重连,这里简单处理,就是断了以后每隔2s 尝试连接一次,其实为了省电需要限制次数并倍增间隔时间的
                    }
                }, 2, TimeUnit.SECONDS);
            }
        }

2、如果发生延时很长的情况,如果发送请求10s内没有读事件发生,那么你需要考虑重新建立连接了,简单的做法就是ChannelHandlerContext.close(),利用 1 中的NioEventLoopGroup线程池 mWorkGroup定时尝试连接,如果连接成功,该线程就阻塞,只有断开的时候才会跑到需要重连的地方。
3、如果打过电话或者检测到网络切换,那么你也需要断开然后重连,因为你的在移动网IP地址基本就变了,所以重连吧,谁让我们基于TCP/IP呢。这种情况需要借助Android的一些组件比如BroadCastReceiver来检测,与netty关系不大。

以上3点针对3/4G移动、电信、联通网基本就够用了,至于2G,反正这3家公司都不用2G了,在2017年开发长连接真的是比几年前幸福太多了
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容