1,分布式事务产生的背景;
分情况而定。
1,在单体项目中,多个不同的业务逻辑都是在同一个数据源中心实现事务管理,是不存在分布式事务的问题。因为在同一个数据源的情况下都是采用事务管理器,相当于每个事务管理器对应一个数据源。
2,在单体项目中,有多个不同的数据源,每个数据源中都有自己独立的事务管理器,互不影响,那么这时候也会存在多数据源事务管理:解决方案jta+Atomikos
3,在分布式/微服务架构中。每个服务都有自己的本地的事务。每个服务本地事务互不影响,那么这时候也会存在分布式事务的问题。
分布式事务产生的背景:订单服务调用派单服务接口成功之后,可能会引发错误。
2pc3pc思想,实际上都是解决我们在分布式系统中,每个节点保证数据一致性问题。
事务的定义。
对我们业务逻辑可以实现提交或者是回滚,保证数据一致性的情况。所以,要么提交,要么回滚。
原子性a 要么提交 要么回滚。
一致性 c
持久性d 事务一旦提交或者回滚后,不会再对该结果有任何的影响。
Base 与 cap理论。
1,cap定律
这个定理的内容是指的是在一个分布式系统中,Consistency(一致性),Availability(可用性),Partition tolerance(分区容错性),二者不可兼容。
1,一致性(C)
在分布式系统中的所有数据备份,是在同一时刻是否同样的值,(等同于所有节点访问同一份最新的数据副本)
2,可用性 A
在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求(对数据更新具有高可用性)
3,分区容错性(p) 形成脑裂问题:
以实际效果而言,分区相当于对通信的时限要求,系统如果不能再时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。
4,总结下
以上可以知道分区容错性(P)主要代表网络波动产生的错误,这是不可以避免的,且这三个模式不可以兼得,所以目前就2种模式: cp和Ap模式。
其中cp表示遵循一致性的原则,但不能保证高可用性,其中zookeeper作为注册中心就是采用cp模式,因为zookeeper有过半节点不可以的话整个zookeeper将不可用。
AP表示遵循于可用性原则,例如Eureka作为注册中心用的是AP模式,因为其为去中心化,采用你中有我我中有你的相互注册方式,只要集群中有一个节点可以使用,整个eureka服务就是可用的,但可能会出现短暂的数据不一致问题。
Ap保证可用性:但是不能保证每个副本数据数据一致性,
cp保证数据一致性;如果有过半的zk节点宕机的情况下,不能保证可用性,但是必须保证每个副本节点之间数据一致性,比如zk。
Base理论:
Base是 Basically Available(基本可用),Softstate(软状态)和Eventually consistent(最终一致性)三个短语的缩写。Base理论是对CAP定理逐步演化而来的,base理论核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式达到最终一致性。
1基本可用性;
基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性,注意:这绝不等于系统不可用。
比如: 响应时间的损失,正常情况下,在一个电子商务网站上进行购物的时候,消费者几乎能顺利完成每一笔订单,但是在一些介入大促销购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能被引导在一个降级页面。
2,软状态。
软状态指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,既允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。
3,最终一致性
最终一致性强调的是所有的数据副本,在经过一段时间的同步之后,最终都能够达到一个一致的状态,因此,最终一致性的本质需要系统保证数据能够达成一致,而不需要时时保证系统数据的强一致性。
2pc 与3pC
通过2pc和3pc 思想可以实现保证每个节点的数据一致性问题。
目前主流分布式解决框架
1,单体项目多数据源,可以jta+Atomilos;
2,基于Rabbitmq的形式解决 最终一致性思想;
3,基于Rocketmq解决分布式事务 ,采用事务消息。
4,lcn采用lcn模式,假关闭连接
5,Alibaba的seata 背景强大,已经成为了主流。
以上适合于微服务架构中,不适合于和外部接口保证分布式事务问题。
6,跨语言的方式实现解决分布式事务问题。类似于支付宝回调方式。
2阶段提交协议基本概念。
2阶段提交协议基本概念:
俩阶段提交协议可以理解为2pc,也就是分为参与者和协调者,协调者会通过2次阶段实现数据最终一致性的
2pc和3pc 的区别就是解决参与者超时问题和多加了一层询问。保证了数据传输的可靠性。
简单的回顾下lcn解决分布式事务。
http://www.txlcn.org/zh-cn/ LCN并不生产事务,LCN只是本地事务的协调工
现在官网已经不维护呢,可以参考:GitEE
https://gitee.com/wangliang1991/tx-lcn?_from=gitee_search
默认密码为:codingapi
lcn基本实现的原理:
1,发起方与参与方都与我们的lcn保持长连接;
2,发起方调用接口前,先向lcn管理器中申请一个全局的事务分组id;
3,发起方调用接口的时候在请求头里传递事务分组id.
4,参与方获取到请求头中有事务分组的id的,则当前业务逻辑执行完实现假关闭,不会提交或者回滚当前事务,
5,发起方调用完接口后,如果出现异常的情况下,在通知事务协调者回滚事务,这时候事务协调则告诉给参与者回滚当前的事务。
lcn 解决分布式事务的原理:
角色划分
1,全局事务协调者(组长);
2,发起方---调用接口者;
3,参与方---被别人调用接口
订单(发起方)调用派单(参与方)
1.发起方和参与方都会与我们的全局事务协调者保持长连接;
- 订单(发起方)连接到我们全局事务协调者,先生成一个事务全局的分组id
- 当我们发起方调用接口的时候,会再请求头中设置该事务全局分组id;
- 参与方从请求头中获取到该全局分组id,这是我们的数据源就不会提交。
- 发起方调用参与方接口完毕之后,如果报错或者没有问题的情况下,都会发送
一个通知给事务协调者,通知给其他的参与方到底是回滚还是提交。
LCN实现分布式事务方案:有可能会引发行锁问题。
整合和源码解读
spring-boot 2.1.6.RELEASE+spring-cloud Greenwich.RELEASE +seata 1.4
pom如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
</parent>
<groupId>com.taotao</groupId>
<artifactId>consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.RELEASE</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mybatis/mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.5.6</version>
</dependency>
<!-- Mysql驱动包 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.1.1</version>
</dependency>
<!--seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-seata</artifactId>
<version>2.1.0.RELEASE</version>
<exclusions>
<exclusion>
<artifactId>seata-all</artifactId>
<groupId>io.seata</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>com.taotao</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.taotao</groupId>
<artifactId>provider</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
file.conf
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = false
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThreadPrefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
# service configuration, only used in client side
service {
#transaction service group mapping
vgroupMapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
#client transaction configuration, only used in client side
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
sqlParserType = druid
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
}
undo {
dataValidation = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
## transaction log store, only used in server side
store {
## store mode: file、db
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8"
user = "root"
password = "root"
minConn = 1
maxConn = 10
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
}
}
## server configuration, only used in server side
server {
recovery {
#schedule committing retry period in milliseconds
committingRetryPeriod = 1000
#schedule asyn committing retry period in milliseconds
asynCommittingRetryPeriod = 1000
#schedule rollbacking retry period in milliseconds
rollbackingRetryPeriod = 1000
#schedule timeout retry period in milliseconds
timeoutRetryPeriod = 1000
}
undo {
logSaveDays = 7
#schedule delete expired undo_log in milliseconds
logDeletePeriod = 86400000
}
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
maxCommitRetryTimeout = "-1"
maxRollbackRetryTimeout = "-1"
rollbackRetryTimeoutUnlockEnable = false
}
## metrics configuration, only used in server side
metrics {
enabled = false
registryType = "compact"
# multi exporters use comma divided
exporterList = "prometheus"
exporterPrometheusPort = 9898
}
registry.conf:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "eureka"
loadBalance = "RandomLoadBalance"
loadBalanceVirtualNodes = 10
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
apolloAccesskeySecret = ""
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
yml配置:
server:
port: 8002
##服务名称(服务注册到eureka名称)
spring:
main:
allow-bean-definition-overriding: true
cloud:
alibaba:
seata:
tx-service-group: my_test_tx_group
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/test1?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
username: root
password: root
application:
name: consumer
##服务注册到eureka 地址
eureka:
client:
service-url:
defaultZone: http://localhost:8761/eureka
###因为该应用为注册中心,不会注册自己
register-with-eurkea: true
###是否需要从eureka上获取注册信息
fetch-registry: true
mybatis:
configuration:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
logging:
level:
com.taotao.consumer.mapper: trace
添加 DataSourceConfiguration
package com.taotao.consumer;
import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfiguration {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource(){
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}
@Primary
@Bean("dataSource")
public DataSourceProxy dataSource(DataSource druidDataSource){
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy)throws Exception{
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
.getResources("classpath*:/mapper/*.xml"));
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
package com.taotao.consumer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@MapperScan("com.taotao.consumer.mapper")
@EnableEurekaClient
@EnableFeignClients
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
}
其他模块参照此模块配置,
seata的registry.conf 和file.conf配置和项目配置一致。
添加全局事务:
@GlobalTransactional
@Override
public int insertOnlineList(OnlineSheet onlineSheet) {
onlineSheetFeign.insertOnlineSheet(onlineSheet);
OnlineList onlineList = new OnlineList();
onlineList.setPassword("123");
onlineList.setList_id("1");
onlineListMapper.insertOnlineList(onlineList);
return 1 / Integer.parseInt(onlineSheet.getIndex());
}
https://gitee.com/lttwj/wj1/tree/master/seata/springcloud-seata
1,如何学会分析框架的源码?思想有哪些?
A: spring入口角度分析 springbean 生命周期ioc容器底层原理。
B. 报错日志法
2,seata 底层如何解决分布式事务的?
3,seata 如何生成全局xid
4,Seata如何生成前置和后置镜像。
5,seata 如何传递xid的
6, Seata 如何实现逆向回滚
7,如果协调者宕机了,参与事务是回滚还是提交
8,如果协调者宕机发起方没有通知协调者到底是提交还是回滚?