Redis奇幻之旅(二)5. stream

5. stream

​ 很多同学并不认识这个数据结构,确实,在Redis 5.0之前并没有这个数据结构。这个数据结构称之为“流”,为什么叫“流”呢?这种数据就像流水一样,不是一次泼了一盆过来而是一点一点地“流”过来。当数据从生产方“流”到消费方的时候,消费方就可以对一点一点的数据进行处理,这样就保证了处理的时效性,也让机器能平缓地使用资源。

​ 事实上,Redis的stream是作者的另一个开源项目Disque移植过来的,它高度借鉴了Kafka的设计,如果你有Kafka相关的知识,那么stream对你来说也很简单。

5.1 相关命令

  • 消息队列相关命令:

    XADD - 添加消息到末尾

    XTRIM - 对流进行修剪,限制长度

    XDEL - 删除消息

    XLEN - 获取流包含的元素数量,即消息长度

    XRANGE - 获取消息列表,会自动过滤已经删除的消息

    XREVRANGE - 反向获取消息列表,ID 从大到小

    XREAD - 以阻塞或非阻塞方式获取消息列表

  • 消费者组相关命令:

    XGROUP CREATE - 创建消费者组

    XREADGROUP GROUP - 读取消费者组中的消息

    XACK - 将消息标记为"已处理"

    XGROUP SETID - 为消费者组设置新的最后递送消息ID

    XGROUP DELCONSUMER - 删除消费者

    XGROUP DESTROY - 删除消费者组

    XPENDING - 显示待处理消息的相关信息

    XCLAIM - 转移消息的归属权

    XINFO - 查看流和消费者组的相关信息

    XINFO GROUPS - 打印消费者组的信息

    XINFO STREAM - 打印流信息

5.2 结构源码

消息ID:

/* Stream item ID: a 128 bit number composed of a milliseconds time and
 * a sequence counter. IDs generated in the same millisecond (or in a past
 * millisecond if the clock jumped backward) will use the millisecond time
 * of the latest generated ID and an incremented sequence. */
typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

流迭代器:

/* We define an iterator to iterate stream items in an abstract way, without
 * caring about the radix tree + listpack representation. Technically speaking
 * the iterator is only used inside streamReplyWithRange(), so could just
 * be implemented inside the function, but practically there is the AOF
 * rewriting code that also needs to iterate the stream to emit the XADD
 * commands. */
typedef struct streamIterator {
    stream *stream;         /* The stream we are iterating. */
    streamID master_id;     /* ID of the master entry at listpack head. */
    uint64_t master_fields_count;       /* Master entries # of fields. */
    unsigned char *master_fields_start; /* Master entries start in listpack. */
    unsigned char *master_fields_ptr;   /* Master field to emit next. */
    int entry_flags;                    /* Flags of entry we are emitting. */
    int rev;                /* True if iterating end to start (reverse). */
    uint64_t start_key[2];  /* Start key as 128 bit big endian. */
    uint64_t end_key[2];    /* End key as 128 bit big endian. */
    raxIterator ri;         /* Rax iterator. */
    unsigned char *lp;      /* Current listpack. */
    unsigned char *lp_ele;  /* Current listpack cursor. */
    unsigned char *lp_flags; /* Current entry flags pointer. */
    /* Buffers used to hold the string of lpGet() when the element is
     * integer encoded, so that there is no string representation of the
     * element inside the listpack itself. */
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;

流:

typedef struct stream {
    rax *rax;               /* The radix tree holding the stream. */
    uint64_t length;        /* Number of elements inside this stream. */
    streamID last_id;       /* Zero if there are yet no items. */
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

5.3 运行机制

​ 每个 stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

​ 每个 stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动,同一个消费组的消费者只能读取之后的消息。

​ pending_ids是消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 它记录了当前已经被客户端读取的消息,但是还没有 ack(Acknowledge character:确认字符)的消息id。当消息被取走,这个对应的消息ID就会进入消费组的PEL(Pending Entries List)结构里,当ack之后,这个消息就会从PEL中删除。如果stream中的消息被消费者取走但是一直不ack,那么PEL就会一直增长,如果消费者巨多,就可能出现内存占用很大的情况。当Redis服务器向消费者发送数据的时候,客户端断开了连接,这样丢失的消息会在客户端重连之后继续发送给它,因为消息ID会保存在PEL之中。

​ 除了这些,stream还提供maxlen参数,来限制自身的最长消息数,这样就能保证数据不超过指定的长度。

5.4 异同

​ stream 虽然和 Kafka 非常像,有消费组、水位等概念,但它并不支持天然分区。也就是说,如果我们要进行partition的话,只能建立多个 stream key ,然后由客户端或者中间代理来将不同的消息路由到不同的 stream 中。

​ 在 stream 被发布之前,Redis本身就有 pub/sub 来实现消息队列的功能。不过pub/sub有个极大的缺点就是不能持久化,当被订阅的服务器pub一条消息,如果没有sub的客户端,那么这个消息就会丢失。在生产环境中我们没办法接受那么容易的丢失消息,并且 pub/sub 更适合用来做广播,并不适合做消息队列。作者在Reids Cluster 中大量的使用了 pub/sub 这个功能。

©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 225,124评论 6 523
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 96,453评论 3 404
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 172,386评论 0 368
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 61,136评论 1 301
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 70,142评论 6 400
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,593评论 1 315
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,958评论 3 429
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,944评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,477评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,512评论 3 346
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,639评论 1 355
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 37,227评论 5 351
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,971评论 3 340
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,397评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,550评论 1 277
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 50,203评论 3 381
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,713评论 2 366

推荐阅读更多精彩内容

  • 原文链接:Redis实现消息队列的方案 Redis作为内存中的数据结构存储,常用作数据库、缓存和消息代理。它支持数...
    这个ID狠温柔阅读 101,272评论 2 28
  • 夜莺2517阅读 127,732评论 1 9
  • 版本:ios 1.2.1 亮点: 1.app角标可以实时更新天气温度或选择空气质量,建议处女座就不要选了,不然老想...
    我就是沉沉阅读 6,909评论 1 6
  • 我是一名过去式的高三狗,很可悲,在这三年里我没有恋爱,看着同龄的小伙伴们一对儿一对儿的,我的心不好受。怎么说呢,高...
    小娘纸阅读 3,398评论 4 7
  • 这些日子就像是一天一天在倒计时 一想到他走了 心里就是说不出的滋味 从几个月前认识他开始 就意识到终究会发生的 只...
    栗子a阅读 1,627评论 1 3