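I have always felt that multithreading is a weak spot for Android developers: whenever multithreading comes up, the discussion seems to begin and end with Handler.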
Example:
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
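// Log before emitting, and call onComplete(), so the callbacks fire in the order shown in the output below
LogUtils.log(Note01.class, "subscribe()->ThreadName:" + Thread.currentThread().getName());
e.onNext(1);
e.onComplete();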
}
})
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
LogUtils.log(Note01.class, "onSubscribe()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onNext(Integer value) {
LogUtils.log(Note01.class, "onNext()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
LogUtils.log(Note01.class, "onError()->ThreadName:" + Thread.currentThread().getName());
}
@Override
public void onComplete() {
LogUtils.log(Note01.class, "onComplete()->ThreadName:" + Thread.currentThread().getName());
}
});
- Output:
onSubscribe()->ThreadName:main
subscribe()->ThreadName:RxNewThreadScheduler-2
onNext()->ThreadName:RxNewThreadScheduler-2
onComplete()->ThreadName:RxNewThreadScheduler-2
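- The output shows that onSubscribe() is called on the main thread, while subscribe(), onNext(), and onComplete() are all called on the worker thread;
That leaves two open questions:
- 1. How is the worker thread created?
- 2. When switching from the worker thread back to the main thread, how do the two threads communicate?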
public final class Schedulers {
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
static final Scheduler NEW_THREAD;
static {
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
});
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = NewThreadScheduler.instance();
}
}
public final class NewThreadScheduler extends Scheduler {
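private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
// Accessor called by NewThreadHolder above; it hands out the singleton
public static NewThreadScheduler instance() {
return INSTANCE;
}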
}
- 1. The Scheduler passed to .subscribeOn(Schedulers.newThread()) is actually the NewThreadScheduler singleton;
- 2. Its internals are analyzed further down, when we follow the onXXX call chain;
public abstract class Observable<T> implements ObservableSource<T> {
public final Observable<T> subscribeOn(Scheduler scheduler) {
return new ObservableSubscribeOn<T>(this, scheduler);
}
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
}
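- 1. subscribeOn() creates an ObservableSubscribeOn object and returns it as the new Observable, so the following .subscribe() call is made on this object;
- 2. this (the ObservableCreate instance) is stored in the ObservableSource field (source) of AbstractObservableWithUpstream;
- 3. ObservableSubscribeOn holds a reference to the NewThreadScheduler;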
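The three points above describe a plain wrapper chain, which can be sketched without RxJava. The types below (MiniObservable, MiniCreate, MiniSubscribeOn) are hypothetical stand-ins invented for this illustration, and a JDK Executor takes the place of the Scheduler. It is only a sketch of the structure, not the library's implementation.

```java
import java.util.concurrent.Executor;

// Hypothetical miniature of the operator chain; none of these types are RxJava classes.
abstract class MiniObservable {
    // Mirrors the shape of subscribeOn(): nothing runs here, a wrapper is simply returned.
    final MiniObservable subscribeOn(Executor executor) {
        return new MiniSubscribeOn(this, executor);
    }
}

// Stands in for ObservableCreate: the original upstream source.
final class MiniCreate extends MiniObservable {
}

// Stands in for ObservableSubscribeOn: remembers the upstream and the executor.
final class MiniSubscribeOn extends MiniObservable {
    final MiniObservable source;
    final Executor executor;

    MiniSubscribeOn(MiniObservable source, Executor executor) {
        this.source = source;
        this.executor = executor;
    }
}

final class WrapperChainDemo {
    public static void main(String[] args) {
        MiniObservable created = new MiniCreate();
        Executor direct = Runnable::run; // placeholder executor that runs tasks inline
        MiniObservable wrapped = created.subscribeOn(direct);

        System.out.println(wrapped instanceof MiniSubscribeOn);            // true
        System.out.println(((MiniSubscribeOn) wrapped).source == created); // true
    }
}
```

The point of the sketch is that calling subscribeOn() does no work by itself: it only records the upstream and the executor, and everything else is deferred until something subscribes to the wrapper.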
.subscribe(new Observer<Integer>() {...})
public abstract class Observable<T> implements ObservableSource<T> {
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
protected abstract void subscribeActual(Observer<? super T> observer);
}
- subscribeActual() is implemented by the subclass ObservableSubscribeOn:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
}
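- The Disposable handed to onSubscribe() is the SubscribeOnObserver. As the code shows, s.onSubscribe(parent) is called before any thread has been created, so it runs on the calling (main) thread, which matches the output at the start;
- Next, let's look at how the scheduleDirect() call creates the worker thread: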
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
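Before following scheduleDirect() into the Scheduler, the ordering seen so far can be reproduced with plain JDK threads. The sketch below is not RxJava code: MiniObserver, MiniSource, and subscribeOnNewThread are hypothetical names, and a raw Thread stands in for the scheduler. It only illustrates why onSubscribe() lands on the caller's thread while everything the source emits lands on the worker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

final class SubscribeOnOrderDemo {
    interface MiniObserver {
        void onSubscribe();
        void onNext(int value);
        void onComplete();
    }

    interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    // Same shape as subscribeActual(): notify first, then hand the source to a thread.
    static void subscribeOnNewThread(MiniSource source, MiniObserver observer, String threadName) {
        observer.onSubscribe(); // still on the caller's thread
        Thread worker = new Thread(() -> source.subscribe(observer), threadName);
        worker.setDaemon(true);
        worker.start();         // everything the source emits happens on the worker
    }

    // Runs one subscription and returns "callback:threadName" entries in call order.
    static List<String> run(String threadName) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        MiniSource source = downstream -> {
            log.add("subscribe:" + Thread.currentThread().getName());
            downstream.onNext(1);
            downstream.onComplete();
        };
        MiniObserver observer = new MiniObserver() {
            @Override public void onSubscribe() {
                log.add("onSubscribe:" + Thread.currentThread().getName());
            }
            @Override public void onNext(int value) {
                log.add("onNext:" + Thread.currentThread().getName());
            }
            @Override public void onComplete() {
                log.add("onComplete:" + Thread.currentThread().getName());
                done.countDown();
            }
        };

        subscribeOnNewThread(source, observer, threadName);
        try {
            done.await(); // wait for the worker so the log is complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run("demo-worker-1").forEach(System.out::println);
    }
}
```

Running main prints the onSubscribe entry tagged with the caller's thread name, followed by the subscribe, onNext, and onComplete entries tagged demo-worker-1.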
public abstract class Scheduler {
public Disposable scheduleDirect(Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = run;
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
public abstract Worker createWorker();
}
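The try/finally wrapper in scheduleDirect() is what releases the one-shot worker once the task has run. A minimal sketch of that shape follows, assuming a single-thread ExecutorService plays the role of the worker. MiniWorker and awaitDisposed are made up for this illustration and are not RxJava APIs.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ScheduleDirectSketch {
    // Hypothetical one-shot worker: owns one thread and releases it on dispose().
    static final class MiniWorker {
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-worker");
            t.setDaemon(true);
            return t;
        });

        void schedule(Runnable task) {
            executor.execute(task);
        }

        void dispose() {
            executor.shutdown();
        }

        // Waits until dispose() has been called and the worker thread has finished.
        boolean awaitDisposed(long timeoutSeconds) {
            try {
                return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Same shape as scheduleDirect(): create a worker, wrap the task, dispose at the end.
    static MiniWorker scheduleDirect(final Runnable run) {
        final MiniWorker w = new MiniWorker();
        w.schedule(() -> {
            try {
                run.run();
            } finally {
                w.dispose(); // runs whether the task returns normally or throws
            }
        });
        return w;
    }

    public static void main(String[] args) {
        MiniWorker w = scheduleDirect(
                () -> System.out.println("task on " + Thread.currentThread().getName()));
        System.out.println("disposed: " + w.awaitDisposed(5));
    }
}
```

Because the worker is created per call and disposed in finally, each scheduleDirect() call in this sketch gets its own thread and gives it back as soon as the task ends.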
- As noted earlier, the Scheduler here is a NewThreadScheduler:
public final class NewThreadScheduler extends Scheduler {
@Override
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);
}
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
}
- Now let's look at how the thread itself is created:
public interface ThreadFactory {
Thread newThread(Runnable r);
}
public final class NewThreadScheduler extends Scheduler {
private static final RxThreadFactory THREAD_FACTORY;
static {
// THREAD_NAME_PREFIX and priority are defined in code omitted from this excerpt
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
}
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
Thread t = new Thread(r, nameBuilder.toString());
t.setPriority(priority);
t.setDaemon(true);
return t;
}
}
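- The factory creates the thread and names it prefix-N, which is where names such as RxNewThreadScheduler-2 in the output come from;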
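The naming scheme can be tried in isolation with JDK classes only. NamedThreadFactory below is a hypothetical stand-in that copies the shape of the excerpt above (the factory extends AtomicLong and uses itself as the counter). The prefix DemoScheduler is an arbitrary value chosen for the example.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for RxThreadFactory; not the library class itself.
final class NamedThreadFactory extends AtomicLong implements ThreadFactory {
    private final String prefix;
    private final int priority;

    NamedThreadFactory(String prefix, int priority) {
        this.prefix = prefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // The factory is its own counter: incrementAndGet() yields 1, 2, 3, ...
        String name = prefix + '-' + incrementAndGet();
        Thread t = new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    }
}

final class NamedThreadFactoryDemo {
    public static void main(String[] args) {
        NamedThreadFactory factory =
                new NamedThreadFactory("DemoScheduler", Thread.NORM_PRIORITY);
        System.out.println(factory.newThread(() -> { }).getName()); // DemoScheduler-1
        System.out.println(factory.newThread(() -> { }).getName()); // DemoScheduler-2
    }
}
```

Extending AtomicLong saves a separate counter field, at the cost of exposing the counter's methods on the factory; a private AtomicLong field would behave the same way here.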
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@Override
public Disposable schedule(final Runnable run) {
return schedule(run, 0, null);
}
@Override
public Disposable schedule(final Runnable action, long delayTime, TimeUnit unit) {
return scheduleActual(action, delayTime, unit, null);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
ScheduledRunnable sr = new ScheduledRunnable(run, parent);
Future<?> f = executor.submit((Callable<Object>)sr);
sr.setFuture(f);
return sr;
}
}
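scheduleActual() casts the task to Callable because ScheduledRunnable implements both Runnable and Callable, and without the cast the call to submit() would be ambiguous between its two overloads. The sketch below shows the same pattern with JDK classes only. MiniTask is a hypothetical stand-in for ScheduledRunnable, and the single-thread scheduled pool is an assumption, since the excerpt does not show what SchedulerPoolFactory.create() builds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

final class SubmitSketch {
    // Hypothetical task that, like ScheduledRunnable, is both Runnable and Callable.
    static final class MiniTask implements Runnable, Callable<Object> {
        final Runnable actual;
        volatile String ranOn; // name of the thread that executed run()

        MiniTask(Runnable actual) {
            this.actual = actual;
        }

        @Override
        public Object call() {
            run(); // call() simply delegates to run()
            return null;
        }

        @Override
        public void run() {
            ranOn = Thread.currentThread().getName();
            actual.run();
        }
    }

    // Submits one task to a pool whose only thread has the given name.
    static String runOnce(String threadName) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
        MiniTask task = new MiniTask(() -> { });
        try {
            // The cast picks submit(Callable); the thread is created by the factory on demand.
            Future<?> f = executor.submit((Callable<Object>) task);
            f.get(); // block until the pool thread has executed call()
            return task.ranOn;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce("demo-pool-1")); // demo-pool-1
    }
}
```

The thread factory is only invoked when the first task is submitted, so in this sketch the named thread does not exist until submit() is called.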
- After executor.submit(), the executor's thread invokes sr's call() method:
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
final Runnable actual;
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(2);
this.actual = actual;
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
@Override
public void run() {
try {
try {
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
Object o = get(PARENT_INDEX);
if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
((DisposableContainer)o).delete(this);
}
for (;;) {
o = get(FUTURE_INDEX);
if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
break;
}
}
}
}
}
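- Here actual is the wrapper Runnable created in Scheduler.scheduleDirect(). Its run() calls decoratedRun.run(), which is the Runnable we created in ObservableSubscribeOn, so source.subscribe(parent) finally executes on the new thread;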
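- The next post will try to analyze switching between the worker thread and the main thread. Once these two posts are done, I will come back to analyze the Atomic classes, the Executor framework, and the adapter, proxy, and decorator patterns.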