Observable

快速入门

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.fromArray("Ben", "George").subscribe(s -> System.out.println("Hello " + s + "!"));
    }
}

创建observable

使用observable操作符

可以使用just()或者from()方法将对象,数组,集合转换为Observable对象。

Observable<String> stringObservable = Observable.fromArray("a", "b", "c");
stringObservable.subscribe(s -> System.out.println(s + " "));

List<Integer> intList = Arrays.asList(1, 3, 5, 7);
Observable<Integer> integerObservable = Observable.fromStream(intList.stream());
integerObservable.subscribe(s -> System.out.println(s + " "));

Observable.just("one object").subscribe(System.out::println);

对于由Observable发射的每个元素,这些转换后的Observable都会同步调用它们的任何订阅者的onNext()方法,然后将调用订阅者的onCompleted()方法。

使用create方法

image.png
image.png

可以使用Create运算符从头创建Observable。向此运算符传递一个接受观察者作为其参数的函数。编写此函数,使其表现为可观察到的状态-通过适当地调用观察者的onNext,onError和onCompleted方法。Observable必须仅一次调用观察者的onCompleted方法或一次仅调用其onError方法。

同步

package com.zihao.observable.create;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.stream.IntStream;

/**
 * 同步observable
 * 不会产生额外的线程
 *
 * @author tangzihao
 * @Date 2020/12/28 9:10 下午
 */
public class SyncCreateObservable {
    public static void main(String[] args) {
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            IntStream.rangeClosed(1, 50).forEach(i -> {
                if (!emitter.isDisposed()) {
                    //如果抛出异常执行到30就不会执行了
                    /*if (i == 30) {
                        throw new RuntimeException("老子不开心");
                    }*/
                    emitter.onNext("value_" + i);
                }
            });
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("开始订阅啦");
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("完成了");
            }
        });
    }
}

上面代码有几个注意点:

  • 这个是同步的代码,不会产生新的线程,主线程会等上面的onComplete后才会执行
  • emitter.isDisposed()判断是否不感兴趣了,早期版本叫做unsubscribed
  • 如果抛出异常,异常之后的都不会执行,包括onComplete方法也不会执行
  • 如果没有抛出异常的话,onError方法不会执行
  • onSubscribe,onError,onComplete只会执行一次。

异步

package com.zihao.observable.create;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.stream.IntStream;

/**
 * @author tangzihao
 * @Date 2020/12/28 9:46 下午
 */
public class AsyncCreateObservable {
    public static void main(String[] args) {
        Observable.create((ObservableOnSubscribe<String>) emitter -> new Thread(() -> {
            IntStream.rangeClosed(1, 50).forEach(i -> {
                if (!emitter.isDisposed()) {
                    //如果抛出异常执行到30就不会执行了
                    /*if (i == 30) {
                        throw new RuntimeException("老子不开心");
                     }*/
                    emitter.onNext("value_" + i);
                }
            });
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        }).start()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("开始订阅啦");
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("完成了");
            }
        });
        System.out.println("主线程可以正常工作。。。");
    }
}

上面代码有如下注意点

  • 会产生新的线程,observer不会阻塞主线程的工作
  • 其他和同步一样

转换(transform)observable

image.png

示例代码

public class TransformObservable {
    public static void main(String[] args) {
        Observable.create((ObservableOnSubscribe<String>) emitter -> {
            IntStream.rangeClosed(1, 50).forEach(i -> {
                if (!emitter.isDisposed()) {
                    emitter.onNext("value_" + i);
                }
            });
            if (!emitter.isDisposed()) {
                emitter.onComplete();
            }
        })
                .skip(5)
                .take(10)
                .map(s -> s + "_xform")
                .subscribe(System.out::println);
    }
}

异常处理

通过onError捕获异常

//observable代码
if (!emitter.isDisposed()) {
   if (i == 30) {
      throw new RuntimeException("老子不开心");
   }
   emitter.onNext("value_" + i);
}

//observer代码
@Override
public void onError(Throwable throwable) {
      throwable.printStackTrace();
}

使用onErrorResume的方式fallback

其他代码都同上

onErrorResumeNext(throwable -> Observable.just("发生了异常,直接onComplete了"))
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容