用状态机控制业务状态扭转 Hello Spring StateMachine

转载原文 : https://www.codetd.com/article/1010726
Spring StateMachine 官方文档

一、状态机

有限状态机是一种用来进行对象行为建模的工具,其作用主要是描述对象在它的生命周期内所经历的状态序列,以及如何响应来自外界的各种事件。在电商场景(订单、物流、售后)、社交(IM消息投递)、分布式集群管理(分布式计算平台任务编排)等场景都有大规模的使用。

状态机的要素:

状态机可归纳为4个要素,现态、条件、动作、次态。“现态”和“条件”是因,“动作”和“次态”是果。

1 现态:指当前所处的状态
2 条件:又称“事件”,当一个条件被满足,将会触发一个动作,或者执行一次状态的迁移
3 动作:条件满足后执行的动作。动作执行完毕后,可以迁移到新的状态,也可以仍旧保持原状态。动作不是必须的,当条件满足后,也可以不执行任何动作,直接迁移到新的状态。
4 次态:条件满足后要迁往的新状态。“次态”是相对于“现态”而言的,“次态”一旦被激活,就转换成“现态”。

状态机动作类型:

进入动作:在进入状态时进行
退出动作:在退出状态时进行
输入动作:依赖于当前状态和输入条件进行
转移动作:在进行特定转移时进行

二、spring statemachine

spring statemachine是使用 Spring框架下的状态机概念创建的一种应用程序开发框架。它使得状态机结构层次化,简化了配置状态机的过程。

例子一:简单订单流程

image

使用过程:

1 引入依赖

<dependency>
    <groupId>org.springframework.statemachine</groupId>
    <artifactId>spring-statemachine-core</artifactId>
    <version>2.2.0.RELEASE</version>
</dependency>

2 创建订单状态枚举类和状态转换枚举类

/**
 * 订单状态
 */
public enum OrderStatus {
    // 待支付,待发货,待收货,订单结束
    WAIT_PAYMENT, WAIT_DELIVER, WAIT_RECEIVE, FINISH;
}
/**
 * 订单状态改变事件
 */
public enum OrderStatusChangeEvent {
    // 支付,发货,确认收货
    PAYED, DELIVERY, RECEIVED;
}

3 添加配置

/**
 * 订单状态机配置
 */
@Configuration
@EnableStateMachine(name = "orderStateMachine")
public class OrderStateMachineConfig extends StateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {

    /**
     * 配置状态
     * @param states
     * @throws Exception
     */
    @Override
    public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
        states
                .withStates()
                .initial(OrderStatus.WAIT_PAYMENT)
                .states(EnumSet.allOf(OrderStatus.class));
    }

    /**
     * 配置状态转换事件关系
     * @param transitions
     * @throws Exception
     */
    @Override
    public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
        transitions
                .withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderStatusChangeEvent.PAYED)
                .and()
                .withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderStatusChangeEvent.DELIVERY)
                .and()
                .withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderStatusChangeEvent.RECEIVED);
    }

    /**
     * 持久化配置
     * 实际使用中,可以配合redis等,进行持久化操作
     * @return
     */
    @Bean
    public StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister(){
        return new DefaultStateMachinePersister<>(new StateMachinePersist<OrderStatus, OrderStatusChangeEvent, Order>() {
            @Override
            public void write(StateMachineContext<OrderStatus, OrderStatusChangeEvent> context, Order order) throws Exception {
                //此处并没有进行持久化操作
                System.out.println("持久化订单信息, order = " + order);
            }

            @Override
            public StateMachineContext<OrderStatus, OrderStatusChangeEvent> read(Order order) throws Exception {
                //此处直接获取order中的状态,其实并没有进行持久化读取操作
                return new DefaultStateMachineContext<>(order.getStatus(), null, null, null);
            }
        });
    }
}

4 添加订单状态监听器

