在讲 Hystrix 插件前,我们需要先了解一下 Hystrix 的具体实现,Hystrix 限流框架基于 大名鼎鼎的 RxJava 实现,我们这节先来了解一下 RxJava。
1.8.6.1 响应式编程
响应式编程简称 FRP (Function Reatcive Programing)详细定义大家可以看一下 stackOverFlow 的这位大神解释,简单来说他就是一种基于事件的模型,我们需要知道某个事件是否发生有两种方式,一种就是主动轮训,我们把它称为 Proactive
方式。另一种就是被动接收反馈,我们称为 Reactive
。
响应式编程是未来的一种趋势,对于后端java 程序员关于响应式编程第一个应该会想到 Spring 5 引入的 WebFlux
,而在 Android 中,响应式编程发展却快得多,最主要原因是使用场景,Android 客户端中存在大量的和服务器交互的程序,所以存在大量的异步回掉过程,一般这种请看就会使用到 RxJava
,它可以更加简化代码的逻辑。
1.8.6.2 Rxjava 使用入门
我们先来看一下一个RxJava 调用的实例:
//创建被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("111");
subscriber.onNext("222");
subscriber.onNext("333");
subscriber.onCompleted();
}
});
//创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
System.out.println(s);
}
};
//注册,将观察者和被观察者关联,将会触发OnSubscribe.call方法
observable.subscribe(observer);
我们先来介绍一下 RxJava 最重要的两个成员:
-
Observable
可以理解为被观察者,RxJava
中使用了观察者模式。 -
Observer
可以理解为观察者。
Observable 方法 -
call
即被观察者的所有活动,这里可以调用subscriber.onNext
向订阅者发送消息。 -
subscribe
这个是所有方法的启动源头,我将其主要逻辑整理如下,首先他会调用Observer
的start
方法,告诉它开始了,然后调用自己的call
方法,假如发生异常,就调用subscriber
的onError
方法,然后unsubscribed
, 也就是将被观察者注册的观察者清空。
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
subscriber.onStart();
try {
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
subscriber.onError(RxJavaHooks.onObservableError(e));
return Subscriptions.unsubscribed();
}
}
接下来我们说一下 observer
的三个方法:
-
onCompleted
当Observable
的 call 完成后调用,它后onError
只有一个会被调用 -
onError
我们可以参考上面的源码,在 调用call
异常时调用 -
onNext
Observable
每个发生时间都会调用。
1.8.6.3 Rxjava 各种操作
-
Schedule
调度线程
我们将我们的代码稍微改一下
final CountDownLatch latch = new CountDownLatch(1);
//创建被观察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("111");
subscriber.onNext("222");
subscriber.onNext("333");
subscriber.onCompleted();
System.out.println(Thread.currentThread().getName());
}
});
//创建观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
System.out.println(Thread.currentThread().getName() + "-Complete");
latch.countDown();
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(String s) {
System.out.println(Thread.currentThread().getName() + "-" +s);
}
};
//注册,将观察者和被观察者关联,将会触发OnSubscribe.call方法
observable.subscribeOn(Schedulers.newThread()) // 指定 subscribe() 发生在新的线程
.observeOn(Schedulers.newThread()).subscribe(observer);
latch.await();
得到结果入下:
RxNewThreadScheduler-2
RxNewThreadScheduler-1-111
RxNewThreadScheduler-1-222
RxNewThreadScheduler-1-333
RxNewThreadScheduler-1-Complete
这对于我们来说确实是很酷的事情,这里subscribeOn
和 observeOn
分别指定了我们生产者在调用 call
的线程和消费者的线程。
- map 转换
我们可以参考如上面的图和下面的代码,假如observable
发射的是 Integer 类型,而Obsever
需要接收 String 那么我们就可以通过 map 进行转换。
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread()).map(i -> i +"").subscribe(observer);
- flatmap
flatmap 对于很多新手来说是最难理解的,flat 可以理解为平铺,我们需要将Observable
发射的时间平铺开来的场景,如我们需要将 Stirng 使用空格分割成一个个单词进行消费,如下:
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread()).flatMap(s -> Observable.from(s.split(" "))).subscribe(observer);
我们也可以参考图片,我们这里将一个OnNext 事件转为一个 Observable
然后将这些 Observable
汇总成一个 Observable
给 Obsever
进行消费。
-
from
如上,我们可以通过from 将一个数组快速转为Observable
。 -
filter
顾名思义,就是将一部分事件过滤掉,这里很多操作和 stream 是类似的。 -
distinct
去重。
1.8.6.4 总结
我们这次只是简单地学习了一下 RxJava 的原理,RxJava 还有很多好玩的地方等着我们去探索,有兴趣的小伙伴可以深入学习一下,非常适合入门响应式编程,我们下一节将接收 Hystrix 如何通过 RxJava 实现降级和熔断。