使用时:
Observable.just("xxxxx")
.map(new Function<String, Object>() {
@Override
public Object apply(@NonNull String s) throws Exception {
return null;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
注意到
.subscribeOn(Schedulers.io())//子线程执行
.observeOn(AndroidSchedulers.mainThread()) //主线程
一、子线程执行
执行
代表线程切换,源码分析
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
进入ObservableSubscribeOn类,只需要分析
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
和
@Override
public void onNext(T t) {
actual.onNext(t);
}
方法中
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
SubscribeTask是一个Runnable
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
scheduler.scheduleDirect()方法传入一个Runnable ,scheduler为
.subscribeOn(Schedulers.io())中Schedulers.io()看一下代码
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
IO初始化为
IO = RxJavaPlugins.initIoScheduler(new IOTask());
进一步
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
IoHolder.DEFAULT
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
IoScheduler
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
WORKER_THREAD_FACTORY为线程工厂
static final RxThreadFactory WORKER_THREAD_FACTORY;
看一下 this(WORKER_THREAD_FACTORY);
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
/**
* @param threadFactory thread factory to use for creating worker threads. Note that this takes precedence over any
* system properties for configuring new thread creation. Cannot be null.
*/
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
pool 为线程池 , start();方法
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
CachedWorkerPool类
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;//线程安全的集合,联想到eventbus中保证集合如下,先复制,再添加
总之是封装一个线程池对象,回到
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
中scheduler.scheduleDirect()方法再Scheduler类
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
scheduleDirect具体实现
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
createWorker();方法在IoScheduler类实现
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
w.schedule(task, delay, unit);方法IoScheduler类实现
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
threadWorker为
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
threadWorker.scheduleActual()为NewThreadWorker
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
是一些线程池
二、 主线程执行
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObservableObserveOn类中
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
发现
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
onNext中
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
schedule();方法
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
.observeOn(AndroidSchedulers.mainThread())方法
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
HandlerScheduler类
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
new Handler(Looper.getMainLooper()) 精华 new Handler() 和 new Handler(Looper.getMainLooper()) 的区别
new Handler() , 如果是在主线程中是没问题,但是有的时候可能会在子线程中调用,肯定就报错。
new Handler(Looper.getMainLooper()),确保创建的Handler永远在主线程中,Looper要是主线程的Looper。
三、简单实现子线程
Observable
public Observable<Bitmap> subscribeOn(Schedulers schedulers) {
return onAssembly(new ObservableSchedulers(this,schedulers));
}
ObservableSchedulers
final class ObservableSchedulers<T> extends Observable<T> {
final Observable<T> source;
final Schedulers schedulers;
public ObservableSchedulers(Observable<T> source, Schedulers schedulers) {
this.source = source;
this.schedulers = schedulers;
}
@Override
protected void subscribeActual(Observer<T> observer) {
schedulers.scheduleDirect(new SchedulerTask(observer));
}
private class SchedulerTask implements Runnable{
final Observer<T> observer;
public SchedulerTask(Observer<T> observer) {
this.observer = observer;
}
@Override
public void run() {
// 线程池最终回来执行 Runnable -> 这行代码,会执行上游的 subscribe()
// 而这个run方法在子线程中
source.subscribe(observer);
}
}
Schedulers 类
public abstract class Schedulers {
static Schedulers IO;
static {
IO = new IOSchedulers();
}
public static Schedulers io() {
return IO;
}
public abstract void scheduleDirect(Runnable runnable);
private static class IOSchedulers extends Schedulers {
ExecutorService service;
public IOSchedulers(){
service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable r) {
return new Thread(r);
}
});
}
@Override
public void scheduleDirect(Runnable runnable) {
service.execute(runnable);
}
}
}
四、主线程实现
Observable中
public Observable<T> observerOn(Schedulers schedulers) {
return onAssembly(new ObserverOnObservable(this,schedulers));
}
ObserverOnObservable
class ObserverOnObservable<T> extends Observable<T> {
final Observable<T> source;
final Schedulers schedulers;
public ObserverOnObservable(Observable<T> source, Schedulers schedulers) {
this.source = source;
this.schedulers = schedulers;
}
@Override
protected void subscribeActual(Observer<T> observer) {
source.subscribe(new ObserverOnObserver(observer,schedulers));
}
private class ObserverOnObserver<T> implements Observer<T>,Runnable{
final Observer<T> observer;
final Schedulers schedulers;
private T value;
public ObserverOnObserver(Observer<T> observer, Schedulers schedulers) {
this.observer = observer;
this.schedulers = schedulers;
}
@Override
public void onSubscribe() {
observer.onSubscribe();
}
@Override
public void onNext(@NonNull T item) {
value = item;
schedulers.scheduleDirect(this);
}
@Override
public void onError(@NonNull Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
@Override
public void run() {
// 主线程 或者 其他
observer.onNext(value);
}
}
}
完善Schedulers
public abstract class Schedulers {
static Schedulers MAIN_THREAD;
static Schedulers IO;
static {
IO = new IOSchedulers();
MAIN_THREAD = new MainSchedulers(new Handler(Looper.getMainLooper()));
}
public static Schedulers io() {
return IO;
}
public abstract void scheduleDirect(Runnable runnable);
public static Schedulers mainThread() {
return MAIN_THREAD;
}
private static class IOSchedulers extends Schedulers {
ExecutorService service;
public IOSchedulers(){
service = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(@NonNull Runnable r) {
return new Thread(r);
}
});
}
@Override
public void scheduleDirect(Runnable runnable) {
service.execute(runnable);
}
}
private static class MainSchedulers extends Schedulers {
private Handler handler;
public MainSchedulers(Handler handler) {
this.handler = handler;
}
@Override
public void scheduleDirect(Runnable runnable) {
Message message = Message.obtain(handler,runnable);
handler.sendMessage(message);
}
}