RxNetty 在android上的使用之 TCP 长连接

原文地址
在上个项目开始我们开始使用tcp异步通信机制来实现所需要的功能,使用异步的方式主要的好处能够不阻塞,以便能在接收数据的时候更加流畅,我们选用了netty异步通讯框架来实现这个功能,之前我写了一篇关于netty实现异步通讯长连接的文章,但是在使用中我发现,有的时候会莫名其妙的报连接断开的现象,而且代码逻辑也不是特别好,之后我在github上发现rxnetty这个库不错(主要是最近在学习rx……的使用,所以在使用netty的时候就想会不会有一个使用rxjava实现的netty框架呢?于是在github上发现真的有 哈哈哈);这个框架可以更根据设置读取数据的时间就可以自己尝试重连操作;本文中使用的代码完全是在上一篇netty实现长链接: http://www.jianshu.com/p/2dfecc719cd5 的基础上修改简化的,

package com.jcy.nettyserver;

import android.os.Handler;
import android.os.Message;
import android.util.Log;

import com.alibaba.fastjson.JSON;
import com.jcy.data.ReceiveData;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.LineEncoder;
import io.netty.handler.codec.string.LineSeparator;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.protocol.tcp.client.TcpClient;
import rx.Observable;
import rx.functions.Action1;

/**
 * @className: RxNettyManager
 * @desc:
 * @author: Jiangcy
 * @datetime: 2016/8/2
 */
public enum RxNettyManager {
    instance;
    //多长时间为请求后,发送心跳
    private Connection<String, String> mConnection;
    private String serverIP;
    private int port;
    private static final String TAG = "NettyManager";
    private String HEAT_STRING;//心跳数据
    private String LOGIN_STRING;//登陆数据
    private boolean isOnLine = false;
    private String heartAction;
    private int spacingTime = 5;
    private NettyListener mListener;


    RxNettyManager() {
    }


    /**
     * 初始化RxNetty异步通信库
     * @param serverip      服务器IP
     * @param port          通讯端口号
     * @param login         登陆数据
     * @param heartStr      心跳数据
     * @param heartAction  心跳数据Action
     * @param spacingTime  心跳间隔
     */
    public void init(String serverip, int port,String login, String heartStr, String heartAction, int spacingTime) {
        this.serverIP=serverip;
        this.port=port;
        LOGIN_STRING = login;
        HEAT_STRING = heartStr;
        this.heartAction = heartAction;
        this.spacingTime=spacingTime;
    }


    private Handler mHandler = new Handler(){
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            mHandler.removeMessages(0);
            send(HEAT_STRING);
        }
    };

    /**
     * 客户端连接服务器
     */
    public void connectServer() {
        if (LOGIN_STRING.equals("") || LOGIN_STRING == null) {
            new RuntimeException("LOGIN_STRING isEmpty please init first");
            return;
        }
        if (HEAT_STRING.equals("") || HEAT_STRING == null) {
            new RuntimeException("HEAT_STRING isEmpty  please init first");
            return;
        }
        SocketAddress socketAddress = new InetSocketAddress(serverIP, port);
        Log.e(TAG, " rxNettyClientConnect  socketAddress : " + socketAddress.toString());
        TcpClient.newClient(socketAddress)
                .<String, String>pipelineConfigurator(new Action1<ChannelPipeline>() {
                    @Override
                    public void call(ChannelPipeline pipeline) {
                        //这部分设置一定要和服务器上设置相同,要不然会出现无法连接的现象
                        // Decoders
                        pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(1024 * 1024 *
                                1024));
                        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
                        // Encoder
                        pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
                        pipeline.addLast("lineEncoder", new LineEncoder(LineSeparator.UNIX,
                                CharsetUtil.UTF_8));
                    }
                })
                .channelOption(ChannelOption.SO_KEEPALIVE, true)
