Rxjava2 操作符汇总
构造
create/generate ?
just/empty/error/never/range(Long)
from(Array/Callable/Future/Iterable/Publisher)
startWith/repeat/cache/defaultIfEmpty 增加默认值 开头/重复/缓存/返回默认值
加时
- defer/timer/interval(Range) 延迟执行
- delay/timeout/timestamp 延时、超时、时间戳
异常
- onError(ResumeNext/Return/ReturnItem) 错误处理
- onExceptionResumeNext 放行error、throwable
- retry(Until/When)
并发与并行
- parallel、serialize 并行
- observeOn、subscribeOn 指定线程
获取结果
blocking[First(T)/Last(T)/ForEach/Iterable(int)/Latest/MostRecent(T)/Single/Subscribe] 获取结果
blockingNext ?
forEach(While) 直接消费结果
subscribe 直接消费结果
限流
distinct(UntilChanged)去重
filter 按条件过滤
ignoreElements 忽略所有
limit 限制个数
(take/skip)(Last/Until/While)
elementAt(OrError)
(first/last/single)(Element/OrError)
debounce 过滤,防止抖动:只有在指定时间间隔内没有其他数据到达时,才发射该数据。
sample 抽样
throttle(First/Last/WithTimeout) ?
window 窗口
onBackpressureBuffer/Drop/Latest 背压
合并源
amb(Array) with 多个流取最快的一个
concat(Array)(Eager/DelayError) with 多个流顺序合并
switchOnNext(DelayError) 不断切换到最近开始的流
merge(Array)(DelayError) with 多个流乱序合并
combineLatest(DelayError) 多个流流速不同时,流速快的与流速慢的最新一个合并。 A1 B1 C2….
withLatestFrom
zip(Array/Iterable) with 多个流一一对应,合并处理,最先结束为截止点 A1 B2 C3 …
join ?
转换
buffer(count,skip)、(timespan,timeskip, unit)分批
map/cast 转换
flatMap/concatMap(Eager)(DelayError) 流转换, 乱序、顺序
switchMap(DelayError)?
all/any/contains 条件判断
isEmpty/sequenceEqual 两个流一一比较是否相同,可以先后顺序不同?
collect/reduce/scan 归并
groupBy 分组
sorted 排序
toList/Map/Multimap/SortedList 集合
日志
- doOnSubscribe/Request/Next/Complete/Error/Cancel
- doOnTerminate = complete/error
- doFinally = terminate/cancel
- doOnLifecycle = subscribe/request/cancel
- doOnEach = all
- doAfterNext/AfterTerminate
新增操作
- compose 组合多个操作
- lift 创建新的操作
实例
maven依赖如下:
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
</dependency>
</dependencies>
代码示例如下:
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.StopWatch;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.IntStream;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import static java.util.stream.Collectors.joining;
@Slf4j
public class RxjavaTest {
/**
 * Blocks the calling thread for the given duration.
 *
 * <p>If the thread is interrupted while sleeping, the sleep ends early and the
 * thread's interrupt flag is restored so that code further up the stack can
 * still observe the interruption.
 *
 * @param delayed how long to sleep, expressed in {@code unit}
 * @param unit    the time unit of {@code delayed}
 */
static void sleep(long delayed, TimeUnit unit) {
    try {
        unit.sleep(delayed);
    } catch (InterruptedException e) {
        // Catching InterruptedException clears the flag; set it again instead of swallowing it.
        Thread.currentThread().interrupt();
    }
}
/**
 * Simulated lookup by numeric id: logs the call, pauses 200 ms, then returns
 * the id followed by {@code "-name"}.
 *
 * @param id the id to look up
 * @return {@code id + "-name"}
 */
String findById(Long id) {
    log.info("called findById {}", id);
    // Pretend the lookup is a slow remote call.
    sleep(200, TimeUnit.MILLISECONDS);
    final String name = id + "-name";
    return name;
}
/**
 * Simulated lookup by string id: logs the call, pauses 200 ms, then returns
 * the id followed by {@code "-name"}.
 *
 * @param id the id to look up
 * @return {@code id + "-name"}
 */
String findById(String id) {
    log.info("called findById {}", id);
    // Pretend the lookup is a slow remote call.
    sleep(200, TimeUnit.MILLISECONDS);
    final String name = id + "-name";
    return name;
}
/**
 * Same as {@link #findById(Long)}, except that ids divisible by 3 fail.
 * The lookup (log + delay) always runs first; the failure is raised afterwards.
 *
 * @param id the id to look up
 * @return the lookup result for ids not divisible by 3
 * @throws RuntimeException with message {@code "id.not.exist"} when {@code id % 3 == 0}
 */
String findByIdWithError(Long id) {
    final String name = findById(id);
    final boolean found = id % 3 != 0;
    if (found) {
        return name;
    }
    throw new RuntimeException("id.not.exist");
}
/**
 * Runs the given task and prints how long it took.
 *
 * <p>The timing report is printed even when the task throws, because the
 * stop-and-print step lives in a {@code finally} block.
 *
 * @param runnable the task to time
 */
static void trace(Runnable runnable) {
    final StopWatch timer = new StopWatch();
    timer.start();
    try {
        runnable.run();
    } finally {
        timer.stop();
        final String report = timer.prettyPrint();
        System.out.printf("trace = %s", report);
    }
}
/**
 * Walks through the simple factory operators: {@code just}, {@code empty},
 * {@code error}, {@code rangeLong}, {@code fromIterable}, {@code fromArray}
 * and {@code repeat}. Every item is printed to standard output.
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_create() throws Exception {
    // Fixed list of values.
    Observable<Integer> pair = Observable.just(1, 2);
    pair.blockingSubscribe(System.out::println);

    // No items at all.
    Observable<Object> nothing = Observable.empty();
    nothing.blockingSubscribe(System.out::println);

    // An error, replaced by a fallback item.
    Observable<Object> failing = Observable.error(new RuntimeException("error"));
    failing.onErrorReturnItem("default").blockingSubscribe(System.out::println);

    // A numeric range.
    Observable<Long> range = Observable.rangeLong(1, 10);
    range.blockingSubscribe(System.out::println);

    // From a list.
    Observable<Integer> fromList = Observable.fromIterable(Arrays.asList(1, 2));
    fromList.blockingSubscribe(System.out::println);

    // From an array.
    Integer[] values = new Integer[]{1, 2};
    Observable.fromArray(values).blockingSubscribe(System.out::println);

    // The same two values, repeated three times.
    pair.repeat(3).blockingSubscribe(System.out::println);
}
/**
 * Wraps a paged read (4 pages of 10 SKUs each) into an {@code Observable} and
 * consumes it three ways: on the calling thread, with the read moved to the io
 * scheduler, and with the read on the io scheduler plus batching into lists of 10.
 *
 * <p>The emitter sends exactly one terminal event: {@code onComplete} after the
 * last page, or {@code onError} if reading fails.
 *
 * @throws Exception if the pauses between the three runs are interrupted
 */
@Test
public void test_page_observable() throws Exception {
    Observable<String> observable = Observable.<String>create(emitter -> {
        try {
            int pageNo = 1;
            while (pageNo < 5) {
                final int current = pageNo;
                log.info("start read pageNo={}", pageNo);
                IntStream.range(0, 10)
                        .mapToObj(i -> "sku" + ((current - 1) * 10000 + i))
                        .forEach(emitter::onNext);
                sleep(10, TimeUnit.MILLISECONDS);
                log.info("end read pageNo={}", pageNo);
                ++pageNo;
            }
            emitter.onComplete();
        } catch (Exception e) {
            // Report the failure downstream instead of logging it and then completing normally.
            log.error("failed to read pages", e);
            emitter.onError(e);
        }
    });

    observable
            .blockingSubscribe(System.out::println);

    TimeUnit.SECONDS.sleep(1);
    log.info("将读取放在单独的线程中执行:");
    observable
            .subscribeOn(Schedulers.io())
            .blockingSubscribe(System.out::println);

    TimeUnit.SECONDS.sleep(1);
    log.info("将读取放在单独的线程中, 并且分批执行:");
    observable
            .subscribeOn(Schedulers.io())
            .buffer(10) // process in batches of 10
            .blockingSubscribe(System.out::println);
}
/**
 * Contrasts eager and lazy evaluation of the value source.
 *
 * <p>{@code Observable.just(findById(1L))} evaluates {@code findById} while the
 * observable is being built, i.e. before the "subscribe observable" log line.
 * With {@code defer} and {@code fromCallable} the call appears after that log line.
 *
 * <p>Log output from one recorded run:
 * <pre>
 * 08:50:15.564 [main] INFO top.zhacker.rxjava.RxjavaTest - called findById 1
 * 08:50:15.854 [main] INFO top.zhacker.rxjava.RxjavaTest - subscribe observable
 * 08:50:15.914 [main] INFO top.zhacker.rxjava.RxjavaTest - subscribe observable
 * 08:50:15.914 [main] INFO top.zhacker.rxjava.RxjavaTest - called findById 1
 * 08:50:16.115 [main] INFO top.zhacker.rxjava.RxjavaTest - subscribe observable
 * 08:50:16.116 [main] INFO top.zhacker.rxjava.RxjavaTest - called findById 1
 * </pre>
 */
@Test
public void test_defer() {
    // Eager: findById runs right here, while the observable is assembled.
    Observable<String> eager = Observable.just(findById(1L));
    log.info("subscribe observable");
    eager.blockingSubscribe();

    // Lazy via defer: the factory lambda is what gets stored.
    Observable<String> deferred = Observable.defer(() -> {
        return Observable.just(findById(1L));
    });
    log.info("subscribe observable");
    deferred.blockingSubscribe();

    // Lazy via fromCallable.
    Observable<String> fromCallable = Observable.fromCallable(() -> findById(1L));
    log.info("subscribe observable");
    fromCallable.blockingSubscribe();
}
/**
 * Demonstrates {@code delay}, {@code intervalRange} and {@code timestamp}.
 *
 * <p>Output from one recorded run:
 * <pre>
 * Timed[time=1530762785174, unit=MILLISECONDS, value=1]
 * Timed[time=1530762785175, unit=MILLISECONDS, value=2]
 * Timed[time=1530762785175, unit=MILLISECONDS, value=3]
 * Timed[time=1530762787179, unit=MILLISECONDS, value=5]
 * Timed[time=1530762788180, unit=MILLISECONDS, value=6]
 * Timed[time=1530762789179, unit=MILLISECONDS, value=7]
 * Timed[time=1530762790182, unit=MILLISECONDS, value=8]
 * Timed[time=1530762791180, unit=MILLISECONDS, value=9]
 * </pre>
 */
@Test
public void test_delay() {
    // Three values, shifted by 2 seconds.
    Observable<Integer> delayed = Observable.just(1, 2, 3).delay(2, TimeUnit.SECONDS);
    delayed
            .timestamp() // wrap each item with its timestamp (Timed<T>)
            .blockingSubscribe(System.out::println);

    // Five values starting at 5: initial delay 2 seconds, then one per second.
    Observable<Long> ticks = Observable.intervalRange(5, 5, 2, 1, TimeUnit.SECONDS);
    ticks
            .timestamp()
            .blockingSubscribe(System.out::println);
}
/**
 * Retries a failing lookup up to 2 more times, but only while the failure
 * message is {@code "id.not.exist"}. Each failure is logged via {@code doOnError}.
 *
 * <p>The comparison is written constant-first so that an exception with a
 * {@code null} message does not cause a {@code NullPointerException} inside the
 * retry predicate.
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_error_retry() throws Exception {
    Observable.fromCallable(() -> findByIdWithError(3L))
            .doOnError(e -> log.warn("", e))
            .retry(2, e -> "id.not.exist".equals(e.getMessage()))
            .blockingSubscribe();
}
/**
 * Logs the failure of a lookup and replaces it with the fixed item
 * {@code "default result"} via {@code onErrorReturnItem}.
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_error_returnItem() throws Exception {
    Observable<String> lookup = Observable.fromCallable(() -> findByIdWithError(3L));
    Observable<String> withFallback = lookup
            .doOnError(e -> log.warn("", e))
            .onErrorReturnItem("default result");
    withFallback.blockingSubscribe(System.out::println);
}
/**
 * Logs the failure of a lookup and switches to a fallback observable that emits
 * {@code "default result"} via {@code onErrorResumeNext}.
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_error_resumeNext() throws Exception {
    Observable<String> fallback = Observable.just("default result");
    Observable<String> lookup = Observable.fromCallable(() -> findByIdWithError(3L));
    lookup
            .doOnError(e -> log.warn("", e))
            .onErrorResumeNext(fallback)
            .blockingSubscribe(System.out::println);
}
/**
 * Applies a 100 ms timeout to a lookup, logs any error, and retries up to
 * 2 more times when the error is a {@code TimeoutException}.
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_timeout_retry() throws Exception {
    Observable<String> lookup = Observable.fromCallable(() -> findById(1L));
    Observable<String> guarded = lookup
            .timeout(100, TimeUnit.MILLISECONDS)
            .doOnError(e -> log.warn("", e))
            .retry(2, e -> e instanceof TimeoutException);
    guarded.blockingSubscribe();
}
/**
 * Looks up ids 1..5 with {@code flatMap}, each lookup on the io scheduler,
 * with a maximum concurrency of 5. Results are printed in arrival order.
 *
 * <p>Output from one recorded run (the "result for :" prefix does not match the
 * current printf, which prints the bare result):
 * <pre>
 * start handle id= 1
 * start handle id= 2
 * start handle id= 3
 * start handle id= 4
 * start handle id= 5
 * result for :5
 * result for :1
 * result for :2
 * result for :3
 * result for :4
 * trace = StopWatch '': running time (millis) = 591
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00591 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_flatMap_without_Errors() throws Exception {
    trace(() -> {
        Observable<Long> ids = Observable.rangeLong(1, 5)
                .doOnNext(id -> System.out.printf("start handle id= %s\n", id));
        Observable<String> results = ids
                .flatMap(id -> Observable.fromCallable(() -> findById(id))
                        .subscribeOn(Schedulers.io()), 5)
                .doOnNext(result -> System.out.printf("%s\n", result));
        results.blockingSubscribe();
    });
}
/**
 * Same as the error-free flatMap test, but uses the lookup that fails for
 * ids divisible by 3, without the {@code delayErrors} flag.
 *
 * <p>Output from one recorded run (the "result for :" prefix does not match the
 * current printf, which prints the bare result):
 * <pre>
 * start handle id= 1
 * start handle id= 2
 * start handle id= 3
 * start handle id= 4
 * start handle id= 5
 * result for :2
 * result for :4
 * result for :1
 * result for :5
 * trace = StopWatch '': running time (millis) = 457
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00457 100%
 *
 * java.lang.RuntimeException: id.not.exist
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_flatMap_throw_Errors_direct() throws Exception {
    trace(() -> {
        Observable<Long> ids = Observable.rangeLong(1, 5)
                .doOnNext(id -> System.out.printf("start handle id= %s\n", id));
        Observable<String> results = ids
                .flatMap(id -> Observable.fromCallable(() -> findByIdWithError(id))
                        .subscribeOn(Schedulers.io()), 5)
                .doOnNext(result -> System.out.printf("%s\n", result));
        results.blockingSubscribe();
    });
}
/**
 * flatMap over ids 1..5 using the failing lookup, with {@code delayErrors = true}
 * and a maximum concurrency of 5.
 *
 * <p>Output from one recorded run (the "result for :" prefix does not match the
 * current printf, which prints the bare result):
 * <pre>
 * start handle id= 1
 * start handle id= 2
 * start handle id= 3
 * start handle id= 4
 * start handle id= 5
 * result for :2
 * result for :1
 * result for :4
 * result for :5
 * trace = StopWatch '': running time (millis) = 450
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00450 100%
 *
 * java.lang.RuntimeException: id.not.exist
 * </pre>
 */
@Test
public void test_flatMap_delayErrors() {
    trace(() -> {
        Observable<Long> ids = Observable.rangeLong(1, 5)
                .doOnNext(id -> System.out.printf("start handle id= %s\n", id));
        Observable<String> results = ids
                .flatMap(id -> Observable.fromCallable(() -> findByIdWithError(id))
                        .subscribeOn(Schedulers.io()), true, 5)
                .doOnNext(result -> System.out.printf("%s\n", result));
        results.blockingSubscribe();
    });
}
/**
 * {@code concatMapEager} over ids 1..5 using the failing lookup; every inner
 * lookup is subscribed on the io scheduler.
 *
 * <p>Output from one recorded run (the "result for :" prefix does not match the
 * current printf, which prints the bare result):
 * <pre>
 * start handle id= 1
 * start handle id= 2
 * start handle id= 3
 * start handle id= 4
 * start handle id= 5
 * result for :1
 * result for :2
 * trace = StopWatch '': running time (millis) = 350
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00350 100%
 *
 * java.lang.RuntimeException: id.not.exist
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_concatMap_direct_throw_Errors() throws Exception {
    trace(() -> {
        Observable<Long> ids = Observable.rangeLong(1, 5)
                .doOnNext(id -> System.out.printf("start handle id= %s\n", id));
        Observable<String> results = ids
                .concatMapEager(id -> Observable.<String>fromCallable(() -> findByIdWithError(id))
                        .subscribeOn(Schedulers.io()))
                .doOnNext(result -> System.out.printf("%s\n", result));
        results.blockingSubscribe();
    });
}
/**
 * {@code concatMapEagerDelayError} over ids 1..5 using the failing lookup,
 * with the second argument ({@code tillTheEnd}) set to {@code true}.
 *
 * <p>Output from one recorded run (the "result for :" prefix does not match the
 * current printf, which prints the bare result):
 * <pre>
 * start handle id= 1
 * start handle id= 2
 * start handle id= 3
 * start handle id= 4
 * start handle id= 5
 * result for :1
 * result for :2
 * result for :4
 * result for :5
 * trace = StopWatch '': running time (millis) = 410
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00410 100%
 *
 * java.lang.RuntimeException: id.not.exist
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_concatMap_delayErrors() throws Exception {
    trace(() -> {
        Observable<Long> ids = Observable.rangeLong(1, 5)
                .doOnNext(id -> System.out.printf("start handle id= %s\n", id));
        Observable<String> results = ids
                .concatMapEagerDelayError(id -> Observable.fromCallable(() -> findByIdWithError(id))
                        .subscribeOn(Schedulers.io()), true)
                .doOnNext(result -> System.out.printf("%s\n", result));
        results.blockingSubscribe();
    });
}
/**
 * {@code ambArray} over three sources whose first items are due after
 * 50 ms, 40 ms and 30 ms respectively; the items that arrive are printed.
 *
 * <p>Output from one recorded run:
 * <pre>
 * 21
 * 22
 * 23
 * trace = StopWatch '': running time (millis) = 120
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00120 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_amb() throws Exception {
    trace(() -> {
        Observable<Long> fromOne = Observable.intervalRange(1, 3, 50, 100, TimeUnit.MILLISECONDS);
        Observable<Long> fromEleven = Observable.intervalRange(11, 3, 40, 100, TimeUnit.MILLISECONDS);
        Observable<Integer> fromTwentyOne = Observable.range(21, 3).delay(30, TimeUnit.MILLISECONDS);
        Observable
                .ambArray(fromOne, fromEleven, fromTwentyOne)
                .blockingSubscribe(System.out::println);
    });
}
/**
 * {@code merge} of three timed sources; items are printed as they arrive.
 *
 * <p>Output from one recorded run:
 * <pre>
 * 21
 * 22
 * 23
 * 11
 * 1
 * 12
 * 2
 * 13
 * 3
 * trace = StopWatch '': running time (millis) = 389
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00389 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_merge() throws Exception {
    trace(() -> {
        Observable<Long> fromOne = Observable.intervalRange(1, 3, 50, 100, TimeUnit.MILLISECONDS);
        Observable<Long> fromEleven = Observable.intervalRange(11, 3, 40, 100, TimeUnit.MILLISECONDS);
        Observable<Integer> fromTwentyOne = Observable.range(21, 3).delay(30, TimeUnit.MILLISECONDS);
        Observable
                .merge(fromOne, fromEleven, fromTwentyOne)
                .blockingSubscribe(System.out::println);
    });
}
/**
 * {@code concat} of three timed sources, each subscribed on the io scheduler;
 * the recorded output lists the sources one after another.
 *
 * <p>Output from one recorded run:
 * <pre>
 * 1
 * 2
 * 3
 * 11
 * 12
 * 13
 * 21
 * 22
 * 23
 * trace = StopWatch '': running time (millis) = 670
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00670 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_concat() throws Exception {
    trace(() -> {
        Observable<Long> fromOne = Observable
                .intervalRange(1, 3, 50, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Long> fromEleven = Observable
                .intervalRange(11, 3, 40, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Integer> fromTwentyOne = Observable
                .range(21, 3)
                .delay(30, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable
                .concat(fromOne, fromEleven, fromTwentyOne)
                .blockingSubscribe(System.out::println);
    });
}
/**
 * {@code concatEager} of three timed sources (max concurrency 3, prefetch 3),
 * each subscribed on the io scheduler. The recorded output is in source order,
 * and the recorded run is shorter than the plain {@code concat} run.
 *
 * <p>Output from one recorded run:
 * <pre>
 * 1
 * 2
 * 3
 * 11
 * 12
 * 13
 * 21
 * 22
 * 23
 * trace = StopWatch '': running time (millis) = 407
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00407 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_concatEager() throws Exception {
    trace(() -> {
        Observable<Long> fromOne = Observable
                .intervalRange(1, 3, 50, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Long> fromEleven = Observable
                .intervalRange(11, 3, 40, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Integer> fromTwentyOne = Observable
                .range(21, 3)
                .delay(300, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable
                .concatEager(Arrays.asList(fromOne, fromEleven, fromTwentyOne), 3, 3)
                .blockingSubscribe(System.out::println);
    });
}
/**
 * {@code zipIterable} of three sources emitting 4, 3 and 4 items; each zipped
 * row is joined with commas. Called with {@code delayError = true} and buffer size 10.
 *
 * <p>Output from one recorded run:
 * <pre>
 * 1,11,21
 * 2,12,22
 * 3,13,23
 * trace = StopWatch '': running time (millis) = 337
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00337 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_zip() throws Exception {
    trace(() -> {
        Observable<Long> fromOne = Observable
                .intervalRange(1, 4, 50, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Long> fromEleven = Observable
                .intervalRange(11, 3, 40, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Integer> fromTwentyOne = Observable
                .range(21, 4)
                .delay(100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable
                .zipIterable(
                        Arrays.asList(fromOne, fromEleven, fromTwentyOne),
                        row -> Arrays.stream(row).map(String::valueOf).collect(joining(",")),
                        true,
                        10)
                .blockingSubscribe(System.out::println);
    });
}
/**
 * {@code combineLatest} of three sources emitting 2, 3 and 4 items; each
 * combination is joined with commas. Called with buffer size 10.
 *
 * <p>Output from one recorded run:
 * <pre>
 * 1,11,21
 * 1,11,22
 * 1,11,23
 * 1,11,24
 * 1,12,24
 * 2,12,24
 * 2,13,24
 * trace = StopWatch '': running time (millis) = 361
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 00361 100%
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_combineLatest() throws Exception {
    trace(() -> {
        Observable<Long> fromOne = Observable
                .intervalRange(1, 2, 50, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Long> fromEleven = Observable
                .intervalRange(11, 3, 40, 100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable<Integer> fromTwentyOne = Observable
                .range(21, 4)
                .delay(100, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io());
        Observable
                .combineLatest(
                        Arrays.asList(fromOne, fromEleven, fromTwentyOne),
                        latest -> Arrays.stream(latest).map(String::valueOf).collect(joining(",")),
                        10)
                .blockingSubscribe(System.out::println);
    });
}
/**
 * Compares two ways of processing a paged source (5 pages of 10 SKUs, one SKU
 * every 10 ms, produced on the io scheduler):
 * <ul>
 *   <li>"without-buffer": each SKU goes straight into a {@code flatMap} with
 *       max concurrency 3;</li>
 *   <li>"with-buffer": SKUs are grouped into batches of 3 and each batch is
 *       looked up concurrently before the next batch is taken.</li>
 * </ul>
 *
 * <p>The emitter sends exactly one terminal event: {@code onComplete} after the
 * last page, or {@code onError} if reading fails.
 *
 * <p>Timing from one recorded run:
 * <pre>
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 23256 054% without-buffer
 * 19987 046% with-buffer
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_without_backPressure() throws Exception {
    StopWatch stopWatch = new StopWatch();
    Observable<String> observable = Observable.<String>create(emitter -> {
        try {
            int pageNo = 1;
            while (pageNo <= 5) {
                final int tmp = pageNo;
                log.info("start read pageNo={}", pageNo);
                IntStream.range(0, 10)
                        .mapToObj(i -> String.valueOf((tmp - 1) * 10000 + i))
                        .forEach(sku -> {
                            sleep(10, TimeUnit.MILLISECONDS);
                            emitter.onNext(sku);
                        });
                log.info("end read pageNo={}", pageNo);
                ++pageNo;
            }
            emitter.onComplete();
        } catch (Exception e) {
            // Report the failure downstream instead of logging it and then completing normally.
            log.error("failed to read pages", e);
            emitter.onError(e);
        }
    }).subscribeOn(Schedulers.io());

    stopWatch.start("without-buffer");
    observable
            .doOnNext(sku -> log.info("process {}", sku))
            .flatMap(a -> Observable
                            .fromCallable(() -> findById(a))
                            .subscribeOn(Schedulers.io()),
                    false, 3)
            .blockingSubscribe();
    stopWatch.stop();

    stopWatch.start("with-buffer");
    observable
            .buffer(3)
            // Each batch is resolved completely (blockingGet) before its results are flattened.
            .flatMapIterable(list -> Observable.fromIterable(list)
                    .flatMap(a -> Observable
                                    .fromCallable(() -> findById(a))
                                    .subscribeOn(Schedulers.io()),
                            false, list.size())
                    .toList()
                    .blockingGet())
            .blockingSubscribe();
    stopWatch.stop();

    log.info("{}", stopWatch.prettyPrint());
}
/**
 * Demonstrates demand-driven emission with {@code Flowable.create} and
 * {@code BackpressureStrategy.ERROR}: the producer waits until the downstream
 * has outstanding demand before emitting each SKU (4 pages of 10 SKUs).
 *
 * <p>Two consumers are timed:
 * <ul>
 *   <li>"manual subscribe": a hand-written {@code Subscriber} that requests one
 *       item at a time and takes 100 ms per item;</li>
 *   <li>"with buffer": batches of 3 SKUs, each batch looked up concurrently.</li>
 * </ul>
 *
 * <p>The producer polls for demand with a 1 ms pause, stops as soon as the
 * subscription is cancelled, and sends exactly one terminal event. The
 * subscriber does not call the {@code Subscription} from {@code onError} or
 * {@code onComplete}, as Reactive Streams rule 2.3 requires.
 *
 * <p>Timing from one recorded run (taken before the polling pause was added):
 * <pre>
 * -----------------------------------------
 * ms % Task name
 * -----------------------------------------
 * 04152 055% manual subscribe
 * 03362 045% with buffer
 * </pre>
 *
 * @throws Exception declared by the test signature
 */
@Test
public void test_flowable_backpressure() throws Exception {
    Flowable<String> flowable = Flowable.<String>create(emitter -> {
        try {
            for (int pageNo = 1; pageNo < 5; pageNo++) {
                log.info("start pageNo={}", pageNo);
                for (int i = 1; i <= 10; i++) {
                    // Wait for downstream demand without spinning at full speed.
                    while (emitter.requested() == 0 && !emitter.isCancelled()) {
                        sleep(1, TimeUnit.MILLISECONDS);
                    }
                    if (emitter.isCancelled()) {
                        // Nobody is listening any more; stop producing.
                        return;
                    }
                    sleep(10, TimeUnit.MILLISECONDS);
                    emitter.onNext(String.valueOf(pageNo * 10000 + i));
                }
                log.info("end pageNo={}", pageNo);
            }
            emitter.onComplete();
        } catch (Exception e) {
            // Report the failure downstream instead of logging it and then completing normally.
            log.error("failed to read pages", e);
            emitter.onError(e);
        }
    }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());

    StopWatch stopWatch = new StopWatch();
    stopWatch.start("manual subscribe");
    Subscriber<String> subscriber = new Subscriber<String>() {
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            log.info("onSubscribe");
            subscription = s;
            s.request(1);
        }

        @Override
        public void onNext(String sku) {
            log.info("onNext: {}", sku);
            // Ask for the next item first, then simulate 100 ms of work on this one.
            subscription.request(1);
            sleep(100, TimeUnit.MILLISECONDS);
        }

        @Override
        public void onError(Throwable t) {
            // Terminal signal: the Subscription must not be called from here.
            log.info("onError: ", t);
        }

        @Override
        public void onComplete() {
            // Terminal signal: the Subscription must not be called from here.
            log.info("onComplete");
        }
    };
    flowable.blockingSubscribe(subscriber);
    stopWatch.stop();

    stopWatch.start("with buffer");
    flowable
            .buffer(3)
            .doOnNext(list -> log.info("process batch {}", list))
            // Each batch is resolved completely (blockingGet) before moving on.
            .map(list -> Flowable.fromIterable(list)
                    .flatMap(a -> Flowable
                                    .fromCallable(() -> findById(a))
                                    .subscribeOn(Schedulers.io()),
                            false,
                            list.size())
                    .toList()
                    .blockingGet())
            .flatMap(Flowable::fromIterable)
            .blockingSubscribe();
    stopWatch.stop();

    log.info("{}", stopWatch.prettyPrint());
}
}