Activiti

新的聊天

用户:

你知道java里面的这个项目么 Activiti

ChatGPT:

是的,我了解 Activiti。Activiti 是一个开源的工作流引擎和业务流程管理(BPM)平台,主要用于实现业务流程的自动化。以下是 Activiti 的一些主要特点:

  1. 核心功能:
  • 流程定义和部署
  • 流程实例管理
  • 任务管理
  • 用户和用户组管理
  • 历史数据管理
  1. 主要特性:
  • 基于 BPMN 2.0 标准
  • 轻量级架构
  • 与 Spring 框架良好集成
  • 提供可视化流程设计器
  • 支持多种数据库
  • REST API 支持
  1. 常见应用场景:
  • 请假审批流程
  • 报销审批流程
  • 采购审批流程
  • 文档审批流程
  • 其他业务流程自动化
  1. 核心组件:
// 流程引擎配置
ProcessEngineConfiguration configuration = ProcessEngineConfiguration
    .createStandaloneProcessEngineConfiguration();

// 流程引擎
ProcessEngine processEngine = configuration.buildProcessEngine();

// 运行时服务
RuntimeService runtimeService = processEngine.getRuntimeService();

// 任务服务
TaskService taskService = processEngine.getTaskService();

// 仓库服务
RepositoryService repositoryService = processEngine.getRepositoryService();
  1. 基本使用示例:
// 部署流程定义
Deployment deployment = repositoryService.createDeployment()
    .addClasspathResource("processes/myProcess.bpmn20.xml")
    .deploy();

// 启动流程实例
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("myProcess");

// 查询任务
List<Task> tasks = taskService.createTaskQuery()
    .taskAssignee("john")
    .list();

// 完成任务
taskService.complete(task.getId());
  1. 与 Spring Boot 集成:
@Configuration
public class ActivitiConfig {
    @Bean
    public ProcessEngine processEngine() {
        return ProcessEngineConfiguration
            .createStandaloneProcessEngineConfiguration()
            .setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
            .setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
            .setJdbcDriver("org.h2.Driver")
            .setJdbcUsername("sa")
            .setJdbcPassword("")
            .buildProcessEngine();
    }
}
  1. 主要优势:
  • 开源免费
  • 社区活跃
  • 文档完善
  • 易于集成
  • 性能优秀
  • 功能丰富
  1. 注意事项:
  • 需要合理设计流程
  • 注意流程版本管理
  • 关注性能优化
  • 做好异常处理
  • 注意数据安全

Activiti 是一个成熟的工作流引擎,适合用于企业级应用开发。它提供了完整的流程管理功能,可以帮助开发者快速实现各种业务流程的自动化。

用户:

现在我们公司要做一个审批流,比如现在我希望实现一个请假审批流程,这个我该怎么用这个实现?

ChatGPT:

我来帮你设计一个基于 Activiti 的请假审批流程。我们可以分以下几个步骤实现:

  1. 首先,设计请假流程的 BPMN 模型(可以使用 Activiti Designer 设计):
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
             targetNamespace="http://www.activiti.org/processdef">
    <process id="leaveProcess" name="请假流程" isExecutable="true">
        <!-- 开始节点 -->
        <startEvent id="startEvent" name="开始"/>
        
        <!-- 填写请假申请 -->
        <userTask id="submitForm" name="填写请假申请"
                  activiti:assignee="${applicant}"/>
        
        <!-- 直接主管审批 -->
        <userTask id="leaderApprove" name="主管审批"
                  activiti:assignee="${leader}"/>
        
        <!-- 人事审批 -->
        <userTask id="hrApprove" name="人事审批"
                  activiti:assignee="${hr}"/>
        
        <!-- 结束节点 -->
        <endEvent id="endEvent" name="结束"/>
        
        <!-- 连线 -->
        <sequenceFlow sourceRef="startEvent" targetRef="submitForm"/>
        <sequenceFlow sourceRef="submitForm" targetRef="leaderApprove"/>
        <sequenceFlow sourceRef="leaderApprove" targetRef="hrApprove"/>
        <sequenceFlow sourceRef="hrApprove" targetRef="endEvent"/>
    </process>
</definitions>
  1. 创建请假实体类:
@Data
public class LeaveRequest {
    private String id;
    private String userId;      // 申请人ID
    private String reason;      // 请假原因
    private Date startTime;     // 开始时间
    private Date endTime;       // 结束时间
    private Integer days;       // 请假天数
    private String status;      // 状态
    private String processInstanceId; // 流程实例ID
}
  1. 创建请假服务接口:
@Service
@Transactional
public class LeaveService {
    @Autowired
    private RuntimeService runtimeService;
    
    @Autowired
    private TaskService taskService;
    
    @Autowired
    private RepositoryService repositoryService;
    
    // 发起请假申请
    public void startLeaveProcess(LeaveRequest leaveRequest) {
        // 保存请假信息
        // saveLeaveRequest(leaveRequest);
        
        // 启动流程实例
        Map<String, Object> variables = new HashMap<>();
        variables.put("applicant", leaveRequest.getUserId());
        variables.put("leader", "直接主管的用户ID");
        variables.put("hr", "人事的用户ID");
        
        ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
            "leaveProcess", 
            leaveRequest.getId(),  // 业务键
            variables
        );
        
        // 关联业务ID和流程实例ID
        leaveRequest.setProcessInstanceId(processInstance.getId());
        // updateLeaveRequest(leaveRequest);
    }
    
    // 审批
    public void approve(String taskId, boolean approved, String comment) {
        // 添加审批意见
        taskService.addComment(taskId, null, comment);
        
        Map<String, Object> variables = new HashMap<>();
        variables.put("approved", approved);
        
        // 完成任务
        taskService.complete(taskId, variables);
    }
    
    // 查询待办任务
    public List<Task> findTodoTasks(String userId) {
        return taskService.createTaskQuery()
                .taskAssignee(userId)
                .active()
                .list();
    }
}
  1. 创建控制器:
@RestController
@RequestMapping("/leave")
public class LeaveController {
    @Autowired
    private LeaveService leaveService;
    
    // 发起请假申请
    @PostMapping("/start")
    public Result startLeave(@RequestBody LeaveRequest leaveRequest) {
        leaveService.startLeaveProcess(leaveRequest);
        return Result.success();
    }
    
    // 审批
    @PostMapping("/approve")
    public Result approve(@RequestParam String taskId, 
                         @RequestParam boolean approved,
                         @RequestParam String comment) {
        leaveService.approve(taskId, approved, comment);
        return Result.success();
    }
    
    // 获取待办任务
    @GetMapping("/tasks")
    public Result getTodoTasks(@RequestParam String userId) {
        List<Task> tasks = leaveService.findTodoTasks(userId);
        return Result.success(tasks);
    }
}
  1. 配置文件 application.yml:
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/activiti?useUnicode=true&characterEncoding=utf8
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    
  activiti:
    database-schema-update: true
    history-level: full
    db-history-used: true
  1. 使用流程:
// 1. 员工发起请假申请
POST /leave/start
{
    "userId": "emp001",
    "reason": "年假",
    "startTime": "2023-05-01",
    "endTime": "2023-05-03",
    "days": 3
}

// 2. 主管查看待办任务
GET /leave/tasks?userId=leader001

// 3. 主管审批
POST /leave/approve?taskId=xxx&approved=true&comment=同意

// 4. 人事查看待办任务
GET /leave/tasks?userId=hr001

// 5. 人事审批
POST /leave/approve?taskId=xxx&approved=true&comment=同意

补充说明:

  1. 实际使用时需要加入更多的业务逻辑,如:

    • 请假天数的计算和验证
    • 各级审批的具体规则
    • 邮件通知
    • 流程跟踪
    • 审批历史查询
  2. 可以添加更多功能:

    • 驳回功能
    • 加签/减签
    • 转办功能
    • 催办功能
    • 流程撤回
  3. 安全性考虑:

    • 权限控制
    • 数据验证
    • 操作日志
    • 事务处理
  4. 建议增加的功能:

    • 流程状态查询
    • 审批历史查询
    • 请假统计
    • 导出功能

这是一个基本的请假流程实现,你可以根据实际需求进行调整和扩展。

用户:

我想知道这个数据是存在哪里了?比如这个接口:GET /leave/tasks?userId=hr001,它的数据是从哪里查出来的?

ChatGPT:

Activiti 会在数据库中创建多个表来存储流程相关的所有数据。当你启动 Activiti 时,它会自动创建大约 25-30 个表。以下是一些主要的表:

  1. 核心运行时表(以 ACT_RU_ 开头):
-- 运行时流程实例表
ACT_RU_EXECUTION

-- 运行时任务表(你查询的待办任务主要从这个表查询)
ACT_RU_TASK

-- 运行时变量表
ACT_RU_VARIABLE

-- 运行时身份链接表
ACT_RU_IDENTITYLINK
  1. 历史表(以 ACT_HI_ 开头):
-- 历史活动实例表
ACT_HI_ACTINST

-- 历史任务实例表
ACT_HI_TASKINST

-- 历史流程实例表
ACT_HI_PROCINST

-- 历史变量表
ACT_HI_VARINST
  1. 仓库表(以 ACT_RE_ 开头):
-- 流程定义表
ACT_RE_PROCDEF

-- 部署表
ACT_RE_DEPLOYMENT

当你调用 GET /leave/tasks?userId=hr001 这个接口时,实际上是查询 ACT_RU_TASK 表:

// 这段代码
taskService.createTaskQuery()
    .taskAssignee(userId)
    .active()
    .list();

// 会转换成类似这样的 SQL
SELECT * FROM ACT_RU_TASK 
WHERE ASSIGNEE_ = 'hr001' 
AND SUSPENSION_STATE_ = 1;

