30 天精通 RxJS(24): Observable operators - multicast, refCount, publish, share

转载
昨天我们介绍完了各种 Subject,不晓得各位读者还记不记得在一开始讲到 Subject 时,是希望能够让 Observable 有新订阅时,可以共用前一个订阅的执行而不要从头开始,如下面的例子

var source = Rx.Observable.interval(1000).take(3);

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subject = new Rx.Subject()

subject.subscribe(observerA)

source.subscribe(subject);

setTimeout(() => {
    subject.subscribe(observerB);
}, 1000);

// "A next: 0"
// "A next: 1"
// "B next: 1"
// "A next: 2"
// "B next: 2"
// "A complete!"
// "B complete!"

上面这段程式码我们用 subject 订阅了 source,再把 observerA 跟 observerB 一个个订阅到 subject,这样就可以让 observerA 跟 observerB 共用同一个执行。但这样的写法会让程式码看起来太过複杂,我们可以用 Observable 的 multicast operator 来简化这段程式

Operators

multicast

multicast 可以用来挂载 subject 并回传一个可连结(connectable)的 observable,如下

var source = Rx.Observable.interval(1000)
             .take(3)
             .multicast(new Rx.Subject());

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

source.subscribe(observerA); // subject.subscribe(observerA)

source.connect(); // source.subscribe(subject)

setTimeout(() => {
    source.subscribe(observerB); // subject.subscribe(observerB)
}, 1000);

JSBin | JSFiddle

上面这段程式码我们透过 multicast 来挂载一个 subject 之后这个 observable(source) 的订阅其实都是订阅到 subject 上。

source.subscribe(observerA); // subject.subscribe(observerA)

必须真的等到 执行 connect() 后才会真的用 subject 订阅 source,并开始送出元素,如果没有执行 connect() observable 是不会真正执行的。

source.connect();

另外值得注意的是这裡要退订的话,要把 connect() 回传的 subscription 退订才会真正停止 observable 的执行,如下

var source = Rx.Observable.interval(1000)
             .do(x => console.log('send: ' + x))
             .multicast(new Rx.Subject()); // 无限的 observable 

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA);

var realSubscription = source.connect();

var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);
}, 1000);

setTimeout(() => {
    subscriptionA.unsubscribe();
    subscriptionB.unsubscribe(); 
    // 这裡虽然 A 跟 B 都退订了,但 source 还会继续送元素
}, 5000);

setTimeout(() => {
    realSubscription.unsubscribe();
    // 这裡 source 才会真正停止送元素
}, 7000);

JSBin | JSFiddle

上面这段的程式码,必须等到 realSubscription.unsubscribe() 执行完,source 才会真的结束。

虽然用了 multicast 感觉会让我们处理的对象少一点,但必须搭配 connect 一起使用还是让程式码有点複杂,通常我们会希望有 observer 订阅时,就立即执行并发送元素,而不要再多执行一个方法(connect),这时我们就可以用 refCount。

refCount

refCount 必须搭配 multicast 一起使用,他可以建立一个只要有订阅就会自动 connect 的 observable,范例如下

var source = Rx.Observable.interval(1000)
             .do(x => console.log('send: ' + x))
             .multicast(new Rx.Subject())
             .refCount();

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA);
// 订阅数 0 => 1

var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);
    // 订阅数 0 => 2
}, 1000);

JSBin | JSFiddle

上面这段程式码,当 source 一被 observerA 订阅时(订阅数从 0 变成 1),就会立即执行并发送元素,我们就不需要再额外执行 connect。

同样的在退订时只要订阅数变成 0 就会自动停止发送

var source = Rx.Observable.interval(1000)
             .do(x => console.log('send: ' + x))
             .multicast(new Rx.Subject())
             .refCount();

var observerA = {
    next: value => console.log('A next: ' + value),
    error: error => console.log('A error: ' + error),
    complete: () => console.log('A complete!')
}

var observerB = {
    next: value => console.log('B next: ' + value),
    error: error => console.log('B error: ' + error),
    complete: () => console.log('B complete!')
}

var subscriptionA = source.subscribe(observerA);
// 订阅数 0 => 1

var subscriptionB;
setTimeout(() => {
    subscriptionB = source.subscribe(observerB);
    // 订阅数 0 => 2
}, 1000);

setTimeout(() => {
    subscriptionA.unsubscribe(); // 订阅数 2 => 1
    subscriptionB.unsubscribe(); // 订阅数 1 => 0,source 停止发送元素
}, 5000);

JSBin | JSFiddle

publish

其实 multicast(new Rx.Subject()) 很常用到,我们有一个简化的写法那就是 publish,下面这两段程式码是完全等价的

var source = Rx.Observable.interval(1000)
             .publish() 
             .refCount();

// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.Subject()) 
//             .refCount();

加上 Subject 的三种变形

var source = Rx.Observable.interval(1000)
             .publishReplay(1) 
             .refCount();

// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.ReplaySubject(1)) 
//             .refCount();

var source = Rx.Observable.interval(1000)
             .publishBehavior(0) 
             .refCount();

// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.BehaviorSubject(0)) 
//             .refCount();

var source = Rx.Observable.interval(1000)
             .publishLast() 
             .refCount();

// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.AsyncSubject(1)) 
//             .refCount();

share

另外 publish + refCount 可以在简写成 share

var source = Rx.Observable.interval(1000)
             .share();

// var source = Rx.Observable.interval(1000)
//             .publish() 
//             .refCount();

// var source = Rx.Observable.interval(1000)
//             .multicast(new Rx.Subject()) 
//             .refCount();

今日小结

今天主要讲解了 multicast 和 refCount 两个 operators 可以帮助我们既可能的简化程式码,并同时达到组播的效果。最后介绍 publish 跟 share 几个简化写法,这几个简化的写法是比较常见的,在理解 multicast 跟 refCount 运作方式后就能直接套用到 publish 跟 share 上。

不知道今天读者们有没有收穫呢? 如果有任何问题欢迎在下方留言给我,谢谢^^

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容