概念
发布-订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知。
描述
不论是在程序世界里还是现实生活中,发布—订阅模式的应用都非常广泛。
比如,小明最近看上了一套房子,到了售楼处之后才被告知,该楼盘的房子早已售罄。好在售楼处工作人员告诉小明,不久后还有一些尾盘推出,开发商正在办理相关手续,手续办好后便可以购买。但到底是什么时候,目前还没有人能够知道。于是小明记下了售楼处的电话,以后每天都会打电话过去询问是不是已经到了购买时间。除了小明,还有小红、小强、小龙也会每天向售楼处咨询这个问题。一个星期过后,该工作人员决定辞职,因为厌倦了每天回答1000个相同内容的电话。
当然现实中没有这么笨的销售公司,实际上故事是这样的:小明离开之前,把电话号码留在了售楼处。售楼处工作人员答应他,新楼盘一推出就马上发信息通知小明。小红、小强和小龙也是一样,他们的电话号码都被记在售楼处的花名册上,新楼盘推出的时候,售楼处工作人员会翻开花名册,遍历上面的电话号码,依次发送一条短信来通知他们。
在上面的例子中,发送短信通知就是一个典型的发布—订阅模式,小明、小红等购买者都是订阅者,他们订阅了房子开售的消息。售楼处作为发布者,会在合适的时候遍历花名册上的电话号码,依次给购房者发布消息。
使用发布—订阅模式有着显而易见的优点:
- 购房者不用再天天给售楼处打电话咨询开售时间,在合适的时间点,售楼处作为发布者会通知这些消息订阅者。
- 购房者和售楼处之间不再强耦合在一起,当有新的购房者出现时,他只需把手机号码留在售楼处,售楼处不关心购房者的任何情况,不管购房者是男是女还是一只猴子。而售楼处的任何变动也不会影响购买者,比如售楼处工作人员离职,售楼处从一楼搬到二楼,这些改变都跟购房者无关,只要售楼处记得发短信这件事情。
第一点说明发布订阅模式可以广泛应用于异步编程中,这是一种替代传递回调函数的方案。比如,我们可以订阅ajax请求的error、succ等事件。或者如果想在动画的每一帧完成之后做一些事情,那我们可以订阅一个事件,然后在动画的每一帧完成之后发布这个事件。在异步编程中使用发布-订阅模式,我们就无需过多关注对象在异步运行期间的内部状态,而只需要订阅感兴趣的事件发生点。
第二点说明发布-订阅模式可以取代对象之间的硬编码的通知机制,一个对象不用再显示地调用另外一个对象的某个接口。发布-订阅模式让两个对象松耦合地联系在一起,虽然不太清楚彼此的细节,但这不影响它们之间相互通信。当有新的订阅者出现时,发布者的代码不需要任何修改;同样发布者需要改变时,也不会影响到之前的订阅者。只要之前约定的事件名没有变化,就可以自由地改变它们。
应用
DOM事件
实际上,只要在DOM节点上面绑定过事件函数,那就使用过发布—订阅模式。
document.body.addEventListener('click',function(){
alert(2) // 订阅click事件监听函数
},false)
document.body.click() //触发事件
当然我们可以随意增加或者删除订阅者,增加任何订阅者都不会影响发布者代码的编写。
document.body.addEventListener('click',function(){
alert(2);
},false);
document.body.addEventListener('click',function(){
alert(3);
},false);
document.body.addEventListener('click',function(){
alert(4);
},false);
document.body.click(); //模拟用户点击
发布订阅模式在实际开发中应用非常广泛,从架构上来看,无论是MVC、MVVM都少不了发布订阅模式的参加。
Promise/A+的实现
Promise实现遵循promise/A+规范,我们按照规范书写个Promise。
// promise 三个状态
const PENDING = "pending"
const FULFILLED = "fulfilled"
const REJECTED = "rejected"
function Promise(executor) {
let _this = this // 当前promise实例对象
_this.status = PENDING // 初始状态
_this.value = undefined // fulfilled状态时 返回的信息
_this.reason = undefined // rejected状态时 拒绝的原因
_this.onFulfilledCallbacks = [] // 存储fulfilled状态对应的onFulfilled函数
_this.onRejectedCallbacks = [] // 存储rejected状态对应的onRejected函数
function resolve(value) { // value成功态时接收的终值
if(value instanceof Promise) {
return value.then(resolve, reject)
}
/** 为什么resolve加setTimeout?
* 2.2.4规范 onFulfilled 和 onRejected 只允许在 execution context 栈仅包含平台代码时运行。
* 这里的平台代码指的是引擎、环境以及 promise 的实施代码。
* 实践中要确保 onFulfilled 和 onRejected 方法异步执行,且应该在 then 方法被调用的那一轮事件循环之后的新执行栈中执行。
*/
setTimeout(() => {
// 由pending状态 => fulfilled状态 (避免调用多次resolve reject)
if (_this.status === PENDING) {
_this.status = FULFILLED
_this.value = value
_this.onFulfilledCallbacks.forEach(cb => cb(_this.value))
}
})
}
function reject(reason) {
setTimeout(() => {
// 由pending状态 => rejected状态 (避免调用多次resolve reject)
if (_this.status === PENDING) {
_this.status = REJECTED
_this.reason = reason
_this.onRejectedCallbacks.forEach(cb => cb(_this.reason))
}
})
}
try {
executor(resolve, reject)
} catch (e) {
reject(e)
}
}
/**
* resolve中的值几种情况:
* 1.普通值
* 2.promise对象
* 3.thenable对象/函数
*/
/**
* 针对resolve中不同值情况 进行处理
* @param promise2 promise1.then方法返回的新的promise对象
* @param x promise1中onFulfilled的返回值
* @param resolve promise2的resolve方法
* @param reject promise2的reject方法
*/
function resolvePromise(promise2, x, resolve, reject) {
if (promise2 === x) {
return reject(new TypeError('循环引用'))
}
let called = false // 避免多次调用
if (x instanceof Promise) { // x是一个promise对象
if (x.status === PENDING) { // 如果为等待态需等待直至x被执行或拒绝 并解析y值
x.then(y => {
resolvePromise(promise2, y, resolve, reject)
}, reason => {
reject(reason)
})
} else { // 如果x已经处于执行态/拒绝态(值已经被解析为普通值),用相同的值执行传递下去 promise
x.then(resolve, reject)
}
// 如果x为对象或者函数
} else if (x != null && ((typeof x === 'object') || (typeof x === 'function'))) {
try { // 是否是thenable对象(具有then方法的对象/函数)
let then = x.then
if (typeof then === 'function') {
then.call(x, y => {
if(called) return
called = true
resolvePromise(promise2, y, resolve, reject)
}, reason => {
if(called) return
called = true
reject(reason)
})
} else { // 说明是一个普通对象/函数
resolve(x)
}
} catch(e) {
if(called) return
called = true
reject(e)
}
} else { // 普通值
resolve(x)
}
}
/**
* [注册fulfilled状态/rejected状态对应的回调函数]
* @param {function} onFulfilled,fulfilled状态时执行的函数,onRejected rejected状态时执行的函数
* @return {function} newPromsie 返回一个新的promise对象
*/
Promise.prototype.then = function(onFulfilled, onRejected) {
const _this = this
let newPromise
// 处理参数默认值 保证参数后续能够继续执行
onFulfilled =
typeof onFulfilled === "function" ? onFulfilled : value => value
onRejected =
typeof onRejected === "function" ? onRejected : reason => {
throw reason
}
if (_this.status === FULFILLED) {
return newPromise = new Promise((resolve, reject) => {
setTimeout(() => {
try{
let x = onFulfilled(_this.value)
resolvePromise(newPromise, x, resolve, reject)
} catch(e) {
reject(e)
}
})
})
}
if (_this.status === REJECTED) {
return newPromise = new Promise((resolve, reject) => {
setTimeout(() => {
try {
let x = onRejected(_this.reason)
resolvePromise(newPromise, x, resolve, reject)
} catch(e) {
reject(e)
}
})
})
}
if (_this.status === PENDING) {
// 当异步调用resolve/rejected时 将onFulfilled/onRejected收集暂存到集合中
return newPromise = new Promise((resolve, reject) => {
_this.onFulfilledCallbacks.push((value) => {
try {
let x = onFulfilled(value)
resolvePromise(newPromise, x, resolve, reject)
} catch(e) {
reject(e)
}
})
_this.onRejectedCallbacks.push((reason) => {
try {
let x = onRejected(reason)
resolvePromise(newPromise, x, resolve, reject)
} catch(e) {
reject(e)
}
})
})
}
}
/**
* Promise.all Promise进行并行处理
* 参数: promise对象组成的数组作为参数
* 返回值: 返回一个Promise实例
* 当这个数组里的所有promise对象全部变为resolve状态的时候,才会resolve。
*/
Promise.all = function(promises) {
return new Promise((resolve, reject) => {
let done = gen(promises.length, resolve)
promises.forEach((promise, index) => {
promise.then((value) => {
done(index, value)
}, reject)
})
})
}
function gen(length, resolve) {
let count = 0
let values = []
return function(i, value) {
values[i] = value
if (++count === length) {
console.log(values)
resolve(values)
}
}
}
/**
* Promise.race
* 参数: 接收 promise对象组成的数组作为参数
* 返回值: 返回一个Promise实例
* 只要有一个promise对象进入 FulFilled 或者 Rejected 状态的话,就会继续进行后面的处理(取决于哪一个更快)
*/
Promise.race = function(promises) {
return new Promise((resolve, reject) => {
promises.forEach((promise, index) => {
promise.then(resolve, reject)
})
})
}
// 用于promise方法链时 捕获前面onFulfilled/onRejected抛出的异常
Promise.prototype.catch = function(onRejected) {
return this.then(null, onRejected)
}
Promise.resolve = function (value) {
return new Promise(resolve => {
resolve(value)
})
}
Promise.reject = function (reason) {
return new Promise((resolve, reject) => {
reject(reason)
})
}
/**
* 基于Promise实现Deferred的
* Deferred和Promise的关系
* - Deferred 拥有 Promise
* - Deferred 具备对 Promise的状态进行操作的特权方法(resolve reject)
*/
Promise.deferred = function() { // 延迟对象
let defer = {}
defer.promise = new Promise((resolve, reject) => {
defer.resolve = resolve
defer.reject = reject
})
return defer
}
try {
module.exports = Promise
} catch (e) {
}
代码参考Promise详解与实现(Promise/A+规范)
MVVM框架
Vue的数据双向绑定实现就采用了订阅发布者模式。核心实现由发布订阅类Dep 、数据劫持类 Observer 、观察者类Watcher 。
发布订阅模式把要执行的函数统一存储在一个数组中管理,当达到某个执行条件时,循环这个数组并执行每一个成员。
class Dep {
constructor() {
this.subs = [];
}
// 添加订阅
addSub(watcher) {
this.subs.push(watcher);
}
// 通知
notify() {
this.subs.forEach(watcher => watcher.update());
}
}
class Observer {
constructor (data) {
this.observe(data);
}
// 添加数据监听
observe(data) {
if(!data || typeof data !== 'object') {
return;
}
Object.keys(data).forEach(key => {
// 劫持(实现数据响应式)
this.defineReactive(data, key, data[key]);
this.observe(data[key]); // 深度劫持
});
}
// 数据响应式
defineReactive (object, key, value) {
let _this = this;
// 每个变化的数据都会对应一个数组,这个数组是存放所有更新的操作
let dep = new Dep();
// 获取某个值被监听到
Object.defineProperty(object, key, {
enumerable: true,
configurable: true,
get () { // 当取值时调用的方法
Dep.target && dep.addSub(Dep.target);
return value;
},
set (newValue) { // 当给 data 属性中设置的值适合,更改获取的属性的值
if(newValue !== value) {
_this.observe(newValue); // 重新赋值如果是对象进行深度劫持
value = newValue;
dep.notify(); // 通知所有人数据更新了
}
}
});
}
}
class Watcher {
constructor(vm, exp, callback) {
this.vm = vm;
this.exp = exp;
this.callback = callback;
// 更改前的值
this.value = this.get();
}
get() {
// 将当前的 watcher 添加到 Dep 类的静态属性上
Dep.target = this;
// 获取值触发数据劫持
let value = CompileUtil.getVal(this.vm, this.exp);
// 清空 Dep 上的 Watcher,防止重复添加
Dep.target = null;
return value;
}
update() {
// 获取新值
let newValue = CompileUtil.getVal(this.vm, this.exp);
// 获取旧值
let oldValue = this.value;
// 如果新值和旧值不相等,就执行 callback 对 dom 进行更新
if(newValue !== oldValue) {
this.callback(newValue);
}
}
}
完整的代码请看模拟 Vue 手写一个 MVVM
Redux
Redux
createStore()
设计采用了订阅发布模式,具体代码如下:
const createStore = (reducer) => {
let state
let listeners = []
const getState = () => state
const dispatch = (action) => {
state = reducer(state, action)
listeners.forEach(listener => listener())
}
const subscribe = (listener) => {
listeners.push(listener)
return () => {
listeners = listeners.filter(item => item !== listener)
}
}
dispatch({})
return {getState, dispatch, subscribe}
}
// reducer 纯函数,具体的action执行逻辑
const counter = (state = 0, action) => {
switch (action.type) {
case 'INCREMENT':
return state + 1;
case 'DECREMENT':
return state - 1;
default:
return state;
}
}
const store = createStore(counter);
//把数据渲染到界面上
const render = () => {
document.body.innerText = store.getState();
}
// 订阅状态变化事件,当状态变化时用监听函数
store.subscribe(render);
render();
var INCREASE_ACTION = {type: 'INCREMENT'};
document.addEventListener('click', function (e) {
//触发一个Action
store.dispatch(INCREASE_ACTION);
})
小结
发布-订阅模式的优点非常明显,一为时间上的解耦,二为对象上的解耦。它的应用非常广泛,既可以用在异步编程中,也可以帮助我们完成更松耦合的代码编写。
当然,发布—订阅模式也不是完全没有缺点。创建订阅者本身要消耗一定的时间和内存,而且订阅一个消息后,也许此消息最后都未发生,但这个订阅者会始终存在于内存中。另外,发布—订阅模式虽然可以弱化对象之间的联系,但如果过度使用的话,对象和对象之间的必要联系也将被深埋在背后,会导致程序难以跟踪维护和理解。特别是有多个发布者和订阅者嵌套到一起的时候,要跟踪一个bug不是件轻松的事情。
参考文献
《JavaScript设计模式与开发实践》