以下是 ACT_RU_TASK 表的主要字段:

CREATE TABLE ACT_RU_TASK (
    ID_ varchar(64) NOT NULL,                    -- 任务ID
    NAME_ varchar(255),                          -- 任务名称
    TASK_DEF_KEY_ varchar(255),                 -- 任务定义的ID
    PROC_INST_ID_ varchar(64),                  -- 流程实例ID
    PROC_DEF_ID_ varchar(64),                   -- 流程定义ID
    ASSIGNEE_ varchar(255),                      -- 签收人/处理人
    PRIORITY_ integer,                           -- 优先级
    CREATE_TIME_ timestamp,                      -- 创建时间
    DUE_DATE_ timestamp,                        -- 到期时间
    SUSPENSION_STATE_ integer,                   -- 挂起状态
    -- 其他字段...
    PRIMARY KEY (ID_)
);

如果你想要查看完整的数据,可以直接查询数据库:

  1. 查看待办任务:
SELECT * FROM ACT_RU_TASK 
WHERE ASSIGNEE_ = 'hr001';
  1. 查看流程实例:
SELECT * FROM ACT_RU_EXECUTION;
  1. 查看历史任务:
SELECT * FROM ACT_HI_TASKINST;
  1. 查看流程变量:
SELECT * FROM ACT_RU_VARIABLE;

需要注意的是:

  1. 业务数据(如请假信息)通常需要自己创建表存储:
CREATE TABLE leave_request (
    id VARCHAR(32) PRIMARY KEY,
    user_id VARCHAR(32),
    reason VARCHAR(255),
    start_time DATETIME,
    end_time DATETIME,
    days INT,
    status VARCHAR(20),
    process_instance_id VARCHAR(64),
    create_time DATETIME,
    update_time DATETIME
);
  1. 关联查询示例:
-- 查询某个用户的请假申请及其当前任务状态
SELECT 
    l.*, 
    t.NAME_ as current_task,
    t.ASSIGNEE_ as current_assignee
FROM leave_request l
LEFT JOIN ACT_RU_TASK t ON l.process_instance_id = t.PROC_INST_ID_
WHERE l.user_id = 'emp001';
  1. 如果需要查询历史记录:
-- 查询某个请假申请的完整审批历史
SELECT 
    hi.START_TIME_,
    hi.END_TIME_,
    hi.NAME_,
    hi.ASSIGNEE_,
    c.MESSAGE_
FROM ACT_HI_TASKINST hi
LEFT JOIN ACT_HI_COMMENT c ON hi.ID_ = c.TASK_ID_
WHERE hi.PROC_INST_ID_ = '某个流程实例ID';

所以,Activiti 的数据主要分为三类:

  1. 流程定义数据(ACT_RE_*表)
  2. 运行时数据(ACT_RU_*表)
  3. 历史数据(ACT_HI_*表)

而业务相关的数据需要自己设计表结构来存储,并通过流程实例ID与 Activiti 的表关联。

用户:

那GET /leave/tasks?userId=hr001获取到的数据大概是怎么样的,能够满足展示需求么?

ChatGPT:

GET /leave/tasks?userId=hr001 直接获取到的 Task 对象包含的信息相对比较基础,可能无法完全满足实际展示需求。我来详细说明一下:

  1. 原始返回的 Task 对象包含的基础信息:
{
    "id": "1234",                    // 任务ID
    "name": "人事审批",              // 任务名称
    "createTime": "2023-05-01 10:00:00", // 任务创建时间
    "assignee": "hr001",             // 处理人
    "processInstanceId": "5678",     // 流程实例ID
    "processDefinitionId": "leaveProcess:1:9012", // 流程定义ID
    "taskDefinitionKey": "hrApprove" // 任务定义KEY
}
  1. 实际展示时通常需要的信息:
{
    // 任务基本信息
    "taskId": "1234",
    "taskName": "人事审批",
    "createTime": "2023-05-01 10:00:00",
    "dueDate": "2023-05-03 10:00:00",    // 截止时间
    "priority": 50,                       // 优先级
    
    // 请假申请信息
    "leaveInfo": {
        "applyId": "LEAVE202305010001",
        "applicantId": "emp001",
        "applicantName": "张三",         // 申请人姓名
        "department": "技术部",          // 部门
        "reason": "年假",               // 请假原因
        "startTime": "2023-05-05",      // 开始时间
        "endTime": "2023-05-07",        // 结束时间
        "days": 3,                      // 请假天数
        "leaveType": "年假",            // 请假类型
        "applyTime": "2023-05-01 09:30:00" // 申请时间
    },
    
    // 流程信息
    "processInfo": {
        "processInstanceId": "5678",
        "currentNode": "人事审批",
        "previousNode": "主管审批",
        "previousApprover": "leader001",
        "previousApproveTime": "2023-05-01 09:45:00",
        "previousComment": "同意"        // 上一步审批意见
    }
}
  1. 为了获取完整信息,需要扩展服务类:
@Service
public class LeaveService {
    @Autowired
    private TaskService taskService;
    
    @Autowired
    private RuntimeService runtimeService;
    
    @Autowired
    private HistoryService historyService;
    
    @Autowired
    private LeaveRequestMapper leaveRequestMapper; // 自定义的请假信息Mapper
    
    @Autowired
    private UserService userService; // 自定义的用户服务
    
    public List<TaskVO> findTodoTasks(String userId) {
        // 1. 查询任务列表
        List<Task> tasks = taskService.createTaskQuery()
                .taskAssignee(userId)
                .active()
                .orderByTaskCreateTime().desc()
                .list();
        
        // 2. 转换为展示对象
        return tasks.stream().map(task -> {
            TaskVO vo = new TaskVO();
            // 设置任务基本信息
            vo.setTaskId(task.getId());
            vo.setTaskName(task.getName());
            vo.setCreateTime(task.getCreateTime());
            vo.setDueDate(task.getDueDate());
            vo.setPriority(task.getPriority());
            
            // 获取流程实例ID
            String processInstanceId = task.getProcessInstanceId();
            
            // 查询请假信息
            LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
            LeaveInfoVO leaveInfo = convertToLeaveInfoVO(leaveRequest);
            
            // 查询申请人信息
            User applicant = userService.getUser(leaveRequest.getUserId());
            leaveInfo.setApplicantName(applicant.getName());
            leaveInfo.setDepartment(applicant.getDepartment());
            
            vo.setLeaveInfo(leaveInfo);
            
            // 查询流程信息
            ProcessInfoVO processInfo = getProcessInfo(processInstanceId);
            vo.setProcessInfo(processInfo);
            
            return vo;
        }).collect(Collectors.toList());
    }
    
    private ProcessInfoVO getProcessInfo(String processInstanceId) {
        ProcessInfoVO vo = new ProcessInfoVO();
        vo.setProcessInstanceId(processInstanceId);
        
        // 查询历史任务
        List<HistoricTaskInstance> historicTasks = historyService.createHistoricTaskInstanceQuery()
                .processInstanceId(processInstanceId)
                .finished()
                .orderByHistoricTaskInstanceEndTime().desc()
                .list();
        
        if (!historicTasks.isEmpty()) {
            HistoricTaskInstance previousTask = historicTasks.get(0);
            vo.setPreviousNode(previousTask.getName());
            vo.setPreviousApprover(previousTask.getAssignee());
            vo.setPreviousApproveTime(previousTask.getEndTime());
            
            // 查询审批意见
            List<Comment> comments = taskService.getTaskComments(previousTask.getId());
            if (!comments.isEmpty()) {
                vo.setPreviousComment(comments.get(0).getFullMessage());
            }
        }
        
        return vo;
    }
}
  1. 控制器改造:
@RestController
@RequestMapping("/leave")
public class LeaveController {
    @Autowired
    private LeaveService leaveService;
    
    @GetMapping("/tasks")
    public Result<List<TaskVO>> getTodoTasks(@RequestParam String userId) {
        List<TaskVO> tasks = leaveService.findTodoTasks(userId);
        return Result.success(tasks);
    }
}
  1. 前端展示示例:
<template>
  <div class="todo-tasks">
    <a-table :dataSource="tasks">
      <!-- 申请人信息 -->
      <a-table-column title="申请人" dataIndex="leaveInfo.applicantName" />
      <a-table-column title="部门" dataIndex="leaveInfo.department" />
      
      <!-- 请假信息 -->
      <a-table-column title="请假类型" dataIndex="leaveInfo.leaveType" />
      <a-table-column title="请假时间" >
        <template slot-scope="text, record">
          {{record.leaveInfo.startTime}} 至 {{record.leaveInfo.endTime}}
          (共{{record.leaveInfo.days}}天)
        </template>
      </a-table-column>
      <a-table-column title="请假原因" dataIndex="leaveInfo.reason" />
      
      <!-- 流程信息 -->
      <a-table-column title="当前节点" dataIndex="taskName" />
      <a-table-column title="上一审批人" dataIndex="processInfo.previousApprover" />
      <a-table-column title="上一审批意见" dataIndex="processInfo.previousComment" />
      
      <!-- 操作按钮 -->
      <a-table-column title="操作">
        <template slot-scope="text, record">
          <a-button @click="approve(record)">审批</a-button>
        </template>
      </a-table-column>
    </a-table>
  </div>
</template>

这样设计的好处是:

  1. 提供了完整的业务信息展示
  2. 减少前端的数据处理逻辑
  3. 减少前端的请求次数
  4. 提供更好的用户体验

建议:

  1. 添加分页查询
  2. 添加查询条件(如按申请时间、请假类型等筛选)
  3. 添加排序功能
  4. 添加缓存机制
  5. 考虑性能优化(如批量查询)

用户:

