PipelineStoreResource.getPipeline(pid)
PipelineStoreResource.savePipeline()
PipelineStoreResource.saveUiInfo()
PipelineStoreResource.getPipeline(pid)
# 向Spark-Executor上的一个Slave实例SDC发送 getPipeline(pid)请求,响应流程:
// Call trace of PipelineStoreResource.getPipelineInfo(name).
// Notation: the body of each callee is inlined in braces right after the call that invokes it,
// and the trailing "//Class.method(...)" comment names the implementation that is entered.
// The request parameters used below (get, rev, attachment) are not shown in the abbreviated signature.
PipelineStoreResource.getPipelineInfo(String name){
// 1. First load pipelineId/info.json from disk
PipelineInfo pipelineInfo = store[PipelineStoreTask].getInfo(name){//SlavePipelineStoreTask.getInfo(String name)
return pipelineStore[PipelineStoreTask].getInfo(name){//FilePipelineStoreTask.getInfo(String name)
return getInfo(name, false){//FilePipelineStoreTask.getInfo(String name, boolean checkExistence)
synchronized (lockCache.getLock(name)) {
if (checkExistence && !hasPipeline(name)) {
throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
}
// Get the data directory of this slave SDC; it is usually the local container directory on the machine
// hosting the container: under nm-local-dir, at the containerId location.
Path filePath = getInfoFile(name){
return getPipelineDir(name).resolve(INFO_FILE);{
getPipelineDir(name){
return storeDir[UnixPath].resolve(PipelineUtils.escapedPipelineName(name));{
// storeDir = /home/app/appdata/hadoop/dataDir/tmpDir/nm-local-dir/usercache/app/appcache/application_1585661170516_0006/container_1585661170516_0006_01_000002/data/pipelines
}
}
.resolve(INFO_FILE);
}
}
try (InputStream infoFile = Files.newInputStream(filePath)) {
PipelineInfoJson pipelineInfoJsonBean = json.readValue(infoFile, PipelineInfoJson.class);
return pipelineInfoJsonBean.getPipelineInfo();
} catch (Exception ex) {
throw new PipelineStoreException(ContainerError.CONTAINER_0206, name, ex);
}
}
}
}
}
String title = name;
//
// 2. Build the response payload according to the requested view ("pipeline", "info" or "history").
switch (get) {
case "pipeline":
PipelineConfiguration pipeline = store.load(name, rev);{//SlavePipelineStoreTask.load(String name, String tagOrRev)
return pipelineStore.load(name, tagOrRev);{//FilePipelineStoreTask.load(String name, String tagOrRev)
synchronized (lockCache.getLock(name)) {
if (!hasPipeline(name)) {
throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
}
// getPipelineFile() reads pipeline.json, whereas getInfo() -> getInfoFile() reads info.json.
/*
* info.json : 464 bytes; holds created, lastModified, uuid, sdcId and metadata.
* pipeline.json: 15.2 KB; its info field is the content of info.json, and it also contains several large fields:
- configuration: pipeline-level configuration: exeMode, deliveryGuarantee, etc.;
- stages: the configuration of each stage;
- startEventStages + stopEventStages;
- issues;
*/
// NOTE(review): pipelineFile is declared here as a Path and again on the next line as an InputStream;
// this Path local looks like a trace annotation rather than real code - confirm against the actual source.
Path pipelineFile = getPipelineFile(name);
try (InputStream pipelineFile = Files.newInputStream(getPipelineFile(name))) {
PipelineInfo info = getInfo(name);
//
PipelineConfigurationJson pipelineConfigBean=json.readValue(pipelineFile, PipelineConfigurationJson.class);
PipelineConfiguration pipeline = pipelineConfigBean.getPipelineConfiguration();
pipeline.setPipelineInfo(info);
Map<String, Map> uiInfo;
// The UI info file is optional: it is merged into the configuration only when it exists.
if (Files.exists(getPipelineUiInfoFile(name))) {
try (InputStream uiInfoFile = Files.newInputStream(getPipelineUiInfoFile(name))) {
uiInfo = json.readValue(uiInfoFile, Map.class);
pipeline = injectUiInfo(uiInfo, pipeline);
}
}
return pipeline;
}
catch (Exception ex) {
throw new PipelineStoreException(ContainerError.CONTAINER_0206, name, ex.toString(), ex);
}
}
}
}
PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipeline);
// The pipeline.json loaded from local disk must still be checked with validate(); otherwise it may fail to display.
pipeline = validator.validate();
data = BeanHelper.wrapPipelineConfiguration(pipeline);// wrap the output in a PipelineConfigurationJson(pipelineConfiguration) object
title = pipeline.getTitle() != null ? pipeline.getTitle() : pipeline.getInfo().getPipelineId();
break;
case "info":
data = BeanHelper.wrapPipelineInfo(store.getInfo(name));
break;
case "history":// the returned PipelineRevInfo object only wraps the date, user and rev fields; pipelineConfigHistory may be implemented in a later version?
data = BeanHelper.wrapPipelineRevInfo(store.getHistory(name));
break;
default:
throw new IllegalArgumentException(Utils.format("Invalid value for parameter 'get': {}", get));
}
// 3. When requested as an attachment, return config and rules together as a downloadable "<title>.json".
if (attachment) {
Map<String, Object> envelope = new HashMap<String, Object>();
envelope.put("pipelineConfig", data);
RuleDefinitions ruleDefinitions = store.retrieveRules(name, rev);
envelope.put("pipelineRules", BeanHelper.wrapRuleDefinitions(ruleDefinitions));
return Response.ok().
header("Content-Disposition", "attachment; filename=\"" + title + ".json\"").
type(MediaType.APPLICATION_JSON).entity(envelope).build();
} else {
return Response.ok().type(MediaType.APPLICATION_JSON).entity(data).build();
}
}
保存PipelineConfig,并更新4个文件
# 当处于Stopped, StoppedError, Edit等非Running状态时: 拖拽算子触发 savePipeline(), 更新 pipeline.json / info.json / uiinfo.json / pipelineState.json 这4个文件
// Call trace of PipelineStoreResource.savePipeline(...).
// Notation: the body of each callee is inlined in braces right after the call that invokes it.
// The visible flow validates the submitted configuration, then writes info.json and pipeline.json,
// records the edit in the pipeline state store (when one is configured) and writes the UI info.
PipelineStoreResource.savePipeline(String name,String rev,String description,PipelineConfigurationJson pipeline){
// Remote pipelines are rejected before anything is written.
if (store.isRemotePipeline(name, rev)) {
throw new PipelineException(ContainerError.CONTAINER_01101, "SAVE_PIPELINE", name);
}
PipelineInfo pipelineInfo = store.getInfo(name);
RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId());
PipelineConfiguration pipelineConfig = BeanHelper.unwrapPipelineConfiguration(pipeline);
PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipelineConfig);
pipelineConfig = validator.validate();
pipelineConfig = store.save(user, name, rev, description, pipelineConfig);{//CachePipelineStoreTask.save()
synchronized (lockCache.getLock(name)) {
// Delegates to the wrapped store; presumably FilePipelineStoreTask.save() - confirm against the real source.
PipelineConfiguration pipelineConf = pipelineStore.save(user, name, tag, tagDescription, pipeline);{
synchronized (lockCache.getLock(name)) {
if (!hasPipeline(name)) {
throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
}
// The save is refused when the stored uuid differs from the uuid carried by the submitted pipeline.
PipelineInfo savedInfo = getInfo(name);
if (!savedInfo.getUuid().equals(pipeline.getUuid())) {
throw new PipelineStoreException(ContainerError.CONTAINER_0205, name);
}
// The save is also refused while the pipeline status is active.
if (pipelineStateStore != null) {
PipelineStatus pipelineStatus = pipelineStateStore.getState(name, tag).getStatus();
if (pipelineStatus.isActive()) {
throw new PipelineStoreException(ContainerError.CONTAINER_0208, pipelineStatus);
}
}
UUID uuid = UUID.randomUUID();
// NOTE(review): info is shown built with a no-arg constructor and the new uuid is only set on pipeline;
// the trace is probably abbreviated here - confirm against the real source.
PipelineInfo info = new PipelineInfo();
try (
OutputStream infoFile = Files.newOutputStream(getInfoFile(name));
OutputStream pipelineFile = Files.newOutputStream(getPipelineFile(name));
){
pipeline.setUuid(uuid);
// Update the info.json file
json.writeValue(infoFile, BeanHelper.wrapPipelineInfo(info));
// Update the pipeline.json file
json.writeValue(pipelineFile, BeanHelper.wrapPipelineConfiguration(pipeline));
if (pipelineStateStore != null) {
List<Issue> errors = new ArrayList<>();
PipelineBeanCreator.get().create(pipeline, errors, null);
// Update the pipelineState.json file under runInfo
pipelineStateStore.edited(user, name, tag, PipelineBeanCreator.get().getExecutionMode(pipeline, errors), false);
pipeline.getIssues().addAll(errors);
}
Map<String, Object> uiInfo = extractUiInfo(pipeline);
// Update the uiinfo.json file
saveUiInfo(name, tag, uiInfo);
} catch (Exception ex) {
throw new PipelineStoreException(ContainerError.CONTAINER_0204, name, ex.toString(), ex);
}
pipeline.setPipelineInfo(info);
return pipeline;
}
}
// Refresh the in-memory PipelineInfo entry with the info of the configuration that was just saved.
pipelineInfoMap.put(name, pipelineConf.getInfo());
return pipelineConf;
}
}
return Response.ok().entity(BeanHelper.wrapPipelineConfiguration(pipelineConfig)).build();
}
当Running状态时, 调用saveUiInfo()只更新UI信息
可以认为['STARTING', 'STARTING_ERROR', 'RUNNING', 'RUNNING_ERROR', 'RETRY', 'FINISHING', 'STOPPING', 'STOPPING_ERROR', 'CONNECTING', 'CONNECT_ERROR']这些状态只更新UI
# 当Running状态下: 拖动算子后, 向PipelineStoreResource.saveUiInfo()发送 保存UI的请求到 uiinfo.json 文件中
// Call trace of PipelineStoreResource.saveUiInfo(...).
// Notation: the body of each callee is inlined in braces right after the call that invokes it.
// The visible flow looks up the cached PipelineInfo and then writes the given uiInfo map to the UI info file.
PipelineStoreResource.saveUiInfo(String name,String rev,Map uiInfo){
// Read the PipelineInfo object from CachePipelineStoreTask's in-memory map (pipelineInfoMap: Map<String,PipelineInfo>)
PipelineInfo pipelineInfo = store[PipelineStoreTask].getInfo(name){//CachePipelineStoreTask.getInfo(String name)
PipelineInfo pipelineInfo = pipelineInfoMap.get(name);// pipelineInfoMap: Map<String, PipelineInfo>
// A name missing from the map is reported as CONTAINER_0200.
if (pipelineInfo == null) {
throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
} else {
return pipelineInfo;
}
}
RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId());
store.saveUiInfo(name, rev, uiInfo);{
pipelineStore.saveUiInfo(name, rev, uiInfo);{//FilePipelineStoreTask.saveUiInfo()
// /home/app/appdata/streamset/data/pipelines/ClusterOriginKafkaDemo255cbfd4-56ff-4c93-a94a-551721a9e80b/uiinfo.json
try (OutputStream uiInfoFile = Files.newOutputStream(getPipelineUiInfoFile(name))){
json.writeValue(uiInfoFile, uiInfo);
} catch (Exception ex) {
throw new PipelineStoreException(ContainerError.CONTAINER_0405, name, ex.toString(), ex);
}
}
}
return Response.ok().build();
}