Rx系列<第八篇>:RxJava之创建操作符

(1)create

    Observable.create();
    Flowable.create();
    Single.create();
    Completable.create();
    Maybe.create();

用来创建被观察者, 5种被观察者都可以使用create操作符创建。

(2)just

    Observable.just("1", "2", "3");
    Flowable.just("1", "2", "3");
    Single.just("1");
    Maybe.just("1");

只有四种被观察者可以使用just,一旦使用just就必须发射一条数据。

  • Observable 可以发射0个或多个数据,标准写法如下

    Observable.just("1");
    Observable.just("1", "2");
    Observable.just("1", "2", "3");
    
  • Flowable可以发射0个或多个数据,标准写法如下

    Flowable.just("1");
    Flowable.just("1", "2");
    Flowable.just("1", "2", "3");
    
  • Single只能发射一条数据,标准写法如下

      Single.just("1");
    
  • Completable不能发射数据,由于just至少要有一条数据,所以Completable没有just操作符;

  • Maybe只能发射0条或1条数据,由于just至少要有一条数据,所以标准写法如下

    Maybe.just("1");
    

另外,需要注意的是:

just操作符最多可以设置10个参数。

(3)from

from只要是将其他类种的对象和数据类型转成Observable

  • fromPublisher 将Publisher转成Observable

      Observable.fromPublisher(new Publisher<String>() {
    
          @Override
          public void subscribe(Subscriber<? super String> s) {
              s.onNext("A");
          }
      }).subscribe(new Consumer<String>() {
    
          @Override
          public void accept(String o) throws Exception {
              System.out.println(o);
          }
      });
    

Publisher将在后续介绍压背的时候重点提出。

  • fromArray 将数组转成Observable

      Observable.fromArray("A", "B", "C");
    

看一下源码

图片.png

在这里被我圈出来的可变长度参数,也就是说参数的个数是可变的。

  • fromIterable 将集合转成Observable

      List<String> list = new ArrayList<>();
      Observable.fromIterable(list);
    

fromIterable的参数是Iterable类型, Collection是Iterable的子接口,所以只要是最终实现Collection接口的集合都可以作为参数,以下的java集合框架图可以作为参考:

图片.png
  • fromCallable 将Callable转成Observable

      ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
      Callable callable = new Callable<String>() {
    
          @Override
          public String call() throws Exception {
              return "A";
          }
      };
    
      singleThreadExecutor.submit(callable);
    
      Observable.fromCallable(callable).subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
              System.out.println(s);
          }
      });
    
  • fromFuture 将Future转成Observable

      ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
      Future future = singleThreadExecutor.submit(new Runnable() {
          @Override
          public void run() {
    
          }
      }, "A");
    
      Observable.fromFuture(future).subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
              System.out.println(s);
          }
      });
    

注意:其他被观察者也可以使用from操作符

图片.png
图片.png
图片.png
图片.png

(4)range

发射0~9之间的整数,左闭右开[0,9)

    Observable.range(0, 9).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(String.valueOf(integer));
        }
    });

其他方式

    Observable.rangeLong();
    Flowable.range();
    Flowable.rangeLong();

(5) defer

defer只有在订阅的时候才会创建被观察者,以保证每次发射的数据是最新的。

    Observable.defer(new Callable<ObservableSource<String>>() {
        @Override
        public ObservableSource<String> call() throws Exception {
            return Observable.just("A", "B");
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

其他方式

    Flowable.defer();
    Single.defer();
    Completable.defer();
    Maybe.defer();

(6)interval

创建一个按固定时间间隔发射整数序列的Observable

图片.png
    //延迟initialDelay秒后,按period秒定时时间间隔发射整数序列,调度器为computation
    Observable.interval(1000, 1000, TimeUnit.MILLISECONDS, Schedulers.computation());

    //延迟period秒后,按period秒定时时间间隔发射整数序列,调度器为computation
    Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println(String.valueOf(aLong));
        }
    });

有关Schedulers的讲解:

  • Schedulers.computation() :
    用于cpu密集型计算任务,即不会被被I/O等操作限制性能的耗时操作,例如XML,JSON文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU核数,不可以用于IO操作,因为IO操作的等待时间会浪费cpu

  • Schedulers.from(@NonNull Excutor excutor):指定一个线程调度器,由此调度器来控制任务的执行策略。

  • Schedulers.io():用于IO密集型的操作,例如写磁盘操作,查询数据库,网络访问,具有线程缓存机制,在此调度器接收到任务之后,先检查线程缓存池中是否有空闲的线程可用,如果有,复用,如果没有则 创建新的线程,并将其加入到线程池中,如果每次都没有空闲的线程使用,可以无上限的创建线程。

  • Schedulers.newThread(): 在每执行一个任务时创建一个新的线程,不具有线程缓存机制,由于创建一个线程比起复用一个线程更加耗时耗力,虽然使用Schedulers.io()的地方都可以使用Schedulers.newThread(),但是总体上的Schedulers.newThread()的效率没有Schedulers.io()的高。

  • Schedulers.trampoline():在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停下来,等插入进来的任务执行完成之后,再将未完成的任务接着执行。

  • Schedulers.single():拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务在执行的时候其他任务将按照队列先进先出的顺序依次执行。

  • AndroidSchedulers.mainThread():在Andriod UI线程中执行任务,属于Android的专属定制。

注意:在Rxjava 2.x版本中,废弃了1.x版本Schedulers.immediate(),在1.x中,Schedulers.immediate的作用是在当前线程中立即执行任务,功能等同于Rxjava中的2.x版本中的Schedulers.trampoline(),而在Schedulers.trampoline()在1.x版本的时候,作用是:当其他排队的任务执行完成之后,在当前线程排队开始执行接收到的任务,有点像2.x版本的Schedulers.single(),但是也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。

其他方式

    Flowable.interval();

(7)timer

实现延迟执行

    Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println(String.valueOf(aLong));
        }
    });

    Observable.timer(1000, TimeUnit.MILLISECONDS, Schedulers.computation());

其他方式

    Flowable.timer();
    Single.timer();
    Completable.timer();
    Maybe.timer();

(8)empty

使直接完成发射数据,也就是直接执行了onComplete。

    Observable.empty().subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("33333333333333");
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.empty();
    Maybe.empty();

(9)error

使直接发生异常结束发射数据,也就是直接执行了onError。

    Observable.error(new Throwable("nullpoint exception")).subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.error();
    Single.error();
    Completable.error();
    Maybe.error();

(10)never

不发射数据,也不结束发射。

    Observable.never().subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.never();
    Single.never();
    Completable.never();
    Maybe.never();
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容

  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,193评论 2 8
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 928评论 0 3
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    BrotherChen阅读 1,615评论 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 3,073评论 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    测天测地测空气阅读 636评论 0 1