flink之jobgraph---JobGraph

关于JobGraph的解读

  • 1.在flink里,JobGraph代表一个flink dataflow程序,最终无论是低级的api还是高级的api编写的program都会转换为JobGraph;

  • 2.JobGraph通过将graph的vertices和intermediate results以DAG的形式汇聚在一起,需要注意Iterations(feedback edges)不包括在JobGraph中,在一些特殊指定的vertices,彼此通过feeback channel关联起来;

  • 3.通过JobGraph来进行job级别的参数配置,能够为每个vertex和intermediate result设置当前operation和intermediate data的特点内容;

关于源码

一。定义的属性或字段

/** List of task vertices included in this job graph. */
// 定义当前job相关的vertex,采用链表的方式存储,采用vertextid作为key,保障vertexid在job唯一
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();

/** The job configuration attached to this job. */
// 定义当前job的配置信息
private final Configuration jobConfiguration = new Configuration();

/** ID of this job. May be set if specific job id is desired (e.g. session management) */
// job id用于区别不同的job
private final JobID jobID;

/** Name of this job. */
// job执行时的名称
private final String jobName;

/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 * job manager after it has been executed. */
// 指定对应的ExecutionGraph在jobmanager执行有效期多久,超时后会被removed。 
private long sessionTimeout = 0;

/** flag to enable queued scheduling */
// job schedule使用采用queue的方式按序执行
private boolean allowQueuedScheduling;

/** The mode in which the job is scheduled */
// job被执行模式:默认lazy模式
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;

// --- checkpointing ---
// 以下进行checkpoint设置
/** Job specific execution config */
// 执行job execution相关配置
private SerializedValue<ExecutionConfig> serializedExecutionConfig;

/** The settings for the job checkpoints */
// 设置当前job的checkpoint设置
private JobCheckpointingSettings snapshotSettings;

/** Savepoint restore settings. */
// 设置job的savepoint的恢复设置:常用语job恢复,默认为none
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();

// --- attached resources ---
// job执行时需要的一些附加的设置
/** Set of JAR files required to run this job. */
// 当用户使用附加的jar时 可以通过操作userJars来指定额外jar的路径
private final List<Path> userJars = new ArrayList<Path>();

/** Set of custom files required to run this job. */
// 当job执行需要一些额外的自定义files,通过userArtifacts来进行配置
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();

/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();

/** List of classpaths required to run this job. */
// 指定job时需要指定的classpath
private List<URL> classpaths = Collections.emptyList();

二。关于方法或函数

在JobGraph有两个构造JobGprah的函数
构造函数一:

/**
 * Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
 * the given name and the given execution configuration (see {@link ExecutionConfig}).
 * The ExecutionConfig will be serialized and can't be modified afterwards.
 *
 * @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
 * @param jobName The name of the job.
 */
 // 通过指定的jobId和jobName构造jobgraph:未指定jobid或jobname时,都有对应的默认值
 // 针对该job来设置execution config相关配置内容,默认直接产生初始的
 // 1.关于jobid随机生成:jobid可人工指定,也可自动随机生成 
 // 2.关于jobname未指定:直接使用“(unnamed job)”代替
 // 3.设置ExecutionConfig:通常用来配置program执行的行为,比较常用配置选项:
 //  3.1 parallelism:program执行默认的并行度;
 //  3.2 retries:执行时失败重试的次数
 //  3.3 delay:通常是和retires有关,两次重试需要的间隔,延迟时间
 //  3.4 execution mode:通常是batch和pipelined。默认是pipelined
 //  3.5 是否开启“closure cleaner”:关于“closure cleaner”用于对function实现进行预处理,比如“closure”是匿名的存在类的内部(内部类),它会移除一些未使用的“closure”的引用,来修复一些和序列化有关的问题并减少“closure”的大小
 // 3.6 注册type和serializer:通常为了提升处理“泛型”和“pojo”的性能,不仅仅需要返回声明的类型,还包括这些类型的subclass才会需要我们手动进行type和serializer注册/声明。
