Scheduler (二)

这篇文章主要讲下scheduler和thread的关系。
主要是读RxJava Threading Examples的笔记。

no thread

普通的observable的话是不会产生额外的线程,是在当前线程执行任务的。

代码示例

public class NoThreadObservable {
    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
            return e + 10;
        }).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
            return e - 10;
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer s) {
                System.out.println(Thread.currentThread().getName() + " Received " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
}

代码的输出都是在main线程上。

输出结果

main Generated 1
main Shifted Up 11
main Shifted Down 1
main Received 1
main Generated 2
main Shifted Up 12
main Shifted Down 2
main Received 2
main Generated 3
main Shifted Up 13
main Shifted Down 3
main Received 3
main Generated 4
main Shifted Up 14
main Shifted Down 4
main Received 4
main Generated 5
main Shifted Up 15
main Shifted Down 5
main Received 5

弹珠图

NoThreadObservable.png
NoThreadObservable.png

scheduler api

Scheduler purpose
Schedulers.computation( ) meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead); the number of threads, by default, is equal to the number of processors
Schedulers.from(executor) uses the specified Executor as a Scheduler
Schedulers.immediate( ) schedules work to begin immediately in the current thread
Schedulers.io( ) meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( ); Schedulers.io( ) by default is a CachedThreadScheduler, which is something like a new thread scheduler with thread caching
Schedulers.newThread( ) creates a new thread for each unit of work
Schedulers.trampoline( ) queues work to begin on the current thread after any already-queued work

Observable.subscribeOn

SubscribeOn运算符指定Observable将在哪个线程上开始操作,无论该运算符在运算符链中的哪个点被调用

代码示例

public static void normalCase() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).subscribeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
}

输出结果

RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 2
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 4
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

可以看到使用的是RxCachedThreadScheduler-1线程。

注意点

  1. 因为subscribeOn之后使用了RxCachedThreadScheduler-1线程(rxjava大量使用了immutable objects和decorate pattern模式),如果主线程不等待的话,会在subscribe之间程序结束,导致没有输出。一般来说如果我们使用web server等有至少一个非daemon thread的服务的话,是不用担心的。
  2. subscribeOn之后是新的observable,不会对之前的observable造成影响,如果不使用的话会导致仍旧在main线程运行
public static void main(String[] args) throws InterruptedException {
    List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    Observable<Integer> observable = Observable.fromStream(intList.stream());
    Observable<Integer> scheduledObservable = observable.subscribeOn(Schedulers.io());
    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {

        }

        @Override
        public void onNext(@NonNull Integer integer) {
            System.out.println(Thread.currentThread().getName() + " " + integer);
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
}

运行结果是:

main 1
main 2
main 3
main 4
main 5

弹珠图

Observable.subscribeOn.png
Observable.subscribeOn.png

Observable.observeOn

ObserveOn影响Observable将在该运算符出现的位置使用的线程。

代码示例

package com.zihao.schedulers.threadexample;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;

/**
 * observeOn
 *
 * @author tangzihao
 * @Date 2021/1/4 11:35 上午
 */
public class ObserveOnObservable {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).observeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
    }
}

输出结果

main Generated 1
main Generated 2
main Generated 3
main Generated 4
main Generated 5
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

注意点

  1. main线程和RxCachedThreadScheduler-1是在一定程度上并行的,如果增加数据量的话可以看到main和RxCachedThreadScheduler-1线程并行执行。
  2. RxCachedThreadScheduler-1只是从scheduler取了一个线程,对于这个线程来说,执行还是串行的(This is key: observeOn() is not an instruction saying “process these values using this thread pool”, but only “process these values using one thread from this Scheduler”.)

弹珠图

Observable.observeOn.png
Observable.observeOn.png

Two Observable.observeOn calls in one chain

ObserveOn影响Observable将在该运算符出现的位置使用的线程,所以可以使用多次observeOn的操作符。

代码示例

public class TwoObserveOnObservable {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).observeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .observeOn(Schedulers.computation())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
    }
}

输出结果

main Generated 1
main Generated 2
main Generated 3
main Generated 4
main Generated 5
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Up 13
RxComputationThreadPool-1 Shifted Down 1
RxCachedThreadScheduler-1 Shifted Up 14
RxComputationThreadPool-1 Received 1
RxCachedThreadScheduler-1 Shifted Up 15
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5

注意点

  1. 这个例子和文章中的有些出入,文章中的输出如下
                        [main] Generated         1
                        [main] Generated             2
                        [main] Generated                 3
                        [main] Generated                     4
                        [main] Generated                         5
   [RxCachedThreadScheduler-1] Shifted Up       11
   [RxComputationThreadPool-1] Shifted Down      1
   [RxComputationThreadPool-1] Received          1
   [RxCachedThreadScheduler-1] Shifted Up           12
   [RxComputationThreadPool-1] Shifted Down          2
   [RxComputationThreadPool-1] Received              2
   [RxCachedThreadScheduler-1] Shifted Up               13
...

弹珠图

TwoObserveOnObservable (1).png
TwoObserveOnObservable (1).png

Two Observable.subscribeOn calls in one chain

代码示例

public static void subscribeOnAndSubscribeOn() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).subscribeOn(Schedulers.computation())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .subscribeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        Thread.sleep(3000);
}

输出结果

只用到了第一个subscribeOn(Schedulers.computation())的RxComputationThreadPool中的线程。

RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Shifted Down 1
RxComputationThreadPool-1 Received 1
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Generated 5
RxComputationThreadPool-1 Shifted Up 15
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5

注意点

