基于okhttp和RxJava封装的自动重连的WebSocket

*本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布

一 . 概述

1. RxWebSocket是一个基于okhttp和RxJava封装的WebSocket客户端,此库的核心特点是 除了手动关闭WebSocket(就是RxJava取消订阅),WebSocket在异常关闭的时候(onFailure,发生异常,如WebSocketException等等),会自动重连,永不断连.其次,对WebSocket做的缓存处理,同一个URL,共享一个WebSocket.

2. 由于是基于RxJava封装,所以带来了无限可能,可以和RxBinding,Rxlifecycle一起使用,方便对WebSocket的管理.

项目地址: 欢迎star


效果图

demo效果

重连

重连

项目已经上传Jcenter,依赖方法:

//本项目
compile 'com.dhh:websocket:1.3.0'

//由于项目是基于okhttp,RxJava,RxAndroid编写,所以还需加入如下依赖.
//okhttp,RxJava,RxAndroid
compile 'com.squareup.okhttp3:okhttp:3.9.0'
compile 'io.reactivex:rxjava:1.3.1'
compile 'io.reactivex:rxandroid:1.2.1'

二 . 使用方法

0. 初始化,可以也忽略直接使用.

如果你想使用自己的okhttpClient:

        OkHttpClient yourClient = new OkHttpClient();
        RxWebSocketUtil.getInstance().setClient(yourClient);

是否打印日志:

RxWebSocketUtil.getInstance().setShowLog(BuildConfig.DEBUG);

1.获取一个WebSocket,接收消息,多种方式:

RxWebSocketUtil.getInstance().getWebSocketInfo(url)
                        .subscribe(new Action1<WebSocketInfo>() {
                            @Override
                            public void call(WebSocketInfo webSocketInfo) {
                                mWebSocket = webSocketInfo.getWebSocket();
                                Log.d("MainActivity", webSocketInfo.getString());
                                Log.d("MainActivity", "ByteString:" + webSocketInfo.getByteString());
                            }
                        });

mWebSocket.send("hello word");

        //get StringMsg
        RxWebSocketUtil.getInstance().getWebSocketString(url)
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                    }
                });
        // get ByteString
        RxWebSocketUtil.getInstance().getWebSocketByteString(url)
                .subscribe(new Action1<ByteString>() {
                    @Override
                    public void call(ByteString byteString) {

                    }
                });
        //get WebSocket
        RxWebSocketUtil.getInstance().getWebSocket(url)
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {

                    }
                });
       // 带timeout的WebSocket,当在指定时间内没有收到消息,就重连WebSocket.为了适配小米平板.
       //小米平板测试的时候,出现网络断连,不发送错误,导致不能重连
        RxWebSocketUtil.getInstance().getWebSocketInfo(url,10, TimeUnit.SECONDS)
                .subscribe(new Action1<WebSocketInfo>() {
                    @Override
                    public void call(WebSocketInfo webSocketInfo) {
                        
                    }
                });       

2. 发送消息:

  //用WebSocket的引用直接发
  mWebSocket.send("hello word");

  //url 对应的WebSocket已经打开可以这样send,否则报错
  RxWebSocketUtil.getInstance().send(url, "hello");
  RxWebSocketUtil.getInstance().send(url, ByteString.EMPTY);

  //异步发送,若WebSocket已经打开,直接发送,若没有打开,打开一个WebSocket发送完数据,直接关闭.
  RxWebSocketUtil.getInstance().asyncSend(url, "hello");
  RxWebSocketUtil.getInstance().asyncSend(url, ByteString.EMPTY);

3.关闭WebSocket