@Component("orderStateListener")
@WithStateMachine(name = "orderStateMachine")
public class OrderStateListenerImpl {

    @OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")
    public boolean payTransition(Message<OrderStatusChangeEvent> message) {
        Order order = (Order) message.getHeaders().get("order");
        Long payOrder = (Long) message.getHeaders().get("payOrder");
        Assert.isTrue(!StringUtils.isEmpty(payOrder), "payOrder不能为空");
        order.setPayOrder(payOrder);
        order.setStatus(OrderStatus.WAIT_DELIVER);
        System.out.println("支付 headers=" + message.getHeaders().toString());
        return true;
    }

    @OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")
    public boolean deliverTransition(Message<OrderStatusChangeEvent> message) {
        Order order = (Order) message.getHeaders().get("order");
        Long deliverOrder = (Long) message.getHeaders().get("deliverOrder");
        Assert.isTrue(!StringUtils.isEmpty(deliverOrder), "deliverOrder不能为空");
        order.setDeliverOrder(deliverOrder);
        order.setStatus(OrderStatus.WAIT_RECEIVE);
        System.out.println("发货 headers=" + message.getHeaders().toString());
        return true;
    }

    @OnTransition(source = "WAIT_RECEIVE", target = "FINISH")
    public boolean receiveTransition(Message<OrderStatusChangeEvent> message) {
        Order order = (Order) message.getHeaders().get("order");
        Long confirmReceipt = (Long) message.getHeaders().get("confirmReceipt");
        Assert.isTrue(!StringUtils.isEmpty(confirmReceipt), "confirmReceipt不能为空");
        order.setConfirmReceipt(confirmReceipt);
        order.setStatus(OrderStatus.FINISH);
        System.out.println("收货 headers=" + message.getHeaders().toString());
        return true;
    }
}

5 service中使用

@Service("orderService")
public class OrderServiceImpl implements OrderService {

    @Resource
    private StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine;

    @Resource
    private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister;

    private Long id = 1L;

    private final Map<Long, Order> orders = new HashMap<>();

    @Override
    public Order creat(Long skuId, Integer skuNum) {
        System.out.println(String.format("threadName=%s, 创建订单, skuId = %s, skuNum = %s", Thread.currentThread().getName(), skuId, skuNum));
        Order order = new Order();
        order.setSkuId(skuId);
        order.setSkuNum(skuNum);
        order.setStatus(OrderStatus.WAIT_PAYMENT);
        order.setId(id++);
        orders.put(order.getId(), order);
        return order;
    }

    @Override
    public Order pay(Long id, Long payOrder) {
        Order order = orders.get(id);
        System.out.println(String.format("threadName=%s, 尝试支付, id = %s, payOrder = %s", Thread.currentThread().getName(), id, payOrder));
        Message<OrderStatusChangeEvent> message = MessageBuilder.withPayload(OrderStatusChangeEvent.PAYED)
                .setHeader("order", order)
                .setHeader("payOrder", payOrder)
                .build();
        if (!sendEvent(message, order)) {
            System.out.println(String.format("threadName=%s, 支付失败,状态异常, id = %s, payOrder = %s", Thread.currentThread().getName(), id, payOrder));
        }
        return orders.get(id);
    }


    @Override
    public Order deliver(Long id, Long deliverOrder) {
        Order order = orders.get(id);
        System.out.println(String.format("threadName=%s, 尝试发货, id = %s, deliverOrder = %s", Thread.currentThread().getName(), id, deliverOrder));
        Message<OrderStatusChangeEvent> message = MessageBuilder.withPayload(OrderStatusChangeEvent.DELIVERY)
                .setHeader("order", order)
                .setHeader("deliverOrder", deliverOrder)
                .build();
        if (!sendEvent(message, orders.get(id))) {
            System.out.println(String.format("threadName=%s, 发货失败,状态异常, id = %s, deliverOrder = %s", Thread.currentThread().getName(), id, deliverOrder));
        }
        return orders.get(id);
    }


