SDC-Cluster-PipelineStoreResource-原理源码

PipelineStoreResource.getPipeline(pid)
PipelineStoreResource.savePipeline()
PipelineStoreResource.saveUiInfo()

PipelineStoreResource.getPipeline(pid)


# 向Spark-Executor上的一个Slave实例SDC发送 getPipeline(pid)请求,响应流程:

PipelineStoreResource.getPipelineInfo(String name){
    // 1. 先从磁盘加载其 pipelineId/info.json
    PipelineInfo pipelineInfo = store[PipelineStoreTask].getInfo(name){//SlavePipelineStoreTask.getInfo(String name)
        return pipelineStore[PipelineStoreTask].getInfo(name){//FilePipelineStoreTask.getInfo(String name)
            return getInfo(name, false){//FilePipelineStoreTask.getInfo(String name, boolean checkExistence)
                    synchronized (lockCache.getLock(name)) {
                      if (checkExistence && !hasPipeline(name)) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
                      }
                      // 获取该SlaveSDC对应的Data目录,一般是在该容器所在机器本地的Container目录: nm-local-dir 下 containerId位置;
                      Path filePath = getInfoFile(name){
                        return getPipelineDir(name).resolve(INFO_FILE);{
                            getPipelineDir(name){
                                return storeDir[UnixPath].resolve(PipelineUtils.escapedPipelineName(name));{
                                    // storeDir = /home/app/appdata/hadoop/dataDir/tmpDir/nm-local-dir/usercache/app/appcache/application_1585661170516_0006/container_1585661170516_0006_01_000002/data/pipelines
                                }
                            }
                            .resolve(INFO_FILE);
                        }
                      }
                      try (InputStream infoFile = Files.newInputStream(filePath)) {
                        PipelineInfoJson pipelineInfoJsonBean = json.readValue(infoFile, PipelineInfoJson.class);
                        return pipelineInfoJsonBean.getPipelineInfo();
                      } catch (Exception ex) {
                        throw new PipelineStoreException(ContainerError.CONTAINER_0206, name, ex);
                      }
                    }
                
            }
        }
    }
    String title = name;
    
    // 
    switch (get) {
      case "pipeline":
        PipelineConfiguration pipeline = store.load(name, rev);{//SlavePipelineStoreTask.load(String name, String tagOrRev)
            return pipelineStore.load(name, tagOrRev);{//FilePipelineStoreTask.load(String name, String tagOrRev)
                synchronized (lockCache.getLock(name)) {
                  if (!hasPipeline(name)) {
                    throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
                  }
                  // getPipelineFile()是读取pipeline.json, 而getInfo()->getInfoFile()是读取 info.json文件;
                  /*
                  * info.json :     464B 字节; createed, lastModified, uuid, sdcId, metadate信息;
                  * pipeline.json:  15.2 KB:  其info字段及为 info.json的内容, 此外还包括几个大字段:
                        - configuration: Pipeline相关配置: exeMode, deliveryGuraantee等;
                        - stages:   各Stages的配置情况;
                        - startEventStages + stopEventStages; 
                        - issues;
                  */
                  Path pipelineFile = getPipelineFile(name);
                  try (InputStream pipelineFile = Files.newInputStream(getPipelineFile(name))) {
                    PipelineInfo info = getInfo(name);
                    // 
                    PipelineConfigurationJson pipelineConfigBean=json.readValue(pipelineFile, PipelineConfigurationJson.class);
                    PipelineConfiguration pipeline = pipelineConfigBean.getPipelineConfiguration();
                    pipeline.setPipelineInfo(info);

                    Map<String, Map> uiInfo;
                    if (Files.exists(getPipelineUiInfoFile(name))) {
                      try (InputStream uiInfoFile = Files.newInputStream(getPipelineUiInfoFile(name))) {
                        uiInfo = json.readValue(uiInfoFile, Map.class);
                        pipeline = injectUiInfo(uiInfo, pipeline);
                      }
                    }

                    return pipeline;
                  }
                  catch (Exception ex) {
                    throw new PipelineStoreException(ContainerError.CONTAINER_0206, name, ex.toString(), ex);
                  }
                }
                
            }
        }
        PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipeline);
        // 对于从本地加载到的Pipeline.json, 还要validate()校验一下;否则可能显示不了;
        pipeline = validator.validate();
        data = BeanHelper.wrapPipelineConfiguration(pipeline);// 用PipelineConfigurationJson(pipelineConfiguration)对象包装输出;
        title = pipeline.getTitle() != null ? pipeline.getTitle() : pipeline.getInfo().getPipelineId();
        break;
      case "info":
        data = BeanHelper.wrapPipelineInfo(store.getInfo(name));
        break;
      case "history":// 返回的 PipelineRevInfo 对象仅简单封装了 date,user,rev字段; 可能以后版本再实现pipelineConfigHistory?
        data = BeanHelper.wrapPipelineRevInfo(store.getHistory(name));
        break;
      default:
        throw new IllegalArgumentException(Utils.format("Invalid value for parameter 'get': {}", get));
    }

    if (attachment) {
      Map<String, Object> envelope = new HashMap<String, Object>();
      envelope.put("pipelineConfig", data);

      RuleDefinitions ruleDefinitions = store.retrieveRules(name, rev);
      envelope.put("pipelineRules", BeanHelper.wrapRuleDefinitions(ruleDefinitions));

      return Response.ok().
          header("Content-Disposition", "attachment; filename=\"" + title + ".json\"").
          type(MediaType.APPLICATION_JSON).entity(envelope).build();
    } else {
      return Response.ok().type(MediaType.APPLICATION_JSON).entity(data).build();
    }
}

