Flux常用方法

类别 方法 说明 示例
创建方法 just(T... data) 创建一个发出指定数据项的 Flux Flux.just(1, 2, 3)
fromIterable(Iterable<? extends T> iterable) 创建一个从 Iterable 中发出数据的 Flux Flux.fromIterable(Arrays.asList(1, 2, 3))
fromArray(T... data) 创建一个从数组中发出数据的 Flux Flux.fromArray(new Integer[]{1, 2, 3})
range(int start, int count) 创建一个发出从 start 开始的连续整数流的 Flux Flux.range(1, 5)
interval(Duration period) 创建一个每隔指定时间发出一个递增的长整型数字流的 Flux Flux.interval(Duration.ofSeconds(1))
empty() 创建一个没有数据的 Flux,但会正常完成。 Flux.empty()
error(Throwable throwable) 创建一个只发出错误信号的 Flux Flux.error(new RuntimeException("Something went wrong"))
never() 创建一个不发出任何数据也不完成的 Flux Flux.never()
转换操作符 map(Function<? super T, ? extends R> mapper) Flux 中的每个元素应用转换操作。 flux.map(x -> x * 2)
flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) 将每个元素映射为另一个 Publisher,并将这些元素的结果合并成一个 Flux flux.flatMap(x -> Flux.just(x * 2))
filter(Predicate<? super T> predicate) 根据条件过滤 Flux 中的元素。 flux.filter(x -> x % 2 == 0)
take(long n) 获取前 n 个元素。 flux.take(3)
skip(long n) 跳过前 n 个元素。 flux.skip(3)
distinct() 去重操作,发出不重复的元素。 flux.distinct()
concatWith(Publisher<? extends T> other) 将当前 Flux 与另一个 Publisher 按顺序拼接。 flux.concatWith(Flux.just(6, 7))
mergeWith(Publisher<? extends T> other) 将当前 Flux 与另一个 Publisher 合并,异步执行。 flux.mergeWith(Flux.just(6, 7))
错误处理方法 onErrorReturn(T fallbackValue) 在发生错误时返回一个默认值。 flux.onErrorReturn(0)
onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) 错误发生时提供一个替代的 Publisher flux.onErrorResume(e -> Flux.just(-1))
onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) 错误发生时将错误映射为新的错误。 flux.onErrorMap(e -> new CustomException())
副作用处理方法 doOnNext(Consumer<? super T> onNext) 在每次发出元素时执行某些操作。 flux.doOnNext(x -> System.out.println(x))
doOnComplete(Runnable onComplete) Flux 完成时执行某些操作。 flux.doOnComplete(() -> System.out.println("Completed"))
doOnError(Consumer<? super Throwable> onError) 在发生错误时执行某些操作。 flux.doOnError(e -> System.out.println(e.getMessage()))
doOnTerminate(Runnable onTerminate) Flux 完成或发生错误时执行操作。 flux.doOnTerminate(() -> System.out.println("Terminated"))
doOnCancel(Runnable onCancel) 在订阅者取消订阅时执行操作。 flux.doOnCancel(() -> System.out.println("Canceled"))
doOnDiscard(Class<T> clazz, Consumer<? super T> onDiscard) 在丢弃元素时执行操作。 flux.doOnDiscard(Integer.class, x -> System.out.println("Discarded"))
组合操作符 zipWith(Publisher<? extends T> other, BiFunction<? super T, ? super T, ? extends R> zipper) 将两个 Flux 合并为一个 Flux flux1.zipWith(flux2, (a, b) -> a + b)
mergeWith(Publisher<? extends T> other) 异步合并多个 Flux flux.mergeWith(Flux.just(4, 5))
concatWith(Publisher<? extends T> other) 按顺序合并多个 Flux flux.concatWith(Flux.just(6, 7))
条件方法 takeUntil(Predicate<? super T> predicate) 当满足条件时停止发出数据。 flux.takeUntil(x -> x > 5)
skipUntil(Predicate<? super T> predicate) 跳过直到满足条件的数据。 flux.skipUntil(x -> x > 5)
timeout(Duration timeout) 超时处理,如果超时就发送错误。 flux.timeout(Duration.ofSeconds(5))
背压控制 onBackpressureBuffer() 在背压情况下,缓冲数据。 flux.onBackpressureBuffer()
onBackpressureDrop() 在背压情况下丢弃数据。 flux.onBackpressureDrop()
onBackpressureLatest() 在背压情况下仅保留最新数据。 flux.onBackpressureLatest()
其他方法 repeat(long times) 重复发射 Flux 中的元素指定次数。 flux.repeat(3)
repeatWhen(Function<? super Flux<Object>, ? extends Publisher<?>> when) 根据条件重复 Flux flux.repeatWhen(flux -> flux.take(5))
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容