如何做好【订阅管理】?

订阅发布模式(Subscribe/Publish)

订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。

一般来说,订阅有两种类型:

频道的发布订阅

模式的发布订阅

我们今天首先来说一说频道订阅,即用户关注订阅该频道之后,系统会根据发布者分发消息时分发到用户的邮箱,用户退出网页系统也会继续发送订阅消息,除非用户取消订阅才会停止。

其流程如下:

通信模式

RedisServer中可以创建若干channel

一个订阅者可以订阅多个channel

当发布者向一个频道中发布一条消息时,所有的订阅者都将会收到消息

Redis的发布订阅模型没有消息积压功能,即新加入的订阅者收不到发布者之前发布的消息

当订阅者收到消息时,消息内容如下

第一行:固定内容message

第二行:channel的名称

第三行:收到的新消息

发布与订阅

Redis 的发布与订阅功能由 PUBLISH(发布), SUBSCRIBE(订阅),CHANNEL(频道) 组成。

PUBLISH:

发送订阅消息,服务器接收到该命令之后,先遍历pubsub_channels找出频道订阅者,把消息发送给所有频道订阅者,然后遍历pubsub_patterns找出与channel匹配的模式,并将消息发送给订阅了这些模式的客户端。

 SUBSCRIBE:

当一个客户端执行了 SUBSCRIBE(订阅) ,这个客户端将与被订阅频道之间就建立起了一种订阅关系.以键值对模式建立相关链接,存入下面结构中, 键为字符串对象, 值为链表对象.

struct redisServer{

    dict* pubsub_channels;

};

频道的订阅(SUBSCRIBE)

主要规则就是判断当前是否有对应键,如果没有直接创建添加,如果有就加入末尾

* Subscribe a client to a channel. Returns 1 if the operation succeeded, or

* 0 if the client was already subscribed to that channel.

* 设置客户端 c 订阅频道 channel 。

* 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。

*/

int pubsubSubscribeChannel(redisClient *c, robj *channel) {

    dictEntry *de;

    list *clients = NULL;

    int retval = 0;

    /* Add the channel to the client -> channels hash table */

    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)

    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {

        retval = 1;

        incrRefCount(channel);

        // 关联示意图

        // {

        //  频道名        订阅频道的客户端

        //  'channel-a' : [c1, c2, c3],

        //  'channel-b' : [c5, c2, c1],

        //  'channel-c' : [c10, c2, c1]

        // }

        /* Add the client to the channel -> list of clients hash table */

        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表

        // 如果 channel 不存在于字典,那么添加进去

        de = dictFind(server.pubsub_channels,channel);

        if (de == NULL) {

            clients = listCreate();

            dictAdd(server.pubsub_channels,channel,clients);

            incrRefCount(channel);

        } else {

            clients = dictGetVal(de);

        }

        // before:

        // 'channel' : [c1, c2]

        // after:

        // 'channel' : [c1, c2, c3]

        // 将客户端添加到链表的末尾

        listAddNodeTail(clients,c);

    }

    /* Notify the client */

    // 回复客户端。

    // 示例:

    // redis 127.0.0.1:6379> SUBSCRIBE xxx

    // Reading messages... (press Ctrl-C to quit)

    // 1) "subscribe"

    // 2) "xxx"

    // 3) (integer) 1

    addReply(c,shared.mbulkhdr[3]);

    // "subscribe\n" 字符串

    addReply(c,shared.subscribebulk);

    // 被订阅的客户端

    addReplyBulk(c,channel);

    // 客户端订阅的频道和模式总数

    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;

}

频道的退订(UNSUBSCRIBE)

当一个客户端执行了 UNSUBSCRIBE(取消订阅) ,这个客户端将与被订阅频道之间建立起的订阅关系不再存在.简单的来说,从dict 中链表中删除对应的客户端节点.

void unsubscribeCommand(redisClient *c) {

    if (c->argc == 1) {

        pubsubUnsubscribeAllChannels(c,1);

    } else {

        int j;

        for (j = 1; j < c->argc; j++)

            pubsubUnsubscribeChannel(c,c->argv[j],1);

    }

}

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or

* 0 if the client was not subscribed to the specified channel.

*

* 客户端 c 退订频道 channel 。

*

* 如果取消成功返回 1 ,如果因为客户端未订阅频道,而造成取消失败,返回 0 。

*/

int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {

    dictEntry *de;

    list *clients;

    listNode *ln;

    int retval = 0;

    /* Remove the channel from the client -> channels hash table */

    // 将频道 channel 从 client->channels 字典中移除

    incrRefCount(channel); /* channel may be just a pointer to the same object

                            we have in the hash tables. Protect it... */

    // 示意图:

    // before:

    // {

    //  'channel-x': NULL,

    //  'channel-y': NULL,

    //  'channel-z': NULL,

    // }

    // after unsubscribe channel-y :

    // {

    //  'channel-x': NULL,

    //  'channel-z': NULL,

    // }

    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {

        // channel 移除成功,表示客户端订阅了这个频道,执行以下代码

        retval = 1;

        /* Remove the client from the channel -> clients list hash table */

        // 从 channel->clients 的 clients 链表中,移除 client

        // 示意图:

        // before:

        // {

        //  'channel-x' : [c1, c2, c3],

        // }

        // after c2 unsubscribe channel-x:

        // {

        //  'channel-x' : [c1, c3]

        // }

        de = dictFind(server.pubsub_channels,channel);

        redisAssertWithInfo(c,NULL,de != NULL);

        clients = dictGetVal(de);

        ln = listSearchKey(clients,c);

        redisAssertWithInfo(c,NULL,ln != NULL);

        listDelNode(clients,ln);

        // 如果移除 client 之后链表为空,那么删除这个 channel 键

        // 示意图:

        // before

        // {

        //  'channel-x' : [c1]

        // }

        // after c1 ubsubscribe channel-x

        // then also delete 'channel-x' key in dict

        // {

        //  // nothing here

        // }

        if (listLength(clients) == 0) {

            /* Free the list and associated hash entry at all if this was

            * the latest client, so that it will be possible to abuse

            * Redis PUBSUB creating millions of channels. */

            dictDelete(server.pubsub_channels,channel);

        }

    }

    /* Notify the client */

    // 回复客户端

    if (notify) {

        addReply(c,shared.mbulkhdr[3]);

        // "ubsubscribe" 字符串

        addReply(c,shared.unsubscribebulk);

        // 被退订的频道

        addReplyBulk(c,channel);

        // 退订频道之后客户端仍在订阅的频道和模式的总数

        addReplyLongLong(c,dictSize(c->pubsub_channels)+

                      listLength(c->pubsub_patterns));

    }

    decrRefCount(channel); /* it is finally safe to release it */

    return retval;

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容