Rxjava 基础原理之总结

简述:

a library for composing asynchronous and event-based programs using obaservable for the
Java VM (一个对于构成使用的Java虚拟机观察序列号异步和基于事件的程序库)。

总结:随着程序逻辑变得越来越复杂,它依然能够保持简洁。

RxJava引入的目的:异步,代码更清晰

优点:采用观察者模式链式调用,简洁明了,以往实现子线程主线程切换需自己手动new Thread(推送线程池),
并且线程之间还需要使用handler进行通信,Rxjava一步到位,极其简单。

1.基础概念:

Observable(əbˈzɜ:vəbl):在观察者模式中称为“被观察者”

Observer(əbˈzɜ:və(r)):观察者模式中的“观察者”,可接收Observeable发送的数据

subscribe(səbˈskraɪb):订阅,观察者与被观察者,通过subscribe()方法进行订阅

subscriber(səbˈskraɪbə(r)):也是一种观察者,在2.0中它与Observer没什么实质的区别,不同的是Subscriber与Flowable联合使用

Flowable(f'laʊəbl):也是悲观餐者的一种,与Subscriber进行配合使用,实现背压操作

RxJava的异步实现方式:
    让Observable开启子线程执行耗时操作,完成耗时操作后,触发回调,通知Observer进行主线程UI更新。
    如此轻松便可以实现Android中的异步,且代码简洁明了,集中分布。
    RxJava中默认Observer和Observable都在同一线程执行任务。

RxJava的使用Action

2.Rxjava常用操作符

from()操作符:

接受数组或集合,返回一个按参数列表顺序发射这些数据的Observable。
源码:
public final static <T> Observable<T> from(Iterable<? extents T> iterable){
    return create(new OnSubscribeFromIterable<T>(iterable));
}
例如:
String[] array = {"Amy","Rookie","MLXG"};
Observable.from(array)
    .subscribe(new Observer<String>(){
        ...
    });

just()操作符:

接受1-9个参数,它们还可以是不同类型,返回一个按参数列表顺序发射这些数据的Observable。
例如:
    Observable.just(1,2.4,"adb")
        .subscribe(new Action1<String>(){
            ...
        });

map()操作符:

把原来的Observable对象转换成另一个Observable对象,方便Observer获得想要的数据形式,一对一 
列如:
    Observable.just("images/logo.png")              //输入类型 String
        .map(new Func1<String,Bitmap>(){
            @Verride
            public Bitmap call(String filePath){    //参数类型 String
                return getBitmapFromPath(filePath); //返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>(){
            @Override
            public void call(Bitmap bitmap){        //参数类型 bitmap
                showBitmap(bitmap)
            }
        });

flatMap()操作符:

返回任何它想返回的Observable对象,一对多 
列如:
    Student[] students = ...?;
    Subscriber<Course> subscriber = new Subscriber<Course>(){
        @Override
        public void onNext(Course course){
            ...
        }
    };
    Observable.from(students)
        .flatMap(new Func1<Student,Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
    })
    .subscribe(subscriber);

filter()操作符:

Func中对每项元素进行过滤处理,满足条件的元素才会继续发送,下面的过滤偶数。
列如:
    Observable.just(2,3,23,54,15)
        .filter(new Func1<Integer,Boolean>() {
            @Override
            public Boolean call(Integer integer){
                return integer % 2 == 0;
            }
        })
        .subscribe(new Observer<Integer>(){
            @Override
            public void onNext(Integer integer){
                ...
            }
            ...
        });

take()操作符:

输出最多指定数量的结果
列如:
    Observable.just(1,2,3,4)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .take(3)    //只发送前三个事件 
        ...

doOnNext()操作符:

用来在观察者Observer.onNext()方法调用之前进行一些初始化操作,保存/缓存网络结果
例如:
    Observable.just(1,2,3)
        .doOnNext(new Action1<Integer>(){
            @Override
            public void call(Integer integer){
                ...
            }
        })
        ...

Merge()操作符:

合并多个Observable,按照加入的Observable顺序将各个元素传递。
例如:
    Observable<Integer> obserable1 = Observable.just(2,12,34,32);
    Observable<Integer> obserable2 = Observable.just(32,12,43,2);
    Observable.merge(observable1,observable2)
            .subscribe(new Observer<Integer>(){
        ...
    });

zip()操作符:

将各个Observable个对应位置各个元素取出做操作,然后将结果传递。
例如:
    Observable<Integer> observable1 = Observable.just(1,2,3);
    Observable<Integer> observable2 = Observable.just(11,22);
    Observable.zip(observable1,observable2,newFunc2<Integer,Integer,Integer>(){
        @Override
        public Integer call(Integer integer1,Integer integer){
            return integer1+integer2;
        }   
    })
    .subscribe(new Observable<Integer>(){
        ...
        @Override
        public void onNext(Integer integer){
            //out 12、24、3
        }
    });

3.Scheduler(调度号)切换线程

Schedulers.immediate(): 
    直接在当前线程运行,相当于不指定线程,默认

Schedulers.newThread():
    总是启动新线程,并在新线程操作

Schedulers.io():
    用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;
    对于普通的计算任务,请使用Schedulers.computation();
    Schedulers.io()默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。

Schedulers.computation():
    用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量       
Schedulers.trampoline():
    当其它排队的任务完成后,在当前线程排队开始执行。

SubscribeOn\ObserveOn

subscribeOn():
    指定Observable(被观察者)所在的线程,或叫做事件产生的线程。
observeOn():
    指定Observer(观察者)所运行在的线程,或叫做事件消费的线程。

4.Fowable与Subscriber

当被观察者发射数据的速度大于观察者接收处理数据的速度,造成观察者的调度器中数据缓冲池无限堆积,
超出了缓冲池的最大容量,导致OOM.
例如:
Observable.create(new ObservableOnSubscribe<String>(){
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception{
            int a = 0;
            while(true){
                e.onNext("data:"+(i++));
            }
        }
    }).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())  
        .subscribe(new Consumer<String>(){
            @Override
            public void accept(String s) throws Exception{
            Thread.sleep(2000);
            print(s);
        }
    });

