操作符
设计模式
观察者模式、发布订阅模式:发布者发布信息,订阅者获取信息,订阅了就能收到信息,没订阅就收不到信息。
应用场景
RxJava配合Retrofit ,防抖,网络嵌套
核心思想
有一个起点和一个终点,起点开始流向我们的“事件”
把事件流向到终点,只不过在流向的过程中,可以增加拦截,
拦截时可以对“事件进行改变” 终点只关心它上一个拦截
Observable创建过程时序图
订阅流程分析
1.从终点开始也就是Rxjava中的自定义被观察者new Observer<T>()。
2.通过订阅事件subscribe(自定义观察者)方法中调用抽象函数subscribeActual(自定义观察者)。
3.ObervableCreat类中实现了subscribeActual(自定义观察者)方法,在这个方法中有三个重要步骤:
1)new CreateEmitter<T>(自定义观察者) =parent。
2)自定义观察者通过onSubscribe(parent)拿到发射器。
3)自定义被观察者的封装类ObservableOnSubscribe<T>,自定义被观察订阅发射器source.subscribe(parent)。
4.到达起点Rxjava中自定义被观察者的创建以及封装到new ObservableOnSubscribe<T>(),并实现subscribe订阅方法,通过里面订阅到的发射器e.onNext(T)发射事件,由于自定义观察者拿到过发射器所以实际调用的是“自定义观察者.onNext”最后回到终点。
Observable与Observer 订阅的过程以及时序图
Map操作符
Rxjava的装饰模型
Rxjava的线程
线程流程分析
1.从终点开始也就是Rxjava中的自定义被观察者new Observer<T>()。
2.通过订阅事件subscribe(自定义观察者)方法中调用抽象函数subscribeActual(自定义观察者)。
3.通过线程上游定义函数subscribeOn(scheduler),new ObservableSubscribeOn<T>(this:自定义观察者, scheduler)这个是将自定义观察者和它所需要的线程实现类封装起来。
ObservableSubscribeOn这个类中有静态内部类SubscribeOnObserver继承AtomicReference<Disposable>实现了自定义观察者的所有接口Observer以及打断接口Disposable。
4.在ObservableSubscribeOn这个类中的subscribeActual()还new SubscribeTask(parent:SubscribeOnObserver);在SubscribeTask这个类的run方法中去将自定义被观察者和parent订阅在一起。
5.scheduler.scheduleDirect(new SubscribeTask(parent));这个函数是将任务交给对应的线程池,scheduleDirect()方法中创建对应的线程Worker w = createWorker(),然后将任务交给w线程去执行 w.schedule(task, delay, unit);同时将任务和w线程封装成DisposeTask,返回DisposeTask(这个特别之处是可以中断的Runnable)。如果是主线程的话其实走的是HandlerScheduler。
6.由于在对应线程中执行了subscribe方法,所以下面的操作多在对应的线程上执行,之后到达ObervableCreat类,此类中也实现了subscribeActual(parent:SubscribeOnObserver)方法,在这个方法中有三个重要步骤:
1)new CreateEmitter<T>(自定义观察者) =parent。
2)自定义观察者通过onSubscribe(parent)拿到发射器。
3)自定义被观察者的封装类ObservableOnSubscribe<T>,自定义被观察订阅发射器source.subscribe(parent)。
7.到达起点Rxjava中自定义被观察者的创建以及封装到new ObservableOnSubscribe<T>(),并实现subscribe订阅方法,通过里面订阅到的发射器e.onNext(T)发射事件,由于自定义观察者拿到过发射器所以实际调用的是“自定义观察者.onNext”最后回到终点。
注意:哪个线程调用subscribe,在自定义观察者的实现方法中onSubscribe(Disposable d)就是哪个线程。可以在这里做赋全局变量,在不需要再继续执行的地方做中断Rxjava的流式操作
。
自定义Rxjava操作符
1.首先继承Observable<T>这个类,重写里面的subscribeActual()方法。
2.在这个类中写个事件的静态内部类实现对应的事件以及Disposable接口。
自定义被观察者类
import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.view.View;
import androidx.annotation.NonNull;
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
public class ViewClickObservable extends Observable<Object> {
private final View view;
// 事件 第一节课 防抖 事件 没用
private static final Object EVENT = new Object();
private static Object EVENT2;
public ViewClickObservable(View view) {
this.view = view;
EVENT2 = view;
}
@Override
protected void subscribeActual(Observer<? super Object> observer) {
// 可以干自己的
MyListener myListener = new MyListener(view, observer);
observer.onSubscribe(myListener);
this.view.setOnClickListener(myListener);
}
// 我们的包裹
static final class MyListener implements View.OnClickListener, Disposable {
private final View view;
private Observer<Object> observer; // 存一份 下一层
// 原子性
// https://www.jianshu.com/p/8a44d4a819bc
// boolean == AtomicBoolean
private final AtomicBoolean isDisposable = new AtomicBoolean();
public MyListener(View view, Observer<Object> observer) {
this.view = view;
this.observer = observer;
}
@Override
public void onClick(View v) {
if (isDisposed() == false) {
observer.onNext(EVENT);
}
}
// 如果用调用了 中断
@Override
public void dispose() {
// 如果没有中断过,才有资格, 取消view.setOnClickListener(null);
if (isDisposable.compareAndSet(false, true)) {
// 主线程 很好的中断
if (Looper.myLooper() == Looper.getMainLooper()) {
view.setOnClickListener(null);
} else { // 主线程,通过Handler的切换
/*new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
super.handleMessage(msg);
view.setOnClickListener(null);
}
};*/
// HandlerScheduler.scheduleDirect
AndroidSchedulers.mainThread().scheduleDirect(new Runnable() {
@Override
public void run() {
view.setOnClickListener(null);
}
});
}
}
}
@Override
public boolean isDisposed() {
return isDisposable.get();
}
}
}
操作符类
import android.view.View;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;
public class RxView {
private final static String TAG = RxView.class.getSimpleName();
// 我们自己的操作符 == 函数
public static Observable<Object> clicks(View view) {
return new ViewClickObservable(view);
}
}
应用
Button button = findViewById(R.id.button);
RxView.clicks(button)
.throttleFirst(2000, TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Derry");
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(L.TAG, "accept: 终点:" + s);
}
});
}
});