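- The previous article looked at how threads get created, though really it just walked through the source. Many details are beyond what I can analyze right now. Once the main-thread/worker-thread communication analysis is done, I plan to work through multithreading, the concurrency package, and design patterns systematically, then come back and reorganize the whole RxJava series;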
Example:
Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
            }
            @Override
            public void onNext(Integer value) {
                LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
            }
            @Override
            public void onError(Throwable e) {
                LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
            }
            @Override
            public void onComplete() {
                LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
            }
        });
- Log output:
10-09 23:41:07.514 3178-3178/com.test onSubscribe()->ThreadName:main
10-09 23:41:07.514 3178-3572/com.test subscribe()->ThreadName:RxNewThreadScheduler-1
10-09 23:41:07.514 3178-3178/com.test onNext()->ThreadName:main
10-09 23:41:07.514 3178-3178/com.test onComplete()->ThreadName:main
- How the threads communicate:
.observeOn(AndroidSchedulers.mainThread());
public final class AndroidSchedulers {
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });
    private static final class MainHolder {
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
}
final class HandlerScheduler extends Scheduler {
    private final Handler handler;
    HandlerScheduler(Handler handler) {
        this.handler = handler;
    }
}
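- 1. A HandlerScheduler is created and its reference is stored as a Scheduler (MainHolder.DEFAULT);
- 2. The Handler is built on the main thread's Looper (Looper.getMainLooper()), and HandlerScheduler holds a reference to that Handler internally;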
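The MainHolder class in the excerpt follows the initialization-on-demand holder idiom. As far as I understand it, the point is that the Handler is not built until the scheduler is first requested. Here is a minimal sketch of just that idiom in plain Java; LazyHolderSketch, FakeScheduler, and the created counter are made-up names for illustration and are not part of RxAndroid.

```java
// Hypothetical names throughout; only the holder idiom mirrors the excerpt above.
class LazyHolderSketch {
    static int created = 0; // counts constructor calls, for demonstration only

    static final class FakeScheduler {
        FakeScheduler() { created++; }
    }

    // The JVM initializes Holder only when Holder.DEFAULT is first read.
    private static final class Holder {
        static final FakeScheduler DEFAULT = new FakeScheduler();
    }

    static FakeScheduler mainThread() {
        return Holder.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println("before first call: " + created);
        FakeScheduler a = mainThread();
        FakeScheduler b = mainThread();
        System.out.println("after two calls: " + created + ", same instance: " + (a == b));
    }
}
```

Both calls return the same instance, and the constructor runs once, on the first call.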
public abstract class Observable<T> implements ObservableSource<T> {
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
}
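- The same old routine:
- 1. The Observable reference now points to an ObservableObserveOn;
- 2. ObservableObserveOn holds a reference to ObservableSubscribeOn internally;
- 3. ObservableSubscribeOn holds a reference to ObservableCreate internally;
So when subscribe() is called, it starts at ObservableObserveOn: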
public final class ObservableObserveOn<T> {
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    final Observer<? super T> actual;
    final Scheduler.Worker worker;
    SimpleQueue<T> queue;
    Disposable s;
    Throwable error;
    volatile boolean done;
    volatile boolean cancelled;
    int sourceMode;
    boolean outputFused;
    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
    }
}
- The call then recurses into subscribeActual() of ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    final Observer<? super T> actual;
    final AtomicReference<Disposable> s;
    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }
    @Override
    public void onSubscribe(Disposable s) {
        DisposableHelper.setOnce(this.s, s);
    }
}
- This brings us back to the previous article: source.subscribe() is called on the worker thread, and source actually points to ObservableCreate.
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
}
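- 1. The SubscribeOnObserver reference is assigned to the Observer field inside CreateEmitter;
- 2. The CreateEmitter reference is handed to SubscribeOnObserver as its Disposable;
- 3. The CreateEmitter reference is passed to ObservableOnSubscribe as its ObservableEmitter;
- Next, the emitter CreateEmitter calls its onXXX() methods. Let's see what logic runs. From the analysis above, we know these onXXX() calls all execute on the worker thread;
CreateEmitter.onNext() -> SubscribeOnObserver.onNext() -> ObserveOnObserver.onNext();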
static final class ObserveOnObserver<T> {
    @Override
    public void onNext(T t) {
        schedule();
    }
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
}
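The getAndIncrement() == 0 check is worth a closer look: only the call that moves the counter from 0 to 1 schedules the Runnable, so several onNext() calls in a row still produce a single pending drain. Below is a simplified sketch of that pattern in plain Java. DrainSketch, pendingTasks, and delivered are hypothetical names, the queue.offer() step is my assumption (the abridged excerpt only shows schedule()), and the loop in run() is a common form of this counter-based drain rather than a copy of RxJava's drainNormal().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ObserveOnObserver: it queues values and asks a
// "worker" to drain them, but only when no drain is already pending.
class DrainSketch extends AtomicInteger implements Runnable {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final List<Integer> delivered = new ArrayList<>();
    final List<Runnable> pendingTasks = new ArrayList<>(); // fake worker: just records tasks

    void onNext(int value) {
        queue.offer(value); // assumed queueing step, not shown in the excerpt
        schedule();
    }

    void schedule() {
        // Only the caller that moves the counter from 0 to 1 schedules a drain.
        if (getAndIncrement() == 0) {
            pendingTasks.add(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                delivered.add(v);
            }
            // Subtract the requests already handled; stop once none are left.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        DrainSketch s = new DrainSketch();
        s.onNext(1);
        s.onNext(2);
        s.onNext(3);
        System.out.println("scheduled drains: " + s.pendingTasks.size());
        s.pendingTasks.forEach(Runnable::run);
        System.out.println("delivered: " + s.delivered);
    }
}
```

Three onNext() calls schedule one drain, and that one run() delivers all three values before resetting the counter to 0.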
- The Scheduler here is the HandlerScheduler, so worker is a HandlerWorker:
public abstract static class Worker implements Disposable {
    public Disposable schedule(Runnable run) {
        return schedule(run, 0L, TimeUnit.NANOSECONDS);
    }
    public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
}
final class HandlerScheduler extends Scheduler {
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
}
private static final class HandlerWorker extends Worker {
    private final Handler handler;
    private volatile boolean disposed;
    HandlerWorker(Handler handler) {
        this.handler = handler;
    }
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (disposed) {
            return Disposables.disposed();
        }
        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
        return scheduled;
    }
}
- The worker's schedule() calls handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay))), which ends up posting ObserveOnObserver's run() method to the main thread;
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
}
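To make the hand-off concrete without an Android runtime, here is a rough plain-Java model of what Looper plus Handler do for us: one thread owns a message queue and runs whatever Runnable other threads post to it. MainLoopSketch, post(), and the thread names "fake-main" and "fake-worker" are all invented for this sketch; the real Handler API is the one shown in the excerpts.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical miniature of Looper + Handler: one thread owns a queue and runs
// the Runnables that other threads post to it.
class MainLoopSketch {
    private final BlockingQueue<Runnable> messages = new LinkedBlockingQueue<>();

    MainLoopSketch(String name) {
        Thread loopThread = new Thread(() -> {
            try {
                while (true) {
                    messages.take().run(); // roughly what a looper does per message
                }
            } catch (InterruptedException e) {
                // interrupted: stop looping
            }
        }, name);
        loopThread.setDaemon(true);
        loopThread.start();
    }

    // Stand-in for handler.sendMessageDelayed(...) with zero delay.
    void post(Runnable r) {
        messages.add(r);
    }

    static String[] demo() {
        MainLoopSketch main = new MainLoopSketch("fake-main");
        String[] names = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            names[0] = Thread.currentThread().getName(); // where the value is emitted
            main.post(() -> {
                names[1] = Thread.currentThread().getName(); // where it is delivered
                done.countDown();
            });
        }, "fake-worker");
        worker.start();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = demo();
        System.out.println("emitted on: " + names[0] + ", delivered on: " + names[1]);
    }
}
```

The Runnable is created on the worker thread but executed by the loop thread, which is the same shape as ObserveOnObserver.run() being executed on the main thread.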
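A few conclusions along the way:
- 1. To have the observer run on the main thread, call .observeOn(AndroidSchedulers.mainThread()) before the method that should run there. When the event reaches that Observer, a Handler message carries it from the worker thread to the main thread;
- 2. When running on a worker thread, getting a particular method onto the main thread only requires calling .observeOn(AndroidSchedulers.mainThread()) right before that method;
- 3. When subscribeOn(Schedulers.io()) is called several times in a row, the observable's work runs on the thread from the first call, the one closest to the source. Each later call only decides where the previous ObservableSubscribeOn's subscribeActual() runs, and that one immediately reschedules onto its own scheduler;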
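To check which subscribeOn() call decides the source's thread when several are chained, here is a small plain-Java simulation. It is a sketch under assumptions: Source, subscribeOn(), and named() are hypothetical helpers that only imitate the "run source.subscribe() on my scheduler" step from ObservableSubscribeOn.subscribeActual(), and the executors stand in for schedulers.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class SubscribeOnOrderSketch {
    // Hypothetical minimal "Observable": subscribing hands it a consumer of values.
    interface Source {
        void subscribe(Consumer<String> observer);
    }

    // Imitates ObservableSubscribeOn: run the upstream subscribe() on the given executor.
    static Source subscribeOn(Source upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    static String demo() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        // The source reports the thread its subscribe body runs on.
        Source source = observer -> observer.accept(Thread.currentThread().getName());
        // Same order as source.subscribeOn(first).subscribeOn(second).
        Source chain = subscribeOn(subscribeOn(source, first), second);

        String[] result = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        chain.subscribe(name -> {
            result[0] = name;
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + demo());
    }
}
```

In this model the outer wrapper hops to "second" first, then the inner wrapper hops to "first", so the source body ends up on the executor passed to the call closest to the source.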
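As far as I can tell so far, RxJava's use of the observer pattern follows the flow below (similar to a linked list):
Each call to a method such as subscribeOn(), doOnNext(), or map() creates a new Observable subclass that holds a reference to the previous one, and whose own reference is in turn handed to the next one. When subscribe() is called, the call starts at the last subclass and recursively invokes source.subscribe() back to the first. Once the source ObservableEmitter calls an onXXX() method, the event travels the other way, from the first subclass's Observer through each next.onXXX() until it reaches the last;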
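The linked-list shape can be sketched in a few lines of plain Java. This is only a model: Observer, Source, operator(), and the trace list are hypothetical, the strings "observeOn" and "subscribeOn" are labels with no thread switching behind them, and the real operators do far more than log.

```java
import java.util.ArrayList;
import java.util.List;

class ChainSketch {
    interface Observer {
        void onNext(int value);
    }

    interface Source {
        void subscribe(Observer downstream);
    }

    static final List<String> trace = new ArrayList<>();

    // Hypothetical operator: keeps a reference to the previous node (upstream)
    // and, when subscribed, wraps the next node's observer (downstream).
    static Source operator(String name, Source upstream) {
        return downstream -> {
            trace.add("subscribe:" + name);
            upstream.subscribe(value -> {
                trace.add("onNext:" + name);
                downstream.onNext(value);
            });
        };
    }

    static List<String> demo() {
        trace.clear();
        Source create = downstream -> {
            trace.add("subscribe:create");
            downstream.onNext(1); // the "emitter" fires once subscribed
        };
        Source chain = operator("observeOn", operator("subscribeOn", create));
        chain.subscribe(value -> trace.add("onNext:observer"));
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The trace lists the subscribe steps from the last operator back to create, followed by the onNext steps from the first operator forward to the final observer.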