这篇文章主要讲下scheduler和thread的关系。
主要是读RxJava Threading Examples的笔记。
no thread
普通的observable的话是不会产生额外的线程,是在当前线程执行任务的。
代码示例
public class NoThreadObservable {
public static void main(String[] args) {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
});
}
}).map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
}).map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
代码的输出都是在main线程上。
输出结果
main Generated 1
main Shifted Up 11
main Shifted Down 1
main Received 1
main Generated 2
main Shifted Up 12
main Shifted Down 2
main Received 2
main Generated 3
main Shifted Up 13
main Shifted Down 3
main Received 3
main Generated 4
main Shifted Up 14
main Shifted Down 4
main Received 4
main Generated 5
main Shifted Up 15
main Shifted Down 5
main Received 5
弹珠图

NoThreadObservable.png
scheduler api
Scheduler | purpose |
---|---|
Schedulers.computation( ) |
meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors |
Schedulers.from(executor) |
uses the specified Executor as a Scheduler |
Schedulers.immediate( ) |
schedules work to begin immediately in the current thread |
Schedulers.io( ) |
meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ) ; Schedulers.io( ) by default is a CachedThreadScheduler , which is something like a new thread scheduler with thread caching |
Schedulers.newThread( ) |
creates a new thread for each unit of work |
Schedulers.trampoline( ) |
queues work to begin on the current thread after any already-queued work |
Observable.subscribeOn
SubscribeOn运算符指定Observable将在哪个线程上开始操作,无论该运算符在运算符链中的哪个点被调用
代码示例
public static void normalCase() throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
});
}
}).subscribeOn(Schedulers.io())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(1000);
}
输出结果
RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 2
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 4
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
可以看到使用的是RxCachedThreadScheduler-1线程。
注意点
- 因为subscribeOn之后使用了RxCachedThreadScheduler-1线程(rxjava大量使用了immutable objects和decorate pattern模式),如果主线程不等待的话,会在subscribe之间程序结束,导致没有输出。一般来说如果我们使用web server等有至少一个非daemon thread的服务的话,是不用担心的。
- subscribeOn之后是新的observable,不会对之前的observable造成影响,如果不使用的话会导致仍旧在main线程运行
public static void main(String[] args) throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> observable = Observable.fromStream(intList.stream());
Observable<Integer> scheduledObservable = observable.subscribeOn(Schedulers.io());
observable.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println(Thread.currentThread().getName() + " " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
运行结果是:
main 1
main 2
main 3
main 4
main 5
弹珠图

Observable.subscribeOn.png
Observable.observeOn
ObserveOn影响Observable将在该运算符出现的位置使用的线程。
代码示例
package com.zihao.schedulers.threadexample;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.Arrays;
import java.util.List;
/**
* observeOn
*
* @author tangzihao
* @Date 2021/1/4 11:35 上午
*/
public class ObserveOnObservable {
public static void main(String[] args) throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
})).observeOn(Schedulers.io())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(1000);
}
}
输出结果
main Generated 1
main Generated 2
main Generated 3
main Generated 4
main Generated 5
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
注意点
- main线程和RxCachedThreadScheduler-1是在一定程度上并行的,如果增加数据量的话可以看到main和RxCachedThreadScheduler-1线程并行执行。
- RxCachedThreadScheduler-1只是从scheduler取了一个线程,对于这个线程来说,执行还是串行的(This is key: observeOn() is not an instruction saying “process these values using this thread pool”, but only “process these values using one thread from this Scheduler”.)
弹珠图

Observable.observeOn.png
Two Observable.observeOn calls in one chain
ObserveOn影响Observable将在该运算符出现的位置使用的线程,所以可以使用多次observeOn的操作符。
代码示例
public class TwoObserveOnObservable {
public static void main(String[] args) throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
})).observeOn(Schedulers.io())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.observeOn(Schedulers.computation())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(1000);
}
}
输出结果
main Generated 1
main Generated 2
main Generated 3
main Generated 4
main Generated 5
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Up 13
RxComputationThreadPool-1 Shifted Down 1
RxCachedThreadScheduler-1 Shifted Up 14
RxComputationThreadPool-1 Received 1
RxCachedThreadScheduler-1 Shifted Up 15
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5
注意点
- 这个例子和文章中的有些出入,文章中的输出如下
[main] Generated 1
[main] Generated 2
[main] Generated 3
[main] Generated 4
[main] Generated 5
[RxCachedThreadScheduler-1] Shifted Up 11
[RxComputationThreadPool-1] Shifted Down 1
[RxComputationThreadPool-1] Received 1
[RxCachedThreadScheduler-1] Shifted Up 12
[RxComputationThreadPool-1] Shifted Down 2
[RxComputationThreadPool-1] Received 2
[RxCachedThreadScheduler-1] Shifted Up 13
...
弹珠图

TwoObserveOnObservable (1).png
Two Observable.subscribeOn calls in one chain
代码示例
public static void subscribeOnAndSubscribeOn() throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
});
}
}).subscribeOn(Schedulers.computation())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.subscribeOn(Schedulers.io())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(3000);
}
输出结果
只用到了第一个subscribeOn(Schedulers.computation())的RxComputationThreadPool中的线程。
RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Shifted Down 1
RxComputationThreadPool-1 Received 1
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Generated 5
RxComputationThreadPool-1 Shifted Up 15
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5
注意点
在one chain中,onObserve可以调用多次,onSubscribe使用第一个onSubscribe的scheduler
Observable.subscribeOn and Observable.observeOn together
先使用subscribeOn,然后使用observeOn
代码示例
public static void subscribeOnAndObserveOn() throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
});
}
}).subscribeOn(Schedulers.computation())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.observeOn(Schedulers.io())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(3000);
}
输出结果
RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 1
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxCachedThreadScheduler-1 Received 1
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 2
RxComputationThreadPool-1 Generated 5
RxCachedThreadScheduler-1 Received 2
RxComputationThreadPool-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
弹珠图

