深入讲解RxJava响应式编程框架,背压问题的几种应对模式

文章首发公众号:Java架构师联盟,每日更新技术好文

背压

本节首先介绍什么是背压(Backpressure)问题,然后介绍背压问题的几种应对模式。

深入讲解RxJava响应式编程框架,背压问题的几种应对模式

什么是背压问题

当上下游的流操作处于不同的线程时,如果上游弹射数据的速度快于下游接收处理数据的速度,对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,又不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。

一个存在背压问题的演示实例代码如下:

package com.crazymaker.demo.rxJava.basic;
//省略import
@Slf4j
public class BackpressureDemo {
/**
*演示不使用背压
*/
@Test
public void testNoBackpressure() throws InterruptedException {
//被观察者(主题)
Observable observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//循环10次
for (int i = 0;i<10 ; i++) {
log.info("produce ->" + i);
subscriber.onNext(String.valueOf(i));
}
}
});
//观察者
Action1<String> subscriber = new Action1<String>() {
public void call(String s){
try {
//每消费一次间隔50毫秒
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("consumer ->" + s);
}
};
//订阅:observable与subscriber之间依然通过subscribe()进行关联
observable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(subscriber);
Thread.sleep(Integer.MAX_VALUE);
}
}</pre>

在实例代码中,observable发射操作执行在一条通过Schedulers.io()调度器获取的IO线程上,而观察者subscriber的消费操作执行在另一条通过Schedulers.newThread()调度器获取的新线程上。observable流不断发送数据,累积发送10次;观察者subscriber每隔50毫秒接收一条数据。

运行上面的演示程序后,输出的结果如下:

17:56:17.719 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->0
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->1
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->2
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->3
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->4
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->5
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->6
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->7
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->8
17:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->9
17:56:17.774 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->0
17:56:17.824 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->1
17:56:17.875 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->2
17:56:17.925 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->3
17:56:17.976 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->4
17:56:18.027 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->5
17:56:18.078 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->6
17:56:18.129 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7
17:56:18.179 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->8
17:56:18.230 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->9</pre>

上面的程序有一个特点:生产者observable弹射数据的速度大于下游消费者subscriber接收处理数据的速度,但是由于数据量小,因此上面的程序运行起来没有出现问题。

简单修改一下生产者,将原来的弹射10条改成无限制地弹射,代码如下:

//被观察者(主题)
Observable observable = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//无限制地循环
for (int i = 0; ; i++) {
//log.info("produce ->" + i); subscriber.onNext(String.valueOf(i));
}
}
});</pre>

再次运行该演示程序后,抛出的异常如下:

Caused by: rx.exceptions.MissingBackpressureException
at rx.internal.operators.OperatorObserveOnObserveOnSubscriber.onNext (OperatorObserveOn.java:160) at rx.internal.operators.OperatorSubscribeOnSubscribeOnSubscriber.onNext
(OperatorSubscribeOn.java:74)
at com.crazymaker.demo.rxJava.basic.BackpressureDemo1.call (BackpressureDemo.java:24) at com.crazymaker.demo.rxJava.basic.BackpressureDemo1.call
(BackpressureDemo.java:19)
at rx.Observable.unsafeSubscribe(Observable.java:10327)
at rx.internal.operators.OperatorSubscribeOnSubscribeOnSubscriber.call (OperatorSubscribeOn.java:100) at rx.internal.schedulers.CachedThreadSchedulerEventLoopWorker$1.call
(CachedThreadScheduler.java:230)
... 9 more</pre>

异常原因:由于上游observable流弹射数据的速度远远大于下游通过subscriber接收的速度,导致observable用于暂存弹射数据的队列空间耗尽,造成上游数据积压。

背压问题的几种应对模式

如何应对背压问题呢?在创建主题时可以使用Observable类的一个重载的create方法设置具体的背压模式,该方法的源代码如下:

public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) {
return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure));
}</pre>

