rxjava用例

  • 倒计时
@OnClick(R.id.button)
public void onViewClicked() {

    ToastUtils.showToast("哈士奇爱吃苹果");
    final int count = 10;//倒计时10秒
    Observable.interval(0, 1, TimeUnit.SECONDS)
            .take(count + 1)
            .map(new Function<Long, Long>() {
                @Override
                public Long apply(Long aLong) throws Exception {
                    return count - aLong;
                }
            })
            .observeOn(AndroidSchedulers.mainThread())//ui线程中进行控件更新
            .doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(Disposable disposable) throws Exception {
                    button.setEnabled(false);
                    button.setTextColor(Color.BLACK);
                }
            }).subscribe(new Observer<Long>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(Long num) {

            button.setText("剩余" + num + "秒");
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {
            //回复原来初始状态
            button.setEnabled(true);
            button.setText("发送验证码");
        }
    });

}

作者:一汪藏海
链接:https://www.jianshu.com/p/41253d493f7d
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

  • 取消订阅,取消任务继续执行。

RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )

  • 写个rxjava2 manager管理“被观察者”, 当页面退出时取消网络请求;取消后台耗时操作; 取消倒计时;

参考思路

  • 取消网络请求, 取消订阅, 取消倒计时

  • 线程切换-工作线程做耗时任务, 工作完成切换到UI线程

  • 变换 - 链式调用,避免多层级if嵌套

  • 被观察者和观察者

创建被观察者,
变换->新的被观察者,
处理新的被观察者;

  • rxjava2是对ReactiveX库的java实现
  • ReactiveX要干啥?
    ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
    通过被观察者序列实现对异步程序和基于事件程序的排版处理;
  • ReactiveX 纵览 Observables, 包括订阅,取消订阅,Those Observables can then (if they have no other interested observers) choose to stop generating new items to emit.

http://reactivex.io/documentation/observable.html

  • ReactiveX 的错误处理
    被观察者 has failed to generate the expected data or has encountered some other error.

被观察者在生成item的时候,发生异常或错误了,
然后呢, 被观察者通知观察者, 目前的情况;
调用onError方法;

同时了, 在观察者接收到异常或错误之前,
可以添加些操作符,拦截异常情况, 可以做些处理再给观察者,
也可以直接拦截了, 不给观察者;
错误拦截操作符
catch
retry

  • "热"observable&“冷”Observable
    “热”立即开发发射, 导致观察者错过一切;
    “冷”要等待观察者已经准备好了, 观察者不会错过;

&create(ObservableOnSubscribe<T> source)
Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.

*ReaciveX强大的操作符
变换, 结合, 操纵,,,

  • ReactiveX 变换操作符
  • Buffer
    周期性的从被观察者处获取items,然后打包成bundle, 然后发射这个bundle;

  • GroupBy
    先给observables分组;然后再按组发射出去;organized by key

  • flatMap
    对observables进行转换成变换后的observables,再发射转换后的observables,
    一次一个;

  • Map
    先对单个observable进行变换, 再把变换后的observable发射出去, 一次一个;

-scan
对observables进行变换,得到新的observables,然后对新的observables进行排序,按照变换后的key值进行排序, 然后再按照排序一个个发射出去;

  • window

定期将Observable中的项目细分为Observable窗口并发出这些窗口,而不是一次发出一个项目

  • ReactiveX 过滤操作符
  • Debounce
    如果特定的时间跨度已经过去而没有发出另一个项目,则只从Observable中发出一个项目
  • distinct
    去重复;

  • ElementAt
    发出指定的observable;

emit only item n emitted by an Observable

  • Filter
    emit only those items from an Observable that pass a predicate test
    只发射通过测试的observable;
  • First
    emit only the first item, or the first item that meets a condition, from an Observable

发出”第一个“;

也就是只发出一个;

  • Last

仅发出Observable发出的最后一项

  • IgnoreElements
    不从Observable中发出任何项目,但镜像其终止通知
  • Sample
    emit the most recent item emitted by an Observable within periodic time intervals
    在周期性时间间隔内发出Observable发出的最新项目
    采样最近的;

  • Skip
    跳过Observable发出的前n项

  • SkipLast
    跳过Observable发出的最后n个项目

  • Take
    仅发出Observable发出的前n项

-TakeLast
仅发出Observable发出的最后n个项目

  • ReactiveX 联合操作符

  • Error Handling Operators

Operators that help to recover from error notifications from an Observable

Catch-
recover from an onError notification by continuing the sequence without error
捕获异常,并调用观察者的onError方法,告知观察者异常;
然后继续发射;观察者的onNext方法继续接收;

Retry-
if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error
如果源Observable发送'onError`通知,则重新订阅它,希望它能完成而不会出错

  • ReactiveX - 工具类
  • TimeInterval
    convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
    指定发射间隔;

  • Timeout
    镜像源Observable,
    但如果在没有任何发出项目的情况下经过特定时间段,则发出错误通知

