计算机就跟比基尼一样,省去了人们许多的胡思乱想。
buffer(Publisher<?> other)
buffer(Publisher<?> other, Supplier<C> bufferSupplier)
根据other
信号来决定每次缓存的个数, 当other.onNext
的时候把缓存提交到消费者的onNext
public final Flux<List<T>> buffer(Publisher<?> other)
public final <C extends Collection<? super T>> Flux<C> buffer(Publisher<?> other, Supplier<C> bufferSupplier)
public void test5() throws InterruptedException {
Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6).delayElements(Duration.ofSeconds(2));
Flux<List<Integer>> buffer = flux.buffer(Duration.ofSeconds(3));
buffer.subscribe(integers -> {
System.out.println("产品到达,进行卸货:");
integers.stream().forEach(integer -> {
System.out.println(integer);
});
});
Thread.sleep(20000);
}
buffer(Duration bufferingTimespan)
buffer(Duration bufferingTimespan, Scheduler timer)
这两个方法是上面两个方法的应用, 传入的other
是FluxInterval
, 周期性触发onNext
示意图说明的很清楚.