因为我自己想到的流程的处理有的时候是很复杂的,比如有多条线路,甚至有些是and的关系,有些是or的关系,有可能终止,有可能通过,甚至有些节点还有时间要求,这些都是这个Activiti自己完全管理的么?

ChatGPT:

是的,Activiti 支持这些复杂的流程控制。我来详细解释一下各种场景的实现:

  1. 多条路线(网关):
<!-- 排他网关(XOR,只走一条路) -->
<exclusiveGateway id="daysCheck" name="请假天数判断">
    <sequenceFlow sourceRef="daysCheck" targetRef="leaderApprove">
        <conditionExpression>${days <= 3}</conditionExpression>
    </sequenceFlow>
    <sequenceFlow sourceRef="daysCheck" targetRef="directorApprove">
        <conditionExpression>${days > 3}</conditionExpression>
    </sequenceFlow>
</exclusiveGateway>

<!-- 并行网关(AND,所有路线都要走) -->
<parallelGateway id="multiApprove" name="多人同时审批">
    <sequenceFlow sourceRef="multiApprove" targetRef="leaderApprove"/>
    <sequenceFlow sourceRef="multiApprove" targetRef="hrApprove"/>
</parallelGateway>

<!-- 包容网关(OR,满足条件的都要走) -->
<inclusiveGateway id="conditionalApprove" name="条件审批">
    <sequenceFlow sourceRef="conditionalApprove" targetRef="leaderApprove">
        <conditionExpression>${days > 3}</conditionExpression>
    </sequenceFlow>
    <sequenceFlow sourceRef="conditionalApprove" targetRef="hrApprove">
        <conditionExpression>${type == 'sick'}</conditionExpression>
    </sequenceFlow>
</inclusiveGateway>
  1. 流程终止和通过:
<!-- 终止结束事件 -->
<endEvent id="terminateEnd" name="终止">
    <terminateEventDefinition/>
</endEvent>

<!-- 条件判断终止 -->
<sequenceFlow sourceRef="checkResult" targetRef="terminateEnd">
    <conditionExpression>${!approved}</conditionExpression>
</sequenceFlow>

<!-- 正常通过 -->
<sequenceFlow sourceRef="checkResult" targetRef="normalEnd">
    <conditionExpression>${approved}</conditionExpression>
</sequenceFlow>
  1. 时间控制:
<!-- 定时开始事件 -->
<startEvent id="timerStart">
    <timerEventDefinition>
        <timeDate>2023-12-31T23:59:59</timeDate>
    </timerEventDefinition>
</startEvent>

<!-- 超时边界事件 -->
<boundaryEvent id="timeoutAlert" attachedToRef="leaderApprove">
    <timerEventDefinition>
        <timeDuration>P2D</timeDuration> <!-- 2天后触发 -->
    </timerEventDefinition>
</boundaryEvent>

<!-- 定时任务 -->
<serviceTask id="reminderTask">
    <timerEventDefinition>
        <timeCycle>R3/PT10H</timeCycle> <!-- 每10小时执行一次,共执行3次 -->
    </timerEventDefinition>
</serviceTask>
  1. 复杂业务规则示例:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
    <process id="complexLeaveProcess" name="复杂请假流程">
        <!-- 开始 -->
        <startEvent id="start"/>
        
        <!-- 填写请假单 -->
        <userTask id="submitForm" name="填写请假单"/>
        
        <!-- 判断请假类型 -->
        <exclusiveGateway id="leaveTypeCheck"/>
        
        <!-- 病假流程 -->
        <sequenceFlow sourceRef="leaveTypeCheck" targetRef="sickLeaveProcess">
            <conditionExpression>${type == 'sick'}</conditionExpression>
        </sequenceFlow>
        
        <!-- 年假流程 -->
        <sequenceFlow sourceRef="leaveTypeCheck" targetRef="annualLeaveProcess">
            <conditionExpression>${type == 'annual'}</conditionExpression>
        </sequenceFlow>
        
        <!-- 病假处理 -->
        <subProcess id="sickLeaveProcess">
            <!-- 并行审批 -->
            <parallelGateway id="sickLeaveParallel"/>
            
            <!-- 主管审批 -->
            <userTask id="leaderApprove" name="主管审批">
                <!-- 超时提醒 -->
                <boundaryEvent id="leaderTimeout">
                    <timerEventDefinition>
                        <timeDuration>P1D</timeDuration>
                    </timerEventDefinition>
                </boundaryEvent>
            </userTask>
            
            <!-- 医疗证明检查 -->
            <userTask id="medicalCheck" name="医疗证明检查"/>
            
            <!-- 汇总网关 -->
            <parallelGateway id="sickLeaveJoin"/>
        </subProcess>
        
        <!-- 年假处理 -->
        <subProcess id="annualLeaveProcess">
            <!-- 判断天数 -->
            <exclusiveGateway id="daysCheck"/>
            
            <!-- 短期请假 -->
            <sequenceFlow sourceRef="daysCheck" targetRef="shortLeave">
                <conditionExpression>${days <= 3}</conditionExpression>
            </sequenceFlow>
            
            <!-- 长期请假 -->
            <sequenceFlow sourceRef="daysCheck" targetRef="longLeave">
                <conditionExpression>${days > 3}</conditionExpression>
            </sequenceFlow>
            
            <!-- 包容网关 -->
            <inclusiveGateway id="approveGateway"/>
            
            <!-- 各种审批条件 -->
            <sequenceFlow sourceRef="approveGateway" targetRef="leaderApprove">
                <conditionExpression>${days > 0}</conditionExpression>
            </sequenceFlow>
            
            <sequenceFlow sourceRef="approveGateway" targetRef="hrApprove">
                <conditionExpression>${days > 3}</conditionExpression>
            </sequenceFlow>
            
            <sequenceFlow sourceRef="approveGateway" targetRef="directorApprove">
                <conditionExpression>${days > 5}</conditionExpression>
            </sequenceFlow>
        </subProcess>
        
        <!-- 结果处理 -->
        <exclusiveGateway id="resultCheck"/>
        
        <!-- 审批通过 -->
        <sequenceFlow sourceRef="resultCheck" targetRef="approved">
            <conditionExpression>${approved}</conditionExpression>
        </sequenceFlow>
        
        <!-- 审批拒绝 -->
        <sequenceFlow sourceRef="resultCheck" targetRef="rejected">
            <conditionExpression>${!approved}</conditionExpression>
        </sequenceFlow>
        
        <!-- 结束事件 -->
        <endEvent id="end"/>
    </process>
</definitions>
  1. 在代码中处理复杂逻辑:
@Service
public class LeaveProcessService {
    @Autowired
    private RuntimeService runtimeService;
    
    @Autowired
    private TaskService taskService;
    
    // 启动流程时设置变量
    public void startProcess(LeaveRequest leave) {
        Map<String, Object> variables = new HashMap<>();
        variables.put("type", leave.getType());
        variables.put("days", leave.getDays());
        variables.put("applicant", leave.getUserId());
        variables.put("approved", false);
        
        // 设置动态审批人
        variables.put("leader", getLeader(leave.getUserId()));
        variables.put("hr", getHrApprover(leave.getType()));
        variables.put("director", getDirector(leave.getDepartment()));
        
        runtimeService.startProcessInstanceByKey(
            "complexLeaveProcess",
            leave.getId(),
            variables
        );
    }
    
    // 完成任务时更新变量
    public void completeTask(String taskId, boolean approved, String comment) {
        // 添加评论
        taskService.addComment(taskId, null, comment);
        
        // 更新变量
        Map<String, Object> variables = new HashMap<>();
        variables.put("approved", approved);
        variables.put("comment", comment);
        variables.put("completeTime", new Date());
        
        // 完成任务
        taskService.complete(taskId, variables);
    }
    
    // 处理超时
    @Scheduled(fixedRate = 3600000) // 每小时执行
    public void handleTimeout() {
        // 查询超时任务
        List<Task> timeoutTasks = taskService.createTaskQuery()
            .active()
            .dueBeforeDate(new Date())
            .list();
            
        for (Task task : timeoutTasks) {
            // 发送提醒
            sendReminder(task);
            // 或自动处理
            autoProcess(task);
        }
    }
}

Activiti 提供了强大的流程控制功能:

  1. 支持各种类型的网关(排他、并行、包容)
  2. 支持子流程
  3. 支持定时事件
  4. 支持边界事件
  5. 支持条件判断
  6. 支持动态分配处理人
  7. 支持流程变量
  8. 支持事件监听

建议:

  1. 合理设计流程,避免过于复杂
  2. 注意处理异常情况
  3. 做好流程监控
  4. 考虑性能影响
  5. 保持流程版本管理

用户:

比如,现在我现在提交一个病假后,HR需要进行审批了,这个时候前端应该调用后端的哪一个接口?

ChatGPT:

对于 HR 审批这个操作,通常需要以下几个接口:

  1. 首先,HR 需要查看待办任务:
@GetMapping("/tasks")
public Result<List<TaskVO>> getTodoTasks(@RequestParam String userId) {
    // HR的userId查询待办任务
    List<TaskVO> tasks = leaveService.findTodoTasks(userId);
    return Result.success(tasks);
}
  1. 查看具体任务的详细信息:
@GetMapping("/task/{taskId}")
public Result<TaskDetailVO> getTaskDetail(@PathVariable String taskId) {
    TaskDetailVO detail = leaveService.getTaskDetail(taskId);
    return Result.success(detail);
}

@Service
public class LeaveService {
    @Autowired
    private TaskService taskService;
    
