RxJava Explained: How Thread Scheduling Works (Part 6)
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onCompleted();
Log.i("@@@", "call" + Thread.currentThread().getName());
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.i("@@@", "onNext : " + Thread.currentThread().getName());
}
});
Output:
07-26 17:16:49.284 7266-7309/? I/@@@: callRxIoScheduler-2
07-26 17:16:49.368 7266-7266/? I/@@@: onNext : main
subscribeOn() specifies the thread on which the observable's event source runs.
observeOn() specifies the thread on which the observer handles events.
subscribeOn source analysis:
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
/**
* A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
* class in {@link Schedulers}.
* The thread scheduler.
*/
public abstract class Scheduler {
...
}
Internally it also calls create() to build an Observable. The only difference is that the OnSubscribe passed in is an OperatorSubscribeOn object:
/**
* Subscribes Observers on the specified {@code Scheduler}.
*/
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
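OperatorSubscribeOn works on exactly the same principle as map. Let's start from its call() method, and first look at the Scheduler and Worker classes it relies on: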
/**
* A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
* class in {@link Schedulers}.
*/
public abstract class Scheduler {
/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
* <p>
* When work is completed it should be unsubscribed using {@link Scheduler.Worker#unsubscribe()}.
* <p>
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
*
* @return a Worker representing a serial queue of actions to be executed
*/
public abstract Worker createWorker();
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
*/
public abstract static class Worker implements Subscription {
/**
* Schedules an Action for execution.
*/
public abstract Subscription schedule(Action0 action);
/**
* Schedules an Action for execution at some point in the future.
*/
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
}
}
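To make the Worker contract concrete, here is a minimal sketch in plain Java with no RxJava dependency. It assumes a single-thread ExecutorService is a fair stand-in for a Worker. SerialWorkerSketch and runThree() are hypothetical names used only for illustration. The point it shows is the guarantee quoted in the Javadoc above: actions scheduled on one Worker run sequentially.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for Scheduler.Worker: one thread, tasks run in submission order.
class SerialWorkerSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Counterpart of Worker.schedule(Action0): hand the task to the worker thread.
    void schedule(Runnable action) {
        executor.execute(action);
    }

    // Counterpart of Worker.unsubscribe(): cancel outstanding work and release the thread.
    void unsubscribe() {
        executor.shutdownNow();
    }

    // Schedules three tasks and returns the order in which they ran.
    static List<Integer> runThree() {
        SerialWorkerSketch worker = new SerialWorkerSketch();
        List<Integer> order = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            worker.schedule(() -> { order.add(id); done.countDown(); });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.unsubscribe();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runThree());
    }
}
```

Because there is only one thread behind the worker, the three tasks cannot overlap or reorder, which is the property operators such as subscribeOn and observeOn rely on.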
The scheduler used above was obtained from Schedulers.io(), so let's look at its source:
public final class Schedulers {
private final Scheduler computationScheduler;
private final Scheduler ioScheduler;
private final Scheduler newThreadScheduler;
private static final AtomicReference<Schedulers> INSTANCE = new AtomicReference<Schedulers>();
private static Schedulers getInstance() {
for (;;) {
Schedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new Schedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
} else {
current.shutdownInstance();
}
}
}
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}
private Schedulers() {
@SuppressWarnings("deprecation")
RxJavaSchedulersHook hook = RxJavaPlugins.getInstance().getSchedulersHook();
Scheduler c = hook.getComputationScheduler();
if (c != null) {
computationScheduler = c;
} else {
computationScheduler = RxJavaSchedulersHook.createComputationScheduler();
}
// This is where ioScheduler is created
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
Scheduler nt = hook.getNewThreadScheduler();
if (nt != null) {
newThreadScheduler = nt;
} else {
newThreadScheduler = RxJavaSchedulersHook.createNewThreadScheduler();
}
}
...
}
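The getInstance() loop above is a lock-free lazy singleton. The sketch below isolates that pattern in plain Java. LazyHolderSketch is a hypothetical class, and the cleanup step for the losing candidate (shutdownInstance() in the real code) is reduced to a comment.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder that mirrors the lazy, lock-free initialization in Schedulers.getInstance().
class LazyHolderSketch {
    private static final AtomicReference<LazyHolderSketch> INSTANCE = new AtomicReference<>();

    static LazyHolderSketch getInstance() {
        for (;;) {
            LazyHolderSketch current = INSTANCE.get();
            if (current != null) {
                return current;                 // fast path: an instance is already published
            }
            current = new LazyHolderSketch();
            if (INSTANCE.compareAndSet(null, current)) {
                return current;                 // this thread won the race
            }
            // Another thread published first: drop our candidate and loop to read the winner.
            // (The real class also calls shutdownInstance() on the losing candidate.)
        }
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance());
    }
}
```

Compared with a synchronized getter, this approach never blocks. The cost is that two threads may briefly construct two candidates, which is why the original code has to shut down the loser.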
Next, the source of onIOScheduler:
/**
* Similar to the hook in create(): for our purposes it can be read as simply returning its argument.
* Hook to call when the Schedulers.io() is called.
* @param scheduler the default io scheduler
* @return the default of alternative scheduler
*/
public static Scheduler onIOScheduler(Scheduler scheduler) {
Func1<Scheduler, Scheduler> f = onIOScheduler;
if (f != null) {
return f.call(scheduler);
}
return scheduler;
}
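The hook pattern in onIOScheduler can be sketched without RxJava as follows. HookSketch, its onIoScheduler field, and the use of String in place of Scheduler are all assumptions made to keep the example self-contained. The shape of the method matches the one above: read the hook once, apply it if present, otherwise return the default.

```java
import java.util.function.Function;

class HookSketch {
    // Optional override; null means "no hook installed". Hypothetical field name.
    static volatile Function<String, String> onIoScheduler;

    static String io(String defaultScheduler) {
        Function<String, String> f = onIoScheduler;   // read once into a local
        if (f != null) {
            return f.apply(defaultScheduler);
        }
        return defaultScheduler;
    }

    public static void main(String[] args) {
        System.out.println(io("cached"));              // no hook: the default comes back
        onIoScheduler = s -> "test-" + s;              // e.g. a test swaps in a replacement
        System.out.println(io("cached"));
        onIoScheduler = null;                          // restore the default behaviour
    }
}
```

With no hook installed, which is the normal case in application code, the method is effectively an identity function. That is why the walkthrough can treat it as "returns its argument".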
When no hook supplies a scheduler, the getInstance().ioScheduler read above ends up holding the instance built by RxJavaSchedulersHook.createIoScheduler():
@Experimental
public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
// Note: the Scheduler finally returned here is a CachedThreadScheduler
return new CachedThreadScheduler(threadFactory);
}
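The "RxIoScheduler-" prefix passed to the thread factory is what shows up in the log line at the top of this article (the thread name there starts with RxIoScheduler-). As a rough sketch of how a prefix-based ThreadFactory can work: PrefixThreadFactorySketch is a hypothetical class, and the counter-based numbering and daemon flag are assumptions rather than a copy of RxThreadFactory.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical prefix-based thread factory; not the library's implementation.
class PrefixThreadFactorySketch implements ThreadFactory {
    private final String prefix;
    private final AtomicLong counter = new AtomicLong();

    PrefixThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Assumption: names are prefix + a 1-based sequence number.
        Thread t = new Thread(r, prefix + counter.incrementAndGet());
        t.setDaemon(true);   // assumption: pool threads should not keep the JVM alive
        return t;
    }

    public static void main(String[] args) {
        ThreadFactory factory = new PrefixThreadFactorySketch("RxIoScheduler-");
        System.out.println(factory.newThread(() -> { }).getName());
        System.out.println(factory.newThread(() -> { }).getName());
    }
}
```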
And in the CachedThreadScheduler class:
public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
}
So, tracing the source of Schedulers.io(), the concrete Scheduler implementation is CachedThreadScheduler, and the corresponding Worker implementation is EventLoopWorker:
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.once = new AtomicBoolean();
this.threadWorker = pool.get();
}
@Override
public void unsubscribe() {
if (once.compareAndSet(false, true)) {
// unsubscribe should be idempotent, so only do this once
// Release the worker _after_ the previous action (if any) has completed
threadWorker.schedule(this);
}
innerSubscription.unsubscribe();
}
@Override
public void call() {
pool.release(threadWorker);
}
@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
// This invokes call() on the Action0 that was passed in from outside
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
}
Execution then continues into threadWorker.scheduleActual:
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
...
}
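The branch in scheduleActual maps directly onto the standard ScheduledExecutorService API. The sketch below reproduces only that branch in plain Java. ScheduleActualSketch and runOnce() are hypothetical names, and a Runnable stands in for ScheduledAction. It also suggests why EventLoopWorker can call schedule(action, 0, null): with a delay of zero the TimeUnit is never touched.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ScheduleActualSketch {
    // Mirrors the branch in scheduleActual: run now via submit(), or later via schedule().
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(run);          // unit is not used on this path
        }
        return executor.schedule(run, delayTime, unit);
    }

    // Runs one task on a fresh executor and returns the name of the thread that ran it.
    static String runOnce(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
                r -> new Thread(r, "sketch-worker"));
        String[] threadName = new String[1];
        try {
            Future<?> f = scheduleActual(executor,
                    () -> threadName[0] = Thread.currentThread().getName(),
                    delayMillis, TimeUnit.MILLISECONDS);
            f.get(5, TimeUnit.SECONDS);           // wait for the task to finish
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnce(0));
        System.out.println(runOnce(20));
    }
}
```

In both cases the task runs on the executor's thread rather than on the caller's thread. The returned Future is what the real code attaches to the ScheduledAction so that unsubscribing can cancel the task.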
And the source of ScheduledAction:
/**
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
* {@code Subscriber} in respect of an {@code Observer}.
*/
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final SubscriptionList cancel;
final Action0 action;
public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new SubscriptionList();
}
...
}
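ScheduledAction implements the Runnable interface, and submitting it to the thread pool executor is what finally performs the thread switch. That is the entire process by which subscribeOn(Schedulers.io()) switches threads.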
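The whole subscribeOn chain can be condensed into a toy model. The following is a heavily simplified sketch and not RxJava itself. MiniObservable is hypothetical, a Consumer stands in for Subscriber, an ExecutorService stands in for the Worker, and errors, completion and backpressure are left out. What it keeps is the essential move of OperatorSubscribeOn: the subscription to the source happens inside a task scheduled on the worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, heavily simplified Observable: only onNext, no errors, no backpressure.
class MiniObservable<T> {
    interface OnSubscribe<T> { void call(Consumer<T> subscriber); }

    final OnSubscribe<T> onSubscribe;

    MiniObservable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

    void subscribe(Consumer<T> subscriber) { onSubscribe.call(subscriber); }

    // Same shape as OperatorSubscribeOn: a new Observable whose OnSubscribe
    // subscribes to the source from inside a task on the worker thread.
    MiniObservable<T> subscribeOn(ExecutorService worker) {
        MiniObservable<T> source = this;
        return new MiniObservable<T>(subscriber ->
                worker.execute(() -> source.subscribe(subscriber)));
    }

    // Returns the name of the thread on which the source's call() ran.
    static String demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        CountDownLatch done = new CountDownLatch(1);
        String[] seen = new String[1];
        MiniObservable<String> source = new MiniObservable<String>(s ->
                s.accept(Thread.currentThread().getName()));
        source.subscribeOn(io).subscribe(name -> { seen[0] = name; done.countDown(); });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that in this model the subscriber callback also runs on the worker thread. Moving the callback to a different thread is the job of observeOn, not subscribeOn.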
observeOn source analysis
Straight to the source:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
observeOn is implemented internally through lift. Here is the source of lift:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return create(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
In the end it still uses create(), so it is much the same as what we analyzed before. Let's look directly at OnSubscribeLift:
/**
* Transforms the downstream Subscriber into a Subscriber via an operator
* callback and calls the parent OnSubscribe.call() method with it.
*/
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
}
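It also implements OnSubscribe. From the earlier analysis we know that once subscribe() binds the observer to the observable, the call() method of the observable's OnSubscribe is triggered, so here OnSubscribeLift.call() runs. Inside call(), OperatorObserveOn.call() is invoked and returns a new observer, Subscriber st, which is then passed to the previous-level Observable's OnSubscribe.call(st).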
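The two steps performed by OnSubscribeLift.call() (wrap the downstream subscriber, then call the parent with the wrapper) can be sketched in plain Java. LiftSketch and the toLabel operator are hypothetical, Consumer stands in for Subscriber, and error handling and onStart() are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class LiftSketch {
    interface OnSubscribe<T> { void call(Consumer<T> subscriber); }

    // An "operator" turns the downstream subscriber (of R) into an upstream subscriber (of T).
    static <T, R> OnSubscribe<R> lift(OnSubscribe<T> parent,
                                      Function<Consumer<R>, Consumer<T>> operator) {
        return downstream -> {
            Consumer<T> st = operator.apply(downstream);  // wrap the downstream subscriber
            parent.call(st);                              // hand the wrapper to the previous level
        };
    }

    static List<String> demo() {
        OnSubscribe<Integer> source = s -> { s.accept(1); s.accept(2); };
        // Hypothetical operator: forwards every item downstream as a prefixed string.
        Function<Consumer<String>, Consumer<Integer>> toLabel =
                child -> item -> child.accept("item-" + item);
        List<String> received = new ArrayList<>();
        lift(source, toLabel).call(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

For observeOn, the wrapper returned by the operator does not transform items. Instead it queues them and re-delivers them on another thread, which is what the following sections walk through.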
So we need to look at the OperatorObserveOn class and its call() method:
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
...
}
This takes the branch that creates an ObserveOnSubscriber and then calls its init() method:
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final boolean delayError;
final Queue<Object> queue;
/** The emission threshold that should trigger a replenishing request. */
final int limit;
// the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
/** Remembers how many elements have been emitted before the requests run out. */
long emitted;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
// Create the worker from the Scheduler passed in as a parameter
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
// schedule() is called here
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
// schedule() is called here
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
// schedule() is called here
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
// Calls recursiveScheduler, which was created from the Scheduler passed to the constructor
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
// Execution eventually reaches this method
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
// This is where the downstream onNext() is invoked
localChild.onNext(NotificationLite.<T>getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
}
As shown above, onNext(), onCompleted() and onError() all call schedule(), so this method is where the thread switch actually happens:
protected void schedule() {
if (counter.getAndIncrement() == 0) {
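// Calls recursiveScheduler, which was created from the Scheduler passed to the constructor;
// this moves delivery of all events onto the thread that belongs to recursiveScheduler.
// `this` is passed in, so the current object's call() method is what eventually runs.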
recursiveScheduler.schedule(this);
}
}
schedule() delegates to recursiveScheduler, which is initialized in the constructor:
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
// Create the worker from the Scheduler passed in as a parameter
this.recursiveScheduler = scheduler.createWorker();
}
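The counter check in schedule() and the missed loop in call() form a queue-drain pattern: many signals may arrive, but only one drain task is in flight at a time. The sketch below reduces ObserveOnSubscriber to that pattern. DrainSketch is hypothetical, the queue holds plain integers, and backpressure (requested, limit) and termination handling are left out. The demo uses an Executor that only records tasks, so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical reduction of ObserveOnSubscriber: queue + counter only.
class DrainSketch implements Runnable {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicLong counter = new AtomicLong();
    final Executor worker;
    final List<Integer> delivered = new ArrayList<>();

    DrainSketch(Executor worker) { this.worker = worker; }

    void onNext(int value) {
        queue.offer(value);
        schedule();
    }

    void schedule() {
        // Only the caller that moves the counter from 0 to 1 starts a drain task.
        if (counter.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        long missed = 1L;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                delivered.add(v);                 // stands in for child.onNext(v)
            }
            // Subtract the signals already handled; non-zero means more arrived meanwhile.
            missed = counter.addAndGet(-missed);
            if (missed == 0L) {
                break;
            }
        }
    }

    public static void main(String[] args) {
        List<Runnable> pending = new ArrayList<>();
        DrainSketch s = new DrainSketch(pending::add);   // executor that just records tasks
        s.onNext(1);
        s.onNext(2);
        s.onNext(3);
        System.out.println(pending.size());              // three signals, one drain task
        pending.get(0).run();
        System.out.println(s.delivered);
    }
}
```

Three onNext signals produce a single scheduled task, and that one task delivers all three items in order. This is how the operator avoids flooding the target thread with one task per event while still preserving ordering.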
The Scheduler used at this point in our example code is AndroidSchedulers.mainThread():
public final class AndroidSchedulers {
private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();
private final Scheduler mainThreadScheduler;
private static AndroidSchedulers getInstance() {
for (;;) {
AndroidSchedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new AndroidSchedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}
private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
// The concrete implementation is LooperScheduler
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}
...
}
Here the concrete Scheduler implementation is LooperScheduler:
class LooperScheduler extends Scheduler {
private final Handler handler;
LooperScheduler(Looper looper) {
handler = new Handler(looper);
}
LooperScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
static class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;
HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}
@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}
static final class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;
ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}
@Override public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}
@Override public boolean isUnsubscribed() {
return unsubscribed;
}
}
}
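Its Worker is HandlerWorker, which is likewise built on a ScheduledAction that implements Runnable. A Handler is created from the main thread's Looper, and that Handler runs ScheduledAction.run() on the main thread. run() in turn calls ObserveOnSubscriber.call(), and that is how observeOn switches threads.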
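Putting both halves together, the two thread hops from the opening example can be imitated on a plain JVM. This is only an analogy under stated assumptions: ThreadHopSketch is hypothetical, and single-thread executors named sketch-io and sketch-main stand in for the io scheduler and for the Handler bound to the main Looper, since android.os.Handler is not available outside Android.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopSketch {
    // Returns {thread that produced the value, thread that consumed it}.
    static String[] demo() {
        // Assumption: single-thread executors stand in for the io pool and the main Looper.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        String[] threads = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        io.execute(() -> {                                   // subscribeOn: the source runs here
            threads[0] = Thread.currentThread().getName();
            main.execute(() -> {                             // observeOn: hand delivery to "main"
                threads[1] = Thread.currentThread().getName();
                done.countDown();
            });
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        String[] t = demo();
        System.out.println("call on " + t[0]);
        System.out.println("onNext on " + t[1]);
    }
}
```

The producer runs on one thread and the consumer callback on another, matching the shape of the two log lines at the start of the article (an io thread for call, the main thread for onNext).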
For more, see the next article: RxJava Explained (Part 7)