Redis Stream类型的使用

一、背景

最近在看redis这方面的知识,发现在redis5中产生了一种新的数据类型Stream,它和kafka的设计有些类似,可以当作一个简单的消息队列来使用。

二、redis中Stream类型的特点

  1. 是可持久化的,可以保证数据不丢失。

  2. 支持消息的多播、分组消费。

  3. 支持消息的有序性。

三、Stream的结构

redis stream 结构图

解释:

  1. 消费者组: Consumer Group,即使用 XGROUP CREATE 命令创建的,一个消费者组中可以存在多个消费者,这些消费者之间是竞争关系。
    1. 同一条消息,只能被这个消费者组中的某个消费者获取。
    2. 多个消费者之间是相互独立的,互不干扰。
  2. 消费者: Consumer 消费消息。
  3. last_delivered_id: 这个id保证了在同一个消费者组中,一个消息只能被一个消费者获取。每当消费者组的某个消费者读取到了这个消息后,这个last_delivered_id的值会往后移动一位,保证消费者不会读取到重复的消息。
  4. pending_ids:记录了消费者读取到的消息id列表,但是这些消息可能还没有处理,如果认为某个消息处理,需要调用ack命令。这样就确保了某个消息一定会被执行一次。
  5. 消息内容:是一个键值对的格式。
  6. Stream 中 消息的 ID: 默认情况下,ID使用 * ,redis可以自动生成一个,格式为 时间戳-序列号,也可以自己指定,一般使用默认生成的即可,且后生成的id号要比之前生成的大。

四、Stream的命令

1、XADD 往Stream末尾添加消息

1、命令格式

xadd key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
xadd命令解释

2、举例

xadd 命令 返回的是数据的id, xx-yy (xx指的是毫秒数,yy指的是在这个毫秒内的第几条消息)

1、向流中增加一条数据,

127.0.0.1:6379> xadd stream-key * username zhangsan # 向stream-key这个流中增加一个 username 是zhangsan的数据 *表示自动生成id
"1635999858912-0" # 返回的是ID
127.0.0.1:6379> keys *
1) "stream-key" # 可以看到stream自动创建了
127.0.0.1:6379>

2、向流中增加数据,不自动创建流

127.0.0.1:6379> xadd not-exists-stream nomkstream * username lisi # 因为指定了nomkstream参数,而not-exists-stream之前不存在,所以加入失败
(nil)
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>

3、手动指定ID的值

127.0.0.1:6379> xadd stream-key 1-1 username lisi # 此处id的值是自己传递的1-1,而不是使用*自动生成
"1-1" # 返回的是id的值
127.0.0.1:6379>

4、设置一个固定大小的Stream

1、精确指定Stream的大小

指定指定Stream的大小比模糊指定Stream的大小会稍微多少消耗一些性能。

精确限制流的长度
2、模糊指定Stream的大小
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * first first
"1636001034141-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * second second
"1636001044506-0"
127.0.0.1:6379> xadd stream-key maxlen ~ 1 * third third
"1636001057846-0"
127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 3
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636001057846-0"
 9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1636001034141-0"
    2) 1) "first"
       2) "first"
13) "last-entry"
14) 1) "1636001057846-0"
    2) 1) "third"
       2) "third"
127.0.0.1:6379>

~ 模糊指定流的大小,可以看到指定的是1,实际上已经到了3.

2、XRANGE查看Stream中的消息

1、命令格式

xrange key start end [COUNT count]
xrange获取消息数据

2、准备数据

127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username zhangsan
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636003481706-0"
2) "1636003481706-1"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636003499055-0"
127.0.0.1:6379>

使用redis的事务操作,获取到同一毫秒产生的多条数据,时间戳一样,序列号不一样

3、举例

1、获取所有的数据(-+的使用)

127.0.0.1:6379> xrange stream-key - +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

-: 表示最小id的值

+:表示最大id的值

2、获取指定id范围内的数据,闭区间

127.0.0.1:6379> xrange stream-key 1636003481706-1 1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
2) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

3、获取指定id范围内的数据,开区间

127.0.0.1:6379> xrange stream-key (1636003481706-0 (1636003499055-0
1) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
127.0.0.1:6379>

(:表示开区间

4、获取某个毫秒后所有的数据

127.0.0.1:6379> xrange stream-key 1636003481706 +
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636003481706-1"
   2) 1) "username"
      2) "lisi"
3) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

直接写毫秒不写后面的序列号即可。

5、获取单条数据

127.0.0.1:6379> xrange stream-key 1636003499055-0 1636003499055-0
1) 1) "1636003499055-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

startend的值写的一样即可获取单挑数据。

6、获取固定条数的数据

127.0.0.1:6379> xrange stream-key - + count 1
1) 1) "1636003481706-0"
   2) 1) "username"
      2) "zhangsan"
127.0.0.1:6379>

使用 count进行限制