项目是依托RxJava实现的,所以关闭WebSocket的方法也就是在适当的时候注销 Observable,项目里的demo里,写了一个简单的lifecycle,将Observable生命绑定到Activity的onDestroy,自动注销.代码细节请看demo,因为内部实现了同一个URL的WebSocket共享机制,所以当外部所有持有这个URL的Observable都注销后,这个WebSocket连接就会自动关闭.请看原理解析部分.下面两种常用注销方法:

       //注意取消订阅,有多种方式,比如 rxlifecycle
                        mSubscription = RxWebSocketUtil.getInstance().getWebSocketInfo(url)
                                .subscribe(new Action1<WebSocketInfo>() {
                                    @Override
                                    public void call(WebSocketInfo webSocketInfo) {
                                        mWebSocket = webSocketInfo.getWebSocket();
                                        if (webSocketInfo.isOnOpen()) {
                                            Log.d("MainActivity", " on WebSocket open");
                                        } else {

                                            String string = webSocketInfo.getString();
                                            if (string != null) {
                                                Log.d("MainActivity", string);
                                                textview.setText(Html.fromHtml(string));

                                            }

                                            ByteString byteString = webSocketInfo.getByteString();
                                            if (byteString != null) {
                                                Log.d("MainActivity", "webSocketInfo.getByteString():" + byteString);

                                            }
                                        }
                                    }
                                });
    //注销
    if (mSubscription != null) {
            mSubscription.unsubscribe();
        }

//lifecycle注销,详情看demo
        RxWebSocketUtil.getInstance().getWebSocketString(url)
                .compose(this.<String>bindOnActivityEvent(ActivityEvent.onDestory))
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        
                    }
                });

三. 原理解析

1. 首先需要将okhttp的WebSocket包装成Observable,由于需要将WebSocket,Stringmsg,ByteString等信息一同发送给观察者所以先构建一个WebSocketInfo类,将信息封装:

public class WebSocketInfo {
    private WebSocket mWebSocket;
    private String mString;
    private ByteString mByteString;
    private boolean onOpen;
    //其他省略
}

onOpen字段主要用来判断当前的这个WebSocketInfo是否是当WebSocket打开时发送的消息(onOpen),这时,Stringmsg和ByteString都是null.

2. 将WebSocketInfo包装成Observable发出:

    private final class WebSocketOnSubscribe implements Observable.OnSubscribe<WebSocketInfo> {
        private String url;

        private WebSocket webSocket;

        private WebSocketInfo startInfo, stringInfo, byteStringInfo;

        public WebSocketOnSubscribe(String url) {
            this.url = url;
            startInfo = new WebSocketInfo(true);
            stringInfo = new WebSocketInfo();
            byteStringInfo = new WebSocketInfo();
        }

        @Override
        public void call(final Subscriber<? super WebSocketInfo> subscriber) {
            if (webSocket != null) {
                //降低重连频率
                if (!"main".equals(Thread.currentThread().getName())) {
                    SystemClock.sleep(2000);
                }
            }
            initWebSocket(subscriber);
        }

        private void initWebSocket(final Subscriber<? super WebSocketInfo> subscriber) {
            webSocket = client.newWebSocket(getRequest(url), new WebSocketListener() {
                @Override
                public void onOpen(final WebSocket webSocket, Response response) {
                    if (showLog) {
                        Log.d("RxWebSocketUtil", url + " --> onOpen");
                    }
                    webSocketMap.put(url, webSocket);
                    AndroidSchedulers.mainThread().createWorker().schedule(new Action0() {
                        @Override
                        public void call() {
                            if (!subscriber.isUnsubscribed()) {
                                subscriber.onStart();
                                startInfo.setWebSocket(webSocket);
                                subscriber.onNext(startInfo);
                            }
                        }
                    });
                }

                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    if (!subscriber.isUnsubscribed()) {
                        stringInfo.setWebSocket(webSocket);
                        stringInfo.setString(text);
                        subscriber.onNext(stringInfo);
                    }
                }

                @Override
                public void onMessage(WebSocket webSocket, ByteString bytes) {
                    if (!subscriber.isUnsubscribed()) {
                        byteStringInfo.setWebSocket(webSocket);
                        byteStringInfo.setByteString(bytes);
                        subscriber.onNext(byteStringInfo);
                    }
                }

                @Override
                public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                    if (showLog) {
                        Log.e("RxWebSocketUtil", t.toString() + webSocket.request().url().uri().getPath());
                    }
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(t);
                    }
                }

                @Override
                public void onClosing(WebSocket webSocket, int code, String reason) {
                    webSocket.close(1000, null);
                }

                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    if (showLog) {
                        Log.d("RxWebSocketUtil", url + " --> onClosed:code= " + code);
                    }
                }
            });
            subscriber.add(new MainThreadSubscription() {
                @Override
                protected void onUnsubscribe() {
                    webSocket.close(3000, "手动关闭");
                }
            });
        }

    }

