背景
上文说到了BizManager的实现。整体来看,这部分内容跟Spring StateMachine本身并没有太大关系,纯粹是个人写着写着收不住了,顺便把近期的一些东西整理一下。对这部分不感兴趣的读者可以跳过,直接看下一章节:那一节讲的是如何从外部调用状态机引擎,会更实用一些。
BaseBizManager定义
BaseBizManager统一规范业务处理的接口,其定义如下:
import org.springframework.statemachine.StateMachine;
/**
 * Defines the {@code process} template method that serves as the entry point of write services.
 *
 * @param <T> type of the business request accepted by {@code process}
 * @param <R> type of the business result returned by {@code process}
 */
@FunctionalInterface
public interface BaseBizManager<T, R> {
/**
 * Process template for common write-service concerns; implementations are intended to cover
 * idempotency handling, logging and transaction guarantees.
 *
 * @param request       the business request to handle
 * @param stateMachines optional (varargs) state machines; callers may pass none
 * @return the business result
 * @throws BusinessException when the request cannot be processed
 */
R process(T request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws BusinessException;
}
注意这里的stateMachines是可变参数(varargs),相当于一个可选参数:有些场景的处理逻辑内部并不需要状态机,此时可以不传。
抽象类实现
抽象类中使用到了spring的反射工具类,封装了一层,先提供出来:
import org.springframework.util.ReflectionUtils;
import java.lang.reflect.Field;
import java.util.Objects;
public class ReflectionUtil {
    /**
     * Thin wrapper around Spring's {@code ReflectionUtils} that reads a field by name.
     *
     * <p>The field is looked up on the runtime class of {@code object} and made accessible
     * before being read, so non-public fields can be read as well.
     *
     * @param object     the instance to read from; when {@code null}, {@code defaultVal} is returned
     * @param key        the field name; when {@code null}, {@code defaultVal} is returned
     * @param defaultVal the value returned when there is nothing to look up or no such field is found
     * @return the current value of the field (which may itself be {@code null}),
     *         or {@code defaultVal} when the field cannot be located
     */
    public static Object getValue(Object object, String key, Object defaultVal) {
        // Guard first: object.getClass() would throw on null, and findField rejects a null name.
        if (Objects.isNull(object) || Objects.isNull(key)) {
            return defaultVal;
        }
        Field field = ReflectionUtils.findField(object.getClass(), key);
        if (Objects.isNull(field)) {
            return defaultVal;
        }
        // Let Spring decide whether the accessibility override is actually needed.
        ReflectionUtils.makeAccessible(field);
        return ReflectionUtils.getField(field, object);
    }
}
然后是对应的抽象类实现
import com.vipfins.finance.middleplatform.order.util.ReflectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.statemachine.StateMachine;
import org.springframework.transaction.annotation.Transactional;
/**
 * Template class for order write operations: wraps the actual business step with an
 * idempotency check, logging, exception translation and a single transaction.
 *
 * @param <T> request type
 * @param <R> response type
 */
@Slf4j
public abstract class AbstractBizManagerImpl<T, R> implements BaseBizManager<T, R> {

    @Autowired
    private BizOrderIdemRepository orderIdemRepository;

    @Autowired
    private BizOrderLogEventPublisher bizOrderLogEventPublisher;

