我和rx的对话来继续了解冷观察者和热观察者
wow我们又多了一天来学习新的知识这真的很棒。
hello 伙伴们。希望你们能做的更好。这是我们的第六部分。在这部分我将继续和rx扯淡
还有一部分重要的事情。夏天和冬天Observable 意味着“冷“,“热”。
简介
这并不是这一部分的介绍因为他只是上一部分的延伸。在开始之前,我们将对最后一部分进行一些修改,在最后部分。我们将遇到一个可观察rx,它将给我们一些建议去学习rx,之后他将分享一些方法,怎么去创建被观察者并且告诉我们什么事“冷 ”“热”观察者。
rx:虽然有很多但是我能但是关于两种被观察者我能做出解释。一种称之为冷被观察者另一种被叫做热观察者,一段时间之后开发者们习惯了这些“冷 热”这些都是很简单的概念,但是我将通过一些幼稚的例子来讲解这些概念,因此你们将会有一种理念,并且我将告诉你们如何在代码中使用这种理念
me:当然。我会试着在你之前去尝试。让你来告诉我这是对的还是错的。
rx: 哈哈。 你知道现在有多少人了解那些销售人员用什么口号在市场 商店前
来吸引人们
me:我想是的,大部分亚洲国家都不了解这种概念,比如巴基斯坦,印度。你能不能试着再举一些其他的例子。所以全世界的人都能轻易地抓住这个概念。
rx: 小case.那么有多少人了解咖啡厅?
me:我认为所有人吧
rx: good,那么现在有两杯咖啡厅。一家叫做热音乐咖啡厅一家叫做冷音乐咖啡厅任何人都可以去冷音乐咖啡厅买到咖啡并且做到任何地方。在咖啡厅中有1款个耳机能够让每一个做的地方人都能听到,他们有三个剧本,这款智能耳机是每一个带上的人都从第一个开始播放。如果换了一个人戴耳机那么也将从头播放。如果摘掉耳机那么将停止。
在热音乐咖啡馆里,他们有一个完整的音乐系统。当你走进那家咖啡馆时,你会开始听诗,因为他们有一个非常好的音乐系统,有非常大的扬声器。他们也有无限的诗歌,作为第一个咖啡馆的男孩打开咖啡馆,他/她开始了这个系统。所以系统是独立于咖啡馆的客户,任何一个人进入咖啡馆,他都会从那个时间开始听诗,在他进入咖啡馆之前,他不知道已经播放了多少首诗。在观察中,这是相同的概念。
就拿冷音乐咖啡厅来说吧。冷观察者总是很懒的。就像你通过fromArray创建一个被观察者或者其他方法,这就像这个耳机,当你订阅的时候,你就会得到他们的数据,就像任何人带着都会开始播放诗一样。 现在观察者取消关注被观察着了。 你不会得到任何数据就像关机关闭了一样
最后很重要的一点。冷咖啡馆有很多耳机,但是每个耳机的都是在穿戴的时候开发播放。如果一个人在听到第二首的时候,另一个人戴上耳机 那么他将从第一手诗开始播放,我想表达的就是每个人都有一个独立的歌单,同理我们又三个订阅者、他们都订阅了一个冷的被观察者,他们将会得到一个独立的数据流 ,这意味着被观察者将会调用三次onnext,当订阅这订阅的时候,通过这种方式,我们可以说冷的被观察者依赖于用户就想耳机一样
现在,“热观察者”就像一个热咖啡咖啡馆的音乐系统。一旦开了咖啡馆,音乐系统就开始播放诗歌,照顾任何一个人。它总是演奏诗歌,任何人到来,他会从那一刻开始听那首诗。同样的情况也发生在热点观察中,一旦它们被创建并开始发送数据,任何订阅用户都将订阅这个可观察的数据并且从那个特定的时间点开始获取数据他将永远不会得到旧的值。热观察者是独立于用户的,他们不关心任何以前的数据。任何用户订阅的任何时候都将从这一点开始获取数据。我想我会在代码中使用同样的例子,之后我会给你们一些实际的例子。
public class HotVsCold {
public static void main(String[] args) throws InterruptedException {
List poemsPlayList = Arrays.asList("Poem 1", "Poem 2", "Poem 3");
Observable coldMusicCoffeCafe = Observable.fromArray(poemsPlayList);
Consumer client1 = poem-> System.out.println(poem);
Consumer client2 = poem-> System.out.println(poem);
Consumer client3 = poem-> System.out.println(poem);
Consumer client4 = poem-> System.out.println(poem);
coldMusicCoffeCafe.subscribe(client1);
coldMusicCoffeCafe.subscribe(client2);
System.out.println(System.currentTimeMillis());
Thread.sleep(2000);
System.out.println(System.currentTimeMillis());
coldMusicCoffeCafe.subscribe(client3);
coldMusicCoffeCafe.subscribe(client4);
}
}
这是代码中的一个非常简单的例子。我有4个客户,我有一个播放列表,我把它变成冷音乐咖啡的被观察者。后,前两个客户与被观察到联系后来我有一个2秒等待,然后3和4端订阅冷可观测的,最后当我们看到输出我们可以很容易地看到所有用户或客户将诗歌从开始到结束。
Output:
[Poem 1, Poem 2, Poem 3]
[Poem 1, Poem 2, Poem 3]
1494142518697
1494142520701
[Poem 1, Poem 2, Poem 3]
[Poem 1, Poem 2, Poem 3]
热被观察者
public static void main(String[] args) throws InterruptedException {
Observable hotMusicCoffeeCafe = Observable.interval(1000, TimeUnit.MILLISECONDS);
ConnectableObservable connectableObservable = hotMusicCoffeeCafe.publish();
connectableObservable.connect(); // Cafe open on this line and cafe boy start the systemConsumer client1 = poem-> System.out.println("Client 1 poem"+poem);
Consumer client2 = poem-> System.out.println("Client 2 poem"+poem);
Consumer client3 = poem-> System.out.println("Client 3 poem"+poem);
Consumer client4 = poem-> System.out.println("Client 4 poem"+poem);
Thread.sleep(2000); // After two poems already played client 1 enter. So he should listens from poem 2.
connectableObservable.subscribe(client1);
Thread.sleep(1000); // Client two should start listening poem 3
connectableObservable.subscribe(client2);
Thread.sleep(4000); // Client 3 and 4 enter will start from poem 9.
connectableObservable.subscribe(client3);
connectableObservable.subscribe(client4);
while (true);
}
热音乐咖啡馆开着,咖啡馆启动了这个系统。诗歌开始播放,就像我们所说的连接方法一样。暂时不要把注意力集中在连接上,只试着抓住这个概念。在两首诗或几秒钟后,第一个顾客进入咖啡馆,他就会开始聆听第二首诗。然后下一个顾客在1秒后进入,所以他开始听第三首诗。后来的顾客在4秒的顾客2之后进入了咖啡馆。现在他们开始听第9首诗了。您可以看到,这种热的可观察性与订阅者无关。一旦他开始发送数据,他并不关心任何一个人是否订阅。另一方面,所有订阅用户将从他们订阅的时间获得数据,他们永远不会得到已经发出的历史或事件。
现在我有了一种感觉,你可以理解“热”和“冷”的概念。现在是时候看看如何用点的形式来创造这些可见的东西了。
1cold observable
~所有的隐式被观察者都是冷被观察者,也就是说我们使用creat()或者fromarray()或者其他方式创建的被观察者的时候他都是冷被观察者
~当订阅冷被观察这的订阅者 他们总是会得到完整的独立的数据流
~没有订阅者去订阅冷被观察者的时候 ,他们就什么都做不了。他们很懒
2Hotobservable
~热被观察者一旦创建他们就开始发送数据。而并不关心订阅者的情况
~所有的订阅者在订阅热被观察者的时候如果在一个特定的时间段(相同)那么他们将获取相同的数据
me:额 额 额 好。那么你能告诉我怎么去吧一个冷被观察者转化成一个热被观察者吗
rx: 当然。这真的很简单
List integers = new ArrayList<>();
Observable.range(0, 10000)
.subscribe(count -> integers.add(count));
Observable>listObservable= Observable.fromArray(integers);
来看上面这段例子,listObservable 是一个冷被观察者。 让我们来看一下他们是怎么转化的
Observable> listObservable = Observable.fromArray(integers);
ConnectableObservableconnectableObservable= listObservable.publish();
me:在上面的代码中我们通过publish来吧我们的冷被观察者转化成为热被观察者,所以我们可以这么说。所有的冷都可以通过publish方法转化为热并且他总是和你的被观察者联系起来,但是现在它并没有发送数据 ,这是一个棘手的问题,任何被观察者我们都能调用publish
它意味着订阅者在相同的时间段订阅这个被观察者 那么他们将得到相同的数据。从他们订阅的时间段开始算。 正如我们知道的在热音乐咖啡厅中 他们想获得相同的数据。只有他们在相同的时间段开始订阅才可以。……现在有趣的一点是,如果有很多订阅用户订阅了连接表,他们就什么也得不到。也许你们会感到困惑。基本上有两件事。当我将调用publish()时,它的意思是,这个可观察的数据将会发出一个单独的数据,或者这个可观察到的数据来源是向所有用户发送数据的单一数据源,但是要启动数据,我需要调用connect()方法,如下所示。
Observable> listObservable = Observable.fromArray(integers);
ConnectableObservable connectableObservable = listObservable.publish();
connectableObservable.connect();
现在看 这个真的很简单 记着publish将吧冷的转化成热的 但是并没有发送数据,为了发射数据我们需要调用connect()方法。当我转化完的数据调用connect,将会在没有任何订阅的情况下开始发送数据,也许有上千的订阅者,现在还有一起其他的的方法真的很有用 refCount(), share(), replay() ,但是我将停下来、我将做一个更好的例子让你有一个回顾真的了解这个概念。
me: oh man. 这真的很长很繁琐。但是真的很简单。
rx: 哈哈哈、。其实我还没说什么。 那些只是用一种很简单的方式让每个人都理解这个概念,那就是真的事一个很简单的话题
me:阿拉我赞美你。你说的太对了。现在我用一个例子来更准确的理解这个概念
如下所示 我们有一个被观察者
Observable just = Observable.just("Hello guys");
现在两个被不同的订阅者
public class HotVsCold {
public static void main(String[] args) {
Observable just = Observable.just("Hello guys");
just.subscribe(s-> System.out.println(s));
just.subscribe(s-> System.out.println(s));
}
}
Output:
Hello guys
Hello guys
现在我有一个问题 这个Observable 是冷? 还是热?我知道你想说没有调用publish 所以他是冷的, 有些时间 我的observable 是从第三方的库产生的 我并不了解这个observable 现在我要举一个新的例子,因为很多事情会对每个人都很清楚。
public static void main(String[] args) {
Random random = new Random();
Observable just = Observable.create(source->source.onNext(random.nextInt()));
just.subscribe(s-> System.out.println(s));
just.subscribe(s-> System.out.println(s));
}
这里我有一个随机值,所以是时候检查程序输出,讨论它是冷的还是热的?
Output:
1531768121
607951518
所以两个值都是不同的。它的意思是,这是冷被观察者,因为每次得到一个新的值,根据对冷被观察者定义,它们从不共享数据。每次他们生成一个新的或新的数据,或者用简单的单词onNext()方法调用两个不同的订阅者。
现在让我们把冷转化成热
public static void main(String[] args) {
Random random = new Random();
Observable just = Observable.create(source->source.onNext(random.nextInt()));ConnectableObservablepublish=just.publish();publish.subscribe(s-> System.out.println(s));
publish.subscribe(s-> System.out.println(s));
publish.connect();}
Output:
1926621976
1926621976
这一次,我在两个订阅者中都得到了相同的数据。它的意思是hotObservable,因为热观测总是从单个源发送数据,或者简单地说,我们从onNext()方法的一次调用中获得了数据。接下来,我将解释发布()和connect()方法的调用。
当我调用publish 方法的时候。它意味着我这个被观察者独立于订阅者并且对于订阅者来说发射的是相同的数据,简单的说 Hot Observable 将把onnext这个相同的方法推送给所有的订阅者,也许这让你有一点困惑,在订阅着订阅了之后我调用了connect(),我想像你们展示,Hot Observable是独立的,数据的发射有onnext的一个调用来完成。我们知道Hot Observable在我们调用connect的时候数据开始发射,所以我们订阅两个订阅者,然后调用connect方法。这两种都可以得到相同的数据 下面我在举一个例子
Random random = new Random();
Observable just =Observable.create(source->source.onNext(random.nextInt()));ConnectableObservablepublish= just.publish();publish.connect();publish.subscribe(s-> System.out.println(s));
publish.subscribe(s-> System.out.println(s));
这里仅仅有一处不同,我们来试试他输出的是什么
没错是空的,困惑吗? 我来讲解一下。正如你看到的我创建了一个我吧一个随机值创建成Observable,它只调用一次。在我创建的过程中,我通过调用publish()方法将这种冷转换为热观察。转换之后,我调用了connect()方法。因为这是一个热观察,我们知道它是独立于订户的所以它开始发射随机数我们知道这只会产生一个随机数。连接()我们的订阅者,订阅但那时我们没有得到任何数据,因为热观察已经释放出一个值。我认为事情对每个人都很清楚。现在我们可以在热观察发射中加入log。所以我们可以确认,我说的是真的。
public static void main(String[] args) {
Random random = new Random();
Observable just = Observable.create(source -> {
int value = random.nextInt();System.out.println("Emitted data: " + value);source.onNext(value);
}
);
ConnectableObservable publish = just.publish();
publish.connect();
publish.subscribe(s -> System.out.println(s));
publish.subscribe(s -> System.out.println(s));
}
Output:
Emitted data: -690044789
现在正如你看的HotObservable 在调用connect之后开始发送数据,订阅的过程在这之后,这就是我们为什么获取的是空。、
在下一步之前我们要温习一下
1。所有的observables都是隐式的冷观察。
2 想要把冷转化为热。我们要调用publish方法。它返回给我们一个被订阅者联系单。这也是一个热observables,但却没有开始发射数据。
3调用connect开始发射数据
rx:打扰了,在下一个级别前。你试着写一段代码
me: 当然
public static void main(String[] args) throws InterruptedException {
Random random = new Random();Observable just = Observable.create(source -> {Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(aLong -> {int value = random.nextInt();System.out.println("Emitted data: " + value);source.onNext(value);});}); // Simple same Observable which we are using only I added a one thing now this will produce data after every one second.ConnectableObservable publish = just.publish();
publish.connect();Thread.sleep(2000); // Hot observable start emitting data and our new subscribers will subscribe after 2 second.publish.subscribe(s -> System.out.println(s));
publish.subscribe(s -> System.out.println(s));
while (true);
}Output:
Emitted data: -918083931
Emitted data: 697720136
Emitted data: 416474929
416474929
416474929
Emitted data: -930074666
-930074666
-930074666
Emitted data: 1694552310
1694552310
1694552310
Emitted data: -61106201
-61106201
-61106201
现在我们可以很简单的看出上面的输出、
根据我们一开始的讨论, Hot Observable 是不停工作的。 Hot Observable 发射数据,虽然没有订阅者 但是我们还是得到三个值,
在2秒之后,我们订阅了两个新的订阅者,他们开始获得新的数据值,并且它们都得到了相同的值。
是时候把我们的这个概念带到下一个层次了。因为我们已经抓住了冷和Hot Observables的概念。对于下一个热点,我将用场景的形式来解释。
场景1
我想要一个 Hot Observable,并且订阅 以前所有的值,这个Hot Observable早就发射的值还有新的值应该同步。为了解决这个问题我们有一个非常简单的方法。这叫做replay()。我们只需要调用那个方法。
public static void main(String[] args) throws InterruptedException {
Random random = new Random();
Observable just = Observable.create(
source -> {
Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
int value = random.nextInt();
System.out.println("Emitted data: " + value);
source.onNext(value);
});
}
);
ConnectableObservable publish =just.replay();publish.connect();Thread.sleep(2000);
publish.subscribe(s -> System.out.println("Subscriber 1: "+s));
publish.subscribe(s -> System.out.println("Subscriber 2: "+s));
while (true);
}
Output:
Emitted data: -1320694608
Emitted data: -1198449126
Emitted data: -1728414877
Emitted data: -498499026
Subscriber 1: -1320694608
Subscriber 1: -1198449126
Subscriber 1: -1728414877
Subscriber 1: -498499026
Subscriber 2: -1320694608
Subscriber 2: -1198449126
Subscriber 2: -1728414877
Subscriber 2: -498499026
Emitted data: -1096683631
Subscriber 1: -1096683631
Subscriber 2: -1096683631
Emitted data: -268791291
Subscriber 1: -268791291
Subscriber 2: -268791291
如果您在这里查看我们的输出和代码。您可以很容易地在热观察中轻松地获得replay()的概念。首先,我创建了一个可以在创建后启动数据的热观察。然后在2秒钟之后,我们的第一个和第二个订阅用户就会订阅那个热点,但是在那个时候,热观测已经发出了4个值。你可以看到,在输出中,我们的订阅者首先获得已经发射的值,然后他们与可观测的数据发射同步
2场景
我想要一个Hot Observable,当第一个订阅者订阅了这个Hot Observable的时候才开始数据发射,当所有的订阅者都不订阅那个Hot Observable的时候,应该停止。
这次又是挺简单的
public static void main(String[] args) throws InterruptedException {
Observable observable = Observable.interval(500, TimeUnit.MILLISECONDS).publish().refCount();
Consumer firstSubscriber = s -> System.out.println("Subscriber 1: "+s);
Consumer secondSubscriber = s -> System.out.println("Subscriber 2: "+s);Disposable subscribe1 = observable.subscribe(firstSubscriber);Disposable subscribe2 = observable.subscribe(secondSubscriber);Thread.sleep(2000);subscribe1.dispose();Thread.sleep(2000);subscribe2.dispose();Consumer thirdSubscriber = s -> System.out.println("Subscriber 3: "+s);Disposable subscribe3 = observable.subscribe(thirdSubscriber);Thread.sleep(2000);subscribe3.dispose();while (true);
}
Output:
Subscriber 1: 0
Subscriber 2: 0
Subscriber 1: 1
Subscriber 2: 1
Subscriber 1: 2
Subscriber 2: 2
Subscriber 1: 3
Subscriber 2: 3
Subscriber 2: 4
Subscriber 2: 5
Subscriber 2: 6
Subscriber 2: 7
Subscriber 3: 0
Subscriber 3: 1
Subscriber 3: 2
首先也是最重要的一点这是一个Hot Observable,但只有当第一个用户订阅了这个Observable数据时才会开始数据发射,所有的用户都没有订阅这个Observable,将停止数据发射
正如你在上面的输出中看到的。当最初的两个订阅者订阅了Hot Observable数据发射时,后来有一个订阅者取消订阅,但Hot Observable并没有停止,因为还有一个订阅者,但后来第二个也取消了如此热的Hot Observable停止数据发射。在2秒后,第三个用户订阅了同样的Hot Observable,但这次Hot Observable又开始了数据发射,再次取消后就在此停止
me: wow wow 你用了一个很好demo 让我印象深刻
rx :Thanks Observable.
Observable: 那你现在还没有其他的问题。没有就滚
Me: ("▔□▔) 你能不能告诉我Subject 的概念 还有不同的类型像Publish Behaviour 等
Observable:嗯。在这个概念之前我有一种感觉。我应该告诉您关于ObservableAPI的情况以及它们是如何工作的,以及如何使用Lambda或函数接口,而不需要使用完整的Observable者界面。你认为什么?
Me: 没错。我赖定你了
伙伴们。这一章节很长。但是我要在这里停下了。下次在见