public JobGraph(JobID jobId, String jobName) {
   this.jobID = jobId == null ? new JobID() : jobId;
   this.jobName = jobName == null ? "(unnamed job)" : jobName;

   try {
      setExecutionConfig(new ExecutionConfig());
   } catch (IOException e) {
      // this should never happen, since an empty execution config is always serializable
      throw new RuntimeException("bug, empty execution config is not serializable");
   }
}

构造函数二:

// 从source开始来构建topology
// 1.根据jobgraph中已被注册的vertex构建一个list用于
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
   // early out on empty lists
   if (this.taskVertices.isEmpty()) {
      return Collections.emptyList();
   }

   List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
   Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

   // start by finding the vertices with no input edges
   // and the ones with disconnected inputs (that refer to some standalone data set)
   // 首先找到jobgraph中已注册的source vertex 
   {
     // 通过set来将重复的vertex进行去重 防止某个vertex重复被记录 形成循环
     // 故而使用LinkedHashMap来存放对应的vertex 以链表的形式存储
     // 1.首先遍历set中每个jobvertex
     // 2.判断该jobvertex是否有input 没有的话 即认为是source
     // 3.将所有的source存放list中,同时将对应的set中该jovvertex remove,防止重复记录
      Iterator<JobVertex> iter = remaining.iterator();
      while (iter.hasNext()) {
         JobVertex vertex = iter.next();

         if (vertex.hasNoConnectedInputs()) {
            sorted.add(vertex);
            iter.remove();
         }
      }
   }

   int startNodePos = 0;

   // traverse from the nodes that were added until we found all elements
   // 在前面的迭代循环中提取所有的source以及孤立的dataset,接下来要对sort中剩余的非source或孤立的dataset进行处理
   // 1.通过移动startNodePos的值 防止jobgraph形成循环: 不超过list中记录的source及孤立dataset个数
   // 2.以下操作的目的就是将jobgraph中属于source或孤立dataset提取出来,最终在streamgraph基础上形成jobgraph的topology
   while (!remaining.isEmpty()) {

      // first check if we have more candidates to start traversing from. if not, then the
      // graph is cyclic, which is not permitted
      // 首先要保证开始遍历的node位置仍处于list中,否则会导致对应的jobgraph就变循环的,是不能形成DAG topology的
      if (startNodePos >= sorted.size()) {
         throw new InvalidProgramException("The job graph is cyclic.");
      }

      JobVertex current = sorted.get(startNodePos++);
      addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
   }

   return sorted;
}

// 
// 1.首先收到所有的source输出的IntermediateDataSet
// 2.接着循环遍历每个IntermediateDataSet对应的consumer:JobEdge
// 3.再获取对应的JobEdge输出的target,并检查其target是否存在set中,不存在需要抛弃对应的jobedge
//  3.1 针对在set中的target ,则需要获取该target所有的input,同时需要移除重复的JobEdge
//  3.2 通过前面的3.1循环遍历出set中vertex没有predecessor的添加到target
//  3.3 循环迭代针对每个jobvertex进行如上的处理 直至所有的jobvertex遍历完
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
  
   // forward traverse over all produced data sets and all their consumers
   // 以当前的jobvertex为主,获取其相关的输出结果IntermediateDataset,
   // 接着获取每个IntermediateDataset对应的consumer:JobEdge
   // 最终获取每个jobedge的输出接收的目标vertex:jobvertex(也就是我们需要遍历提取的jobvertex)
   // 接下来再对获取到的jobvertex判断是否存在set集合中
   // 不存在的 则直接忽略
   // 存在的则需要 需要进行逆向检查,从目标target的JobVertex开始逆向寻找“长辈级”jobvertex(需要剔除当前jobvertex,因为目标jobvertex就是从当前jobvertex获取的,没有必要重复检查)
   for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
      for (JobEdge edge : dataSet.getConsumers()) {

         // a vertex can be added, if it has no predecessors that are still in the 'remaining' set
         JobVertex v = edge.getTarget();
         if (!remaining.contains(v)) {
            continue;
         }

         boolean hasNewPredecessors = false;
          
         // 要检查
         for (JobEdge e : v.getInputs()) {
            // skip the edge through which we came
            // 防止jobedge重复,主要还是为了剔除从当前jobvertex流向目标jobvertex,没有必要重复检查
            if (e == edge) {
               continue;
            }

            // 获取该jobedge的source
            // 获取目标jobvertex所有的关联的jobedge对应的IntermediateDataSets
           // 这样就可以获取目标jobvertex对应的提供source的vertex
            IntermediateDataSet source = e.getSource();
            if (remaining.contains(source.getProducer())) {
               hasNewPredecessors = true;
               break;
            }
         }
         
         // 添加存在set中且没有predecessor的jobvertex,同时需要完成添加到target中并清除set中记录
         if (!hasNewPredecessors) {
            target.add(v);
            remaining.remove(v);
            addNodesThatHaveNoNewPredecessors(v, target, remaining);
         }
      }
   }
}

