Redis是一个优秀的内存数据库,Stream是其中强大的功能,但也有很多细节,本篇是在学习中的笔记。
应用场景
记录用户行为(如点击,输入)
传感器的读数(多个维度,如温度、湿度)
通知
提供多种数据消费策略(xread,xreadgroup,xrange)
XADD
写入,支持多字段,和Hash数据类型相似,*代表让Redis自动生成ID,命令返回ID
ID由两个64位的数字组成
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
*也可以替换成自定义,最小是0-1,但其他的第二个数字可以为0,比如1-0
> XADD mystream 0-1 foo bar
0-1
第二位*的情况
> XADD somestream 0-* baz qux
0-2
如果没有key,不创建
> xadd mykey nomkstream * k v
(nil)
> xadd mykey * k v
"1682114234811-0"
> xadd mykey nomkstream * k1 v1
"1682114244867-0"
可以携带trim信息,MAXLEN和阈值直接可以加上~或=,表示近似或精确相等,默认精确,也可以用MINID
> xadd mykey * k1 v1
"1682135799495-0"
> xadd mykey * k2 v2
"1682135814851-0"
> xadd mykey * k3 v3
"1682135821706-0"
> xadd mykey * k4 v4
"1682135835368-0"
> xadd mykey MAXLEN 3 * k5 v5
"1682135847838-0"
> xrange mykey - +
1) 1) "1682135821706-0"
2) 1) "k3"
2) "v3"
2) 1) "1682135835368-0"
2) 1) "k4"
2) "v4"
3) 1) "1682135847838-0"
2) 1) "k5"
2) "v5"
XLEN
获取长度
> XLEN mystream
(integer) 1
XRANGE
根据范围获取数据,两个ID是闭区间,可以通过前面加上括号变成开区间
XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
XRANGE mystream 1518951480106 1518951480107
XRANGE mystream - + COUNT 2
XRANGE mystream (1519073279157-0 + COUNT 2
XREAD
0表示永远不会超时,BLOCK只配合$一起使用才能实现阻塞功能
XREAD BLOCK 0 STREAMS mystream $
BLOCK是有顺序的,先执行的先获得数据
XREAD也能同时取多个stream的数据,每个stream都要给定起始ID
XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
XREAD和XRANGE
都能迭代获取所有记录,XRANGE迭代时起始ID需要加上(表示排除
XREAD适合于通过迭代方式取完所有记录,XRANGE适合一次性获取记录(支持 -+)
XREAD支持阻塞式读($)
前者支持多个Key,后者只支持一个Key
XREAD
与PUBSUB的区别:没有存储功能
与LIST区别:其中一个消费者取完,另外一个消费者不能再次获得
=================
stream还提供了consume group功能,和kafka相似
- 每个消息只能发送給一个消费者
- 每个消费者有唯一的名称
- First ID的概念,没被发送的的第一个消息
- 消息可以ACK的操作,表示消息已经正确处理完成
- 消费者能看到已接收pending,还没ACK的历史数据
创建consumer group,如果不存在stream会创建失败,$表示stream中最大的id,stream中新的消息才能被发送到group里,0表示stream所有的消息都会被送到组里,这个id也能是一个0外的ID
XGROUP CREATE newstream mygroup $
XGROUP CREATE newstream mygroup 0
带MKSTREAM,如果不存在stream会自动创建,如果存在没影响
XGROUP CREATE newstream mygroup $ MKSTREAM
读取,如果没有未读消息直接返回null,消费者是隐式创建的,不需要单独命令
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream
历史消息只要是没XACK,还在pending列表中,还是能获取到
XREADGROUP GROUP mygroup Alice STREAMS mystream 0
消费者是隐式创建的,不需要单独命令
XREADGROUP也支持读多个stream,但是group在不同stream里必须取相同名称
这是一个只读命令,返回最小最大id,并且返回消费者和他们名下的pending数量
XPENDING mystream mygroup
这个命令不返回消息内容,可以通过XRANGE查看
增加了范围,服务器给出了更详细的信息,包括每个已发送信息的距离上次发送的时长和总共发送次数
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob" // Owner
3) (integer) 74170458 // Idle时间
4) (integer) 1 // 发送次数
通过XCLAIM(索要)可以改变owner,这命令同时返回消息内容,如果不需要消息内容,可以在命令中加JUSTID选项
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
通过设置min-idle-time可以防止消息被同时改变owner
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
自动claim
> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
查看stream的信息
> XINFO STREAM mystream
> XINFO GROUPS mystream
> XINFO CONSUMERS mystream mygroup