Seata分布式事务

一、seata-server的安装与配置

下载地址:https://github.com/seata/seata/releases
下载版本:1.3.0

1.1、解压并修改配置文件

解压seata-server安装包到指定目录,

修改conf目录下的file.conf配置文件


## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://10.17.146.218:3306/seata" ##取本机ip地址
    user = "cpor"
    password = "cpor201519"
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    host = "127.0.0.1"
    port = "6379"
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    queryLimit = 100
  }

}

修改conf目录下的registry.conff配置文件

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:21000/eureka"  ##取注册中心端口号
    application = "EUREKA-SERVER" ##取注册中心服务名称
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

二、数据库准备

2.1、创建业务数据库

order:存储订单的数据库;
storage:存储库存的数据库;
account:存储账户信息的数据库。

2.2、创建表的sql语句:

-- 订单表
CREATE TABLE `order` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `count` int(11) DEFAULT NULL COMMENT '数量',
  `money` decimal(11,0) DEFAULT NULL COMMENT '金额',
  `status` int(11) DEFAULT NULL COMMENT '订单状态',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=20 DEFAULT CHARSET=utf8;

--  库存表
CREATE TABLE `storage` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `total` int(11) DEFAULT NULL COMMENT '总库存',
  `used` int(11) DEFAULT NULL COMMENT '已用库存',
  `residue` int(11) DEFAULT NULL COMMENT '剩余库存',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

--  账户信息表
CREATE TABLE `account` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  `total` decimal(10,0) DEFAULT NULL COMMENT '总额度',
  `used` decimal(10,0) DEFAULT NULL COMMENT '已用余额',
  `residue` decimal(10,0) DEFAULT '0' COMMENT '剩余可用额度',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

2.3、创建日志回滚表

使用Seata还需要在每个数据库中创建日志表

branch_table表
global_table表
lock_table表
undo_log表

2.4、创建表sql语句

CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `lock_key` varchar(128) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) DEFAULT NULL,
  `transaction_service_group` varchar(32) DEFAULT NULL,
  `transaction_name` varchar(128) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `lock_table` (
  `row_key` varchar(128) NOT NULL,
  `xid` varchar(96) DEFAULT NULL,
  `transaction_id` mediumtext,
  `branch_id` mediumtext,
  `resource_id` varchar(256) DEFAULT NULL,
  `table_name` varchar(32) DEFAULT NULL,
  `pk` varchar(36) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`row_key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8;

三、微服务工程配置(网飞)

3.1、注册中心

3.1.1、yml配置

server:
  port: 21000
spring:
  application:
    #配置微服务的名称
    name: eureka-server
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:21000/eureka
    fetch-registry: false

3.1.2、pom文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>seata-demo</artifactId>
        <groupId>com.pzh</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <artifactId>springcloud-eureka</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

3.1.3、启动配置

@SpringBootApplication
@EnableEurekaServer
public class EurekaServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServiceApplication.class, args);
    }

}

3.2、订单服务

4.1.1、yml文件配置

server:
  port: 8083
spring:
  application:
    #配置微服务的名称
    name: eureka-order
  cloud:
    alibaba:
      seata:
        tx-service-group: my_test_tx_group #自定义事务组名称需要与seata-server中的对应
  datasource:
    #===========JDBC 配置===========
    url: jdbc:mysql://localhost:3306/seata?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false
    # 初始化线程池数量  最大数  最小数
    driver-class-name: com.mysql.jdbc.Driver
    username: cpor
    password: cpor201519
    type: com.alibaba.druid.pool.DruidDataSource

eureka:
  client:
    service-url:
      defaultZone: http://eureka1:21000/eureka
#开启Feign的断路器功能
feign:
  hystrix:
    enabled: true

mybatis:
  mapper-locations: classpath:mapper/*.xml
  # settings 下的配置
  configuration:
    # 开启驼峰命名功能 其它的一些属性参考 mybatis-config.xml中的settings属性
    map-underscore-to-camel-case: true

4.1.2、pom文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seata-demo</artifactId>
        <groupId>com.pzh</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>mircoservice-order</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
            <version>1.4.4.RELEASE</version>
        </dependency>

        <!--        调用方添加feign的依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!--        添加监控依赖-->
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>1.5.18</version>
            <scope>compile</scope>
        </dependency>
        <!--        统一配置依赖-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>
        <!--seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-seata</artifactId>
            <version>2.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.31</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

4.1.3、启动配置

@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableEurekaClient
@MapperScan("com.pzh.mircoserviceorder.mapper")
@EnableFeignClients
public class MircoserviceOrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(MircoserviceOrderApplication.class, args);
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        DruidDataSource druidDataSource = new DruidDataSource();
        return druidDataSource;
    }

    @Primary
    @Bean("dataSourceProxy")
    public DataSourceProxy dataSourceProxy(DataSource dataSource) {
        return new DataSourceProxy(dataSource);
    }

    @Bean("jdbcTemplate")
    @ConditionalOnBean(DataSourceProxy.class)
    public JdbcTemplate jdbcTemplate(DataSourceProxy dataSourceProxy) {
        return new JdbcTemplate(dataSourceProxy);
    }
}

4.1.4、配置两个.conf文件

把file和registry放在resources文件夹下


image.png

file.conf

transport {
 # tcp udt unix-domain-socket
 type = "TCP"
 #NIO NATIVE
 server = "NIO"
 #enable heartbeat
 heartbeat = true
 #thread factory for netty
 thread-factory {
   boss-thread-prefix = "NettyBoss"
   worker-thread-prefix = "NettyServerNIOWorker"
   server-executor-thread-prefix = "NettyServerBizHandler"
   share-boss-worker = false
   client-selector-thread-prefix = "NettyClientSelector"
   client-selector-thread-size = 1
   client-worker-thread-prefix = "NettyClientWorkerThread"
   # netty boss thread size,will not be used for UDT
   boss-thread-size = 1
   #auto default pin or 8
   worker-thread-size = 8
 }
 shutdown {
   # when destroy server, wait seconds
   wait = 3
 }
 serialization = "seata"
 compressor = "none"
}
service {
 #vgroup->rgroup
 vgroupMapping.my_test_tx_group = "EUREKA-ORDER"  ##取注册中心服务名称
 #only support single node
 EUREKA-ORDER.grouplist = "10.73.33.41:8091" ## ip取本机ip地址,端口号为seata的端口号
 #degrade current not support
 enableDegrade = false
 #disable
 disable = false
 #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
 max.commit.retry.timeout = "-1"
 max.rollback.retry.timeout = "-1"
}

client {
 async.commit.buffer.limit = 10000
 lock {
   retry.internal = 10
   retry.times = 30
 }
 report.retry.count = 5
}

## transaction log store
store {
 ## store mode: file、db
 mode = "db"

 ## file store
 file {
   dir = "sessionStore"

   # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
   max-branch-session-size = 16384
   # globe session size , if exceeded throws exceptions
   max-global-session-size = 512
   # file buffer size , if exceeded allocate new buffer
   file-write-buffer-cache-size = 16384
   # when recover batch read size
   session.reload.read_size = 100
   # async, sync
   flush-disk-mode = async
 }

 ## database store
 db {
   ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
   datasource = "dbcp"
   ## mysql/oracle/h2/oceanbase etc.
   db-type = "mysql"
   url = "jdbc:mysql://10.17.146.218:3306/seata"
   user = "cpor"
   password = "cpor201519"
   min-conn = 1
   max-conn = 3
   global.table = "global_table"
   branch.table = "branch_table"
   lock-table = "lock_table"
   query-limit = 100
 }
}
lock {
 ## the lock store mode: local、remote
 mode = "remote"

 local {
   ## store locks in user's database
 }

 remote {
   ## store locks in the seata's server
 }
}
recovery {
 committing-retry-delay = 30
 asyn-committing-retry-delay = 30
 rollbacking-retry-delay = 30
 timeout-retry-delay = 30
}

transaction {
 undo.data.validation = true
 undo.log.serialization = "jackson"
}

## metrics settings
metrics {
 enabled = false
 registry-type = "compact"
 # multi exporters use comma divided
 exporter-list = "prometheus"
 exporter-prometheus-port = 9898
}

registry.conf

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "file"

  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  file {
    name = "file.conf"
  }
}

注:库存,账号信息服务与订单服务一样

四、操作数据库接口

4.1、订单服务

xml文件

    <insert id="create" parameterType="com.pzh.mircoserviceorder.entity.Order">
      INSERT INTO `order` (user_id, product_id,`count`,money,status)
                       VALUES
                       ( #{order.userId}, #{order.productId},#{order.count},#{order.money},#{order.status});
    </insert>
    <update id="updateStatus">
        UPDATE `order`
          SET status = #{status}
        WHERE user_id = #{userId};
    </update>

4.2、库存服务

xml文件

    <update id="decrease">
        UPDATE storage
        SET used    = used + #{count},
            residue = residue - #{count}
        WHERE product_id = #{productId}
    </update>

4.3、账号信息服务

xml文件

<update id="decrease">
        UPDATE account
        SET residue = residue - #{money},
            used    = used + #{money}
        WHERE user_id = #{userId};
    </update>

五、服务之间的接口调用

5.1、库存服务

@RestController
@RequestMapping("/storage")
public class StorageController {
    @Resource
    private IStorageService storageService;

   /*
    *扣减库存
   */
    @RequestMapping("/decrease")
    void decrease(Long productId,Integer count){
        storageService.decrease(productId, count);
    }
}

5.2、账号信息服务

@RestController
@RequestMapping("/account")
public class AccountController {
    @Resource
    private IAccountService accountService;

    /**
     * 扣减账户余额
     */
    @RequestMapping("/decrease")
    void decrease(Long userId,BigDecimal money) throws Exception {
        accountService.decreaseMoney(userId, money);
    }
}

5.3、服务之间的调用:

在订单服务中调用库存账号信息服务

5.3.1、创建两个接口

image.png
@FeignClient(value = "eureka-account")
public interface IAccountService {
    /**
     * 扣减账户余额
     * @param userId
     * @param money
     * @return
     */
    @RequestMapping("/account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
@FeignClient(value = "eureka-storage")
public interface IStorageService {
    /**
     * 扣减库存
     * @param productId
     * @param count
     * @return
     */
    @GetMapping(value = "/storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}

5.3.2、使用两个接口

private static final Logger LOGGER = LoggerFactory.getLogger(OrderServiceImpl.class);

    @Resource
    private OrderMapper orderMapper;
    @Resource
    private IAccountService accountService;
    @Resource
    private IStorageService storageService;
    @Override
    @GlobalTransactional(name = "eureka-order",rollbackFor = Exception.class)
    public void create(Order order) {
        LOGGER.info("------->下单开始");
        //本应用创建订单
        orderMapper.create(order);

        //远程调用库存服务扣减库存
        LOGGER.info("------->order-service中扣减库存开始");
        storageService.decrease(order.getProductId(),order.getCount());
        LOGGER.info("------->order-service中扣减库存结束");

        //远程调用账户服务扣减余额
        LOGGER.info("------->order-service中扣减余额开始");
        accountService.decrease(order.getUserId(),order.getMoney());
        LOGGER.info("------->order-service中扣减余额结束");

        //修改订单状态为已完成
        LOGGER.info("------->order-service中修改订单状态开始");
        orderMapper.updateStatus(order.getUserId(),1);
        LOGGER.info("------->order-service中修改订单状态结束");

        LOGGER.info("------->下单结束");
    }

六、启动步骤:

step1:启动seata

image.png

image.png

step2:启动注册中心

step3:启动三个服务

七、测试

7.1、初始化数据库数据

image.png

7.2、在订单服务创建一个调用接口

@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private IOrderService orderService;
    @RequestMapping("/create")
    public String create(){
        Order order = new Order();
        order.setUserId(1L);
        order.setProductId(1L);
        order.setMoney(BigDecimal.valueOf(99.9));
        order.setStatus(0);
        order.setCount(10);
        orderService.create(order);
        return "SUCCESS";
    }
}

7.3、调用接口

http://localhost:8083/order/create

结果

image.png

7.4、制造一个分布式事务问题

这里我们会创建三个服务,一个订单服务,一个库存服务,一个账户服务。当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。

在扣减账户余额时抛出异常


image.png

测试结果
数据没有任何改变,在order表中,id在15-19过程中,就是测试的时候回滚了16,17,18三条数据

image.png

八、观察工作过程

undo_log表在调用服务时会记录数据,工作完成后会自动删除表记录


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。