    public TaskDetailVO getTaskDetail(String taskId) {
        // 1. 获取任务信息
        Task task = taskService.createTaskQuery()
                .taskId(taskId)
                .singleResult();
                
        // 2. 获取流程实例ID
        String processInstanceId = task.getProcessInstanceId();
        
        // 3. 查询请假信息
        LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
        
        // 4. 查询流程历史记录
        List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
                .processInstanceId(processInstanceId)
                .finished()
                .orderByHistoricActivityInstanceEndTime().asc()
                .list();
                
        // 5. 组装详细信息
        TaskDetailVO detail = new TaskDetailVO();
        detail.setTask(task);
        detail.setLeaveRequest(leaveRequest);
        detail.setHistoryActivities(activities);
        
        return detail;
    }
}
  1. 提交审批结果:
@PostMapping("/task/approve")
public Result approveTask(@RequestBody ApproveDTO approveDTO) {
    leaveService.completeTask(approveDTO);
    return Result.success();
}

// 审批参数
@Data
public class ApproveDTO {
    private String taskId;        // 任务ID
    private Boolean approved;     // 是否同意
    private String comment;       // 审批意见
    private Map<String, Object> variables; // 其他变量
}

@Service
public class LeaveService {
    @Autowired
    private TaskService taskService;
    
    @Transactional
    public void completeTask(ApproveDTO approveDTO) {
        String taskId = approveDTO.getTaskId();
        
        // 1. 添加审批意见
        taskService.addComment(
            taskId,
            null,
            approveDTO.getComment()
        );
        
        // 2. 准备变量
        Map<String, Object> variables = new HashMap<>();
        variables.put("approved", approveDTO.getApproved());
        variables.put("approveTime", new Date());
        if (approveDTO.getVariables() != null) {
            variables.putAll(approveDTO.getVariables());
        }
        
        // 3. 完成任务
        taskService.complete(taskId, variables);
        
        // 4. 更新业务状态
        Task task = taskService.createTaskQuery()
                .taskId(taskId)
                .singleResult();
        String processInstanceId = task.getProcessInstanceId();
        
        LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
        leaveRequest.setStatus(approveDTO.getApproved() ? "approved" : "rejected");
        leaveRequestMapper.updateById(leaveRequest);
        
        // 5. 发送通知
        sendNotification(leaveRequest, approveDTO);
    }
}
  1. 前端调用示例:
// Vue组件示例
export default {
    data() {
        return {
            tasks: [],
            currentTask: null,
            approveForm: {
                approved: null,
                comment: '',
            }
        }
    },
    methods: {
        // 获取待办任务
        async loadTasks() {
            const res = await axios.get('/api/leave/tasks', {
                params: { userId: 'hr001' }
            });
            this.tasks = res.data.data;
        },
        
        // 查看任务详情
        async viewTask(taskId) {
            const res = await axios.get(`/api/leave/task/${taskId}`);
            this.currentTask = res.data.data;
        },
        
        // 提交审批
        async submitApprove() {
            const params = {
                taskId: this.currentTask.taskId,
                approved: this.approveForm.approved,
                comment: this.approveForm.comment,
                variables: {
                    // 可以添加其他流程变量
                }
            };
            
            try {
                await axios.post('/api/leave/task/approve', params);
                this.$message.success('审批提交成功');
                this.loadTasks(); // 刷新任务列表
            } catch (error) {
                this.$message.error('审批提交失败');
            }
        }
    },
    created() {
        this.loadTasks();
    }
}
  1. 前端页面示例:
<template>
    <div class="task-approval">
        <!-- 任务列表 -->
        <a-table :dataSource="tasks">
            <a-table-column title="申请人" dataIndex="leaveInfo.applicantName" />
            <a-table-column title="请假类型" dataIndex="leaveInfo.leaveType" />
            <a-table-column title="请假天数" dataIndex="leaveInfo.days" />
            <a-table-column title="操作">
                <template slot-scope="text, record">
                    <a-button @click="viewTask(record.taskId)">
                        查看详情
                    </a-button>
                </template>
            </a-table-column>
        </a-table>

        <!-- 审批对话框 -->
        <a-modal
            v-model="visible"
            title="请假审批"
            @ok="submitApprove"
        >
            <div class="task-detail">
                <!-- 请假详情展示 -->
                <div class="leave-info">
                    <p>申请人:{{currentTask?.leaveInfo.applicantName}}</p>
                    <p>请假类型:{{currentTask?.leaveInfo.leaveType}}</p>
                    <p>请假时间:{{currentTask?.leaveInfo.startTime}} 至 {{currentTask?.leaveInfo.endTime}}</p>
                    <p>请假原因:{{currentTask?.leaveInfo.reason}}</p>
                </div>

                <!-- 审批表单 -->
                <a-form>
                    <a-form-item label="审批结果">
                        <a-radio-group v-model="approveForm.approved">
                            <a-radio :value="true">同意</a-radio>
                            <a-radio :value="false">拒绝</a-radio>
                        </a-radio-group>
                    </a-form-item>
                    
                    <a-form-item label="审批意见">
                        <a-textarea
                            v-model="approveForm.comment"
                            :rows="4"
                            placeholder="请输入审批意见"
                        />
                    </a-form-item>
                </a-form>
            </div>
        </a-modal>
    </div>
</template>

这样设计的好处是:

  1. 接口职责清晰
  2. 数据结构完整
  3. 操作流程合理
  4. 异常处理完善
  5. 支持业务扩展

建议:

  1. 添加权限验证
  2. 添加数据验证
  3. 添加操作日志
  4. 添加事务处理
  5. 考虑并发情况

用户:

我看你这里写的最终调用的是completeTask接口,这个内部是怎么判断hr可以审批这个节点的申请这个操作的,如果一个流程中有两个节点都需要同一个人审批,这个会出错么?

ChatGPT:

很好的问题!让我详细解释 Activiti 是如何管理任务分配和权限的:

  1. 任务分配的判断机制:
@Service
public class LeaveService {
    @Autowired
    private TaskService taskService;
    
    public void completeTask(ApproveDTO approveDTO) {
        String taskId = approveDTO.getTaskId();
        String userId = approveDTO.getUserId(); // 当前操作人
        
        // 1. 首先验证任务是否存在且属于该用户
        Task task = taskService.createTaskQuery()
                .taskId(taskId)
                .taskAssignee(userId)  // 关键点:检查任务是否分配给该用户
                .singleResult();
                
        if (task == null) {
            throw new BusinessException("无权处理该任务或任务不存在");
        }
        
        // 2. 检查任务是否是当前活动的任务
        if (task.isSuspended()) {
            throw new BusinessException("该任务已被挂起");
        }
        
        // 3. 完成任务
        taskService.complete(taskId, approveDTO.getVariables());
    }
}
  1. 对于同一个人在不同节点审批的情况,Activiti 是通过 taskDefinitionKey 来区分的:
// BPMN定义
<userTask id="firstApprove" name="初审" activiti:assignee="${hr}"/>
<userTask id="finalApprove" name="终审" activiti:assignee="${hr}"/>

// 查询特定节点的任务
@Service
public class LeaveService {
    public List<Task> getTasksByDefinitionKey(String userId, String taskDefinitionKey) {
        return taskService.createTaskQuery()
                .taskAssignee(userId)
                .taskDefinitionKey(taskDefinitionKey) // 指定节点的key
                .list();
    }
    
    // 查询任务详情时也可以包含节点信息
    public TaskDetailVO getTaskDetail(String taskId) {
        Task task = taskService.createTaskQuery()
                .taskId(taskId)
                .singleResult();
                
        TaskDetailVO detail = new TaskDetailVO();
        detail.setTaskId(task.getId());
        detail.setTaskName(task.getName());
        detail.setTaskDefinitionKey(task.getTaskDefinitionKey()); // 节点标识
        // ... 其他信息
        return detail;
    }
}
  1. 更安全的任务处理方式:
@Service
public class LeaveService {
    @Autowired
    private TaskService taskService;
    
    @Transactional
    public void completeTask(ApproveDTO approveDTO) {
        String taskId = approveDTO.getTaskId();
        String userId = approveDTO.getUserId();
        
        // 1. 获取任务详情,包含节点信息
        Task task = taskService.createTaskQuery()
                .taskId(taskId)
                .taskAssignee(userId)
                .singleResult();
                
        if (task == null) {
            throw new BusinessException("无权处理该任务");
        }
        
        // 2. 根据节点类型执行不同的业务逻辑
        String definitionKey = task.getTaskDefinitionKey();
        switch (definitionKey) {
            case "firstApprove":
                handleFirstApprove(task, approveDTO);
                break;
            case "finalApprove":
                handleFinalApprove(task, approveDTO);
                break;
            default:
                throw new BusinessException("未知的任务节点");
        }
        
        // 3. 完成任务
        Map<String, Object> variables = new HashMap<>();
        variables.put("approved", approveDTO.getApproved());
        variables.put("approver", userId);
        variables.put("approveTime", new Date());
        variables.put("nodeKey", definitionKey);
        
        taskService.complete(taskId, variables);
    }
    
    // 处理初审逻辑
    private void handleFirstApprove(Task task, ApproveDTO approveDTO) {
        // 初审特定的业务逻辑
        if (!approveDTO.getApproved()) {
            // 如果拒绝,可能需要特殊处理
            handleReject(task, "初审拒绝");
        }
    }
    
    // 处理终审逻辑
    private void handleFinalApprove(Task task, ApproveDTO approveDTO) {
        // 终审特定的业务逻辑
        if (approveDTO.getApproved()) {
            // 审批通过后的处理
            handleApproved(task);
        }
    }
}
  1. 前端展示时区分不同节点:
export default {
    data() {
        return {
            tasks: [],
            nodeTypes: {
                firstApprove: '初审',
                finalApprove: '终审'
            }
        }
    },
    methods: {
        async loadTasks() {
            const res = await axios.get('/api/leave/tasks', {
                params: { userId: 'hr001' }
            });
            
            // 处理任务数据,添加节点类型描述
            this.tasks = res.data.data.map(task => ({
                ...task,
                nodeTypeName: this.nodeTypes[task.taskDefinitionKey]
            }));
        },
        
        // 根据节点类型显示不同的表单
        getFormByNodeType(nodeType) {
            switch(nodeType) {
                case 'firstApprove':
                    return 'first-approve-form';
                case 'finalApprove':
                    return 'final-approve-form';
                default:
                    return 'default-form';
            }
        }
    }
}
  1. 任务列表展示:
<template>
    <div>
        <a-table :dataSource="tasks">
            <a-table-column title="申请人" dataIndex="leaveInfo.applicantName" />
            <a-table-column title="审批节点" dataIndex="nodeTypeName" />
            <a-table-column title="创建时间" dataIndex="createTime" />
            <a-table-column title="操作">
                <template slot-scope="text, record">
                    <component 
                        :is="getFormByNodeType(record.taskDefinitionKey)"
                        :task="record"
                        @approve="handleApprove"
                    />
                </template>
            </a-table-column>
        </a-table>
    </div>
</template>
  1. 添加任务锁定机制(防止并发处理):
@Service
public class LeaveService {
    @Autowired
    private TaskService taskService;
    
    @Transactional
    public void completeTask(ApproveDTO approveDTO) {
        String taskId = approveDTO.getTaskId();
        String userId = approveDTO.getUserId();
        
        // 1. 尝试锁定任务
        try {
            taskService.claim(taskId, userId);
        } catch (ActivitiTaskAlreadyClaimedException e) {
            throw new BusinessException("该任务已被其他人处理");
        }
        
        try {
            // 2. 处理任务
            // ... 处理逻辑
            
            // 3. 完成任务
            taskService.complete(taskId, variables);
        } finally {
            // 4. 如果处理失败,解锁任务
            if (taskService.createTaskQuery().taskId(taskId).singleResult() != null) {
                taskService.unclaim(taskId);
            }
        }
    }
}

关键点总结:

  1. Activiti 通过 taskDefinitionKey 区分不同节点
  2. 每个任务都有明确的处理人(assignee)
  3. 完成任务前会验证处理人权限
  4. 可以为不同节点设置不同的处理逻辑
  5. 建议添加任务锁定机制防止并发处理

建议:

  1. 合理设计流程节点标识
  2. 做好节点权限控制
  3. 添加任务处理日志
  4. 考虑并发处理情况
  5. 做好异常处理

用户:

你上面提到任务挂起,这个是什么意思。任务有哪几种状态,每种状态标识什么意思?

ChatGPT:

在 Activiti 中,任务和流程实例都有自己的状态管理机制。让我详细解释:

  1. 任务(Task)的主要状态:
public class TaskStatus {
    // 1. 活动状态 (Active)
    // 任务刚创建,等待处理
    ACTIVE,

    // 2. 挂起状态 (Suspended)
    // 任务暂时被挂起,不能处理
    SUSPENDED,

    // 3. 完成状态 (Completed)
    // 任务已经处理完成
    COMPLETED,

    // 4. 删除状态 (Deleted)
    // 任务被删除
    DELETED
}
  1. 任务的认领状态:
public class TaskClaimStatus {
    // 1. 未认领 (Unclaimed)
    // assignee为null
    UNCLAIMED,

    // 2. 已认领 (Claimed)
    // 有具体的assignee
    CLAIMED,

    // 3. 委派状态 (Delegated)
    // 任务被委派给其他人处理
    DELEGATED
}
  1. 流程实例(ProcessInstance)的状态:
public class ProcessInstanceStatus {
    // 1. 活动状态 (Active)
    // 流程正在执行中
    ACTIVE,

    // 2. 挂起状态 (Suspended)
    // 流程被暂时挂起
    SUSPENDED,

    // 3. 完成状态 (Completed)
    // 流程正常结束
    COMPLETED,

    // 4. 终止状态 (Terminated)
    // 流程被强制终止
    TERMINATED
}
  1. 状态管理示例:
@Service
public class TaskManagementService {
    @Autowired
    private TaskService taskService;
    
    @Autowired
    private RuntimeService runtimeService;
    
    // 挂起任务
    public void suspendTask(String taskId) {
        taskService.suspendTaskById(taskId);
    }
    
    // 激活任务
    public void activateTask(String taskId) {
        taskService.activateTaskById(taskId);
    }
    
    // 认领任务
    public void claimTask(String taskId, String userId) {
        taskService.claim(taskId, userId);
    }
    
    // 取消认领
    public void unclaimTask(String taskId) {
        taskService.unclaim(taskId);
    }
    
    // 委派任务
    public void delegateTask(String taskId, String userId) {
        taskService.delegateTask(taskId, userId);
    }
    
    // 转办任务
    public void transferTask(String taskId, String userId) {
        taskService.setAssignee(taskId, userId);
    }
    
    // 挂起流程实例
    public void suspendProcessInstance(String processInstanceId) {
        runtimeService.suspendProcessInstanceById(processInstanceId);
    }
    
    // 激活流程实例
    public void activateProcessInstance(String processInstanceId) {
        runtimeService.activateProcessInstanceById(processInstanceId);
    }
}
  1. 状态查询示例:
@Service
public class TaskQueryService {
    @Autowired
    private TaskService taskService;
    
    // 查询活动的任务
    public List<Task> getActiveTasks() {
        return taskService.createTaskQuery()
                .active()
                .list();
    }
    
    // 查询挂起的任务
    public List<Task> getSuspendedTasks() {
        return taskService.createTaskQuery()
                .suspended()
                .list();
    }
    
    // 查询未认领的任务
    public List<Task> getUnclaimedTasks() {
        return taskService.createTaskQuery()
                .taskCandidateUser("userId")
                .taskUnassigned()
                .list();
    }
    
    // 查询某人的待办任务
    public List<Task> getUserTasks(String userId) {
        return taskService.createTaskQuery()
                .taskAssignee(userId)
                .active()
                .list();
    }
}
  1. 状态变更监听:
@Component
public class TaskStateListener implements TaskListener {
    
    @Override
    public void notify(DelegateTask delegateTask) {
        String eventName = delegateTask.getEventName();
        switch (eventName) {
            case EVENTNAME_CREATE:
                handleTaskCreated(delegateTask);
                break;
            case EVENTNAME_ASSIGNMENT:
                handleTaskAssigned(delegateTask);
                break;
            case EVENTNAME_COMPLETE:
                handleTaskCompleted(delegateTask);
                break;
            case EVENTNAME_DELETE:
                handleTaskDeleted(delegateTask);
                break;
        }
    }
}
  1. 业务场景示例:
@Service
public class LeaveProcessService {
    
    // 请假流程挂起(比如员工临时撤回修改)
    public void suspendLeaveProcess(String processInstanceId) {
        // 1. 挂起流程实例
        runtimeService.suspendProcessInstanceById(processInstanceId);
        
        // 2. 更新业务状态
        LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
        leaveRequest.setStatus("SUSPENDED");
        leaveRequestMapper.updateById(leaveRequest);
        
        // 3. 发送通知
        sendNotification("流程已挂起");
    }
    
    // 销假处理(比如员工提前销假)
    public void terminateLeaveProcess(String processInstanceId) {
        // 1. 终止流程实例
        runtimeService.deleteProcessInstance(processInstanceId, "提前销假");
        
        // 2. 更新业务状态
        LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
        leaveRequest.setStatus("TERMINATED");
        leaveRequestMapper.updateById(leaveRequest);
        
        // 3. 发送通知
        sendNotification("请假已销假");
    }
}
  1. 前端展示不同状态:
<template>
    <a-table :dataSource="tasks">
        <a-table-column title="状态">
            <template slot-scope="text, record">
                <a-tag :color="getStatusColor(record.status)">
                    {{ getStatusText(record.status) }}
                </a-tag>
            </template>
        </a-table-column>
        <a-table-column title="操作">
            <template slot-scope="text, record">
                <!-- 根据不同状态显示不同操作按钮 -->
                <a-button v-if="record.status === 'ACTIVE'"
                    @click="handleClaim(record)">
                    认领
                </a-button>
                <a-button v-if="record.status === 'CLAIMED'"
                    @click="handleComplete(record)">
                    完成
                </a-button>
                <a-button v-if="record.status === 'ACTIVE'"
                    @click="handleSuspend(record)">
                    挂起
                </a-button>
            </template>
        </a-table-column>
    </a-table>
</template>

状态管理的注意事项:

  1. 状态变更要保证原子性
  2. 注意状态变更的权限控制
  3. 保持业务状态与流程状态的一致性
  4. 记录状态变更历史
  5. 考虑状态变更的通知机制

建议:

  1. 合理使用状态
  2. 做好状态转换的验证
  3. 添加状态变更日志
  4. 考虑并发情况
  5. 做好异常处理

用户:

go语言如果要实现这种流程审批有什么办法么

ChatGPT:

在 Go 语言中实现工作流审批,有几种主要的方案:

  1. 使用 Temporal 工作流引擎(推荐):
// 定义工作流
func LeaveApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
    // 工作流选项
    ao := workflow.ActivityOptions{
        StartToCloseTimeout: 24 * time.Hour,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Minute,
            BackoffCoefficient: 2.0,
            MaximumInterval:    time.Hour,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, ao)

    // 1. 提交申请
    var submitResult SubmitResult
    err := workflow.ExecuteActivity(ctx, SubmitLeaveRequest, request).Get(ctx, &submitResult)
    if err != nil {
        return err
    }

    // 2. 主管审批
    var leaderApproval ApprovalResult
    err = workflow.ExecuteActivity(ctx, LeaderApprove, submitResult.RequestID).Get(ctx, &leaderApproval)
    if err != nil {
        return err
    }

    // 3. 如果主管拒绝,直接结束
    if !leaderApproval.Approved {
        return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
    }

    // 4. HR审批
    var hrApproval ApprovalResult
    err = workflow.ExecuteActivity(ctx, HRApprove, submitResult.RequestID).Get(ctx, &hrApproval)
    if err != nil {
        return err
    }

    // 5. 完成审批
    return workflow.ExecuteActivity(ctx, FinalizeApproval, request, hrApproval).Get(ctx, nil)
}

