Redis事务
redis事务是一次可以执行多条命令,它的本质是一组命令的集合。
一个事务中所有的命令都会被序列化,在事务执行过程中会按照顺序执行队列中的命令。
redis事务是分为三个阶段:开始事务、命令入队、执行事务。
redis事务与MySQL事务对比
mysql是关系型数据库,他的事务核心就是acid,即原子性、一致性、隔离性、持久性
- 原子性:Redis事务的原子性分为两种情况:
一是语法异常,就是命令在加入事务命令队列的时候检测到错误(譬如set误写成sett等),这种情况下事务会进行回滚,可以保证原子性。
二是业务异常,就是命令在加入事务时正常,但是在运行的时候出现了错误(譬如将list的命令操作在string上),这种情况下事务不会回滚,错误的命令不执行,其他的命令正常执行,不能保证原子性
之所以会这样通过搜索redis官方文档可以找到相关解释:
- 根本原因可能还是Redis的定位是缓存。不加入回滚逻辑可以使它更简洁,稳定,快速,而这也是redis最大特点
- 官方解释是:运行时的异常不应该发生在生产环境,就是Redis不应为程序员的bug买单
- Redis可以以脚本的形式支持事务的执行。历史原因,先出现了事务,后出现的脚本。暂时不打算撤销事务的功能,它提供了无脚本也能支持简单事务的功能,但是如果随着后续脚本方案的流行可能会在后续版本移除事务功能
- 一致性
- 持久性:redis的持久性是由所使用的持久化相关的
- 当服务器在无持久化的内存模式下运作时,事务不具有持久性。因为一旦服务器停机,服务器所有的数据都将丢失
- 当服务器在rdb持久化模式下运作时,事务同样不具有持久性。因为服务器只会在特定的保存条件下才会执行BGSAVE命令,并且异步执行的BGSAVE命令不能保证事务的数据第一时间被保存到硬盘上
- 当服务器运行在aof持久化模式下,并且appendfsync选项的值为always时,程序总会在执行命令之后调用同步(sync)函数,将命令数据真正地保存到硬盘里
- 隔离性,由于redis是单线程的,它的事务执行是串行的,所以他的隔离级别就相当于mysql的序列化级别
事务命令入队流程
redis乐观锁
首先理解下乐观锁,乐观锁是一种思想,而不是具体的锁。所谓乐观锁,就是不加锁,但是也能实现锁的效果
redis借助于watch命令实现乐观锁,具体实现:redis watch命令控制一个或多个key,如果在事务执行之前(exec执行之前),监控的key被其他命令所改动,那么事务将会被打断
执行流程:
stream类型介绍
stream类型是redis5新出的一个数据类型,他的诞生主要是用来做消息队列的。Redis发布订阅功能也可以实现消息队列,但是它有个缺点就是消息无法持久化,比如消息没有被消费时出现了断网或者宕机的情况,那么消息会丢失。那么问题来了:
问题一:什么是消息队列?
消息队列一般简称MQ(Message Queue),简单理解就是把要发送的消息放在队列中执行。
把数据放到消息队列的叫做生产者,从消息队列里边取数据的叫做消费者问题二:为什么要用消息队列?
因为消息队列主要有三个优点:异步、削峰、解耦。
一、异步:就是在主任务成功后直接返回,把不重要的业务放到消息队列中执行,这样可加快程序响应速度,缓解高并发下数据库的访问压力
二、削峰:把请求放到消息队列中,防止某一刻系统流量过大,而导致系统宕机,例如做秒杀等活动的时候
三、解耦:某个系统的某个值和其他几个系统有依赖关系,这样改系统就可以将该值放到消息队列中,谁用谁取,不用因为要增加新系统而频繁改动代码问题三:消息队列的缺点?
一、系统可用性降低:在加入MQ之前,不用考虑消息丢失或者说MQ挂掉等等的情况,但是,引入MQ之后就需要去考虑了
二、系统复杂性提高:加入MQ之后,需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题
三、一致性问题:消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了
任何技术都是双刃剑,是否使用消息队列要看业务是否真的需要,场景是否真的适用,不能为了用而用
stream类型结构
- stream消息队列中可以有多个消费组
- 一个消费组中可以有多个消费者
- 一个消费组中每一个消费者不会消费同一条消息
-
这里的消费组和消费者是由我们手动去定义的,在消息正确消费后通过ack给生产者反馈确认消费的通知,这也是跟list实现消息队列的区别,list没有ack功能
stream类型使用
- xadd命令:用于在某个stream中追加消息
格式:XADD key ID field string [field string ...]
解释:ID,最常使用*,表示由Redis生成消息ID,这也是强烈建议的方案。
field string [field string], 就是当前消息内容,由1个或多个key-value构成。
例:
127.0.0.1:6379> xadd test * name ricky age 25 sex 1
"1610895610625-0"
127.0.0.1:6379> xadd test * name cindy age 30 sex 2
"1610895800414-0"
127.0.0.1:6379>
- xlen命令:返回结果为stream数据类型的长度
格式:XLEN key
例:
127.0.0.1:6379> xlen test
(integer) 2
- xrange命令:获取消息列表,会自动过滤已经删除的消息
格式:XRANGE key start end [COUNT count]
解释:start和end使用 - +表示查询全部,-表示最小值, +表示最大值,count表示查询的个数
例:
127.0.0.1:6379> xrange test - +
1) 1) "1610895610625-0"
2) 1) "name"
2) "ricky"
3) "age"
4) "25"
5) "sex"
6) "1"
2) 1) "1610895800414-0"
2) 1) "name"
2) "cindy"
3) "age"
4) "30"
5) "sex"
6) "2"
127.0.0.1:6379> xrange test - + count 1
1) 1) "1610895610625-0"
2) 1) "name"
2) "ricky"
3) "age"
4) "25"
5) "sex"
6) "1"
- xread命令:我们可以在不定义消费组的情况下进行Stream消息的独立消费,当Stream没有新消息时,甚至可以阻塞等待。Redis设计了一个单独的消费指令xread,可以将Stream当成普通的消息队列(list)来使用。使用xread时,我们可以完全忽略消费组(Consumer Group)的存在,就好比Stream就是一个普通的列表(list)。
格式:XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
解释:[COUNT count]与xrange一样;[BLOCK milliseconds]不加则命令是同步的,加上则为阻塞,即加上此命令后,没有可消费的消息时在超时前会阻塞等待生产者推送消息,单位毫秒;STREAMS固定格式;key不用解释;id就是在xadd时生成的id
例:
127.0.0.1:6379> xread STREAMS test 1610895610625-0
1) 1) "test"
2) 1) 1) "1610895800414-0"
2) 1) "name"
2) "cindy"
3) "age"
4) "30"
5) "sex"
6) "2"
127.0.0.1:6379> xread STREAMS test 0-0
1) 1) "test"
2) 1) 1) "1610895610625-0"
2) 1) "name"
2) "ricky"
3) "age"
4) "25"
5) "sex"
6) "1"
2) 1) "1610895800414-0"
2) 1) "name"
2) "cindy"
3) "age"
4) "30"
5) "sex"
6) "2"
127.0.0.1:6379> xread count 1 STREAMS test 0-0
1) 1) "test"
2) 1) 1) "1610895610625-0"
2) 1) "name"
2) "ricky"
3) "age"
4) "25"
5) "sex"
6) "1"
127.0.0.1:6379> xread block 1000 STREAMS tests 0-0
(nil)
(1.07s)
- xgroup create命令:创建消费组(Consumer Group),需要传递起始消息ID参数用来初始化last_delivered_id变量
例:
127.0.0.1:6379> xgroup create test group1 0-0 # 表示从头开始消费,group1是自定义的消费组的名称
OK
127.0.0.1:6379> xgroup create test group2 $ # $表示从尾部开始消费,只接受新消息,当前已存在的Stream消息会全部忽略
OK
- xinfo命令:获取Stream信息
127.0.0.1:6379> xinfo stream test
1) "length"
2) (integer) 2 #共2个消息
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
7) "last-generated-id"
8) "1610895800414-0"
9) "groups"
10) (integer) 2 #有2个消费组
11) "first-entry"
12) 1) "1610895610625-0"
2) 1) "name"
2) "ricky"
3) "age"
4) "25"
5) "sex"
6) "1"
13) "last-entry"
14) 1) "1610895800414-0"
2) 1) "name"
2) "cindy"
3) "age"
4) "30"
5) "sex"
6) "2"
- xreadgroup group命令:进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息ID。它同xread一样,也可以阻塞等待新消息。读到新消息后,对应的消息ID就会进入消费者的PEL(正在处理的消息)结构里,客户端处理完毕后使用xack指令通知服务器,本条消息已经处理完毕,该消息ID就会从PEL中移除
# >号表示从当前消费组的last_delivered_id后面开始读
# 每当消费者读取一条消息,last_delivered_id变量就会前进
127.0.0.1:6379> xreadgroup group group1 xk count 1 streams test >
1) 1) "test"
2) 1) 1) "1610895610625-0"
2) 1) "name"
2) "ricky"
3) "age"
4) "25"
5) "sex"
6) "1"
127.0.0.1:6379> xreadgroup group group1 xk count 1 streams test >
1) 1) "test"
2) 1) 1) "1610895800414-0"
2) 1) "name"
2) "cindy"
3) "age"
4) "30"
5) "sex"
6) "2"
127.0.0.1:6379> xreadgroup group group1 xk count 1 streams test >
(nil)
- xack命令:从PEL中删除一条或多条消息
127.0.0.1:6379> xack test group1 1610895610625-0
(integer) 1