分布式事务
两阶段提交 2PC
两阶段提交包含一个协调者和多个参与者,参与者通过与协调者通信来决定事务是提交还是回滚;
主要流程:
第一阶段(prepare):准备阶段 即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向协调者报告已准备就绪。
第二阶段 (commit/rollback):当协调者确认所有参与者都ready后,向所有参与者发送commit命令。
存在的问题:
(1) 同步阻塞:所有事务参与者在等待其他参与者响应的时候都处于同步阻塞状态,无法进行其他操作。
(2) 单点问题:协调者在方案中起到非常大的作用,发生故障将会造成很大的影响,特别是在二阶段发生故障,所有参与者会一直等待状态,无法完成其他操作。
(3) 数据不一致:在阶段二,如果协调者只发送了部分Commit消息,此时网络发生异常,那么只有部分参与者接收到commit消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
(4) 太过保守:任意一个节点失败就会导致整个事务失败,没有完善的容错机制。
优点:
简单易懂:协议简单,易于理解和实现,广泛用于分布式系统中。
一致性保证:通过两阶段提交,能够保证分布式事务的一致性。
缺点:
阻塞问题:如果协调者崩溃,参与者会处于等待状态,导致系统阻塞。
性能瓶颈:两阶段的提交涉及大量的网络通信和协调,可能导致性能下降。
单点故障:协调者如果发生故障,可能会导致系统无法继续执行事务。
三阶段提交 3PC
三阶段提交:CanCommit 阶段、PreCommit 阶段、DoCommit 阶段,简称3PC
三阶段提交协议(Three-phase commit protocol,3PC),是二阶段提交(2PC)的改进版本。与两阶段提交不同的是,三阶段提交有两个改动点:在协调者和参与者中都引入超时机制,同时引入了预提交阶段。
在第一阶段和第二阶段中插入的预提交阶段,保证了在最后提交阶段之前各参与节点的状态是一致的。
即 3PC 把 2PC 的准备阶段再次一分为二,这样三阶段提交就有 CanCommit、PreCommit、DoCommit 三个阶段。当 CanCommit、PreCommit、DoCommit的任意一个步骤失败或者等待超时,执行RollBack。
通过引入PreCommit阶段,3PC在一定程度上解决了2PC中协调者单点故障的问题,因为即使协调者在PreCommit阶段后发生故障,参与者也可以根据自身的状态来决定是否提交事务。然而,3PC并不是完美的解决方案,它仍然有一些缺点,比如增加了协议的复杂性和可能的性能开销。因此,在选择是否使用3PC时,需要根据具体的业务场景和需求进行权衡。
优点:
避免阻塞:通过引入额外的阶段,能够避免 2PC 中协调者崩溃导致的阻塞问题。
容错性强:通过超时和恢复机制,提高了协议的容错能力。
缺点:
性能较低:由于多了一个阶段,3PC 的性能比 2PC 更低。
协议复杂:相较于 2PC,3PC 的实现更为复杂。
部分问题没有解决:3PC 无法完全避免所有的故障场景,仍可能出现阻塞。
TCC
关于 TCC(Try-Confirm-Cancel)的概念,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。
TCC分为3个阶段
Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必须业务资源(准隔离性)
Confirm 阶段:确认执行真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源,Confirm 操作要求具备幂等设计,Confirm 失败后需要进行重试。
Cancel 阶段:取消执行,释放 Try 阶段预留的业务资源。Cancel 阶段的异常和 Confirm 阶段异常处理方案基本上一致,要求满足幂等设计。
存在的问题
严重依赖于你自己写代码来回滚和补偿了,会造成补偿代码巨大,非常恶心,在一些场景中,一些业务流程可能用TCC不太好定义及处理。
比较适合的场景:
这个就是除非你是真的一致性要求太高,是你系统中核心之核心的场景,比如常见的就是资金类的场景,那你可以用TCC方案了,自己编写大量的业务逻辑,自己判断一个事务中的各个环节是否OK,不OK就执行补偿/回滚代码,而且最好是你的各个业务执行的时间都比较短。
比如说:一般跟钱相关的,支付、交易的场景,可以使用TCC,严格保证分布式事务要么全部成功,要么全部自动回滚,严格保证资金的正确性。
特点如下:
并发度较高,无长期资源锁定。
开发量较大,需要提供Try/Confirm/Cancel接口。
一致性较好,不会发生SAGA已扣款最后又转账失败的情况
TCC适用于订单类业务,对中间状态有约束的业务
本地消息表
本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。
基本思路就是:
消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。
消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。
生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。
这种方案遵循BASE理论,采用的是最终一致性,笔者认为是这几种方案里面比较适合实际业务场景的,即不会出现像2PC那样复杂的实现(当调用链很长的时候,2PC的可用性是非常低的),也不会像TCC那样可能出现确认或者回滚不了的情况。
优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。
缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。
事务消息
这个就是干脆不用本地消息表了,直接基于MQ来实现事务,比如RocketMQ就支持消息事务
大概实现:
(1) A系统先发送一个prepared消息到MQ,如果这个prepared消息发送失败那么就直接取消操作别执行了。
(2) 如果这个消息发送成功过了,那么接着执行本地事务,如果成功就告诉MQ发送确认消息,如果失败就告诉MQ回滚消息。
(3) 如果发送了确认消息,那么此时B系统会接收到确认消息,然后执行本地的事务。
(4) MQ会自动定时轮询所有prepared消息回调你的接口,问你,这个消息是不是本地事务处理失败了,所以没有发送确认消息?那么是继续重试还是回滚?一般来说这里你就可以查下数据库看之前本地事务是否执行,如果回滚了,那么这里也回滚吧。这个就是避免可能本地事务执行成功了,别确认消息发送失败了。
(5) 这个方案里,要是系统B的事务失败了咋办?重试喽,自动不断重试直到成功,如果实在是不行,要么就是针对重要的资金类业务进行回滚,比如B系统本地回滚后,想办法通知系统A也回滚(可以使用zookeeper的通知监听机制来进行通知),或者是发送报警由人工来手工回滚和补偿。
最大努力通知方案
(1) 系统A本地事务执行完之后,发送个消息到MQ
(2) 这里会有个专门消费MQ的最大努力通知服务,这个服务会消费MQ然后写入数据库中记录下来,或者是放入内存队列也可以,接着调用系统B的接口。
(3) 要是系统B执行成功就ok了,要是系统B执行失败了,那么最大努力通知服务就定时尝试重新调用系统B,反复N次后,最后还是不行就放弃。
消息队列
消息队列作用
- 异步处理
- 削峰/限流
- 降低系统耦合性
除了这三点之外,消息队列还有其他的一些应用场景,例如实现分布式事务、顺序保证和数据流处理
消息队列带来的问题
系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了! 多个系统直接的事务怎么处理,又涉及到了分布式事务。
消息堆积:生产者产生消息的速度远大于消费消费消息的速度导致消息在队列中堆积,导致消息延迟消费或者业务数据延迟消费处理。
RocketMQ
Kafka
kafka保证消息顺序
kafka本身天然支持同一个topic中的同一个partition的消息是有序的,但是对于同一个topic不同partition的之间的顺序是无法保证的;
- 一个topic只创建一个partition,所有的消息都会被发送到partition中,消息会根据发送顺序依次存储到消息队列(partition);
- 发送消息的时候根据一定的规则指定topic的分区(partition),可以保证同一个数据根据这个规则发送到同一个partition;
Kafka消息不丢失
- 生产者
生产者发送消息底层是异步实现,发送消息时由于网络波动导致消息发送失败,但是程序中没有获取发送的结果,可能引起发送消息丢失;
// 这种直接调用get会阻塞直到有结果返回,就是一种同步方式了
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
//这种异步方式如果最后没有获取结果可能会有数据丢失情况
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
- 在异步发送中获取发送结果,如果失败将消息持久化到DB或者打印相关日志亦或者将消息发送到其他消息队列,方便后续数据回溯;
- 配置生产者重试次数,并且在重试之间增加时间间隔,防止由于快速重试引起的并发以及快速重试没有达到重试效果的情况;
2.消费者
消费者从某一个topic的某一个partition中获取到的一条消息,消费者默认会自动提交offset,但是程序只是获取到了消息还没有执行业务逻辑,从业务上看就是消息丢失了。
- 关闭消费者端自动提交功能,每次获取到消息的时候不自动提交offset,执行完业务逻辑的时候在进行手动offset的提交;这里需要注意,消息可能会被重复消费两次,这里需要在代码层保证数据的幂等性
3.kafka的broker
kafka是将发送到partition中的,每个partition又存在多个副本,其中副本分为leader和follower两种,每一个partition都至少有一个leader类型的副本,多个(可以没有)follower类型的副本。如果kafka参数(replication.factor每个partition的副本数、min.insync.replicas、acks、replica.lag.time.max.ms、unclean.leader.election.enable)配置的不合理,导致leader在故障时重新选举leader的时候会有消息丢失的情况。
- replication.factor每个partition的副本数量,最好设置大于等于3,可以保证高可用数据安全;冗余数据;
- min.insync.replicas每个partition分区在ISR副本集合中最小的数量;
- ISR In-Sync Replica Set(ISR)是Kafka为每个Partition维护的一个副本集合,这些副本与Leader Replica保持数据同步,即已接收到并持久化了Leader发布的所有消息;
- 数据同步:Leader Replica接收到Producer发送的消息后,将其写入本地日志,并通过Pull模式等待Follower Replica主动拉取。Follower Replica从Leader Replica拉取数据并写入本地日志后,将拉取偏移量(fetch offset)返回给Leader。
2.同步状态监测:Leader Replica持续监控每个Follower Replica的拉取偏移量,将其与自身的最新消息偏移量(log end offset)进行比较。若Follower Replica的拉取偏移量与Leader相差不超过一定阈值(由replica.lag.time.max.ms参数控制),则认为该Follower处于同步状态,将其纳入ISR。
3.ISR调整:当Follower Replica因网络延迟、 Broker故障等原因导致拉取偏移量落后过多,超出阈值时,Leader Replica会将其从ISR中移除。当Follower Replica恢复同步后,再次将其加入ISR。ISR是动态集合
- 数据同步:Leader Replica接收到Producer发送的消息后,将其写入本地日志,并通过Pull模式等待Follower Replica主动拉取。Follower Replica从Leader Replica拉取数据并写入本地日志后,将拉取偏移量(fetch offset)返回给Leader。
ISR机制实例说明
场景一:Leader故障与切换
假设有一个包含三个副本(A为Leader,B、C为Follower)的Partition,ISR为{A, B, C}。当Leader A发生故障时:
ZooKeeper检测到A失联,触发Leader选举。
由于B、C均在ISR中,且与A保持同步,二者均有资格成为新Leader。
ZooKeeper选择其中一个(如B)作为新Leader,同时更新Partition的Leader信息。
生产者与消费者感知到Leader变更,开始与新Leader B交互。
原Follower C继续从新Leader B拉取数据,保持同步,确保Partition服务不受影响。
场景二:网络波动与数据一致性
在生产环境中,网络波动可能导致Follower Replica暂时落后:
Follower C因网络问题导致拉取延迟,其fetch offset落后于Leader A的log end offset,超过阈值。Leader A将C从ISR中移除,此时ISR变为{A, B}。
当网络恢复后,C立即追赶数据,一旦其fetch offset与A的log end offset差距缩小到阈值内,C重新加入ISR。
在整个过程中,由于ISR始终保持至少一个同步副本(A或B),即使C暂时落后,消息写入与消费仍能正常进行,保证了数据一致性。
- acks生产者Producer参数,用于确认有多少副本同步成功了才认为消息发送成功;设置 acks = all。acks 是 Kafka 生产者(Producer) 很重要的一个参数。acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后就算被成功发送。当我们配置 acks = all 表示只有所有 ISR 列表的副本全部收到消息时,生产者才会接收到来自服务器的响应. 这种模式是最高级别的,也是最安全的,可以确保不止一个 Broker 接收到了消息. 该模式的延迟会很高.
- replica.lag.time.max.ms参数ISR副本中follower副本允许落后leader副本最大延迟,控制Follower Replica被认为是同步状态的最大延迟时间。增大该值可容忍更大网络延迟,减少ISR频繁变动,但可能延长故障检测时间;减小该值可更快检测到滞后副本,但可能导致ISR更不稳定。
- unclean.leader.election.enable是否允许在非ISR集合中选择副本作为leader副本.Kafka 0.11.0.0 版本开始 unclean.leader.election.enable 参数的默认值由原来的 true 改为 false我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 unclean.leader.election.enable = false 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
kafka消费重复消费 - 消费者已经消费但是没有提交offset;
- kafka在消费者消费超时或者网络原因,引起kafka认为消费者假死,Kafka分区进行rebalance;
第一种情况好理解,消费者没有提交offset,导致下一次消费还是从当前消息的offset处消费,导致当前消息重复被消费;
第二种情况:当一个消费者消费超时消费者还没有提交offset,Kafka会进行分区的rebalance,当前消费者被剔除,然后rebalance后新加入的消费者还是会拉去到当前消息继续消费,会导致当前消息重复消费;
MySQL优化手段
select 字段
尽量不要使用select * 查询数据;select * 容易引起以下问题;
- 导致不能使用覆盖索引(索引还是生效的只是需要回表查询所有字段内容),还需要再次回表;
- 导致网络开销增加以及数据传输时间,尤其是在有大字段的时候,比如text字段等;
- 会消耗更多的CPU资源来解析相关字段;
深分页问题
普通的分页在数据量小
SELECT `score`,`name` FROM `hhsj_order` ORDER BY `score` DESC LIMIT 10000, 10;
的时候耗费时间还是比较短的。
如果数据量变大,达到百万甚至是千万级别,普通的分页耗费的时间就非常长了。
优化成子查询
SELECT `score`,`name` FROM `hhsj_order` WHERE id >= (SELECT id FROM `hhsj_order` LIMIT 1000000, 1) LIMIT 10
或者延迟查询
SELECT `score`,`name` FROM `hhsj_order` a, (SELECT id from `hhsj_order` ORDER BY `score` DESC LIMIT 1000000, 10) b where a.id = b.id
-- 使用 INNER JOIN 进行延迟关联
SELECT t1.*
FROM t_order t1
INNER JOIN (SELECT id FROM t_order where id > 1000000 LIMIT 10) t2 ON t1.id = t2.id;
尽量避免多表做 join
阿里巴巴《Java 开发手册》中有这样一段描述:
【强制】超过三个表禁止 join。需要 join 的字段,数据类型保持绝对一致;多表关联查询时,保证被关联 的字段需要有索引。
说明:即使双表join也要注意表索引、SQL性能。
建议不要使用外键与级联
选择合适的字段类型
存储字节越小,占用也就空间越小,性能也越好。
a.某些字符串可以转换成数字类型存储比如可以将 IP 地址转换成整型数据。
数字是连续的,性能更好,占用空间也更小。
MySQL 提供了两个方法来处理 ip 地址
- INET_ATON() : 把 ip 转为无符号整型 (4-8 位)
- INET_NTOA() :把整型的 ip 转为地址
插入数据前,先用 INET_ATON() 把 ip 地址转为整型,显示数据时,使用 INET_NTOA() 把整型的 ip 地址转为地址显示即可。
b.对于非负型的数据 (如自增 ID,整型 IP,年龄) 来说,要优先使用无符号整型来存储。
无符号相对于有符号可以多出一倍的存储空间
SIGNED INT -2147483648~2147483647UNSIGNED INT 0~4294967295
c.小数值类型(比如年龄、状态表示如 0/1)优先使用 TINYINT 类型。
d.金额字段用 decimal,避免精度丢失。
尽量用 UNION ALL 代替 UNION
UNION 会把两个结果集的所有数据放到临时表中后再进行去重操作,更耗时,更消耗 CPU 资源。
UNION ALL 不会再对结果集进行去重操作,获取到的数据包含重复的项。
不过,如果实际业务场景中不允许产生重复数据的话,还是可以使用 UNION。
正确使用索引
正确使用索引可以大大加快数据的检索速度(大大减少检索的数据量)。
选择合适的字段创建索引
- 不为 NULL 的字段 :索引字段的数据应该尽量不为 NULL,因为对于数据为 NULL 的字段,数据库较难优化。如果字段频繁被查询,但又避免不了为 NULL,建议使用 0,1,true,false 这样语义较为清晰的短值或短字符作为替代。
- 被频繁查询的字段 :我们创建索引的字段应该是查询操作非常频繁的字段。
CREATE INDEX idx_user_email ON users(email);
- 被作为条件查询的字段 :被作为 WHERE 条件查询的字段,应该被考虑建立索引。
- 频繁需要排序的字段 :索引已经排序,这样查询可以利用索引的排序,加快排序查询时间。
- 被经常频繁用于连接的字段 :经常用于连接的字段可能是一些外键列,对于外键列并不一定要建立外键,只是说该列涉及到表与表的关系。对于频繁被连接查询的字段,可以考虑建立索引,提高多表连接查询的效率。
被频繁更新的字段应该慎重建立索引
虽然索引能带来查询上的效率,但是维护索引的成本也是不小的。 如果一个字段不被经常查询,反而被经常修改,那么就更不应该在这种字段上建立索引了。
尽可能的考虑建立联合索引而不是单列索引
因为索引是需要占用磁盘空间的,可以简单理解为每个索引都对应着一颗 B+树。如果一个表的字段过多,索引过多,那么当这个表的数据达到一个体量后,索引占用的空间也是很多的,且修改索引时,耗费的时间也是较多的。如果是联合索引,多个字段在一个索引上,那么将会节约很大磁盘空间,且修改数据的操作效率也会提升。
注意避免冗余索引
冗余索引指的是索引的功能相同,能够命中索引(a, b)就肯定能命中索引(a) ,那么索引(a)就是冗余索引。如(name,city )和(name )这两个索引就是冗余索引,能够命中前者的查询肯定是能够命中后者的 在大多数情况下,都应该尽量扩展已有的索引而不是创建新索引。
考虑在字符串类型的字段上使用前缀索引代替普通索引
前缀索引仅限于字符串类型,较普通索引会占用更小的空间,所以可以考虑使用前缀索引带替普通索引。
避免索引失效
索引失效也是慢查询的主要原因之一,常见的导致索引失效的情况有下面这些:
- 使用 SELECT * 进行查询;
- 创建了组合索引,但查询条件未准守最左匹配原则;
- 在索引列上进行计算、函数、类型转换等操作;
- 以 % 开头的 LIKE 查询比如 like '%abc';;
- 查询条件中使用 or,且 or 的前后条件中有一个列没有索引,涉及的索引都不会被使用到;
- 发生隐式转换;
删除长期未使用的索引
-- 删除不必要的索引
DROP INDEX idx_user_name ON users;
删除长期未使用的索引,不用的索引的存在会造成不必要的性能损耗 MySQL 5.7 可以通过查询 sys 库的 schema_unused_indexes 视图来查询哪些索引从未被使用