RxJava | 把 Socket 撸成 RxSocket

RxJava.png

0. 概述

最近,公司的项目需要重构代码,因为个人最近接触了Rx这个高大上的东东,
而,项目原来是满满的回调地狱...项目是基于Socket的长连接,所以呢,
从最底层开始,把普通的Socket撸成Rx!
简单的示例用法

更新 Log
1 .2017年1月30日21:44:20

  • 增加一个简单的示例,修复一些bug,这个RxSocket已经用在了我公司的项目里面,算是踩过坑了。

1. 设计模式

首先,这个RxSocket是唯一的,也就是,全局唯一咯,
嗯,设计成单例。

public static RxSocket getInstance() {
        RxSocket rxSocket = defaultInstance;
        if (defaultInstance == null) {
            synchronized (RxSocket.class) {
                rxSocket = defaultInstance;
                if (defaultInstance == null) {
                    rxSocket = new RxSocket();
                    defaultInstance = rxSocket;
                }
            }
        }
        return rxSocket;
    }

双重加锁型的单例。

2. 对外的接口/方法

Socket,第一个想到就是连接,读写,而,我们外界想知道的,就只是是否
写,连接是否成功,和读到啥数据。所以定义:

public Observable<Boolean> connectRx(String ip, int port);
public Observable<Boolean> disConnect();
public Observable<byte[]> read();
public Observable<Boolean> write(ByteBuffer buffer);

还有一点,应该要有一个方法,让外界知道,这个Socket的状态,也就是监听方法:
public Observable<SocketStatus> socketStatusListener ();

3. 具体代码实现

/**
 * <pre>
 *     author: 栗子酱
 *     blog  : http://www.jianshu.com/u/a0206b5f4526
 *     time  : 2017年1月30日17:00:21
 *     desc  :  RxSocket
 *     thanks To:
 *     dependent on:
 *     update log:
 *          1.  2016年12月20日16:35:31     修复Socket被远程关闭后,不断接收关闭消息的bug     by 栗子酱
 *          2.  2017年1月28日22:38:32      修复:Write中,返回值为 0 的时候,是正常的,只是写了 长度 为0的包。     by  栗子酱
 *          3.  2017年1月29日13:52:35      增加:disConnect()
 * </pre>
 */

public class RxSocket {

    /*  常量
    * */
    private String TAG = "RxSocket";
    private boolean OpenLog = false;
    private long WRITE_TIME_OUT = 3000;
    private long CONNECT_TIME_OUT = 3000;

    /*  单例
    * */
    private Subject<Object,byte[]> readSubject;
    private Subject<Object,SocketStatus> connectStatus;
    private static volatile RxSocket defaultInstance;
    private RxSocket() {
        readSubject = new SerializedSubject(PublishSubject.create());
        connectStatus = new SerializedSubject(PublishSubject.create());
    }
    public static RxSocket getInstance() {
        RxSocket rxSocket = defaultInstance;
        if (defaultInstance == null) {
            synchronized (chestnut.RxSocket.RxSocket.class) {
                rxSocket = defaultInstance;
                if (defaultInstance == null) {
                    rxSocket = new RxSocket();
                    defaultInstance = rxSocket;
                }
            }
        }
        return rxSocket;
    }

    /*  变量
    * */
    private SocketStatus socketStatus = SocketStatus.DIS_CONNECT;
    private Selector selector = null;
    private SocketChannel socketChannel = null;
    private SelectionKey selectionKey = null;
    private ReadThread readThread = null;
    private boolean isReadThreadAlive = true;
    private SocketReconnectCallback socketReconnectCallback = null;

    /*  方法
    * */
    /**
     * 监听Socket的状态
     * @return Rx SocketStatus 状态
     */
    public Observable<SocketStatus> socketStatusListener () {
        return connectStatus;
    }