而此时Flowable的背压策略就很好的解决这个问题.
例如:

    Flowable.create(new FlowableOnSubscribe<String>(){
            @Override
            public void subcribe(@NonNull FlowableEmitter<String> e) throws Exception{
                int i = 0;
                while(true){
                    e.onNext("data:"+(i++));
                }
            }
        },BackpressureStrategy.DROP)    //超出缓冲池的数据丢弃
            .subecribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>(){
                Subscription subscription;
                @Override
                public void onSubsrcibe(Subscription s){
                    subscription = s;
                    subscription.request(1);
                }
                @Override
                public void onNext(String s){
                    try{
                        Thread.sleep(2000);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                    printThred(2);
                    subscription.request(1);    //处理完了,再请求数据
                }
                
                ...
        });

该背压策略是超出缓冲池的数据被丢弃,而观察者要求处理一个 发送我一个数据。

Backpressure的策略

a.被观察者和观察者在异步线程的情况下,如果被观察者发射事件的速度大于
  观察者接收事件的速度,就会产生Backpressure问题。
  但是同步情况下,Backpressure问题不会存在。

b.Backpressure的策略仅仅是调度Subscriber接收事件,并不影响Flowable
  发送事件。观察者可以根据自身实际情况按需拉取数据,而不是被动接收。
  最终实现了上游被观察者发送事件的速度的控制,实现了背压的策略。

c.Backpressure的策略有5种:ERROR,BUFFER,DROP,LATEST,MISSING

ERROR:

用来缓存观察者处理不了暂时缓存下来的数据,缓冲池的默认大小为128,即只能缓存128个事件。
如果缓冲池溢出,就会立刻抛出MissingBackpressureException异常。

BUFFER:

即把默认容器为128的缓存池成一个大的缓存池,支持很多的数据,这种方式
比较消耗内存。

DROP:

当消费者处理不了的时候就丢弃,消费者通过request传入其需求n(事件个数),
然后生产着把n个事件传递给消费者供其消费,其他消费不掉的丢弃。

LATEST:

基本和DROP一致,消费者通过request传入需求n,然后生产者把n个事
件传递给消费者供其消费,其他消费不掉的事件就丢弃。
唯一区别是LATEST总能使消费者能够接收到生产者产生的最好一个事件。

MISSING:

没有缓冲池,接收第一个数据之后,后面的都丢弃。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 在正文开始之前的最后,放上GitHub链接和引入依赖的gradle代码: Github: https://gith...
    苏苏说zz阅读 673评论 0 2
  • 转一篇文章 原地址:http://gank.io/post/560e15be2dca930e00da1083 前言...
    jack_hong阅读 902评论 0 2
  • 前言我从去年开始使用 RxJava ,到现在一年多了。今年加入了 Flipboard 后,看到 Flipboard...
    占导zqq阅读 9,158评论 6 151
  • 一、Retrofit详解 ·Retrofit的官网地址为 : http://square.github.io/re...
    余生_d630阅读 1,807评论 0 5
  • 《异类》的作者是马尔科姆格拉德威尔,《引爆点》也是出自他之手。 《异》这本书想讲一个道理——成功人士的成功,很大程...
    英子Lucy阅读 222评论 0 0