- 倒计时
@OnClick(R.id.button)
public void onViewClicked() {
ToastUtils.showToast("哈士奇爱吃苹果");
final int count = 10;//倒计时10秒
Observable.interval(0, 1, TimeUnit.SECONDS)
.take(count + 1)
.map(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) throws Exception {
return count - aLong;
}
})
.observeOn(AndroidSchedulers.mainThread())//ui线程中进行控件更新
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
button.setEnabled(false);
button.setTextColor(Color.BLACK);
}
}).subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long num) {
button.setText("剩余" + num + "秒");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
//回复原来初始状态
button.setEnabled(true);
button.setText("发送验证码");
}
});
}
作者:一汪藏海
链接:https://www.jianshu.com/p/41253d493f7d
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
- 取消订阅,取消任务继续执行。
RxJava2 中多种取消订阅 dispose 的方法梳理( 源码分析 )
- 写个rxjava2 manager管理“被观察者”, 当页面退出时取消网络请求;取消后台耗时操作; 取消倒计时;
取消网络请求, 取消订阅, 取消倒计时
线程切换-工作线程做耗时任务, 工作完成切换到UI线程
变换 - 链式调用,避免多层级if嵌套
被观察者和观察者
创建被观察者,
变换->新的被观察者,
处理新的被观察者;
- rxjava2是对ReactiveX库的java实现
- ReactiveX要干啥?
ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.
通过被观察者序列实现对异步程序和基于事件程序的排版处理;
- ReactiveX 纵览 Observables, 包括订阅,取消订阅,Those Observables can then (if they have no other interested observers) choose to stop generating new items to emit.
http://reactivex.io/documentation/observable.html
- ReactiveX 的错误处理
被观察者 has failed to generate the expected data or has encountered some other error.
被观察者在生成item的时候,发生异常或错误了,
然后呢, 被观察者通知观察者, 目前的情况;
调用onError方法;
同时了, 在观察者接收到异常或错误之前,
可以添加些操作符,拦截异常情况, 可以做些处理再给观察者,
也可以直接拦截了, 不给观察者;
错误拦截操作符
catch
retry
- "热"observable&“冷”Observable
“热”立即开发发射, 导致观察者错过一切;
“冷”要等待观察者已经准备好了, 观察者不会错过;
&create(ObservableOnSubscribe<T> source)
Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world.
*ReaciveX强大的操作符
变换, 结合, 操纵,,,
- ReactiveX 变换操作符
Buffer
周期性的从被观察者处获取items,然后打包成bundle, 然后发射这个bundle;GroupBy
先给observables分组;然后再按组发射出去;organized by key
flatMap
对observables进行转换成变换后的observables,再发射转换后的observables,
一次一个;Map
先对单个observable进行变换, 再把变换后的observable发射出去, 一次一个;
-scan
对observables进行变换,得到新的observables,然后对新的observables进行排序,按照变换后的key值进行排序, 然后再按照排序一个个发射出去;
- window
定期将Observable中的项目细分为Observable窗口并发出这些窗口,而不是一次发出一个项目
- ReactiveX 过滤操作符
- Debounce
如果特定的时间跨度已经过去而没有发出另一个项目,则只从Observable中发出一个项目
distinct
去重复;ElementAt
发出指定的observable;
emit only item n emitted by an Observable
- Filter
emit only those items from an Observable that pass a predicate test
只发射通过测试的observable;
- First
emit only the first item, or the first item that meets a condition, from an Observable
发出”第一个“;
也就是只发出一个;
- Last
仅发出Observable发出的最后一项
- IgnoreElements
不从Observable中发出任何项目,但镜像其终止通知
Sample
emit the most recent item emitted by an Observable within periodic time intervals
在周期性时间间隔内发出Observable发出的最新项目
采样最近的;Skip
跳过Observable发出的前n项SkipLast
跳过Observable发出的最后n个项目Take
仅发出Observable发出的前n项
-TakeLast
仅发出Observable发出的最后n个项目
ReactiveX 联合操作符
Error Handling Operators
Operators that help to recover from error notifications from an Observable
Catch-
recover from an onError
notification by continuing the sequence without error
捕获异常,并调用观察者的onError方法,告知观察者异常;
然后继续发射;观察者的onNext方法继续接收;
Retry-
if a source Observable sends an onError
notification, resubscribe to it in the hopes that it will complete without error
如果源Observable发送'onError`通知,则重新订阅它,希望它能完成而不会出错
- ReactiveX - 工具类
TimeInterval
convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions
指定发射间隔;Timeout
镜像源Observable,
但如果在没有任何发出项目的情况下经过特定时间段,则发出错误通知
-Timestamp
将时间戳附加到Observable发出的每个项目
- Using
创建一个与Observable具有相同生命周期的可支配资源
也就是克隆一个源observable
- 用例
A Decision Tree of Observable Operators
- 组合
Zip
whenever each of the Observables has emitted a new item
当组合队伍中所有人都发射一个,然后才会把所有的结合起来, 变换后再发射出去;
比如那个足球和板球的例子,得到的最终发射结果是也喜欢板球也喜欢足球的人。
*BiFunction
- 不可用Dispose
CompositeDisposable - A disposable container that can hold onto multiple other disposables and offers O(1) add and removal complexity.
clear】
Atomically clears the container, then disposes all the previously contained Disposables.
清空并停止所有;
dispose】
仅清空所有;
remove】
Removes and disposes the given disposable if it is part of this container.
观察者本身就是disposiable
DisposableObserver利用CompositeDisposable和DisposableObserver可以实现对被观察者取消订阅;并停止被观察者发射items;
&Subscriber 包含取消订阅的方法;
&Interface Subscription
A Subscription represents a one-to-one lifecycle of a Subscriber subscribing to a Publisher.
It can only be used once by a single Subscriber.
It is used to both signal desire for data and cancel demand (and allow resource cleanup).
-- Subscriber
Flowable&Observable&SingleObservable&CompletableObservable
Flowable&Observable用例区分
- MaybeObserver&SingleObserver&Observer&DisposableObserver
&CompletableObserver
&Consumer
&Subscriber
- Publisher&Subscriber
** Processer
** Flowable
** FlowableProcessor
*** PublishProcessor
- compose/编写,组成
- compose for reusable code
&FlowableTransformer
&ObservableTransformer
-
Flowable
** FlowableProcessor
*** XXXProcessor
image.png
- Observable
** Subject
** *XXXSubject
- Flowable的优势体现在链式配置返回Disposable对象, 以及对背压的支持;
-
&导致app 崩掉的异常
*** Observable.just("Cricket", null);
java.lang.NullPointerException: The second item is null
at io.reactivex.internal.functions.ObjectHelper.requireNonNull(ObjectHelper.java:39)
at io.reactivex.Observable.just(Observable.java:2199)
* &不会导致app 崩溃的异常, 并且当异常发生时, 观察者可以被通知到
07-06 11:48:31.218 9169-9169/com.rxjava2.android.samples D/ExceptionExampleActivity: onError : 测试异常捕获范围
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
throw new NullPointerException("测试异常捕获范围");
}
});
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, " onSubscribe : " + d.isDisposed());
}
@Override
public void onNext(String value) {
textView.append(" onNext : value : " + value);
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onNext : value : " + value);
}
@Override
public void onError(Throwable e) {
textView.append(" onError : " + e.getMessage());
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onError : " + e.getMessage());
}
@Override
public void onComplete() {
textView.append(" onComplete");
textView.append(AppConstant.LINE_SEPARATOR);
Log.d(TAG, " onComplete");
}
};
**** PublishProcessor捕获异常, 必须要指定onError consumer, 否则异常无法被捕获
07-06 12:00:22.032 11129-11129/com.rxjava2.android.samples I/PaginationActivity: accept: 我是空指针异常
Consumer<List<String>> consumer = new Consumer<List<String>>() {
@Override
public void accept(@NonNull List<String> items) throws Exception {
paginationAdapter.addItems(items);
paginationAdapter.notifyDataSetChanged();
loading = false;
progressBar.setVisibility(View.INVISIBLE);
}
};
Consumer<Throwable> onError = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.i(TAG, "accept: "+throwable.getMessage());
}
};
Disposable disposable
= paginator
.onBackpressureDrop()
.concatMap(new Function<Integer, Publisher<List<String>>>() {
@Override
public Publisher<List<String>> apply(@NonNull Integer page) throws Exception {
loading = true;
progressBar.setVisibility(View.VISIBLE);
return dataFromNetwork(page);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer, onError);
compositeDisposable.add(disposable);
paginator.onNext(pageNumber);
// Subscriber
// Processor
// PublishProcessor
}
/**
* Simulation of network data
*/
private Flowable<List<String>> dataFromNetwork(final int page) {
return Flowable.just(true)
.delay(2, TimeUnit.SECONDS)
.map(new Function<Boolean, List<String>>() {
@Override
public List<String> apply(@NonNull Boolean value) throws Exception {
throw new NullPointerException("我是空指针异常");
// List<String> items = new ArrayList<>();
// for (int i = 1; i <= 10; i++) {
// items.add("Item " + (page * 10 + i));
// }
// return items;
}
});
}
* &Retrofit
* /retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java
支持取消订阅和取消网络请求
支持对json解析异常的捕获;
支持取消订阅和取消网络请求
static final class RequestArbiter<T> extends AtomicBoolean implements Subscription, Producer {
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;
RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
this.call = call;
this.subscriber = subscriber;
}
@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.
try {
Response<T> response = call.execute();
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(response);
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!subscriber.isUnsubscribed()) {
subscriber.onError(t);
}
return;
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override public void unsubscribe() {
call.cancel();
}
@Override public boolean isUnsubscribed() {
return call.isCanceled();
}
}
* &支持json解析异常的捕获
/retrofit2/OkHttpCall.java
@Override public void enqueue(final Callback<T> callback) {
if (callback == null) throw new NullPointerException("callback == null");
okhttp3.Call call;
Throwable failure;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
call = rawCall;
failure = creationFailure;
if (call == null && failure == null) {
try {
call = rawCall = createRawCall();
} catch (Throwable t) {
failure = creationFailure = t;
}
}
}
if (failure != null) {
callback.onFailure(this, failure);
return;
}
if (canceled) {
call.cancel();
}
call.enqueue(new okhttp3.Callback() {
@Override public void onResponse(okhttp3.Call call, okhttp3.Response rawResponse)
throws IOException {
Response<T> response;
try {
response = parseResponse(rawResponse);
} catch (Throwable e) {
callFailure(e);
return;
}
callSuccess(response);
}
@Override public void onFailure(okhttp3.Call call, IOException e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
t.printStackTrace();
}
}
private void callFailure(Throwable e) {
try {
callback.onFailure(OkHttpCall.this, e);
} catch (Throwable t) {
t.printStackTrace();
}
}
private void callSuccess(Response<T> response) {
try {
callback.onResponse(OkHttpCall.this, response);
} catch (Throwable t) {
t.printStackTrace();
}
}
});
}
@Override public synchronized boolean isExecuted() {
return executed;
}
@Override public Response<T> execute() throws IOException {
okhttp3.Call call;
synchronized (this) {
if (executed) throw new IllegalStateException("Already executed.");
executed = true;
if (creationFailure != null) {
if (creationFailure instanceof IOException) {
throw (IOException) creationFailure;
} else {
throw (RuntimeException) creationFailure;
}
}
call = rawCall;
if (call == null) {
try {
call = rawCall = createRawCall();
} catch (IOException | RuntimeException e) {
creationFailure = e;
throw e;
}
}
}
if (canceled) {
call.cancel();
}
return parseResponse(call.execute());
}
private okhttp3.Call createRawCall() throws IOException {
Request request = serviceMethod.toRequest(args);
okhttp3.Call call = serviceMethod.callFactory.newCall(request);
if (call == null) {
throw new NullPointerException("Call.Factory returned null.");
}
return call;
}
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
ResponseBody rawBody = rawResponse.body();
// Remove the body's source (the only stateful object) so we can pass the response along.
rawResponse = rawResponse.newBuilder()
.body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
.build();
int code = rawResponse.code();
if (code < 200 || code >= 300) {
try {
// Buffer the entire body to avoid future I/O.
ResponseBody bufferedBody = Utils.buffer(rawBody);
return Response.error(bufferedBody, rawResponse);
} finally {
rawBody.close();
}
}
if (code == 204 || code == 205) {
return Response.success(null, rawResponse);
}
ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
try {
T body = serviceMethod.toResponse(catchingBody);
return Response.success(body, rawResponse);
} catch (RuntimeException e) {
// If the underlying source threw an exception, propagate that rather than indicating it was
// a runtime exception.
catchingBody.throwIfCaught();
throw e;
}
}
* &retrofit2支持rxjava2取消订阅&okHttp的call取消&json解析异常捕获;