    /**
     * 建立Socket连接,只是尝试建立一次
     * @param ip    IP or 域名
     * @param port  端口
     * @return  Rx true or false
     */
    public Observable<Boolean> connectRx(String ip, int port) {
        return Observable
                .create(new Observable.OnSubscribe<Boolean>() {
                    @Override
                    public void call(Subscriber<? super Boolean> subscriber) {

                        LogUtils.i(OpenLog,TAG,"connectRx:"+"status:"+socketStatus.name());

                        //正在连接
                        if (socketStatus == SocketStatus.CONNECTING) {
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                            return;
                        }

                        //未连接 | 已经连接,关闭Socket
                        socketStatus = SocketStatus.DIS_CONNECT;
                        isReadThreadAlive = false;
                        readThread = null;
                        if (selector!=null)
                            try {
                                selector.close();
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"selector.close");
                            }
                        if (selectionKey!=null)
                            try {
                                selectionKey.cancel();
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"selectionKey.cancel");
                            }
                        if (socketChannel!=null)
                            try {
                                socketChannel.close();
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"socketChannel.close");
                            }

                        //重启Socket
                        isReadThreadAlive = true;
                        readThread = new ReadThread(ip,port);
                        readThread.start();
                        socketReconnectCallback = new SocketReconnectCallback() {
                            @Override
                            public void onSuccess() {
                                LogUtils.i(OpenLog,TAG,"connectRx:"+"CONNECTED");
                                socketStatus = SocketStatus.CONNECTED;
                                subscriber.onNext(true);
                                subscriber.onCompleted();
                            }

                            @Override
                            public void onFail(String msg) {
                                LogUtils.i(OpenLog,TAG,"connectRx:"+msg);
                                subscriber.onNext(false);
                                subscriber.onCompleted();
                            }
                        };
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .map(aBoolean -> {
                    socketReconnectCallback = null;
                    return aBoolean;
                })
                .timeout(CONNECT_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
    }

    /**
     * 断开当前的Socket
     *  还能再继续连接
     * @return Rx true or false
     */
    public Observable<Boolean> disConnect() {
        return Observable.create(new Observable.OnSubscribe<Boolean>() {
            @Override
            public void call(Subscriber<? super Boolean> subscriber) {
                try {
                    if (socketStatus == SocketStatus.DIS_CONNECT) {
                        subscriber.onNext(true);
                        subscriber.onCompleted();
                    }
                    else {
                        socketStatus = SocketStatus.DIS_CONNECT;
                        isReadThreadAlive = false;
                        readThread = null;
                        if (selector!=null)
                            try {
                                selector.close();
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"selector.close");
                            }
                        if (selectionKey!=null)
                            try {
                                selectionKey.cancel();
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"selectionKey.cancel");
                            }
                        if (socketChannel!=null)
                            try {
                                socketChannel.close();
                            } catch (Exception e) {
                                LogUtils.i(OpenLog,TAG,"socketChannel.close");
                            }
                        subscriber.onNext(true);
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onNext(false);
                    subscriber.onCompleted();
                }
            }
        });
    }

    /**
     * 读取Socket的消息
     * @return  Rx error 或者 有数据
     */
    public Observable<byte[]> read() {
        return readSubject;
    }