// 活动实现
type Activities struct {
    leaveService *service.LeaveService
}

func (a *Activities) SubmitLeaveRequest(ctx context.Context, request LeaveRequest) (*SubmitResult, error) {
    // 实现提交逻辑
    return a.leaveService.Submit(request)
}

func (a *Activities) LeaderApprove(ctx context.Context, requestID string) (*ApprovalResult, error) {
    // 实现主管审批逻辑
    return a.leaveService.LeaderApprove(requestID)
}
  1. 使用自定义状态机:
// 定义状态
type Status string

const (
    StatusDraft     Status = "draft"
    StatusSubmitted Status = "submitted"
    StatusApproving Status = "approving"
    StatusApproved  Status = "approved"
    StatusRejected  Status = "rejected"
)

// 定义流程节点
type Node struct {
    ID       string
    Name     string
    Type     string
    Handlers []string
    Next     []string
}

// 流程定义
type WorkflowDefinition struct {
    ID    string
    Nodes map[string]*Node
}

// 流程实例
type WorkflowInstance struct {
    ID           string
    DefinitionID string
    BusinessID   string
    CurrentNode  string
    Status      Status
    Variables   map[string]interface{}
    CreateTime  time.Time
    UpdateTime  time.Time
}

// 工作流引擎
type WorkflowEngine struct {
    db *gorm.DB
}

// 启动流程
func (e *WorkflowEngine) StartWorkflow(ctx context.Context, definitionID string, businessID string, variables map[string]interface{}) (*WorkflowInstance, error) {
    instance := &WorkflowInstance{
        ID:           uuid.New().String(),
        DefinitionID: definitionID,
        BusinessID:   businessID,
        Status:       StatusSubmitted,
        Variables:    variables,
        CreateTime:   time.Now(),
    }

    if err := e.db.Create(instance).Error; err != nil {
        return nil, err
    }

    return instance, nil
}

// 完成任务
func (e *WorkflowEngine) CompleteTask(ctx context.Context, taskID string, variables map[string]interface{}) error {
    return e.db.Transaction(func(tx *gorm.DB) error {
        // 1. 获取任务
        task := &Task{}
        if err := tx.First(task, taskID).Error; err != nil {
            return err
        }

        // 2. 更新任务状态
        task.Status = StatusCompleted
        if err := tx.Save(task).Error; err != nil {
            return err
        }

        // 3. 创建下一个节点的任务
        nextTasks, err := e.createNextTasks(ctx, tx, task)
        if err != nil {
            return err
        }

        return nil
    })
}
  1. 使用消息队列实现:
// 定义消息
type ApprovalMessage struct {
    RequestID  string    `json:"request_id"`
    NodeID     string    `json:"node_id"`
    ApproverID string    `json:"approver_id"`
    Action     string    `json:"action"`
    Comment    string    `json:"comment"`
    Timestamp  time.Time `json:"timestamp"`
}

// 消息处理器
type ApprovalHandler struct {
    db      *gorm.DB
    queue   *rabbitmq.Channel
    service *LeaveService
}

// 处理审批消息
func (h *ApprovalHandler) HandleApproval(msg ApprovalMessage) error {
    return h.db.Transaction(func(tx *gorm.DB) error {
        // 1. 获取请假申请
        request := &LeaveRequest{}
        if err := tx.First(request, msg.RequestID).Error; err != nil {
            return err
        }

        // 2. 更新审批状态
        if err := h.service.UpdateApprovalStatus(tx, request, msg); err != nil {
            return err
        }

        // 3. 发送通知
        if err := h.service.SendNotification(request, msg); err != nil {
            return err
        }

        return nil
    })
}
  1. 数据库设计:
-- 流程定义表
CREATE TABLE workflow_definitions (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    version INT NOT NULL,
    content JSON NOT NULL,
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

-- 流程实例表
CREATE TABLE workflow_instances (
    id VARCHAR(36) PRIMARY KEY,
    definition_id VARCHAR(36) NOT NULL,
    business_id VARCHAR(36) NOT NULL,
    current_node VARCHAR(36) NOT NULL,
    status VARCHAR(20) NOT NULL,
    variables JSON,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);

-- 任务表
CREATE TABLE tasks (
    id VARCHAR(36) PRIMARY KEY,
    instance_id VARCHAR(36) NOT NULL,
    node_id VARCHAR(36) NOT NULL,
    assignee VARCHAR(36),
    status VARCHAR(20) NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP NOT NULL
);
  1. API 实现:
// 路由定义
func SetupRoutes(r *gin.Engine, h *Handler) {
    r.POST("/workflows/leave/start", h.StartLeaveWorkflow)
    r.POST("/workflows/tasks/:taskId/complete", h.CompleteTask)
    r.GET("/workflows/tasks", h.GetUserTasks)
    r.GET("/workflows/instances/:instanceId", h.GetWorkflowInstance)
}

// 处理器
type Handler struct {
    engine *WorkflowEngine
    service *LeaveService
}

// 启动请假流程
func (h *Handler) StartLeaveWorkflow(c *gin.Context) {
    var req LeaveRequest
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }

    instance, err := h.engine.StartWorkflow(c, "leave_approval", req.ID, map[string]interface{}{
        "userId": req.UserID,
        "days": req.Days,
        "reason": req.Reason,
    })
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }

    c.JSON(200, instance)
}

// 完成任务
func (h *Handler) CompleteTask(c *gin.Context) {
    taskID := c.Param("taskId")
    var req CompleteTaskRequest
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }

    err := h.engine.CompleteTask(c, taskID, req.Variables)
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }

    c.JSON(200, gin.H{"message": "success"})
}
  1. 服务实现:
type LeaveService struct {
    db *gorm.DB
    notifier *Notifier
}

// 提交请假申请
func (s *LeaveService) SubmitLeave(ctx context.Context, req LeaveRequest) error {
    return s.db.Transaction(func(tx *gorm.DB) error {
        // 1. 保存请假记录
        leave := &Leave{
            ID:        uuid.New().String(),
            UserID:    req.UserID,
            Days:      req.Days,
            Reason:    req.Reason,
            Status:    StatusSubmitted,
            CreatedAt: time.Now(),
        }
        if err := tx.Create(leave).Error; err != nil {
            return err
        }

        // 2. 启动工作流
        if err := s.workflowEngine.StartWorkflow(ctx, "leave_approval", leave.ID, nil); err != nil {
            return err
        }

        return nil
    })
}

// 审批
func (s *LeaveService) Approve(ctx context.Context, taskID string, approved bool, comment string) error {
    return s.db.Transaction(func(tx *gorm.DB) error {
        // 1. 完成任务
        if err := s.workflowEngine.CompleteTask(ctx, taskID, map[string]interface{}{
            "approved": approved,
            "comment": comment,
        }); err != nil {
            return err
        }

        // 2. 更新请假状态
        // ...

        // 3. 发送通知
        // ...

        return nil
    })
}

建议:

  1. 选择合适的工作流引擎
  2. 做好事务处理
  3. 实现审计日志
  4. 添加超时处理
  5. 考虑并发情况
  6. 实现错误重试
  7. 添加监控指标

这些实现方案各有优势:

  • Temporal:功能强大,支持复杂流程
  • 状态机:简单直观,适合简单流程
  • 消息队列:解耦性好,适合分布式系统

选择哪种方案取决于你的具体需求。

用户:

这个支持BPMN 模型么?

ChatGPT:

Go 语言生态中支持 BPMN 模型的工作流引擎相对较少,但有几个选择:

  1. Zeebe (现在是 Camunda Platform 8 的一部分):
// 使用 Zeebe 客户端
import (
    "github.com/camunda/zeebe/clients/go/v8/pkg/pb"
    "github.com/camunda/zeebe/clients/go/v8/pkg/zbc"
)

// 创建客户端
client, err := zbc.NewClient(&zbc.ClientConfig{
    GatewayAddress: "localhost:26500",
})

// 部署 BPMN
deployment, err := client.NewDeployCommand().
    AddResourceFile("leave-process.bpmn").
    Send(context)

// 启动流程实例
result, err := client.NewCreateInstanceCommand().
    BPMNProcessId("leave-process").
    LatestVersion().
    Variables(map[string]interface{}{
        "userId": "123",
        "days": 3,
        "reason": "vacation",
    }).
    Send(context)
  1. Flowable (有非官方的 Go 客户端):
// 使用 REST API 调用 Flowable
type FlowableClient struct {
    baseURL    string
    httpClient *http.Client
}

// 部署 BPMN
func (c *FlowableClient) DeployBPMN(filename string, content []byte) error {
    body := &bytes.Buffer{}
    writer := multipart.NewWriter(body)
    
    part, err := writer.CreateFormFile("file", filename)
    if err != nil {
        return err
    }
    
    _, err = part.Write(content)
    if err != nil {
        return err
    }
    
    err = writer.Close()
    if err != nil {
        return err
    }
    
    req, err := http.NewRequest("POST", c.baseURL+"/repository/deployments", body)
    // ... 处理请求
}

