订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。
一般来说,订阅有两种类型:
频道的发布订阅
模式的发布订阅
我们今天首先来说一说频道订阅,即用户关注订阅该频道之后,系统会根据发布者分发消息时分发到用户的邮箱,用户退出网页系统也会继续发送订阅消息,除非用户取消订阅才会停止。
其流程如下:
通信模式
RedisServer中可以创建若干channel
一个订阅者可以订阅多个channel
当发布者向一个频道中发布一条消息时,所有的订阅者都将会收到消息
Redis的发布订阅模型没有消息积压功能,即新加入的订阅者收不到发布者之前发布的消息
当订阅者收到消息时,消息内容如下
第一行:固定内容message
第二行:channel的名称
第三行:收到的新消息
发布与订阅
Redis 的发布与订阅功能由 PUBLISH(发布), SUBSCRIBE(订阅),CHANNEL(频道) 组成。
PUBLISH:
发送订阅消息,服务器接收到该命令之后,先遍历pubsub_channels找出频道订阅者,把消息发送给所有频道订阅者,然后遍历pubsub_patterns找出与channel匹配的模式,并将消息发送给订阅了这些模式的客户端。
SUBSCRIBE:
当一个客户端执行了 SUBSCRIBE(订阅) ,这个客户端将与被订阅频道之间就建立起了一种订阅关系.以键值对模式建立相关链接,存入下面结构中, 键为字符串对象, 值为链表对象.
struct redisServer{
dict* pubsub_channels;
};
频道的订阅(SUBSCRIBE)
主要规则就是判断当前是否有对应键,如果没有直接创建添加,如果有就加入末尾
* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel.
* 设置客户端 c 订阅频道 channel 。
* 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
*/
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
// 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
// 关联示意图
// {
// 频道名 订阅频道的客户端
// 'channel-a' : [c1, c2, c3],
// 'channel-b' : [c5, c2, c1],
// 'channel-c' : [c10, c2, c1]
// }
/* Add the client to the channel -> list of clients hash table */
// 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
// 如果 channel 不存在于字典,那么添加进去
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
// before:
// 'channel' : [c1, c2]
// after:
// 'channel' : [c1, c2, c3]
// 将客户端添加到链表的末尾
listAddNodeTail(clients,c);
}
/* Notify the client */
// 回复客户端。
// 示例:
// redis 127.0.0.1:6379> SUBSCRIBE xxx
// Reading messages... (press Ctrl-C to quit)
// 1) "subscribe"
// 2) "xxx"
// 3) (integer) 1
addReply(c,shared.mbulkhdr[3]);
// "subscribe\n" 字符串
addReply(c,shared.subscribebulk);
// 被订阅的客户端
addReplyBulk(c,channel);
// 客户端订阅的频道和模式总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
return retval;
}
频道的退订(UNSUBSCRIBE)
当一个客户端执行了 UNSUBSCRIBE(取消订阅) ,这个客户端将与被订阅频道之间建立起的订阅关系不再存在.简单的来说,从dict 中链表中删除对应的客户端节点.
void unsubscribeCommand(redisClient *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
} else {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1);
}
}
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel.
*
* 客户端 c 退订频道 channel 。
*
* 如果取消成功返回 1 ,如果因为客户端未订阅频道,而造成取消失败,返回 0 。
*/
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
int retval = 0;
/* Remove the channel from the client -> channels hash table */
// 将频道 channel 从 client->channels 字典中移除
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
// 示意图:
// before:
// {
// 'channel-x': NULL,
// 'channel-y': NULL,
// 'channel-z': NULL,
// }
// after unsubscribe channel-y :
// {
// 'channel-x': NULL,
// 'channel-z': NULL,
// }
if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
// channel 移除成功,表示客户端订阅了这个频道,执行以下代码
retval = 1;
/* Remove the client from the channel -> clients list hash table */
// 从 channel->clients 的 clients 链表中,移除 client
// 示意图:
// before:
// {
// 'channel-x' : [c1, c2, c3],
// }
// after c2 unsubscribe channel-x:
// {
// 'channel-x' : [c1, c3]
// }
de = dictFind(server.pubsub_channels,channel);
redisAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
redisAssertWithInfo(c,NULL,ln != NULL);
listDelNode(clients,ln);
// 如果移除 client 之后链表为空,那么删除这个 channel 键
// 示意图:
// before
// {
// 'channel-x' : [c1]
// }
// after c1 ubsubscribe channel-x
// then also delete 'channel-x' key in dict
// {
// // nothing here
// }
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client, so that it will be possible to abuse
* Redis PUBSUB creating millions of channels. */
dictDelete(server.pubsub_channels,channel);
}
}
/* Notify the client */
// 回复客户端
if (notify) {
addReply(c,shared.mbulkhdr[3]);
// "ubsubscribe" 字符串
addReply(c,shared.unsubscribebulk);
// 被退订的频道
addReplyBulk(c,channel);
// 退订频道之后客户端仍在订阅的频道和模式的总数
addReplyLongLong(c,dictSize(c->pubsub_channels)+
listLength(c->pubsub_patterns));
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}