Rxjava2的简单使用与基本操作符

一 、关于Rxjava

异步:RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。
简洁:异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

二 、基本概念

  • Observable:发射源,在观察者模式中称为被观察者
  • Observer:接收源,在观察者模式中成为观察者,可接收Observable、Subject发射的数据;
  • Subscriber:“订阅者”,也是接收源,接收源在观察者模式中成为观察者。Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅。
  • subscribe:订阅,Observable和Observer通过subscribe()进行订阅
  • Subscription:Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;
  • Disposable:用于维系观察者、被观察者之间的联系。
  • Event:事件。

1、基本调用

添加依赖

implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxjava:2.2.3'

创建被观察者

Observable<String> normalObservable  = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
        observableEmitter.onNext("msg1");//通过onNext(),发射一个"msg1"的String
        observableEmitter.onNext("msg2");//通过onNext(),发射一个"msg2"的String
        observableEmitter.onComplete();//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法
    }
});

创建观察者

Observer<String> mObserver = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable disposable) {
        //d.dispose();移除订阅关系
        //d.isDisposed()是否发生订阅关系
    }
    @Override
    public void onNext(String s) {
    }
    @Override
    public void onError(Throwable throwable) {
    }
    @Override
    public void onComplete() {
    }
}

调用subscribe实现订阅

normalObservable.subscribe(mObserver);

2、链式调用

当然我们也可以使用链式操作的写法
其中Consumer参数的方法表示下游只对我们关心onNext事件,或Throwable事件进行处理

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
    }
});

三、线程切换

我们在请求网络时必须是将其放在子线程执行,然后在安卓主线程中更新Ui

代码 含义
Schedulers.immediate 直接在当前线程运行。
Schedulers.newThread 启用新线程,并在线程执行操作。
Schedulers.io 内部是一个无数量上限的的线程池,可以重用空闲的线程,不要把计算工作放在io中。
Schedulers.computation 使用固定的线程池,大小为CPU核数。
  • subscribeOn():指定Observable线程

    subscribeOn(Schedulers.io())在IO线程中请求网络

  • observeOn():指定Observer线程

    observeOn(AndroidSchedulers.mainThread())在主线程中更新界面

Observable.create(new ObservableOnSubscribe<Resp>() {
     @Override
     public void subscribe(ObservableEmitter<Resp> e) throws Exception {
          //模拟登陆
        Call<Resp> respCall = api.login(new User(username, password));
        Resp resp = respCall.execute().body();
        e.onNext(resp);
    }
})
//设置请求网络在io线程内执行、子线程中执行
.subscribeOn(Schedulers.io())
//设置更新ui在安卓主线程中执行
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        //更新UI
    }
});

四、操作符

操作符变换符是RxJava中一个很重要的概念,也是其简洁很重要的原因,这里整理一些很常见的操作符。

》》1.使用just( ),创建一个Observable并自动为你调用onNext( )发射数据(最多可发送9个)

Observable  justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"

String []justs={"just1","just2"};
Observable.fromArray(justs)//传入数组,类似于just();

》》2.使用from( ),遍历集合,发送每个item

List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
Observable  fromObservable = Observable.from(list);  //遍历list 每次发送一个

注意:just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item
》》3.使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

Observable  deferObservable = Observable.defer(new Func0<Observable<String>>() {
    @Override
    //注意此处的call方法没有Subscriber参数
    public Observable<String> call() {
        return Observable.just("deferObservable");
    }});

》》4.使用interval( )创建一个按固定时间间隔发射整数序列的Observable,可用作定时器

Observable  intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次

》》5.使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常

Observable rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14

》》6.使用timer( ),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法

Observable timeObservable = Observable.timer(3, TimeUnit.SECONDS);  //3秒后发射一个值

》》7.使用repeat( ),创建一个重复发射特定数据的Observable

Observable repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次

》》8.使用concat( ),连接两个被订阅者,订阅者将会按照a->b的顺序收到两个被订阅者所发射的消息。

final String[] aStrings = {"A1", "A2", "A3", "A4"};
final String[] bStrings = {"B1", "B2", "B3"};
final Observable<String> aObservable = Observable.fromArray(aStrings);
final Observable<String> bObservable = Observable.fromArray(bStrings);
Observable.concat(aObservable, bObservable);

输出A1", "A2", "A3", "A4","B1", "B2", "B3"

》》9.使用window( ),每隔n秒,发射这段时间内的数据,不是有数据就发射

Observable windowObservable=Observable.interval(1, TimeUnit.SECONDS).window(3, TimeUnit.SECONDS);//3秒后,发射前三秒所发射的数据

