先来看看官方注释:
翻译一下注释:
///缓冲从上游发布者接收的元素。
///使用' ' Publisher/buffer(size:prefetch:whenFull:) ' '从上游发布者收集特定数量的元素,
///然后根据您指定的' ' Publishers/BufferingStrategy ' '和' ' Publishers/PrefetchStrategy ' '
///策略将它们重新发布到下游订阅者。
///如果发布者在达到' size '阈值之前完成,它会缓冲元素并在完成之前将它们发布到下游。
/// -参数
/// - size:要存储的最大元素数。
/// - prefetch:初始填充缓冲区的策略。
/// - whenFull:当缓冲区满时采取的动作。
/// -返回:缓冲从上游发布者接收的元素的发布者。
字面意思就是根据策略缓冲上游发布者发送的数据,并重新发布给下游,也就是一个背压的作用。举个例子,就像你拿个杯子边接水边用吸管喝,水龙头(上游发布者)一直不停放水,用杯子(buffer)接住,然后你用吸管(下游订阅者)喝水,你喝水得停下来咽吧(消费赶不上发布),所以这个杯子就起到缓冲的作用。
size很好理解,prefetch和whenFull好像不是字面意思那么简单,先分别看看两个参数的枚举
prefetch参数可设置的枚举有两个:
.keepFull:
在订阅时填充缓冲区,并在订阅后保持缓冲区满。
该策略首先在订阅者第一次连接时从上游发出等于缓冲区大小的请求。之后,它继续从上游请求元素,以尽量保持缓冲区满。
.byRequest:
避免预取,而是按需执行请求的策略。
该策略只是将下游的请求转发给上游的发布者。
whenFull的参数可设置的枚举有三个:
. dropNewest:
当缓冲区满时,丢弃新接收到的元素。
. dropOldest
当缓冲区满时,丢弃缓冲区中最老的元素。
. customError(暂时不讨论这个)
当缓冲区已满时,执行闭包以提供一个自定义错误。
whenFull好理解,字面意思,prefetch就有点搞不懂了,特别是byRequest策略,而且在实际使用过程中,whenFull策略在不同的prefetch策略中也有不同的表现。
于是我写了个demo来测试:
测试思路,通过一个上游发布者按照一定数量持续发送值,然后通过buffer进行缓存,下游使用自定义的订阅者来控制接收需求,比如上游发送10个值,下游请求5个值同时打印输出,来模拟观察buffer的不同策略的表现。
为了方便测试,写了个UI界面,如下:
定义一个变量,用来保存每次上游发送值的递增的结果
var index = 1;
发布者:
let customSubscriber = CustomSubscriber()
然后是自定义的订阅者:
class CustomSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
var subscription: Subscription?
func receive(subscription: Subscription) {
self.subscription = subscription
}
func receive(_ input: Int) -> Subscribers.Demand {
print(input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("---completion")
}
}
在接受订阅成功的方法里只是持有了subscription,并没有向上游请求数据,以模拟下游阻塞的情况,接收到值也是简单打印出来。
发送按钮的点击响应
@IBAction func sendOnClick(_ sender: Any) {
let sendCount: Int = Int(sendCountTF.text!) ?? 0
for i in index ..< index + sendCount {
publisher.send(i)
}
index += sendCount
}
这里的作用是读取输入框中的数字为sendCount,sendCount代表需要发送数据的个数,然后从index开始循环发送sendCount数量的整型数据,最后设置下一次发送数据的起点
接收按钮的点击响应:
@IBAction func receiveOnClick(_ sender: Any) {
let receiveCount: Int = Int(receiveCountTF.text!) ?? 0
customSubscriber.subscription?.request(.max(receiveCount))
}
这里的作用是利用subscription告诉上游(buffer)需要接收多少个数量的数据
开始测试:
通过buffer将发布者和订阅者连接起来,这里设置缓存大小为5,策略分别为byRequest和dropOldest:
override func viewDidLoad() {
super.viewDidLoad()
publisher.buffer(size: 5, prefetch: .byRequest, whenFull: . dropOldest)
.subscribe(customSubscriber)
}
设置发送个数为10,接收个数为3
步骤1:先点击发送,这个时候发布者会发送 1、2、3......8、9、10这10个整型数据,缓存大小为5,whenFull设置的是dropOldest,那么缓存区留下来的就是6、7、8、9、10这十个数据,
步骤2:然后点击接收,这时自定的订阅者会向buffer发布者请求3个数据,预期打印结果应该是6、7、8这三个数,实际与预期相符:
接着再点击发送,发布者继续发送10个数据分别是11、12、13......18、19、20,然后再点击接收,请尝试猜测一下打印结果。
嗯,跟预期差不多,buffer会继续填充缓冲区,满了之后会丢弃掉最早的数据。
我们换个策略,将dropOldest换做dropNewest,并重复上面的步骤
override func viewDidLoad() {
super.viewDidLoad()
publisher.buffer(size: 5, prefetch: .byRequest, whenFull: .dropNewest)
.subscribe(customSubscriber)
}
打印结果
跟预期结果一样,由于策略改为缓存满时丢弃最新的数据,第一次操作缓存区的内容为1、2、3、4、5,6到10的数据被丢弃了,打印的结果为1、2、3,缓冲区剩下4、5两个数,第二次操作由于缓冲区只剩3个位置,所以只缓存了11、12、13这3个数,缓冲区缓存的数据为4、5、11、12、13,其他的都丢弃了,最后订阅者请求三个数的时候只有4、5、11。
接下俩将byRequest改成keepFull,whenFull改为dropOldest
override func viewDidLoad() {
super.viewDidLoad()
publisher.buffer(size: 5, prefetch: .keepFull, whenFull: .dropOldest)
.subscribe(customSubscriber)
}
重复上面的操作:
进行步骤1的时候我发现打印的结果跟我预期的不一样:
因为设置的whenFull是dropOldest,我的预期是打印8、9、10,实际是打印了1、2、3
然后观察步骤2的打印:
步骤2打印了4、5、11,根据这个结果可以推测出,步骤1时,发布者发送的10个数据只有前面的5个被缓存了下来,后续的被丢弃了,下游发布者请求了3个数据,最后缓冲区剩下4、5两个数据,缓冲区留有3个空位,进行步骤2时只有11、12、13前3个数据缓存了下来,此时缓冲区的数据为4、5、11、12、13,下游请求3个数据时打印就为4、5、11
把dropOldest改为dropNewest再试试
override func viewDidLoad() {
super.viewDidLoad()
publisher.buffer(size: 5, prefetch: .keepFull, whenFull: .dropNewest)
.subscribe(customSubscriber)
}
直接看打印结果:
我发现得出的结论跟设置为dropOldest的时候一样,当prefetch设置为keepFull时,whenFull并不影响任何结果
结论:
refetch代表预取值的策略,字面意思是预取,我理解为对上游的需求
当设置为byRequest时,buffer对上游发布者的需求为unlimited,即对上游需求不收限制,上游发布的任何值都会发送给buffer,buffer根据whenFull策略决定缓冲区满时丢弃旧数据还是丢弃新数据
当设置为keepFull时,buffer在连接到上游发布者的同时,会告诉上游发布者自己的需求为size大小,那么上游只会发送size数量的数据给buffer,之后的数据不再发送,所以whenFull设置为何值都不影响结果。
以上结论是根据实现得出来的,如有不符,望指出。