Redis学习之发布与订阅

发布与订阅

一、介绍

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

  • SUBSCRIBE channel [channel ...] 订阅1-N个频道

  • UNSUBSCRIBE [channel [channel ...]] 指示客户端退订给定的频道。

  • PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。

  • PUNSUBSCRIBE [pattern [pattern ...]] 指示客户端退订所有给定模式。

  • PUBLISH channel message 将信息 message 发送到指定的频道 channel 。

  • PUBSUB CHANNELS [pattern] 列出当前的活跃频道。

  • PUBSUB NUMSUB [channel-1 ... channel-N] 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。

  • PUBSUB NUMPAT 返回订阅模式的数量

二、数据结构

redis服务器通过一个(频道名称->客户端链表)字典,和一个(匹配模式)链表来完成发布与订阅的功能。

struct redisServer {
.....
    dict *pubsub_channels;  /*所有频道的订阅关系 字典 (频道名称->client链表)*/
    list *pubsub_patterns;  /*保存所有订阅频道模式链表 */
.....
}
//pubsub_patterns链表的Node
typedef struct pubsubPattern {
    client *client;//用户
    robj *pattern;//订阅的频道模式
} pubsubPattern;

三、实现

  • subscribeCommand

    //subscribe c c c ... c
    void subscribeCommand(client *c) {
        int j;
    
        for (j = 1; j < c->argc; j++)
            pubsubSubscribeChannel(c,c->argv[j]);//订阅每个频道
        c->flags |= CLIENT_PUBSUB;
    }
    //返回1 成功监听 返回0监听已经存在
    int pubsubSubscribeChannel(client *c, robj *channel) {
        dictEntry *de;
        list *clients = NULL;
        int retval = 0;
    
        if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {//自己的订阅频道字典加入
            retval = 1;
            incrRefCount(channel);
            /* Add the client to the channel -> list of clients hash table */
            de = dictFind(server.pubsub_channels,channel);//在服务器中订阅频道字典找
            if (de == NULL) {//没找到这个频道 创建
                clients = listCreate();
                dictAdd(server.pubsub_channels,channel,clients);
                incrRefCount(channel);
            } else {//找到拿出来
                clients = dictGetVal(de);
            }
            listAddNodeTail(clients,c);//加入到list里面去
        }
        addReplyPubsubSubscribed(c,channel);//通知客户端
        return retval;
    }
    
  • psubscribeCommand

    void psubscribeCommand(client *c) {
        int j;
    
        for (j = 1; j < c->argc; j++)
            pubsubSubscribePattern(c,c->argv[j]);//订阅匹配频道
        c->flags |= CLIENT_PUBSUB;
    }
    //加入订阅频道模式成功返回1,否则返回0
    int pubsubSubscribePattern(client *c, robj *pattern) {
        int retval = 0;
        if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {//没找到的话
            retval = 1;
            pubsubPattern *pat;
            listAddNodeTail(c->pubsub_patterns,pattern);//加入到c里面的模式频道
            incrRefCount(pattern);
            pat = zmalloc(sizeof(*pat));
            pat->pattern = getDecodedObject(pattern);
            pat->client = c;
            listAddNodeTail(server.pubsub_patterns,pat);//加入到server里面的模式频道
        }
        addReplyPubsubPatSubscribed(c,pattern);
        return retval;
    }
    
  • publish

    void publishCommand(client *c) {
        int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);//发送一个消息
        if (server.cluster_enabled)//如果做了Redis集群
            clusterPropagatePublish(c->argv[1],c->argv[2]);
        else
            forceCommandPropagation(c,PROPAGATE_REPL);//强制命令传播
        addReplyLongLong(c,receivers);//告诉哦用户发送了几个订阅者
    }
    /* 发布一个消息*/
    int pubsubPublishMessage(robj *channel, robj *message) {
        int receivers = 0;
        dictEntry *de;
        listNode *ln;
        listIter li;
        de = dictFind(server.pubsub_channels,channel);
        if (de) {//找到此频道的话
            list *list = dictGetVal(de);
            listNode *ln;
            listIter li;
    
            listRewind(list,&li);
            while ((ln = listNext(&li)) != NULL) {
                client *c = ln->value;
                addReplyPubsubMessage(c,channel,message);//发送消息给每一个订阅者
                receivers++;
            }
        }
        /* 模式匹配频道*/
        if (listLength(server.pubsub_patterns)) {
            listRewind(server.pubsub_patterns,&li);
            channel = getDecodedObject(channel);
            while ((ln = listNext(&li)) != NULL) {
                pubsubPattern *pat = ln->value;
    
                if (stringmatchlen((char*)pat->pattern->ptr,
                                    sdslen(pat->pattern->ptr),
                                    (char*)channel->ptr,
                                    sdslen(channel->ptr),0))//匹配 发送
                {
                    addReplyPubsubPatMessage(pat->client,
                        pat->pattern,channel,message);
                    receivers++;
                }
            }
            decrRefCount(channel);
        }
        return receivers;
    }
    

    其中,反向操作就是以上的操作的逆顺序。

四、参考

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容