Rx学习笔记和总结

RxAndroid学习笔记和总结

前言

rx系列貌似在前一阶段火起来的,也是自己接触的少,一直没有去学习,今天就趁着周六,脑补一下。

什么是Rx

  • Rx是响应式编程的意思,本质上就是观察者设计模式,是以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式
  • 在Android编程的时候,经常使用后台线程,那么就可以使用这种方式,能够使得逻辑比较清晰明了(有的人说会增加好多的代码,但是我觉得代码的链式结构让代码看起来更加简洁明了)

Rx模式以及有点

优势一

  • 创建:Rx可以方便的创建事件流和数据流
  • 组合:Rx使用查询式的操作符合组合和变换数据流
  • 监听:Rx可以订阅任何可观察的数据量并执行操作

优势二(简化代码)

  • 函数式风格:对可观察数据流使用无副作用的输入流输出函数,避免了程序里面的错综复杂的状态
  • 简化代码:Rx的操作符通常可以将复杂的难题简化成很少的几行代码(配合lambda表达式还能简化)
  • 异步错误处理:Rx提供了何时的错误处理机制
  • 轻松使用并发:Rx的Observables和Schedulers让开发着可以很方便的切换UI线程和子线程,摆脱底层的线程同步和各种并发问题

响应式编程

Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和符合变得非常高效。
你可以把Observable当做Iterable的推送方式的等价物,使用Iterable,消费者从生产者那拉取数据,线程阻塞直至数据准备好,使用Observable,在数据准备好的时候,生产者将数据推送给消费者,数据可以同步或者异步的到达,方式更加灵活。

RxJava观察者模式

  • 需求:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间做出反应。
  • RxJava四个基本概念
    • Observable(被观察者)
    • Observer(观察者)
    • subscribe(订阅)
    • 事件
  • Observable和Observer通过subscribe()方法实现订阅的关系,从而Observable可以在需要的时候发出事件来通知Observer。

关于理论的知识,网上的介绍的太多了,大家可以去看下,在文章的结尾,我也会附几篇好的文章。

手动实现观察者模式

首先我们需要有观察者和被观察者。

被观察者接口(里面简单的定义添加观察者,移除观察者,通知观察者三个方法)
public interface Watched {
    //添加观察者
    public void addWatcher(Watcher watcher);
    //移除观察者
    public void removeWatcher(Watcher watcher);
    //通知观察者
    public void notifyWathers(String str);
}
观察者接口(定义更新的方法)
public interface Watcher {
    //数据变化进行更新
    public void update(String str);
}

被观察者实现类

public class ConcreteWathed implements Watched {
    //观察者
    List<Watcher> mList = new ArrayList<>();

    @Override
    public void addWatcher(Watcher watcher) {
        mList.add(watcher);
    }

    @Override
    public void removeWatcher(Watcher watcher) {
        mList.remove(watcher);
    }

    @Override
    public void notifyWathers(String str) {
        for (Watcher w : mList) {
            w.update(str);
        }
    }
}
观察者实现类
public class ConcreteWather implements Watcher {
    @Override
    public void update(String str) {
        System.out.println(str);
    }
}
测试类
 public static void main(String[] args){
        Watched watched = new ConcreteWathed();
        Watcher watcher1 = new ConcreteWather();
        Watcher watcher2 = new ConcreteWather();
        Watcher watcher3 = new ConcreteWather();

        watched.addWatcher(watcher1);
        watched.addWatcher(watcher2);
        watched.addWatcher(watcher3);

        watched.notifyWathers("I go");
    }
输出结果
I go
I go
I go

当然了,这只是简单的实现,只要晓得原理就行,除了自己实现,官方也给我们提供了观察者与被观察者接口。只要我们去实现接口就可以了。

利用系统提供的类和接口实现观察者模式

被观察者

public class XTObservable extends Observable {

    private int data = 0;

    public int getData(){
        return data;
    }

    public void setData(int i){
        if (this.data != i){
            this.data = i;
            setChanged();//发生改变
            notifyObservers();//通知观察者
        }
    }
}

观察者


public class XTobserver implements Observer {

    public XTobserver(XTObservable observable) {
        observable.addObserver(this);
    }

    @Override
    public void update(Observable observable, Object o) {
        System.out.println("data is changed" + ((XTObservable) observable).getData());
    }
}

测试类

public class Test {

    public static void main(String[] args) {
        XTObservable mObservable = new XTObservable();
        XTobserver mXTobserver = new XTobserver(mObservable);
        mObservable.setData(1);
        mObservable.setData(2);
        mObservable.setData(3);
    }
}

输出结果

data is changed1
data is changed2
data is changed3