// 启动流程实例
func (c *FlowableClient) StartProcess(processKey string, variables map[string]interface{}) error {
    payload := map[string]interface{}{
        "processDefinitionKey": processKey,
        "variables": variables,
    }
    
    jsonData, err := json.Marshal(payload)
    if err != nil {
        return err
    }
    
    req, err := http.NewRequest("POST", c.baseURL+"/runtime/process-instances", bytes.NewBuffer(jsonData))
    // ... 处理请求
}
  1. 自己实现 BPMN 解析和执行引擎:
// BPMN 模型结构
type BPMNProcess struct {
    ID       string      `xml:"id,attr"`
    Name     string      `xml:"name,attr"`
    Elements []Element   `xml:",any"`
}

type Element interface {
    GetID() string
    GetType() string
    Execute(ctx Context) error
}

// BPMN 解析器
type BPMNParser struct {
    // ... 
}

func (p *BPMNParser) ParseBPMN(content []byte) (*BPMNProcess, error) {
    var process BPMNProcess
    err := xml.Unmarshal(content, &process)
    if err != nil {
        return nil, err
    }
    return &process, nil
}

// BPMN 执行引擎
type BPMNEngine struct {
    db          *gorm.DB
    definitions map[string]*BPMNProcess
}

// 注册 BPMN 流程定义
func (e *BPMNEngine) DeployBPMN(process *BPMNProcess) error {
    e.definitions[process.ID] = process
    return nil
}

// 启动流程实例
func (e *BPMNEngine) StartProcess(processID string, variables map[string]interface{}) (*ProcessInstance, error) {
    process, exists := e.definitions[processID]
    if !exists {
        return nil, errors.New("process not found")
    }
    
    instance := &ProcessInstance{
        ID:        uuid.New().String(),
        ProcessID: processID,
        Status:    "ACTIVE",
        Variables: variables,
    }
    
    if err := e.db.Create(instance).Error; err != nil {
        return nil, err
    }
    
    // 开始执行流程
    ctx := NewContext(instance, e)
    go e.executeProcess(ctx, process)
    
    return instance, nil
}
  1. 完整的 BPMN 实现示例:
// BPMN 元素定义
type UserTask struct {
    ID         string `xml:"id,attr"`
    Name       string `xml:"name,attr"`
    Assignee   string `xml:"activiti:assignee,attr"`
    Candidates []string
}

type SequenceFlow struct {
    ID        string `xml:"id,attr"`
    SourceRef string `xml:"sourceRef,attr"`
    TargetRef string `xml:"targetRef,attr"`
    Condition string `xml:"conditionExpression"`
}

// BPMN 执行上下文
type Context struct {
    Instance  *ProcessInstance
    Variables map[string]interface{}
    Engine    *BPMNEngine
}

// 用户任务处理
func (t *UserTask) Execute(ctx Context) error {
    // 创建任务
    task := &Task{
        ID:         uuid.New().String(),
        InstanceID: ctx.Instance.ID,
        NodeID:     t.ID,
        Name:       t.Name,
        Assignee:   t.Assignee,
        Status:     "ACTIVE",
    }
    
    if err := ctx.Engine.db.Create(task).Error; err != nil {
        return err
    }
    
    // 等待任务完成
    return ctx.Engine.waitForTask(task.ID)
}

// 网关处理
type ExclusiveGateway struct {
    ID           string `xml:"id,attr"`
    OutgoingFlows []SequenceFlow
}

func (g *ExclusiveGateway) Execute(ctx Context) error {
    // 评估条件
    for _, flow := range g.OutgoingFlows {
        if evaluateCondition(flow.Condition, ctx.Variables) {
            return ctx.Engine.executeElement(flow.TargetRef, ctx)
        }
    }
    return errors.New("no valid path found")
}
  1. API 接口:
type WorkflowAPI struct {
    engine *BPMNEngine
}

// 部署 BPMN
func (api *WorkflowAPI) DeployBPMN(c *gin.Context) {
    file, err := c.FormFile("file")
    if err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }
    
    content, err := file.Open()
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }
    defer content.Close()
    
    process, err := api.engine.parser.ParseBPMN(content)
    if err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }
    
    err = api.engine.DeployBPMN(process)
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }
    
    c.JSON(200, gin.H{"message": "deployed successfully"})
}

// 启动流程
func (api *WorkflowAPI) StartProcess(c *gin.Context) {
    var req StartProcessRequest
    if err := c.ShouldBindJSON(&req); err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }
    
    instance, err := api.engine.StartProcess(req.ProcessID, req.Variables)
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }
    
    c.JSON(200, instance)
}

建议:

  1. 如果需要完整的 BPMN 支持,建议使用 Zeebe
  2. 如果是简单流程,可以考虑自己实现
  3. 需要注意:
    • BPMN 规范复杂
    • 需要处理并发
    • 需要考虑性能
    • 需要实现持久化
    • 需要支持版本控制

选择建议:

  1. 企业级应用:使用 Zeebe
  2. 简单应用:自己实现简化版
  3. 中等复杂度:考虑使用 Flowable + REST API

用户:

我看这些go的案例都需要自己新建表,比如WorkflowEngine,前面说到Activiti,大概有25~30个表,但是WorkflowEngine案例里面的表却没那么多,这个是因为WorkflowEngine不如Activiti完善么?WorkflowEngine如何定义复杂的审批流?

ChatGPT:

是的,你说得对。自定义的 WorkflowEngine 确实不如 Activiti 完善,主要体现在以下几个方面:

  1. Activiti 的表结构设计更完整:
-- Activiti 的表主要分类:
-- 1. 流程定义表(ACT_RE_*)
-- 2. 运行时表(ACT_RU_*)
-- 3. 历史表(ACT_HI_*)
-- 4. 身份表(ACT_ID_*)
-- 5. 通用数据表(ACT_GE_*)

-- 而简单的 WorkflowEngine 通常只有基础表:
CREATE TABLE workflow_definitions (
    id VARCHAR(36) PRIMARY KEY,
    name VARCHAR(100),
    version INT,
    content TEXT
);

CREATE TABLE workflow_instances (
    id VARCHAR(36) PRIMARY KEY,
    definition_id VARCHAR(36),
    status VARCHAR(20)
);

CREATE TABLE workflow_tasks (
    id VARCHAR(36) PRIMARY KEY,
    instance_id VARCHAR(36),
    node_id VARCHAR(36),
    assignee VARCHAR(36)
);
  1. Activiti 支持更复杂的 BPMN 元素:
<!-- Activiti 支持的复杂流程示例 -->
<definitions>
    <process id="complexProcess">
        <!-- 1. 多实例任务 -->
        <multiInstanceLoopCharacteristics>
            <loopCardinality>3</loopCardinality>
        </multiInstanceLoopCharacteristics>

        <!-- 2. 子流程 -->
        <subProcess id="subProcess">
            <!-- 子流程内容 -->
        </subProcess>

        <!-- 3. 调用活动 -->
        <callActivity id="callSubProcess" 
            calledElement="otherProcess">
        </callActivity>

        <!-- 4. 事件子流程 -->
        <subProcess id="eventSubProcess" triggeredByEvent="true">
            <startEvent id="errorStart">
                <errorEventDefinition errorRef="someError"/>
            </startEvent>
        </subProcess>

        <!-- 5. 复杂网关 -->
        <complexGateway id="complexDecision">
            <activationCondition>${execution.getVariable('complete')}</activationCondition>
        </complexGateway>
    </process>
</definitions>

<!-- 而简单的 WorkflowEngine 通常只支持基础元素 -->
<definitions>
    <process id="simpleProcess">
        <startEvent id="start"/>
        <userTask id="task1"/>
        <exclusiveGateway id="gateway"/>
        <endEvent id="end"/>
    </process>
</definitions>
  1. 要在 Go 中实现类似 Activiti 的复杂功能,需要大量额外代码:
// 1. 多实例任务支持
type MultiInstanceTask struct {
    BaseTask
    Instances     int
    CompletedCount int
    Collection    []interface{}
    ElementVariable string
}

func (t *MultiInstanceTask) Execute(ctx *ExecutionContext) error {
    // 创建多个任务实例
    for i := 0; i < t.Instances; i++ {
        task := &UserTask{
            ID: fmt.Sprintf("%s_%d", t.ID, i),
            Name: t.Name,
            Assignee: t.getAssignee(i),
        }
        // 执行每个实例
        if err := task.Execute(ctx); err != nil {
            return err
        }
    }
    return nil
}

// 2. 子流程支持
type SubProcess struct {
    BaseElement
    Elements []FlowElement
    Async    bool
}

func (s *SubProcess) Execute(ctx *ExecutionContext) error {
    // 创建子流程上下文
    subCtx := NewExecutionContext(ctx.Engine)
    subCtx.Variables = ctx.Variables.Copy()
    
    // 执行子流程
    for _, element := range s.Elements {
        if err := element.Execute(subCtx); err != nil {
            return err
        }
    }
    return nil
}

// 3. 复杂网关支持
type ComplexGateway struct {
    BaseGateway
    ActivationCondition string
    OutgoingFlows      []*SequenceFlow
}

func (g *ComplexGateway) Execute(ctx *ExecutionContext) error {
    // 评估激活条件
    if !evaluateCondition(g.ActivationCondition, ctx.Variables) {
        return nil
    }
    
    // 评估所有出口流
    var validFlows []*SequenceFlow
    for _, flow := range g.OutgoingFlows {
        if evaluateCondition(flow.Condition, ctx.Variables) {
            validFlows = append(validFlows, flow)
        }
    }
    
    // 执行有效的流程路径
    for _, flow := range validFlows {
        if err := ctx.Engine.executeFlow(flow, ctx); err != nil {
            return err
        }
    }
    return nil
}

// 4. 事件处理支持
type EventDefinition interface {
    HandleEvent(ctx *ExecutionContext) error
}

