Buffer操作符功能介绍
如果本来是Observable<String>
的一个对象,那么我只能在onNext
中一个一个处理String
,通过Buffer
操作符,我们可以把n
个String
组合在一起,然后在onNext
中进行处理
用途
可以操作一些需要一起处理的数据。比如2个数据为一组,我始终要打印最大的数据,或者3个数据为一组,打印最大值。
来一段代码
btSub.setOnClickListener({
observable?.subscribe({ msg ->
tvContent.text = tvContent.text.toString() + "\n" + msg.toString()
})
})
observable = Observable.just("test1","test2","test3").buffer(2)
代码的作用就是每2个字符串 打印在一行文本中。
先来学习下just
是什么鬼
Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("test1")
t.onNext("test2")
t.onNext("test3")
}
})
....
Observable.just("test1","test2","test3")
这2行代码的效果其实是一致的(当我们,深入源代码的时候,其实会发现有一些不同,但是我们现在先一个个学习操作符,然后再会过来看这些,其实会非常的简单)
这里我就不对just
进行深入了,主要以操作符为主,如果感兴趣的可以自己看看。
看看源代码
Observable
...
public final Observable<List<T>> buffer(int count) {
return buffer(count, count);
}
...
public final Observable<List<T>> buffer(int count, int skip) {
return lift(new OperatorBufferWithSize<T>(count, skip));
}
...
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
...
这里我们看到了一个很牛逼的东西,也就是lift
,在前言中,其实我也给大家推荐了一篇关于RxJava
的文章,写的很经典,大家最好先看看。
既然提到了lift
那么就不得不提到Operator
接口了。前面其实我也已经介绍过,RxJava
中最重要的4个类或接口,如果忘了,或者不太了解的,大家可以再去看下那
篇文章我是最简单的操作符-----Create
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
public interface Func1<T, R> extends Function {
R call(T t);
}
这段代码大家仔细一看就发现,其实Operator
也是一个Func1
嘛,只有一个call
接口,用来把一个Subscriber
转化成了另外一个Subscriber
。
既然已经了解了这Operator
,那么我们继续看lift
到底做了什么。一步步看代码。
前面我介绍了一种方法去阅读源代码,就是我们已经知道了它的功能,就带着他这个功能去阅读源码,这里也是适用的。
以我开头的demo代码为例子,其实就是把Subscriber<List<String>>
转化为了Subscriber<String>
这里其实我自己理解起来都非常困难,为什么是把Subscriber<List<String>>
转化为了Subscriber<String>
而不是把Subscriber<String>
转化为了Subscriber<List<String>>
先不解释,先直接看OperatorBufferWithSize
的代码
OperatorBufferWithSize
...
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
...
}
...
因为我们最习惯的思考逻辑就是从上到下,从左到右,因为一开始我们是Test1
,Test2
,Test3
,然后呢组成,[Test1,Test2]
,[Test3]
这样子去处理,所以很正常的理解为把Subscriber<String>
转化为了Subscriber<List<String>>
。这也是为什么我觉得RxJava源码看起来比较累的原因。
在前言中介绍的那篇经典的文章中就有一幅图,我就不引用了,大家可以自己去看一下,RxJava的运行顺序是先下再上,再从上到下。我们直接继续从源码中找寻答案把。
Observable
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
OnSubscribeLift
...
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
st.onStart();
parent.call(st);
}
...
}
RxJavaHooks
我就不过多介绍了,在前几篇文章中已经有所解释,所以以上代码简化为
...
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = operator.call(o);
try {
st.onStart();
parent.call(st);
}
...
}
当我们点击demo中的按钮,就是直接先调用上面的call
方法。这里的parent
其实是Observable
中传入的onSubscribe
,其实也就是前面所提到的
Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("test1")
t.onNext("test2")
t.onNext("test3")
}
}
所以直接看到这里,其实大家应该可以看出来为什么是把Subscriber<List<String>>
转化为Subscriber<String>
了。大家再来重新看这段代码。
OnSubscribeLift
...
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = operator.call(o);
try {
st.onStart();
parent.call(st);
}
...
这里的参数Subscriber<? super R> o
,就是我们在demo中subscribe
订阅方法中传入的Subscriber
。所以R
其实就是List<String>
,而在call
方法中最终调用的是parent.call
所传的参数是Subscriber<String>
。调用的是下面这个方法
Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("test1")
t.onNext("test2")
t.onNext("test3")
}
}
所以我说,
RxJava
的执行方式呢,最后面的OnSubscribe
的call
先执行,然后再调用前面的OnSubscribe
,最前面的OnSubscribe
开始调用onNext
方法,再一步一步往下执行。
这里我们对整个方法的执行流程有了一个大致的了解,但是还没有开始去解析buffer
操作符的真正实现的代码。
下面我们还是继续看
OperatorBufferWithSize
...
public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
if (skip == count) {
BufferExact<T> parent = new BufferExact<T>(child, count);
...
return parent;
}
if (skip > count) {
BufferSkip<T> parent = new BufferSkip<T>(child, count, skip);
...
return parent;
}
BufferOverlap<T> parent = new BufferOverlap<T>(child, count, skip);
...
return parent;
}
...
这里非常明显的分为了3中情况,skip
代表是忽略几个元素,count
代表几个元素为1组。什么意思呢?
我们这里直接举个例子。
Observable.just("Test1","Test2","Test3","Test4","Test5")
元素多一点,大家好理解,那么我skip=2,count=2,那么就会被分为3组,"[Test1,Test2]","[Test3,Test4]","[Test5]"。如果skip=3,count=2,那么就会被分为2组,"[Test1,Test2]","[Test4,Test5]"。
大家自己尝试下,我就不举例了,我就想说明,这里分了3种情况做了不同处理,其实没什么关系,我们只需看明白一种就好了,其他2种也是大同小异的。我们的demo中
observable = Observable.just("test1", "test2", "test3").buffer(2)
只传了一个2
,就是skip==2&&count==2
。
所以我们直接看
...
if (skip == count) {
BufferExact<T> parent = new BufferExact<T>(child, count);
...
return parent;
}
...
static final class BufferExact<T> extends Subscriber<T> {
final Subscriber<? super List<T>> actual;
final int count;
List<T> buffer;
public BufferExact(Subscriber<? super List<T>> actual, int count) {
this.actual = actual;
this.count = count;
this.request(0L);
}
@Override
public void onNext(T t) {
List<T> b = buffer;
if (b == null) {
b = new ArrayList<T>(count);
buffer = b;
}
b.add(t);
if (b.size() == count) {
buffer = null;
actual.onNext(b);
}
}
...
前面介绍了Operator
就是为了把 Subscriber<List<String>>
转为 Subscriber<String>
,而这里的BufferExact
就是这个Subscriber<String>
。
所以我们最主要的还是看onNext
的实现。
public void onNext(T t) {
List<T> b = buffer;
if (b == null) {
b = new ArrayList<T>(count);
buffer = b;
}
b.add(t);
if (b.size() == count) {
buffer = null;
actual.onNext(b);
}
}
实现非常的简单,先把onNext传递过来的数据先保存在 List
中,等凑够了count
数量后,在调用后面的Subscriber<List<String>>
。
所以总的来说
buffer
操作符的实现是简单的,但是由于涉及到了lift
和Operator
所以整体看起来还是比较复杂的。大家可以自己琢磨。
附加
前面直接说
Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("test1")
t.onNext("test2")
t.onNext("test3")
}
})
Observable.just("test1", "test2", "test3")
这2个是差不多的。其实我们可以再深入下。
Observable
...
public static <T> Observable<T> just(T t1, T t2, T t3) {
return from((T[])new Object[] { t1, t2, t3 });
}
...
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}
return unsafeCreate(new OnSubscribeFromArray<T>(array));
}
...
其实就是创建了一个OnSubscribeFromArray
,也就是一个OnSubscribe
。继续深入
OnSubscribeFromArray
...
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array));
}
..
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
...
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}
...
如果是OnSubscribe
其实我们主要就是看一个call
方法,这里是直接child.setProducer
,前面也介绍过,setProducer
其实无法是直接调用Producer
的request
的方法。最后我们发现其实主要就看2个方法fastPath
和slowPath
。
...
void fastPath() {
final Subscriber<? super T> child = this.child;
for (T t : array) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(t);
}
if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}
...
void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;
long e = 0L;
int i = index;
for (;;) {
while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}
child.onNext(array[i]);
i++;
if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}
r--;
e--;
}
r = get() + e;
if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
...
其实直接看fastPath
会更加清爽一点,就是遍历数组,然后挨个调用onNext
,那么数组是什么呢,就是前面传入的"test1", "test2", "test3"
。
至于slowPath
呢其实关键代码也是onNext
。可以先不深入去理解它具体的意思,其实看名字就可以看出来,一个是快速处理,一个是慢慢处理。
总结
整体下来我也已经尽量涉及到了每行代码,但是中间的跳转确实会让很多人发懵,如果觉得哪里讲的还不够清楚,可以指出,我可以修正。