    /**
     * Runs the idempotency check and then delegates to {@link #doProcess}.
     *
     * <p>Every failure is logged and rethrown as a {@code BusinessException}: business
     * exceptions keep their error code, anything else is mapped to
     * {@code ORDER_GENERIC_EXCEPTION}.
     *
     * @param request       the business request
     * @param stateMachines optional state machines, handed through to {@link #doProcess}
     * @return the result produced by {@link #doProcess}
     * @throws BusinessException when the request is a duplicate or processing fails
     */
    @Override
    @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
    public R process(T request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws BusinessException {
        try {
            // Idempotency control: a duplicate request is rejected via an exception.
            if (checkIdem(request)) {
                log.info("check idempotence bingo,request={}", request);
                throw new BusinessException(BizOrderErrorCode.SUCCESS, "幂等操作,本次请求忽略");
            }
            // Actual business handling.
            R resp = doProcess(request, stateMachines);
            log.info("response = {}", resp);
            return resp;
        } catch (BusinessException e) {
            // Pass the throwable as the trailing argument without a placeholder so the
            // logger prints the stack trace and no dangling "{}" is left in the message.
            log.error("process Business Exception", e);
            throw new BusinessException(e.getErrorCode(), ExceptionUtil.getErrorMsg(e));
        } catch (Exception e) {
            log.error("process Exception", e);
            throw new BusinessException(BizOrderErrorCode.ORDER_GENERIC_EXCEPTION, ExceptionUtil.getErrorMsg(e));
        }
    }

    /**
     * The actual business operation, supplied by subclasses.
     *
     * @param request       the business request
     * @param stateMachines state machines already processed upstream, passed in for later
     *                      persistence; optional
     * @return the business result
     * @throws Exception on any failure; {@link #process} translates it into a {@code BusinessException}
     */
    public abstract R doProcess(T request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws Exception;

    /**
     * Decides whether the request has been seen before by inserting an idempotency record.
     *
     * <p>The key is the concatenation of the request's {@code bizCode}, {@code operationType}
     * and {@code sourceId} fields, read reflectively (a missing field contributes an empty string).
     *
     * @param request the business request
     * @return {@code true} when the insert hits a duplicate key, i.e. the request is a repeat;
     *         {@code false} otherwise, including when the insert fails for another reason
     */
    private boolean checkIdem(T request) {
        boolean result = false;
        // Read the basic identifiers from the request via reflection.
        String bizOrderId = (String) ReflectionUtil.getValue(request, "bizCode", "");
        String operationType = (String) ReflectionUtil.getValue(request, "operationType", "");
        String sourceId = (String) ReflectionUtil.getValue(request, "sourceId", "");
        String idemNo = bizOrderId + operationType + sourceId;
        BizOrderIdem idem = new BizOrderIdem(idemNo, bizOrderId);
        try {
            orderIdemRepository.insert(idem);
        } catch (DuplicateKeyException e) {
            // Unique-constraint violation: this request was already consumed.
            result = true;
            log.error("接口重复消费, idemNo = {}, orderCode = {}", idemNo, bizOrderId, e);
        } catch (Exception e) {
            // Best effort: any other failure is logged and the request is treated as new.
            log.error("未知异常", e);
        }
        return result;
    }
}
不同场景的BizManager实现
综合来看,有三种不同的bizManager实现:
- 创建订单,创建逻辑自成一系,与其他业务逻辑思路均不相同
- 订单状态变化,这块主要关注订单状态的变化:当前是什么,之后是什么。基本上所有订单状态变化的服务实现思路都一致
- 订单状态自动迁移,比如订单从审核拒绝自动迁移到关闭状态,这种需要主动触发spring statemachine event。
下面分别针对这三种实现方式说明:
订单创建
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;
/**
 * Order creation: converts the request into order entities, persists the main order and
 * its sub-orders, and publishes a log event.
 */
@Component("bizOrderCreateBizManager")
@Slf4j
public class BizOrderCreateBizManagerImpl extends AbstractBizManagerImpl<BizOrderCreateRequest, BizOrderCreateResponse> {
    @Resource
    private BaseConvertor<BizOrderCreateModel, BizOrder> createBizOrderConvertor;
    @Resource
    private BaseConvertor<BizOrderExtendsCreateModel, BizOrderExtends> createBizOrderExtendsConvertor;
    @Resource
    private BaseConvertor<BizOrderChannelCreateModel, BizOrderChannel> createBizOrderChannelConvertor;
    @Resource
    private BaseConvertor<BizOrderActivityCreateModel, BizOrderActivity> createBizOrderActivityConvertor;
    @Autowired
    private BizOrderRepository bizOrderRepository;
    @Autowired
    private BizOrderExtendsRepository bizOrderExtendsRepository;
    @Autowired
    private BizOrderChannelRepository bizOrderChannelRepository;
    @Autowired
    private BizOrderActivityRepository bizOrderActivityRepository;
    @Autowired
    private BizOrderLogEventPublisher bizOrderLogEventPublisher;
    @Autowired
    private FinMerchantContractQueryService contractQueryService;

    /**
     * The actual business operation: build and persist the order(s), then publish the log event.
     *
     * @param request       the order creation request
     * @param stateMachines not referenced in the visible part of this method
     * @return a response carrying the id of the created main order
     * @throws Exception when the main amount differs from the sum of the sub-order amounts
     *                   ({@code BusinessException}), or when persistence fails
     */
    @Override
    public BizOrderCreateResponse doProcess(BizOrderCreateRequest request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws Exception {
        BizOrderCreateModel createModel = request.getBizOrderCreateModel();
        ...... // validation logic
        // Build the order entity from the creation model.
        BizOrder bizOrder = createBizOrderConvertor.modelToEntityConvertor(createModel);
        bizOrder.setCallSystem(request.getCallSystem());
        // MDC logging marker.
        LogUtil.setBizId(bizOrder.getBizOrderId());
        // For the XXX business no detailed amount exists at creation time (funding details only
        // arrive at signing), so sub-order data passed in this early is ignored.
        if (!BizOrderBizTypeEnum.isIn(bizOrder.getBizType(), BizOrderBizTypeEnum.EMPLOAN) &&
                CollectionUtils.isNotEmpty(request.getSubBizOrderCreateModels())) {
            // The main order amount must equal the sum of all sub-order amounts.
            BigDecimal totalAmount = bizOrder.getRealAmount();
            BigDecimal totalSumFromSub = request.getSubBizOrderCreateModels().stream()
                    .map(model -> BigDecimal.valueOf(model.getRealAmount()))
                    .reduce(BigDecimal.ZERO, BigDecimal::add);
            // compareTo ignores scale; equals() would treat 10.0 and 10.00 as different amounts.
            if (totalAmount.compareTo(totalSumFromSub) != 0) {
                throw new BusinessException(BizOrderErrorCode.ORDER_AMOUNT_NOT_MATCH,"主子订单金额不匹配");
            }
            bizOrder.setOrderLevel(BizOrderLevelEnum.MAIN.getOrderLevel()); // main order
            // Insert sequentially on the calling thread: the surrounding transaction is bound to
            // this thread, so inserts issued from parallel-stream workers would escape it.
            request.getSubBizOrderCreateModels().forEach(subOrderModel -> {
                BizOrder subBizOrder = createBizOrderConvertor.modelToEntityConvertor(subOrderModel);
                // Re-assign parentId and orderLevel for the sub-order.
                subBizOrder.setOrderLevel(BizOrderLevelEnum.DETAIL.getOrderLevel());
                subBizOrder.setParentId(bizOrder.getBizOrderId());
                bizOrderRepository.insertSelective(subBizOrder);
            });
        }
        bizOrderRepository.insertSelective(bizOrder);
        ......
        BizOrderCreateResponse createResponse = new BizOrderCreateResponse();
        createResponse.setBizOrderId(bizOrder.getBizOrderId());
        // Attributes attached to the log event.
        Map attrMap = com.google.common.collect.Maps.newHashMap();
        attrMap.put(AttributesKeyEnum.TARGET_STATUS.getShortKeyName(), createModel.getOrderStatus());
        attrMap.put(AttributesKeyEnum.CALL_SYSTEM.getShortKeyName(), request.getCallSystem());
        // Publish the event that records the operation log.
        bizOrderLogEventPublisher.bizOrderEventPublish(bizOrder,
                request.getOperationType(),
                BizOrderStatusEnum.CREATE.getStatus(),
                attrMap);
        return createResponse;
    }
}
这里的convertor都是基于Orika的,有兴趣的可以度娘下了解。
可以看到,其实处理很简单,就是将请求参数构造成订单对象,然后入库,加入了一部分数据校验。
订单状态变迁—有明确目标
订单状态变迁,由于重复代码比较多,所以这里抽象出来了另外一个模板类,如下:
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.statemachine.StateMachine;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.time.Instant;
import java.util.Date;
import java.util.Map;
/**
 * Template for order status changes that land on a stable node status (as opposed to an
 * intermediate status such as Auditing).
 *
 * <p>The change is written straight to the database and a log event is published after
 * commit; nothing else is triggered.
 */
@Slf4j
public abstract class BaseStatusSimpleChangeBizManagerImpl<T, R> extends AbstractBizManagerImpl<T, R> {

    @Autowired
    private BizOrderRepository bizOrderRepository;

    @Autowired
    private BizOrderLogEventPublisher bizOrderLogEventPublisher;

    /**
     * Applies the status change and schedules the log event for after commit.
     *
     * @param request       the status change request
     * @param stateMachines not referenced by this method
     * @return an empty response (the return value is not used by callers)
     * @throws Exception a {@code BusinessException} when the update does not affect exactly one row
     */
    public OrderBaseResponse doProcess(BizOrderStatusRequest request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws Exception {
        // Reload the order; it is expected to exist, otherwise an upper layer would have rejected the call.
        BizOrder storedOrder = bizOrderRepository.selectByBizPrimaryKey(request.getBizCode());
        BizOrderStatusModel statusModel = request.getBizOrderStatusModel();

        // Only the fields set on this object are written by the selective update.
        BizOrder patch = new BizOrder();
        patch.setOrderStatus(wrapTargetStatus(statusModel));
        patch.setUpdateTime(Date.from(Instant.now()));
        patch.setFinishReason(wrapFinishReason(statusModel)); // decided by the subclass

        // Merge request attributes over the stored ones when any were supplied.
        boolean hasRequestAttributes =
                statusModel.getAttributesMap() != null && !statusModel.getAttributesMap().isEmpty();
        if (hasRequestAttributes) {
            Map<String, String> mergedAttributes = AttributeUtil.fromString(storedOrder.getAttributesStr());
            mergedAttributes.putAll(statusModel.getAttributesMap());
            patch.setAttributesStr(AttributeUtil.toString(mergedAttributes));
        }
        patch.setBizOrderId(storedOrder.getBizOrderId());

        // Persist the order change.
        if (bizOrderRepository.updateByPrimaryKeySelective(patch) != 1) {
            throw new BusinessException(BizOrderErrorCode.ORDER_UPDATE_ERROR, "订单状态变更失败");
        }

        // Publish the log event only once the transaction has committed.
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            public void afterCommit() {
                publishStatusChangeLog(request, statusModel, storedOrder, patch);
            }
        });
        return new OrderBaseResponse(); // the return value is not used
    }

    /**
     * Builds the log attributes and publishes the status-change log event.
     */
    private void publishStatusChangeLog(BizOrderStatusRequest request, BizOrderStatusModel statusModel,
                                        BizOrder storedOrder, BizOrder patch) {
        Map<String, String> logAttributes = Maps.newHashMap();
        // Record the effect amount when one was supplied.
        if (statusModel.getEffectAmount() != null) {
            logAttributes.put(AttributesKeyEnum.EFFECT_MONEY_AMOUNT.getShortKeyName(), statusModel.getEffectAmount().toString());
        }
        if (statusModel.getAttributesMap() != null) {
            logAttributes.putAll(statusModel.getAttributesMap());
        }
        logAttributes.put(AttributesKeyEnum.CALL_SYSTEM.getShortKeyName(), request.getCallSystem());
        bizOrderLogEventPublisher.bizOrderEventPublish(patch, request.getOperationType(),
                storedOrder.getOrderStatus(), logAttributes);
    }

    /**
     * Builds the target status.
     *
     * @param statusModel the status model
     * @return the status to write to the order
     */
    public abstract String wrapTargetStatus(BizOrderStatusModel statusModel);

    /**
     * Builds the finish reason; only needs handling in the close, success and cancel scenarios.
     *
     * @param statusModel the status model
     * @return the finish reason to write to the order
     */
    public abstract String wrapFinishReason(BizOrderStatusModel statusModel);
}
比如订单状态变为取消,就可以简化实现成这个样子(继承BaseStatusSimpleChangeBizManagerImpl):
import org.apache.commons.lang3.StringUtils;
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Component;
/**
 * Moves an order to the CANCEL status using the simple status-change template.
 */
@Component("bizOrderCancelBizManager")
public class BizOrderCancelBizManagerImpl extends BaseStatusSimpleChangeBizManagerImpl<BizOrderStatusRequest, OrderBaseResponse> {

    /**
     * Delegates unchanged to the template implementation.
     *
     * @param request       the status change request
     * @param stateMachines handed through to the template
     * @return the template's response
     */
    @Override
    public OrderBaseResponse doProcess(BizOrderStatusRequest request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws Exception {
        return super.doProcess(request, stateMachines);
    }

    /**
     * The target status is always CANCEL.
     *
     * @param statusModel the status model (not consulted)
     * @return the CANCEL status value
     */
    @Override
    public String wrapTargetStatus(BizOrderStatusModel statusModel) {
        return BizOrderStatusEnum.CANCEL.getStatus();
    }

    /**
     * Uses the finish reason supplied in the model, falling back to
     * {@code CANCEL_FROM_<CURRENT STATUS, UPPER-CASED>} when it is blank.
     *
     * @param statusModel the status model
     * @return the finish reason
     */
    @Override
    public String wrapFinishReason(BizOrderStatusModel statusModel) {
        String suppliedReason = statusModel.getFinishReason();
        return StringUtils.isNotBlank(suppliedReason)
                ? suppliedReason
                : "CANCEL_FROM_" + StringUtils.upperCase(statusModel.getCurrentOrderStatus());
    }
}
基本上每个简单的状态变化逻辑,都是如此实现。
订单状态变迁—需要再次发送消息
这种同样抽象出了一个模板类,如下:
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.util.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import java.time.Instant;
import java.util.Date;
/**
 * Template for changes that move an order into an intermediate status, e.g. the base class
 * behind the toXXXAction handlers.
 *
 * <p>Besides writing the database and publishing a log event, it sends a state machine
 * event to chain the next step.
 */
@Slf4j
public abstract class BaseStatusToUnstableTargetBizManagerImpl<T, R> extends AbstractBizManagerImpl<T, R> {

    @Autowired
    private BizOrderRepository bizOrderRepository;

    @Autowired
    private BizOrderLogEventPublisher bizOrderLogEventPublisher;

    @Autowired
    private BeanMapper beanMapper;

    /**
     * Writes the requested target status, then (when a state machine is supplied) rewrites the
     * request for the follow-up transition and sends the corresponding state machine event.
     * A log event is published after commit.
     *
     * <p>Declared {@code public}, like the sibling simple-change template, so that subclasses
     * in other packages can override it and call {@code super.doProcess}.
     *
     * @param request       the status change request
     * @param stateMachines state machines already processed upstream; only the first one is used
     * @return an empty response
     * @throws Exception a {@code BusinessException} when the update does not affect exactly one row
     */
    public OrderBaseResponse doProcess(BizOrderStatusRequest request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws Exception {
        // Update the status.
        BizOrderStatusModel statusModel = request.getBizOrderStatusModel();
        BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(statusModel.getBizOrderId());
        String currentStatus = bizOrder.getOrderStatus();

        BizOrder newBizOrder = new BizOrder();
        newBizOrder.setOrderStatus(statusModel.getTargetOrderStatus());
        newBizOrder.setBizOrderId(bizOrder.getBizOrderId());
        newBizOrder.setUpdateTime(Date.from(Instant.now()));
        int updateCount = bizOrderRepository.updateByPrimaryKeySelective(newBizOrder);
        if (1 != updateCount) {
            throw new BusinessException(BizOrderErrorCode.ORDER_UPDATE_ERROR, "更新订单状态失败");
        }

        // Send the state machine event that drives the next step.
        if (null != stateMachines && stateMachines.length > 0) {
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine = stateMachines[0];
            // Rebuild the request: the status just written becomes the current status.
            String reachedStatus = statusModel.getTargetOrderStatus();
            statusModel.setFinishReason(joinFinishReason(wrapFinishReason(reachedStatus), statusModel.getFinishReason()));
            statusModel.setCurrentOrderStatus(reachedStatus);
            statusModel.setTargetOrderStatus(wrapTargetOrderStatus());
            request.setBizOrderStatusModel(statusModel); // need or not
            Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder
                    .withPayload(wrapToEvent())
                    .setHeader(BizOrderConstants.BIZORDER_CONTEXT_KEY, request)
                    .build();
            // sendEvent reports whether the event was accepted; surface a rejection instead of dropping it.
            if (!stateMachine.sendEvent(eventMsg)) {
                log.warn("state machine did not accept event {}, bizOrderId = {}",
                        eventMsg.getPayload(), bizOrder.getBizOrderId());
            }
        }

        // Publish the log event only once the transaction has committed.
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
            @Override
            // @Retryable(maxAttempts = 5)
            public void afterCommit() {
                // Publish the event that records the operation log.
                bizOrderLogEventPublisher.bizOrderEventPublish(bizOrder,
                        request.getOperationType(), currentStatus,
                        Maps.newHashMap(AttributesKeyEnum.CALL_SYSTEM.getShortKeyName(), request.getCallSystem()));
            }
        });
        return new OrderBaseResponse();
    }

    /**
     * Concatenates the generated prefix and the supplied reason, skipping whichever part is
     * {@code null} so the literal text "null" never ends up in the stored reason.
     */
    private static String joinFinishReason(String prefix, String reason) {
        if (prefix == null) {
            return reason;
        }
        if (reason == null) {
            return prefix;
        }
        return prefix + reason;
    }

    /**
     * Builds the state machine event to send.
     *
     * @return the event
     */
    public abstract BizOrderStatusChangeEventEnum wrapToEvent();

    /**
     * Builds the status the follow-up transition should reach.
     *
     * @return the status
     */
    public abstract String wrapTargetOrderStatus();

    /**
     * Builds the finish reason prefix; only finish statuses need one, otherwise return {@code null}.
     *
     * @param curOrderStatus the current order status
     * @return the finish reason prefix, or {@code null}
     */
    public abstract String wrapFinishReason(String curOrderStatus);
}
同样的,比如订单需要自动跳转到关单情况就可以实现如下:
import org.springframework.statemachine.StateMachine;
import org.springframework.stereotype.Component;
/**
 * Handles the node that precedes the jump to the close status.
 */
@Component("bizOrderToCloseBizManager")
public class BizOrderToCloseBizManagerImpl extends BaseStatusToUnstableTargetBizManagerImpl<BizOrderStatusRequest, OrderBaseResponse> {

    /**
     * Delegates unchanged to the template implementation.
     *
     * @param request       the status change request
     * @param stateMachines state machines already processed upstream, handed through to the template
     * @return the template's response
     */
    @Override
    public OrderBaseResponse doProcess(BizOrderStatusRequest request, StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>... stateMachines) throws Exception {
        return super.doProcess(request, stateMachines);
    }

    /**
     * The follow-up event is the system close event.
     *
     * @return {@code EVT_SYS_CLOSE}
     */
    @Override
    public BizOrderStatusChangeEventEnum wrapToEvent() {
        return BizOrderStatusChangeEventEnum.EVT_SYS_CLOSE;
    }

    /**
     * The follow-up transition targets the CLOSE status.
     *
     * @return the CLOSE status rendered as a string
     */
    @Override
    public String wrapTargetOrderStatus() {
        return String.valueOf(BizOrderStatusEnum.CLOSE.getStatus());
    }

    /**
     * Builds the finish reason prefix in the form {@code CLOSE_FROM_<status>||}.
     *
     * @param curOrderStatus the current order status
     * @return the finish reason prefix
     */
    @Override
    public String wrapFinishReason(String curOrderStatus) {
        return String.format("CLOSE_FROM_%s||", curOrderStatus);
    }
}