新的聊天
用户:
你知道java里面的这个项目么 Activiti
ChatGPT:
是的,我了解 Activiti。Activiti 是一个开源的工作流引擎和业务流程管理(BPM)平台,主要用于实现业务流程的自动化。以下是 Activiti 的一些主要特点:
- 核心功能:
- 流程定义和部署
- 流程实例管理
- 任务管理
- 用户和用户组管理
- 历史数据管理
- 主要特性:
- 基于 BPMN 2.0 标准
- 轻量级架构
- 与 Spring 框架良好集成
- 提供可视化流程设计器
- 支持多种数据库
- REST API 支持
- 常见应用场景:
- 请假审批流程
- 报销审批流程
- 采购审批流程
- 文档审批流程
- 其他业务流程自动化
- 核心组件:
// 流程引擎配置
ProcessEngineConfiguration configuration = ProcessEngineConfiguration
.createStandaloneProcessEngineConfiguration();
// 流程引擎
ProcessEngine processEngine = configuration.buildProcessEngine();
// 运行时服务
RuntimeService runtimeService = processEngine.getRuntimeService();
// 任务服务
TaskService taskService = processEngine.getTaskService();
// 仓库服务
RepositoryService repositoryService = processEngine.getRepositoryService();
- 基本使用示例:
// 部署流程定义
Deployment deployment = repositoryService.createDeployment()
.addClasspathResource("processes/myProcess.bpmn20.xml")
.deploy();
// 启动流程实例
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("myProcess");
// 查询任务
List<Task> tasks = taskService.createTaskQuery()
.taskAssignee("john")
.list();
// 完成任务
taskService.complete(task.getId());
- 与 Spring Boot 集成:
@Configuration
public class ActivitiConfig {
@Bean
public ProcessEngine processEngine() {
return ProcessEngineConfiguration
.createStandaloneProcessEngineConfiguration()
.setDatabaseSchemaUpdate(ProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE)
.setJdbcUrl("jdbc:h2:mem:activiti;DB_CLOSE_DELAY=1000")
.setJdbcDriver("org.h2.Driver")
.setJdbcUsername("sa")
.setJdbcPassword("")
.buildProcessEngine();
}
}
- 主要优势:
- 开源免费
- 社区活跃
- 文档完善
- 易于集成
- 性能优秀
- 功能丰富
- 注意事项:
- 需要合理设计流程
- 注意流程版本管理
- 关注性能优化
- 做好异常处理
- 注意数据安全
Activiti 是一个成熟的工作流引擎,适合用于企业级应用开发。它提供了完整的流程管理功能,可以帮助开发者快速实现各种业务流程的自动化。
用户:
现在我们公司要做一个审批流,比如现在我希望实现一个请假审批流程,这个我该怎么用这个实现?
ChatGPT:
我来帮你设计一个基于 Activiti 的请假审批流程。我们可以分以下几个步骤实现:
- 首先,设计请假流程的 BPMN 模型(可以使用 Activiti Designer 设计):
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
targetNamespace="http://www.activiti.org/processdef">
<process id="leaveProcess" name="请假流程" isExecutable="true">
<!-- 开始节点 -->
<startEvent id="startEvent" name="开始"/>
<!-- 填写请假申请 -->
<userTask id="submitForm" name="填写请假申请"
activiti:assignee="${applicant}"/>
<!-- 直接主管审批 -->
<userTask id="leaderApprove" name="主管审批"
activiti:assignee="${leader}"/>
<!-- 人事审批 -->
<userTask id="hrApprove" name="人事审批"
activiti:assignee="${hr}"/>
<!-- 结束节点 -->
<endEvent id="endEvent" name="结束"/>
<!-- 连线 -->
<sequenceFlow sourceRef="startEvent" targetRef="submitForm"/>
<sequenceFlow sourceRef="submitForm" targetRef="leaderApprove"/>
<sequenceFlow sourceRef="leaderApprove" targetRef="hrApprove"/>
<sequenceFlow sourceRef="hrApprove" targetRef="endEvent"/>
</process>
</definitions>
- 创建请假实体类:
@Data
public class LeaveRequest {
private String id;
private String userId; // 申请人ID
private String reason; // 请假原因
private Date startTime; // 开始时间
private Date endTime; // 结束时间
private Integer days; // 请假天数
private String status; // 状态
private String processInstanceId; // 流程实例ID
}
- 创建请假服务接口:
@Service
@Transactional
public class LeaveService {
@Autowired
private RuntimeService runtimeService;
@Autowired
private TaskService taskService;
@Autowired
private RepositoryService repositoryService;
// 发起请假申请
public void startLeaveProcess(LeaveRequest leaveRequest) {
// 保存请假信息
// saveLeaveRequest(leaveRequest);
// 启动流程实例
Map<String, Object> variables = new HashMap<>();
variables.put("applicant", leaveRequest.getUserId());
variables.put("leader", "直接主管的用户ID");
variables.put("hr", "人事的用户ID");
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
"leaveProcess",
leaveRequest.getId(), // 业务键
variables
);
// 关联业务ID和流程实例ID
leaveRequest.setProcessInstanceId(processInstance.getId());
// updateLeaveRequest(leaveRequest);
}
// 审批
public void approve(String taskId, boolean approved, String comment) {
// 添加审批意见
taskService.addComment(taskId, null, comment);
Map<String, Object> variables = new HashMap<>();
variables.put("approved", approved);
// 完成任务
taskService.complete(taskId, variables);
}
// 查询待办任务
public List<Task> findTodoTasks(String userId) {
return taskService.createTaskQuery()
.taskAssignee(userId)
.active()
.list();
}
}
- 创建控制器:
@RestController
@RequestMapping("/leave")
public class LeaveController {
@Autowired
private LeaveService leaveService;
// 发起请假申请
@PostMapping("/start")
public Result startLeave(@RequestBody LeaveRequest leaveRequest) {
leaveService.startLeaveProcess(leaveRequest);
return Result.success();
}
// 审批
@PostMapping("/approve")
public Result approve(@RequestParam String taskId,
@RequestParam boolean approved,
@RequestParam String comment) {
leaveService.approve(taskId, approved, comment);
return Result.success();
}
// 获取待办任务
@GetMapping("/tasks")
public Result getTodoTasks(@RequestParam String userId) {
List<Task> tasks = leaveService.findTodoTasks(userId);
return Result.success(tasks);
}
}
- 配置文件 application.yml:
spring:
datasource:
url: jdbc:mysql://localhost:3306/activiti?useUnicode=true&characterEncoding=utf8
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
activiti:
database-schema-update: true
history-level: full
db-history-used: true
- 使用流程:
// 1. 员工发起请假申请
POST /leave/start
{
"userId": "emp001",
"reason": "年假",
"startTime": "2023-05-01",
"endTime": "2023-05-03",
"days": 3
}
// 2. 主管查看待办任务
GET /leave/tasks?userId=leader001
// 3. 主管审批
POST /leave/approve?taskId=xxx&approved=true&comment=同意
// 4. 人事查看待办任务
GET /leave/tasks?userId=hr001
// 5. 人事审批
POST /leave/approve?taskId=xxx&approved=true&comment=同意
补充说明:
-
实际使用时需要加入更多的业务逻辑,如:
- 请假天数的计算和验证
- 各级审批的具体规则
- 邮件通知
- 流程跟踪
- 审批历史查询
-
可以添加更多功能:
- 驳回功能
- 加签/减签
- 转办功能
- 催办功能
- 流程撤回
-
安全性考虑:
- 权限控制
- 数据验证
- 操作日志
- 事务处理
-
建议增加的功能:
- 流程状态查询
- 审批历史查询
- 请假统计
- 导出功能
这是一个基本的请假流程实现,你可以根据实际需求进行调整和扩展。
用户:
我想知道这个数据是存在哪里了?比如这个接口:GET /leave/tasks?userId=hr001,它的数据是从哪里查出来的?
ChatGPT:
Activiti 会在数据库中创建多个表来存储流程相关的所有数据。当你启动 Activiti 时,它会自动创建大约 25-30 个表。以下是一些主要的表:
- 核心运行时表(以 ACT_RU_ 开头):
-- 运行时流程实例表
ACT_RU_EXECUTION
-- 运行时任务表(你查询的待办任务主要从这个表查询)
ACT_RU_TASK
-- 运行时变量表
ACT_RU_VARIABLE
-- 运行时身份链接表
ACT_RU_IDENTITYLINK
- 历史表(以 ACT_HI_ 开头):
-- 历史活动实例表
ACT_HI_ACTINST
-- 历史任务实例表
ACT_HI_TASKINST
-- 历史流程实例表
ACT_HI_PROCINST
-- 历史变量表
ACT_HI_VARINST
- 仓库表(以 ACT_RE_ 开头):
-- 流程定义表
ACT_RE_PROCDEF
-- 部署表
ACT_RE_DEPLOYMENT
当你调用 GET /leave/tasks?userId=hr001 这个接口时,实际上是查询 ACT_RU_TASK 表:
// 这段代码
taskService.createTaskQuery()
.taskAssignee(userId)
.active()
.list();
// 会转换成类似这样的 SQL
SELECT * FROM ACT_RU_TASK
WHERE ASSIGNEE_ = 'hr001'
AND SUSPENSION_STATE_ = 1;
以下是 ACT_RU_TASK 表的主要字段:
CREATE TABLE ACT_RU_TASK (
ID_ varchar(64) NOT NULL, -- 任务ID
NAME_ varchar(255), -- 任务名称
TASK_DEF_KEY_ varchar(255), -- 任务定义的ID
PROC_INST_ID_ varchar(64), -- 流程实例ID
PROC_DEF_ID_ varchar(64), -- 流程定义ID
ASSIGNEE_ varchar(255), -- 签收人/处理人
PRIORITY_ integer, -- 优先级
CREATE_TIME_ timestamp, -- 创建时间
DUE_DATE_ timestamp, -- 到期时间
SUSPENSION_STATE_ integer, -- 挂起状态
-- 其他字段...
PRIMARY KEY (ID_)
);
如果你想要查看完整的数据,可以直接查询数据库:
- 查看待办任务:
SELECT * FROM ACT_RU_TASK
WHERE ASSIGNEE_ = 'hr001';
- 查看流程实例:
SELECT * FROM ACT_RU_EXECUTION;
- 查看历史任务:
SELECT * FROM ACT_HI_TASKINST;
- 查看流程变量:
SELECT * FROM ACT_RU_VARIABLE;
需要注意的是:
- 业务数据(如请假信息)通常需要自己创建表存储:
CREATE TABLE leave_request (
id VARCHAR(32) PRIMARY KEY,
user_id VARCHAR(32),
reason VARCHAR(255),
start_time DATETIME,
end_time DATETIME,
days INT,
status VARCHAR(20),
process_instance_id VARCHAR(64),
create_time DATETIME,
update_time DATETIME
);
- 关联查询示例:
-- 查询某个用户的请假申请及其当前任务状态
SELECT
l.*,
t.NAME_ as current_task,
t.ASSIGNEE_ as current_assignee
FROM leave_request l
LEFT JOIN ACT_RU_TASK t ON l.process_instance_id = t.PROC_INST_ID_
WHERE l.user_id = 'emp001';
- 如果需要查询历史记录:
-- 查询某个请假申请的完整审批历史
SELECT
hi.START_TIME_,
hi.END_TIME_,
hi.NAME_,
hi.ASSIGNEE_,
c.MESSAGE_
FROM ACT_HI_TASKINST hi
LEFT JOIN ACT_HI_COMMENT c ON hi.ID_ = c.TASK_ID_
WHERE hi.PROC_INST_ID_ = '某个流程实例ID';
所以,Activiti 的数据主要分为三类:
- 流程定义数据(ACT_RE_*表)
- 运行时数据(ACT_RU_*表)
- 历史数据(ACT_HI_*表)
而业务相关的数据需要自己设计表结构来存储,并通过流程实例ID与 Activiti 的表关联。
用户:
那GET /leave/tasks?userId=hr001获取到的数据大概是怎么样的,能够满足展示需求么?
ChatGPT:
GET /leave/tasks?userId=hr001 直接获取到的 Task 对象包含的信息相对比较基础,可能无法完全满足实际展示需求。我来详细说明一下:
- 原始返回的 Task 对象包含的基础信息:
{
"id": "1234", // 任务ID
"name": "人事审批", // 任务名称
"createTime": "2023-05-01 10:00:00", // 任务创建时间
"assignee": "hr001", // 处理人
"processInstanceId": "5678", // 流程实例ID
"processDefinitionId": "leaveProcess:1:9012", // 流程定义ID
"taskDefinitionKey": "hrApprove" // 任务定义KEY
}
- 实际展示时通常需要的信息:
{
// 任务基本信息
"taskId": "1234",
"taskName": "人事审批",
"createTime": "2023-05-01 10:00:00",
"dueDate": "2023-05-03 10:00:00", // 截止时间
"priority": 50, // 优先级
// 请假申请信息
"leaveInfo": {
"applyId": "LEAVE202305010001",
"applicantId": "emp001",
"applicantName": "张三", // 申请人姓名
"department": "技术部", // 部门
"reason": "年假", // 请假原因
"startTime": "2023-05-05", // 开始时间
"endTime": "2023-05-07", // 结束时间
"days": 3, // 请假天数
"leaveType": "年假", // 请假类型
"applyTime": "2023-05-01 09:30:00" // 申请时间
},
// 流程信息
"processInfo": {
"processInstanceId": "5678",
"currentNode": "人事审批",
"previousNode": "主管审批",
"previousApprover": "leader001",
"previousApproveTime": "2023-05-01 09:45:00",
"previousComment": "同意" // 上一步审批意见
}
}
- 为了获取完整信息,需要扩展服务类:
@Service
public class LeaveService {
@Autowired
private TaskService taskService;
@Autowired
private RuntimeService runtimeService;
@Autowired
private HistoryService historyService;
@Autowired
private LeaveRequestMapper leaveRequestMapper; // 自定义的请假信息Mapper
@Autowired
private UserService userService; // 自定义的用户服务
public List<TaskVO> findTodoTasks(String userId) {
// 1. 查询任务列表
List<Task> tasks = taskService.createTaskQuery()
.taskAssignee(userId)
.active()
.orderByTaskCreateTime().desc()
.list();
// 2. 转换为展示对象
return tasks.stream().map(task -> {
TaskVO vo = new TaskVO();
// 设置任务基本信息
vo.setTaskId(task.getId());
vo.setTaskName(task.getName());
vo.setCreateTime(task.getCreateTime());
vo.setDueDate(task.getDueDate());
vo.setPriority(task.getPriority());
// 获取流程实例ID
String processInstanceId = task.getProcessInstanceId();
// 查询请假信息
LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
LeaveInfoVO leaveInfo = convertToLeaveInfoVO(leaveRequest);
// 查询申请人信息
User applicant = userService.getUser(leaveRequest.getUserId());
leaveInfo.setApplicantName(applicant.getName());
leaveInfo.setDepartment(applicant.getDepartment());
vo.setLeaveInfo(leaveInfo);
// 查询流程信息
ProcessInfoVO processInfo = getProcessInfo(processInstanceId);
vo.setProcessInfo(processInfo);
return vo;
}).collect(Collectors.toList());
}
private ProcessInfoVO getProcessInfo(String processInstanceId) {
ProcessInfoVO vo = new ProcessInfoVO();
vo.setProcessInstanceId(processInstanceId);
// 查询历史任务
List<HistoricTaskInstance> historicTasks = historyService.createHistoricTaskInstanceQuery()
.processInstanceId(processInstanceId)
.finished()
.orderByHistoricTaskInstanceEndTime().desc()
.list();
if (!historicTasks.isEmpty()) {
HistoricTaskInstance previousTask = historicTasks.get(0);
vo.setPreviousNode(previousTask.getName());
vo.setPreviousApprover(previousTask.getAssignee());
vo.setPreviousApproveTime(previousTask.getEndTime());
// 查询审批意见
List<Comment> comments = taskService.getTaskComments(previousTask.getId());
if (!comments.isEmpty()) {
vo.setPreviousComment(comments.get(0).getFullMessage());
}
}
return vo;
}
}
- 控制器改造:
@RestController
@RequestMapping("/leave")
public class LeaveController {
@Autowired
private LeaveService leaveService;
@GetMapping("/tasks")
public Result<List<TaskVO>> getTodoTasks(@RequestParam String userId) {
List<TaskVO> tasks = leaveService.findTodoTasks(userId);
return Result.success(tasks);
}
}
- 前端展示示例:
<template>
<div class="todo-tasks">
<a-table :dataSource="tasks">
<!-- 申请人信息 -->
<a-table-column title="申请人" dataIndex="leaveInfo.applicantName" />
<a-table-column title="部门" dataIndex="leaveInfo.department" />
<!-- 请假信息 -->
<a-table-column title="请假类型" dataIndex="leaveInfo.leaveType" />
<a-table-column title="请假时间" >
<template slot-scope="text, record">
{{record.leaveInfo.startTime}} 至 {{record.leaveInfo.endTime}}
(共{{record.leaveInfo.days}}天)
</template>
</a-table-column>
<a-table-column title="请假原因" dataIndex="leaveInfo.reason" />
<!-- 流程信息 -->
<a-table-column title="当前节点" dataIndex="taskName" />
<a-table-column title="上一审批人" dataIndex="processInfo.previousApprover" />
<a-table-column title="上一审批意见" dataIndex="processInfo.previousComment" />
<!-- 操作按钮 -->
<a-table-column title="操作">
<template slot-scope="text, record">
<a-button @click="approve(record)">审批</a-button>
</template>
</a-table-column>
</a-table>
</div>
</template>
这样设计的好处是:
- 提供了完整的业务信息展示
- 减少前端的数据处理逻辑
- 减少前端的请求次数
- 提供更好的用户体验
建议:
- 添加分页查询
- 添加查询条件(如按申请时间、请假类型等筛选)
- 添加排序功能
- 添加缓存机制
- 考虑性能优化(如批量查询)
用户:
因为我自己想到的流程的处理有的时候是很复杂的,比如有多条线路,甚至有些是and的关系,有些是or的关系,有可能终止,有可能通过,甚至有些节点还有时间要求,这些都是这个Activiti自己完全管理的么?
ChatGPT:
是的,Activiti 支持这些复杂的流程控制。我来详细解释一下各种场景的实现:
- 多条路线(网关):
<!-- 排他网关(XOR,只走一条路) -->
<exclusiveGateway id="daysCheck" name="请假天数判断">
<sequenceFlow sourceRef="daysCheck" targetRef="leaderApprove">
<conditionExpression>${days <= 3}</conditionExpression>
</sequenceFlow>
<sequenceFlow sourceRef="daysCheck" targetRef="directorApprove">
<conditionExpression>${days > 3}</conditionExpression>
</sequenceFlow>
</exclusiveGateway>
<!-- 并行网关(AND,所有路线都要走) -->
<parallelGateway id="multiApprove" name="多人同时审批">
<sequenceFlow sourceRef="multiApprove" targetRef="leaderApprove"/>
<sequenceFlow sourceRef="multiApprove" targetRef="hrApprove"/>
</parallelGateway>
<!-- 包容网关(OR,满足条件的都要走) -->
<inclusiveGateway id="conditionalApprove" name="条件审批">
<sequenceFlow sourceRef="conditionalApprove" targetRef="leaderApprove">
<conditionExpression>${days > 3}</conditionExpression>
</sequenceFlow>
<sequenceFlow sourceRef="conditionalApprove" targetRef="hrApprove">
<conditionExpression>${type == 'sick'}</conditionExpression>
</sequenceFlow>
</inclusiveGateway>
- 流程终止和通过:
<!-- 终止结束事件 -->
<endEvent id="terminateEnd" name="终止">
<terminateEventDefinition/>
</endEvent>
<!-- 条件判断终止 -->
<sequenceFlow sourceRef="checkResult" targetRef="terminateEnd">
<conditionExpression>${!approved}</conditionExpression>
</sequenceFlow>
<!-- 正常通过 -->
<sequenceFlow sourceRef="checkResult" targetRef="normalEnd">
<conditionExpression>${approved}</conditionExpression>
</sequenceFlow>
- 时间控制:
<!-- 定时开始事件 -->
<startEvent id="timerStart">
<timerEventDefinition>
<timeDate>2023-12-31T23:59:59</timeDate>
</timerEventDefinition>
</startEvent>
<!-- 超时边界事件 -->
<boundaryEvent id="timeoutAlert" attachedToRef="leaderApprove">
<timerEventDefinition>
<timeDuration>P2D</timeDuration> <!-- 2天后触发 -->
</timerEventDefinition>
</boundaryEvent>
<!-- 定时任务 -->
<serviceTask id="reminderTask">
<timerEventDefinition>
<timeCycle>R3/PT10H</timeCycle> <!-- 每10小时执行一次,共执行3次 -->
</timerEventDefinition>
</serviceTask>
- 复杂业务规则示例:
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL">
<process id="complexLeaveProcess" name="复杂请假流程">
<!-- 开始 -->
<startEvent id="start"/>
<!-- 填写请假单 -->
<userTask id="submitForm" name="填写请假单"/>
<!-- 判断请假类型 -->
<exclusiveGateway id="leaveTypeCheck"/>
<!-- 病假流程 -->
<sequenceFlow sourceRef="leaveTypeCheck" targetRef="sickLeaveProcess">
<conditionExpression>${type == 'sick'}</conditionExpression>
</sequenceFlow>
<!-- 年假流程 -->
<sequenceFlow sourceRef="leaveTypeCheck" targetRef="annualLeaveProcess">
<conditionExpression>${type == 'annual'}</conditionExpression>
</sequenceFlow>
<!-- 病假处理 -->
<subProcess id="sickLeaveProcess">
<!-- 并行审批 -->
<parallelGateway id="sickLeaveParallel"/>
<!-- 主管审批 -->
<userTask id="leaderApprove" name="主管审批">
<!-- 超时提醒 -->
<boundaryEvent id="leaderTimeout">
<timerEventDefinition>
<timeDuration>P1D</timeDuration>
</timerEventDefinition>
</boundaryEvent>
</userTask>
<!-- 医疗证明检查 -->
<userTask id="medicalCheck" name="医疗证明检查"/>
<!-- 汇总网关 -->
<parallelGateway id="sickLeaveJoin"/>
</subProcess>
<!-- 年假处理 -->
<subProcess id="annualLeaveProcess">
<!-- 判断天数 -->
<exclusiveGateway id="daysCheck"/>
<!-- 短期请假 -->
<sequenceFlow sourceRef="daysCheck" targetRef="shortLeave">
<conditionExpression>${days <= 3}</conditionExpression>
</sequenceFlow>
<!-- 长期请假 -->
<sequenceFlow sourceRef="daysCheck" targetRef="longLeave">
<conditionExpression>${days > 3}</conditionExpression>
</sequenceFlow>
<!-- 包容网关 -->
<inclusiveGateway id="approveGateway"/>
<!-- 各种审批条件 -->
<sequenceFlow sourceRef="approveGateway" targetRef="leaderApprove">
<conditionExpression>${days > 0}</conditionExpression>
</sequenceFlow>
<sequenceFlow sourceRef="approveGateway" targetRef="hrApprove">
<conditionExpression>${days > 3}</conditionExpression>
</sequenceFlow>
<sequenceFlow sourceRef="approveGateway" targetRef="directorApprove">
<conditionExpression>${days > 5}</conditionExpression>
</sequenceFlow>
</subProcess>
<!-- 结果处理 -->
<exclusiveGateway id="resultCheck"/>
<!-- 审批通过 -->
<sequenceFlow sourceRef="resultCheck" targetRef="approved">
<conditionExpression>${approved}</conditionExpression>
</sequenceFlow>
<!-- 审批拒绝 -->
<sequenceFlow sourceRef="resultCheck" targetRef="rejected">
<conditionExpression>${!approved}</conditionExpression>
</sequenceFlow>
<!-- 结束事件 -->
<endEvent id="end"/>
</process>
</definitions>
- 在代码中处理复杂逻辑:
@Service
public class LeaveProcessService {
@Autowired
private RuntimeService runtimeService;
@Autowired
private TaskService taskService;
// 启动流程时设置变量
public void startProcess(LeaveRequest leave) {
Map<String, Object> variables = new HashMap<>();
variables.put("type", leave.getType());
variables.put("days", leave.getDays());
variables.put("applicant", leave.getUserId());
variables.put("approved", false);
// 设置动态审批人
variables.put("leader", getLeader(leave.getUserId()));
variables.put("hr", getHrApprover(leave.getType()));
variables.put("director", getDirector(leave.getDepartment()));
runtimeService.startProcessInstanceByKey(
"complexLeaveProcess",
leave.getId(),
variables
);
}
// 完成任务时更新变量
public void completeTask(String taskId, boolean approved, String comment) {
// 添加评论
taskService.addComment(taskId, null, comment);
// 更新变量
Map<String, Object> variables = new HashMap<>();
variables.put("approved", approved);
variables.put("comment", comment);
variables.put("completeTime", new Date());
// 完成任务
taskService.complete(taskId, variables);
}
// 处理超时
@Scheduled(fixedRate = 3600000) // 每小时执行
public void handleTimeout() {
// 查询超时任务
List<Task> timeoutTasks = taskService.createTaskQuery()
.active()
.dueBeforeDate(new Date())
.list();
for (Task task : timeoutTasks) {
// 发送提醒
sendReminder(task);
// 或自动处理
autoProcess(task);
}
}
}
Activiti 提供了强大的流程控制功能:
- 支持各种类型的网关(排他、并行、包容)
- 支持子流程
- 支持定时事件
- 支持边界事件
- 支持条件判断
- 支持动态分配处理人
- 支持流程变量
- 支持事件监听
建议:
- 合理设计流程,避免过于复杂
- 注意处理异常情况
- 做好流程监控
- 考虑性能影响
- 保持流程版本管理
用户:
比如,现在我现在提交一个病假后,HR需要进行审批了,这个时候前端应该调用后端的哪一个接口?
ChatGPT:
对于 HR 审批这个操作,通常需要以下几个接口:
- 首先,HR 需要查看待办任务:
@GetMapping("/tasks")
public Result<List<TaskVO>> getTodoTasks(@RequestParam String userId) {
// HR的userId查询待办任务
List<TaskVO> tasks = leaveService.findTodoTasks(userId);
return Result.success(tasks);
}
- 查看具体任务的详细信息:
@GetMapping("/task/{taskId}")
public Result<TaskDetailVO> getTaskDetail(@PathVariable String taskId) {
TaskDetailVO detail = leaveService.getTaskDetail(taskId);
return Result.success(detail);
}
@Service
public class LeaveService {
@Autowired
private TaskService taskService;
public TaskDetailVO getTaskDetail(String taskId) {
// 1. 获取任务信息
Task task = taskService.createTaskQuery()
.taskId(taskId)
.singleResult();
// 2. 获取流程实例ID
String processInstanceId = task.getProcessInstanceId();
// 3. 查询请假信息
LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
// 4. 查询流程历史记录
List<HistoricActivityInstance> activities = historyService.createHistoricActivityInstanceQuery()
.processInstanceId(processInstanceId)
.finished()
.orderByHistoricActivityInstanceEndTime().asc()
.list();
// 5. 组装详细信息
TaskDetailVO detail = new TaskDetailVO();
detail.setTask(task);
detail.setLeaveRequest(leaveRequest);
detail.setHistoryActivities(activities);
return detail;
}
}
- 提交审批结果:
@PostMapping("/task/approve")
public Result approveTask(@RequestBody ApproveDTO approveDTO) {
leaveService.completeTask(approveDTO);
return Result.success();
}
// 审批参数
@Data
public class ApproveDTO {
private String taskId; // 任务ID
private Boolean approved; // 是否同意
private String comment; // 审批意见
private Map<String, Object> variables; // 其他变量
}
@Service
public class LeaveService {
@Autowired
private TaskService taskService;
@Transactional
public void completeTask(ApproveDTO approveDTO) {
String taskId = approveDTO.getTaskId();
// 1. 添加审批意见
taskService.addComment(
taskId,
null,
approveDTO.getComment()
);
// 2. 准备变量
Map<String, Object> variables = new HashMap<>();
variables.put("approved", approveDTO.getApproved());
variables.put("approveTime", new Date());
if (approveDTO.getVariables() != null) {
variables.putAll(approveDTO.getVariables());
}
// 3. 完成任务
taskService.complete(taskId, variables);
// 4. 更新业务状态
Task task = taskService.createTaskQuery()
.taskId(taskId)
.singleResult();
String processInstanceId = task.getProcessInstanceId();
LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
leaveRequest.setStatus(approveDTO.getApproved() ? "approved" : "rejected");
leaveRequestMapper.updateById(leaveRequest);
// 5. 发送通知
sendNotification(leaveRequest, approveDTO);
}
}
- 前端调用示例:
// Vue组件示例
export default {
data() {
return {
tasks: [],
currentTask: null,
approveForm: {
approved: null,
comment: '',
}
}
},
methods: {
// 获取待办任务
async loadTasks() {
const res = await axios.get('/api/leave/tasks', {
params: { userId: 'hr001' }
});
this.tasks = res.data.data;
},
// 查看任务详情
async viewTask(taskId) {
const res = await axios.get(`/api/leave/task/${taskId}`);
this.currentTask = res.data.data;
},
// 提交审批
async submitApprove() {
const params = {
taskId: this.currentTask.taskId,
approved: this.approveForm.approved,
comment: this.approveForm.comment,
variables: {
// 可以添加其他流程变量
}
};
try {
await axios.post('/api/leave/task/approve', params);
this.$message.success('审批提交成功');
this.loadTasks(); // 刷新任务列表
} catch (error) {
this.$message.error('审批提交失败');
}
}
},
created() {
this.loadTasks();
}
}
- 前端页面示例:
<template>
<div class="task-approval">
<!-- 任务列表 -->
<a-table :dataSource="tasks">
<a-table-column title="申请人" dataIndex="leaveInfo.applicantName" />
<a-table-column title="请假类型" dataIndex="leaveInfo.leaveType" />
<a-table-column title="请假天数" dataIndex="leaveInfo.days" />
<a-table-column title="操作">
<template slot-scope="text, record">
<a-button @click="viewTask(record.taskId)">
查看详情
</a-button>
</template>
</a-table-column>
</a-table>
<!-- 审批对话框 -->
<a-modal
v-model="visible"
title="请假审批"
@ok="submitApprove"
>
<div class="task-detail">
<!-- 请假详情展示 -->
<div class="leave-info">
<p>申请人:{{currentTask?.leaveInfo.applicantName}}</p>
<p>请假类型:{{currentTask?.leaveInfo.leaveType}}</p>
<p>请假时间:{{currentTask?.leaveInfo.startTime}} 至 {{currentTask?.leaveInfo.endTime}}</p>
<p>请假原因:{{currentTask?.leaveInfo.reason}}</p>
</div>
<!-- 审批表单 -->
<a-form>
<a-form-item label="审批结果">
<a-radio-group v-model="approveForm.approved">
<a-radio :value="true">同意</a-radio>
<a-radio :value="false">拒绝</a-radio>
</a-radio-group>
</a-form-item>
<a-form-item label="审批意见">
<a-textarea
v-model="approveForm.comment"
:rows="4"
placeholder="请输入审批意见"
/>
</a-form-item>
</a-form>
</div>
</a-modal>
</div>
</template>
这样设计的好处是:
- 接口职责清晰
- 数据结构完整
- 操作流程合理
- 异常处理完善
- 支持业务扩展
建议:
- 添加权限验证
- 添加数据验证
- 添加操作日志
- 添加事务处理
- 考虑并发情况
用户:
我看你这里写的最终调用的是completeTask接口,这个内部是怎么判断hr可以审批这个节点的申请这个操作的,如果一个流程中有两个节点都需要同一个人审批,这个会出错么?
ChatGPT:
很好的问题!让我详细解释 Activiti 是如何管理任务分配和权限的:
- 任务分配的判断机制:
@Service
public class LeaveService {
@Autowired
private TaskService taskService;
public void completeTask(ApproveDTO approveDTO) {
String taskId = approveDTO.getTaskId();
String userId = approveDTO.getUserId(); // 当前操作人
// 1. 首先验证任务是否存在且属于该用户
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskAssignee(userId) // 关键点:检查任务是否分配给该用户
.singleResult();
if (task == null) {
throw new BusinessException("无权处理该任务或任务不存在");
}
// 2. 检查任务是否是当前活动的任务
if (task.isSuspended()) {
throw new BusinessException("该任务已被挂起");
}
// 3. 完成任务
taskService.complete(taskId, approveDTO.getVariables());
}
}
- 对于同一个人在不同节点审批的情况,Activiti 是通过 taskDefinitionKey 来区分的:
// BPMN定义
<userTask id="firstApprove" name="初审" activiti:assignee="${hr}"/>
<userTask id="finalApprove" name="终审" activiti:assignee="${hr}"/>
// 查询特定节点的任务
@Service
public class LeaveService {
public List<Task> getTasksByDefinitionKey(String userId, String taskDefinitionKey) {
return taskService.createTaskQuery()
.taskAssignee(userId)
.taskDefinitionKey(taskDefinitionKey) // 指定节点的key
.list();
}
// 查询任务详情时也可以包含节点信息
public TaskDetailVO getTaskDetail(String taskId) {
Task task = taskService.createTaskQuery()
.taskId(taskId)
.singleResult();
TaskDetailVO detail = new TaskDetailVO();
detail.setTaskId(task.getId());
detail.setTaskName(task.getName());
detail.setTaskDefinitionKey(task.getTaskDefinitionKey()); // 节点标识
// ... 其他信息
return detail;
}
}
- 更安全的任务处理方式:
@Service
public class LeaveService {
@Autowired
private TaskService taskService;
@Transactional
public void completeTask(ApproveDTO approveDTO) {
String taskId = approveDTO.getTaskId();
String userId = approveDTO.getUserId();
// 1. 获取任务详情,包含节点信息
Task task = taskService.createTaskQuery()
.taskId(taskId)
.taskAssignee(userId)
.singleResult();
if (task == null) {
throw new BusinessException("无权处理该任务");
}
// 2. 根据节点类型执行不同的业务逻辑
String definitionKey = task.getTaskDefinitionKey();
switch (definitionKey) {
case "firstApprove":
handleFirstApprove(task, approveDTO);
break;
case "finalApprove":
handleFinalApprove(task, approveDTO);
break;
default:
throw new BusinessException("未知的任务节点");
}
// 3. 完成任务
Map<String, Object> variables = new HashMap<>();
variables.put("approved", approveDTO.getApproved());
variables.put("approver", userId);
variables.put("approveTime", new Date());
variables.put("nodeKey", definitionKey);
taskService.complete(taskId, variables);
}
// 处理初审逻辑
private void handleFirstApprove(Task task, ApproveDTO approveDTO) {
// 初审特定的业务逻辑
if (!approveDTO.getApproved()) {
// 如果拒绝,可能需要特殊处理
handleReject(task, "初审拒绝");
}
}
// 处理终审逻辑
private void handleFinalApprove(Task task, ApproveDTO approveDTO) {
// 终审特定的业务逻辑
if (approveDTO.getApproved()) {
// 审批通过后的处理
handleApproved(task);
}
}
}
- 前端展示时区分不同节点:
export default {
data() {
return {
tasks: [],
nodeTypes: {
firstApprove: '初审',
finalApprove: '终审'
}
}
},
methods: {
async loadTasks() {
const res = await axios.get('/api/leave/tasks', {
params: { userId: 'hr001' }
});
// 处理任务数据,添加节点类型描述
this.tasks = res.data.data.map(task => ({
...task,
nodeTypeName: this.nodeTypes[task.taskDefinitionKey]
}));
},
// 根据节点类型显示不同的表单
getFormByNodeType(nodeType) {
switch(nodeType) {
case 'firstApprove':
return 'first-approve-form';
case 'finalApprove':
return 'final-approve-form';
default:
return 'default-form';
}
}
}
}
- 任务列表展示:
<template>
<div>
<a-table :dataSource="tasks">
<a-table-column title="申请人" dataIndex="leaveInfo.applicantName" />
<a-table-column title="审批节点" dataIndex="nodeTypeName" />
<a-table-column title="创建时间" dataIndex="createTime" />
<a-table-column title="操作">
<template slot-scope="text, record">
<component
:is="getFormByNodeType(record.taskDefinitionKey)"
:task="record"
@approve="handleApprove"
/>
</template>
</a-table-column>
</a-table>
</div>
</template>
- 添加任务锁定机制(防止并发处理):
@Service
public class LeaveService {
@Autowired
private TaskService taskService;
@Transactional
public void completeTask(ApproveDTO approveDTO) {
String taskId = approveDTO.getTaskId();
String userId = approveDTO.getUserId();
// 1. 尝试锁定任务
try {
taskService.claim(taskId, userId);
} catch (ActivitiTaskAlreadyClaimedException e) {
throw new BusinessException("该任务已被其他人处理");
}
try {
// 2. 处理任务
// ... 处理逻辑
// 3. 完成任务
taskService.complete(taskId, variables);
} finally {
// 4. 如果处理失败,解锁任务
if (taskService.createTaskQuery().taskId(taskId).singleResult() != null) {
taskService.unclaim(taskId);
}
}
}
}
关键点总结:
- Activiti 通过 taskDefinitionKey 区分不同节点
- 每个任务都有明确的处理人(assignee)
- 完成任务前会验证处理人权限
- 可以为不同节点设置不同的处理逻辑
- 建议添加任务锁定机制防止并发处理
建议:
- 合理设计流程节点标识
- 做好节点权限控制
- 添加任务处理日志
- 考虑并发处理情况
- 做好异常处理
用户:
你上面提到任务挂起,这个是什么意思。任务有哪几种状态,每种状态标识什么意思?
ChatGPT:
在 Activiti 中,任务和流程实例都有自己的状态管理机制。让我详细解释:
- 任务(Task)的主要状态:
public class TaskStatus {
// 1. 活动状态 (Active)
// 任务刚创建,等待处理
ACTIVE,
// 2. 挂起状态 (Suspended)
// 任务暂时被挂起,不能处理
SUSPENDED,
// 3. 完成状态 (Completed)
// 任务已经处理完成
COMPLETED,
// 4. 删除状态 (Deleted)
// 任务被删除
DELETED
}
- 任务的认领状态:
public class TaskClaimStatus {
// 1. 未认领 (Unclaimed)
// assignee为null
UNCLAIMED,
// 2. 已认领 (Claimed)
// 有具体的assignee
CLAIMED,
// 3. 委派状态 (Delegated)
// 任务被委派给其他人处理
DELEGATED
}
- 流程实例(ProcessInstance)的状态:
public class ProcessInstanceStatus {
// 1. 活动状态 (Active)
// 流程正在执行中
ACTIVE,
// 2. 挂起状态 (Suspended)
// 流程被暂时挂起
SUSPENDED,
// 3. 完成状态 (Completed)
// 流程正常结束
COMPLETED,
// 4. 终止状态 (Terminated)
// 流程被强制终止
TERMINATED
}
- 状态管理示例:
@Service
public class TaskManagementService {
@Autowired
private TaskService taskService;
@Autowired
private RuntimeService runtimeService;
// 挂起任务
public void suspendTask(String taskId) {
taskService.suspendTaskById(taskId);
}
// 激活任务
public void activateTask(String taskId) {
taskService.activateTaskById(taskId);
}
// 认领任务
public void claimTask(String taskId, String userId) {
taskService.claim(taskId, userId);
}
// 取消认领
public void unclaimTask(String taskId) {
taskService.unclaim(taskId);
}
// 委派任务
public void delegateTask(String taskId, String userId) {
taskService.delegateTask(taskId, userId);
}
// 转办任务
public void transferTask(String taskId, String userId) {
taskService.setAssignee(taskId, userId);
}
// 挂起流程实例
public void suspendProcessInstance(String processInstanceId) {
runtimeService.suspendProcessInstanceById(processInstanceId);
}
// 激活流程实例
public void activateProcessInstance(String processInstanceId) {
runtimeService.activateProcessInstanceById(processInstanceId);
}
}
- 状态查询示例:
@Service
public class TaskQueryService {
@Autowired
private TaskService taskService;
// 查询活动的任务
public List<Task> getActiveTasks() {
return taskService.createTaskQuery()
.active()
.list();
}
// 查询挂起的任务
public List<Task> getSuspendedTasks() {
return taskService.createTaskQuery()
.suspended()
.list();
}
// 查询未认领的任务
public List<Task> getUnclaimedTasks() {
return taskService.createTaskQuery()
.taskCandidateUser("userId")
.taskUnassigned()
.list();
}
// 查询某人的待办任务
public List<Task> getUserTasks(String userId) {
return taskService.createTaskQuery()
.taskAssignee(userId)
.active()
.list();
}
}
- 状态变更监听:
@Component
public class TaskStateListener implements TaskListener {
@Override
public void notify(DelegateTask delegateTask) {
String eventName = delegateTask.getEventName();
switch (eventName) {
case EVENTNAME_CREATE:
handleTaskCreated(delegateTask);
break;
case EVENTNAME_ASSIGNMENT:
handleTaskAssigned(delegateTask);
break;
case EVENTNAME_COMPLETE:
handleTaskCompleted(delegateTask);
break;
case EVENTNAME_DELETE:
handleTaskDeleted(delegateTask);
break;
}
}
}
- 业务场景示例:
@Service
public class LeaveProcessService {
// 请假流程挂起(比如员工临时撤回修改)
public void suspendLeaveProcess(String processInstanceId) {
// 1. 挂起流程实例
runtimeService.suspendProcessInstanceById(processInstanceId);
// 2. 更新业务状态
LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
leaveRequest.setStatus("SUSPENDED");
leaveRequestMapper.updateById(leaveRequest);
// 3. 发送通知
sendNotification("流程已挂起");
}
// 销假处理(比如员工提前销假)
public void terminateLeaveProcess(String processInstanceId) {
// 1. 终止流程实例
runtimeService.deleteProcessInstance(processInstanceId, "提前销假");
// 2. 更新业务状态
LeaveRequest leaveRequest = leaveRequestMapper.selectByProcessInstanceId(processInstanceId);
leaveRequest.setStatus("TERMINATED");
leaveRequestMapper.updateById(leaveRequest);
// 3. 发送通知
sendNotification("请假已销假");
}
}
- 前端展示不同状态:
<template>
<a-table :dataSource="tasks">
<a-table-column title="状态">
<template slot-scope="text, record">
<a-tag :color="getStatusColor(record.status)">
{{ getStatusText(record.status) }}
</a-tag>
</template>
</a-table-column>
<a-table-column title="操作">
<template slot-scope="text, record">
<!-- 根据不同状态显示不同操作按钮 -->
<a-button v-if="record.status === 'ACTIVE'"
@click="handleClaim(record)">
认领
</a-button>
<a-button v-if="record.status === 'CLAIMED'"
@click="handleComplete(record)">
完成
</a-button>
<a-button v-if="record.status === 'ACTIVE'"
@click="handleSuspend(record)">
挂起
</a-button>
</template>
</a-table-column>
</a-table>
</template>
状态管理的注意事项:
- 状态变更要保证原子性
- 注意状态变更的权限控制
- 保持业务状态与流程状态的一致性
- 记录状态变更历史
- 考虑状态变更的通知机制
建议:
- 合理使用状态
- 做好状态转换的验证
- 添加状态变更日志
- 考虑并发情况
- 做好异常处理
用户:
go语言如果要实现这种流程审批有什么办法么
ChatGPT:
在 Go 语言中实现工作流审批,有几种主要的方案:
- 使用 Temporal 工作流引擎(推荐):
// 定义工作流
func LeaveApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
// 工作流选项
ao := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute,
BackoffCoefficient: 2.0,
MaximumInterval: time.Hour,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, ao)
// 1. 提交申请
var submitResult SubmitResult
err := workflow.ExecuteActivity(ctx, SubmitLeaveRequest, request).Get(ctx, &submitResult)
if err != nil {
return err
}
// 2. 主管审批
var leaderApproval ApprovalResult
err = workflow.ExecuteActivity(ctx, LeaderApprove, submitResult.RequestID).Get(ctx, &leaderApproval)
if err != nil {
return err
}
// 3. 如果主管拒绝,直接结束
if !leaderApproval.Approved {
return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
}
// 4. HR审批
var hrApproval ApprovalResult
err = workflow.ExecuteActivity(ctx, HRApprove, submitResult.RequestID).Get(ctx, &hrApproval)
if err != nil {
return err
}
// 5. 完成审批
return workflow.ExecuteActivity(ctx, FinalizeApproval, request, hrApproval).Get(ctx, nil)
}
// 活动实现
type Activities struct {
leaveService *service.LeaveService
}
func (a *Activities) SubmitLeaveRequest(ctx context.Context, request LeaveRequest) (*SubmitResult, error) {
// 实现提交逻辑
return a.leaveService.Submit(request)
}
func (a *Activities) LeaderApprove(ctx context.Context, requestID string) (*ApprovalResult, error) {
// 实现主管审批逻辑
return a.leaveService.LeaderApprove(requestID)
}
- 使用自定义状态机:
// 定义状态
type Status string
const (
StatusDraft Status = "draft"
StatusSubmitted Status = "submitted"
StatusApproving Status = "approving"
StatusApproved Status = "approved"
StatusRejected Status = "rejected"
)
// 定义流程节点
type Node struct {
ID string
Name string
Type string
Handlers []string
Next []string
}
// 流程定义
type WorkflowDefinition struct {
ID string
Nodes map[string]*Node
}
// 流程实例
type WorkflowInstance struct {
ID string
DefinitionID string
BusinessID string
CurrentNode string
Status Status
Variables map[string]interface{}
CreateTime time.Time
UpdateTime time.Time
}
// 工作流引擎
type WorkflowEngine struct {
db *gorm.DB
}
// 启动流程
func (e *WorkflowEngine) StartWorkflow(ctx context.Context, definitionID string, businessID string, variables map[string]interface{}) (*WorkflowInstance, error) {
instance := &WorkflowInstance{
ID: uuid.New().String(),
DefinitionID: definitionID,
BusinessID: businessID,
Status: StatusSubmitted,
Variables: variables,
CreateTime: time.Now(),
}
if err := e.db.Create(instance).Error; err != nil {
return nil, err
}
return instance, nil
}
// 完成任务
func (e *WorkflowEngine) CompleteTask(ctx context.Context, taskID string, variables map[string]interface{}) error {
return e.db.Transaction(func(tx *gorm.DB) error {
// 1. 获取任务
task := &Task{}
if err := tx.First(task, taskID).Error; err != nil {
return err
}
// 2. 更新任务状态
task.Status = StatusCompleted
if err := tx.Save(task).Error; err != nil {
return err
}
// 3. 创建下一个节点的任务
nextTasks, err := e.createNextTasks(ctx, tx, task)
if err != nil {
return err
}
return nil
})
}
- 使用消息队列实现:
// 定义消息
type ApprovalMessage struct {
RequestID string `json:"request_id"`
NodeID string `json:"node_id"`
ApproverID string `json:"approver_id"`
Action string `json:"action"`
Comment string `json:"comment"`
Timestamp time.Time `json:"timestamp"`
}
// 消息处理器
type ApprovalHandler struct {
db *gorm.DB
queue *rabbitmq.Channel
service *LeaveService
}
// 处理审批消息
func (h *ApprovalHandler) HandleApproval(msg ApprovalMessage) error {
return h.db.Transaction(func(tx *gorm.DB) error {
// 1. 获取请假申请
request := &LeaveRequest{}
if err := tx.First(request, msg.RequestID).Error; err != nil {
return err
}
// 2. 更新审批状态
if err := h.service.UpdateApprovalStatus(tx, request, msg); err != nil {
return err
}
// 3. 发送通知
if err := h.service.SendNotification(request, msg); err != nil {
return err
}
return nil
})
}
- 数据库设计:
-- 流程定义表
CREATE TABLE workflow_definitions (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
version INT NOT NULL,
content JSON NOT NULL,
status VARCHAR(20) NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
-- 流程实例表
CREATE TABLE workflow_instances (
id VARCHAR(36) PRIMARY KEY,
definition_id VARCHAR(36) NOT NULL,
business_id VARCHAR(36) NOT NULL,
current_node VARCHAR(36) NOT NULL,
status VARCHAR(20) NOT NULL,
variables JSON,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
-- 任务表
CREATE TABLE tasks (
id VARCHAR(36) PRIMARY KEY,
instance_id VARCHAR(36) NOT NULL,
node_id VARCHAR(36) NOT NULL,
assignee VARCHAR(36),
status VARCHAR(20) NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL
);
- API 实现:
// 路由定义
func SetupRoutes(r *gin.Engine, h *Handler) {
r.POST("/workflows/leave/start", h.StartLeaveWorkflow)
r.POST("/workflows/tasks/:taskId/complete", h.CompleteTask)
r.GET("/workflows/tasks", h.GetUserTasks)
r.GET("/workflows/instances/:instanceId", h.GetWorkflowInstance)
}
// 处理器
type Handler struct {
engine *WorkflowEngine
service *LeaveService
}
// 启动请假流程
func (h *Handler) StartLeaveWorkflow(c *gin.Context) {
var req LeaveRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
instance, err := h.engine.StartWorkflow(c, "leave_approval", req.ID, map[string]interface{}{
"userId": req.UserID,
"days": req.Days,
"reason": req.Reason,
})
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, instance)
}
// 完成任务
func (h *Handler) CompleteTask(c *gin.Context) {
taskID := c.Param("taskId")
var req CompleteTaskRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
err := h.engine.CompleteTask(c, taskID, req.Variables)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, gin.H{"message": "success"})
}
- 服务实现:
type LeaveService struct {
db *gorm.DB
notifier *Notifier
}
// 提交请假申请
func (s *LeaveService) SubmitLeave(ctx context.Context, req LeaveRequest) error {
return s.db.Transaction(func(tx *gorm.DB) error {
// 1. 保存请假记录
leave := &Leave{
ID: uuid.New().String(),
UserID: req.UserID,
Days: req.Days,
Reason: req.Reason,
Status: StatusSubmitted,
CreatedAt: time.Now(),
}
if err := tx.Create(leave).Error; err != nil {
return err
}
// 2. 启动工作流
if err := s.workflowEngine.StartWorkflow(ctx, "leave_approval", leave.ID, nil); err != nil {
return err
}
return nil
})
}
// 审批
func (s *LeaveService) Approve(ctx context.Context, taskID string, approved bool, comment string) error {
return s.db.Transaction(func(tx *gorm.DB) error {
// 1. 完成任务
if err := s.workflowEngine.CompleteTask(ctx, taskID, map[string]interface{}{
"approved": approved,
"comment": comment,
}); err != nil {
return err
}
// 2. 更新请假状态
// ...
// 3. 发送通知
// ...
return nil
})
}
建议:
- 选择合适的工作流引擎
- 做好事务处理
- 实现审计日志
- 添加超时处理
- 考虑并发情况
- 实现错误重试
- 添加监控指标
这些实现方案各有优势:
- Temporal:功能强大,支持复杂流程
- 状态机:简单直观,适合简单流程
- 消息队列:解耦性好,适合分布式系统
选择哪种方案取决于你的具体需求。
用户:
这个支持BPMN 模型么?
ChatGPT:
Go 语言生态中支持 BPMN 模型的工作流引擎相对较少,但有几个选择:
- Zeebe (现在是 Camunda Platform 8 的一部分):
// 使用 Zeebe 客户端
import (
"github.com/camunda/zeebe/clients/go/v8/pkg/pb"
"github.com/camunda/zeebe/clients/go/v8/pkg/zbc"
)
// 创建客户端
client, err := zbc.NewClient(&zbc.ClientConfig{
GatewayAddress: "localhost:26500",
})
// 部署 BPMN
deployment, err := client.NewDeployCommand().
AddResourceFile("leave-process.bpmn").
Send(context)
// 启动流程实例
result, err := client.NewCreateInstanceCommand().
BPMNProcessId("leave-process").
LatestVersion().
Variables(map[string]interface{}{
"userId": "123",
"days": 3,
"reason": "vacation",
}).
Send(context)
- Flowable (有非官方的 Go 客户端):
// 使用 REST API 调用 Flowable
type FlowableClient struct {
baseURL string
httpClient *http.Client
}
// 部署 BPMN
func (c *FlowableClient) DeployBPMN(filename string, content []byte) error {
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
part, err := writer.CreateFormFile("file", filename)
if err != nil {
return err
}
_, err = part.Write(content)
if err != nil {
return err
}
err = writer.Close()
if err != nil {
return err
}
req, err := http.NewRequest("POST", c.baseURL+"/repository/deployments", body)
// ... 处理请求
}
// 启动流程实例
func (c *FlowableClient) StartProcess(processKey string, variables map[string]interface{}) error {
payload := map[string]interface{}{
"processDefinitionKey": processKey,
"variables": variables,
}
jsonData, err := json.Marshal(payload)
if err != nil {
return err
}
req, err := http.NewRequest("POST", c.baseURL+"/runtime/process-instances", bytes.NewBuffer(jsonData))
// ... 处理请求
}
- 自己实现 BPMN 解析和执行引擎:
// BPMN 模型结构
type BPMNProcess struct {
ID string `xml:"id,attr"`
Name string `xml:"name,attr"`
Elements []Element `xml:",any"`
}
type Element interface {
GetID() string
GetType() string
Execute(ctx Context) error
}
// BPMN 解析器
type BPMNParser struct {
// ...
}
func (p *BPMNParser) ParseBPMN(content []byte) (*BPMNProcess, error) {
var process BPMNProcess
err := xml.Unmarshal(content, &process)
if err != nil {
return nil, err
}
return &process, nil
}
// BPMN 执行引擎
type BPMNEngine struct {
db *gorm.DB
definitions map[string]*BPMNProcess
}
// 注册 BPMN 流程定义
func (e *BPMNEngine) DeployBPMN(process *BPMNProcess) error {
e.definitions[process.ID] = process
return nil
}
// 启动流程实例
func (e *BPMNEngine) StartProcess(processID string, variables map[string]interface{}) (*ProcessInstance, error) {
process, exists := e.definitions[processID]
if !exists {
return nil, errors.New("process not found")
}
instance := &ProcessInstance{
ID: uuid.New().String(),
ProcessID: processID,
Status: "ACTIVE",
Variables: variables,
}
if err := e.db.Create(instance).Error; err != nil {
return nil, err
}
// 开始执行流程
ctx := NewContext(instance, e)
go e.executeProcess(ctx, process)
return instance, nil
}
- 完整的 BPMN 实现示例:
// BPMN 元素定义
type UserTask struct {
ID string `xml:"id,attr"`
Name string `xml:"name,attr"`
Assignee string `xml:"activiti:assignee,attr"`
Candidates []string
}
type SequenceFlow struct {
ID string `xml:"id,attr"`
SourceRef string `xml:"sourceRef,attr"`
TargetRef string `xml:"targetRef,attr"`
Condition string `xml:"conditionExpression"`
}
// BPMN 执行上下文
type Context struct {
Instance *ProcessInstance
Variables map[string]interface{}
Engine *BPMNEngine
}
// 用户任务处理
func (t *UserTask) Execute(ctx Context) error {
// 创建任务
task := &Task{
ID: uuid.New().String(),
InstanceID: ctx.Instance.ID,
NodeID: t.ID,
Name: t.Name,
Assignee: t.Assignee,
Status: "ACTIVE",
}
if err := ctx.Engine.db.Create(task).Error; err != nil {
return err
}
// 等待任务完成
return ctx.Engine.waitForTask(task.ID)
}
// 网关处理
type ExclusiveGateway struct {
ID string `xml:"id,attr"`
OutgoingFlows []SequenceFlow
}
func (g *ExclusiveGateway) Execute(ctx Context) error {
// 评估条件
for _, flow := range g.OutgoingFlows {
if evaluateCondition(flow.Condition, ctx.Variables) {
return ctx.Engine.executeElement(flow.TargetRef, ctx)
}
}
return errors.New("no valid path found")
}
- API 接口:
type WorkflowAPI struct {
engine *BPMNEngine
}
// 部署 BPMN
func (api *WorkflowAPI) DeployBPMN(c *gin.Context) {
file, err := c.FormFile("file")
if err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
content, err := file.Open()
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
defer content.Close()
process, err := api.engine.parser.ParseBPMN(content)
if err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
err = api.engine.DeployBPMN(process)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, gin.H{"message": "deployed successfully"})
}
// 启动流程
func (api *WorkflowAPI) StartProcess(c *gin.Context) {
var req StartProcessRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
instance, err := api.engine.StartProcess(req.ProcessID, req.Variables)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, instance)
}
建议:
- 如果需要完整的 BPMN 支持,建议使用 Zeebe
- 如果是简单流程,可以考虑自己实现
- 需要注意:
- BPMN 规范复杂
- 需要处理并发
- 需要考虑性能
- 需要实现持久化
- 需要支持版本控制
选择建议:
- 企业级应用:使用 Zeebe
- 简单应用:自己实现简化版
- 中等复杂度:考虑使用 Flowable + REST API
用户:
我看这些go的案例都需要自己新建表,比如WorkflowEngine,前面说到Activiti,大概有25~30个表,但是WorkflowEngine案例里面的表却没那么多,这个是因为WorkflowEngine不如Activiti完善么?WorkflowEngine如何定义复杂的审批流?
ChatGPT:
是的,你说得对。自定义的 WorkflowEngine 确实不如 Activiti 完善,主要体现在以下几个方面:
- Activiti 的表结构设计更完整:
-- Activiti 的表主要分类:
-- 1. 流程定义表(ACT_RE_*)
-- 2. 运行时表(ACT_RU_*)
-- 3. 历史表(ACT_HI_*)
-- 4. 身份表(ACT_ID_*)
-- 5. 通用数据表(ACT_GE_*)
-- 而简单的 WorkflowEngine 通常只有基础表:
CREATE TABLE workflow_definitions (
id VARCHAR(36) PRIMARY KEY,
name VARCHAR(100),
version INT,
content TEXT
);
CREATE TABLE workflow_instances (
id VARCHAR(36) PRIMARY KEY,
definition_id VARCHAR(36),
status VARCHAR(20)
);
CREATE TABLE workflow_tasks (
id VARCHAR(36) PRIMARY KEY,
instance_id VARCHAR(36),
node_id VARCHAR(36),
assignee VARCHAR(36)
);
- Activiti 支持更复杂的 BPMN 元素:
<!-- Activiti 支持的复杂流程示例 -->
<definitions>
<process id="complexProcess">
<!-- 1. 多实例任务 -->
<multiInstanceLoopCharacteristics>
<loopCardinality>3</loopCardinality>
</multiInstanceLoopCharacteristics>
<!-- 2. 子流程 -->
<subProcess id="subProcess">
<!-- 子流程内容 -->
</subProcess>
<!-- 3. 调用活动 -->
<callActivity id="callSubProcess"
calledElement="otherProcess">
</callActivity>
<!-- 4. 事件子流程 -->
<subProcess id="eventSubProcess" triggeredByEvent="true">
<startEvent id="errorStart">
<errorEventDefinition errorRef="someError"/>
</startEvent>
</subProcess>
<!-- 5. 复杂网关 -->
<complexGateway id="complexDecision">
<activationCondition>${execution.getVariable('complete')}</activationCondition>
</complexGateway>
</process>
</definitions>
<!-- 而简单的 WorkflowEngine 通常只支持基础元素 -->
<definitions>
<process id="simpleProcess">
<startEvent id="start"/>
<userTask id="task1"/>
<exclusiveGateway id="gateway"/>
<endEvent id="end"/>
</process>
</definitions>
- 要在 Go 中实现类似 Activiti 的复杂功能,需要大量额外代码:
// 1. 多实例任务支持
type MultiInstanceTask struct {
BaseTask
Instances int
CompletedCount int
Collection []interface{}
ElementVariable string
}
func (t *MultiInstanceTask) Execute(ctx *ExecutionContext) error {
// 创建多个任务实例
for i := 0; i < t.Instances; i++ {
task := &UserTask{
ID: fmt.Sprintf("%s_%d", t.ID, i),
Name: t.Name,
Assignee: t.getAssignee(i),
}
// 执行每个实例
if err := task.Execute(ctx); err != nil {
return err
}
}
return nil
}
// 2. 子流程支持
type SubProcess struct {
BaseElement
Elements []FlowElement
Async bool
}
func (s *SubProcess) Execute(ctx *ExecutionContext) error {
// 创建子流程上下文
subCtx := NewExecutionContext(ctx.Engine)
subCtx.Variables = ctx.Variables.Copy()
// 执行子流程
for _, element := range s.Elements {
if err := element.Execute(subCtx); err != nil {
return err
}
}
return nil
}
// 3. 复杂网关支持
type ComplexGateway struct {
BaseGateway
ActivationCondition string
OutgoingFlows []*SequenceFlow
}
func (g *ComplexGateway) Execute(ctx *ExecutionContext) error {
// 评估激活条件
if !evaluateCondition(g.ActivationCondition, ctx.Variables) {
return nil
}
// 评估所有出口流
var validFlows []*SequenceFlow
for _, flow := range g.OutgoingFlows {
if evaluateCondition(flow.Condition, ctx.Variables) {
validFlows = append(validFlows, flow)
}
}
// 执行有效的流程路径
for _, flow := range validFlows {
if err := ctx.Engine.executeFlow(flow, ctx); err != nil {
return err
}
}
return nil
}
// 4. 事件处理支持
type EventDefinition interface {
HandleEvent(ctx *ExecutionContext) error
}
type TimerEventDefinition struct {
TimeDate string
TimeDuration string
TimeCycle string
}
func (t *TimerEventDefinition) HandleEvent(ctx *ExecutionContext) error {
// 处理定时器事件
switch {
case t.TimeDate != "":
return t.handleTimeDate(ctx)
case t.TimeDuration != "":
return t.handleTimeDuration(ctx)
case t.TimeCycle != "":
return t.handleTimeCycle(ctx)
}
return nil
}
// 5. 表达式引擎支持
type ExpressionEngine interface {
Evaluate(expression string, variables map[string]interface{}) (interface{}, error)
}
type GovaluateExpressionEngine struct {
// 使用 Govaluate 库实现表达式计算
}
func (e *GovaluateExpressionEngine) Evaluate(expression string, variables map[string]interface{}) (interface{}, error) {
expr, err := govaluate.NewEvaluableExpression(expression)
if err != nil {
return nil, err
}
return expr.Evaluate(variables)
}
- 需要添加更多的数据表来支持复杂功能:
-- 多实例任务表
CREATE TABLE workflow_multi_instance_tasks (
id VARCHAR(36) PRIMARY KEY,
task_id VARCHAR(36),
parent_task_id VARCHAR(36),
sequence_number INT,
total_instances INT,
completed_count INT
);
-- 变量表
CREATE TABLE workflow_variables (
id VARCHAR(36) PRIMARY KEY,
instance_id VARCHAR(36),
scope_id VARCHAR(36),
name VARCHAR(100),
type VARCHAR(50),
value TEXT
);
-- 事件表
CREATE TABLE workflow_events (
id VARCHAR(36) PRIMARY KEY,
instance_id VARCHAR(36),
event_type VARCHAR(50),
element_id VARCHAR(36),
payload JSON,
created_at TIMESTAMP
);
-- 定时器表
CREATE TABLE workflow_timers (
id VARCHAR(36) PRIMARY KEY,
instance_id VARCHAR(36),
element_id VARCHAR(36),
type VARCHAR(20),
due_date TIMESTAMP,
repeat_count INT,
handler VARCHAR(100)
);
-- 历史记录表
CREATE TABLE workflow_history (
id VARCHAR(36) PRIMARY KEY,
instance_id VARCHAR(36),
element_id VARCHAR(36),
element_type VARCHAR(50),
start_time TIMESTAMP,
end_time TIMESTAMP,
duration BIGINT,
status VARCHAR(20)
);
- 完整的流程引擎还需要:
- 版本控制系统
- 流程迁移功能
- 流程监控功能
- 流程分析功能
- 任务委派功能
- 流程回滚功能
- 并行处理能力
- 异常处理机制
- 事务管理
- 权限控制
- 审计日志
所以,如果你需要实现复杂的工作流程:
- 建议使用成熟的工作流引擎(如 Zeebe)
- 或者基于现有工作流引擎进行扩展
- 自己实现的话需要投入大量开发资源
如果只是简单的审批流程:
- 可以使用简化版的工作流引擎
- 根据实际需求逐步扩展功能
- 关注核心业务流程的实现
用户:
我们没有那么多资源开发的,我们如果希望最小成本的实现支持复杂的审批流的功能,用go语言你推荐用哪一个?
ChatGPT:
对于 Go 语言,我强烈推荐使用 Temporal,原因如下:
- 优势:
- 完整的工作流引擎
- 官方提供 Go SDK
- 部署维护相对简单
- 社区活跃,文档完善
- 支持复杂流程
- 可靠性高
- 自带 UI 界面
- 基本使用示例:
// 1. 定义工作流
func LeaveApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
// 设置重试策略
options := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
RetryPolicy: &temporal.RetryPolicy{
InitialInterval: time.Minute,
MaximumInterval: time.Hour,
MaximumAttempts: 3,
},
}
ctx = workflow.WithActivityOptions(ctx, options)
// 定义流程步骤
var result ApprovalResult
// 1. 部门主管审批
err := workflow.ExecuteActivity(ctx, DepartmentApproval, request).Get(ctx, &result)
if err != nil {
return err
}
if !result.Approved {
return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
}
// 2. 判断是否需要总监审批
if request.Days > 5 {
err = workflow.ExecuteActivity(ctx, DirectorApproval, request).Get(ctx, &result)
if err != nil {
return err
}
if !result.Approved {
return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
}
}
// 3. HR审批
err = workflow.ExecuteActivity(ctx, HRApproval, request).Get(ctx, &result)
if err != nil {
return err
}
// 4. 完成审批
return workflow.ExecuteActivity(ctx, FinalizeApproval, request, result).Get(ctx, nil)
}
// 2. 定义活动
type Activities struct {
leaveService *LeaveService
}
func (a *Activities) DepartmentApproval(ctx context.Context, request LeaveRequest) (ApprovalResult, error) {
// 实现部门审批逻辑
return a.leaveService.ProcessDepartmentApproval(request)
}
func (a *Activities) HRApproval(ctx context.Context, request LeaveRequest) (ApprovalResult, error) {
// 实现HR审批逻辑
return a.leaveService.ProcessHRApproval(request)
}
// 3. 启动工作流
func StartLeaveWorkflow(c *gin.Context) {
client := c.MustGet("temporal").(*client.Client)
var request LeaveRequest
if err := c.ShouldBindJSON(&request); err != nil {
c.JSON(400, gin.H{"error": err.Error()})
return
}
workflowOptions := client.StartWorkflowOptions{
ID: "leave_" + uuid.New().String(),
TaskQueue: "leave-approval",
}
we, err := client.ExecuteWorkflow(context.Background(), workflowOptions, LeaveApprovalWorkflow, request)
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, gin.H{"workflowId": we.GetID()})
}
- 复杂流程示例:
// 并行审批流程
func ParallelApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
ctx = workflow.WithActivityOptions(ctx, options)
// 创建通道接收审批结果
future1 := workflow.ExecuteActivity(ctx, DepartmentApproval, request)
future2 := workflow.ExecuteActivity(ctx, HRApproval, request)
// 等待所有审批完成
var result1, result2 ApprovalResult
err1 := future1.Get(ctx, &result1)
err2 := future2.Get(ctx, &result2)
if err1 != nil || err2 != nil {
return errors.New("approval failed")
}
// 所有审批都通过才算通过
if !result1.Approved || !result2.Approved {
return workflow.ExecuteActivity(ctx, NotifyRejection, request).Get(ctx, nil)
}
return workflow.ExecuteActivity(ctx, FinalizeApproval, request).Get(ctx, nil)
}
// 带超时的审批流程
func TimeoutApprovalWorkflow(ctx workflow.Context, request LeaveRequest) error {
// 设置超时选项
options := workflow.ActivityOptions{
StartToCloseTimeout: 24 * time.Hour,
HeartbeatTimeout: time.Hour,
}
ctx = workflow.WithActivityOptions(ctx, options)
// 创建子工作流处理超时
childWorkflow := workflow.ChildWorkflowOptions{
WorkflowID: "timeout_" + request.ID,
WorkflowRunTimeout: 3 * 24 * time.Hour,
}
ctx = workflow.WithChildOptions(ctx, childWorkflow)
var result ApprovalResult
err := workflow.ExecuteActivity(ctx, DepartmentApproval, request).Get(ctx, &result)
if err != nil {
if temporal.IsTimeoutError(err) {
return workflow.ExecuteActivity(ctx, HandleTimeout, request).Get(ctx, nil)
}
return err
}
return nil
}
- 查询任务:
func GetUserTasks(c *gin.Context) {
client := c.MustGet("temporal").(*client.Client)
userId := c.Query("userId")
// 查询该用户的所有待办任务
resp, err := client.ListWorkflow(context.Background(), &workflowservice.ListWorkflowExecutionsRequest{
Query: fmt.Sprintf("WorkflowType = 'LeaveApprovalWorkflow' AND currentApprover = '%s'", userId),
})
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
c.JSON(200, resp.Executions)
}
- 配置示例:
func main() {
// 创建 Temporal 客户端
c, err := client.NewClient(client.Options{
HostPort: "localhost:7233",
})
if err != nil {
log.Fatalln("Unable to create client", err)
}
defer c.Close()
// 注册工作流和活动
w := worker.New(c, "leave-approval", worker.Options{})
w.RegisterWorkflow(LeaveApprovalWorkflow)
w.RegisterActivity(&Activities{leaveService: NewLeaveService()})
// 启动 worker
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("Unable to start worker", err)
}
}
使用 Temporal 的好处:
-
开箱即用的功能:
- 工作流版本控制
- 重试机制
- 超时处理
- 并行处理
- 子工作流
- 活动监控
-
运维友好:
- Web UI 监控
- 详细的执行历史
- 方便的调试工具
- 完善的监控指标
-
可靠性保证:
- 状态持久化
- 故障恢复
- 事务一致性
- 幂等性保证
建议:
- 先运行官方示例熟悉基本概念
- 从简单流程开始,逐步添加复杂性
- 利用内置的重试和超时机制
- 做好监控和日志
- 考虑使用 SDK 的高级特性