| 类别 | 方法 | 说明 | 示例 |
|---|---|---|---|
| 创建方法 | just(T... data) |
创建一个发出指定数据项的 Flux。 |
Flux.just(1, 2, 3) |
fromIterable(Iterable<? extends T> iterable) |
创建一个从 Iterable 中发出数据的 Flux。 |
Flux.fromIterable(Arrays.asList(1, 2, 3)) |
|
fromArray(T... data) |
创建一个从数组中发出数据的 Flux。 |
Flux.fromArray(new Integer[]{1, 2, 3}) |
|
range(int start, int count) |
创建一个发出从 start 开始的连续整数流的 Flux。 |
Flux.range(1, 5) |
|
interval(Duration period) |
创建一个每隔指定时间发出一个递增的长整型数字流的 Flux。 |
Flux.interval(Duration.ofSeconds(1)) |
|
empty() |
创建一个没有数据的 Flux,但会正常完成。 |
Flux.empty() |
|
error(Throwable throwable) |
创建一个只发出错误信号的 Flux。 |
Flux.error(new RuntimeException("Something went wrong")) |
|
never() |
创建一个不发出任何数据也不完成的 Flux。 |
Flux.never() |
|
| 转换操作符 | map(Function<? super T, ? extends R> mapper) |
对 Flux 中的每个元素应用转换操作。 |
flux.map(x -> x * 2) |
flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) |
将每个元素映射为另一个 Publisher,并将这些元素的结果合并成一个 Flux。 |
flux.flatMap(x -> Flux.just(x * 2)) |
|
filter(Predicate<? super T> predicate) |
根据条件过滤 Flux 中的元素。 |
flux.filter(x -> x % 2 == 0) |
|
take(long n) |
获取前 n 个元素。 |
flux.take(3) |
|
skip(long n) |
跳过前 n 个元素。 |
flux.skip(3) |
|
distinct() |
去重操作,发出不重复的元素。 | flux.distinct() |
|
concatWith(Publisher<? extends T> other) |
将当前 Flux 与另一个 Publisher 按顺序拼接。 |
flux.concatWith(Flux.just(6, 7)) |
|
mergeWith(Publisher<? extends T> other) |
将当前 Flux 与另一个 Publisher 合并,异步执行。 |
flux.mergeWith(Flux.just(6, 7)) |
|
| 错误处理方法 | onErrorReturn(T fallbackValue) |
在发生错误时返回一个默认值。 | flux.onErrorReturn(0) |
onErrorResume(Function<? super Throwable, ? extends Publisher<? extends T>> fallback) |
错误发生时提供一个替代的 Publisher。 |
flux.onErrorResume(e -> Flux.just(-1)) |
|
onErrorMap(Function<? super Throwable, ? extends Throwable> mapper) |
错误发生时将错误映射为新的错误。 | flux.onErrorMap(e -> new CustomException()) |
|
| 副作用处理方法 | doOnNext(Consumer<? super T> onNext) |
在每次发出元素时执行某些操作。 | flux.doOnNext(x -> System.out.println(x)) |
doOnComplete(Runnable onComplete) |
在 Flux 完成时执行某些操作。 |
flux.doOnComplete(() -> System.out.println("Completed")) |
|
doOnError(Consumer<? super Throwable> onError) |
在发生错误时执行某些操作。 | flux.doOnError(e -> System.out.println(e.getMessage())) |
|
doOnTerminate(Runnable onTerminate) |
在 Flux 完成或发生错误时执行操作。 |
flux.doOnTerminate(() -> System.out.println("Terminated")) |
|
doOnCancel(Runnable onCancel) |
在订阅者取消订阅时执行操作。 | flux.doOnCancel(() -> System.out.println("Canceled")) |
|
doOnDiscard(Class<T> clazz, Consumer<? super T> onDiscard) |
在丢弃元素时执行操作。 | flux.doOnDiscard(Integer.class, x -> System.out.println("Discarded")) |
|
| 组合操作符 | zipWith(Publisher<? extends T> other, BiFunction<? super T, ? super T, ? extends R> zipper) |
将两个 Flux 合并为一个 Flux。 |
flux1.zipWith(flux2, (a, b) -> a + b) |
mergeWith(Publisher<? extends T> other) |
异步合并多个 Flux。 |
flux.mergeWith(Flux.just(4, 5)) |
|
concatWith(Publisher<? extends T> other) |
按顺序合并多个 Flux。 |
flux.concatWith(Flux.just(6, 7)) |
|
| 条件方法 | takeUntil(Predicate<? super T> predicate) |
当满足条件时停止发出数据。 | flux.takeUntil(x -> x > 5) |
skipUntil(Predicate<? super T> predicate) |
跳过直到满足条件的数据。 | flux.skipUntil(x -> x > 5) |
|
timeout(Duration timeout) |
超时处理,如果超时就发送错误。 | flux.timeout(Duration.ofSeconds(5)) |
|
| 背压控制 | onBackpressureBuffer() |
在背压情况下,缓冲数据。 | flux.onBackpressureBuffer() |
onBackpressureDrop() |
在背压情况下丢弃数据。 | flux.onBackpressureDrop() |
|
onBackpressureLatest() |
在背压情况下仅保留最新数据。 | flux.onBackpressureLatest() |
|
| 其他方法 | repeat(long times) |
重复发射 Flux 中的元素指定次数。 |
flux.repeat(3) |
repeatWhen(Function<? super Flux<Object>, ? extends Publisher<?>> when) |
根据条件重复 Flux。 |
flux.repeatWhen(flux -> flux.take(5)) |
Flux常用方法
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- Spring5现处在第四个预发布版,正式版将要发布了,它带来的一大特性就是响应式框架Spring WebFlux。...
- 看了许多讲解RxJava的文章,有些文章讲解的内容是基于第一个版本的,有些文章的讲解是通过比较常用的一些API和基...
- Java异步编程实战 chap1 认识异步编程 异步编程概念与作用在使用同步编程方式时,由于每个线程同时只能发起一...