5. stream
很多同学并不认识这个数据结构,确实,在Redis 5.0之前并没有这个数据结构。这个数据结构称之为“流”,为什么叫“流”呢?这种数据就像流水一样,不是一次泼了一盆过来而是一点一点地“流”过来。当数据从生产方“流”到消费方的时候,消费方就可以对一点一点的数据进行处理,这样就保证了处理的时效性,也让机器能平缓地使用资源。
事实上,Redis的stream是作者的另一个开源项目Disque移植过来的,它高度借鉴了Kafka的设计,如果你有Kafka相关的知识,那么stream对你来说也很简单。
5.1 相关命令
-
消息队列相关命令:
XADD - 添加消息到末尾
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表
-
消费者组相关命令:
XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息
XINFO GROUPS - 打印消费者组的信息
XINFO STREAM - 打印流信息
5.2 结构源码
消息ID:
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
* of the latest generated ID and an incremented sequence. */
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
流迭代器:
/* We define an iterator to iterate stream items in an abstract way, without
* caring about the radix tree + listpack representation. Technically speaking
* the iterator is only used inside streamReplyWithRange(), so could just
* be implemented inside the function, but practically there is the AOF
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the
* element inside the listpack itself. */
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
流:
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
5.3 运行机制
每个 stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
每个 stream 都可以挂多个消费组(Consumer Group),每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动,同一个消费组的消费者只能读取之后的消息。
pending_ids是消费者(Consumer)的状态变量,作用是维护消费者的未确认的 id。 它记录了当前已经被客户端读取的消息,但是还没有 ack(Acknowledge character:确认字符)的消息id。当消息被取走,这个对应的消息ID就会进入消费组的PEL(Pending Entries List)结构里,当ack之后,这个消息就会从PEL中删除。如果stream中的消息被消费者取走但是一直不ack,那么PEL就会一直增长,如果消费者巨多,就可能出现内存占用很大的情况。当Redis服务器向消费者发送数据的时候,客户端断开了连接,这样丢失的消息会在客户端重连之后继续发送给它,因为消息ID会保存在PEL之中。
除了这些,stream还提供maxlen参数,来限制自身的最长消息数,这样就能保证数据不超过指定的长度。
5.4 异同
stream 虽然和 Kafka 非常像,有消费组、水位等概念,但它并不支持天然分区。也就是说,如果我们要进行partition的话,只能建立多个 stream key ,然后由客户端或者中间代理来将不同的消息路由到不同的 stream 中。
在 stream 被发布之前,Redis本身就有 pub/sub 来实现消息队列的功能。不过pub/sub有个极大的缺点就是不能持久化,当被订阅的服务器pub一条消息,如果没有sub的客户端,那么这个消息就会丢失。在生产环境中我们没办法接受那么容易的丢失消息,并且 pub/sub 更适合用来做广播,并不适合做消息队列。作者在Reids Cluster 中大量的使用了 pub/sub 这个功能。