在one chain中,onObserve可以调用多次,onSubscribe使用第一个onSubscribe的scheduler

Observable.subscribeOn and Observable.observeOn together

先使用subscribeOn,然后使用observeOn

代码示例

public static void subscribeOnAndObserveOn() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).subscribeOn(Schedulers.computation())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .observeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        Thread.sleep(3000);
}

输出结果

RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 1
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxCachedThreadScheduler-1 Received 1
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 2
RxComputationThreadPool-1 Generated 5
RxCachedThreadScheduler-1 Received 2
RxComputationThreadPool-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

弹珠图

SubscribeOnObserveOn.png
SubscribeOnObserveOn.png

先使用observeOn,然后使用subscribeOn

代码示例

public static void observeOnAndSubscribeOn() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
            return e + 10;
        }).observeOn(Schedulers.io()).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
            return e - 10;
        }).subscribeOn(Schedulers.computation()).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer s) {
                System.out.println(Thread.currentThread().getName() + " Received " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

        Thread.sleep(3000);
}

输出结果

RxComputationThreadPool-1 Shifted Up 12
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 1
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxCachedThreadScheduler-1 Received 1
RxComputationThreadPool-1 Generated 5
RxCachedThreadScheduler-1 Shifted Down 2
RxComputationThreadPool-1 Shifted Up 15
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

弹珠图

ObserveOnSubscribeOn.png
ObserveOnSubscribeOn.png

注意点

  1. 使用了Schedulers.io()和Schedulers.computation()创建两个不同的线程池中的线程
  2. 这种结合导致两个线程并行处理不同值的运算。使用subscribeOn调度的线程在生成和处理更多值之前不会阻塞等待下游调度程序中的工作。
  3. subscribeOn和observeOn的操作符顺序没有明确的要求
  4. 如果使用两个subscribeOn,将使用第一个subscribeOn的scheduler

隐式的线程切换比如使用delay操作符

public class ImplicitThreadLikeDelay {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        }))
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .delay(10, TimeUnit.MILLISECONDS)
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Delayed " + e);
                    return e;
                })
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
    }
}

输出结果

main Generated 1
main Shifted Up 11
main Generated 2
main Shifted Up 12
main Generated 3
main Shifted Up 13
main Generated 4
main Shifted Up 14
main Generated 5
main Shifted Up 15
RxComputationThreadPool-1 Delayed 11
RxComputationThreadPool-1 Shifted Down 1
RxComputationThreadPool-1 Received 1
RxComputationThreadPool-1 Delayed 12
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Delayed 13
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Delayed 14
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Delayed 15
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5

注意点
rxjava的一些operator(比如这个例子中的delay)可能会在不同的线程上调度subscriber,而不是调用subscribe方法的线程。

combine observable

single thread

代码示例

public static void usingOneThread() throws InterruptedException {
        List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
        List<Integer> intStream2 = Arrays.asList(2, 4, 6, 8);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).mergeWith(Observable.create(emitter -> intStream2.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })))
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .subscribeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
}

输出结果

RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
RxCachedThreadScheduler-1 Generated 7
RxCachedThreadScheduler-1 Shifted Up 17
RxCachedThreadScheduler-1 Shifted Down 7
RxCachedThreadScheduler-1 Received 7
RxCachedThreadScheduler-1 Generated 2
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Generated 4
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Generated 6
RxCachedThreadScheduler-1 Shifted Up 16
RxCachedThreadScheduler-1 Shifted Down 6
RxCachedThreadScheduler-1 Received 6
RxCachedThreadScheduler-1 Generated 8
RxCachedThreadScheduler-1 Shifted Up 18
RxCachedThreadScheduler-1 Shifted Down 8
RxCachedThreadScheduler-1 Received 8

multi thread

代码示例

public static void usingMultiThread() throws InterruptedException {
        List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
        Observable<Integer> generatorSubscribed = Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).subscribeOn(Schedulers.io());
        Observable<Integer> shiftUp1 = generatorSubscribed.map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up #1  " + (e + 10));
            return e + 10;
        });
        Observable<Integer> shiftUp2 = generatorSubscribed.map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up #2  " + (e + 10));
            return e + 10;
        });
        shiftUp1.mergeWith(shiftUp2)
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer s) {
                System.out.println(Thread.currentThread().getName() + " Received " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
        Thread.sleep(1000);
}

输出结果

RxCachedThreadScheduler-2 Generated 1
RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-2 Shifted Up #2  11
RxCachedThreadScheduler-1 Shifted Up #1  11
RxCachedThreadScheduler-2 Shifted Down 1
RxCachedThreadScheduler-2 Received 1
RxCachedThreadScheduler-2 Generated 3
RxCachedThreadScheduler-2 Shifted Up #2  13
RxCachedThreadScheduler-2 Shifted Down 3
RxCachedThreadScheduler-2 Received 3
RxCachedThreadScheduler-2 Generated 5
RxCachedThreadScheduler-2 Shifted Up #2  15
RxCachedThreadScheduler-2 Shifted Down 5
RxCachedThreadScheduler-2 Received 5
RxCachedThreadScheduler-2 Generated 7
RxCachedThreadScheduler-2 Shifted Up #2  17
RxCachedThreadScheduler-2 Shifted Down 7
RxCachedThreadScheduler-2 Received 7
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up #1  13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up #1  15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
RxCachedThreadScheduler-1 Generated 7
RxCachedThreadScheduler-1 Shifted Up #1  17
RxCachedThreadScheduler-1 Shifted Down 7
RxCachedThreadScheduler-1 Received 7
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容