五、变换操作符

》》1. Map:最常用且最实用的操作符之一,将对象转换成另一个对象发射出去,应用范围非常广,如数据的转换,数据的预处理等。(如我们传入用户id需要查询用户信息,我们就可以使用map创建id返回user对象)
例一:数据类型转换,改变最终的接收的数据类型。假设传入本地图片路径,根据路径获取图片的Bitmap。

Observable.just(filePath).map(new Func1<String, Bitmap>() {
    @Override
    public Bitmap call(String path) {
         return getBitmapByPath(path);
    }}).subscribe(new Action1<Bitmap>() {
     @Override
    public void call(Bitmap bitmap) {
    //获取到bitmap,显示
}});

例二:对数据进行预处理,最后得到理想型数据。实际开发过程中,从后台接口获取到的数据也许不符合我们想要的,这时候可以在获取过程中对得到的数据进行预处理(结合Retrofit)。

Observable.just("12345678").map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s.substring(0,4);//只要前四位
    }})
.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.i("mytag",s);
    }});

》》2. FlatMap:和Map很像但又有所区别,Map只是转换发射的数据类型,而FlatMap可以将原始Observable转换成另一个Observable。
例:
需要使用的school类,student类就不展示了,保存一些学生基本信息的字段

public class School {
    private String name;
    private List<Student> studentList;
    ......
    class studengt{
    ......
    }
}
List<School> schoolList = new ArrayList<>();

首先假设要打印全国所有学校的名称,可以直接用Map:

Observable.from(schoolList).map(new Func1<School, String>() {
    @Override
    public String call(School school) {
          return school.getName();
    }}).subscribe(new Action1<String>() {
    @Override
    public void call(String schoolName) {
          Log.i(TAG,schoolName);
    }});

再进一步,打印学校所有学生的姓名,先使用map

Observable.from(schoolList).map(new Func1<School, School.Student>() {
    @Override
    public School.Student call(School school) {
        return school.getStudentList();//错误的地方
    }}).subscribe(new Action1<School.Student>() {
    @Override
    public void call(School.Student student) {
            Log.i(TAG,student.getName());
    }});

看似可行,但事实上,这是一段错误的代码,细心的人就会发现错误的地方 school.getStudentList()返回的时list集合
Map是一对一的关系,无法将单一的School对象转变成多个Student。FlatMap可以改变原始Observable变成另外一个Observable,如果我们能利用from()操作符把school.getStudentList()变成另外一个Observable,现在使用FlatMap实现

Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {
    @Override
    public Observable<School.Student> call(School school) {

        return Observable.from(school.getStudentList()); //关键,将学生列表以另外一个Observable发射出去

    }}).subscribe(new Action1<School.Student>() {

    @Override
    public void call(School.Student student) {
        Log.i(TAG,student.getName());
    }});

》》3. Buffer:缓存,可以设置缓存大小,缓存满后,以list的方式将数据发送出去;例:

Observable.just(1,2,3).buffer(2).subscribe(new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> list) {
        Log.i(TAG"size:"+list.size());
    }});

运行打印结果如下:

MainActivity.this: size:2
MainActivity.this: size:1

BufferMap经常一起使用,通常发生在从后台取完数据,对一个List中的数据进行预处理后,再用Buffer缓存后一起发送,保证最后数据接收还是一个List,如:

List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, School>() {
    @Override
    public School call(School school) {
        school.setName("NB大学");  //将所有学校改名
        return school;
    }}).buffer(schoolList.size())  //缓存起来,最后一起发送
.subscribe(new Action1<List<School>>() {
    @Override
    public void call(List<School> schools) {   
}});

六 、过滤操作符

》》1.Take:发射前n项数据,还是用上面的例子,假设不要改所有学校的名称了,就改前四个学校的名称:

Observable.from(schoolList).take(4).map(new Func1<School, School>() {
    @Override
    public School call(School school) {
        school.setName("NB大学");
        return school;
    }}).buffer(4).subscribe(new Action1<List<School>>() {
    @Override
    public void call(List<School> schools) {
    }});

》》2.Distinct:去掉重复的项,比较好理解

Observable.just(1, 2, 1, 1, 2, 3)
        .distinct()
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer item) {
                System.out.println("Next: " + item);
            }
        });

输出

Next: 1
Next: 2
Next: 3

》》3.Filter:过滤,通过谓词判断的项才会被发射,例如,发射小于4的数据

Observable.just(1, 2, 3, 4, 5)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer item) {
                return( item < 4 );
            }
        }).subscribe(new Action1<Integer>() {
          @Override
          public void call(Integer item) {
                System.out.println("Next: " + item);
      }});