3、XREVRANGE反向查看Stream中的消息

XREVRANGE key end start [COUNT count]

使用方式和XRANGE类似,略。

4、XDEL删除消息

1、命令格式

xdel key ID [ID ...]

2、准备数据

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004176924-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636004183638-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636004189211-0"
127.0.0.1:6379>

3、举例

需求:往Stream中加入3条消息,然后删除第2条消息

127.0.0.1:6379> xdel stream-key 1636004183638-0
(integer) 1 # 返回的是删除记录的数量
127.0.0.1:6379> xrang stream -key - +
127.0.0.1:6379> xrange stream-key - +
1) 1) "1636004176924-0"
   2) 1) "username"
      2) "zhangsan"
2) 1) "1636004189211-0"
   2) 1) "username"
      2) "wangwu"
127.0.0.1:6379>

注意:

需要注意的是,我们从Stream中删除一个消息,这个消息并不是被真正的删除了,而是被标记为删除,这个时候这个消息还是占据着内容空间的。如果所有Stream中所有的消息都被标记删除,这个时候才会回收内存空间。但是这个Stream并不会被删除。

6、XLEN查看Stream中元素的长度

1、命令格式

xlen key

2、举例

查看Stream中元素的长度

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636004690578-0"
127.0.0.1:6379> xlen stream-key
(integer) 1
127.0.0.1:6379> xlen not-exists-stream-key
(integer) 0
127.0.0.1:6379>

注意:

如果xlen后方的key不存在则返回0,否则返回元素的个数。

7、XTRIM对Stream中的元素进行修剪

1、命令格式

xtrim key MAXLEN|MINID [=|~] threshold [LIMIT count]

2、准备数据

127.0.0.1:6379>  xadd stream-key * username zhangsan
"1636009745401-0"
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> xadd stream-key * username lisi
QUEUED
127.0.0.1:6379(TX)> xadd stream-key * username wangwu
QUEUED
127.0.0.1:6379(TX)> exec
1) "1636009763955-0"
2) "1636009763955-1"
127.0.0.1:6379> xadd stream-key * username zhaoliu
"1636009769625-0"
127.0.0.1:6379>

3、举例

1、maxlen精确限制

127.0.0.1:6379> xtrim stream-key maxlen 2 # 保留最后的2个消息
(integer) 2
127.0.0.1:6379> xrange stream-key - + # 可以看到之前加入的2个消息被删除了
1) 1) "1636009763955-1"
   2) 1) "username"
      2) "wangwu"
2) 1) "1636009769625-0"
   2) 1) "username"
      2) "zhaoliu"
127.0.0.1:6379>

上方的意思是,保留stream-key这个Stream中最后的2个消息。

2、minid模糊限制

minid 是删除比这个id小的数据,本地测试的时候没有测试出来,略。

8、XREAD独立消费消息

XREAD只是读取消息,读取完之后并不会删除消息。 使用XREAD读取消息,是完全独立与消费者组的,多个客户端可以同时读取消息。

1、命令格式

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
xread命令解释

2、准备数据

127.0.0.1:6379> xadd stream-key * username zhangsan
"1636011801365-0"
127.0.0.1:6379> xadd stream-key * username lisi
"1636011806261-0"
127.0.0.1:6379> xadd stream-key * username wangwu
"1636011810905-0"
127.0.0.1:6379>

3、举例

1、获取用户名是wangwu的数据

127.0.0.1:6379> xread streams stream-key 1636011806261-0 # 此处写的是lisi的id,即读取到的数据需要是 > 1636011806261-0
1) 1) "stream-key"
   2) 1) 1) "1636011810905-0"
         2) 1) "username"
            2) "wangwu"

2、获取2条数据

127.0.0.1:6379> xread count 2 streams stream-key 0-0
1) 1) "stream-key"
   2) 1) 1) "1636011801365-0"
         2) 1) "username"
            2) "zhangsan"
      2) 1) "1636011806261-0"
         2) 1) "username"
            2) "lisi"
127.0.0.1:6379>

count限制单次读取最后的消息,因为当前读取可能没有这么多。

3、非阻塞读取Stream对尾的数据

即读取队列尾的下一个消息,在非阻塞模式下始终是nil

127.0.0.1:6379> xread streams stream-key $
(nil)

4、阻塞读取Stream对尾的数据

阻塞读取对尾消息

注意:

  1. $表示读取队列最新进来的一个消息,不是Stream的最后一个消息。是xread block执行后,再次使用xadd添加消息后,xread block才会返回。

  2. block 0表示永久阻塞,当消息到来时,才接触阻塞。block 1000表示阻塞1000ms,如果1000ms还没有消息到来,则返回nil

  3. xread进行顺序消费 当使用xread进行顺序消息时,需要记住返回的消息id,同时下次调用xread时,需要将上次返回的消息id传递进去。

  4. xread读取消息,完全无视消费组,此时Stream就可以理解为一个普通的list。

