redis 发布订阅实现

系列

redis数据淘汰原理
redis过期数据删除策略
redis server事件模型
redis cluster mget 引发的讨论
redis 3.x windows 集群搭建
redis 命令执行过程
redis string底层数据结构
redis list底层数据结构
redis hash底层数据结构
redis set底层数据结构
redis zset底层数据结构
redis 客户端管理
redis 主从同步-slave端
redis 主从同步-master端
redis 主从超时检测
redis aof持久化
redis rdb持久化
redis 数据恢复过程
redis TTL实现原理
redis cluster集群建立
redis cluster集群选主

 redis的pub/sub当中主要分为subscribe/unsubscribe&psubscribe/punsubscribe两种模式,在这两种模式下支持publish命令。
 本篇文章的主要目的是为了讲解清楚这三个过程,包括整个执行过程以及相关的数据结构。

subscribe 数据结构

 首先subscribe的数据结构主要如下图,其中subscribe对应的pubsub_channels其实是一个hashtable的数据结构,其中key为channel的名字,value为订阅了这个channel的客户端连接。

subscribe数据结构


subscribe 过程

 当我们执行subscribe channelA channelB的命令的时候,在内部执行的时候实际上是分两次执行的,从下面的for循环我们可以看出来,相当于对同一个client的多个subscribe命令一次进行订阅处理。

void subscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
}



 在执行pubsubSubscribeChannel的整个过程分为以下几个阶段:

  • 将channel关联到client当中的pubsub_channels当中
  • 根据channel去server的pubsub_channels中查找对应的client的list列表
  • 不存在就创建client的list列表并添加到hashtable当中
  • 将client添加到channel对应的client list当中
/* 
 * 设置客户端 c 订阅频道 channel 。
 *
 * 订阅成功返回 1 ,如果客户端已经订阅了该频道,那么返回 0 。
 */
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // 将 channels 填接到 c->pubsub_channels 的集合中(值为 NULL 的字典视为集合)
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);

        // 关联示意图
        // {
        //  频道名        订阅频道的客户端
        //  'channel-a' : [c1, c2, c3],
        //  'channel-b' : [c5, c2, c1],
        //  'channel-c' : [c10, c2, c1]
        // }
        /* Add the client to the channel -> list of clients hash table */
        // 从 pubsub_channels 字典中取出保存着所有订阅了 channel 的客户端的链表
        // 如果 channel 不存在于字典,那么添加进去
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }

        // before:
        // 'channel' : [c1, c2]
        // after:
        // 'channel' : [c1, c2, c3]
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> SUBSCRIBE xxx
    // Reading messages... (press Ctrl-C to quit)
    // 1) "subscribe"
    // 2) "xxx"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // "subscribe\n" 字符串
    addReply(c,shared.subscribebulk);
    // 被订阅的客户端
    addReplyBulk(c,channel);
    // 客户端订阅的频道和模式总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
}
     /* Pubsub */
    // 字典,键为频道,值为链表
    // 链表中保存了所有订阅某个频道的客户端
    // 新客户端总是被添加到链表的表尾
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */

    // 这个链表记录了客户端订阅的所有模式的名字
    list *pubsub_patterns;  /* A list of pubsub_patterns */


unsubscribe 过程

 在unsubscribeCommand过程中,区分了两种场景,一个是取消client相关的所有订阅,一个是取消client关联的某几个订阅,但是殊途同归最终都会执行pubsubUnsubscribeChannel取消订阅的。

void unsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
}



 在取消client的所有订阅的过程中,我们通过遍历client的pubsub_channels的所有channel,然后依次进行取消订阅操作。

