http://www.jianshu.com/p/fe08ce770c15
什么是Rx
Rx是响应式编程的意思,本质上就是观察者设计模式,是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式
在Android编程的时候,经常使用后台线程,那么就可以使用这种方式,能够使得逻辑比较清晰明了(有的人说会增加好多的代码,但是我觉得代码的链式结构让代码看起来更加简洁明了)
Rx模式以及有点
优势一
创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符合组合和变换数据流
监听:Rx可以订阅任何可观察的数据量并执行操作
优势二(简化代码)
函数式风格:对可观察数据流使用无副作用的输入流输出函数,避免了程序里面的错综复杂的状态
简化代码:Rx的操作符通常可以将复杂的难题简化成很少的几行代码(配合lambda表达式还能简化)
异步错误处理:Rx提供了何时的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发着可以很方便的切换UI线程和子线程,摆脱底层的线程同步和各种并发问题
响应式编程
Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和符合变得非常高效。
你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好,使用Observable,在数据准备好的时候,生产者将数据推送给消费者,数据可以同步或者异步的到达,方式更加灵活。
RxJava观察者模式
需求:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应。
RxJava四个基本概念
Observable(被观察者)
Observer(观察者)
subscribe(订阅)
事件
Observable和Observer通过subscribe()方法实现订阅的关系,从而Observable可以在需要的时候发出事件来通知Observer。
手动实现观察者模式
首先我们需要有观察者和被观察者。
被观察者接口(里面简单的定义添加观察者,移除观察者,通知观察者三个方法)
publicinterfaceWatched{//添加观察者publicvoidaddWatcher(Watcher watcher);//移除观察者publicvoidremoveWatcher(Watcher watcher);//通知观察者publicvoidnotifyWathers(String str);}
观察者接口(定义更新的方法)
publicinterfaceWatcher{//数据变化进行更新publicvoidupdate(Stringstr);}
被观察者实现类
publicclassConcreteWathedimplementsWatched{//观察者List mList =newArrayList<>();@OverridepublicvoidaddWatcher(Watcher watcher){ mList.add(watcher); }@OverridepublicvoidremoveWatcher(Watcher watcher){ mList.remove(watcher); }@OverridepublicvoidnotifyWathers(String str){for(Watcher w : mList) { w.update(str); } }}
观察者实现类
publicclassConcreteWatherimplementsWatcher{ @Overridepublicvoidupdate(Stringstr) { System.out.println(str); }}
测试类
publicstaticvoid main(String[] args){ Watched watched =newConcreteWathed(); Watcher watcher1 =newConcreteWather(); Watcher watcher2 =newConcreteWather(); Watcher watcher3 =newConcreteWather(); watched.addWatcher(watcher1); watched.addWatcher(watcher2); watched.addWatcher(watcher3); watched.notifyWathers("I go"); }
输出结果
IgoIgoIgo
当然了,这只是简单的实现,只要晓得原理就行,除了自己实现,官方也给我们提供了观察者与被观察者接口。只要我们去实现接口就可以了。
利用系统提供的类和接口实现观察者模式
被观察者
publicclassXTObservableextendsObservable{privateintdata =0;publicintgetData(){returndata; }publicvoidsetData(inti){if(this.data != i){this.data = i; setChanged();//发生改变notifyObservers();//通知观察者} }}
观察者
publicclassXTobserverimplementsObserver{publicXTobserver(XTObservable observable){ observable.addObserver(this); }@Overridepublicvoidupdate(Observable observable, Object o){ System.out.println("data is changed"+ ((XTObservable) observable).getData()); }}
测试类
publicclassTest{publicstaticvoid main(String[] args) { XTObservable mObservable =newXTObservable(); XTobserver mXTobserver =newXTobserver(mObservable); mObservable.setData(1); mObservable.setData(2); mObservable.setData(3); }}
输出结果
datais changed1datais changed2datais changed3
上面已经手动实现观察者模式和通过系统提供类实现,当然这都不是重点,重点是Rx响应式编程
RxAndroid使用
一:使用前配置
在项目工程的build.gradle文件添加这样的一句话(如果使用lambda)
classpath'me.tatarka:gradle-retrolambda:2.5.0'(这一句在gradle版本下面紧接着)
在该module工程的build.gradle文件中添加
applyplugin:'me.tatarka.retrolambda'(使用lambda)在文件的第二行
在buildTypes节点的下(不是节点内)添加下面一句
compileOptions { sourceCompatibility JavaVersion.VERSION_1_8targetCompatibility JavaVersion.VERSION_1_8}
然后在依赖中添加下面几句(没有提示一定添加的可以根据自己选择性添加)
//rx一定添加compile'io.reactivex:rxjava:1.1.0'compile'io.reactivex:rxandroid:1.1.0'compile'com.google.code.gson:gson:2.4'compile'com.jakewharton:butterknife:7.0.1'compile'com.squareup.picasso:picasso:2.5.2'//添加compile'com.squareup.okhttp3:okhttp:3.+'
至此,使用环境已经配置好了,接下来我们来简单的使用一下。
利用create创建来使用Rx
/**
* 使用create方式
*/publicstaticvoidcreateObserable(){//定义被观察者Observable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {//观察者和被观察者还有订阅消息subscriber.onNext("hello");//返回的数据subscriber.onNext("hi"); subscriber.onNext(getUserName());//因为是传入的是字符串泛型subscriber.onCompleted();//完成} } });//定义观察者Subscriber showSub =newSubscriber() {@OverridepublicvoidonCompleted(){ Log.i(TAG,"onCompleted");//用于对话框消失}@OverridepublicvoidonError(Throwable e){ Log.i(TAG, e.getMessage());//错误处理}@OverridepublicvoidonNext(Object o){ Log.i(TAG, o.toString()); } }; observable.subscribe(showSub);//两者产生订阅}/** * 可以用来写成我们的下载返回数据 * *@return*/publicstaticStringgetUserName(){return"jsonName"; }
在主activity中调用,我们来看下控制台输出的结果:
也是一个测试,打印
/**
* 打印的功能 链式结构,更加易于代码的可毒性
*/publicstaticvoidcreatePrint(){ Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {for(inti =0; i <10; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } } }).subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){ Log.i(TAG,"onCompleted"); }@OverridepublicvoidonError(Throwable e){ Log.i(TAG, e.getMessage()); }@OverridepublicvoidonNext(Integer integer){ Log.i(TAG,"result--->:"+ integer); } }); }
看下控制台结果
from函数
/**
* 使用在被观察者,返回的对象一般都是数据类型
* 它接收一个集合作为输入,然后每次输出一个元素给subscriber
*/publicstaticvoidfrom(){ Integer[] items = {1,2,3,4,5,6,7,8}; Observable onservable = Observable.from(items); onservable.subscribe(newAction1() { @Overridepublicvoidcall(Object o){ Log.i(TAG, o.toString()); } }); }
控制台结果
interval函数
/**
* 指定某一时刻进行数据发送
* interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位
*/publicstaticvoidinterval(){ Integer[] items = {1,2,3,4}; Observable observable = Observable.interval(1,1, TimeUnit.SECONDS); observable.subscribe(newAction1() {@Overridepublicvoidcall(Object o){ Log.i(TAG, o.toString()); } }); }
just函数
/**
* 假如我们只有3个独立的AppInfo对象并且我们想把他们转化为Observable并填充到RecyclerView的item中:
* 这里我们有两个数组,然后通过转化为Observable组成一个item
*/publicstaticvoidjust(){ Integer[] items1 = {1,2,3,4}; Integer[] items2 = {2,4,6,8}; Observable observable = Observable.just(items1, items2); observable.subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){ Log.i(TAG,"onCompleted"); }@OverridepublicvoidonError(Throwable e){ Log.i(TAG, e.getMessage()); }@OverridepublicvoidonNext(Integer[] integers){for(inti =0; i < integers.length; i++) { Log.i(TAG,"result--->"+ i); } } }); }
输出结果:
range函数
/**
* 指定输出数据的范围
*/publicstaticvoidrange() { Observable observable = Observable.range(1,4); observable.subscribe(newSubscriber() { @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted"); } @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage()); } @OverridepublicvoidonNext(Integero) {Log.i(TAG,"next---->"+ o); } }); }
输出结果:
filter函数
/**
* 使用过滤功能 发送消息的时候,先过滤在发送
*/publicstaticvoidfilter() { Observable observable = Observable.just(1,2,3,4,5,6); observable.filter(newFunc1() { @OverridepublicBooleancall(Integero) {returno <5; } }).observeOn(Schedulers.io()).subscribe(newSubscriber() { @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted"); } @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage()); } @OverridepublicvoidonNext(Object o) {Log.i(TAG, o.toString()); } }); }
输出结果:
好了,几个常用到的函数已经介绍完了,接下来就用几个例子来说验证一下吧。
使用Rx+OkHttp下载图片
Rx下载的封装
/**
* 声明一个被观察者对象,作为结果返回
*/publicObservable downLoadImage(String path) {returnObservable.create(newObservable.OnSubscribe(){@Overridepublicvoidcall(Subscriber subscriber){if(!subscriber.isUnsubscribed()) {//存在订阅关系//访问网络操作//请求体Request request =newRequest.Builder().url(path).get().build();//异步回调mOkHttpClient.newCall(request).enqueue(newCallback() {@OverridepublicvoidonFailure(Call call, IOException e){ subscriber.onError(e); }@OverridepublicvoidonResponse(Call call, Response response)throwsIOException{if(response.isSuccessful()) {byte[] bytes = response.body().bytes();if(bytes !=null) { subscriber.onNext(bytes);//返回结果} } subscriber.onCompleted();//访问完成} }); } } }); }
在使用的时候调用
//使用HTTP协议获取数据mUtils.downLoadImageOne(url) .subscribeOn(Schedulers.io())//在子线程请求.observeOn(AndroidSchedulers.mainThread())//结果返回到主线程这一步很厉害啊,不用我们去用handler或者async切换线程了// 主要我们去调用一下代码,就已经帮我们切换好了线程,是不是感觉有点很厉害啊.subscribe(newSubscriber() {@OverridepublicvoidonCompleted(){ Log.i(TAG,"onCompleted");//对话框消失}@OverridepublicvoidonError(Throwable e){ Log.i(TAG,e.getMessage()); }@OverridepublicvoidonNext(byte[] bytes){ Bitmap bitmap = BitmapFactory.decodeByteArray(bytes,0,bytes.length); mImageView.setImageBitmap(bitmap); } });
Rx+okhttp实现登录
/**
*
* @param url 登录地址
* @param params 请求参数
* @return 后台返回的数据
*/publicObservable login(Stringurl,Mapparams) {returnObservable.create((Observable.OnSubscribe) subscriber -> {if(!subscriber.isUnsubscribed()) {//创建formbodyFormBody.Builder builder =newFormBody.Builder();if(params!=null&& !params.isEmpty()) {//循环获取body中的数据for (Map.Entry entry :params.entrySet()) { builder.add(entry.getKey(), entry.getValue()); } }//请求体RequestBody requestBody = builder.build(); Request request =newRequest.Builder().url(url).post(requestBody).build(); mOkHttpClient.newCall(request).enqueue(newCallback() { @OverridepublicvoidonFailure(Call call, IOException e) { subscriber.onError(e); } @OverridepublicvoidonResponse(Call call, Response response) throws IOException {if(response.isSuccessful()) {//交给观察者处理数据subscriber.onNext(response.body().string()); }//完成的回调subscriber.onCompleted(); } }); } }); }
登录调用
Mapparams=newHashMap();params.put("username", userName.getText().toString().trim());params.put("password", passWord.getText().toString().trim()); mUtils.login(url,params).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()).subscribe(newSubscriber() { @OverridepublicvoidonCompleted() {Log.i(TAG,"onCompleted"); } @OverridepublicvoidonError(Throwable e) {Log.i(TAG, e.getMessage()); } @OverridepublicvoidonNext(Strings) {if(JsonUtils.parse(s)) { Intent intent =newIntent(LoginActivity.this, ContentActivity.class); startActivity(intent); } } });
如果有想需要代码的,可以看这里,所有代码已经传至github。https://github.com/wuyinlei/RxAndroidDemo