9、消费者组相关操作

1、消费者组命令

消费者组命令

2、准备数据

1、创建Stream的名称是 stream-key

2、创建2个消息,aa和bb

127.0.0.1:6379> xadd stream-key * aa aa
"1636362619125-0"
127.0.0.1:6379> xadd stream-key * bb bb
"1636362623191-0"

3、创建消费者组

1、创建一个从头开始消费的消费者组
xgroup create stream-key(Stream 名) g1(消费者组名) 0-0(表示从头开始消费)
2、创建一个从Stream最新的一个消息消费的消费者组
xgroup create stream-key g2 $

$表示从最后一个元素消费,不包括Stream中的最后一个元素,即消费最新的消息。

4、创建一个从某个消息之后消费的消费者组
xgroup create stream-key g3 1636362619125-0  #1636362619125-0 这个是上方aa消息的id的值

1636362619125-0某个消息的具体的ID,这个g3消费者组中的消息都是大于>这个id的消息。

3、从消费者中读取消息

127.0.0.1:6379> xreadgroup group g1(消费组名) c1(消费者名,自动创建) count 3(读取3条) streams stream-key(Stream 名) >(从该消费者组中还未分配给另外的消费者的消息开始读取)
1) 1) "stream-key"
   2) 1) 1) "1636362619125-0"
         2) 1) "aa"
            2) "aa"
      2) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g2 c1 count 3 streams stream-key >
(nil) # 返回 nil 是因为 g2消费组是从最新的一条信息开始读取(创建消费者组时使用了$),需要在另外的窗口执行`xadd`命令,才可以再次读取到消息
127.0.0.1:6379> xreadgroup group g3 c1 count 3 streams stream-key >  #只读取到一条消息是因为,在创建消费者组时,指定了aa消息的id,bb消息的id大于aa,所以读取出来了。
1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379>

4、读取消费者的pending消息

127.0.0.1:6379> xgroup create stream-key g4 0-0
OK
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 2
   5) "idle"
   6) (integer) 88792
127.0.0.1:6379> xinfo consumers stream-key g4
(empty array)
127.0.0.1:6379> xreadgroup group g1 c1 count 1 streams stream-key 1636362619125-0
1) 1) "stream-key"
   2) 1) 1) "1636362623191-0"
         2) 1) "bb"
            2) "bb"
127.0.0.1:6379> xreadgroup group g4 c1 count 1 block 0 streams stream-key 1636362619125-0
1) 1) "stream-key"
   2) (empty array)
127.0.0.1:6379>
从消费者的pending列表中读取消息

5、转移消费者的消息

127.0.0.1:6379> xpending stream-key g1 - + 10 c1
1) 1) "1636362619125-0"
   2) "c1"
   3) (integer) 2686183
   4) (integer) 1
2) 1) "1636362623191-0"
   2) "c1"
   3) (integer) 102274
   4) (integer) 7
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
(empty array)
127.0.0.1:6379> xclaim stream-key g1 c2 102274 1636362623191-0
1) 1) "1636362623191-0"
   2) 1) "bb"
      2) "bb"
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 17616
   4) (integer) 8
127.0.0.1:6379>
xclaim转移消息

也可以通过xautoclaim来实现。

6、一些监控命令

1、查看消费组中消费者的pending消息
127.0.0.1:6379> xpending stream-key g1 - + 10 c2
1) 1) "1636362623191-0"
   2) "c2"
   3) (integer) 1247680
   4) (integer) 8
127.0.0.1:6379>
2、查看消费组中的消费者信息
127.0.0.1:6379> xinfo consumers stream-key g1
1) 1) "name"
   2) "c1"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1474864
2) 1) "name"
   2) "c2"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 1290069
127.0.0.1:6379>
3、查看消费组信息
127.0.0.1:6379> xinfo groups stream-key
1) 1) "name"
   2) "g1"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 2
   7) "last-delivered-id"
   8) "1636362623191-0"
2) 1) "name"
   2) "g2"
   3) "consumers"
......
4、查看Stream信息
127.0.0.1:6379> xinfo stream stream-key
 1) "length"
 2) (integer) 2
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "last-generated-id"
 8) "1636362623191-0"
 9) "groups"
10) (integer) 4
11) "first-entry"
12) 1) "1636362619125-0"
    2) 1) "aa"
       2) "aa"
13) "last-entry"
14) 1) "1636362623191-0"
    2) 1) "bb"
       2) "bb"
127.0.0.1:6379>

五、参考文档

1、https://redis.io/topics/streams-intro

2、https://www.runoob.com/redis/redis-stream.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,386评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,142评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,704评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,702评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,716评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,573评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,314评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,230评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,680评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,873评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,991评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,706评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,329评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,910评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,038评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,158评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,941评论 2 355

推荐阅读更多精彩内容