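Preface
Scheduler embodies a core idea of reactive programming: a change (here, a change of thread) is introduced through a Scheduler and then propagates downstream (change propagation).
The thread-switching building blocks:
- They let code run on different threads
- subscribeOn: the thread on which the subscription, and therefore the source's emission, happens
- observeOn: the thread on which downstream events are received
- Scheduler: performs the actual thread switch
1. RxJava1 thread switching
- Scheduler: the scheduler
- Operator: the operator interface
- lift: the core operator
Example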
Observable.
create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (!subscriber.isUnsubscribed()) {
Log.d(TAG, "currentThread:" + Thread.currentThread());
subscriber.onNext("test");
subscriber.onCompleted();
}
}
}).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext:" + s + "currentThread:" + Thread.currentThread());
}
});
Output
06-12 17:00:13.846 6227-6495/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-12 17:00:13.856 6227-6227/com.haocai.rxjavademo D/kpioneer: onNext:testcurrentThread:Thread[main,5,main]
2. RxJava2 thread switching
- Scheduler: the scheduler
- AbstractObservableWithUpstream: an abstract class
/*--------- without backpressure ---------*/
Observable.
create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
if (!emitter.isDisposed()) {
Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
emitter.onNext("test");
emitter.onComplete();
}
}
}).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String o) {
Log.d(TAG, "Observable onNext:" + o);
Log.d(TAG, "Observable currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
/*--------- with backpressure ---------*/
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
if (!emitter.isCancelled()) {
Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
emitter.onNext("test");
emitter.onComplete();
}
}
}, BackpressureStrategy.DROP).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(String s) {
Log.d(TAG, "Flowable onNext:" + s);
Log.d(TAG, "Flowable currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
06-13 13:37:13.009 3063-3949/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[RxNewThreadScheduler-1,5,main]
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable onNext:test
06-13 13:37:13.019 3063-3063/com.haocai.rxjavademo D/kpioneer: Observable currentThread:Thread[main,5,main]
06-13 13:37:13.019 3063-3950/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[RxNewThreadScheduler-2,5,main]
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable onNext:test
06-13 13:37:13.029 3063-3063/com.haocai.rxjavademo D/kpioneer: Flowable currentThread:Thread[main,5,main]
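3. RxJava1 Scheduler source analysis
- Scheduler: an abstract class
- Worker: the class that actually performs the thread scheduling
- Action0: the operation executed on the target thread
- schedule: the method that actually does the scheduling; it takes an Action0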
public abstract class Scheduler {
public abstract Worker createWorker();
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resources cleanup.
*/
public abstract static class Worker implements Subscription {
/**
* Schedules an Action for execution.
*
* @param action
* Action to schedule
* @return a subscription to be able to prevent or cancel the execution of the action
*/
public abstract Subscription schedule(Action0 action);
public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
return SchedulePeriodicHelper.schedulePeriodically(this, action,
initialDelay, period, unit, null);
}
public long now() {
return System.currentTimeMillis();
}
}
public long now() {
return System.currentTimeMillis();
}
@SuppressWarnings("unchecked")
public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
}
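The Scheduler/Worker split above can be sketched with JDK classes alone. This is a minimal illustration, not RxJava code: MiniScheduler, MiniWorker and MiniNewThreadScheduler are hypothetical names, Runnable stands in for Action0, and the assumption is simply that one Worker owns one single-threaded executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical, JDK-only stand-ins for rx.Scheduler and rx.Scheduler.Worker.
abstract class MiniScheduler {
    abstract MiniWorker createWorker();

    abstract static class MiniWorker {
        abstract void schedule(Runnable action); // plays the role of schedule(Action0)
        abstract void unsubscribe();
    }
}

// Assumption: each Worker gets its own single-threaded executor.
final class MiniNewThreadScheduler extends MiniScheduler {
    @Override
    MiniWorker createWorker() {
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        return new MiniWorker() {
            @Override void schedule(Runnable action) { executor.execute(action); }
            @Override void unsubscribe() { executor.shutdownNow(); }
        };
    }
}

final class MiniSchedulerDemo {
    // Schedules two actions on one Worker and returns the thread names they ran on.
    static List<String> runTwoActions() {
        final List<String> threads = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(2);
        MiniScheduler.MiniWorker worker = new MiniNewThreadScheduler().createWorker();
        Runnable action = () -> {
            threads.add(Thread.currentThread().getName());
            done.countDown();
        };
        worker.schedule(action);
        worker.schedule(action);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.unsubscribe(); // releases the worker's thread
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(runTwoActions());
    }
}
```

Both actions report the same thread name, and it is not the caller's thread: the Scheduler only decides which kind of Worker to hand out, while the Worker is the object that actually owns a thread and runs actions on it one after another.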
The scheduling process:
- Pass in a different Scheduler to use a different thread
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
- The Scheduler creates a Worker, which is what uses the real thread pool
NewThreadWorker creates the thread pool:
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
volatile boolean isUnsubscribed;
/** The purge frequency in milliseconds. */
private static final String FREQUENCY_KEY = "rx.scheduler.jdk6.purge-frequency-millis";
/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
private static final AtomicReference<ScheduledExecutorService> PURGE;
/**
* Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}.
* Also, it works even for inheritance: {@link Method} of base class can be invoked on the instance of child class.
*/
private static volatile Object cachedSetRemoveOnCancelPolicyMethod;
/**
* Possible value of {@link #cachedSetRemoveOnCancelPolicyMethod} which means that cancel policy is not supported.
*/
private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
PURGE = new AtomicReference<ScheduledExecutorService>();
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);
// Forces the use of purge even if setRemoveOnCancelPolicy is available
final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);
final int androidApiVersion = PlatformDependent.getAndroidApiVersion();
// According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
// setRemoveOnCancelPolicy available since Android API 21
SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce
&& (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
}
/**
* Registers the given executor service and starts the purge thread if not already started.
* <p>{@code public} visibility reason: called from other package(s) within RxJava
* @param service a scheduled thread pool executor instance
*/
public static void registerExecutor(ScheduledThreadPoolExecutor service) {
do {
ScheduledExecutorService exec = PURGE.get();
if (exec != null) {
break;
}
exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
if (PURGE.compareAndSet(null, exec)) {
exec.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
purgeExecutors();
}
}, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);
break;
} else {
exec.shutdownNow();
}
} while (true);
EXECUTORS.putIfAbsent(service, service);
}
/**
* Deregisters the executor service.
* <p>{@code public} visibility reason: called from other package(s) within RxJava
* @param service a scheduled thread pool executor instance
*/
public static void deregisterExecutor(ScheduledExecutorService service) {
EXECUTORS.remove(service);
}
/** Purges each registered executor and eagerly evicts shutdown executors. */
static void purgeExecutors() {
try {
// This prevents map.keySet to compile to a Java 8+ KeySetView return type
// and cause NoSuchMethodError on Java 6-7 runtimes.
Map<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> map = EXECUTORS;
Iterator<ScheduledThreadPoolExecutor> it = map.keySet().iterator();
while (it.hasNext()) {
ScheduledThreadPoolExecutor exec = it.next();
if (!exec.isShutdown()) {
exec.purge();
} else {
it.remove();
}
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaHooks.onError(t);
}
}
/**
* Tries to enable the Java 7+ setRemoveOnCancelPolicy.
* <p>{@code public} visibility reason: called from other package(s) within RxJava.
* If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
* be called to enable the backup option of purging the executors.
* @param executor the executor to call setRemoveOnCancelPolicy if available.
* @return true if the policy was successfully enabled
*/
public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
if (SHOULD_TRY_ENABLE_CANCEL_POLICY) { // NOPMD
final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;
Method methodToCall;
if (isInstanceOfScheduledThreadPoolExecutor) {
final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;
if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
return false;
}
if (localSetRemoveOnCancelPolicyMethod == null) {
Method method = findSetRemoveOnCancelPolicyMethod(executor);
cachedSetRemoveOnCancelPolicyMethod = method != null
? method
: SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;
methodToCall = method;
} else {
methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
}
} else {
methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
}
if (methodToCall != null) {
try {
methodToCall.invoke(executor, true);
return true;
} catch (InvocationTargetException e) {
RxJavaHooks.onError(e);
} catch (IllegalAccessException e) {
RxJavaHooks.onError(e);
} catch (IllegalArgumentException e) {
RxJavaHooks.onError(e);
}
}
}
return false;
}
/**
* Tries to find {@code "setRemoveOnCancelPolicy(boolean)"} method in the class of passed executor.
*
* @param executor whose class will be used to search for required method.
* @return {@code "setRemoveOnCancelPolicy(boolean)"} {@link Method}
* or {@code null} if required {@link Method} was not found.
*/
static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
// The reason for the loop is to avoid NoSuchMethodException being thrown on JDK 6
// which is more costly than looping through ~70 methods.
for (final Method method : executor.getClass().getMethods()) {
if (method.getName().equals("setRemoveOnCancelPolicy")) {
final Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
return method;
}
}
}
return null;
}
/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
/**
* Schedules the given action by wrapping it into a ScheduledAction on the
* underlying ExecutorService, returning the ScheduledAction.
* @param action the action to wrap and schedule
* @param delayTime the delay in execution
* @param unit the time unit of the delay
* @return the wrapper ScheduledAction
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
@Override
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();
deregisterExecutor(executor);
}
@Override
public boolean isUnsubscribed() {
return isUnsubscribed;
}
}
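Much of NewThreadWorker above deals with cancelled tasks: it first tries setRemoveOnCancelPolicy and falls back to a periodic purge. The following JDK-only sketch shows the behavior that motivates this. CancelPolicyDemo and queuedAfterCancel are hypothetical names introduced for illustration; the executor calls are standard ScheduledThreadPoolExecutor methods.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class CancelPolicyDemo {
    // Schedules a far-future task, cancels it, and reports how many tasks
    // are still held in the executor's queue afterwards.
    static int queuedAfterCancel(boolean removeOnCancel, boolean purge) {
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1);
        try {
            exec.setRemoveOnCancelPolicy(removeOnCancel);
            Runnable noop = () -> { };
            ScheduledFuture<?> future = exec.schedule(noop, 1, TimeUnit.HOURS);
            future.cancel(false);
            if (purge) {
                exec.purge(); // the fallback that the purge thread applies periodically
            }
            return exec.getQueue().size();
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy:   " + queuedAfterCancel(false, false));
        System.out.println("remove on cancel: " + queuedAfterCancel(true, false));
        System.out.println("default + purge:  " + queuedAfterCancel(false, true));
    }
}
```

With the default policy the cancelled task stays queued until its delay would have elapsed, which is the memory leak the constructor's comment refers to. Either enabling the policy or calling purge() empties the queue.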
- Pass in the concrete operation as an Action0
- Scheduling is carried out through the schedule method
In ScheduledAction, action.call() executes the concrete operation:
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final SubscriptionList cancel;
final Action0 action;
public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new SubscriptionList();
}
public ScheduledAction(Action0 action, CompositeSubscription parent) {
this.action = action;
this.cancel = new SubscriptionList(new Remover(this, parent));
}
public ScheduledAction(Action0 action, SubscriptionList parent) {
this.action = action;
this.cancel = new SubscriptionList(new Remover2(this, parent));
}
@Override
public void run() {
try {
lazySet(Thread.currentThread());
action.call();
} catch (OnErrorNotImplementedException e) {
signalError(new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e));
} catch (Throwable e) {
signalError(new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e));
} finally {
unsubscribe();
}
}
void signalError(Throwable ie) {
RxJavaHooks.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
@Override
public boolean isUnsubscribed() {
return cancel.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (!cancel.isUnsubscribed()) {
cancel.unsubscribe();
}
}
/**
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
* if the underlying {@code action} completes or the this scheduled action is cancelled.
*
* @param s the Subscription to add
*/
public void add(Subscription s) {
cancel.add(s);
}
/**
* Adds the given Future to the unsubscription composite in order to support
* cancelling the underlying task in the executor framework.
* @param f the future to add
*/
public void add(final Future<?> f) {
cancel.add(new FutureCompleter(f));
}
/**
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
* cancelled or terminates, it can remove itself from this parent.
*
* @param parent
* the parent {@code CompositeSubscription} to add
*/
public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
}
/**
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
* cancelled or terminates, it can remove itself from this parent.
*
* @param parent
* the parent {@code CompositeSubscription} to add
*/
public void addParent(SubscriptionList parent) {
cancel.add(new Remover2(this, parent));
}
/**
* Cancels the captured future if the caller of the call method
* is not the same as the runner of the outer ScheduledAction to
* prevent unnecessary self-interrupting if the unsubscription
* happens from the same thread.
*/
final class FutureCompleter implements Subscription {
private final Future<?> f;
FutureCompleter(Future<?> f) {
this.f = f;
}
@Override
public void unsubscribe() {
if (ScheduledAction.this.get() != Thread.currentThread()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}
/** Remove a child subscription from a composite when unsubscribing. */
static final class Remover extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final ScheduledAction s;
final CompositeSubscription parent;
public Remover(ScheduledAction s, CompositeSubscription parent) {
this.s = s;
this.parent = parent;
}
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
}
/** Remove a child subscription from a composite when unsubscribing. */
static final class Remover2 extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final ScheduledAction s;
final SubscriptionList parent;
public Remover2(ScheduledAction s, SubscriptionList parent) {
this.s = s;
this.parent = parent;
}
@Override
public boolean isUnsubscribed() {
return s.isUnsubscribed();
}
@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
}
}
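Two details of ScheduledAction are easy to miss in the listing: run() always unsubscribes in its finally block, and FutureCompleter only interrupts when the cancel comes from a thread other than the one running the action. A reduced sketch, assuming a plain Runnable in place of Action0 and hypothetical names (MiniScheduledAction, wouldInterruptOnCancel, MiniScheduledActionDemo):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for ScheduledAction; the AtomicReference holds the runner thread.
final class MiniScheduledAction extends AtomicReference<Thread> implements Runnable {
    private static final long serialVersionUID = 1L;
    private final Runnable action;
    private volatile boolean unsubscribed;
    private volatile Throwable error;

    MiniScheduledAction(Runnable action) {
        this.action = action;
    }

    @Override
    public void run() {
        try {
            lazySet(Thread.currentThread()); // remember which thread runs the action
            action.run();
        } catch (Throwable e) {
            error = e; // the real class wraps this and reports it through RxJavaHooks.onError
        } finally {
            unsubscribed = true; // cleanup happens whether the action returned or threw
        }
    }

    // Same test as FutureCompleter: true means cancel(true), false means cancel(false).
    boolean wouldInterruptOnCancel() {
        return get() != Thread.currentThread();
    }

    boolean isUnsubscribed() { return unsubscribed; }

    Throwable error() { return error; }
}

final class MiniScheduledActionDemo {
    // Returns {interruptBeforeRun, interruptFromInsideRun, cleanedUpAfterFailure}.
    static boolean[] observe() {
        final boolean[] out = new boolean[3];
        final MiniScheduledAction[] self = new MiniScheduledAction[1];
        self[0] = new MiniScheduledAction(() -> out[1] = self[0].wouldInterruptOnCancel());
        out[0] = self[0].wouldInterruptOnCancel(); // no runner recorded yet
        self[0].run();
        MiniScheduledAction failing = new MiniScheduledAction(() -> {
            throw new IllegalStateException("boom");
        });
        failing.run();
        out[2] = failing.isUnsubscribed() && failing.error() instanceof IllegalStateException;
        return out;
    }
}
```

A cancel issued from inside the action itself reports false, so the thread never interrupts itself; a cancel from anywhere else reports true.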
RxJava1: the Scheduler in RxAndroid
Execution on the main thread is implemented with Handler and Looper.
/** Android-specific Schedulers. */
public final class AndroidSchedulers {
private static final AtomicReference<AndroidSchedulers> INSTANCE = new AtomicReference<>();
private final Scheduler mainThreadScheduler;
private static AndroidSchedulers getInstance() {
for (;;) {
AndroidSchedulers current = INSTANCE.get();
if (current != null) {
return current;
}
current = new AndroidSchedulers();
if (INSTANCE.compareAndSet(null, current)) {
return current;
}
}
}
private AndroidSchedulers() {
RxAndroidSchedulersHook hook = RxAndroidPlugins.getInstance().getSchedulersHook();
Scheduler main = hook.getMainThreadScheduler();
if (main != null) {
mainThreadScheduler = main;
} else {
mainThreadScheduler = new LooperScheduler(Looper.getMainLooper());
}
}
/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
return getInstance().mainThreadScheduler;
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new LooperScheduler(looper);
}
/**
* Resets the current {@link AndroidSchedulers} instance.
* This will re-init the cached schedulers on the next usage,
* which can be useful in testing.
*/
@Experimental
public static void reset() {
INSTANCE.set(null);
}
}
class LooperScheduler extends Scheduler {
private final Handler handler;
LooperScheduler(Looper looper) {
handler = new Handler(looper);
}
LooperScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
static class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;
HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}
@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}
action = hook.onSchedule(action);
ScheduledAction scheduledAction = new ScheduledAction(action, handler);
Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.
handler.sendMessageDelayed(message, unit.toMillis(delayTime));
if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}
return scheduledAction;
}
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}
static final class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;
ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}
@Override public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
@Override public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}
@Override public boolean isUnsubscribed() {
return unsubscribed;
}
}
}
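The Handler/Looper mechanism used by LooperScheduler can be pictured without Android. The sketch below is only an analogy built on JDK classes: a single dedicated thread draining a task queue plays the Looper, and post() plays Handler.post(...). FakeMainLooper, FakeMainThreadDemo and the thread name "fake-main" are all made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// JDK-only analogy, not Android code.
final class FakeMainLooper {
    static final String THREAD_NAME = "fake-main";

    // One dedicated thread that drains a queue of posted tasks.
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, THREAD_NAME);
        t.setDaemon(true);
        return t;
    });

    // Enqueues the action; it will run later on the looper thread.
    Future<?> post(Runnable action) {
        return loop.submit(action);
    }

    void quit() {
        loop.shutdownNow();
    }
}

final class FakeMainThreadDemo {
    // Posts from a background thread and returns the name of the thread that ran the action.
    static String threadThatRan() {
        final FakeMainLooper looper = new FakeMainLooper();
        final String[] seen = new String[1];
        Thread background = new Thread(() -> {
            try {
                looper.post(() -> seen[0] = Thread.currentThread().getName()).get();
            } catch (Exception e) {
                seen[0] = "failed: " + e;
            }
        }, "background");
        background.start();
        try {
            background.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            looper.quit();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(threadThatRan());
    }
}
```

No matter which thread posts, the action executes on the looper's thread. HandlerWorker.schedule relies on the same property when it sends a Message to a Handler bound to the main Looper.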
4. RxJava2 Scheduler source analysis
public abstract class Scheduler {
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
static {
CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx2.scheduler.drift-tolerance", 15));
}
public static long clockDriftTolerance() {
return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
}
@NonNull
public abstract io.reactivex.Scheduler.Worker createWorker();
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
public void start() {
}
public void shutdown() {
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final io.reactivex.Scheduler.Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
io.reactivex.Scheduler.DisposeTask task = new io.reactivex.Scheduler.DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final io.reactivex.Scheduler.Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
io.reactivex.Scheduler.PeriodicDirectTask periodicTask = new io.reactivex.Scheduler.PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}
@SuppressWarnings("unchecked")
@NonNull
public <S extends io.reactivex.Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
public abstract static class Worker implements Disposable {
/**
* Schedules a Runnable for execution without any time delay.
*
* <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
*
* @param run
* Runnable to schedule
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
/**
* Schedules an Runnable for execution at some point in the future specified by a time delay
* relative to the current time.
* <p>
* Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
* as if the {@link #schedule(Runnable)} was called.
*
* @param run
* the Runnable to schedule
* @param delay
* time to "wait" before executing the action; non-positive values indicate an non-delayed
* schedule
* @param unit
* the time unit of {@code delayTime}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new io.reactivex.Scheduler.Worker.PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
/**
* Returns the 'current time' of the Worker in the specified time unit.
* @param unit the time unit
* @return the 'current time'
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* Holds state and logic to calculate when the next delayed invocation
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
this.decoratedRun = decoratedRun;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
lastNowNanoseconds = firstNowNanoseconds;
startInNanoseconds = firstStartInNanoseconds;
}
@Override
public void run() {
decoratedRun.run();
if (!sd.isDisposed()) {
long nextTick;
long nowNanoseconds = now(TimeUnit.NANOSECONDS);
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
} else {
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
}
lastNowNanoseconds = nowNanoseconds;
long delay = nextTick - nowNanoseconds;
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
static final class PeriodicDirectTask
implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable run;
@NonNull
final io.reactivex.Scheduler.Worker worker;
volatile boolean disposed;
PeriodicDirectTask(@NonNull Runnable run, @NonNull io.reactivex.Scheduler.Worker worker) {
this.run = run;
this.worker = worker;
}
@Override
public void run() {
if (!disposed) {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw ExceptionHelper.wrapOrThrow(ex);
}
}
}
@Override
public void dispose() {
disposed = true;
worker.dispose();
}
@Override
public boolean isDisposed() {
return disposed;
}
@Override
public Runnable getWrappedRunnable() {
return run;
}
}
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final io.reactivex.Scheduler.Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull io.reactivex.Scheduler.Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
}
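The drift handling in PeriodicTask.run() is easier to follow as a pure function over explicit timestamps. This is a sketch that follows the same two branches; PeriodicTickDemo, State and nextTick are hypothetical names, and the time values in the usage note are arbitrary units rather than real nanosecond readings.

```java
final class PeriodicTickDemo {
    // Hypothetical container for the state PeriodicTask keeps between runs.
    static final class State {
        long count;
        long lastNow;
        long start;

        State(long firstNow, long firstStart) {
            this.lastNow = firstNow;
            this.start = firstStart;
        }
    }

    // Returns the absolute time of the next tick and updates the state.
    static long nextTick(State s, long now, long period, long tolerance) {
        long next;
        boolean clockWentBack = now + tolerance < s.lastNow;
        boolean clockJumpedAhead = now >= s.lastNow + period + tolerance;
        if (clockWentBack || clockJumpedAhead) {
            // Rebase: pretend the schedule started `count` periods before the new tick.
            next = now + period;
            s.start = next - period * (++s.count);
        } else {
            // Normal case: ticks stay on the fixed grid start + n * period.
            next = s.start + (++s.count * period);
        }
        s.lastNow = now;
        return next;
    }

    public static void main(String[] args) {
        State s = new State(0L, 0L);
        System.out.println(nextTick(s, 5L, 100L, 1000L));
        System.out.println(nextTick(s, 130L, 100L, 1000L));
        System.out.println(nextTick(s, 50_000L, 100L, 1000L));
        System.out.println(nextTick(s, 50_100L, 100L, 1000L));
    }
}
```

With period 100 and tolerance 1000, runs at times 5 and 130 yield ticks 100 and 200, so a late run does not shift the grid. A jump to 50000 exceeds the tolerance, so the next tick becomes 50100 and the start is rebased to 49800; the run after that lands on 50200.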
- Pass in a different Scheduler to use a different thread
- The Scheduler creates a Worker, which is what uses the real thread pool
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
/**
* Schedules the given runnable on the underlying executor directly and
* returns its future wrapped into a Disposable.
* @param run the Runnable to execute in a delayed fashion
* @param delayTime the delay amount
* @param unit the delay time unit
* @return the ScheduledRunnable instance
*/
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/**
* Schedules the given runnable periodically on the underlying executor directly
* and returns its future wrapped into a Disposable.
* @param run the Runnable to execute in a periodic fashion
* @param initialDelay the initial delay amount
* @param period the repeat period amount
* @param unit the time unit for both the initialDelay and period
* @return the ScheduledRunnable instance
*/
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {
InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
try {
Future<?> f;
if (initialDelay <= 0L) {
f = executor.submit(periodicWrapper);
} else {
f = executor.schedule(periodicWrapper, initialDelay, unit);
}
periodicWrapper.setFirst(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
/**
* Wraps the given runnable into a ScheduledRunnable and schedules it
* on the underlying ScheduledExecutorService.
* <p>If the schedule has been rejected, the ScheduledRunnable.wasScheduled will return
* false.
* @param run the runnable instance
* @param delayTime the time to delay the execution
* @param unit the time unit
* @param parent the optional tracker parent to add the created ScheduledRunnable instance to before it gets scheduled
* @return the ScheduledRunnable instance
*/
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
/**
* Shuts down the underlying executor in a non-interrupting fashion.
*/
public void shutdown() {
if (!disposed) {
disposed = true;
executor.shutdown();
}
}
@Override
public boolean isDisposed() {
return disposed;
}
}
- Pass in the concrete operation as a Runnable
- Scheduling is carried out through the schedule method
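The two points above can be tried out with plain `java.util.concurrent`. This is a minimal sketch, not RxJava's code: the class `DirectScheduleSketch` and its `scheduleDirect` method are hypothetical names that only mirror the "submit now or schedule later" branch from the listing.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: mirrors the immediate-versus-delayed branch shown above.
class DirectScheduleSketch {
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1);

    // A non-positive delay is submitted immediately; otherwise the task is delayed.
    Future<?> scheduleDirect(Runnable task, long delay, TimeUnit unit) {
        if (delay <= 0L) {
            return executor.submit(task);
        }
        return executor.schedule(task, delay, unit);
    }

    void shutdown() {
        executor.shutdown();
    }

    // Runs one immediate task and returns the name of the thread that executed it.
    static String runOnce() {
        DirectScheduleSketch sketch = new DirectScheduleSketch();
        final String[] threadName = new String[1];
        try {
            Future<?> f = sketch.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    threadName[0] = Thread.currentThread().getName();
                }
            }, 0L, TimeUnit.MILLISECONDS);
            f.get(5, TimeUnit.SECONDS); // wait until the task has finished
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            sketch.shutdown();
        }
        return threadName[0];
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + runOnce());
    }
}
```

The task runs on the pool thread rather than on the caller, which is the whole point of handing the Runnable to the executor.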
RxJava2: the Scheduler in rxandroid
As in RxJava1, execution on the main thread is implemented with Handler and Looper
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
delegate.run();
} catch (Throwable t) {
RxJavaPlugins.onError(t);
}
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
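The Handler and Looper pair used above boils down to "one thread draining a message queue". The following plain-Java analogy is a sketch under that assumption; `MiniLooperSketch` and `post` are hypothetical names and are not part of the Android API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java analogy of a Looper thread; not the Android API.
class MiniLooperSketch {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    private final Thread thread;

    MiniLooperSketch(String name) {
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true) {
                        queue.take().run(); // block until a message arrives, then run it
                    }
                } catch (InterruptedException e) {
                    // interrupted: leave the loop and let the thread end
                }
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    // Plays the role of Handler.post: the task runs on the looper thread.
    void post(Runnable task) {
        queue.add(task);
    }

    void quit() {
        thread.interrupt();
    }

    // Posts one task from the calling thread and reports where it actually ran.
    static String postAndReport(String looperName) {
        MiniLooperSketch looper = new MiniLooperSketch(looperName);
        final String[] ranOn = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        looper.post(new Runnable() {
            @Override
            public void run() {
                ranOn[0] = Thread.currentThread().getName();
                done.countDown();
            }
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            looper.quit();
        }
        return ranOn[0];
    }

    public static void main(String[] args) {
        System.out.println(postAndReport("fake-main"));
    }
}
```

Whatever thread calls `post`, the task itself runs on the thread that owns the queue, which is how HandlerScheduler gets work onto the main thread.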
5.RxJava1 Scheduler thread-switching imitation
Main objects:
Switcher (thread switcher)
- Used for thread switching
- Has a createWorker method
/**
* Created by Xionghu on 2018/6/14.
* Desc: used for thread switching
*/
public abstract class Switcher {
public abstract Worker createWorker();
public static abstract class Worker implements Calling{
public abstract Calling switches(Action0 action0);
}
}
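Worker
- The class that actually performs the thread switch
- Performs the switch through the switches method
NewThreadSwitcher
- A Switcher that switches to a new thread
- Implements the createWorker method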
/**
* Created by Xionghu on 2018/6/14.
* Desc: Switcher for a new thread
*/
public class NewThreadSwitcher extends Switcher {
@Override
public Worker createWorker() {
return new NewThreadWorker();
}
}
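NewThreadWorker
- Owns a thread pool that has a single thread
- Implements the switches method that switches threads
- Wraps the real operation in a Runnable and submits it to the thread pool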
/**
* Created by Xionghu on 2018/6/14.
* Desc: worker class for a new thread
*/
public class NewThreadWorker extends Switcher.Worker {
// newScheduledThreadPool(1): creates a thread pool with one core thread that supports delayed and periodic tasks.
private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable runnable) {
return new Thread(runnable, " NewThreadWorker");
}
});
private volatile boolean mIsUnCalled;
@Override
public void unCall() {
mIsUnCalled = true;
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public Calling switches(Action0 action0) {
SwitcherAction switcherAction = new SwitcherAction(action0);
mExecutor.submit(switcherAction);
return switcherAction;
}
private static class SwitcherAction implements Runnable, Calling {
private final Action0 action0;
private volatile boolean mIsUnCalled;
public SwitcherAction(Action0 action0) {
this.action0 = action0;
}
@Override
public void unCall() {
mIsUnCalled = true;
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public void run() {
action0.call();
}
}
}
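In the imitation above, unCall() only records a flag: the queued action still runs and the pool thread stays alive. A minimal sketch of one way to make cancellation effective is shown here; `CancellableWorkerSketch` is a hypothetical class, and it takes a plain Runnable instead of Action0 so that it compiles on its own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a worker whose unCall() really stops pending work.
class CancellableWorkerSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean unCalled;

    // Wraps the action so that it is skipped once the worker has been un-called.
    void switches(final Runnable action) {
        if (unCalled) {
            return;
        }
        try {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    if (!unCalled) {
                        action.run();
                    }
                }
            });
        } catch (RejectedExecutionException ignored) {
            // unCall() raced with this call; the action is simply dropped
        }
    }

    // Marks the worker as cancelled and lets its thread terminate.
    void unCall() {
        unCalled = true;
        executor.shutdown();
    }

    boolean isUnCalled() {
        return unCalled;
    }

    // Runs one action before unCall() and tries one after; returns how many ran.
    static int demo() {
        CancellableWorkerSketch worker = new CancellableWorkerSketch();
        final AtomicInteger ran = new AtomicInteger();
        final CountDownLatch first = new CountDownLatch(1);
        worker.switches(new Runnable() {
            @Override
            public void run() {
                ran.incrementAndGet();
                first.countDown();
            }
        });
        try {
            first.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        worker.unCall();
        worker.switches(new Runnable() {
            @Override
            public void run() {
                ran.incrementAndGet(); // never reached: the worker is un-called
            }
        });
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```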
LooperSwitcher
Switches onto the Looper of a given thread on Android
/**
* Created by Xionghu on 2018/6/14.
* Desc: Switcher for an Android Looper
*/
public class LooperSwitcher extends Switcher {
private Handler mHandler;
public LooperSwitcher(Looper looper) {
mHandler = new Handler(looper);
}
@Override
public Worker createWorker() {
return new HandlerWorker(mHandler);
}
}
HandlerWorker
Sends the concrete operation to the specified Looper for execution
import android.os.Handler;
import android.os.Message;
/**
* Created by Xionghu on 2018/6/14.
* Desc: Worker for Android
*/
public class HandlerWorker extends Switcher.Worker {
private final Handler mHandler;
private volatile boolean mIsUnCalled;
public HandlerWorker(Handler mHandler) {
this.mHandler = mHandler;
}
@Override
public void unCall() {
mIsUnCalled = true;
mHandler.removeCallbacksAndMessages(this);
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public Calling switches(Action0 action0) {
SwitcherAction switcherAction = new SwitcherAction(action0, mHandler);
Message message = Message.obtain(mHandler, switcherAction);
message.obj = this;
mHandler.sendMessage(message);
return switcherAction;
}
private static class SwitcherAction implements Runnable, Calling {
private final Action0 action0;
private final Handler handler;
private volatile boolean mIsUnCalled;
public SwitcherAction(Action0 action0, Handler handler) {
this.action0 = action0;
this.handler = handler;
}
@Override
public void unCall() {
mIsUnCalled = true;
handler.removeCallbacks(this);
}
@Override
public boolean isUnCalled() {
return mIsUnCalled;
}
@Override
public void run() {
action0.call();
}
}
}
6.RxJava2 Scheduler thread-switching imitation
Switcher (thread switcher)
- Used for thread switching
- Has a createWorker method
- Also has a switches method of its own (unlike the RxJava1 version)
/**
* Created by Xionghu on 2018/6/14.
* Desc: abstract class used for thread switching
*/
public abstract class Switcher {
public abstract Worker createWorker();
public Release switches(final Runnable runnable) {
Worker worker = createWorker();
worker.switches(new Runnable() {
@Override
public void run() {
runnable.run();
}
});
return worker;
}
public static abstract class Worker implements Release {
public abstract Release switches(Runnable runnable);
}
}
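Worker
- The class that actually performs the thread switch
- Performs the switch through the switches method
NewThreadSwitcher
- A Switcher that switches to a new thread
- Implements the createWorker method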
/**
* Created by Xionghu on 2018/6/14.
* Desc: Switcher for a new thread
*/
public class NewThreadSwitcher extends Switcher {
@Override
public Worker createWorker() {
return new NewThreadWorker();
}
}
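NewThreadWorker
- Owns a thread pool that has a single thread
- Implements the switches method that switches threads
- Wraps the real operation in a Runnable and submits it to the thread pool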
import android.support.annotation.NonNull;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* Created by Xionghu on 2018/6/14.
* Desc: worker class for a new thread
*/
public class NewThreadWorker extends Switcher.Worker {
private final ExecutorService mExecutor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable runnable) {
return new Thread(runnable, "NewThreadWorker");
}
});
private volatile boolean mIsReleased;
@Override
public boolean isReleased() {
return mIsReleased;
}
@Override
public void release() {
mIsReleased = true;
}
@Override
public Release switches(Runnable runnable) {
SwitcherAction switcherAction = new SwitcherAction(runnable);
mExecutor.submit((Callable<Object>) switcherAction);
return switcherAction;
}
private static class SwitcherAction implements Runnable, Callable<Object>, Release {
private final Runnable mRunnable;
private volatile boolean mIsReleased;
public SwitcherAction(Runnable mRunnable) {
this.mRunnable = mRunnable;
}
@Override
public boolean isReleased() {
return mIsReleased;
}
@Override
public void release() {
mIsReleased = true;
}
@Override
public void run() {
mRunnable.run();
}
@Override
public Object call() throws Exception {
run();
return null;
}
}
}
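The `(Callable<Object>)` cast in the switches method above is needed because SwitcherAction implements both Runnable and Callable, and ExecutorService.submit is overloaded for both, so the compiler reports an uncast call as ambiguous. A small sketch of the same situation follows; `DualTaskSketch` is a hypothetical stand-in for SwitcherAction.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical task type implementing both interfaces, like SwitcherAction above.
class DualTaskSketch implements Runnable, Callable<Object> {
    volatile boolean ran;

    @Override
    public void run() {
        ran = true;
    }

    @Override
    public Object call() {
        run();
        return null;
    }

    // Submits the task through the Callable overload and waits for it to finish.
    static boolean submitAsCallable() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        DualTaskSketch task = new DualTaskSketch();
        try {
            // executor.submit(task) would not compile: both overloads match.
            Future<Object> f = executor.submit((Callable<Object>) task);
            f.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
        return task.ran;
    }

    public static void main(String[] args) {
        System.out.println(submitAsCallable());
    }
}
```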
LooperSwitcher
Switches onto the Looper of a given thread on Android
HandlerWorker
Sends the concrete operation to the specified Looper for execution
7.RxJava1 subscribeOn internals
- Implemented through an OnSubscribe (OperatorSubscribeOn)
- Uses the Scheduler to run the emission on the target thread
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler;
this.source = source;
this.requestOn = requestOn;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
inner.schedule(parent);
}
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> actual;
final boolean requestOn;
final Worker worker;
Observable<T> source;
Thread t;
SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual;
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}
@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
}
@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
}
}
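This relies on a proxy mechanism: SubscribeOnSubscriber stands in for the downstream Subscriber and forwards every event to it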
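The proxy idea can be reduced to a few lines. The sketch below is an illustration only: `Source`, `subscribeOn` and the executor-based scheduling are hypothetical simplifications, not the RxJava1 types.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical mini source type; it only illustrates the proxy idea behind subscribeOn.
class SubscribeOnProxySketch {
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Returns a new Source whose subscribe() call is moved onto the executor.
    static <T> Source<T> subscribeOn(final Source<T> upstream, final ExecutorService executor) {
        return downstream -> executor.execute(() -> {
            // Proxy consumer: forwards every item to the real downstream.
            Consumer<T> proxy = item -> downstream.accept(item);
            upstream.subscribe(proxy);
        });
    }

    // Subscribes from the calling thread and returns the thread that emitted the item.
    static String demo() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-worker"));
        final String[] emittedOn = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        Source<String> source = downstream -> {
            emittedOn[0] = Thread.currentThread().getName();
            downstream.accept("test");
        };
        subscribeOn(source, executor).subscribe(item -> done.countDown());
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timeout");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
        return emittedOn[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the upstream's subscribe call itself is executed on the worker, the source emits there, while the caller returns immediately.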
8.RxJava2 subscribeOn internals
8.1.RxJava2 (without backpressure) subscribeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
- Extends AbstractObservableWithUpstream
- Implements the subscribeActual method
- Uses the Scheduler to run the emission on the target thread
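A commonly noted consequence of moving the upstream subscribe call onto the Scheduler is that, when subscribeOn is chained several times, the one closest to the source decides the emitting thread. The sketch below reproduces that effect with hypothetical types (`Source`, `subscribeOn`, named executors); it is not RxJava's implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: chaining two subscribeOn-style wrappers.
class NestedSubscribeOnSketch {
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Moves the upstream subscribe call onto the given executor.
    static <T> Source<T> subscribeOn(final Source<T> upstream, final ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    static ExecutorService named(final String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread on which the source finally emits.
    static String demo() {
        ExecutorService inner = named("inner"); // closest to the source
        ExecutorService outer = named("outer"); // applied last
        final String[] emittedOn = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        Source<String> source = downstream -> {
            emittedOn[0] = Thread.currentThread().getName();
            downstream.accept("test");
        };
        Source<String> chained = subscribeOn(subscribeOn(source, inner), outer);
        chained.subscribe(item -> done.countDown());
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timeout");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            inner.shutdown();
            outer.shutdown();
        }
        return emittedOn[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The outer wrapper only hops to its thread in order to subscribe to the inner wrapper, which then hops again, so the source ends up running on "inner".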
8.2.RxJava2 (with backpressure) subscribeOn
@CheckReturnValue
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.CUSTOM)
@Experimental
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
}
public final class FlowableSubscribeOn<T> extends AbstractFlowableWithUpstream<T , T> {
final Scheduler scheduler;
final boolean nonScheduledRequests;
public FlowableSubscribeOn(Flowable<T> source, Scheduler scheduler, boolean nonScheduledRequests) {
super(source);
this.scheduler = scheduler;
this.nonScheduledRequests = nonScheduledRequests;
}
@Override
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
s.onSubscribe(sos);
w.schedule(sos);
}
static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8094547886072529208L;
final Subscriber<? super T> actual;
final Scheduler.Worker worker;
final AtomicReference<Subscription> s;
final AtomicLong requested;
final boolean nonScheduledRequests;
Publisher<T> source;
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
this.actual = actual;
this.worker = worker;
this.source = source;
this.s = new AtomicReference<Subscription>();
this.requested = new AtomicLong();
this.nonScheduledRequests = !requestOn;
}
@Override
public void run() {
lazySet(Thread.currentThread());
Publisher<T> src = source;
source = null;
src.subscribe(this);
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(this.s, s)) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
worker.dispose();
}
@Override
public void onComplete() {
actual.onComplete();
worker.dispose();
}
@Override
public void request(final long n) {
if (SubscriptionHelper.validate(n)) {
Subscription s = this.s.get();
if (s != null) {
requestUpstream(n, s);
} else {
BackpressureHelper.add(requested, n);
s = this.s.get();
if (s != null) {
long r = requested.getAndSet(0L);
if (r != 0L) {
requestUpstream(r, s);
}
}
}
}
}
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}
@Override
public void cancel() {
SubscriptionHelper.cancel(s);
worker.dispose();
}
static final class Request implements Runnable {
private final Subscription s;
private final long n;
Request(Subscription s, long n) {
this.s = s;
this.n = n;
}
@Override
public void run() {
s.request(n);
}
}
}
}
- Extends AbstractFlowableWithUpstream
- Implements the subscribeActual method
- Uses the Scheduler to run the emission on the target thread
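One detail of the listing worth isolating is how request(n) calls that arrive before the upstream Subscription exists are accumulated and flushed later. The following is a simplified sketch of that handshake: `DeferredRequestSketch` and `addCapped` are hypothetical names, a `LongConsumer` stands in for the Subscription, and the thread check done by requestUpstream is left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;

// Hypothetical sketch of deferring request(n) until the upstream subscription exists.
class DeferredRequestSketch {
    private final AtomicReference<LongConsumer> upstream = new AtomicReference<LongConsumer>();
    private final AtomicLong requested = new AtomicLong();

    // Called when the upstream becomes available; flushes anything requested early.
    void onSubscribe(LongConsumer s) {
        if (upstream.compareAndSet(null, s)) {
            long r = requested.getAndSet(0L);
            if (r != 0L) {
                s.accept(r);
            }
        }
    }

    // Called by the downstream at any time, possibly before onSubscribe. Assumes n > 0.
    void request(long n) {
        LongConsumer s = upstream.get();
        if (s != null) {
            s.accept(n);
            return;
        }
        addCapped(requested, n);
        s = upstream.get(); // re-check: onSubscribe may have raced with this call
        if (s != null) {
            long r = requested.getAndSet(0L);
            if (r != 0L) {
                s.accept(r);
            }
        }
    }

    // Adds n to the counter, saturating at Long.MAX_VALUE instead of overflowing.
    static void addCapped(AtomicLong counter, long n) {
        for (;;) {
            long current = counter.get();
            if (current == Long.MAX_VALUE) {
                return;
            }
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;
            }
            if (counter.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Requests 3 and 4 early, then 5 after subscription; returns the upstream total.
    static long demo() {
        DeferredRequestSketch sketch = new DeferredRequestSketch();
        final AtomicLong seenByUpstream = new AtomicLong();
        sketch.request(3); // upstream not set yet: accumulated
        sketch.request(4); // accumulated, 7 pending
        sketch.onSubscribe(seenByUpstream::addAndGet); // flushes the pending 7
        sketch.request(5); // forwarded directly
        return seenByUpstream.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```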
9. RxJava1 subscribeOn imitation
public final Caller<T> callOn(Switcher switcher) {
return create(new OperatorCallOn<>(switcher, this));
}
/**
* Created by Xionghu on 2018/6/14.
* Desc: the OnCall used by callOn
*/
public class OperatorCallOn<T> implements Caller.OnCall<T> {
private final Switcher switcher;
private final Caller<T> tCaller;
public OperatorCallOn(Switcher switcher, Caller<T> tCaller) {
this.switcher = switcher;
this.tCaller = tCaller;
}
@Override
public void call(final Receiver<T> tReceiver) {
Switcher.Worker worker = switcher.createWorker();
worker.switches(new Action0() {
@Override
public void call() {
Receiver<T> tReceiver1 = new Receiver<T>() {
@Override
public void onCompleted() {
tReceiver.onCompleted();
}
@Override
public void onError(Throwable t) {
tReceiver.onError(t);
}
@Override
public void onReceive(T t) {
tReceiver.onReceive(t);
}
};
tCaller.call(tReceiver1);
}
});
}
}
The OnCall used by callOn
- Holds the original Caller and the Switcher
- Creates a new Receiver that wraps the old one and hands it to the worker thread
Run
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import com.haocai.mylibrary.rxJava1.Caller;
import com.haocai.mylibrary.rxJava1.NewThreadSwitcher;
import com.haocai.mylibrary.rxJava1.Receiver;
import com.haocai.rxjavademo.R;
import butterknife.ButterKnife;
import butterknife.OnClick;
/**
* Created by Xionghu on 2018/6/11.
* Desc: RxJava1 subscribeOn imitation
*/
public class Lesson3_2Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
Caller.
create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
if (!stringReceiver.isUnCalled()) {
stringReceiver.onReceive("test");
stringReceiver.onCompleted();
}
}
}).
callOn(new NewThreadSwitcher()).
call(new Receiver<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String o) {
Log.d(TAG, "onReceive:" + o);
Log.d(TAG, "currentThread:" + Thread.currentThread());
}
});
}
}
06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-15 14:19:02.219 17153-17366/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
10. RxJava2 subscribeOn imitation
10.1 RxJava2 (without backpressure)
public Caller<T> callOn(Switcher switcher) {
return new CallerCallOn<>(this, switcher);
}
/**
* Created by Xionghu on 2018/6/15.
* Desc: used for callOn
*/
public class CallerCallOn<T> extends CallerWithUpstream<T, T> {
private Switcher mSwitcher;
public CallerCallOn(Caller<T> source, Switcher mSwitcher) {
super(source);
this.mSwitcher = mSwitcher;
}
@Override
protected void callActual(Callee<T> callee) {
final CallOnCallee<T> tCallOnCallee = new CallOnCallee<>(callee);
callee.onCall(tCallOnCallee);
mSwitcher.switches(new Runnable() {
@Override
public void run() {
source.call(tCallOnCallee);
}
});
}
private static final class CallOnCallee<T> implements Callee<T>, Release {
private final Callee<T> callee;
public CallOnCallee(Callee<T> callee) {
this.callee = callee;
}
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(T t) {
callee.onReceive(t);
}
@Override
public void onCompleted() {
callee.onCompleted();
}
@Override
public void onError(Throwable t) {
callee.onError(t);
}
@Override
public boolean isReleased() {
return false;
}
@Override
public void release() {
}
}
}
CallerCallOn
- Extends CallerWithUpstream
- Holds the original Caller and the Switcher
- Creates a new Callee that wraps the old one and hands it to the worker thread
10.2 RxJava2 (with backpressure)
public Telephoner<T> callOn(Switcher switcher) {
return new TelephonerCallOn<>(this, switcher);
}
import com.haocai.mylibrary.rxJava2.Switcher;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by Xionghu on 2018/6/15.
* Desc: used for callOn
*/
public class TelephonerCallOn<T> extends TelephonerWithUpstream<T, T> {
private final Switcher mSwitcher;
public TelephonerCallOn(Telephoner<T> source, Switcher switcher) {
super(source);
mSwitcher = switcher;
}
@Override
protected void callActual(Receiver<T> receiver) {
final CallOnReceiver<T> tCallOnReceiver = new CallOnReceiver<>(receiver);
receiver.onCall(tCallOnReceiver);
mSwitcher.switches(new Runnable() {
@Override
public void run() {
source.call(tCallOnReceiver);
}
});
}
private static final class CallOnReceiver<T> extends AtomicLong implements Receiver<T>, Drop {
private final Receiver<T> mReceiver;
public CallOnReceiver(Receiver<T> receiver) {
mReceiver = receiver;
}
@Override
public void request(long n) {
BackpressureHelper.add(this, n);
}
@Override
public void drop() {
}
@Override
public void onCall(Drop d) {
mReceiver.onCall(d);
}
@Override
public void onReceive(T t) {
if (get() != 0) {
mReceiver.onReceive(t);
BackpressureHelper.produced(this, 1);
}
}
@Override
public void onError(Throwable t) {
mReceiver.onError(t);
}
@Override
public void onCompleted() {
mReceiver.onCompleted();
}
}
}
TelephonerCallOn
- Extends TelephonerWithUpstream
- Holds the original Telephoner and the Switcher
- Creates a new Receiver that wraps the old one and hands it to the worker thread
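The CallOnReceiver in the listing forwards an item only while its request counter is non-zero and then counts it as produced. A self-contained sketch of that gate is given below; `RequestGateSketch` is hypothetical, uses String items, and re-implements the add and produced steps by hand rather than calling BackpressureHelper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the request counter used by a backpressure-aware receiver.
class RequestGateSketch {
    private final AtomicLong requested = new AtomicLong();
    private final List<String> delivered = new ArrayList<String>();

    // Downstream asks for n more items; the counter saturates at Long.MAX_VALUE.
    void request(long n) {
        for (;;) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE;
            }
            if (requested.compareAndSet(current, next)) {
                return;
            }
        }
    }

    // Upstream offers an item; it is delivered only while demand remains.
    void onReceive(String item) {
        long current = requested.get();
        if (current == 0L) {
            return; // no outstanding demand: the item is dropped in this sketch
        }
        delivered.add(item);
        if (current != Long.MAX_VALUE) {
            requested.decrementAndGet(); // Long.MAX_VALUE means "unbounded", never decremented
        }
    }

    // One item arrives before any request, then two are requested and three offered.
    static List<String> demo() {
        RequestGateSketch gate = new RequestGateSketch();
        gate.onReceive("early");
        gate.request(2);
        gate.onReceive("a");
        gate.onReceive("b");
        gate.onReceive("c");
        return gate.delivered;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This also explains why the run example requests Long.MAX_VALUE in onCall: with no request at all, every item would be dropped.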
10.3 Run
/**
* Created by Xionghu on 2018/6/11.
* Desc: RxJava2 subscribeOn imitation
*/
public class Lesson3_3Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
/*---------without backpressure---------*/
Caller.
create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("test");
callerEmitter.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
call(new Callee<String>() {
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(String string) {
Log.d(TAG, "without backpressure:onReceive:" + string);
Log.d(TAG, "without backpressure:currentThread:" + Thread.currentThread());
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
});
/*---------with backpressure---------*/
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("test");
telephonerEmitter.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
call(new Receiver<String>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE);
}
@Override
public void onReceive(String s) {
Log.d(TAG, "with backpressure:onReceive:" + s);
Log.d(TAG, "with backpressure:currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
}
}
06-15 16:13:27.002 813-1150/com.haocai.rxjavademo D/kpioneer: without backpressure:onReceive:test
06-15 16:13:27.003 813-1150/com.haocai.rxjavademo D/kpioneer: without backpressure:currentThread:Thread[NewThreadWorker,5,main]
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: with backpressure:onReceive:test
06-15 16:13:27.011 813-1151/com.haocai.rxjavademo D/kpioneer: with backpressure:currentThread:Thread[NewThreadWorker,5,main]
11 RxJava1 observeOn internals
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
this(scheduler, delayError, RxRingBuffer.SIZE);
}
/**
* @param scheduler the scheduler to use
* @param delayError delay errors until all normal events are emitted in the other thread?
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
*/
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
public static <T> Operator<T, T> rebatch(final int n) {
return new Operator<T, T>() {
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(Schedulers.immediate(), child, false, n);
parent.init();
return parent;
}
};
}
/** Observe through individual queue per observer. */
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final boolean delayError;
final Queue<Object> queue;
/** The emission threshold that should trigger a replenishing request. */
final int limit;
// the status of the current stream
volatile boolean finished;
final AtomicLong requested = new AtomicLong();
final AtomicLong counter = new AtomicLong();
/**
* The single exception if not null, should be written before setting finished (release) and read after
* reading finished (acquire).
*/
Throwable error;
/** Remembers how many elements have been emitted before the requests run out. */
long emitted;
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
this.child = child;
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}
void init() {
// don't want this code in the constructor because `this` can escape through the
// setProducer call
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
if (!queue.offer(NotificationLite.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule();
}
@Override
public void onCompleted() {
if (isUnsubscribed() || finished) {
return;
}
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
if (isUnsubscribed() || finished) {
RxJavaHooks.onError(e);
return;
}
error = e;
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
long missed = 1L;
long currentEmission = emitted;
// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
// of the constant fields
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
// requested and counter are not included to avoid JIT issues with register spilling
// and their access is amortized because they are part of the outer loop which runs
// less frequently (usually after each bufferSize elements)
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(NotificationLite.<T>getValue(v));
currentEmission++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}
if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}
emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}
}
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
if (delayError) {
if (isEmpty) {
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
}
} else {
Throwable e = error;
if (e != null) {
q.clear();
try {
a.onError(e);
} finally {
recursiveScheduler.unsubscribe();
}
return true;
} else
if (isEmpty) {
try {
a.onCompleted();
} finally {
recursiveScheduler.unsubscribe();
}
return true;
}
}
}
return false;
}
}
}
The Operator used by observeOn
- Serves as the Operator behind observeOn
- Is applied to the stream through lift
- Returns a Subscriber dedicated to observeOn
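The Subscriber returned by this Operator computes a replenish threshold in its constructor with `calculatedSize - (calculatedSize >> 2)`. The arithmetic can be checked in isolation; `ReplenishLimitSketch` is a hypothetical helper and the buffer sizes are example values only.

```java
// Hypothetical helper reproducing the threshold arithmetic from the constructor above.
class ReplenishLimitSketch {
    // Size minus a quarter of the size (integer shift), i.e. about 75% of the buffer.
    static int limit(int bufferSize) {
        return bufferSize - (bufferSize >> 2);
    }

    public static void main(String[] args) {
        System.out.println(limit(128)); // 128 - 32 = 96
        System.out.println(limit(16));  // 16 - 4 = 12
        System.out.println(limit(10));  // 10 - 2 = 8, i.e. 7.5 rounded up
    }
}
```

Once that many items have been emitted, the drain loop requests the same amount from upstream again, so the queue is refilled before it runs empty.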
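The Subscriber used by observeOn
When onNext and the other callbacks are invoked, it queues the event and schedules its delivery on the Scheduler's worker thread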
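The queue-plus-counter pattern in schedule() and call() can be shown without any RxJava types. The sketch below is a simplified illustration, assuming a single producer and ignoring backpressure, errors and completion; `QueueDrainSketch` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the queue-drain pattern behind observeOn; not RxJava's code.
class QueueDrainSketch implements Runnable {
    private final Queue<String> queue = new ConcurrentLinkedQueue<String>();
    private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
    private final ExecutorService worker;
    final List<String> received = new ArrayList<String>(); // touched only by the drain loop

    QueueDrainSketch(ExecutorService worker) {
        this.worker = worker;
    }

    // Called on the producer thread: enqueue, then make sure a drain is scheduled.
    void onNext(String item) {
        queue.offer(item);
        schedule();
    }

    // Only the caller that moves the counter from 0 to 1 schedules the drain task.
    private void schedule() {
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    // Runs on the worker thread: empties the queue, then re-checks for missed signals.
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            String item;
            while ((item = queue.poll()) != null) {
                received.add(item + "@" + Thread.currentThread().getName());
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    // Emits three items from the calling thread and returns what the worker received.
    static List<String> demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observer"));
        QueueDrainSketch sketch = new QueueDrainSketch(worker);
        sketch.onNext("a");
        sketch.onNext("b");
        sketch.onNext("c");
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("drain did not finish");
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return sketch.received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The counter guarantees that at most one drain runs at a time and that an item enqueued while a drain is finishing is still picked up, which is why the downstream sees events in order on the worker thread.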
12 RxJava2 observeOn internals
12.1 RxJava2 (without backpressure) observeOn internals
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
void drainFused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
Throwable ex = error;
if (!delayError && d && ex != null) {
actual.onError(error);
worker.dispose();
return;
}
actual.onNext(null);
if (d) {
ex = error;
if (ex != null) {
actual.onError(ex);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (cancelled) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public int requestFusion(int mode) {
if ((mode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() throws Exception {
return queue.poll();
}
@Override
public void clear() {
queue.clear();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
}
}
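- Extends AbstractObservableWithUpstream
- Does its work in the subscribeActual method
- Creates a new Observer (ObserveOnObserver) that wraps the original one
- When onNext and the other callbacks are invoked, it queues the value and schedules the delivery on the worker's thread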
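The schedule() and drainNormal() methods above rely on an atomic counter so that only one drain runs at a time and no signal is lost. The sketch below isolates that pattern under assumptions: DrainLoopSketch is a hypothetical class, an ExecutorService replaces Scheduler.Worker, and error and completion handling are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the "queue + work-in-progress counter" drain pattern.
final class DrainLoopSketch extends AtomicInteger implements Runnable {
    private final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    private final ExecutorService worker;
    final List<Integer> received = new ArrayList<>(); // touched only by the worker thread while running

    DrainLoopSketch(ExecutorService worker) {
        this.worker = worker;
    }

    void onNext(int value) {
        queue.offer(value);
        // Only the caller that moves the counter from 0 to 1 schedules a drain.
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            Integer v;
            while ((v = queue.poll()) != null) {
                received.add(v);
            }
            // Subtract the signals already handled; a non-zero result means
            // more values arrived during the drain, so loop again.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }

    // Pushes count values from the calling thread and returns what the worker received.
    static List<Integer> demo(int count) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        DrainLoopSketch sketch = new DrainLoopSketch(worker);
        for (int i = 0; i < count; i++) {
            sketch.onNext(i);
        }
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.received;
    }
}
```

Because a drain is scheduled only on the 0 to 1 transition, many onNext calls can be served by a single run() invocation, and values keep their original order.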
12.2 How observeOn works in RxJava2 (with backpressure)
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new FlowableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int prefetch;
public FlowableObserveOn(
Flowable<T> source,
Scheduler scheduler,
boolean delayError,
int prefetch) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.prefetch = prefetch;
}
@Override
public void subscribeActual(Subscriber<? super T> s) {
Worker worker = scheduler.createWorker();
if (s instanceof ConditionalSubscriber) {
source.subscribe(new ObserveOnConditionalSubscriber<T>(
(ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
} else {
source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
}
}
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
private static final long serialVersionUID = -8241002408341274697L;
final Worker worker;
final boolean delayError;
final int prefetch;
final int limit;
final AtomicLong requested;
Subscription s;
SimpleQueue<T> queue;
volatile boolean cancelled;
volatile boolean done;
Throwable error;
int sourceMode;
long produced;
boolean outputFused;
BaseObserveOnSubscriber(
Worker worker,
boolean delayError,
int prefetch) {
this.worker = worker;
this.delayError = delayError;
this.prefetch = prefetch;
this.requested = new AtomicLong();
this.limit = prefetch - (prefetch >> 2);
}
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {
s.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
@Override
public final void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
trySchedule();
}
@Override
public final void onComplete() {
if (!done) {
done = true;
trySchedule();
}
}
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(requested, n);
trySchedule();
}
}
@Override
public final void cancel() {
if (cancelled) {
return;
}
cancelled = true;
s.cancel();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this);
}
@Override
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
abstract void runBackfused();
abstract void runSync();
abstract void runAsync();
final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
if (cancelled) {
clear();
return true;
}
if (d) {
if (delayError) {
if (empty) {
Throwable e = error;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
Throwable e = error;
if (e != null) {
clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
@Override
public final int requestFusion(int requestedMode) {
if ((requestedMode & ASYNC) != 0) {
outputFused = true;
return ASYNC;
}
return NONE;
}
@Override
public final void clear() {
queue.clear();
}
@Override
public final boolean isEmpty() {
return queue.isEmpty();
}
}
static final class ObserveOnSubscriber<T> extends BaseObserveOnSubscriber<T>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -4547113800637756442L;
final Subscriber<? super T> actual;
ObserveOnSubscriber(
Subscriber<? super T> actual,
Worker worker,
boolean delayError,
int prefetch) {
super(worker, delayError, prefetch);
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
actual.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;
actual.onSubscribe(this);
s.request(prefetch);
return;
}
}
queue = new SpscArrayQueue<T>(prefetch);
actual.onSubscribe(this);
s.request(prefetch);
}
}
@Override
void runSync() {
int missed = 1;
final Subscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
a.onError(ex);
worker.dispose();
return;
}
if (cancelled) {
return;
}
if (v == null) {
a.onComplete();
worker.dispose();
return;
}
a.onNext(v);
e++;
}
if (cancelled) {
return;
}
if (q.isEmpty()) {
a.onComplete();
worker.dispose();
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runAsync() {
int missed = 1;
final Subscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
e++;
if (e == limit) {
if (r != Long.MAX_VALUE) {
r = requested.addAndGet(-e);
}
s.request(e);
e = 0L;
}
}
if (e == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runBackfused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
actual.onNext(null);
if (d) {
Throwable e = error;
if (e != null) {
actual.onError(e);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = produced + 1;
if (p == limit) {
produced = 0;
s.request(p);
} else {
produced = p;
}
}
return v;
}
}
static final class ObserveOnConditionalSubscriber<T>
extends BaseObserveOnSubscriber<T> {
private static final long serialVersionUID = 644624475404284533L;
final ConditionalSubscriber<? super T> actual;
long consumed;
ObserveOnConditionalSubscriber(
ConditionalSubscriber<? super T> actual,
Worker worker,
boolean delayError,
int prefetch) {
super(worker, delayError, prefetch);
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> f = (QueueSubscription<T>) s;
int m = f.requestFusion(ANY | BOUNDARY);
if (m == SYNC) {
sourceMode = SYNC;
queue = f;
done = true;
actual.onSubscribe(this);
return;
} else
if (m == ASYNC) {
sourceMode = ASYNC;
queue = f;
actual.onSubscribe(this);
s.request(prefetch);
return;
}
}
queue = new SpscArrayQueue<T>(prefetch);
actual.onSubscribe(this);
s.request(prefetch);
}
}
@Override
void runSync() {
int missed = 1;
final ConditionalSubscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long e = produced;
for (;;) {
long r = requested.get();
while (e != r) {
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
a.onError(ex);
worker.dispose();
return;
}
if (cancelled) {
return;
}
if (v == null) {
a.onComplete();
worker.dispose();
return;
}
if (a.tryOnNext(v)) {
e++;
}
}
if (cancelled) {
return;
}
if (q.isEmpty()) {
a.onComplete();
worker.dispose();
return;
}
int w = get();
if (missed == w) {
produced = e;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runAsync() {
int missed = 1;
final ConditionalSubscriber<? super T> a = actual;
final SimpleQueue<T> q = queue;
long emitted = produced;
long polled = consumed;
for (;;) {
long r = requested.get();
while (emitted != r) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
if (a.tryOnNext(v)) {
emitted++;
}
polled++;
if (polled == limit) {
s.request(polled);
polled = 0L;
}
}
if (emitted == r && checkTerminated(done, q.isEmpty(), a)) {
return;
}
int w = get();
if (missed == w) {
produced = emitted;
consumed = polled;
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
} else {
missed = w;
}
}
}
@Override
void runBackfused() {
int missed = 1;
for (;;) {
if (cancelled) {
return;
}
boolean d = done;
actual.onNext(null);
if (d) {
Throwable e = error;
if (e != null) {
actual.onError(e);
} else {
actual.onComplete();
}
worker.dispose();
return;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
@Nullable
@Override
public T poll() throws Exception {
T v = queue.poll();
if (v != null && sourceMode != SYNC) {
long p = consumed + 1;
if (p == limit) {
consumed = 0;
s.request(p);
} else {
consumed = p;
}
}
return v;
}
}
}
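- Extends AbstractFlowableWithUpstream
- Does its work in the subscribeActual method
- Creates a new Subscriber (ObserveOnSubscriber or ObserveOnConditionalSubscriber) that wraps the original one
- When onNext and the other callbacks are invoked, it queues the value and schedules the delivery on the worker's thread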
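The backpressure variant differs mainly in how it requests data from upstream: it asks for prefetch items up front and then replenishes in batches of limit = prefetch - (prefetch >> 2), as seen in the constructor and in runAsync above. The arithmetic can be sketched as follows; PrefetchSketch is a hypothetical helper, and the numbers 128 and 200 are picked only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the prefetch/limit replenishment arithmetic.
final class PrefetchSketch {
    // Same formula as in BaseObserveOnSubscriber: 75% of prefetch.
    static int limit(int prefetch) {
        return prefetch - (prefetch >> 2);
    }

    // Returns the size of every upstream request issued while consuming `items` values.
    static List<Long> requestsFor(int prefetch, int items) {
        List<Long> requests = new ArrayList<>();
        int limit = limit(prefetch);
        requests.add((long) prefetch);      // initial s.request(prefetch) in onSubscribe
        long consumed = 0;
        for (int i = 0; i < items; i++) {
            consumed++;
            if (consumed == limit) {
                requests.add(consumed);     // replenish with s.request(limit)
                consumed = 0;
            }
        }
        return requests;
    }
}
```

With a prefetch of 128 the limit is 96, so consuming 200 items produces the requests [128, 96, 96]. Replenishing before the buffer is fully empty keeps upstream busy while the consumer is still working through the remaining quarter.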
13 Imitating RxJava1 observeOn
/**
* Created by Xionghu on 2018/6/14.
* Desc: Operator used by callbackOn
*/
public class OperatorCallbackOn<T> implements Caller.Operator<T, T> {
private final Switcher switcher;
public OperatorCallbackOn(Switcher switcher) {
this.switcher = switcher;
}
@Override
public Receiver<T> call(final Receiver<T> tReceiver) {
return new CallbackOnReceiver<>(tReceiver, switcher);
}
private static final class CallbackOnReceiver<T> extends Receiver<T> implements Action0 {
private final Receiver<T> tReceiver;
private final Switcher.Worker worker;
private final Queue<T> tQueue = new LinkedList<>();
public CallbackOnReceiver(Receiver<T> tReceiver, Switcher switcher) {
this.tReceiver = tReceiver;
this.worker = switcher.createWorker();
}
@Override
public void call() {
T t = tQueue.poll(); // removes and returns the head; returns null if the queue is empty
tReceiver.onReceive(t);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(T t) {
tQueue.offer(t); // offer: adds an element and returns true; returns false if the queue is full
switches();
}
private void switches() {
worker.switches(this);
}
}
}
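OperatorCallbackOn
- Holds the Switcher
- Its call method returns a Receiver dedicated to callbackOn
CallbackOnReceiver
- Holds the original Receiver and a Worker created from the Switcher
- Dispatches to the Worker in onReceive; onCompleted and onError are left empty in this simplified version
- After the switch, calls onReceive on the original Receiver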
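The imitation above forwards only onReceive. One possible way to route the terminal callbacks through the same worker, so that they reach the original Receiver on the target thread and after the values, is sketched below. SketchReceiver and ForwardingReceiver are hypothetical stand-ins for the article's Receiver and CallbackOnReceiver, and a single-threaded ExecutorService is assumed in place of Switcher.Worker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the article's Receiver; only the three callbacks matter here.
interface SketchReceiver<T> {
    void onReceive(T t);
    void onCompleted();
    void onError(Throwable t);
}

// Forwards every callback to the downstream receiver on the worker's thread.
final class ForwardingReceiver<T> implements SketchReceiver<T> {
    private final SketchReceiver<T> downstream;
    private final Executor worker; // assumed to run tasks one at a time, in submission order

    ForwardingReceiver(SketchReceiver<T> downstream, Executor worker) {
        this.downstream = downstream;
        this.worker = worker;
    }

    @Override
    public void onReceive(T t) {
        worker.execute(() -> downstream.onReceive(t));
    }

    @Override
    public void onCompleted() {
        worker.execute(downstream::onCompleted);
    }

    @Override
    public void onError(Throwable t) {
        worker.execute(() -> downstream.onError(t));
    }

    // Sends one value followed by completion and returns the events seen downstream.
    static List<String> demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        List<String> events = new ArrayList<>();
        SketchReceiver<String> downstream = new SketchReceiver<String>() {
            @Override public void onReceive(String s) { events.add("onReceive:" + s); }
            @Override public void onCompleted() { events.add("onCompleted"); }
            @Override public void onError(Throwable t) { events.add("onError"); }
        };
        SketchReceiver<String> wrapper = new ForwardingReceiver<>(downstream, worker);
        wrapper.onReceive("test");
        wrapper.onCompleted();
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events; // [onReceive:test, onCompleted]
    }
}
```

Posting the terminal events through the same single-threaded worker is what keeps them ordered behind the values that were sent before them.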
Example
/**
* Created by Xionghu on 2018/6/11.
* Desc: RxJava1 observeOn imitation
*/
public class Lesson3_4Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
Caller.
create(new Caller.OnCall<String>() {
@Override
public void call(Receiver<String> stringReceiver) {
stringReceiver.onReceive("test");
Log.d(TAG, "currentThread:" + Thread.currentThread());
stringReceiver.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
callbackOn(new LooperSwitcher(getMainLooper())).
call(new Receiver<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onReceive(String s) {
Log.d(TAG, "onReceive:" + s);
Log.d(TAG, "currentThread:" + Thread.currentThread());
}
});
}
}
06-19 10:05:36.245 11194-11346/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[ NewThreadWorker,5,main]
06-19 10:05:36.265 11194-11194/com.haocai.rxjavademo D/kpioneer: onReceive:test
06-19 10:05:36.265 11194-11194/com.haocai.rxjavademo D/kpioneer: currentThread:Thread[main,5,main]
14 Imitating RxJava2 observeOn
14.1 RxJava2 (without backpressure) observeOn
public Caller<T> callbackOn(Switcher switcher) {
return new CallerCallbackOn<>(this, switcher);
}
/**
* Created by Xionghu on 2018/6/19.
* Desc: used for callbackOn
*/
public class CallerCallbackOn<T> extends CallerWithUpstream<T, T> {
private final Switcher mSwitcher;
public CallerCallbackOn(Caller<T> source, Switcher mSwitcher) {
super(source);
this.mSwitcher = mSwitcher;
}
@Override
protected void callActual(Callee<T> callee) {
source.call(new CallbackOnCallee<T>(callee, mSwitcher));
}
private static final class CallbackOnCallee<T> implements Callee<T>, Runnable {
private final Callee<T> mCallee;
private final Switcher.Worker worker;
private final Queue<T> tQueue = new LinkedList<>();
public CallbackOnCallee(Callee<T> mCallee, Switcher switcher) {
this.mCallee = mCallee;
this.worker = switcher.createWorker();
}
@Override
public void onCall(Release release) {
// Pass the Release handle on so the original Callee still receives it.
mCallee.onCall(release);
}
@Override
public void onReceive(T t) {
tQueue.offer(t);
switches();
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
@Override
public void run() {
T t = tQueue.poll();
mCallee.onReceive(t);
}
private void switches() {
worker.switches(this);
}
}
}
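CallerCallbackOn
- Holds the Switcher
- Implements the callActual method
- Uses the original Caller to call a Callee dedicated to thread switching
CallbackOnCallee
- Holds the original Callee and a Worker created from the Switcher
- Dispatches to the Worker in onReceive
- After the switch, calls onReceive on the original Callee
14.2 RxJava2 (with backpressure) observeOn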
public Telephoner<T> callbackOn(Switcher switcher){
return new TelephonerCallbackOn<>(this,switcher);
}
/**
* Created by Xionghu on 2018/6/19.
* Desc: used for callbackOn
*/
public class TelephonerCallbackOn<T> extends TelephonerWithUpstream<T, T> {
private final Switcher mSwitcher;
public TelephonerCallbackOn(Telephoner<T> source, Switcher mSwitcher) {
super(source);
this.mSwitcher = mSwitcher;
}
@Override
protected void callActual(Receiver<T> receiver) {
source.call(new CallbackOnReceiver<>(receiver, mSwitcher));
}
private static final class CallbackOnReceiver<T> implements Receiver<T>, Runnable {
private final Receiver<T> tReceiver;
private final Switcher.Worker worker;
private final Queue<T> tQueue = new LinkedList<>();
public CallbackOnReceiver(Receiver<T> tReceiver, Switcher switcher) {
this.tReceiver = tReceiver;
this.worker = switcher.createWorker();
}
@Override
public void onCall(Drop d) {
tReceiver.onCall(d);
}
@Override
public void onReceive(T t) {
tQueue.offer(t);
switches();
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
@Override
public void run() {
T t = tQueue.poll();
tReceiver.onReceive(t);
}
private void switches() {
worker.switches(this);
}
}
}
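TelephonerCallbackOn
- Holds the Switcher
- Implements the callActual method
- Uses the original Telephoner to call a Receiver dedicated to thread switching
CallbackOnReceiver
- Holds the original Receiver and a Worker created from the Switcher
- Dispatches to the Worker in onReceive
- After the switch, calls onReceive on the original Receiver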
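The CallbackOnReceiver above passes the Drop straight through and delivers every queued value regardless of how many were requested. A receiver that honours the requested amount could gate delivery roughly as below. This is a single-threaded sketch under assumptions: RequestGateSketch is a hypothetical class, its request method only mirrors the request(long) call seen on Drop in the example, and thread switching is left out to keep the counting logic visible.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: delivers queued values only while downstream demand remains.
final class RequestGateSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private long requested;                       // outstanding downstream demand
    final List<String> delivered = new ArrayList<>();

    void onReceive(String value) {
        queue.offer(value);
        drain();
    }

    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        long sum = requested + n;
        requested = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
        drain();
    }

    private void drain() {
        while (requested > 0 && !queue.isEmpty()) {
            delivered.add(queue.poll());
            if (requested != Long.MAX_VALUE) {
                requested--;                        // Long.MAX_VALUE is treated as unbounded
            }
        }
    }
}
```

If three values arrive before any request, nothing is delivered; request(2) then releases the first two, and the third waits for further demand.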
Example
/**
* Created by Xionghu on 2018/6/11.
* Desc: RxJava2 observeOn imitation
*/
public class Lesson3_5Activity extends AppCompatActivity {
public static final String TAG = "kpioneer";
@Override
protected void onCreate(final Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_custom_test);
ButterKnife.bind(this);
}
@OnClick(R.id.testDo)
public void onViewClicked() {
/*---------Without backpressure (log tag: 无背压)---------*/
Caller.
create(new CallerOnCall<String>() {
@Override
public void call(CallerEmitter<String> callerEmitter) {
callerEmitter.onReceive("test");
Log.d(TAG, "无背压 currentThread:" + Thread.currentThread());
callerEmitter.onCompleted();
}
}).
callOn(new NewThreadSwitcher()).
callbackOn(new LooperSwitcher(getMainLooper())).
call(new Callee<String>() {
@Override
public void onCall(Release release) {
}
@Override
public void onReceive(String s) {
Log.d(TAG, "无背压 onReceive:" + s);
Log.d(TAG, "无背压 currentThread:" + Thread.currentThread());
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable t) {
}
});
/*---------With backpressure (log tag: 有背压)---------*/
Telephoner.
create(new TelephonerOnCall<String>() {
@Override
public void call(TelephonerEmitter<String> telephonerEmitter) {
telephonerEmitter.onReceive("test");
Log.d(TAG, "有背压 currentThread:" + Thread.currentThread());
}
}).
callOn(new NewThreadSwitcher()).
callbackOn(new LooperSwitcher(getMainLooper())).
call(new Receiver<String>() {
@Override
public void onCall(Drop d) {
d.request(Long.MAX_VALUE);
}
@Override
public void onReceive(String s) {
Log.d(TAG, "有背压 onReceive:" + s);
Log.d(TAG, "有背压 currentThread:" + Thread.currentThread());
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
});
}
}
06-19 13:19:29.185 25626-25771/com.haocai.rxjavademo D/kpioneer: 无背压 currentThread:Thread[NewThreadWorker,5,main]
06-19 13:19:29.195 25626-25626/com.haocai.rxjavademo D/kpioneer: 无背压 onReceive:test
06-19 13:19:29.195 25626-25626/com.haocai.rxjavademo D/kpioneer: 无背压 currentThread:Thread[main,5,main]
06-19 13:19:29.205 25626-25772/com.haocai.rxjavademo D/kpioneer: 有背压 currentThread:Thread[NewThreadWorker,5,main]
06-19 13:19:29.205 25626-25626/com.haocai.rxjavademo D/kpioneer: 有背压 onReceive:test
06-19 13:19:29.205 25626-25626/com.haocai.rxjavademo D/kpioneer: 有背压 currentThread:Thread[main,5,main]