在官方文档 Best Practices 提到:In cases where the output data of your task is used within subsequent tasks in your workflow but is substantially large (> 100KB), consider uploading this data to an object store (S3 or similar) and set the location to the object in your task output. The subsequent tasks can then download this data from the given location and use it during execution. 在实现这一功能可以使用 External Payload Storage
1. 概述
conductor 可以配置 workflow 和 task 的输入输出内容到其它地方如(S3)以减轻 Conductor 的存储压力
2.ExternalPayloadStorage
官方提供了基于 AWS S3 的ExternalPayloadStorage实现S3PayloadStorage
。我们还可以继承该接口实现其它存储如腾讯云的COS。
ExternalPayloadStorage有三个关键方法
- getLocation
获取一个链接(uri)该链接提供给 client 端用于上传或下载输入、输出内容 - upload
上传输入、输出数据到外部存储 - download
下载外部存储的输入、输出数据
2.服务端
下载数据(outputdata)
TaskResource#getExternalStorageLocation
提供了一个http接口用于客户端获取链接
ExternalPayloadStorageUtils#downloadPayload
下载外部存储数据
ExecutionDAOFacade#populateWorkflowAndTaskPayloadData
从这个方法可以看到只要存在 PayloadStoragePath
时就会下载数据。最后workflowModel.getTasks().forEach(this::populateTaskData);
会把所有task的数据也下载
上传数据(inputdata)
ExternalPayloadStorageUtils#verifyAndUpload
上传外部存储数据。从该方法可以看到,conductor 会进行inputdata的大小判断,超过设置的值后,则会上传数据 到外部存储,并设置PayloadStoragePath
3.客户端
上传数据(outputdata)
TaskClient#evaluateAndUploadLargePayload
该方法判断输出数据是否达到设置的值且开启了isExternalPayloadStorageEnabled。通过后执行上传
TaskClient#uploadToExternalPayloadStorage
上传数据。首先向服务端获取上传uri(GET tasks/externalStorageLocation)然后把数据通过 PUT 方法上传到 uri 中
TaskPollExecutor#updateTaskResult
判断是否存在 optionalExternalStorageLocation ,如果存则清空outputdata并设置externalOutputPayloadStoragePath
下载数据(inputdata)
TaskClient#populateTaskPayloads
从这个方法可以看到只要存在 PayloadStoragePath
时就会下载数据
ClientBase#downloadFromExternalStorage
首先向服务端获取下载 uri(GET tasks/externalStorageLocation)然后把通过 GET 方法下载 uri 中的数据
4. 注意
需要注意的是,spring 包中是直接new TaskClient()
ConductorClientAutoConfiguration
@ConditionalOnMissingBean
@Bean
public TaskClient taskClient(ClientProperties clientProperties) {
TaskClient taskClient = new TaskClient();
taskClient.setRootURI(clientProperties.getRootUri());
return taskClient;
}
而 TaskClient()
直接 new DefaultConductorClientConfiguration()
因此,上限值都是固定值不能通过配置参数改变
public TaskClient() {
this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null);
}