我是用来组合的操作符------Buffer

Buffer操作符功能介绍

如果本来是Observable<String>的一个对象,那么我只能在onNext中一个一个处理String,通过Buffer操作符,我们可以把nString组合在一起,然后在onNext中进行处理

用途

可以操作一些需要一起处理的数据。比如2个数据为一组,我始终要打印最大的数据,或者3个数据为一组,打印最大值。

来一段代码
 btSub.setOnClickListener({
            observable?.subscribe({ msg ->
                tvContent.text = tvContent.text.toString() + "\n" + msg.toString()
            })
        })
        observable = Observable.just("test1","test2","test3").buffer(2)

代码的作用就是每2个字符串 打印在一行文本中。

先来学习下just是什么鬼
 Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        })
....
 Observable.just("test1","test2","test3")

这2行代码的效果其实是一致的(当我们,深入源代码的时候,其实会发现有一些不同,但是我们现在先一个个学习操作符,然后再会过来看这些,其实会非常的简单)
这里我就不对just进行深入了,主要以操作符为主,如果感兴趣的可以自己看看。

看看源代码

Observable

...
public final Observable<List<T>> buffer(int count) {
        return buffer(count, count);
    }
...
 public final Observable<List<T>> buffer(int count, int skip) {
        return lift(new OperatorBufferWithSize<T>(count, skip));
    }
...
 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
...

这里我们看到了一个很牛逼的东西,也就是lift,在前言中,其实我也给大家推荐了一篇关于RxJava的文章,写的很经典,大家最好先看看。

既然提到了lift那么就不得不提到Operator接口了。前面其实我也已经介绍过,RxJava中最重要的4个类或接口,如果忘了,或者不太了解的,大家可以再去看下那
篇文章我是最简单的操作符-----Create

 public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }

public interface Func1<T, R> extends Function {
    R call(T t);
}

这段代码大家仔细一看就发现,其实Operator也是一个Func1嘛,只有一个call接口,用来把一个Subscriber转化成了另外一个Subscriber

既然已经了解了这Operator,那么我们继续看lift到底做了什么。一步步看代码。

前面我介绍了一种方法去阅读源代码,就是我们已经知道了它的功能,就带着他这个功能去阅读源码,这里也是适用的。

以我开头的demo代码为例子,其实就是把Subscriber<List<String>>转化为了Subscriber<String>

这里其实我自己理解起来都非常困难,为什么是把Subscriber<List<String>>转化为了Subscriber<String>而不是把Subscriber<String>转化为了Subscriber<List<String>>

先不解释,先直接看OperatorBufferWithSize的代码

OperatorBufferWithSize

...
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
...
}
...

因为我们最习惯的思考逻辑就是从上到下,从左到右,因为一开始我们是Test1Test2Test3,然后呢组成,[Test1,Test2][Test3]这样子去处理,所以很正常的理解为把Subscriber<String>转化为了Subscriber<List<String>>。这也是为什么我觉得RxJava源码看起来比较累的原因。

前言中介绍的那篇经典的文章中就有一幅图,我就不引用了,大家可以自己去看一下,RxJava的运行顺序是先下再上,再从上到下。我们直接继续从源码中找寻答案把。

Observable

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }

OnSubscribeLift

...
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
            try {
                st.onStart();
                parent.call(st);
            }
...
    }

RxJavaHooks我就不过多介绍了,在前几篇文章中已经有所解释,所以以上代码简化为

...
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = operator.call(o);
            try {
                st.onStart();
                parent.call(st);
            }
...
    }

当我们点击demo中的按钮,就是直接先调用上面的call方法。这里的parent其实是Observable中传入的onSubscribe,其实也就是前面所提到的

Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        }

所以直接看到这里,其实大家应该可以看出来为什么是把Subscriber<List<String>>转化为Subscriber<String>了。大家再来重新看这段代码。

OnSubscribeLift

...
   public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = operator.call(o);
            try {
                st.onStart();
                parent.call(st);
            }
...

这里的参数Subscriber<? super R> o,就是我们在demo中subscribe订阅方法中传入的Subscriber。所以R其实就是List<String>,而在call方法中最终调用的是parent.call所传的参数是Subscriber<String>。调用的是下面这个方法

Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        }

所以我说,RxJava的执行方式呢,最后面的OnSubscribecall先执行,然后再调用前面的OnSubscribe,最前面的OnSubscribe开始调用onNext方法,再一步一步往下执行。

这里我们对整个方法的执行流程有了一个大致的了解,但是还没有开始去解析buffer操作符的真正实现的代码。

下面我们还是继续看

OperatorBufferWithSize