    @Override
    public Order receive(Long id, Long confirmReceipt) {
        Order order = orders.get(id);
        System.out.println(String.format("threadName=%s, 尝试收货, id = %s, confirmReceipt = %s", Thread.currentThread().getName(), id, confirmReceipt));
        Message<OrderStatusChangeEvent> message = MessageBuilder.withPayload(OrderStatusChangeEvent.RECEIVED)
                .setHeader("order", order)
                .setHeader("confirmReceipt", confirmReceipt)
                .build();
        if (!sendEvent(message, orders.get(id))) {
            System.out.println(String.format("threadName=%s, 收货失败,状态异常, id = %s, confirmReceipt = %s", Thread.currentThread().getName(), id, confirmReceipt));
        }
        return orders.get(id);
    }


    @Override
    public Map<Long, Order> getOrders() {
        return orders;
    }


    /**
     * 发送订单状态转换事件
     *
     * @param message
     * @param order
     * @return
     */
    private synchronized boolean sendEvent(Message<OrderStatusChangeEvent> message, Order order) {
        boolean result = false;
        try {
            orderStateMachine.start();
            //尝试恢复状态机状态
            persister.restore(orderStateMachine, order);
            //添加延迟用于线程安全测试
            Thread.sleep(1000);
            result = orderStateMachine.sendEvent(message);
            //持久化状态机状态
            persister.persist(orderStateMachine, order);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            orderStateMachine.stop();
        }
        return result;
    }
}

6 测试

@SpringBootTest
public class OrderServiceImplTest {

    @Autowired
    private OrderService orderService;

    @Test
    public void testMultThread() throws InterruptedException {
        Random randon = new Random();
        Order order1 = orderService.creat((long) randon.nextInt(999), randon.nextInt(999));
        Order order2 = orderService.creat((long) randon.nextInt(999), randon.nextInt(999));

        orderService.pay(order1.getId(), (long) randon.nextInt(999));

        new Thread(() -> {
            orderService.deliver(order1.getId(), (long) randon.nextInt(999));
            orderService.receive(order1.getId(), (long) randon.nextInt(999));
        }).start();

        orderService.pay(order2.getId(), (long) randon.nextInt(999));
        orderService.deliver(order2.getId(), (long) randon.nextInt(999));
        orderService.receive(order2.getId(), (long) randon.nextInt(999));

        Thread.currentThread().join(6 * 1000);

        System.out.println(orderService.getOrders());

    }

}

例子二:状态机工厂

有些时候,一个状态机不够用,因为我们可能要处理多个订单。这个时候就要用到了状态机工厂。

1、不同线程启用不同statemachine实例处理
2、用工厂模式创建statemachine,且用StateMachinePersist根据recruit对象不同状态反序列化statemachine

1 配置修改

/**
 * 订单状态机配置
 */
@Configuration
@EnableStateMachineFactory(name = "orderStateMachineFactory")
public class OrderStateMachineConfig extends EnumStateMachineConfigurerAdapter<OrderStatus, OrderStatusChangeEvent> {

    /**
     * 订单状态机ID
     */
    public static final String orderStateMachineId = "orderStateMachineId";

    /**
     * 配置状态
     *
     * @param states
     * @throws Exception
     */
    @Override
    public void configure(StateMachineStateConfigurer<OrderStatus, OrderStatusChangeEvent> states) throws Exception {
        states
                .withStates()
                .initial(OrderStatus.WAIT_PAYMENT)
                .states(EnumSet.allOf(OrderStatus.class));
    }