    /**
     * 向Socket写消息
     * @param buffer    数据包
     * @return  Rx true or false
     */
    public Observable<Boolean> write(ByteBuffer buffer) {
        return Observable
                .create(new Observable.OnSubscribe<Boolean>() {
                    @Override
                    public void call(Subscriber<? super Boolean> subscriber) {
                        if (socketStatus != SocketStatus.CONNECTED) {
                            LogUtils.i(OpenLog, TAG, "write." + "SocketStatus.DISCONNECTED");
                            subscriber.onNext(false);
                            subscriber.onCompleted();
                        }
                        else {
                            if (socketChannel!=null && socketChannel.isConnected()) {
                                try {
                                    int result = socketChannel.write(buffer);
                                    if (result<0) {
                                        LogUtils.i(OpenLog, TAG, "write." + "发送出错");
                                        subscriber.onNext(false);
                                        subscriber.onCompleted();
                                    }
                                    else {
                                        LogUtils.i(OpenLog, TAG, "write." + "success!");
                                        subscriber.onNext(true);
                                        subscriber.onCompleted();
                                    }
                                } catch (Exception e) {
                                    LogUtils.i(OpenLog,TAG,"write."+e.getMessage());
                                    subscriber.onNext(false);
                                    subscriber.onCompleted();
                                }
                            }
                            else {
                                LogUtils.i(OpenLog,TAG,"write."+"close");
                                subscriber.onNext(false);
                                subscriber.onCompleted();
                            }
                        }
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .timeout(WRITE_TIME_OUT, TimeUnit.MILLISECONDS, Observable.just(false));
    }

    /**
     * 获取Socket的链接状态
     * @return  状态
     */
    public SocketStatus getSocketStatus() {
        return socketStatus;
    }

    /*  类 && 枚举 && 接口
        * */
    private class ReadThread extends Thread {
        private String ip;
        private int port;
        ReadThread(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }
        @Override
        public void run() {
            LogUtils.i(OpenLog,TAG,"ReadThread:"+"start");
            while (isReadThreadAlive) {
                //连接
                if (socketStatus == SocketStatus.DIS_CONNECT) {
                    try {
                        if (selectionKey != null) selectionKey.cancel();
                        socketChannel = SocketChannel.open();
                        socketChannel.configureBlocking(false);
                        selector = Selector.open();
                        socketChannel.connect(new InetSocketAddress(ip, port));
                        selectionKey = socketChannel.register(selector, SelectionKey.OP_CONNECT);
                        socketStatus = SocketStatus.CONNECTING;
                        connectStatus.onNext(SocketStatus.CONNECTING);
                    } catch (Exception e) {
                        isReadThreadAlive = false;
                        socketStatus = SocketStatus.DIS_CONNECT;
                        connectStatus.onNext(SocketStatus.DIS_CONNECT);
                        LogUtils.e(OpenLog, TAG, "ReadThread:init:" + e.getMessage());
                        if (socketReconnectCallback!=null)
                            socketReconnectCallback.onFail("SocketConnectFail1");
                    }
                }
                //读取
                else if (socketStatus == SocketStatus.CONNECTING || socketStatus  == SocketStatus.CONNECTED) {
                    try {
                        selector.select();
                        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                        while (it.hasNext()) {
                            SelectionKey key = it.next();
                            if (key.isConnectable()) {
                                if (socketChannel.isConnectionPending()) {
                                    try {
                                        socketChannel.finishConnect();
                                        socketStatus = SocketStatus.CONNECTED;
                                        connectStatus.onNext(SocketStatus.CONNECTED);
                                        socketChannel.configureBlocking(false);
                                        socketChannel.register(selector, SelectionKey.OP_READ);
                                        if (socketReconnectCallback!=null)
                                            socketReconnectCallback.onSuccess();
                                    } catch (Exception e) {
                                        isReadThreadAlive = false;
                                        socketStatus = SocketStatus.DIS_CONNECT;
                                        connectStatus.onNext(SocketStatus.DIS_CONNECT);
                                        LogUtils.e(OpenLog, TAG, "ReadThread:finish:" + e.getMessage());
                                        if (socketReconnectCallback!=null)
                                            socketReconnectCallback.onFail("SocketConnectFail2");
                                    }
                                }
                            } else if (key.isReadable()) {
                                ByteBuffer buf = ByteBuffer.allocate(10000);
                                int length = socketChannel.read(buf);
                                if (length <= 0) {
                                    LogUtils.e(OpenLog, TAG, "服务器主动断开链接!");
                                    isReadThreadAlive = false;
                                    socketStatus = SocketStatus.DIS_CONNECT;
                                    connectStatus.onNext(SocketStatus.DIS_CONNECT);
                                    if (socketReconnectCallback!=null)
                                        socketReconnectCallback.onFail("SocketConnectFail3");
                                } else {
                                    LogUtils.i(OpenLog, TAG, "readSubject:msg!"+ "length:" + length);
                                    byte[] bytes = new byte[length];
                                    for (int i = 0; i < length; i++) {
                                        bytes[i] = buf.get(i);
                                    }
                                    readSubject.onNext(bytes);
                                }
                            }
                        }
                        it.remove();
                    } catch (Exception e) {
                        isReadThreadAlive = false;
                        socketStatus = SocketStatus.DIS_CONNECT;
                        connectStatus.onNext(SocketStatus.DIS_CONNECT);
                        LogUtils.e(OpenLog, TAG, "ReadThread:read:" + e.getMessage());
                        if (socketReconnectCallback!=null)
                            socketReconnectCallback.onFail("SocketConnectFail4");
                    }
                }
            }
        }
    }

    public enum SocketStatus {
        DIS_CONNECT,
        CONNECTING,
        CONNECTED,
    }

    private interface SocketReconnectCallback {
        void onSuccess();
        void onFail(String msg);
    }
}

额,好像有点长,这个Socket是NIO包的Socket,里面只是开启了一条线程。

4. 注意

  • 之所以放出一个监听方法,我想的是,Socket连接上后,有可能会被断开,
    这样,就需要做一个重连的策略,当然,这个策略看项目的要求,
    因而,我把其对外开放了。你可以监听这个方法,去做Socket的重连策略。

  • RxSokcet的读方法,需要注意,要在适当的时候去解除订阅。
    还有,Socket状态的监听也是。

  • 最后,有哪些不合理的地方,各位大老要好好教导一下小弟~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,099评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,828评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,540评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,848评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,971评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,132评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,193评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,934评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,376评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,687评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,846评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,537评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,175评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,887评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,134评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,674评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,741评论 2 351

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,892评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,642评论 18 139
  • ▲ 凭借这两天刷屏的母亲节广告《一九三一》,百雀羚又火了一把。 ▲这支主打“与时间对抗”、长图文形式的民国谍战风广...
    华商CEO阅读 9,697评论 0 2
  • 一号的时候,朋友叫去绿野音乐节。本来是不怎么想去。但是估摸着在家也没啥事就去了,去了不到二十分钟,手机和卡包...
    杨荣瓒阅读 275评论 0 1
  • “人生本来就有很多事是徒劳无功的”就像青春,虽然徒劳,却在人的心里住的最久,挥之不去,却怀念那段青涩年华,怀念它的...
    米粒的世界阅读 360评论 0 1