在使用RxJava的第一步就是创建一个你所需要的Observable,根据不同的需求创建不同的Observable。
首先你得了解Observable的分类
Observable分类
-
Cold Observable
当没有订阅的时候,该Observable不处理任何事件,当有订阅者发起订阅后,才开始响应,从第一个数据开始发射。
-
Hot Observable
不管有没有订阅者,该Observable一直在发射数据。
关于‘冷’、‘热’我是这样理解的:‘冷’的Observable是默认定义了数据或者获取数据的方式,每次订阅后将所有的数据,按照一定的规则发射给你;‘热’则是动态的数据,而这个被观察者一直处于发射状态,当你订阅后,其实就是从现在开始接收到数据而已,这里的数据都是动态的、实时变化的,而不是定义Observable就把数据默认好了。
如何选择合适的Observable
-
已经获取到的数据进行处理 -
Cold
比较典型的例子,筛选一个数组中所有的偶数
Integer[] array = new Integer[]{1,2,3,4,5,6,7,8}; Observable.fromArray(array) .filter(item -> item%2 == 0) .subscribe(item -> System.out.println("item#"+item));
如果你单独创建一个
Observable.fromArray(array)
这个被观察者是没有处理任何事件
-
触发一个固定事件获取数据-
Cold
这种一般用于异步获取数据,比如网络请求:
//模拟一个耗时3秒的请求 String getData(){ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "test"; } Observable.create((ObservableOnSubscribe<String>) e -> { try { System.out.println("start#"+System.currentTimeMillis()); String data = getData(); e.onNext(data); //一般这种获取数据方式的被观察者,获取到一个数据后就调用onComplete e.onComplete(); }catch (Exception e1){ e.onError(e1); } }).observeOn(Schedulers.newThread()) .subscribe(data->{ System.out.println("data#"+data); System.out.println("end#"+System.currentTimeMillis()); });
结果
start#1493776191045 data#test end#1493776194051
这种方式创建的Observable也是
冷
的,可以在Demo去掉subscribe
订阅,就会发现实际这个Observable什么事情都没做。
-
创建一个当前状态的Observable-
Hot
这种Observable实际运用中非常有用、方便、简洁。比如对Activity的生命周期,可以做一个Observable,当生命周期改变时,对其发射数据。对TextView的text变化创建一个Observable,每当text变化的时候发射给订阅者,等等。
这里已TextView text变化为例:
这里使用
BehaviorSubject
而不使用PublishSubject
是因为,在实际使用中我们需要从订阅开始那一刻就知道目前TextView的text是什么,使用BehaviorSubject
会在订阅那一刻将最后一次发射的数据发射给订阅者,而PublishSubject
则是订阅开始后,直到下次有text变化,那订阅者才能接收到第一个数据。创建一个工具类
TextViews.java
public static Observable<CharSequence> textChange(final TextView textView){ //创建Observable,并对其设置默认值 BehaviorSubject<CharSequence> observable = BehaviorSubject.createDefault(textView.getText()); TextWatcher textWatcher = new TextWatcher() { @Override public void beforeTextChanged(CharSequence s, int start, int count, int after) {} @Override public void onTextChanged(CharSequence s, int start, int before, int count) {} @Override public void afterTextChanged(Editable s) { observable.onNext(s); Log.d("RxJava","onChange"); } }; textView.addTextChangedListener(textWatcher); //doOnDispose 当取消订阅时候响应,同时取消监听 return observable.doOnDispose(()->{ textView.removeTextChangedListener(textWatcher); observable.onComplete(); }); }
使用
EditText editText = (EditText) findViewById(R.id.edit); final Disposable disposable = TextViews.textChange(editText) .subscribe(text-> Log.d("RxJava","text#"+text)); //点击按钮取消监听 findViewById(R.id.cancel).setOnClickListener(v ->{ if(!disposable.isDisposed()){ disposable.dispose(); } });
在后续实际开发中基本只需要关心怎么使用
textChange
所返回的数据,而其变为Observable之后操作很多事件就变得很方便比如将其变为监听text长度
TextViews.textChange(editText) .map(CharSequence::length) .subscribe(len-> Log.d("RxJava","text#"+len));
对于一个button,当Edittext的text长度大于4才能点击
TextViews.textChange(editText) .map(CharSequence::length) .map(len->len>4) .subscribe(button::setEnabled);
总结
关于创建Observable的方式还有很多,这里简单说了我自己最常用的部分。希望大家能给大家带来一定的帮助。如果有错误的地方指出来,大家都共同学习下。