-Timestamp
将时间戳附加到Observable发出的每个项目

  • Using
    创建一个与Observable具有相同生命周期的可支配资源
    也就是克隆一个源observable
  • 用例

A Decision Tree of Observable Operators

  • 组合
    Zip
    whenever each of the Observables has emitted a new item
    当组合队伍中所有人都发射一个,然后才会把所有的结合起来, 变换后再发射出去;
    比如那个足球和板球的例子,得到的最终发射结果是也喜欢板球也喜欢足球的人。

*BiFunction

  • 不可用Dispose
    CompositeDisposable
  • A disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.

clear】
Atomically clears the container, then disposes all the previously contained Disposables.
清空并停止所有;

dispose】
仅清空所有;

remove】
Removes and disposes the given disposable if it is part of this container.

  • 观察者本身就是disposiable
    DisposableObserver

  • 利用CompositeDisposable和DisposableObserver可以实现对被观察者取消订阅;并停止被观察者发射items;
    &Subscriber 包含取消订阅的方法;
    &Interface Subscription
    A Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
    It can only be used once by a single Subscriber.
    It is used to both signal desire for data and cancel demand (and allow resource cleanup).
    -- Subscriber

  • Flowable&Observable&SingleObservable&CompletableObservable

  • Flowable&Observable用例区分

  • MaybeObserver&SingleObserver&Observer&DisposableObserver
    &CompletableObserver
    &Consumer
    &Subscriber
  • Publisher&Subscriber
    ** Processer
    ** Flowable
    ** FlowableProcessor
    *** PublishProcessor
  • compose/编写,组成
  • compose for reusable code
    &FlowableTransformer
    &ObservableTransformer
  • Flowable
    ** FlowableProcessor
    *** XXXProcessor


    image.png
  • Observable
    ** Subject
    ** *XXXSubject
image.png
  • Flowable的优势体现在链式配置返回Disposable对象, 以及对背压的支持;
  • &导致app 崩掉的异常

    *** Observable.just("Cricket", null);

    
    

java.lang.NullPointerException: The second item is null
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.Observable.just(Observable.java:2199)


* &不会导致app 崩溃的异常, 并且当异常发生时, 观察者可以被通知到
 

07-06 11:48:31.218 9169-9169/com.rxjava2.android.samples D/ExceptionExampleActivity: onError : 测试异常捕获范围

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
throw new NullPointerException("测试异常捕获范围");
}
});

new Observer<String>() {

        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, " onSubscribe : " + d.isDisposed());
        }

        @Override
        public void onNext(String value) {
            textView.append(" onNext : value : " + value);
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onNext : value : " + value);
        }

        @Override
        public void onError(Throwable e) {
            textView.append(" onError : " + e.getMessage());
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onError : " + e.getMessage());
        }

        @Override
        public void onComplete() {
            textView.append(" onComplete");
            textView.append(AppConstant.LINE_SEPARATOR);
            Log.d(TAG, " onComplete");
        }
    };



**** PublishProcessor捕获异常, 必须要指定onError consumer, 否则异常无法被捕获

07-06 12:00:22.032 11129-11129/com.rxjava2.android.samples I/PaginationActivity: accept: 我是空指针异常

Consumer<List<String>> consumer = new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> items) throws Exception {
paginationAdapter.addItems(items);
paginationAdapter.notifyDataSetChanged();
loading = false;
progressBar.setVisibility(View.INVISIBLE);
}
};

    Consumer<Throwable> onError = new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            Log.i(TAG, "accept: "+throwable.getMessage());
        }
    };


    Disposable disposable
            = paginator
            .onBackpressureDrop()
            .concatMap(new Function<Integer, Publisher<List<String>>>() {
                @Override
                public Publisher<List<String>> apply(@NonNull Integer page) throws Exception {
                    loading = true;
                    progressBar.setVisibility(View.VISIBLE);
                    return dataFromNetwork(page);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(consumer, onError);

    compositeDisposable.add(disposable);

    paginator.onNext(pageNumber);

// Subscriber
// Processor
// PublishProcessor

}

/**
 * Simulation of network data
 */
private Flowable<List<String>> dataFromNetwork(final int page) {
    return Flowable.just(true)
            .delay(2, TimeUnit.SECONDS)
            .map(new Function<Boolean, List<String>>() {
                @Override
                public List<String> apply(@NonNull Boolean value) throws Exception {

                    throw new NullPointerException("我是空指针异常");

// List<String> items = new ArrayList<>();
// for (int i = 1; i <= 10; i++) {
// items.add("Item " + (page * 10 + i));
// }
// return items;
}
});
}




* &Retrofit 
* /retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java
支持取消订阅和取消网络请求
支持对json解析异常的捕获;

支持取消订阅和取消网络请求

static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;

RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
  this.call = call;
  this.subscriber = subscriber;
}

@Override public void request(long n) {
  if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
  if (n == 0) return; // Nothing to do when requesting 0.
  if (!compareAndSet(false, true)) return; // Request was already triggered.

  try {
    Response<T> response = call.execute();
    if (!subscriber.isUnsubscribed()) {
      subscriber.onNext(response);
    }
  } catch (Throwable t) {
    Exceptions.throwIfFatal(t);
    if (!subscriber.isUnsubscribed()) {
      subscriber.onError(t);
    }
    return;
  }

  if (!subscriber.isUnsubscribed()) {
    subscriber.onCompleted();
  }
}

@Override public void unsubscribe() {
  call.cancel();
}

@Override public boolean isUnsubscribed() {
  return call.isCanceled();
}

}