实现一个WebSocketOnSubscribe 将WebSocket的回调转化成subscriber调用.发送给Observable下游.在onOpen时调用 subscriber.onStart(),并且发送一个onOpen的WebSocketInfo.在subscriber注销的时候关闭WebSocket.在call方法最上面有个SystemClock.sleep(2000),这个主要是为了降低在断连的时候的重连频率,将在下面讲到.

包装成Observable:

Observable.create(new WebSocketOnSubscribe(url))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

3. 实现自动重连:

Observable.create(new WebSocketOnSubscribe(url))
                    //自动重连
                    .timeout(timeout, timeUnit)
                    .retry()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

RxJava retry操作符,很完美的实现了这个功能,当上游发出Throwable的时候,retry将错误吃掉,并重新调用 onSubscribe的call方法,也就是WebSocketOnSubscribe的call,就会重新初始化一个WebSocket连接,达到重连的目的,如果一直没有网络,这个retry的调用频率非常高,所以在call方法里面,当是重连的时候,就SystemClock.sleep(2000),休眠2秒,这样重连的频率就是2秒重连一次. 当然在retry上面还有一个timeout操作符.当subscriber.onNext()在指定时间间隔里没有调用,就发出一个timeoutException,让retry重连WebSocket.这个主要是为了适配部分国产机型,当WebSocket发生连接异常时,不会及时发出错误,如小米平板.在每次重连都会把原来的WebSocket关闭.

4. 实现同一个URL的WebSocket共享

Observable.create(new WebSocketOnSubscribe(url))
                    //自动重连
                    .timeout(timeout, timeUnit)
                    .retry()
                    //共享
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            observableMap.remove(url);
                            webSocketMap.remove(url);
                            if (showLog) {
                                Log.d("RxWebSocketUtil", "注销");
                            }
                        }
                    })
                    .doOnNext(new Action1<WebSocketInfo>() {
                        @Override
                        public void call(WebSocketInfo webSocketInfo) {
                            if (webSocketInfo.isOnOpen()) {
                                webSocketMap.put(url, webSocketInfo.getWebSocket());
                            }
                        }
                    })
                    .share()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());

实现共享功能,主要是为了防止一个URL的WebSocket,建立多个连接,这个主要是由RxJava的share操作符实现,share操作符,使得一个Observable可以有多个subscriber,当有多个subscriber时,当所有的subscriber都取消订阅,这个Observable才会取消订阅. getWebSocketInfo()方法完整代码:

    public Observable<WebSocketInfo> getWebSocketInfo(final String url, final long timeout, final TimeUnit timeUnit) {
        Observable<WebSocketInfo> observable = observableMap.get(url);
        if (observable == null) {
            observable = Observable.create(new WebSocketOnSubscribe(url))
                    //自动重连
                    .timeout(timeout, timeUnit)
                    .retry()
                    //共享
                    .doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            observableMap.remove(url);
                            webSocketMap.remove(url);
                            if (showLog) {
                                Log.d("RxWebSocketUtil", "注销");
                            }
                        }
                    })
                    .doOnNext(new Action1<WebSocketInfo>() {
                        @Override
                        public void call(WebSocketInfo webSocketInfo) {
                            if (webSocketInfo.isOnOpen()) {
                                webSocketMap.put(url, webSocketInfo.getWebSocket());
                            }
                        }
                    })
                    .share()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread());
            observableMap.put(url, observable);
        } else {
            observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);
        }
        return observable;
    }

doOnUnsubscribe作用:在Observable注销,即 WebSocket关闭时,移除map中的缓存的Observable和WebSocket.

doOnNext作用: 判断接收到的WebSocketInfo是否是WebSocket在onOpen的时候发的,然后将其缓存起来.作用就是:如果有一个相同的URL订阅Observable,就从缓存中取,这个时候我们应该把一个WebSocket的onOpen事件也发给这个订阅者:

//使用merge操作符,将onOpen事件发给订阅者
observable = Observable.merge(Observable.just(new WebSocketInfo(webSocketMap.get(url), true)), observable);

这样的话,同一个URL的WebSocket,不管在什么地方什么时间订阅,都能收到一个onOpen事件,外部表现的就像一个新的WebSocket.

