Redis的Pub/Sub为何不建议进行消息订阅

应用场景

pubsub可以应用于高性能高吞吐的发布订阅场景,订阅者通过sub订阅某一个channel,发布者往channel发布消息,当channel收到消息的时候,会将消息推送给channel下的所有订阅者。由于是纯内存操作,因此可以达到极高的吞吐和极短的延时。可以用于构建实时系统,任务通知等。纯内存操作带来了高性能低延时的收益,但是同时也带来了消息丢失的风险,此外还有其他的不足。

实现原理

订阅者新建一个连接并订阅一个channel,当该channel写入消息时,会向该连接推送消息。一个channel可以有多个订阅者,当channel写入消息时,所有的订阅者都会收到消息推送。

同一个channel也可以有多个发布者写入消息,消息写入以后并不会进行保存。消息写入后,会将消息发送给所有的订阅者,一旦订阅者断开连接,那么将不会收到该消息。如果出现网络异常导致订阅者连接重连,那么从连接断开到重连的这段期间,该channel的消息订阅者都不会收到。

为什么不推荐

1.消息丢失:Redis的Pub/Sub模式不会对消息进行持久化,如果订阅者在消息发布之前未连接到Redis服务器,它们将无法接收到之前发布的消息。这意味着如果订阅者在消息发布之前断开连接或重新启动,它们将错过这些消息。

2.内存占用:由于Redis将所有订阅者的订阅信息存储在内存中,当订阅者数量非常大时,可能会导致Redis服务器的内存占用过高。这会对Redis的性能和可伸缩性产生负面影响。

3.阻塞问题:当订阅者在执行阻塞操作(例如阻塞式读取)时,它们将无法处理其他的Redis命令。这可能会导致性能问题,特别是在高并发环境中。

4.无法保证消息传递顺序:在Pub/Sub模式中,消息的传递是异步的,并且无法保证消息的传递顺序。如果应用程序需要处理有序的消息,Pub/Sub模式可能不适合。

源码subscribeCommand

// 用法: SUBSCRIBE channel [channel ...]
// pubsub.c
void subscribeCommand(client *c) {
    int j;
    // n 个channel 的订阅,循环调用即可
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    // 添加pubsub订阅标识,方便其他地方判断
    c->flags |= CLIENT_PUBSUB;
}
// 具体的单个 channel 订阅实现
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    // step1. 将要订阅的 channel 添加到各自客户端的 pubsub_channels 容器中
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        // step2. 将要订阅的channel 添加到 server.pubsub_channels 中, 方便在publish时判定是否触发通知
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        // step3. 将客户端自身添加到相应的 server.pubsub_channels 对应的队列中去, 在通知时只需遍历该队列即可
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    // 响应客户端: 
    // *3 \r\n
    // $9\r\nsubscribe\r\n
    // channel
    // 111(该客户端总共订阅的channel数)
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}
// 客户端订阅的总channel数, 两种订阅方式相加
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
    return dictSize(c->pubsub_channels)+
           listLength(c->pubsub_patterns);
}

源码psubscribeCommand

// 用法: PSUBSCRIBE pattern [pattern ...]
// pubsub.c
void psubscribeCommand(client *c) {
    int j;
    // 同样是n个channel依次注册
    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}
// 注册单个模式匹配的 channel 订阅
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;
    // 直接查找对应的 pattern, 没有则添加
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容