输出:

Next: 1
Next: 2
Next: 3

》》window:

关于其他操作符或详情查看官网:RxJava使用以及操作符

六、注意事项

1、RxBinding的使用

RxBinding是在RxJava的基础上封装的一些操作,可以处理常用的一些UI的响应问题,这里具体实现就不分析了,只整理一些常用的方法作为记录。

添加依赖:

implementation 'com.jakewharton.rxbinding3:rxbinding:3.0.0-alpha1'

1.1、常用方法:

1.1.1、RxView

  • RxView.clicks().throttleFirst(long windowDuration, TimeUnit unit)指定的时间windowDuration内,点击clicks事件只响应一次
  • RxView.longClicks()长按监听
  • RxView.draws()绘制监听
  • RxView.drags()拖拽监听
  • RxView.scrollChangeEvents()滑动触发
  • .....

例:按钮防抖,指定时间内事件只响应1次

//2秒内,按钮点击事件只响应1次
RxView.clicks(btn)
    .throttleFirst(2, TimeUnit.SECONDS)
    .subscribe(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
         }
    }

1.1.2、RxTextView

  • RxTextView.textChanges()EditText输入监听
  • RxTextView.textChangeEvents()封装了TextWatcher文本改变的监听,返回数据的类型为TextViewTextChangeEvent,内部包含详细的文本改变数据。
  • RxTextView.editorActions()监听了软键盘的回车点击
  • RxTextView.editorActionEvents()监听了软键盘的回车点击,返回类型为TextViewEditorActionEvent。
  • ......

例:监听文本变化

RxTextView.textChanges(et)
    .subscribe(new Consumer<CharSequence>() {
        @Override
        public void accept(CharSequence charSequence) throws Exception {
        }
    );

1.1.3、RxCompoundButton

  • RxCompoundButton.checkedChanges()选中状态改变事件

  • .....

    RxView.clicks(btnLogin)
        .subscribe(o -> {
            RxCompoundButton.checked(cb).accept(true);
        }));
    RxCompoundButton.checkedChanges(cb)
        .subscribe(aBoolean -> {
           ......
    });
    

2、避免内存泄漏

Activity被销毁时,我们的后台任务没有执行完,那么就会导致Activity不能正常回收,而对于每一个Observer,都会有一个Disposable对象用于管理。

ObserveronSubscribe回调中,会传入一个Disposable对象,下游可以通过该对象的dispose()方法主动切断和上游的联系,在这之后上游的observableEmitter.isDisposed()方法将返回true。当上游和下游的联系切断之后,下游收不到包括onComplete/onError在内的任何事件,若此时上游再调用onError方法发送事件,那么将会报错。

为避免造成内存泄漏,我们需要将其将入到该集合当中,在Activity的onDestroy方法中,调用它的clear方法,就能避免内存泄漏的发生。

public class MainActivity extends AppCompatActivity {
  private static final String TAG = "MainActivity";
  private CompositeDisposable compositeDisposable;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
      super.onCreate(savedInstanceState);
      setContentView(R.layout.activity_main);

      compositeDisposable=new CompositeDisposable();
      Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
          @Override
          public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
              observableEmitter.onNext("msg");
          }
      });
      DisposableObserver<String> disposableObserver = new DisposableObserver<String>() {
          @Override
          public void onNext(String s) {
              Log.i(TAG, "onNext: "+s);
          }
          @Override
          public void onError(Throwable throwable) {
          }
          @Override
          public void onComplete() {
          }
      };
      observable.subscribe(disposableObserver);
      compositeDisposable.add(disposableObserver);
  }

  @Override
  protected void onDestroy() {
      compositeDisposable.clear();
      super.onDestroy();
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 222,104评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,816评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,697评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,836评论 1 298
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,851评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,441评论 1 310
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,992评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,899评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,457评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,529评论 3 341
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,664评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,346评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,025评论 3 334
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,511评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,611评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,081评论 3 377
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,675评论 2 359

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,866评论 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    BrotherChen阅读 1,623评论 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是为了提供函数式的特性,函数式最大的好处就是使得数据处理简洁易...
    无求_95dd阅读 3,109评论 0 21
  • 注:只包含标准包中的操作符,用于个人学习及备忘参考博客:http://blog.csdn.net/maplejaw...
    小白要超神阅读 2,199评论 2 8
  • 创建操作 用于创建Observable的操作符Create通过调用观察者的方法从头创建一个ObservableEm...
    rkua阅读 1,836评论 0 1