type TimerEventDefinition struct {
    TimeDate      string
    TimeDuration  string
    TimeCycle     string
}

func (t *TimerEventDefinition) HandleEvent(ctx *ExecutionContext) error {
    // 处理定时器事件
    switch {
    case t.TimeDate != "":
        return t.handleTimeDate(ctx)
    case t.TimeDuration != "":
        return t.handleTimeDuration(ctx)
    case t.TimeCycle != "":
        return t.handleTimeCycle(ctx)
    }
    return nil
}

// 5. 表达式引擎支持
type ExpressionEngine interface {
    Evaluate(expression string, variables map[string]interface{}) (interface{}, error)
}

type GovaluateExpressionEngine struct {
    // 使用 Govaluate 库实现表达式计算
}

func (e *GovaluateExpressionEngine) Evaluate(expression string, variables map[string]interface{}) (interface{}, error) {
    expr, err := govaluate.NewEvaluableExpression(expression)
    if err != nil {
        return nil, err
    }
    return expr.Evaluate(variables)
}
  1. 需要添加更多的数据表来支持复杂功能:
-- 多实例任务表
CREATE TABLE workflow_multi_instance_tasks (
    id VARCHAR(36) PRIMARY KEY,
    task_id VARCHAR(36),
    parent_task_id VARCHAR(36),
    sequence_number INT,
    total_instances INT,
    completed_count INT
);

-- 变量表
CREATE TABLE workflow_variables (
    id VARCHAR(36) PRIMARY KEY,
    instance_id VARCHAR(36),
    scope_id VARCHAR(36),
    name VARCHAR(100),
    type VARCHAR(50),
    value TEXT
);

-- 事件表
CREATE TABLE workflow_events (
    id VARCHAR(36) PRIMARY KEY,
    instance_id VARCHAR(36),
    event_type VARCHAR(50),
    element_id VARCHAR(36),
    payload JSON,
    created_at TIMESTAMP
);

-- 定时器表
CREATE TABLE workflow_timers (
    id VARCHAR(36) PRIMARY KEY,
    instance_id VARCHAR(36),
    element_id VARCHAR(36),
    type VARCHAR(20),
    due_date TIMESTAMP,
    repeat_count INT,
    handler VARCHAR(100)
);

-- 历史记录表
CREATE TABLE workflow_history (
    id VARCHAR(36) PRIMARY KEY,
    instance_id VARCHAR(36),
    element_id VARCHAR(36),
    element_type VARCHAR(50),
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    duration BIGINT,
    status VARCHAR(20)
);
  1. 完整的流程引擎还需要:
  • 版本控制系统
  • 流程迁移功能
  • 流程监控功能
  • 流程分析功能
  • 任务委派功能
  • 流程回滚功能
  • 并行处理能力
  • 异常处理机制
  • 事务管理
  • 权限控制
  • 审计日志

所以,如果你需要实现复杂的工作流程:

  1. 建议使用成熟的工作流引擎(如 Zeebe)
  2. 或者基于现有工作流引擎进行扩展
  3. 自己实现的话需要投入大量开发资源

如果只是简单的审批流程:

  1. 可以使用简化版的工作流引擎
  2. 根据实际需求逐步扩展功能
  3. 关注核心业务流程的实现

用户:

我们没有那么多资源开发的,我们如果希望最小成本的实现支持复杂的审批流的功能,用go语言你推荐用哪一个?

ChatGPT:

对于 Go 语言,我强烈推荐使用 Temporal,原因如下:

  1. 优势:
  • 完整的工作流引擎
  • 官方提供 Go SDK
  • 部署维护相对简单
  • 社区活跃,文档完善
  • 支持复杂流程
  • 可靠性高
  • 自带 UI 界面
  1. 基本使用示例:
// 1. 定义工作流
func LeaveApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
    // 设置重试策略
    options := workflow.ActivityOptions{
        StartToCloseTimeout: 24 * time.Hour,
        RetryPolicy: &temporal.RetryPolicy{
            InitialInterval:    time.Minute,
            MaximumInterval:    time.Hour,
            MaximumAttempts:    3,
        },
    }
    ctx = workflow.WithActivityOptions(ctx, options)

    // 定义流程步骤
    var result ApprovalResult

    // 1. 部门主管审批
    err := workflow.ExecuteActivity(ctx, DepartmentApproval, request).Get(ctx, &result)
    if err != nil {
        return err
    }
    if !result.Approved {
        return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
    }

    // 2. 判断是否需要总监审批
    if request.Days > 5 {
        err = workflow.ExecuteActivity(ctx, DirectorApproval, request).Get(ctx, &result)
        if err != nil {
            return err
        }
        if !result.Approved {
            return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
        }
    }

    // 3. HR审批
    err = workflow.ExecuteActivity(ctx, HRApproval, request).Get(ctx, &result)
    if err != nil {
        return err
    }

    // 4. 完成审批
    return workflow.ExecuteActivity(ctx, FinalizeApproval, request, result).Get(ctx, nil)
}

// 2. 定义活动
type Activities struct {
    leaveService *LeaveService
}

func (a *Activities) DepartmentApproval(ctx context.Context, request LeaveRequest) (ApprovalResult, error) {
    // 实现部门审批逻辑
    return a.leaveService.ProcessDepartmentApproval(request)
}

func (a *Activities) HRApproval(ctx context.Context, request LeaveRequest) (ApprovalResult, error) {
    // 实现HR审批逻辑
    return a.leaveService.ProcessHRApproval(request)
}

// 3. 启动工作流
func StartLeaveWorkflow(c *gin.Context) {
    client := c.MustGet("temporal").(*client.Client)
    
    var request LeaveRequest
    if err := c.ShouldBindJSON(&request); err != nil {
        c.JSON(400, gin.H{"error": err.Error()})
        return
    }
    
    workflowOptions := client.StartWorkflowOptions{
        ID:        "leave_" + uuid.New().String(),
        TaskQueue: "leave-approval",
    }
    
    we, err := client.ExecuteWorkflow(context.Background(), workflowOptions, LeaveApprovalWorkflow, request)
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }
    
    c.JSON(200, gin.H{"workflowId": we.GetID()})
}
  1. 复杂流程示例:
// 并行审批流程
func ParallelApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
    ctx = workflow.WithActivityOptions(ctx, options)
    
    // 创建通道接收审批结果
    future1 := workflow.ExecuteActivity(ctx, DepartmentApproval, request)
    future2 := workflow.ExecuteActivity(ctx, HRApproval, request)
    
    // 等待所有审批完成
    var result1, result2 ApprovalResult
    err1 := future1.Get(ctx, &result1)
    err2 := future2.Get(ctx, &result2)
    
    if err1 != nil || err2 != nil {
        return errors.New("approval failed")
    }
    
    // 所有审批都通过才算通过
    if !result1.Approved || !result2.Approved {
        return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
    }
    
    return workflow.ExecuteActivity(ctx, FinalizeApproval, request).Get(ctx, nil)
}

// 带超时的审批流程
func TimeoutApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
    // 设置超时选项
    options := workflow.ActivityOptions{
        StartToCloseTimeout: 24 * time.Hour,
        HeartbeatTimeout:    time.Hour,
    }
    ctx = workflow.WithActivityOptions(ctx, options)
    
    // 创建子工作流处理超时
    childWorkflow := workflow.ChildWorkflowOptions{
        WorkflowID:        "timeout_" + request.ID,
        WorkflowRunTimeout: 3 * 24 * time.Hour,
    }
    ctx = workflow.WithChildOptions(ctx, childWorkflow)
    
    var result ApprovalResult
    err := workflow.ExecuteActivity(ctx, DepartmentApproval, request).Get(ctx, &result)
    if err != nil {
        if temporal.IsTimeoutError(err) {
            return workflow.ExecuteActivity(ctx, HandleTimeout, request).Get(ctx, nil)
        }
        return err
    }
    
    return nil
}
  1. 查询任务:
func GetUserTasks(c *gin.Context) {
    client := c.MustGet("temporal").(*client.Client)
    userId := c.Query("userId")
    
    // 查询该用户的所有待办任务
    resp, err := client.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{
        Query: fmt.Sprintf("WorkflowType = 'LeaveApprovalWorkflow' AND currentApprover = '%s'", userId),
    })
    if err != nil {
        c.JSON(500, gin.H{"error": err.Error()})
        return
    }
    
    c.JSON(200, resp.Executions)
}
  1. 配置示例:
func main() {
    // 创建 Temporal 客户端
    c, err := client.NewClient(client.Options{
        HostPort: "localhost:7233",
    })
    if err != nil {
        log.Fatalln("Unable to create client", err)
    }
    defer c.Close()
    
    // 注册工作流和活动
    w := worker.New(c, "leave-approval", worker.Options{})
    w.RegisterWorkflow(LeaveApprovalWorkflow)
    w.RegisterActivity(&Activities{leaveService: NewLeaveService()})
    
    // 启动 worker
    err = w.Run(worker.InterruptCh())
    if err != nil {
        log.Fatalln("Unable to start worker", err)
    }
}

使用 Temporal 的好处:

  1. 开箱即用的功能:

    • 工作流版本控制
    • 重试机制
    • 超时处理
    • 并行处理
    • 子工作流
    • 活动监控
  2. 运维友好:

    • Web UI 监控
    • 详细的执行历史
    • 方便的调试工具
    • 完善的监控指标
  3. 可靠性保证:

    • 状态持久化
    • 故障恢复
    • 事务一致性
    • 幂等性保证

建议:

  1. 先运行官方示例熟悉基本概念
  2. 从简单流程开始,逐步添加复杂性
  3. 利用内置的重试和超时机制
  4. 做好监控和日志
  5. 考虑使用 SDK 的高级特性
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容