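Project setup
I set up a very basic microservice skeleton for a purchase flow: a user places an order, stock is deducted, the user's balance is debited, and an order is created. Distributed transactions across microservices will be tested later; what is tested here is a distributed transaction across multiple data sources.
The project structure is as follows: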
|-- demo
    |-- entity   entity objects (shared so that every service has all the entity classes)
    |-- order    orders (pom imports entity)
    |-- stock    stock (pom imports entity)
    |-- user     users (pom imports entity)
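MyBatis-Plus and its related tooling are used here (the `spring.datasource.dynamic` settings and the `@DS` annotation come from the dynamic-datasource starter); if you are not familiar with them, please look them up. The data source configuration is as follows: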
spring:
  main:
    allow-bean-definition-overriding: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        url-pattern: "/druid/*"
        allow: 127.0.0.1
        deny: 192.168.1.73
        reset-enable: true
        login-username: admin
        login-password: admin@2020
      web-stat-filter:
        enabled: true
        url-pattern: "/*"
        exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
    dynamic:
      druid:
        filters: stat,wall
        initial-size: 10
        min-idle: 10
        maxActive: 200
        maxWait: 10000
        useUnfairLock: true
        validation-query: 'select 1'
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
      primary: user
      datasource:
        user:
          url: jdbc:mysql://0.0.0.0:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.cj.jdbc.Driver
          type: com.alibaba.druid.pool.DruidDataSource
          username: dev
          password: mysql@dev.2020
        order:
          url: jdbc:mysql://0.0.0.0:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.cj.jdbc.Driver
          type: com.alibaba.druid.pool.DruidDataSource
          username: dev
          password: mysql@dev.2020
        stock:
          url: jdbc:mysql://0.0.0.0:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.cj.jdbc.Driver
          type: com.alibaba.druid.pool.DruidDataSource
          username: dev
          password: mysql@dev.2020
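Creating the table structure
The tables do not live in the same database. There are three databases: order (with the table orders), user (with the table account), and stock (with the table stock). Each database holds its own table, and every database also needs an identical undo_log table. The business tables are as follows: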
/*Table: orders*/
/*Column information*/
Field   Type      Collation  Null  Key  Default  Extra           Privileges                       Comment
------  --------  ---------  ----  ---  -------  --------------  -------------------------------  -------
id      int       (NULL)     NO    PRI  (NULL)                   select,insert,update,references
uid     int       (NULL)     YES        (NULL)                   select,insert,update,references
count   int       (NULL)     YES        (NULL)                   select,insert,update,references
time    datetime  (NULL)     YES        (NULL)                   select,insert,update,references
money   int       (NULL)     YES        (NULL)                   select,insert,update,references
/*Table: stock*/
/*Column information*/
Field   Type      Collation  Null  Key  Default  Extra           Privileges                       Comment
------  --------  ---------  ----  ---  -------  --------------  -------------------------------  -------
id      int       (NULL)     NO    PRI  (NULL)                   select,insert,update,references
num     int       (NULL)     YES        (NULL)                   select,insert,update,references
price   int       (NULL)     YES        (NULL)                   select,insert,update,references
/*Table: account*/
/*Column information*/
Field   Type      Collation  Null  Key  Default  Extra           Privileges                       Comment
------  --------  ---------  ----  ---  -------  --------------  -------------------------------  -------
id      int       (NULL)     NO    PRI  (NULL)   auto_increment  select,insert,update,references
amount  int       (NULL)     YES        (NULL)                   select,insert,update,references
The initial table data is as follows:
stock: id = 1,num = 500,price = 5
account: id = 1,amount = 2000
Implementation
For testing, I wrote all the interfaces and implementations inside the user service, as shown below:
AccountServer
public interface AccountServer {
    /**
     * Debit the given amount from the user's account
     **/
    void debit(Integer uid, Integer money);
}

@Service
public class AccountServerImpl implements AccountServer {
    @Autowired
    private AccountMapper accountMapper;

    @DS("user")
    @Override
    public void debit(Integer uid, Integer money) {
        Account account = accountMapper.selectById(uid);
        account.setAmount(account.getAmount() - money);
        accountMapper.updateById(account);
    }
}
StockServer
public interface StockServer {
    /**
     * Deduct the stock quantity and return the total price
     **/
    Integer deduct(Integer stockId, Integer count);
}

@Service
public class StockServerImpl implements StockServer {
    @Autowired
    private StockMapper stockMapper;

    // Stock deduction should use optimistic locking, but I skip it here
    @DS("stock")
    @Override
    public Integer deduct(Integer stockId, Integer count) {
        Stock stock = stockMapper.selectById(stockId);
        stock.setNum(stock.getNum() - count);
        stockMapper.updateById(stock);
        return stock.getPrice().intValue() * count.intValue();
    }
}
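The comment above notes that stock deduction should use optimistic locking, which the post leaves out. As a rough illustration of the idea only, here is a minimal in-memory sketch. It assumes a hypothetical `version` column that the stock table shown earlier does not have, and the class names (`StockRow`, `InMemoryStockTable`, `OptimisticDeduct`) are made up for this example. In a real database the same check would be a conditional update along the lines of `UPDATE stock SET num = ?, version = version + 1 WHERE id = ? AND version = ?`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Immutable snapshot of one stock row; `version` is a hypothetical extra column. */
final class StockRow {
    final int id;
    final int num;
    final int price;
    final int version;

    StockRow(int id, int num, int price, int version) {
        this.id = id;
        this.num = num;
        this.price = price;
        this.version = version;
    }
}

/** In-memory stand-in for the stock table. */
final class InMemoryStockTable {
    private final Map<Integer, StockRow> rows = new ConcurrentHashMap<>();

    void insert(StockRow row) {
        rows.put(row.id, row);
    }

    StockRow selectById(int id) {
        return rows.get(id);
    }

    /**
     * Emulates a conditional update: succeeds only if the stored row is still
     * the exact snapshot that was read, and bumps the version when it does.
     */
    boolean updateIfUnchanged(StockRow read, int newNum) {
        StockRow next = new StockRow(read.id, newNum, read.price, read.version + 1);
        return rows.replace(read.id, read, next);
    }
}

final class OptimisticDeduct {
    /** Deducts `count` units and returns the total price; re-reads and retries on a lost race. */
    static int deduct(InMemoryStockTable table, int stockId, int count, int maxRetries) {
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            StockRow current = table.selectById(stockId);
            if (current == null) {
                throw new IllegalArgumentException("no such stock: " + stockId);
            }
            if (current.num < count) {
                throw new IllegalStateException("insufficient stock");
            }
            if (table.updateIfUnchanged(current, current.num - count)) {
                return current.price * count;
            }
            // Another writer changed the row first; loop to re-read and try again.
        }
        throw new IllegalStateException("too many concurrent updates");
    }
}
```

With the sample data (num = 500, price = 5), deducting 3 units returns 15 and leaves num at 497 with the version incremented. An update attempted with a stale snapshot is rejected rather than silently overwriting the newer value.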
OrderServer
public interface OrderServer {
    /**
     * Create an order
     **/
    void create(Integer uid, Integer stockId, Integer count, Integer money);
}

@Service
public class OrderServerImpl implements OrderServer {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private AccountServer accountServer;

    @DS("order")
    @Override
    public void create(Integer uid, Integer stockId, Integer count, Integer money) {
        accountServer.debit(uid, money);
        Order order = new Order();
        // Store the user id in uid; the order's own id must not be set from it
        order.setUid(uid);
        order.setTime(new Date());
        order.setCount(count);
        order.setMoney(money);
        orderMapper.insert(order);
    }
}
BusinessService
public interface BusinessService {
    /**
     * Purchase
     **/
    void purchase(Integer uid, Integer stockId, Integer count);
}

@Service
public class BusinessServiceImpl implements BusinessService {
    @Autowired
    private StockServer stockServer;
    @Autowired
    private OrderServer orderServer;

    @Override
    public void purchase(Integer uid, Integer stockId, Integer count) {
        Integer money = stockServer.deduct(stockId, count);
        orderServer.create(uid, stockId, count, money);
    }
}
UserApplication
@SpringBootApplication
public class UserApplication implements CommandLineRunner {
    @Autowired
    private BusinessService businessService;

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Run a purchase: user id = 1, stock id = 1, purchase quantity count = 3
        businessService.purchase(1, 1, 3);
    }
}
Testing transaction handling without Seata
Testing whether adding @Transactional is enough
@Transactional(rollbackFor = Exception.class)
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
    Integer money = stockServer.deduct(stockId, count);
    orderServer.create(uid, stockId, count, money);
}
Test result:
java.sql.SQLSyntaxErrorException: Table 'user.stock' doesn't exist;
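What causes this? The default propagation of @Transactional is Propagation.REQUIRED: if there is no current transaction, a new one is created; if one already exists, the method joins it. purchase opens the transaction first, on the primary data source (user), so the nested service calls all run inside that one transaction on the same database. The stock query is therefore sent to the user database, where the stock table does not exist.
To avoid this, annotate the sub-service methods with @Transactional(propagation = Propagation.REQUIRES_NEW), which always starts a new transaction and suspends the current one if there is one. In other words, when A calls B, A's method uses a transaction on database A and B's method uses a transaction on database B, and the two are independent of each other.
The code is modified as follows: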
// Stock deduction should use optimistic locking, but I skip it here
@DS("stock")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public Integer deduct(Integer stockId, Integer count) {
    Stock stock = stockMapper.selectById(stockId);
    stock.setNum(stock.getNum() - count);
    stockMapper.updateById(stock);
    return stock.getPrice().intValue() * count.intValue();
}

@DS("order")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void create(Integer uid, Integer stockId, Integer count, Integer money) {
    accountServer.debit(uid, money);
    Order order = new Order();
    order.setUid(uid);
    order.setTime(new Date());
    order.setCount(count);
    order.setMoney(money);
    // Always true; the condition only keeps the insert below from being unreachable code
    if (1 == 1) {
        throw new RuntimeException("Order failed");
    }
    orderMapper.insert(order);
}

@DS("user")
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Override
public void debit(Integer uid, Integer money) {
    Account account = accountMapper.selectById(uid);
    account.setAmount(account.getAmount() - money);
    accountMapper.updateById(account);
}
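To make the propagation behaviour easier to picture, here is a toy model of which database a mapper call ends up on. It is only a sketch, not the actual Spring or dynamic-datasource code. It assumes that a transaction binds one connection when it opens, that the connection is chosen from whatever `@DS` key is active at that moment (falling back to the primary, `user`), and that the `@DS` key is applied before the transaction opens. `TxRoutingModel` and its members are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/** Toy model of connection binding; not the actual Spring or dynamic-datasource code. */
final class TxRoutingModel {
    enum Mode { NONE, REQUIRED, REQUIRES_NEW }

    private static final String PRIMARY = "user";                    // mirrors `primary: user`
    private final Deque<String> dsKeys = new ArrayDeque<>();         // emulates nested @DS keys
    private final Deque<String> boundDatabases = new ArrayDeque<>(); // one entry per open transaction

    /** Database the routing data source would pick for a brand-new connection. */
    private String routedDatabase() {
        return dsKeys.isEmpty() ? PRIMARY : dsKeys.peek();
    }

    /** Database a mapper call reaches: a transaction-bound connection wins over the @DS key. */
    String databaseForQuery() {
        return boundDatabases.isEmpty() ? routedDatabase() : boundDatabases.peek();
    }

    /** Runs body as a method annotated with @DS(dsKey) (null means no @DS) and the given mode. */
    String call(String dsKey, Mode mode, Supplier<String> body) {
        if (dsKey != null) {
            dsKeys.push(dsKey);
        }
        // REQUIRES_NEW always opens a transaction; REQUIRED only when none is open yet.
        boolean opensTx = mode == Mode.REQUIRES_NEW
                || (mode == Mode.REQUIRED && boundDatabases.isEmpty());
        if (opensTx) {
            boundDatabases.push(routedDatabase());
        }
        try {
            return body.get();
        } finally {
            if (opensTx) {
                boundDatabases.pop();
            }
            if (dsKey != null) {
                dsKeys.pop();
            }
        }
    }

    /** purchase() is REQUIRED without @DS; deduct() is @DS("stock") with deductMode. */
    static String stockQueryDatabase(Mode deductMode) {
        TxRoutingModel m = new TxRoutingModel();
        return m.call(null, Mode.REQUIRED,
                () -> m.call("stock", deductMode, m::databaseForQuery));
    }
}
```

In this model `stockQueryDatabase(Mode.NONE)` and `stockQueryDatabase(Mode.REQUIRED)` both return "user", which matches the `Table 'user.stock' doesn't exist` error, while `stockQueryDatabase(Mode.REQUIRES_NEW)` returns "stock".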
Testing an order failure
@Service
public class OrderServerImpl implements OrderServer {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private AccountServer accountServer;

    @DS("order")
    @Override
    public void create(Integer uid, Integer stockId, Integer count, Integer money) {
        accountServer.debit(uid, money);
        Order order = new Order();
        order.setUid(uid);
        order.setTime(new Date());
        order.setCount(count);
        order.setMoney(money);
        // Always true; the condition only keeps the insert below from being unreachable code
        if (1 == 1) {
            throw new RuntimeException("Order failed");
        }
        orderMapper.insert(order);
    }
}
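Test result:
stock: id=1, num=497, price=5
account: id=1, amount=1985
order: (no rows)
The user's account was debited and the stock was deducted, but the order was not created.
In other words, @Transactional cannot handle distributed transactions; it can only handle a transaction within a single database.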
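The numbers follow directly from the sample data: 3 units at price 5 cost 15, so stock goes from 500 to 497 and the balance from 2000 to 1985. The plain-Java walk-through below reproduces that arithmetic. It is only an illustration in which every step takes effect immediately, standing in for local transactions that commit independently; `IndependentCommitDemo` and the `failBeforeInsert` flag are hypothetical names used for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java walk-through; each step "commits" at once, like an independent local transaction. */
final class IndependentCommitDemo {
    int stockNum = 500;        // stock.num for id = 1
    final int price = 5;       // stock.price for id = 1
    int accountAmount = 2000;  // account.amount for id = 1
    final List<String> orders = new ArrayList<>();

    int deduct(int count) {
        stockNum -= count;          // already committed in the stock database
        return price * count;
    }

    void debit(int money) {
        accountAmount -= money;     // already committed in the user database
    }

    void create(int uid, int count, int money, boolean failBeforeInsert) {
        debit(money);
        if (failBeforeInsert) {
            // Thrown before the insert; the two updates above are not undone.
            throw new RuntimeException("order failed");
        }
        orders.add("uid=" + uid + ", count=" + count + ", money=" + money);
    }

    void purchase(int uid, int count, boolean failBeforeInsert) {
        int money = deduct(count);
        create(uid, count, money, failBeforeInsert);
    }
}
```

Calling `purchase(1, 3, true)` throws, yet `stockNum` is 497, `accountAmount` is 1985, and `orders` is empty, which is the inconsistent state seen in the test.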
Adding Seata distributed transactions
Add the dependencies
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
    <exclusions>
        <exclusion>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
        </exclusion>
    </exclusions>
    <version>2.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>1.3.0</version>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>1.3.0</version>
</dependency>
Modify the configuration
server:
  port: 8082
spring:
  main:
    allow-bean-definition-overriding: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    druid:
      stat-view-servlet:
        enabled: true
        url-pattern: "/druid/*"
        allow: 127.0.0.1
        deny: 192.168.1.73
        reset-enable: true
        login-username: admin
        login-password: admin@2020
      web-stat-filter:
        enabled: true
        url-pattern: "/*"
        exclusions: "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"
    dynamic:
      druid:
        filters: stat,wall
        initial-size: 30
        min-idle: 20
        maxActive: 200
        maxWait: 10000
        useUnfairLock: true
        validation-query: 'select 1'
        testWhileIdle: true
        testOnBorrow: false
        testOnReturn: false
      primary: user
      # Enable strict mode
      strict: true
      # Enable distributed transactions
      seata: true
      # Transaction mode: AT
      seata-mode: AT
      datasource:
        user:
          url: jdbc:mysql://10.240.30.100:3306/user?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.cj.jdbc.Driver
          type: com.alibaba.druid.pool.DruidDataSource
          username: dev
          password: mysql@dev.2020
          # Table-creation script, run at startup
          # schema: classpath:db/schema-account.sql
        order:
          url: jdbc:mysql://10.240.30.100:3306/order?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.cj.jdbc.Driver
          type: com.alibaba.druid.pool.DruidDataSource
          username: dev
          password: mysql@dev.2020
        stock:
          url: jdbc:mysql://10.240.30.100:3306/stock?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&&serverTimezone=Asia/Shanghai
          driver-class-name: com.mysql.cj.jdbc.Driver
          type: com.alibaba.druid.pool.DruidDataSource
          username: dev
          password: mysql@dev.2020
# Transaction configuration
seata:
  enabled: true
  # Seata's automatic data source proxying; turned off here because, with
  # `seata: true` above, the dynamic data source does the proxying itself
  enable-auto-data-source-proxy: false
  # Any name works, but it is best kept the same as the service name
  application-id: ${spring.application.name}
  # This name must match the parameter configured under service.vgroupMapping
  tx-service-group: my_test_tx_group
  # config: fetch configuration from Nacos
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      username: nacos
      password: nacos@root@2020
      namespace:
      group: SEATA_GROUP
  # registry: look up the seata-server service in Nacos
  registry:
    type: nacos
    nacos:
      application: seata-server
      server-addr: 127.0.0.1:8848
      username: nacos
      password: nacos@root@2020
      namespace:
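On the seata-server side, registry and config refer to the registry center and the configuration center respectively.
On the client, registry is used to look up the seata-server cluster or service in the Nacos registry so that the client can connect to seata-server, and config is used to fetch configuration from the Nacos configuration center.
For the Nacos and seata-server configuration, see "Distributed Transactions with Seata (3): Setting Up Seata".
Add the global transaction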
@Transactional
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public void purchase(Integer uid, Integer stockId, Integer count) {
    Integer money = stockServer.deduct(stockId, count);
    orderServer.create(uid, stockId, count, money);
}
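Test
stock: id=1, num=500, price=5
account: id=1, amount=2000
order: (no rows)
The stock and the balance are back at their initial values and no order exists, which shows that the transaction across multiple data sources was rolled back as a whole.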
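For intuition about why the values return to 500 and 2000, here is a toy model of AT-mode compensation. It is a deliberately simplified sketch and not Seata's implementation: real Seata writes undo records into each database's undo_log table and coordinates the branches through seata-server, and it performs further checks before restoring data. The class `AtModeSketch`, its row keys, and the `failOrder` flag are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of AT-mode compensation; real Seata keeps undo records in each database's undo_log table. */
final class AtModeSketch {
    /** One undo record: which value to restore and what it was before the branch update. */
    static final class UndoRecord {
        final String key;
        final int beforeImage;

        UndoRecord(String key, int beforeImage) {
            this.key = key;
            this.beforeImage = beforeImage;
        }
    }

    final Map<String, Integer> rows = new HashMap<>();
    private final Deque<UndoRecord> undoLog = new ArrayDeque<>();

    AtModeSketch() {
        rows.put("stock.num", 500);
        rows.put("account.amount", 2000);
    }

    /** Branch transaction: record the before image, then apply the local update. */
    void branchUpdate(String key, int delta) {
        int before = rows.get(key);
        undoLog.push(new UndoRecord(key, before));
        rows.put(key, before + delta);
    }

    /** Global rollback: restore before images, newest record first. */
    void globalRollback() {
        while (!undoLog.isEmpty()) {
            UndoRecord record = undoLog.pop();
            rows.put(record.key, record.beforeImage);
        }
    }

    /** Global commit: the undo records are no longer needed. */
    void globalCommit() {
        undoLog.clear();
    }

    /** Mirrors purchase(): deduct stock, debit the account, then optionally fail the order. */
    void purchase(int count, int price, boolean failOrder) {
        try {
            branchUpdate("stock.num", -count);
            branchUpdate("account.amount", -(price * count));
            if (failOrder) {
                throw new RuntimeException("order failed");
            }
            globalCommit();
        } catch (RuntimeException e) {
            globalRollback();
            throw e;
        }
    }
}
```

Calling `purchase(3, 5, true)` on a fresh instance throws, and afterwards `rows` again holds 500 for "stock.num" and 2000 for "account.amount", mirroring the test result above. With `failOrder` set to false the updates stay in place (497 and 1985).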