getWebSocketInfo方法的几种变体:

    /**
     * default timeout: 30 days
     * <p>
     * 若忽略小米平板,请调用这个方法
     * </p>
     */
    public Observable<WebSocketInfo> getWebSocketInfo(String url) {
        return getWebSocketInfo(url, 30, TimeUnit.DAYS);
    }

    public Observable<String> getWebSocketString(String url) {
        return getWebSocketInfo(url)
                .map(new Func1<WebSocketInfo, String>() {
                    @Override
                    public String call(WebSocketInfo webSocketInfo) {
                        return webSocketInfo.getString();
                    }
                })
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String s) {
                        return s != null;
                    }
                });
    }

    public Observable<ByteString> getWebSocketByteString(String url) {
        return getWebSocketInfo(url)
                .map(new Func1<WebSocketInfo, ByteString>() {
                    @Override
                    public ByteString call(WebSocketInfo webSocketInfo) {
                        return webSocketInfo.getByteString();
                    }
                })
                .filter(new Func1<ByteString, Boolean>() {
                    @Override
                    public Boolean call(ByteString byteString) {
                        return byteString != null;
                    }
                });
    }

    public Observable<WebSocket> getWebSocket(String url) {
        return getWebSocketInfo(url)
                .map(new Func1<WebSocketInfo, WebSocket>() {
                    @Override
                    public WebSocket call(WebSocketInfo webSocketInfo) {
                        return webSocketInfo.getWebSocket();
                    }
                });
    }

5 . send信息到服务端

上面已经讲到WebSocketInfo包含了WebSocket,所以在订阅后,就可以拿到这个WebSocket引用就可以WebSocket.send发送消息到服务端.当然我们的RxWebSocketUtil已经将开启的WebSocket已经缓存.所以我们也可以这样发消息:

    /**
     * 如果url的WebSocket已经打开,可以直接调用这个发送消息.
     *
     * @param url
     * @param msg
     */
    public void send(String url, String msg) {
        WebSocket webSocket = webSocketMap.get(url);
        if (webSocket != null) {
            webSocket.send(msg);
        } else {
            throw new IllegalStateException("The WebSokcet not open");
        }
    }

    /**
     * 如果url的WebSocket已经打开,可以直接调用这个发送消息.
     *
     * @param url
     * @param byteString
     */
    public void send(String url, ByteString byteString) {
        WebSocket webSocket = webSocketMap.get(url);
        if (webSocket != null) {
            webSocket.send(byteString);
        } else {
            throw new IllegalStateException("The WebSokcet not open");
        }
    }

当指定的URL的WebSocket没有打开会直接报错.

异步发送消息到服务端

    /**
     * 不用关心url 的WebSocket是否打开,可以直接发送
     *
     * @param url
     * @param msg
     */
    public void asyncSend(String url, final String msg) {
        getWebSocket(url)
                .first()
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {
                        webSocket.send(msg);
                    }
                });

    }

    /**
     * 不用关心url 的WebSocket是否打开,可以直接发送
     *
     * @param url
     * @param byteString
     */
    public void asyncSend(String url, final ByteString byteString) {
        getWebSocket(url)
                .first()
                .subscribe(new Action1<WebSocket>() {
                    @Override
                    public void call(WebSocket webSocket) {
                        webSocket.send(byteString);
                    }
                });
    }

这两种发送方式,你不用关心URL的WebSocket是否打开,可以直接发送.实现思路也很简单,getWebSocket(url)会获取到Observable,或者是从缓存中取,或者是重新开启一个WebSocket,但你都不需要关心,经过first操作符后,如果是从缓存取的Observable,就注销的当前的Observable,当是新开的WebSocket,注销掉当前的subscriber后,就没有其他subscriber了,这个新开的WebSocket就会关闭(share操作符作用).

最后,如有什么好的建议,可以联系我.

项目地址: https://github.com/dhhAndroid/RxWebSocket

如果对你有帮助,谢谢 star !

尊重原创,禁止转载!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,596评论 18 139
  • 文章转自:http://gank.io/post/560e15be2dca930e00da1083作者:扔物线在正...
    xpengb阅读 7,017评论 9 73
  • 我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy阅读 5,455评论 7 62
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,600评论 8 93
  • http://blog.csdn.net/yyh352091626/article/details/5330472...
    奈何心善阅读 3,544评论 0 0