/* 
 * 退订客户端 c 订阅的所有频道。
 *
 * 返回被退订频道的总数。
 */
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {

    // 频道迭代器
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;

    // 退订
    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);

        count += pubsubUnsubscribeChannel(c,channel,notify);
    }

    /* We were subscribed to nothing? Still reply to the client. */
    // 如果在执行这个函数时,客户端没有订阅任何频道,
    // 那么向客户端发送回复
    if (notify && count == 0) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }

    dictReleaseIterator(di);

    // 被退订的频道的数量
    return count;
}



 取消订阅的过程主要是订阅过程的逆向操作,整个过程如下:

  • 从client的pubsub_channels中删除channel
  • 从server的pubsub_channels查找channel对应的client list对象
  • 从client list对象中删除client对象
  • 如果client list此时为空,就从server.pubsub_channels中删除对应的channel
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel. 
 *
 * 客户端 c 退订频道 channel 。
 *
 * 如果取消成功返回 1 ,如果因为客户端未订阅频道,而造成取消失败,返回 0 。
 */
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    /* Remove the channel from the client -> channels hash table */
    // 将频道 channel 从 client->channels 字典中移除
    incrRefCount(channel); /* channel may be just a pointer to the same object
                            we have in the hash tables. Protect it... */
    // 示意图:
    // before:
    // {
    //  'channel-x': NULL,
    //  'channel-y': NULL,
    //  'channel-z': NULL,
    // }
    // after unsubscribe channel-y :
    // {
    //  'channel-x': NULL,
    //  'channel-z': NULL,
    // }
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {

        // channel 移除成功,表示客户端订阅了这个频道,执行以下代码

        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        // 从 channel->clients 的 clients 链表中,移除 client 
        // 示意图:
        // before:
        // {
        //  'channel-x' : [c1, c2, c3],
        // }
        // after c2 unsubscribe channel-x:
        // {
        //  'channel-x' : [c1, c3]
        // }
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);

        // 如果移除 client 之后链表为空,那么删除这个 channel 键
        // 示意图:
        // before
        // {
        //  'channel-x' : [c1]
        // }
        // after c1 ubsubscribe channel-x
        // then also delete 'channel-x' key in dict
        // {
        //  // nothing here
        // }
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    }

    /* Notify the client */
    // 回复客户端
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        // "ubsubscribe" 字符串
        addReply(c,shared.unsubscribebulk);
        // 被退订的频道
        addReplyBulk(c,channel);
        // 退订频道之后客户端仍在订阅的频道和模式的总数
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));

    }

    decrRefCount(channel); /* it is finally safe to release it */

    return retval;
}


psubscribe 过程

 psubscribe是指支持pattern模式的subscribe,跟subscribe一样,内部也是依次执行订阅的。

void psubscribeCommand(redisClient *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
}



 psubscribe保存订阅关系是通过list对象进行保存的,整个订阅过程操作顺序如下:

  • 首先检查client本身的pubsub_patterns列表当中是否存在pattern
  • 不存在就添加到client.pubsub_patterns
  • 不存在就添加到server.pubsub_patterns
/*
 * 设置客户端 c 订阅模式 pattern 。
 *
 * 订阅成功返回 1 ,如果客户端已经订阅了该模式,那么返回 0 。
 */
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;

    // 在链表中查找模式,看客户端是否已经订阅了这个模式
    // 这里为什么不像 channel 那样,用字典来进行检测呢?
    // 虽然 pattern 的数量一般来说并不多
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        
        // 如果没有的话,执行以下代码

        retval = 1;

        pubsubPattern *pat;

        // 将 pattern 添加到 c->pubsub_patterns 链表中
        listAddNodeTail(c->pubsub_patterns,pattern);

        incrRefCount(pattern);

        // 创建并设置新的 pubsubPattern 结构
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;

        // 添加到末尾
        listAddNodeTail(server.pubsub_patterns,pat);
    }

    /* Notify the client */
    // 回复客户端。
    // 示例:
    // redis 127.0.0.1:6379> PSUBSCRIBE xxx*
    // Reading messages... (press Ctrl-C to quit)
    // 1) "psubscribe"
    // 2) "xxx*"
    // 3) (integer) 1
    addReply(c,shared.mbulkhdr[3]);
    // 回复 "psubscribe" 字符串
    addReply(c,shared.psubscribebulk);
    // 回复被订阅的模式
    addReplyBulk(c,pattern);
    // 回复客户端订阅的频道和模式的总数
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));

    return retval;
}