上面已经手动实现观察者模式和通过系统提供类实现,当然这都不是重点,重点是Rx响应式编程

RxAndroid使用

一:使用前配置

在项目工程的build.gradle文件添加这样的一句话(如果使用lambda)

 classpath 'me.tatarka:gradle-retrolambda:2.5.0'(这一句在gradle版本下面紧接着)

在该module工程的build.gradle文件中添加

apply plugin: 'me.tatarka.retrolambda'(使用lambda)在文件的第二行

在buildTypes节点的下(不是节点内)添加下面一句

 compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }

然后在依赖中添加下面几句(没有提示一定添加的可以根据自己选择性添加)

//rx一定添加
 compile 'io.reactivex:rxjava:1.1.0'
    compile 'io.reactivex:rxandroid:1.1.0'
    compile 'com.google.code.gson:gson:2.4'
    compile 'com.jakewharton:butterknife:7.0.1'
    compile 'com.squareup.picasso:picasso:2.5.2'
    //添加
    compile 'com.squareup.okhttp3:okhttp:3.+'

至此,使用环境已经配置好了,接下来我们来简单的使用一下。

利用create创建来使用Rx

/**
     * 使用create方式
     */
    public static void createObserable() {
        //定义被观察者
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (!subscriber.isUnsubscribed()) { //观察者和被观察者还有订阅消息
                    subscriber.onNext("hello"); //返回的数据
                    subscriber.onNext("hi");
                    subscriber.onNext(getUserName());  //因为是传入的是字符串泛型
                    subscriber.onCompleted(); //完成
                }
            }
        });

        //定义观察者
        Subscriber showSub = new Subscriber() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted");   //用于对话框消失
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getMessage());   //错误处理
            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG, o.toString());
            }
        };

        observable.subscribe(showSub); //两者产生订阅
    }

    /**
     * 可以用来写成我们的下载返回数据
     *
     * @return
     */
    public static String getUserName() {
        return "jsonName";
    }

在主activity中调用,我们来看下控制台输出的结果:


也是一个测试,打印

  /**
     * 打印的功能  链式结构,更加易于代码的可毒性
     */
    public static void createPrint() {
        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    for (int i = 0; i < 10; i++) {
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                }
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                Log.i(TAG, "result--->:" + integer);
            }
        });
    }

看下控制台结果


from函数

 /**
     * 使用在被观察者,返回的对象一般都是数据类型
     * 它接收一个集合作为输入,然后每次输出一个元素给subscriber
     */
    public static void from() {
        Integer[] items = {1, 2, 3, 4, 5, 6, 7, 8};
        Observable onservable = Observable.from(items);
        onservable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.i(TAG, o.toString());
            }
        });
    }

控制台结果


interval函数

/**
     * 指定某一时刻进行数据发送
     * interval()函数的两个参数:一个指定两次发射的时间间隔,另一个是用到的时间单位
     */
    public static void interval() {
        Integer[] items = {1, 2, 3, 4};
        Observable observable = Observable.interval(1, 1, TimeUnit.SECONDS);
        observable.subscribe(new Action1() {
            @Override
            public void call(Object o) {
                Log.i(TAG, o.toString());
            }
        });
    }

just函数

  /**
     * 假如我们只有3个独立的AppInfo对象并且我们想把他们转化为Observable并填充到RecyclerView的item中:
     * 这里我们有两个数组,然后通过转化为Observable组成一个item
     */
    public static void just() {
        Integer[] items1 = {1, 2, 3, 4};
        Integer[] items2 = {2, 4, 6, 8};

        Observable observable = Observable.just(items1, items2);
        observable.subscribe(new Subscriber<Integer[]>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getMessage());
            }

            @Override
            public void onNext(Integer[] integers) {
                for (int i = 0; i < integers.length; i++) {
                    Log.i(TAG, "result--->" + i);
                }
            }
        });
    }

输出结果:


range函数

  /**
     * 指定输出数据的范围
     */
    public static void range() {
        Observable observable = Observable.range(1, 4);
        observable.subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getMessage());
            }

            @Override
            public void onNext(Integer o) {
                Log.i(TAG, "next---->" + o);
            }
        });
    }

输出结果:


filter函数

 /**
     * 使用过滤功能  发送消息的时候,先过滤在发送
     */
    public static void filter() {
        Observable observable = Observable.just(1, 2, 3, 4, 5, 6);
        observable.filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer o) {
                return o < 5;
            }
        }).observeOn(Schedulers.io()).subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.i(TAG, "onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, e.getMessage());
            }

            @Override
            public void onNext(Object o) {
                Log.i(TAG, o.toString());
            }
        });
    }

输出结果:



好了,几个常用到的函数已经介绍完了,接下来就用几个例子来说验证一下吧。

使用Rx+OkHttp下载图片

Rx下载的封装
 /**
     * 声明一个被观察者对象,作为结果返回
     */
    public Observable<byte[]> downLoadImage(String path) {
        return Observable.create(new Observable.OnSubscribe<byte[]>() {
            @Override
            public void call(Subscriber<? super byte[]> subscriber) {
                if (!subscriber.isUnsubscribed()) {  //存在订阅关系
                    //访问网络操作
                    //请求体
                    Request request = new Request.Builder().url(path).get().build();
                    //异步回调
                    mOkHttpClient.newCall(request).enqueue(new Callback() {
                        @Override
                        public void onFailure(Call call, IOException e) {
                            subscriber.onError(e);
                        }

                        @Override
                        public void onResponse(Call call, Response response) throws IOException {
                            if (response.isSuccessful()) {
                                byte[] bytes = response.body().bytes();
                                if (bytes != null) {
                                    subscriber.onNext(bytes);  //返回结果
                                }
                            }
                            subscriber.onCompleted();  //访问完成
                        }
                    });

                }
            }
        });
    }
在使用的时候调用
   //使用HTTP协议获取数据
            mUtils.downLoadImageOne(url)
                    .subscribeOn(Schedulers.io())  //在子线程请求
                    .observeOn(AndroidSchedulers.mainThread()) //结果返回到主线程这一步很厉害啊,不用我们去用handler或者async切换线程了
                    // 主要我们去调用一下代码,就已经帮我们切换好了线程,是不是感觉有点很厉害啊
                    .subscribe(new Subscriber<byte[]>() {
                @Override
                public void onCompleted() {
                    Log.i(TAG,"onCompleted");//对话框消失
                }

                @Override
                public void onError(Throwable e) {
                    Log.i(TAG,e.getMessage());
                }

                @Override
                public void onNext(byte[] bytes) {
                    Bitmap bitmap = BitmapFactory.decodeByteArray(bytes,0,bytes.length);
                    mImageView.setImageBitmap(bitmap);
                }
            });

Rx+okhttp实现登录

  /**
     * 
     * @param url  登录地址
     * @param params  请求参数
     * @return   后台返回的数据
     */
    public Observable<String> login(String url, Map<String, String> params) {

        return Observable.create((Observable.OnSubscribe<String>) subscriber -> {
            if (!subscriber.isUnsubscribed()) {
                //创建formbody
                FormBody.Builder builder = new FormBody.Builder();
                if (params != null && !params.isEmpty()) {
                    //循环获取body中的数据
                    for (Map.Entry<String, String> entry : params.entrySet()) {
                        builder.add(entry.getKey(), entry.getValue());
                    }
                }
                //请求体
                RequestBody requestBody = builder.build();
                Request request = new Request.Builder().url(url).post(requestBody).build();
                mOkHttpClient.newCall(request).enqueue(new Callback() {
                    @Override
                    public void onFailure(Call call, IOException e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onResponse(Call call, Response response) throws IOException {
                        if (response.isSuccessful()) {
                            //交给观察者处理数据
                            subscriber.onNext(response.body().string());
                        }
                        //完成的回调
                        subscriber.onCompleted();
                    }
                });
            }
        });
    }
登录调用
  Map<String, String> params = new HashMap<String, String>();
            params.put("username", userName.getText().toString().trim());
            params.put("password", passWord.getText().toString().trim());
            mUtils.login(url, params).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.i(TAG, "onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, e.getMessage());
                }

                @Override
                public void onNext(String s) {
                    if (JsonUtils.parse(s)) {
                        Intent intent = new Intent(LoginActivity.this, ContentActivity.class);
                        startActivity(intent);
                    }
                }
            });


如果有想需要代码的,可以看这里,所有代码已经传至github。https://github.com/wuyinlei/RxAndroidDemo
好了,就先介绍到这里吧,这里在给大家推荐几篇比较好的博文还有。

推荐博文

推荐git

结语

Rx使用还是挺方便的,不过需要一定的学习成本,谨慎使用(嘿嘿)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,977评论 25 707
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • afinalAfinal是一个android的ioc,orm框架 https://github.com/yangf...
    passiontim阅读 15,417评论 2 45
  • 也许你会觉得深圳是一座很棒的城市,因为这里接纳了万千年轻人、有梦想的人在这里拼搏、奋发、成功、失败。也许你会觉得深...
    陈左阅读 245评论 0 0
  • 去爬山吧,山上有红红的果子,还有小火车。星期天,几个好朋友相约。 那时候不记得放学后还有作业,我们有无穷的...
    21雪绒花阅读 462评论 0 1