Java 是响应式编程(Reactive Programming)在 Java 语言上的实现
主要内容:
RxJava 简单介绍
观察者模式实现
Subscriber 使用
Action 使用
Scheduler 使用
Function 使用
map、flatMap、concatMap 使用
filter、toList使用
关于 RxJava 2.x
RxJava 简单介绍
参考
github 地址 - ReactiveX/RxJava:https://github.com/ReactiveX/RxJava
RxJava 1.x javadoc:http://reactivex.io/RxJava/1.x/javadoc/
观察者模式 Java:http://blog.csdn.net/u012005313/article/details/72236997
官方介绍:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
RxJava 是一个在Java虚拟机上实现的响应式扩展:一个使用可观察序列构成异步和基于事件编程的库
RxJava 扩展了观察者模式(Observer Pattern),是一个全新的编程思想(响应式编程)的体现
在学习过程中,比较难以入门。官方文档比较难以理解,网上的博客也并没有很完整的讲解,尤其现在发布了两个版本 - RxJava 1.x / RxJava 2.x, 所以理解其中编程思维还是很困难的。没办法,只能多看,多练了。
Note:目前 RxJava 有两个版本 - RxJava 1.x 和 RxJava 2.x,这两个版本并不兼容,RxJava 2.x 在 RxJava 1.x 的基础上有所发展,同时舍弃了部分内容。由于 RxJava 2.x 发行的时间不长,目前网上的资料也不太多,更多的讲解和资料都是关于 RxJava 1.x 的,所以下面先学习 RxJava 1.x 的内容,再补充 RxJava 2.x 的内容
gradle 依赖
在工程中加入 RxJava 1.x 和 RxAndroid 1.x 依赖:
compile 'io.reactivex:rxjava:1.3.0'
compile 'io.reactivex:rxandroid:1.2.1
12
观察者模式实现
介绍
Observable:被观察者类,这个类就是观察者模式中的被观察对象。当触发某些事件后,就可以向观察者推送通知
Observer:观察者接口,当订阅了 Observable,Observable 就可以调用 Observer 接口函数对其进行通知
现在用 RxJava 实现一个简单的观察者模式
首先创建一个被观察者对象:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello World");
subscriber.onNext("Hi zj");
subscriber.onCompleted();
}
});
123456789
然后创建一个观察者对象:
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: s = " + s);
}
};
12345678910111213141516
观察者对象订阅被观察者对象:
observable.subscribe(observer);
1
欧了,就是这么的简单!!!
实现功能:被观察对象调用观察者接口方法 onNext 传输字符串,然后调用 onCompleted 结束,观察者对象收到通知后,打印出得到的字符串
实现结果:
Observer 接口详解
Observer 接口有三个函数:onNext、onCompleted、onError
onNext - void onNext(T t):该方法用于被观察者向观察者的信息传送。当被观察者调用 onCompleted 或者 onError 后,将不再传输 onNext 方法到观察者
onCompleted - void onCompleted():该方法用于通知观察者,被观察者结束发送信息了。当被观察者已经调用 onError 方法后,其后的 onNext 和 onCompleted 方法都将无效
onError - void onError(java.lang.Throwable e):该方法用于通知观察者,自己遭遇了一个错误,结束发送信息了。当被观察者已经调用 onError 方法后,其后的 onNext 和 onCompleted 方法都将无效
Observable 类介绍
Observable 类内容比较复杂,还没有完全理清,下面就记录关于如何创建 Observable,以及如何进行订阅
上面用到了 Observable.create 方法:
@Deprecated
public static Observable create(Observable.OnSubscribe f)
指定传送信息的类型(比如 String)
也可以直接输入一组数据的方法 Observable.from:
public static Observable from(T[] array)
实现代码:
String[] arr = {"Hello World", "Hi zj"};
Observable<String> observable = Observable.from(arr);
12
或者使用方法 Observable.just:
public static Observable just(T value)
实现代码:
Observable<String> observable = Observable.just("Hello World", "Hi zj");
1
链式编程
RxJava 可以实现链式编程,修改上面的程序如下:
Observable.just("Hello World", "Hi zj")
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: s = " + s);
}
});
1234567891011121314151617
Note:Observable 的信息发送和 Observer 的信息处理均发生在 subscribe(订阅)之后
Subscriber 使用
Subscriber 是一个抽象类,继承了 Observer 接口:
public abstract class Subscriber<T> extends java.lang.Object
implements Observer<T>, Subscription
12
Subscriber 同时扩展了一些新的函数,可以使用 Subscriber 类替代 Observer 接口。
比如Subscriber 提供了手动取消订阅的方法 unsubscribe:
public final void unsubscribe()
当在函数 onCompleted 之前调用 unsubscribe,将会不再接收接下来的信息
调用 unsubscribe 还可以避免内存泄漏的风险,所以尽量在不需要使用后调用此函数
参考:RxJava的使用
也可以查询是否订阅 - isUnsubscribed:
public final boolean isUnsubscribed()
返回为 true 表明已取消订阅
Subscriber 类还新增了方法 onStart:
public void onStart()
该方法在被观察者已经和观察者连接,但还没有发出通知之前被调用,比如弹出进度条,增加有用的初始化信息
使用 Subscriber 类实现观察者模式如下:
Observable.just("Hello", " World", "Hi", " zj")
.subscribe(new Subscriber<String>() {
@Override
public void onStart() {
super.onStart();
Log.e(TAG, "onStart: " + this.isUnsubscribed());
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " + this.isUnsubscribed());
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
Log.e(TAG, "onError: " + this.isUnsubscribed());
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: " + s);
if (s.equals("Hi")) {
this.unsubscribe();
}
Log.e(TAG, "onNext: " + this.isUnsubscribed());
}
});
123456789101112131415161718192021222324252627282930
实现结果:
Note:接下来将使用 Subscriber 替代 Observer
Action 使用
Action 是一个接口,没有任何函数或参数,但是其他 ActionX(X 表示数字)接口都继承自它,ActionX(X 表示数字)接口仅有一个函数 call,其参数随数字 X 的变化而增大
当没有参数时,可以使用 Action0 接口:
public interface Action0
extends Action
它有单个函数 call:
void call()
1
当有单个参数时,可以使用 Action1 接口:
public interface Action1
extends Action
它的函数 call 有一个参数:
void call(T t)
1
以此类推,直到 Action9 接口,如果需要大于 9 个参数的接口,可以使用 ActionN:
public interface ActionN
extends Action
12
其函数 call 的参数为数组形式:
void call(java.lang.Object... args)
1
使用 Action1 替代 Observer 或者 Subscriber
从上面描述可知,可以显式创建 Observer 接口或者 Subscriber 类作为观察者对象,RxJava 同样也提供了 Action 接口来隐式处理被观察者发送的通知
public final Subscription subscribe(Action1<? super T> onNext)
public final Subscription subscribe(Action1<? super T> onNext,
Action1<java.lang.Throwable> onError)
public final Subscription subscribe(Action1<? super T> onNext,
Action1<java.lang.Throwable> onError,
Action0 onCompleted)
123456
RxJava 提供了三种方式,可以仅处理观察者发出的 onNext 方法,也可以处理 onNext 和 onError,或者实现 onNext,onError,onCompleted,示例代码如下:
Action1<String> onNext = new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "call: " + s);
}
};
Action1<Throwable> onError = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "call: " + throwable.getMessage());
}
};
Action0 onCompleted = new Action0() {
@Override
public void call() {
Log.e(TAG, "call: ");
}
};
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onNext("Hi");
subscriber.onNext("zj");
Throwable e = new Throwable("eeeeeeee");
subscriber.onError(e);
}
});
Observable.just("Hello World", "Hi zj")
.subscribe(onNext);
observable.subscribe(onNext, onError);
observable.subscribe(onNext, onError, onCompleted);
12345678910111213141516171819202122232425262728293031323334353637383940
结果:
Note:如果被观察者发送了 onError 后,必须在订阅时也实现该方法
Scheduler 使用
参考
RXjava observeOn subscribeOn 解析:http://blog.csdn.net/lazyer_dog/article/details/52586069
RxJava observeOn()与subscribeOn()的关系:http://blog.csdn.net/jdsjlzx/article/details/51685769
介绍
上面描述的程序运行均在同一线程内,使用 Scheduler 类可以实现异步处理
public abstract class Scheduler
extends java.lang.Object
12
Scheduler 可以调度不同任务在不同的线程内。RxJava 已经封装了一些常用的 Scheduler 对象,可以在 Schedulers 类中找到:
public final class Schedulers
extends java.lang.Object
12
当进行 io 操作时,可以使用 Schedulers.io:
public static Scheduler io()
当进行密集计算工作时,可以使用 Schedulers.computation:
public static Scheduler computation()
Note:不要混淆上面两个 Scheduler 对象,当进行计算操作时,就调用 computation;当进行io 操作时,就调用 io
* 如果仅需要新建一个线程,可以使用 Schedulers.newThread:
public static Scheduler newThread()
1
对于 Android 开发,还可以调用 AndroidThread.mainThread 来指定主线程:
public static Scheduler mainThread()
observeOn、subscribeOn 和 doOnSubscribe
subscribeOn:
public final Observable<T> subscribeOn(Scheduler scheduler)
1
官方介绍:
Asynchronously subscribes Observers to this Observable on the specified Scheduler.
If there is a create(Action1, rx.Emitter.BackpressureMode) type source up in the chain, it is recommended to use subscribeOn(scheduler, false) instead to avoid same-pool deadlock because requests pile up behind a eager/blocking emitter.
12
observeOn:
public final Observable<T> observeOn(Scheduler scheduler)
1
官方介绍:
Modifies an Observable to perform its emissions and notifications on a specified Scheduler, asynchronously with a bounded buffer of RxRingBuffer.SIZE slots.
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous. If strict event ordering is required, consider using the observeOn(Scheduler, boolean) overload.
12
doOnSubscribe:
public final Observable<T> doOnSubscribe(Action0 subscribe)
1
官方介绍:
Modifies the source Observable so that it invokes the given action when it is subscribed from its subscribers. Each subscription will result in an invocation of the given action except when the source Observable is reference counted, in which case the source Observable will invoke the given action for the first subscription.
1
通过网上博文学习到:
observeOn 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;
Note:调用 observeOn 后箭头颜色变化了,说明 observeOn 仅改变之后操作所在线程
subscribeOn 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
Note:subscribeOn 的使用改变了前后的箭头颜色,说明 subscribeOn 改变的是被观察者所在的线程
当被观察者需要在订阅后,运行前设置初始信息,比如进度条,可以调用 doOnSubscribe。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程
Note:调用 doOnSubscribe 后,将会运行在订阅后最开始的位置
示例:
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
Log.e(TAG, "create call: " + Process.myTid());
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
Log.e(TAG, "map call: " + Process.myTid());
return integer + 10;
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
Log.e(TAG, "doOnSubscribe call: " + Process.myTid());
Log.e(TAG, "call: Hello");
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "onNext call: " + Process.myTid());
Log.e(TAG, "call: " + integer);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "onError call: " + Process.myTid());
Log.e(TAG, "call: " + throwable.getMessage());
}
});
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
结果:
Function 使用
Function 接口使用和 Action 接口类似,区别只是 Function 接口中函数 call 有返回值,比如 Func1 接口:
public interface Func1<T,R>
extends Function
12
泛型 T 表示参数类型,泛型 R 表示返回值类型。call 函数格式如下:
R call(T t)
1
map、flatMap、concatMap 使用
参考
RxJava(三) flatMap操作符用法详解:http://blog.csdn.net/johnny901114/article/details/51532776
map
public final <R> Observable<R> map(Func1<? super T,? extends R> func)
1
官方介绍:
Returns an Observable that applies a specified function to each item emitted by the source Observable and emits the results of these function applications.
1
流程图:
调用函数 map 后,可以转换传输信息的类型,比如从一个类中取出某个字段,将图像 id 值转换为 Bitmap:
Observable.just(android.R.drawable.alert_dark_frame)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.io())
.map(new Func1<Integer, Bitmap>() {
@Override
public Bitmap call(Integer integer) {
return BitmapFactory.decodeResource(getResources(), integer);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
image.setImageBitmap(bitmap);
}
});
12345678910111213141516
先开启新线程,传送 Integer 类型信息;
然后在 io 线程中加载图像,从而传输信息类型转为 Bitmap;
最后转回主线程,显示图像
flatmap
public final <R> Observable<R> flatMap(Func1<? super T,? extends Observable<? extends R>> func)
1
官方介绍:
Returns an Observable that emits items based on applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then merging those resulting Observables and emitting the results of this merger.
1
流程图:
flatMap 方法获取了原来的被观察者发送的 item,然后转换为一个 Observable 对象,将所有转换后得到的被观察者对象收集起来后,再向下传送(具体使用场景还没有想出来,可能在实际工程中使用后会更加理解)
示例代码如下:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer + 10)
.subscribeOn(Schedulers.newThread());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "call: " + integer);
}
});
1234567891011121314151617
Note:可以对 flatMap 方法生成的 Observable 对象调用 subscribeOn 和 observeOn,这样,每个生成的 Observable 运行在不同的线程中
concatMap
public final <R> Observable<R> concatMap(Func1<? super T,? extends Observable<? extends R>> func)
1
官方介绍:
Returns a new Observable that emits items resulting from applying a function that you supply to each item emitted by the source Observable, where that function returns an Observable, and then emitting the items that result from concatenating those resulting Observables.
1
流程图:
作用和 flatMap 类似,唯一区别就是返回结果是按输入顺序返回的,而 flatMap 的顺序不定。
示例代码:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer integer) {
return Observable.just(integer + 10)
.subscribeOn(Schedulers.newThread());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "call: " + integer);
}
});
1234567891011121314151617
filter、toList 使用
filter
public final Observable<T> filter(Func1<? super T,java.lang.Boolean> predicate)
1
官方介绍:
Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.
1
顾名思义,就是筛选被观察者发送的通知。流程图如下:
示例 - 过滤掉偶数:
Observable.just(1, 2, 3, 4, 5, 6)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "call: " + integer);
}
});
12345678910111213
toList
public final Observable<java.util.List<T>> toList()
1
将被观察者发送的所有信息结合为单个列表,然后发送给观察者
流程图如下:
示例:
Observable.just(1, 2, 3, 4, 5, 6)
.toList()
.subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for (Integer i : integers) {
Log.e(TAG, "call: " + i);
}
}
});
---------------------
作者:编号1993
来源:CSDN
原文:https://blog.csdn.net/u012005313/article/details/72818125
版权声明:本文为博主原创文章,转载请附上博文链接!