punsubscribe 过程

 punsubscribe也区分全部取消订阅和部分取消订阅,当然殊途同归最终都会走到pubsubUnsubscribePattern进行取消订阅。

void punsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllPatterns(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribePattern(c,c->argv[j],1);
    }
}



 遍历c->pubsub_patterns的所有pattern依次进行取消操作。

/* 
 * 退订客户端 c 订阅的所有模式。
 *
 * 返回被退订模式的数量。
 */
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
    listNode *ln;
    listIter li;
    int count = 0;

    // 迭代客户端订阅模式的链表
    listRewind(c->pubsub_patterns,&li);
    while ((ln = listNext(&li)) != NULL) {
        robj *pattern = ln->value;

        // 退订,并计算退订数
        count += pubsubUnsubscribePattern(c,pattern,notify);
    }

    // 如果在执行这个函数时,客户端没有订阅任何模式,
    // 那么向客户端发送回复
    if (notify && count == 0) {
        /* We were subscribed to nothing? Still reply to the client. */
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReply(c,shared.nullbulk);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }

    // 退订总数
    return count;
}



 取消订阅的过程主要包括两个步骤:

  • c->pubsub_patterns当中删除对应的pattern
  • server.pubsub_patterns当中删除对应的pattern
/*
 * 取消客户端 c 对模式 pattern 的订阅。
 *
 * 取消成功返回 1 ,因为客户端未订阅 pattern 而造成取消失败,返回 0 。
 */
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
    listNode *ln;
    pubsubPattern pat;
    int retval = 0;

    incrRefCount(pattern); /* Protect the object. May be the same we remove */

    // 先确认一下,客户端是否订阅了这个模式
    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {

        retval = 1;

        // 将模式从客户端的订阅列表中删除
        listDelNode(c->pubsub_patterns,ln);

        // 设置 pubsubPattern 结构
        pat.client = c;
        pat.pattern = pattern;

        // 在服务器中查找
        ln = listSearchKey(server.pubsub_patterns,&pat);
        listDelNode(server.pubsub_patterns,ln);
    }

    /* Notify the client */
    // 回复客户端
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        // "punsubscribe" 字符串
        addReply(c,shared.punsubscribebulk);
        // 被退订的模式
        addReplyBulk(c,pattern);
        // 退订频道之后客户端仍在订阅的频道和模式的总数
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }

    decrRefCount(pattern);

    return retval;
}


publish 过程

 发送消息的过程很简单,也就是遍历server.pubsub_channels 和 server.pubsub_patterns然后匹配对应的channel,然后取出channel当中的client的list列表发送消息即可。

/* Publish a message 
 *
 * 将 message 发送到所有订阅频道 channel 的客户端,
 * 以及所有订阅了和 channel 频道匹配的模式的客户端。
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    // 取出包含所有订阅频道 channel 的客户端的链表
    // 并将消息发送给它们
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        // 遍历客户端链表,将 message 发送给它们
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;

            // 回复客户端。
            // 示例:
            // 1) "message"
            // 2) "xxx"
            // 3) "hello"
            addReply(c,shared.mbulkhdr[3]);
            // "message" 字符串
            addReply(c,shared.messagebulk);
            // 消息的来源频道
            addReplyBulk(c,channel);
            // 消息内容
            addReplyBulk(c,message);

            // 接收客户端计数
            receivers++;
        }
    }

    /* Send to clients listening to matching channels */
    // 将消息也发送给那些和频道匹配的模式
    if (listLength(server.pubsub_patterns)) {

        // 遍历模式链表
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {

            // 取出 pubsubPattern
            pubsubPattern *pat = ln->value;

            // 如果 channel 和 pattern 匹配
            // 就给所有订阅该 pattern 的客户端发送消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {

                // 回复客户端
                // 示例:
                // 1) "pmessage"
                // 2) "*"
                // 3) "xxx"
                // 4) "hello"
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);

                // 对接收消息的客户端进行计数
                receivers++;
            }
        }

        decrRefCount(channel);
    }

    // 返回计数
    return receivers;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容