    /**
     * 配置状态转换事件关系
     *
     * @param transitions
     * @throws Exception
     */
    @Override
    public void configure(StateMachineTransitionConfigurer<OrderStatus, OrderStatusChangeEvent> transitions) throws Exception {
        transitions
                .withExternal().source(OrderStatus.WAIT_PAYMENT).target(OrderStatus.WAIT_DELIVER).event(OrderStatusChangeEvent.PAYED)
                .and()
                .withExternal().source(OrderStatus.WAIT_DELIVER).target(OrderStatus.WAIT_RECEIVE).event(OrderStatusChangeEvent.DELIVERY)
                .and()
                .withExternal().source(OrderStatus.WAIT_RECEIVE).target(OrderStatus.FINISH).event(OrderStatusChangeEvent.RECEIVED);
    }

    /**
     * 持久化配置
     * 实际使用中,可以配合redis等,进行持久化操作
     *
     * @return
     */
    @Bean
    public StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister() {
        return new DefaultStateMachinePersister<>(new StateMachinePersist<OrderStatus, OrderStatusChangeEvent, Order>() {
            @Override
            public void write(StateMachineContext<OrderStatus, OrderStatusChangeEvent> context, Order order) throws Exception {
                //此处并没有进行持久化操作
                order.setStatus(context.getState());
                System.out.println("持久化订单信息, order = " + order);
            }

            @Override
            public StateMachineContext<OrderStatus, OrderStatusChangeEvent> read(Order order) throws Exception {
                //此处直接获取order中的状态,其实并没有进行持久化读取操作
                StateMachineContext<OrderStatus, OrderStatusChangeEvent> result = new DefaultStateMachineContext<>(order.getStatus(), null, null, null, null, orderStateMachineId);
                return result;
            }
        });
    }
}

2 service中使用

@Service("orderService")
public class OrderServiceImpl implements OrderService {

    @Resource
    private StateMachineFactory<OrderStatus, OrderStatusChangeEvent> orderStateMachineFactory;

    @Resource
    private StateMachinePersister<OrderStatus, OrderStatusChangeEvent, Order> persister;

    private Long id = 1L;

    private final Map<Long, Order> orders = new HashMap<>();

    @Override
    public Order creat(Long skuId, Integer skuNum) {
        System.out.println(String.format("threadName=%s, 创建订单, skuId = %s, skuNum = %s", Thread.currentThread().getName(), skuId, skuNum));
        Order order = new Order();
        order.setSkuId(skuId);
        order.setSkuNum(skuNum);
        order.setStatus(OrderStatus.WAIT_PAYMENT);
        order.setId(id++);
        orders.put(order.getId(), order);
        return order;
    }

    @Override
    public Order pay(Long id, Long payOrder) {
        Order order = orders.get(id);
        System.out.println(String.format("threadName=%s, 尝试支付, id = %s, payOrder = %s", Thread.currentThread().getName(), id, payOrder));
        Message<OrderStatusChangeEvent> message = MessageBuilder.withPayload(OrderStatusChangeEvent.PAYED)
                .setHeader("order", order)
                .setHeader("payOrder", payOrder)
                .build();
        if (!sendEvent(message, order)) {
            System.out.println(String.format("threadName=%s, 支付失败,状态异常, id = %s, payOrder = %s", Thread.currentThread().getName(), id, payOrder));
        }
        return orders.get(id);
    }


    @Override
    public Order deliver(Long id, Long deliverOrder) {
        Order order = orders.get(id);
        System.out.println(String.format("threadName=%s, 尝试发货, id = %s, deliverOrder = %s", Thread.currentThread().getName(), id, deliverOrder));
        Message<OrderStatusChangeEvent> message = MessageBuilder.withPayload(OrderStatusChangeEvent.DELIVERY)
                .setHeader("order", order)
                .setHeader("deliverOrder", deliverOrder)
                .build();
        if (!sendEvent(message, orders.get(id))) {
            System.out.println(String.format("threadName=%s, 发货失败,状态异常, id = %s, deliverOrder = %s", Thread.currentThread().getName(), id, deliverOrder));
        }
        return orders.get(id);
    }