报错PipelineConfig,并更新4个文件

# 当Stopped, StoppedError, Edit等非Running状态下时:拖拽算子触发 savePipeline(),更新pipeline.json/info.json/uiinfo.json, pipelineState.json4 个文件
PipelineStoreResource.savePipeline(String name,String rev,String description,PipelineConfigurationJson pipeline){
    if (store.isRemotePipeline(name, rev)) {
      throw new PipelineException(ContainerError.CONTAINER_01101, "SAVE_PIPELINE", name);
    }
    PipelineInfo pipelineInfo = store.getInfo(name);
    RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId());
    PipelineConfiguration pipelineConfig = BeanHelper.unwrapPipelineConfiguration(pipeline);
    PipelineConfigurationValidator validator = new PipelineConfigurationValidator(stageLibrary, name, pipelineConfig);
    pipelineConfig = validator.validate();
    pipelineConfig = store.save(user, name, rev, description, pipelineConfig);{//CachePipelineStoreTask.save()
        synchronized (lockCache.getLock(name)) {
          PipelineConfiguration pipelineConf = pipelineStore.save(user, name, tag, tagDescription, pipeline);{
              
              synchronized (lockCache.getLock(name)) {
              if (!hasPipeline(name)) {
                throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
              }
              PipelineInfo savedInfo = getInfo(name);
              if (!savedInfo.getUuid().equals(pipeline.getUuid())) {
                throw new PipelineStoreException(ContainerError.CONTAINER_0205, name);
              }
              if (pipelineStateStore != null) {
                PipelineStatus pipelineStatus = pipelineStateStore.getState(name, tag).getStatus();
                if (pipelineStatus.isActive()) {
                  throw new PipelineStoreException(ContainerError.CONTAINER_0208, pipelineStatus);
                }
              }
              UUID uuid = UUID.randomUUID();
              PipelineInfo info = new PipelineInfo();
              try (
                  OutputStream infoFile = Files.newOutputStream(getInfoFile(name));
                  OutputStream pipelineFile = Files.newOutputStream(getPipelineFile(name));
                ){
                pipeline.setUuid(uuid);
                // 更新 info.json文件;
                json.writeValue(infoFile, BeanHelper.wrapPipelineInfo(info));
                // 更新 pipeline.json 文件;
                json.writeValue(pipelineFile, BeanHelper.wrapPipelineConfiguration(pipeline));
                if (pipelineStateStore != null) {
                  List<Issue> errors = new ArrayList<>();
                  PipelineBeanCreator.get().create(pipeline, errors, null);
                  // 更新 runInfo下 pipelineState.json文件;
                  pipelineStateStore.edited(user, name, tag,  PipelineBeanCreator.get().getExecutionMode(pipeline, errors), false);
                  pipeline.getIssues().addAll(errors);
                }

                Map<String, Object> uiInfo = extractUiInfo(pipeline);
                
                // 更新 uiinfo.json文件;
                saveUiInfo(name, tag, uiInfo);

              } catch (Exception ex) {
                throw new PipelineStoreException(ContainerError.CONTAINER_0204, name, ex.toString(), ex);
              }
              pipeline.setPipelineInfo(info);
              return pipeline;
            }
              
          }
          pipelineInfoMap.put(name, pipelineConf.getInfo());
          return pipelineConf;
        }
    }
    
    return Response.ok().entity(BeanHelper.wrapPipelineConfiguration(pipelineConfig)).build();
    
}

当Running状态时, 更新saveUiInfo()


 可以认为['STARTING', 'STARTING_ERROR', 'RUNNING', 'RUNNING_ERROR', 'RETRY', 'FINISHING', 'STOPPING', 'STOPPING_ERROR', 'CONNECTING', 'CONNECT_ERROR']这些状态只更新UI

# 当Running状态下: 拖动算子后, 向PipelineStoreResource.saveUiInfo()发送 保存UI的请求到 uiinfo.json 文件中
PipelineStoreResource.saveUiInfo(String name,String rev,Map uiInfo){
    // 从CachePipelineStoreTask的内存(pipelineInfoMap:Map<String,PipelineInfo>)中 读取PipelineInfo对象;
    PipelineInfo pipelineInfo = store[PipelineStoreTask].getInfo(name){//CachePipelineStoreTask.getInfo(String name)
        PipelineInfo pipelineInfo = pipelineInfoMap.get(name);// pipelineInfoMap: Map<String, PipelineInfo>
        if (pipelineInfo == null) {
          throw new PipelineStoreException(ContainerError.CONTAINER_0200, name);
        } else {
          return pipelineInfo;
        }
    }
    
    RestAPIUtils.injectPipelineInMDC(pipelineInfo.getTitle(), pipelineInfo.getPipelineId());
    store.saveUiInfo(name, rev, uiInfo);{
        pipelineStore.saveUiInfo(name, rev, uiInfo);{//FilePipelineStoreTask.saveUiInfo()
            // /home/app/appdata/streamset/data/pipelines/ClusterOriginKafkaDemo255cbfd4-56ff-4c93-a94a-551721a9e80b/uiinfo.json
            try (OutputStream uiInfoFile = Files.newOutputStream(getPipelineUiInfoFile(name))){
              json.writeValue(uiInfoFile, uiInfo);
            } catch (Exception ex) {
              throw new PipelineStoreException(ContainerError.CONTAINER_0405, name, ex.toString(), ex);
            }
        }
    }
    
    return Response.ok().build();
}


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容