如何做好【订阅管理】?

订阅发布模式(Subscribe/Publish)

订阅发布模式定义了一种一对多的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知所有订阅者对象,使它们能够自动更新自己的状态。

一般来说,订阅有两种类型:

频道的发布订阅

模式的发布订阅

我们今天首先来说一说频道订阅,即用户关注订阅该频道之后,系统会根据发布者分发消息时分发到用户的邮箱,用户退出网页系统也会继续发送订阅消息,除非用户取消订阅才会停止。

其流程如下:

通信模式

RedisServer中可以创建若干channel

一个订阅者可以订阅多个channel

当发布者向一个频道中发布一条消息时,所有的订阅者都将会收到消息

Redis的发布订阅模型没有消息积压功能,即新加入的订阅者收不到发布者之前发布的消息

当订阅者收到消息时,消息内容如下

第一行:固定内容message

第二行:channel的名称

第三行:收到的新消息

发布与订阅

Redis 的发布与订阅功能由 PUBLISH(发布), SUBSCRIBE(订阅),CHANNEL(频道) 组成。

PUBLISH:

发送订阅消息,服务器接收到该命令之后,先遍历pubsub_channels找出频道订阅者,把消息发送给所有频道订阅者,然后遍历pubsub_patterns找出与channel匹配的模式,并将消息发送给订阅了这些模式的客户端。

 SUBSCRIBE:

当一个客户端执行了 SUBSCRIBE(订阅) ,这个客户端将与被订阅频道之间就建立起了一种订阅关系.以键值对模式建立相关链接,存入下面结构中, 键为字符串对象, 值为链表对象.

struct redisServer{

    dict* pubsub_channels;

};

频道的订阅(SUBSCRIBE)

主要规则就是判断当前是否有对应键,如果没有直接创建添加,如果有就加入末尾

* Subscribe a client to a channel. Returns 1 if the operation succeeded, or

* 0 if the client was already subscribed to that channel.

* 设置客户端 c 订阅频道 channel 。

* 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。

*/

int pubsubSubscribeChannel(redisClient *c, robj *channel) {

    dictEntry *de;

    list *clients = NULL;

    int retval = 0;

    /* Add the channel to the client -> channels hash table */

    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)

    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {

        retval = 1;

        incrRefCount(channel);

        // 关联示意图

        // {

        //  频道名        订阅频道的客户端

        //  'channel-a' : [c1, c2, c3],

        //  'channel-b' : [c5, c2, c1],

        //  'channel-c' : [c10, c2, c1]

        // }

        /* Add the client to the channel -> list of clients hash table */

        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表

        // 如果 channel 不存在于字典,那么添加进去

        de = dictFind(server.pubsub_channels,channel);

        if (de == NULL) {

            clients = listCreate();

            dictAdd(server.pubsub_channels,channel,clients);

            incrRefCount(channel);

        } else {

            clients = dictGetVal(de);

        }

        // before:

        // 'channel' : [c1, c2]

        // after:

        // 'channel' : [c1, c2, c3]

        // 将客户端添加到链表的末尾

        listAddNodeTail(clients,c);

    }

    /* Notify the client */

    // 回复客户端。

    // 示例:

    // redis 127.0.0.1:6379> SUBSCRIBE xxx

    // Reading messages... (press Ctrl-C to quit)

    // 1) "subscribe"

    // 2) "xxx"

    // 3) (integer) 1

    addReply(c,shared.mbulkhdr[3]);

    // "subscribe\n" 字符串

    addReply(c,shared.subscribebulk);

    // 被订阅的客户端

    addReplyBulk(c,channel);

    // 客户端订阅的频道和模式总数

    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;

}

频道的退订(UNSUBSCRIBE)

当一个客户端执行了 UNSUBSCRIBE(取消订阅) ,这个客户端将与被订阅频道之间建立起的订阅关系不再存在.简单的来说,从dict 中链表中删除对应的客户端节点.

void unsubscribeCommand(redisClient *c) {

    if (c->argc == 1) {

        pubsubUnsubscribeAllChannels(c,1);

    } else {

        int j;

        for (j = 1; j < c->argc; j++)

            pubsubUnsubscribeChannel(c,c->argv[j],1);

    }

}

/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or

* 0 if the client was not subscribed to the specified channel.

*

* 客户端 c 退订频道 channel 。

*

* 如果取消成功返回 1 ,如果因为客户端未订阅频道,而造成取消失败,返回 0 。

*/

int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {

    dictEntry *de;

    list *clients;

    listNode *ln;

    int retval = 0;

    /* Remove the channel from the client -> channels hash table */

    // 将频道 channel 从 client->channels 字典中移除

    incrRefCount(channel); /* channel may be just a pointer to the same object

                            we have in the hash tables. Protect it... */

    // 示意图:

    // before:

    // {

    //  'channel-x': NULL,

    //  'channel-y': NULL,

    //  'channel-z': NULL,

    // }

    // after unsubscribe channel-y :

    // {

    //  'channel-x': NULL,

    //  'channel-z': NULL,

    // }

    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {

        // channel 移除成功,表示客户端订阅了这个频道,执行以下代码

        retval = 1;

        /* Remove the client from the channel -> clients list hash table */

        // 从 channel->clients 的 clients 链表中,移除 client

        // 示意图:

        // before:

        // {

        //  'channel-x' : [c1, c2, c3],

        // }

        // after c2 unsubscribe channel-x:

        // {

        //  'channel-x' : [c1, c3]

        // }

        de = dictFind(server.pubsub_channels,channel);

        redisAssertWithInfo(c,NULL,de != NULL);

        clients = dictGetVal(de);

        ln = listSearchKey(clients,c);

        redisAssertWithInfo(c,NULL,ln != NULL);

        listDelNode(clients,ln);

        // 如果移除 client 之后链表为空,那么删除这个 channel 键

        // 示意图:

        // before

        // {

        //  'channel-x' : [c1]

        // }

        // after c1 ubsubscribe channel-x

        // then also delete 'channel-x' key in dict

        // {

        //  // nothing here

        // }

        if (listLength(clients) == 0) {

            /* Free the list and associated hash entry at all if this was

            * the latest client, so that it will be possible to abuse

            * Redis PUBSUB creating millions of channels. */

            dictDelete(server.pubsub_channels,channel);

        }

    }

    /* Notify the client */

    // 回复客户端

    if (notify) {

        addReply(c,shared.mbulkhdr[3]);

        // "ubsubscribe" 字符串

        addReply(c,shared.unsubscribebulk);

        // 被退订的频道

        addReplyBulk(c,channel);

        // 退订频道之后客户端仍在订阅的频道和模式的总数

        addReplyLongLong(c,dictSize(c->pubsub_channels)+

                      listLength(c->pubsub_patterns));

    }

    decrRefCount(channel); /* it is finally safe to release it */

    return retval;

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容