...
 public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
        if (skip == count) {
            BufferExact<T> parent = new BufferExact<T>(child, count);
            ...
            return parent;
        }
        if (skip > count) {
            BufferSkip<T> parent = new BufferSkip<T>(child, count, skip);
            ...
            return parent;
        }
        BufferOverlap<T> parent = new BufferOverlap<T>(child, count, skip);
        ...
        return parent;
    }
...

这里非常明显的分为了3中情况,skip代表是忽略几个元素,count代表几个元素为1组。什么意思呢?
我们这里直接举个例子。
Observable.just("Test1","Test2","Test3","Test4","Test5")
元素多一点,大家好理解,那么我skip=2,count=2,那么就会被分为3组,"[Test1,Test2]","[Test3,Test4]","[Test5]"。如果skip=3,count=2,那么就会被分为2组,"[Test1,Test2]","[Test4,Test5]"。

大家自己尝试下,我就不举例了,我就想说明,这里分了3种情况做了不同处理,其实没什么关系,我们只需看明白一种就好了,其他2种也是大同小异的。我们的demo中
observable = Observable.just("test1", "test2", "test3").buffer(2)只传了一个2,就是skip==2&&count==2

所以我们直接看

...
  if (skip == count) {
            BufferExact<T> parent = new BufferExact<T>(child, count);
            ...
            return parent;
        }
...
static final class BufferExact<T> extends Subscriber<T> {
        final Subscriber<? super List<T>> actual;
        final int count;

        List<T> buffer;

        public BufferExact(Subscriber<? super List<T>> actual, int count) {
            this.actual = actual;
            this.count = count;
            this.request(0L);
        }

        @Override
        public void onNext(T t) {
            List<T> b = buffer;
            if (b == null) {
                b = new ArrayList<T>(count);
                buffer = b;
            }

            b.add(t);

            if (b.size() == count) {
                buffer = null;
                actual.onNext(b);
            }
        }

...

前面介绍了Operator就是为了把 Subscriber<List<String>>转为 Subscriber<String>,而这里的BufferExact就是这个Subscriber<String>
所以我们最主要的还是看onNext的实现。

public void onNext(T t) {
            List<T> b = buffer;
            if (b == null) {
                b = new ArrayList<T>(count);
                buffer = b;
            }

            b.add(t);

            if (b.size() == count) {
                buffer = null;
                actual.onNext(b);
            }
        }

实现非常的简单,先把onNext传递过来的数据先保存在 List中,等凑够了count数量后,在调用后面的Subscriber<List<String>>

所以总的来说buffer操作符的实现是简单的,但是由于涉及到了liftOperator所以整体看起来还是比较复杂的。大家可以自己琢磨。

附加

前面直接说

 Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("test1")
                t.onNext("test2")
                t.onNext("test3")
            }
        })
Observable.just("test1", "test2", "test3")

这2个是差不多的。其实我们可以再深入下。

Observable

...
public static <T> Observable<T> just(T t1, T t2, T t3) {
        return from((T[])new Object[] { t1, t2, t3 });
    }
...
public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);
        }
        return unsafeCreate(new OnSubscribeFromArray<T>(array));
    }
...

其实就是创建了一个OnSubscribeFromArray,也就是一个OnSubscribe。继续深入

OnSubscribeFromArray

...
public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }
..
static final class FromArrayProducer<T>
    extends AtomicLong
    implements Producer {
     ...
        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }
...

如果是OnSubscribe其实我们主要就是看一个call方法,这里是直接child.setProducer,前面也介绍过,setProducer其实无法是直接调用Producerrequest的方法。最后我们发现其实主要就看2个方法fastPathslowPath

...
 void fastPath() {
            final Subscriber<? super T> child = this.child;

            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }

                child.onNext(t);
            }

            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }
...
void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;

            long e = 0L;
            int i = index;

            for (;;) {

                while (r != 0L && i != n) {
                    if (child.isUnsubscribed()) {
                        return;
                    }

                    child.onNext(array[i]);

                    i++;

                    if (i == n) {
                        if (!child.isUnsubscribed()) {
                            child.onCompleted();
                        }
                        return;
                    }

                    r--;
                    e--;
                }

                r = get() + e;

                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }
...

其实直接看fastPath会更加清爽一点,就是遍历数组,然后挨个调用onNext,那么数组是什么呢,就是前面传入的"test1", "test2", "test3"

至于slowPath呢其实关键代码也是onNext。可以先不深入去理解它具体的意思,其实看名字就可以看出来,一个是快速处理,一个是慢慢处理。

总结

整体下来我也已经尽量涉及到了每行代码,但是中间的跳转确实会让很多人发懵,如果觉得哪里讲的还不够清楚,可以指出,我可以修正。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容