RxJava使用(二)

1.线程控制

正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.

也就是说

    当我们在主线程中去创建一个上游Observable来发送事件, 则这个上游默认就在主线程发送事件.
    当我们在主线程去创建一个下游Observer来接收事件, 则这个下游默认就在主线程中接收事件,
    上下游默认是在同一个线程工作.

要实现线程切换,调用

   .subscribeOn(Schedulers.newThread())                                              
   .observeOn(AndroidSchedulers.mainThread())  

subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.

    多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
    多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.

在RxJava中, 已经内置了很多线程选项供我们选择:

Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程

2.变换操作符map

它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化.
借用图


image.png

图中map中的函数作用是将圆形事件转换为矩形事件, 从而导致下游接收到的事件就变为了矩形.用代码来表示这个例子就是:

Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

在上游我们发送的是数字类型, 而在下游我们接收的是String类型, 中间起转换作用的就是map操作符

3.操作符FlatMap

FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.


image.png

中间flatMap的作用是将圆形的事件转换为一个发送矩形事件和三角形事件的新的上游Observable.
欲知详情请看分解


image.png

上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面. 如果需要保证顺序则需要使用concatMap.
  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });

我们在flatMap中将上游发来的每个事件转换为一个新的发送三个String事件的水管, 为了看到flatMap结果是无序的,所以加了10毫秒的延时。

concatMap操作符和flatMap用法一样,只是concatMap它的结果是严格按照上游发送的顺序来发送的
           api.register(new RegisterRequest())            //发起注册请求
            .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
            .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求注册结果
            .doOnNext(new Consumer<RegisterResponse>() {
                @Override
                public void accept(RegisterResponse registerResponse) throws Exception {
                    //先根据注册的响应结果去做一些操作
                }
            })
            .observeOn(Schedulers.io())                 //回到IO线程去发起登录请求
            .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {
                @Override
                public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {
                    return api.login(new LoginRequest());
                }
            })
            .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求登录的结果
            .subscribe(new Consumer<LoginResponse>() {
                @Override
                public void accept(LoginResponse loginResponse) throws Exception {
                    Toast.makeText(MainActivity.this, "登录成功", Toast.LENGTH_SHORT).show();
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Toast.makeText(MainActivity.this, "登录失败", Toast.LENGTH_SHORT).show();
                }
            });

4.操作符Zip

Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。


image.png

我们有两根水管,其中一根水管负责发送圆形事件 , 另外一根水管负责发送三角形事件 , 通过Zip操作符, 使得圆形事件 和三角形事件 合并为了一个矩形事件 .


image.png
组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2 和三角形A 进行合并的情况.
最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {         
@Override                                                                                      
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                   
    Log.d(TAG, "emit 1");                                                                      
    emitter.onNext(1);                                                                         
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit 2");                                                                      
    emitter.onNext(2);                                                                         
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit 3");                                                                      
    emitter.onNext(3);                                                                         
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit 4");                                                                      
    emitter.onNext(4);                                                                         
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit complete1");                                                              
    emitter.onComplete();                                                                      
}                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                              
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {           
@Override                                                                                      
public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    
    Log.d(TAG, "emit A");                                                                      
    emitter.onNext("A");                                                                       
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit B");                                                                      
    emitter.onNext("B");                                                                       
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit C");                                                                      
    emitter.onNext("C");                                                                       
    Thread.sleep(1000);                                                                        
                                                                                               
    Log.d(TAG, "emit complete2");                                                              
    emitter.onComplete();                                                                      
}                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                               
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {               
@Override                                                                                      
public String apply(Integer integer, String s) throws Exception {                              
    return integer + s;                                                                        
}                                                                                              
 }).subscribe(new Observer<String>() {                    
@Override                                                                                      
public void onSubscribe(Disposable d) {                                                        
    Log.d(TAG, "onSubscribe");                                                                 
}                                                                                              
                                                                                               
@Override                                                                                      
public void onNext(String value) {                                                             
    Log.d(TAG, "onNext: " + value);                                                            
}                                                                                              
                                                                                               
@Override                                                                                      
public void onError(Throwable e) {                                                             
    Log.d(TAG, "onError");                                                                     
}                                                                                              
                                                                                               
@Override                                                                                      
public void onComplete() {                                                                     
    Log.d(TAG, "onComplete");                                                                  
}                                                                                              
 })

使用subscribeOn(Schedulers.io()); 是使它们不在同一个线程

诶! 这下就对了嘛, 两根水管同时开始发送, 每发送一个, Zip就组合一个, 再将组合结果发送给下游.

不对呀! 可能细心点的朋友又看出端倪了, 第一根水管明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?

这是因为我们之前说了, zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? 所以本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了.

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容