SubscribeOnObserveOn.png
先使用observeOn,然后使用subscribeOn
代码示例
public static void observeOnAndSubscribeOn() throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
});
}
}).map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
}).observeOn(Schedulers.io()).map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
}).subscribeOn(Schedulers.computation()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(3000);
}
输出结果
RxComputationThreadPool-1 Shifted Up 12
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 1
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxCachedThreadScheduler-1 Received 1
RxComputationThreadPool-1 Generated 5
RxCachedThreadScheduler-1 Shifted Down 2
RxComputationThreadPool-1 Shifted Up 15
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
弹珠图

ObserveOnSubscribeOn.png
注意点
- 使用了Schedulers.io()和Schedulers.computation()创建两个不同的线程池中的线程
- 这种结合导致两个线程并行处理不同值的运算。使用subscribeOn调度的线程在生成和处理更多值之前不会阻塞等待下游调度程序中的工作。
- subscribeOn和observeOn的操作符顺序没有明确的要求
- 如果使用两个subscribeOn,将使用第一个subscribeOn的scheduler
隐式的线程切换比如使用delay操作符
public class ImplicitThreadLikeDelay {
public static void main(String[] args) throws InterruptedException {
List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
}))
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.delay(10, TimeUnit.MILLISECONDS)
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Delayed " + e);
return e;
})
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(1000);
}
}
输出结果
main Generated 1
main Shifted Up 11
main Generated 2
main Shifted Up 12
main Generated 3
main Shifted Up 13
main Generated 4
main Shifted Up 14
main Generated 5
main Shifted Up 15
RxComputationThreadPool-1 Delayed 11
RxComputationThreadPool-1 Shifted Down 1
RxComputationThreadPool-1 Received 1
RxComputationThreadPool-1 Delayed 12
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Delayed 13
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Delayed 14
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Delayed 15
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5
注意点
rxjava的一些operator(比如这个例子中的delay)可能会在不同的线程上调度subscriber,而不是调用subscribe方法的线程。
combine observable
single thread
代码示例
public static void usingOneThread() throws InterruptedException {
List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
List<Integer> intStream2 = Arrays.asList(2, 4, 6, 8);
Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
})).mergeWith(Observable.create(emitter -> intStream2.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
})))
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
return e + 10;
})
.subscribeOn(Schedulers.io())
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(1000);
}
输出结果
RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
RxCachedThreadScheduler-1 Generated 7
RxCachedThreadScheduler-1 Shifted Up 17
RxCachedThreadScheduler-1 Shifted Down 7
RxCachedThreadScheduler-1 Received 7
RxCachedThreadScheduler-1 Generated 2
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Generated 4
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Generated 6
RxCachedThreadScheduler-1 Shifted Up 16
RxCachedThreadScheduler-1 Shifted Down 6
RxCachedThreadScheduler-1 Received 6
RxCachedThreadScheduler-1 Generated 8
RxCachedThreadScheduler-1 Shifted Up 18
RxCachedThreadScheduler-1 Shifted Down 8
RxCachedThreadScheduler-1 Received 8
multi thread
代码示例
public static void usingMultiThread() throws InterruptedException {
List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
Observable<Integer> generatorSubscribed = Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
System.out.println(Thread.currentThread().getName() + " Generated " + i);
emitter.onNext(i);
})).subscribeOn(Schedulers.io());
Observable<Integer> shiftUp1 = generatorSubscribed.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up #1 " + (e + 10));
return e + 10;
});
Observable<Integer> shiftUp2 = generatorSubscribed.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Up #2 " + (e + 10));
return e + 10;
});
shiftUp1.mergeWith(shiftUp2)
.map(e -> {
System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
return e - 10;
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println(Thread.currentThread().getName() + " Received " + s);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Thread.sleep(1000);
}
输出结果
RxCachedThreadScheduler-2 Generated 1
RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-2 Shifted Up #2 11
RxCachedThreadScheduler-1 Shifted Up #1 11
RxCachedThreadScheduler-2 Shifted Down 1
RxCachedThreadScheduler-2 Received 1
RxCachedThreadScheduler-2 Generated 3
RxCachedThreadScheduler-2 Shifted Up #2 13
RxCachedThreadScheduler-2 Shifted Down 3
RxCachedThreadScheduler-2 Received 3
RxCachedThreadScheduler-2 Generated 5
RxCachedThreadScheduler-2 Shifted Up #2 15
RxCachedThreadScheduler-2 Shifted Down 5
RxCachedThreadScheduler-2 Received 5
RxCachedThreadScheduler-2 Generated 7
RxCachedThreadScheduler-2 Shifted Up #2 17
RxCachedThreadScheduler-2 Shifted Down 7
RxCachedThreadScheduler-2 Received 7
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up #1 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up #1 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
RxCachedThreadScheduler-1 Generated 7
RxCachedThreadScheduler-1 Shifted Up #1 17
RxCachedThreadScheduler-1 Shifted Down 7
RxCachedThreadScheduler-1 Received 7