RxJava_zip操作符操作流程源码

zip 操作符

zip 的字面意思就是压缩的含义,它可以将两个事件源 Observable 发送的事件通过 zip 操作将结果发送给订阅者 Observer 上。

举个例子:

假如一个页面是负责展示商品的信息的,这个商品的信息包括普通信息(例如价格,参数),还有商品的评价信息。这两类信息来源于两个接口,需求就是在两个接口信息都返回之后,才展示该商品信息。

基于上面这种需求,我们可以将商品普通信息作为一个接口去请求,评价信息作为一个接口去请求。这样就是两个 Observable 对象,通过 zip 操作符将两个 Observable 请求的结果转化成合并成一个商品信息的 Java Bean 之后发送给对应的订阅者,那么这样就完成了需求了。


zip操作符.png

zip 操作

从 Observable 的 zip 操作方法来看,它定义了 3 个泛型

  • T1 第一个 Observable 发送的数据类型;
  • T2 第二个 Observable 发送的数据类型;
  • R 需要通过 zip 操作符转化后的数据类型;

在 Function 接口中提供了将一种数据类型转化为另一个种数据类型的功能,而 BiFunction 接口种提供了将两种数据类型转化为另一种数据类型的功能。

Functions.toFunction(zipper) 目的就是將两个数据转化为为一个数据的操作,转变为将一个 Object[] 转换为一个数据,实际上就是将 BitFunction 的功能转为 Function 的功能。

public static <T1, T2, R> Observable<R> zip(
        ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
        BiFunction<? super T1, ? super T2, ? extends R> zipper) {
    return zipArray(Functions.toFunction(zipper), false, bufferSize(), source1, source2);
}

zipArray

zipArray 方法返回一个 ObservableZip 对象。 它是如何将多个 Observable 发送的事件汇总到一起发送的呢?

在 ObservableZip 内部有一个 ZipCoodinator 类,它负责汇总数据发送功能。

  • row 属性就是用于存放当前需要结合的两个数据的数组。
  • actual 属性就是外界定义的订阅者 Observer

  • zippper 就是上面通过 Functions.toFunction(zipper) 转化而来的接口对象。Function<? super Object[], ? extends R> zipper。

  • observers 它是 ZipObserver 类型的数组。 这是去订阅 Observable 的订阅者数组,有多少个 Observable 那么在 observers 的数组就会创建多少个 ZipObserver 对象。

两个 Observable 事件源是怎么被订阅的?

ZipCoordinator 内部会创建两个 ZipObserver 分别去订阅对应的 Observable。Observable 发送的事件将由 ZipObserver 去接收。

for (int i = 0; i < len; i++) {
    if (cancelled) {
        return;
    }
   // 让每一个订阅者 zipObserver 去订阅对应的事件源。
  // 注意这里有可能是同步/异步。
    sources[i].subscribe(s[i]);
}

订阅者接受事件

每一个事件源发送的事件将会对应的发送到对应的订阅者中去接收。在订阅事件时若是没有做线程切换操作的话,那么 source[i].subscribe[s[i]] 这是同步操作。

@Override
public void onNext(T t) {
    queue.offer(t);
    parent.drain();
}
@Override
public void onError(Throwable t) {
    error = t;
    done = true;
    parent.drain();
}
@Override
public void onComplete() {
    done = true;
    parent.drain();
}

同步操作订阅者接受事件

什么是同步订阅?

同步的意思就是多个 Observable 发送事件是按顺序发送的,因为他们是处于同一个线程中,因此会出现一种现象,就是第一个 Observable 的数据发送完毕之后,第二个 Observable 才能开始发送数据。

按顺序发送数据,那么发送的数据是怎么存储的?

queue 是 ZipObserver 内部维护的存储数据源发送的数据的数据结构。上面提到同步操作,会将第一个 Observable 的数据源发送完毕之后才开始发送第二个 Observable 的数据,那么先前发送的数据就会存放在 queue 中。

  • onNext(T t)

接收事件时,会被回调的一个方法,t 就是事件源发送的数据。

@Override
public void onNext(T t) {
    //将数据 t 保存起来
    queue.offer(t);
    //取出操作,取出的数据不一定是数据 t 哦。
    parent.drain();
}

内部是通过 writeToQueue(buffer, e, index, offset); 方法将数据写入到 buffer 中的。

parent.drain() 方法取出之前通过 offer 存放的数据

public void drain() {
    if (getAndIncrement() != 0) {
        return;
    }
    int missing = 1;
    final ZipObserver<T, R>[] zs = observers;
    final Observer<? super R> a = actual;
    final T[] os = row;
    final boolean delayError = this.delayError;
    for (;;) {
        for (;;) {
            int i = 0;
            int emptyCount = 0;
            for (ZipObserver<T, R> z : zs) {
                if (os[i] == null) {
                    boolean d = z.done;
                    //取出
                    T v = z.queue.poll();
                    boolean empty = v == null;
                    if (checkTerminated(d, empty, a, delayError, z)) {
                        return;
                    }
                    if (!empty) {
                        os[i] = v;
                    } else {
                        emptyCount++;
                    }
                } else {
                    if (z.done && !delayError) {
                        Throwable ex = z.error;
                        if (ex != null) {
                            clear();
                            a.onError(ex);
                            return;
                        }
                    }
                }
                i++;
            }
            if (emptyCount != 0) {
                break;
            }
            R v;
            try {
                v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                clear();
                a.onError(ex);
                return;
            }
            a.onNext(v);
            Arrays.fill(os, null);
        }
        missing = addAndGet(-missing);
        if (missing == 0) {
            return;
        }
    }
}

这个方法调用对应的 ZipObserver.queue.poll() 方法取出一个元素。

T v = z.queue.poll();

我们上面讲到 raw 属性,这个属性就是用存放即将合并发送的两个数据的数据。

os[i] = v;

转换数据并发送数据


///转换数据
R v;//类型 R 就是最重要转化后的类型。
v = ObjectHelper.requireNonNull(zipper.apply(os.clone()), "The zipper returned a null value");

//发送数据
a.onNext(v);

zipper.apply 方法就是上面提到的将一个 Object[] 转化为一个 R 类型的数据。这个方法是由用户去实现的,只有用户才知道怎么实现,就想上面的举例中提到的,它将商品的基本信息和评价信息分别存放到 Object[] 数组中,之后通过 zipper.apply 转化为一个 Goods 对象。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • 本篇文章介主要绍RxJava中操作符是以函数作为基本单位,与响应式编程作为结合使用的,对什么是操作、操作符都有哪些...
    嘎啦果安卓兽阅读 2,853评论 0 10
  • 前言 按照官方的分类,操作符大致分为以下几种: Creating Observables(Observable的创...
    小玉1991阅读 1,050评论 0 1
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 作者: maplejaw本篇只解析标准包中的操作符。对于扩展包,由于使用率较低,如有需求,请读者自行查阅文档。 创...
    maplejaw_阅读 45,653评论 8 93
  • 头一次用简书,让我想起了以前咬着笔一字一字码文的日子,虽然常常半途而废不写结局,虽然那些故事从来都是写给自己看...
    SPEC凪涼阅读 222评论 0 0