//设置读取超时时间
                .readTimeOut(spacingTime+3, TimeUnit.SECONDS)
                .createConnectionRequest()
                .subscribe(newConnection -> {
                    mConnection = newConnection;
                    mConnection.getInput().subscribe(message -> {
                        Log.e(TAG, "receive : " + message);
                        String action = JSON.parseObject(message, ReceiveData.class).action;
                        //根据心跳进行心跳保活发送数据
                        if (heartAction.equals(action))
                            mHandler.sendEmptyMessageDelayed(0,spacingTime*1000);
                        if (mListener != null) mListener.reciveData(action, message);
                    }, e -> reconnect(e));
                }, e -> reconnect(e), () -> {
                    if (mListener != null) mListener.onLine();
                    isOnLine = true;
                    Log.e(TAG, "connect success");
                    send(LOGIN_STRING);
                });

    }

    /**
     * 断开自动重新连接
     */
    private void reconnect(Throwable e) {
        mHandler.removeCallbacksAndMessages(null);
        //延迟spacingTime秒后进行重连
        Observable.timer(spacingTime, TimeUnit.SECONDS).subscribe(l -> {
            if (mConnection != null) mConnection.closeNow();
            Log.e(TAG, "reconnect");
            if (isOnLine) {
                if (mListener != null) mListener.offLine();
            }
            isOnLine = false;
            connectServer();
        });
    }


    public void setListener(NettyListener listener) {
        mListener = listener;
    }

    /**
     * 发送数据
     *
     * @param s
     */
    public boolean send(String s) {
        if(mConnection==null)return false;
        Log.w(TAG, "send : " + s);
//在发送的时候一定要将subscribe实现,开始在使用write函数的时候由于rxjava知识不牢,导致只是实现
//writeString函数没有实现观察者,导致根本没有发送,所以之前一直是发送失败;
        mConnection.writeString(Observable.just(s)).subscribe(v -> {
//暂时还不知道是干啥用的
        }, e -> {
//发送失败,出现异常
        }, () -> {
//发送成功
        });
        return true;
    }

    /**
     * 停止服务
     */
    public void stopServer() {
        mHandler.removeCallbacksAndMessages(null);
        if (mConnection != null) {
            mConnection.closeNow();
            mConnection=null;
        }
    }

}

使用rxnetty只需要简单的几句话就能解决netty tcp长连接的实现过程,由于没有handler所以使用一个方法再其他程序中使用能够更加的方便;还有就是在使用可以通过使Lambda 表达式来简化代码,使代码的可读性能够增加,还有就是在使用长链接的时候推荐使用rxAndroid,使用这个方法最主要的好处就是子线程可以很容易的将数据发送到主线程来更新界面;
关于Lambda 在android中的使用
主要是在android studio的project的build中

dependencies {   
classpath 'com.android.tools.build:gradle:2.1.2'    
classpath 'me.tatarka:gradle-retrolambda:3.2.5'  
}

并且在使用的moudle中声明

apply plugin: 'com.android.application'
//加入plugin声明
apply plugin: 'me.tatarka.retrolambda'
android {
    compileSdkVersion 24
    buildToolsVersion "24.0.0"

    defaultConfig {
        applicationId "com.jcy.rxnetty"
        minSdkVersion 15
        targetSdkVersion 24
        versionCode 1
        versionName "1.0"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android.txt'), 'proguard-rules.pro'
        }
    }
    //加入compileOptions,这会让IDE使用用JAVA8语法解析
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}

以下关于导入rxnetty包出现问题的解决方案来自以为网友写的博客
把RxNetty的tcp包加入到依赖,直接这样编译会有两个问题,第一个问题是jar重复:

com.android.build.api.transform.TransformException: com.android.builder.packaging.DuplicateFileException: Duplicate files 
copied in APK THIRD-PARTYFile1: 
C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-
core\1.11.2\f4f8cd9874f5cdbc272b715a381c57e65f67ddf2\jmh-core-1.11.2.jarFile2: 
C:\Users\XXX.gradle\caches\modules-2\files-2.1\org.openjdk.jmh\jmh-generator-
annprocess\1.11.2\72d854bf76ba5e59596d4c887a6de48e7003bee2\
jmh-generator-annprocess-1.11.2.jar



dependencies {
 ... 
compile('io.reactivex:rxnetty-tcp:0.5.2-RC1') 
{ exclude group: 'org.openjdk.jmh' }
 ...
}

另一个问题是引用的netty包中META-INF/下的部分文件重复。

packagingOptions {
    ...
    exclude 'META-INF/INDEX.LIST'
    exclude 'META-INF/BenchmarkList'
    exclude 'META-INF/io.netty.versions.properties'
    exclude 'META-INF/CompilerHints'
    ...
  }

本文中还是使用了两个库文件一个是rxjava和fastjson

  compile 'io.reactivex:rxjava:1.0.14'

   compile 'com.alibaba:fastjson:1.2.15'

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,451评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,158评论 6 151
  • 今天总感觉犯困,回家的路上在地铁里还有点想吐的感觉,真想晚上可以早点睡觉。 晚饭后,因为下雨,小鬼也没吵去散步,拿...
    mylittle阅读 220评论 0 0
  • 7月19日 星期三 天气:晴 今天趁儿子睡觉的工夫,看了看女儿的书《情感读本》,上面的故事很感人,看...
    官越妈妈阅读 102评论 0 3