此方法的第二个参数用于指定一种背压模式。背压模式有多种,比较常用的有“最近模式” Emitter.BackpressureMode.LATEST。这种模式的含义为:如果消费跟不上,那么仅仅缓存最近弹射出来的数据,将老旧一点的数据直接丢弃。

使用“最近模式”背压,改写4.8.1节的测试用例,代码如下:

/**
*演示使用“最近模式”背压
*/
@Test
public void testBackpressure() throws InterruptedException {
//主题实例,使用背压
Observable observable = Observable.create(
new Action1<Emitter<String>> () {
@Override
public void call(Emitter<String> emitter) {
//无限循环
for (int i = 0; ; i++) {
//log.info("produce ->" + i);
emitter.onNext(String.valueOf(i));
}
}
}, Emitter.BackpressureMode.LATEST);
//订阅者(观察者)
Action1<String> subscriber = new Action1<String>() {
public void call(String s) {
try {
//每消费一次间隔50毫秒
Thread.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("consumer ->" + s);
}
};
//订阅: observable与subscriber之间依然通过subscribe()进行关联
observable
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(subscriber);
Thread.sleep(Integer.MAX_VALUE);
}</pre>

运行这个演示程序,部分输出的结果节选如下:

18:51:54.736 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->0
18:51:54.745 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->1
//省略部分输出
18:51:55.217 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->123
18:51:55.220 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->124
18:51:55.224 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->125
18:51:55.228 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->126
18:51:55.232 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->127
18:51:55.236 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337652
18:51:55.240 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337653
18:51:55.244 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337654
//省略部分输出
18:51:55.595 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337747
18:51:55.598 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->14161628</pre>

从输出的结果可以看到,上游主题连续不断地弹射,下游订阅者在接收完127后直接跳到了7337652,其间弹射出来的几百万数据(相对旧一点的数据)就直接被丢弃了。

除了 Emitter.BackpressureMode.LATEST“最近模式”外,RxJava在Emitter<T>接口中通过一个枚举常量定义了以下几种背压模式:

enum BackpressureMode {
/**
*No backpressure is applied(无背压模式)
*可能导致rx.exceptions.MissingBackpressureException异常
或者IllegalStateException异常
/
NONE,
/

如果消费者跟不上,就抛出rx.exceptions.MissingBackpressureException异常
/
ERROR,
/

缓存所有的onNext方法弹射出来的消息,等待消费者慢慢地消费
/
BUFFER,
/

如果下游消费跟不上,就丢弃onNext方法弹射出来的新消息
/
DROP,
/

*如果消费者跟不上,就丢掉旧的消息,缓存onNext方法弹射出来的新消息
*/
LATEST
}</pre>

对于以上RxJava背压模式,介绍如下:

(1)BackpressureMode.DROP:在这种模式下,Observable主题使用固定大小为128的缓冲区。如果下游订阅者无法处理,流的第一个元素就会缓存下来,后续的会被丢弃。

(2)BackpressureMode.LATEST:这种模式与BackpressureMode.DROP类似,并且Observable主题也使用固定大小为128的缓冲区。BackpressureMode.LATEST的缓存策略不同,使用最新的弹出元素替换缓冲区缓存的元素。当消费者可以处理下一个元素时,它收到的是Observable最近一次弹出的元素。

(3)BackpressureMode.NONE和BackpressureMode.ERROR:在这两种模式中发送的数据不使用背压。如果上游observable主题弹射数据的速度大于下游通过subscriber接收的速度,造成上游数据积压,就会抛出 MissingBackpressureException异常。

(4)BackpressureMode.BUFFER:在这种模式下,有一个无限的缓冲区(初始化时是128),下游消费不了的元素全部会放到缓冲区中。如果缓冲区中持续地积累,就会导致内存耗尽,抛出OutOfMemoryException异常。
文章首发公众号:Java架构师联盟,每日更新技术好文

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容