Scheduler (Part 2)

This post looks at how schedulers relate to threads.
It is mostly my notes from reading RxJava Threading Examples.

no thread

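An ordinary Observable does not create any extra threads; its work runs on the current thread, that is, the thread that calls subscribe.

Code example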

public class NoThreadObservable {
    public static void main(String[] args) {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
            return e + 10;
        }).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
            return e - 10;
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer s) {
                System.out.println(Thread.currentThread().getName() + " Received " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
    }
}

All of the output is produced on the main thread.

Output

main Generated 1
main Shifted Up 11
main Shifted Down 1
main Received 1
main Generated 2
main Shifted Up 12
main Shifted Down 2
main Received 2
main Generated 3
main Shifted Up 13
main Shifted Down 3
main Received 3
main Generated 4
main Shifted Up 14
main Shifted Down 4
main Received 4
main Generated 5
main Shifted Up 15
main Shifted Down 5
main Received 5

Marble diagram

NoThreadObservable.png

scheduler api

Scheduler | Purpose
Schedulers.computation() | meant for computational work such as event loops and callback processing; do not use this scheduler for I/O (use Schedulers.io() instead); by default, the number of threads equals the number of processors
Schedulers.from(executor) | uses the specified Executor as a Scheduler
Schedulers.immediate() | schedules work to begin immediately on the current thread (RxJava 1.x only; it was removed in RxJava 2.x and later)
Schedulers.io() | meant for I/O-bound work such as asynchronous performance of blocking I/O; this scheduler is backed by a thread pool that grows as needed; for ordinary computational work, switch to Schedulers.computation(); by default Schedulers.io() is a CachedThreadScheduler, which is something like a new-thread scheduler with thread caching
Schedulers.newThread() | creates a new thread for each unit of work
Schedulers.trampoline() | queues work to begin on the current thread after any already-queued work
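
Of the schedulers listed above, Schedulers.from(executor) is the only one the later examples do not exercise. The following is a minimal sketch of it, assuming RxJava 3 on the classpath. The class name CustomExecutorScheduler and the thread name my-executor-thread are made up for illustration and do not come from the original article.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorScheduler {

    // Returns the name of the thread on which the source callable ran.
    static String sourceThreadName() {
        // Hypothetical single-thread executor with a recognizable thread name.
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "my-executor-thread");
            t.setDaemon(true);
            return t;
        });
        try {
            return Observable.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(Schedulers.from(executor)) // wrap the Executor as a Scheduler
                    .blockingFirst();                       // block the caller until the item arrives
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sourceThreadName());
    }
}
```

Because the executor is created by the caller, the caller also has to shut it down; RxJava does not manage its lifecycle.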

Observable.subscribeOn

The subscribeOn operator designates the thread on which the Observable begins operating, no matter where in the operator chain it is called.

Code example

public static void normalCase() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).subscribeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
}

Output

RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 2
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 4
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

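As the output shows, everything runs on the RxCachedThreadScheduler-1 thread.

Notes

  1. After subscribeOn, the work runs on the RxCachedThreadScheduler-1 thread. RxJava's standard scheduler threads are daemon threads, so if the main thread does not wait, the program exits before the subscription has emitted anything and nothing is printed. In a service that keeps at least one non-daemon thread alive, such as a web server, this is not a concern.
  2. RxJava relies heavily on immutable objects and the decorator pattern: subscribeOn returns a new Observable and does not affect the one it was called on. If you subscribe to the original Observable instead of the returned one, the work still runs on the main thread: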
public static void main(String[] args) throws InterruptedException {
    List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
    Observable<Integer> observable = Observable.fromStream(intList.stream());
    Observable<Integer> scheduledObservable = observable.subscribeOn(Schedulers.io());
    observable.subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {

        }

        @Override
        public void onNext(@NonNull Integer integer) {
            System.out.println(Thread.currentThread().getName() + " " + integer);
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });
}

Output:

main 1
main 2
main 3
main 4
main 5
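
Both notes above can be checked without Thread.sleep. The sketch below assumes RxJava 3 and uses blockingFirst so that the calling thread waits for the result. The class and method names are illustrative only.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

class SubscribeOnReturnsNewObservable {

    // Returns {thread used by the original Observable, thread used by the subscribeOn'd one}.
    static String[] threadNames() {
        Observable<String> source =
                Observable.fromCallable(() -> Thread.currentThread().getName());
        // subscribeOn does not modify 'source'; it returns a new, decorated Observable.
        Observable<String> scheduled = source.subscribeOn(Schedulers.io());

        String direct = source.blockingFirst();   // the callable runs on the calling thread
        String viaIo = scheduled.blockingFirst(); // the callable runs on an io scheduler thread
        return new String[] {direct, viaIo};
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("original:  " + names[0]);
        System.out.println("scheduled: " + names[1]);
    }
}
```

Blocking operators such as blockingFirst are convenient in demos and tests; in application code a non-blocking subscription is usually the better fit.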

Marble diagram

Observable.subscribeOn.png

Observable.observeOn

The observeOn operator affects the thread that the Observable uses below the point where the operator appears.

Code example

package com.zihao.schedulers.threadexample;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.Arrays;
import java.util.List;

/**
 * observeOn
 *
 * @author tangzihao
 * @Date 2021/1/4 11:35 AM
 */
public class ObserveOnObservable {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).observeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
    }
}

Output

main Generated 1
main Generated 2
main Generated 3
main Generated 4
main Generated 5
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

Notes

  1. The main thread and RxCachedThreadScheduler-1 run in parallel to some extent; with a larger data set, the output lines of the two threads visibly interleave.
  2. observeOn only takes one thread from the scheduler, and on that thread the items are still processed sequentially. (This is key: observeOn() is not an instruction saying “process these values using this thread pool”, but only “process these values using one thread from this Scheduler”.)
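
The second note can be verified by collecting the names of the threads that run the operator below observeOn. This is a minimal sketch, assuming RxJava 3; the class name and the item count of 1000 are arbitrary choices for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;

class ObserveOnSingleThread {

    // Returns the distinct names of the threads that executed map() below observeOn.
    static List<String> distinctThreads() {
        return Observable.range(1, 1000)
                .observeOn(Schedulers.computation())        // one worker, hence one thread
                .map(i -> Thread.currentThread().getName()) // runs on that worker's thread
                .distinct()
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        // Expect a single entry, even though the computation pool has several threads.
        System.out.println(distinctThreads());
    }
}
```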

Marble diagram

Observable.observeOn.png

Two Observable.observeOn calls in one chain

observeOn affects the thread used below the point where it appears, so it can be applied more than once in the same chain.

Code example

public class TwoObserveOnObservable {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).observeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .observeOn(Schedulers.computation())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
    }
}

Output

main Generated 1
main Generated 2
main Generated 3
main Generated 4
main Generated 5
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Up 13
RxComputationThreadPool-1 Shifted Down 1
RxCachedThreadScheduler-1 Shifted Up 14
RxComputationThreadPool-1 Received 1
RxCachedThreadScheduler-1 Shifted Up 15
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5

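Notes

  1. The output above differs somewhat from the one in the article, because the interleaving between the threads depends on timing and is not deterministic. The article's output is: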
                        [main] Generated         1
                        [main] Generated             2
                        [main] Generated                 3
                        [main] Generated                     4
                        [main] Generated                         5
   [RxCachedThreadScheduler-1] Shifted Up       11
   [RxComputationThreadPool-1] Shifted Down      1
   [RxComputationThreadPool-1] Received          1
   [RxCachedThreadScheduler-1] Shifted Up           12
   [RxComputationThreadPool-1] Shifted Down          2
   [RxComputationThreadPool-1] Received              2
   [RxCachedThreadScheduler-1] Shifted Up               13
...

Marble diagram

TwoObserveOnObservable (1).png

Two Observable.subscribeOn calls in one chain

Code example

public static void subscribeOnAndSubscribeOn() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).subscribeOn(Schedulers.computation())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .subscribeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        Thread.sleep(3000);
}

Output

Only a thread from the RxComputationThreadPool of the first subscribeOn(Schedulers.computation()) is used.

RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Shifted Down 1
RxComputationThreadPool-1 Received 1
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Generated 5
RxComputationThreadPool-1 Shifted Up 15
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5

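Notes

Within one chain, observeOn can be applied several times and each call takes effect, whereas with several subscribeOn calls only the scheduler of the first one (the one closest to the source) is used.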
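
The subscribeOn half of this note can be condensed into a few lines. The sketch below assumes RxJava 3 and reports the thread on which the source actually runs; the class name is illustrative.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

class FirstSubscribeOnWins {

    // Returns the name of the thread on which the source callable ran.
    static String sourceThreadName() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.computation()) // closest to the source: decides its thread
                .subscribeOn(Schedulers.io())          // only changes where the upstream subscribe call starts
                .blockingFirst();
    }

    public static void main(String[] args) {
        // Expect a name beginning with RxComputationThreadPool.
        System.out.println(sourceThreadName());
    }
}
```

The second subscribeOn is not ignored entirely: the subscription request first hops to an io thread and is then handed over to the computation scheduler, which is why the source never runs on the io thread.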

Observable.subscribeOn and Observable.observeOn together

subscribeOn first, then observeOn

Code example

public static void subscribeOnAndObserveOn() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).subscribeOn(Schedulers.computation())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .observeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

        Thread.sleep(3000);
}

Output

RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 1
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxCachedThreadScheduler-1 Received 1
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 2
RxComputationThreadPool-1 Generated 5
RxCachedThreadScheduler-1 Received 2
RxComputationThreadPool-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

Marble diagram

SubscribeOnObserveOn.png

observeOn first, then subscribeOn

Code example

public static void observeOnAndSubscribeOn() throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                intList.forEach(i -> {
                    System.out.println(Thread.currentThread().getName() + " Generated " + i);
                    emitter.onNext(i);
                });
            }
        }).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
            return e + 10;
        }).observeOn(Schedulers.io()).map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
            return e - 10;
        }).subscribeOn(Schedulers.computation()).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer s) {
                System.out.println(Thread.currentThread().getName() + " Received " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

        Thread.sleep(3000);
}

Output

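RxComputationThreadPool-1 Generated 1
RxComputationThreadPool-1 Shifted Up 11
RxComputationThreadPool-1 Generated 2
RxComputationThreadPool-1 Shifted Up 12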
RxComputationThreadPool-1 Generated 3
RxComputationThreadPool-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 1
RxComputationThreadPool-1 Generated 4
RxComputationThreadPool-1 Shifted Up 14
RxCachedThreadScheduler-1 Received 1
RxComputationThreadPool-1 Generated 5
RxCachedThreadScheduler-1 Shifted Down 2
RxComputationThreadPool-1 Shifted Up 15
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5

Marble diagram

ObserveOnSubscribeOn.png

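Notes

  1. Schedulers.io() and Schedulers.computation() supply threads from two different thread pools.
  2. Combining them lets two threads work on different values in parallel. The thread scheduled by subscribeOn does not block waiting for the work on the downstream scheduler before it generates and processes more values.
  3. There is no required order between subscribeOn and observeOn: subscribeOn applies to the source wherever it is placed, while observeOn affects only the operators below it.
  4. If subscribeOn is used twice, the scheduler of the first subscribeOn is used.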
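
Note 3 can be checked by running both orderings and recording the thread on each side of observeOn. This is a sketch under the assumption of RxJava 3; the class and method names are made up for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

class SubscribeOnObserveOnOrder {

    // subscribeOn placed above observeOn.
    static String subscribeOnThenObserveOn() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .map(upstream -> upstream + " -> " + Thread.currentThread().getName())
                .blockingFirst();
    }

    // subscribeOn placed at the end of the chain, below observeOn.
    static String observeOnThenSubscribeOn() {
        return Observable.fromCallable(() -> Thread.currentThread().getName())
                .observeOn(Schedulers.io())
                .map(upstream -> upstream + " -> " + Thread.currentThread().getName())
                .subscribeOn(Schedulers.computation())
                .blockingFirst();
    }

    public static void main(String[] args) {
        // Both lines should show a computation thread on the left and an io thread on the right.
        System.out.println(subscribeOnThenObserveOn());
        System.out.println(observeOnThenSubscribeOn());
    }
}
```

In both variants the source runs on the computation scheduler and the map below observeOn runs on the io scheduler; only the thread numbers may differ between runs.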

Implicit thread switching, for example with the delay operator

public class ImplicitThreadLikeDelay {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> intList = Arrays.asList(1, 2, 3, 4, 5);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intList.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        }))
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .delay(10, TimeUnit.MILLISECONDS)
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Delayed " + e);
                    return e;
                })
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
    }
}

Output

main Generated 1
main Shifted Up 11
main Generated 2
main Shifted Up 12
main Generated 3
main Shifted Up 13
main Generated 4
main Shifted Up 14
main Generated 5
main Shifted Up 15
RxComputationThreadPool-1 Delayed 11
RxComputationThreadPool-1 Shifted Down 1
RxComputationThreadPool-1 Received 1
RxComputationThreadPool-1 Delayed 12
RxComputationThreadPool-1 Shifted Down 2
RxComputationThreadPool-1 Received 2
RxComputationThreadPool-1 Delayed 13
RxComputationThreadPool-1 Shifted Down 3
RxComputationThreadPool-1 Received 3
RxComputationThreadPool-1 Delayed 14
RxComputationThreadPool-1 Shifted Down 4
RxComputationThreadPool-1 Received 4
RxComputationThreadPool-1 Delayed 15
RxComputationThreadPool-1 Shifted Down 5
RxComputationThreadPool-1 Received 5

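Notes
Some RxJava operators (delay in this example) may deliver items to the subscriber on a different thread than the one that called subscribe. By default, delay runs on the computation scheduler, which is why the lines after delay are printed by RxComputationThreadPool-1.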
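
The implicit switch can also be made explicit, because delay has an overload that takes a Scheduler. The following sketch assumes RxJava 3 and compares the default with an explicitly chosen scheduler; the class and method names are illustrative.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

class DelayScheduler {

    // delay without a Scheduler argument: items are re-emitted on the computation scheduler.
    static String defaultDelayThread() {
        return Observable.just(1)
                .delay(10, TimeUnit.MILLISECONDS)
                .map(i -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // delay with an explicit Scheduler: items are re-emitted on that scheduler instead.
    static String explicitDelayThread() {
        return Observable.just(1)
                .delay(10, TimeUnit.MILLISECONDS, Schedulers.io())
                .map(i -> Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("default:  " + defaultDelayThread());
        System.out.println("explicit: " + explicitDelayThread());
    }
}
```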

combine observable

single thread

Code example

public static void usingOneThread() throws InterruptedException {
        List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
        List<Integer> intStream2 = Arrays.asList(2, 4, 6, 8);
        Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).mergeWith(Observable.create(emitter -> intStream2.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })))
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Up " + (e + 10));
                    return e + 10;
                })
                .subscribeOn(Schedulers.io())
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onNext(@NonNull Integer s) {
                        System.out.println(Thread.currentThread().getName() + " Received " + s);
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
        Thread.sleep(1000);
}

Output

RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-1 Shifted Up 11
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up 13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up 15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
RxCachedThreadScheduler-1 Generated 7
RxCachedThreadScheduler-1 Shifted Up 17
RxCachedThreadScheduler-1 Shifted Down 7
RxCachedThreadScheduler-1 Received 7
RxCachedThreadScheduler-1 Generated 2
RxCachedThreadScheduler-1 Shifted Up 12
RxCachedThreadScheduler-1 Shifted Down 2
RxCachedThreadScheduler-1 Received 2
RxCachedThreadScheduler-1 Generated 4
RxCachedThreadScheduler-1 Shifted Up 14
RxCachedThreadScheduler-1 Shifted Down 4
RxCachedThreadScheduler-1 Received 4
RxCachedThreadScheduler-1 Generated 6
RxCachedThreadScheduler-1 Shifted Up 16
RxCachedThreadScheduler-1 Shifted Down 6
RxCachedThreadScheduler-1 Received 6
RxCachedThreadScheduler-1 Generated 8
RxCachedThreadScheduler-1 Shifted Up 18
RxCachedThreadScheduler-1 Shifted Down 8
RxCachedThreadScheduler-1 Received 8

multi thread

Code example

public static void usingMultiThread() throws InterruptedException {
        List<Integer> intStream1 = Arrays.asList(1, 3, 5, 7);
        Observable<Integer> generatorSubscribed = Observable.create((ObservableOnSubscribe<Integer>) emitter -> intStream1.forEach(i -> {
            System.out.println(Thread.currentThread().getName() + " Generated " + i);
            emitter.onNext(i);
        })).subscribeOn(Schedulers.io());
        Observable<Integer> shiftUp1 = generatorSubscribed.map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up #1  " + (e + 10));
            return e + 10;
        });
        Observable<Integer> shiftUp2 = generatorSubscribed.map(e -> {
            System.out.println(Thread.currentThread().getName() + " Shifted Up #2  " + (e + 10));
            return e + 10;
        });
        shiftUp1.mergeWith(shiftUp2)
                .map(e -> {
                    System.out.println(Thread.currentThread().getName() + " Shifted Down " + (e - 10));
                    return e - 10;
                }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer s) {
                System.out.println(Thread.currentThread().getName() + " Received " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
        Thread.sleep(1000);
}

Output

RxCachedThreadScheduler-2 Generated 1
RxCachedThreadScheduler-1 Generated 1
RxCachedThreadScheduler-2 Shifted Up #2  11
RxCachedThreadScheduler-1 Shifted Up #1  11
RxCachedThreadScheduler-2 Shifted Down 1
RxCachedThreadScheduler-2 Received 1
RxCachedThreadScheduler-2 Generated 3
RxCachedThreadScheduler-2 Shifted Up #2  13
RxCachedThreadScheduler-2 Shifted Down 3
RxCachedThreadScheduler-2 Received 3
RxCachedThreadScheduler-2 Generated 5
RxCachedThreadScheduler-2 Shifted Up #2  15
RxCachedThreadScheduler-2 Shifted Down 5
RxCachedThreadScheduler-2 Received 5
RxCachedThreadScheduler-2 Generated 7
RxCachedThreadScheduler-2 Shifted Up #2  17
RxCachedThreadScheduler-2 Shifted Down 7
RxCachedThreadScheduler-2 Received 7
RxCachedThreadScheduler-1 Shifted Down 1
RxCachedThreadScheduler-1 Received 1
RxCachedThreadScheduler-1 Generated 3
RxCachedThreadScheduler-1 Shifted Up #1  13
RxCachedThreadScheduler-1 Shifted Down 3
RxCachedThreadScheduler-1 Received 3
RxCachedThreadScheduler-1 Generated 5
RxCachedThreadScheduler-1 Shifted Up #1  15
RxCachedThreadScheduler-1 Shifted Down 5
RxCachedThreadScheduler-1 Received 5
RxCachedThreadScheduler-1 Generated 7
RxCachedThreadScheduler-1 Shifted Up #1  17
RxCachedThreadScheduler-1 Shifted Down 7
RxCachedThreadScheduler-1 Received 7
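
In the output above every value is generated twice, once per thread. The reason is that generatorSubscribed is a cold Observable that mergeWith subscribes to twice, so the generator body runs once per subscription, each time on its own io thread. The sketch below, assuming RxJava 3, counts those runs; the class name, the counter, and the +10/+20 branches are illustrative and not part of the original example.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ColdSourceSubscribedTwice {

    // Merges two branches built on the same cold source and reports how often the generator ran.
    static String run() {
        AtomicInteger generatorRuns = new AtomicInteger();
        Observable<Integer> generator = Observable.<Integer>create(emitter -> {
            generatorRuns.incrementAndGet(); // executed once per subscription
            emitter.onNext(1);
            emitter.onComplete();
        }).subscribeOn(Schedulers.io());

        List<Integer> values = generator.map(e -> e + 10)  // first subscription to generator
                .mergeWith(generator.map(e -> e + 20))     // second subscription to generator
                .sorted()                                  // arrival order depends on timing
                .toList()
                .blockingGet();
        return "runs=" + generatorRuns.get() + " values=" + values;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "runs=2 values=[11, 21]"
    }
}
```

If the generator should run only once and feed both branches, the source has to be shared between the subscribers instead, which is a different setup from the one in the article.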