* &支持json解析异常的捕获
/retrofit2/OkHttpCall.java

@Override public void enqueue(final Callback<T> callback) {
if (callback == null) throw new NullPointerException("callback == null");

okhttp3.Call call;
Throwable failure;

synchronized (this) {
  if (executed) throw new IllegalStateException("Already executed.");
  executed = true;

  call = rawCall;
  failure = creationFailure;
  if (call == null && failure == null) {
    try {
      call = rawCall = createRawCall();
    } catch (Throwable t) {
      failure = creationFailure = t;
    }
  }
}

if (failure != null) {
  callback.onFailure(this, failure);
  return;
}

if (canceled) {
  call.cancel();
}

call.enqueue(new okhttp3.Callback() {
  @Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse)
      throws IOException {
    Response<T> response;
    try {
      response = parseResponse(rawResponse);
    } catch (Throwable e) {
      callFailure(e);
      return;
    }
    callSuccess(response);
  }

  @Override public void onFailure(okhttp3.Call call, IOException e) {
    try {
      callback.onFailure(OkHttpCall.this, e);
    } catch (Throwable t) {
      t.printStackTrace();
    }
  }

  private void callFailure(Throwable e) {
    try {
      callback.onFailure(OkHttpCall.this, e);
    } catch (Throwable t) {
      t.printStackTrace();
    }
  }

  private void callSuccess(Response<T> response) {
    try {
      callback.onResponse(OkHttpCall.this, response);
    } catch (Throwable t) {
      t.printStackTrace();
    }
  }
});

}

@Override public synchronized boolean isExecuted() {
return executed;
}

@Override public Response<T> execute() throws IOException {
okhttp3.Call call;

synchronized (this) {
  if (executed) throw new IllegalStateException("Already executed.");
  executed = true;

  if (creationFailure != null) {
    if (creationFailure instanceof IOException) {
      throw (IOException) creationFailure;
    } else {
      throw (RuntimeException) creationFailure;
    }
  }

  call = rawCall;
  if (call == null) {
    try {
      call = rawCall = createRawCall();
    } catch (IOException | RuntimeException e) {
      creationFailure = e;
      throw e;
    }
  }
}

if (canceled) {
  call.cancel();
}

return parseResponse(call.execute());

}

private okhttp3.Call createRawCall() throws IOException {
Request request = serviceMethod.toRequest(args);
okhttp3.Call call = serviceMethod.callFactory.newCall(request);
if (call == null) {
throw new NullPointerException("Call.Factory returned null.");
}
return call;
}

Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();

// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
    .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
    .build();

int code = rawResponse.code();
if (code < 200 || code >= 300) {
  try {
    // Buffer the entire body to avoid future I/O.
    ResponseBody bufferedBody = Utils.buffer(rawBody);
    return Response.error(bufferedBody, rawResponse);
  } finally {
    rawBody.close();
  }
}

if (code == 204 || code == 205) {
  return Response.success(null, rawResponse);
}

ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
try {
  T body = serviceMethod.toResponse(catchingBody);
  return Response.success(body, rawResponse);
} catch (RuntimeException e) {
  // If the underlying source threw an exception, propagate that rather than indicating it was
  // a runtime exception.
  catchingBody.throwIfCaught();
  throw e;
}

}



* &retrofit2支持rxjava2取消订阅&okHttp的call取消&json解析异常捕获;
















最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容

  • 包拯一生断案无数,比如《铡美案》,《狸猫换太子》,《乌盆案》,《铡包勉》,《铡判官》等等。每当遇到大案,有大官或者...
    诗书雅读阅读 756评论 0 1
  • 昨天,在好友圈看到前任晒了几张去泰国的照片。我看到后一笑而过,还给了一个赞。似乎我们没有发生任何事,就像好朋友一样...
    阿程先生阅读 261评论 1 0
  • 2018年1月9日参加了易效能G188期为期两天的课程学习,接下来就是90天的践行活动,选完班委过后,一共建...
    飄雪无痕阅读 613评论 1 4
  • 汨罗江畔一跃兮,屈子纵身成魂。 仿离骚以自比兮,继而九歌天问。 感夏日之炎炎兮,如旧绪绕彤云。 天青色等烟雨兮,粽...
    许士健阅读 224评论 0 2