    @Override
    public Order receive(Long id, Long confirmReceipt) {
        Order order = orders.get(id);
        System.out.println(String.format("threadName=%s, 尝试收货, id = %s, confirmReceipt = %s", Thread.currentThread().getName(), id, confirmReceipt));
        Message<OrderStatusChangeEvent> message = MessageBuilder.withPayload(OrderStatusChangeEvent.RECEIVED)
                .setHeader("order", order)
                .setHeader("confirmReceipt", confirmReceipt)
                .build();
        if (!sendEvent(message, orders.get(id))) {
            System.out.println(String.format("threadName=%s, 收货失败,状态异常, id = %s, confirmReceipt = %s", Thread.currentThread().getName(), id, confirmReceipt));
        }
        return orders.get(id);
    }


    @Override
    public Map<Long, Order> getOrders() {
        return orders;
    }


    /**
     * 发送订单状态转换事件
     *
     * @param message
     * @param order
     * @return
     */
    private boolean sendEvent(Message<OrderStatusChangeEvent> message, Order order) {
        synchronized (String.valueOf(order.getId()).intern()) {
            boolean result = false;
            StateMachine<OrderStatus, OrderStatusChangeEvent> orderStateMachine = orderStateMachineFactory.getStateMachine(OrderStateMachineConfig.orderStateMachineId);
            System.out.println("id=" + order.getId() + " 状态机 orderStateMachine" + orderStateMachine);
            try {
                orderStateMachine.start();
                //尝试恢复状态机状态
                persister.restore(orderStateMachine, order);
                System.out.println("id=" + order.getId() + " 状态机 orderStateMachine id=" + orderStateMachine.getId());
                //添加延迟用于线程安全测试
                Thread.sleep(1000);
                result = orderStateMachine.sendEvent(message);
                //持久化状态机状态
                persister.persist(orderStateMachine, order);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                orderStateMachine.stop();
            }
            return result;
        }
    }
}

3 listener中配置id

@Component("orderStateListener")@WithStateMachine(id = OrderStateMachineConfig.orderStateMachineId)public class OrderStateListenerImpl {    @OnTransition(source = "WAIT_PAYMENT", target = "WAIT_DELIVER")    public boolean payTransition(Message<OrderStatusChangeEvent> message) {        Order order = (Order) message.getHeaders().get("order");        Long payOrder = (Long) message.getHeaders().get("payOrder");        Assert.isTrue(!StringUtils.isEmpty(payOrder), "payOrder不能为空");        order.setPayOrder(payOrder);        order.setStatus(OrderStatus.WAIT_DELIVER);        System.out.println("支付 headers=" + message.getHeaders().toString());        return true;    }    @OnTransition(source = "WAIT_DELIVER", target = "WAIT_RECEIVE")    public boolean deliverTransition(Message<OrderStatusChangeEvent> message) {        Order order = (Order) message.getHeaders().get("order");        Long deliverOrder = (Long) message.getHeaders().get("deliverOrder");        Assert.isTrue(!StringUtils.isEmpty(deliverOrder), "deliverOrder不能为空");        order.setDeliverOrder(deliverOrder);        order.setStatus(OrderStatus.WAIT_RECEIVE);        System.out.println("发货 headers=" + message.getHeaders().toString());        return true;    }    @OnTransition(source = "WAIT_RECEIVE", target = "FINISH")    public boolean receiveTransition(Message<OrderStatusChangeEvent> message) {        Order order = (Order) message.getHeaders().get("order");        Long confirmReceipt = (Long) message.getHeaders().get("confirmReceipt");        Assert.isTrue(!StringUtils.isEmpty(confirmReceipt), "confirmReceipt不能为空");        order.setConfirmReceipt(confirmReceipt);        order.setStatus(OrderStatus.FINISH);        System.out.println("收货 headers=" + message.getHeaders().toString());        return true;    }}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355

推荐阅读更多精彩内容