三。构建jobgraph

// 从source开始来构建topology
// 1.根据jobgraph中已被注册的vertex构建一个list用于
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
   // early out on empty lists
   if (this.taskVertices.isEmpty()) {
      return Collections.emptyList();
   }

   List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
   Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());

   // start by finding the vertices with no input edges
   // and the ones with disconnected inputs (that refer to some standalone data set)
   // 首先找到jobgraph中已注册的source vertex 
   {
     // 通过set来将重复的vertex进行去重 防止某个vertex重复被记录 形成循环
     // 故而使用LinkedHashMap来存放对应的vertex 以链表的形式存储
     // 1.首先遍历set中每个jobvertex
     // 2.判断该jobvertex是否有input 没有的话 即认为是source
     // 3.将所有的source存放list中,同时将对应的set中该jovvertex remove,防止重复记录
      Iterator<JobVertex> iter = remaining.iterator();
      while (iter.hasNext()) {
         JobVertex vertex = iter.next();

         if (vertex.hasNoConnectedInputs()) {
            sorted.add(vertex);
            iter.remove();
         }
      }
   }

   int startNodePos = 0;

   // traverse from the nodes that were added until we found all elements
   // 在前面的迭代循环中提取所有的source,接下来要对sort中剩余的非source进行处理
   // 1.通过移动startNodePos的值 防止jobgraph形成循环: 不超过list中记录的source个数
   // 2.
   while (!remaining.isEmpty()) {

      // first check if we have more candidates to start traversing from. if not, then the
      // graph is cyclic, which is not permitted
      if (startNodePos >= sorted.size()) {
         throw new InvalidProgramException("The job graph is cyclic.");
      }

      JobVertex current = sorted.get(startNodePos++);
      addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
   }

   return sorted;
}

// 
// 1.首先收到所有的source输出的IntermediateDataSet
// 2.接着循环遍历每个IntermediateDataSet对应的consumer:JobEdge
// 3.再获取对应的JobEdge输出的target,并检查其target是否存在set中,不存在需要抛弃对应的jobedge
//  3.1 针对在set中的target ,则需要获取该target所有的input,同时需要移除重复的JobEdge
//  3.2 通过前面的3.1循环遍历出set中vertex没有predecessor的添加到target
//  3.3 循环迭代针对每个jobvertex进行如上的处理 直至所有的jobvertex遍历完
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {

   // forward traverse over all produced data sets and all their consumers
   for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
      for (JobEdge edge : dataSet.getConsumers()) {

         // a vertex can be added, if it has no predecessors that are still in the 'remaining' set
         JobVertex v = edge.getTarget();
         if (!remaining.contains(v)) {
            continue;
         }

         boolean hasNewPredecessors = false;

         for (JobEdge e : v.getInputs()) {
            // skip the edge through which we came
            // 防止jobedge重复
            if (e == edge) {
               continue;
            }

            // 获取该jobedge的source
            IntermediateDataSet source = e.getSource();
            if (remaining.contains(source.getProducer())) {
               hasNewPredecessors = true;
               break;
            }
         }
         
         // 添加存在set中且没有predecessor的jobvertex,同时需要完成添加到target中并清除set中记录
         if (!hasNewPredecessors) {
            target.add(v);
            remaining.remove(v);
            addNodesThatHaveNoNewPredecessors(v, target, remaining);
         }
      }
   }
}

四。jobgraph结构

jobgraph

五。源码

JobGraph.java源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容