Notes on Flink's JobGraph
1. In Flink, a JobGraph represents a Flink dataflow program. Every program, whether written against a low-level or a high-level API, is ultimately translated into a JobGraph;
2. A JobGraph assembles the graph's vertices and intermediate results into a DAG. Note that iterations (feedback edges) are not encoded in the JobGraph itself; instead, specially designated vertices are wired to each other through feedback channels;
3. The JobGraph carries job-level configuration, while each vertex and each intermediate result carries the characteristics of its operation and its intermediate data;
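To make the structure concrete, here is a minimal sketch that wires two vertices into a JobGraph. It is not taken from the Flink sources, and the exact signatures (in particular the connectNewDataSetAsInput overload and the JobGraph constructors) vary across Flink versions:

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class JobGraphWiringSketch {
    public static void main(String[] args) {
        JobVertex source = new JobVertex("source");
        JobVertex sink = new JobVertex("sink");
        // the sink consumes a new IntermediateDataSet produced by the source;
        // this three-argument overload matches the Flink version this walkthrough is based on
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL,
                ResultPartitionType.PIPELINED);
        // a null jobId means a random JobID is generated (see the constructor below)
        JobGraph graph = new JobGraph(null, "demo job");
        graph.addVertex(source);
        graph.addVertex(sink);
    }
}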
About the source code
1. Fields
/** List of task vertices included in this job graph. */
// Vertices that make up this job, keyed by JobVertexID so every vertex ID is unique within the job; a LinkedHashMap preserves insertion order
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<JobVertexID, JobVertex>();
/** The job configuration attached to this job. */
// Configuration attached to this job
private final Configuration jobConfiguration = new Configuration();
/** ID of this job. May be set if specific job id is desired (e.g. session management) */
// The job ID distinguishes jobs from one another; it may be set explicitly (e.g. for session management), otherwise it is generated
private final JobID jobID;
/** Name of this job. */
// Name of the job, shown when it executes
private final String jobName;
/** The number of seconds after which the corresponding ExecutionGraph is removed at the
* job manager after it has been executed. */
// Number of seconds the corresponding ExecutionGraph is kept at the JobManager after the job has finished; it is removed once this timeout expires
private long sessionTimeout = 0;
/** flag to enable queued scheduling */
// Whether queued scheduling is allowed: tasks may be queued while waiting for resources instead of failing the deployment immediately
private boolean allowQueuedScheduling;
/** The mode in which the job is scheduled */
// Scheduling mode of the job; defaults to LAZY_FROM_SOURCES
private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
// --- checkpointing ---
// checkpointing-related settings follow
/** Job specific execution config */
// Serialized ExecutionConfig governing how this job executes
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** The settings for the job checkpoints */
// Checkpointing settings for this job
private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
// Savepoint restore settings, typically used when resuming a job from a savepoint; defaults to none()
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
// --- attached resources ---
// additional resources the job needs at execution time
/** Set of JAR files required to run this job. */
// Paths of extra user JAR files required to run the job are registered in userJars
private final List<Path> userJars = new ArrayList<Path>();
/** Set of custom files required to run this job. */
// Extra custom files the job needs at runtime are registered in userArtifacts as distributed-cache entries
private final Map<String, DistributedCache.DistributedCacheEntry> userArtifacts = new HashMap<>();
/** Set of blob keys identifying the JAR files required to run this job. */
private final List<PermanentBlobKey> userJarBlobKeys = new ArrayList<>();
/** List of classpaths required to run this job. */
// Classpath entries required to run the job
private List<URL> classpaths = Collections.emptyList();
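Most of these fields come with plain setters and getters on JobGraph. A hedged sketch of how a job might be configured through them (method names as found in the Flink version this source belongs to; the savepoint path, JAR path, and classpath URL are made up for illustration):

import java.net.URL;
import java.util.Collections;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;

public class JobGraphConfigSketch {
    public static void main(String[] args) throws Exception {
        JobGraph graph = new JobGraph(null, "demo job");
        graph.setScheduleMode(ScheduleMode.EAGER);   // override the LAZY_FROM_SOURCES default
        graph.setAllowQueuedScheduling(true);        // let tasks queue for resources
        graph.setSessionTimeout(3600);               // keep the ExecutionGraph for one hour
        graph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath("/savepoints/sp-1")); // hypothetical path
        graph.addJar(new Path("file:///opt/jobs/udfs.jar"));           // hypothetical user JAR
        graph.setClasspaths(Collections.singletonList(new URL("file:///opt/jobs/libs/")));
    }
}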
2. Methods
JobGraph offers a handful of constructors; the core one, shown below, takes a job ID and a job name:
/**
* Constructs a new job graph with the given job ID (or a random ID, if {@code null} is passed),
* the given name and the given execution configuration (see {@link ExecutionConfig}).
* The ExecutionConfig will be serialized and can't be modified afterwards.
*
* @param jobId The id of the job. A random ID is generated, if {@code null} is passed.
* @param jobName The name of the job.
*/
// Constructs a JobGraph from the given jobId and jobName, falling back to defaults when either is null,
// and attaches a freshly created ExecutionConfig to the job:
// 1. jobId: may be set explicitly, otherwise a random JobID is generated
// 2. jobName: replaced by "(unnamed job)" when null
// 3. ExecutionConfig: configures how the program executes; commonly used options
//    (see the sketch after the constructor):
// 3.1 parallelism: the default parallelism of the program
// 3.2 retries: how many times a failed execution is re-attempted
// 3.3 delay: the pause between two consecutive retries; only meaningful together with retries
// 3.4 execution mode: batch or pipelined; the default is pipelined
// 3.5 closure cleaner: preprocesses function implementations such as anonymous inner classes;
//     it removes unused references from the closure, which fixes some serialization problems
//     and reduces the closure's size; it can be disabled
// 3.6 registering types and serializers: to speed up handling of generic types and POJOs,
//     types beyond those declared, in particular their subclasses, need to be registered
//     manually together with their serializers
public JobGraph(JobID jobId, String jobName) {
this.jobID = jobId == null ? new JobID() : jobId;
this.jobName = jobName == null ? "(unnamed job)" : jobName;
try {
setExecutionConfig(new ExecutionConfig());
} catch (IOException e) {
// this should never happen, since an empty execution config is always serializable
throw new RuntimeException("bug, empty execution config is not serializable");
}
}
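The options listed under 3.1-3.6 map onto ExecutionConfig methods roughly as in the sketch below. MyEvent is a made-up placeholder type, and the retry/delay setters exist but are deprecated in newer Flink versions in favor of restart strategies:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;

public class ExecutionConfigSketch {
    // hypothetical user type, only for illustration
    public static class MyEvent { public long timestamp; }

    public static void main(String[] args) {
        ExecutionConfig config = new ExecutionConfig();
        config.setParallelism(4);                      // 3.1 default parallelism of the program
        config.setNumberOfExecutionRetries(3);         // 3.2 retries after a failed execution
        config.setExecutionRetryDelay(10_000L);        // 3.3 ms to wait between two retries
        config.setExecutionMode(ExecutionMode.BATCH);  // 3.4 default is PIPELINED
        config.disableClosureCleaner();                // 3.5 the closure cleaner is on by default
        config.registerPojoType(MyEvent.class);        // 3.6 register a type up front
    }
}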
3. Building the JobGraph topology
// Builds the topology starting from the source vertices:
// 1. the vertices registered in the job graph are collected into a list that is sorted topologically
public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {
// early out on empty lists
if (this.taskVertices.isEmpty()) {
return Collections.emptyList();
}
List<JobVertex> sorted = new ArrayList<JobVertex>(this.taskVertices.size());
Set<JobVertex> remaining = new LinkedHashSet<JobVertex>(this.taskVertices.values());
// start by finding the vertices with no input edges
// and the ones with disconnected inputs (that refer to some standalone data set)
// first find the source vertices registered in the job graph
{
// the 'remaining' set deduplicates vertices so no vertex can be recorded twice;
// a LinkedHashSet keeps them in insertion order
// 1. iterate over every JobVertex in the set
// 2. a vertex with no connected inputs is a source (or refers to a standalone data set)
// 3. append each such vertex to the sorted list and remove it from the set so it is not visited again
Iterator<JobVertex> iter = remaining.iterator();
while (iter.hasNext()) {
JobVertex vertex = iter.next();
if (vertex.hasNoConnectedInputs()) {
sorted.add(vertex);
iter.remove();
}
}
}
int startNodePos = 0;
// traverse from the nodes that were added until we found all elements
// the loop above extracted all sources and standalone data sets; now handle the remaining vertices
// 1. startNodePos advances through the already-sorted list; if it runs past the end while
//    vertices remain unsorted, the graph must contain a cycle
// 2. each sorted vertex in turn serves as a start node from which successors that have no
//    unsorted predecessors are appended, eventually yielding the complete topology
while (!remaining.isEmpty()) {
// first check if we have more candidates to start traversing from. if not, then the
// graph is cyclic, which is not permitted
// make sure there is still a candidate node to traverse from; otherwise the job graph
// is cyclic and cannot form a DAG topology
if (startNodePos >= sorted.size()) {
throw new InvalidProgramException("The job graph is cyclic.");
}
JobVertex current = sorted.get(startNodePos++);
addNodesThatHaveNoNewPredecessors(current, sorted, remaining);
}
return sorted;
}
// addNodesThatHaveNoNewPredecessors:
// 1. start from the IntermediateDataSets produced by the given vertex
// 2. iterate over each data set's consumers (JobEdges)
// 3. take each JobEdge's target vertex and check whether it is still in the 'remaining' set;
//    if not, skip the edge
// 3.1 for a target still in the set, inspect all of its inputs, skipping the edge we arrived through
// 3.2 if none of the target's other producers is still unsorted, it has no new predecessors and is appended
// 3.3 recurse on every appended vertex until no more vertices qualify
private void addNodesThatHaveNoNewPredecessors(JobVertex start, List<JobVertex> target, Set<JobVertex> remaining) {
// forward traverse over all produced data sets and all their consumers
// starting from the current vertex, walk its produced IntermediateDataSets,
// then each data set's consuming JobEdges, and finally each edge's target JobVertex,
// the next candidate to be sorted.
// candidates no longer in the 'remaining' set are ignored; for the rest, check backwards
// whether any predecessor is still unsorted, excluding the edge we came through
// (its producer is the current vertex, which is already sorted, so re-checking it is pointless)
for (IntermediateDataSet dataSet : start.getProducedDataSets()) {
for (JobEdge edge : dataSet.getConsumers()) {
// a vertex can be added, if it has no predecessors that are still in the 'remaining' set
JobVertex v = edge.getTarget();
if (!remaining.contains(v)) {
continue;
}
boolean hasNewPredecessors = false;
// check whether v still has an unsorted predecessor
for (JobEdge e : v.getInputs()) {
// skip the edge through which we came
// the producer of this edge is the current vertex, which is already sorted, so skip it
if (e == edge) {
continue;
}
// the source of this edge is an IntermediateDataSet whose producer is one of v's
// predecessor vertices; if that producer is still in 'remaining', v has an unsorted
// predecessor and cannot be appended yet
IntermediateDataSet source = e.getSource();
if (remaining.contains(source.getProducer())) {
hasNewPredecessors = true;
break;
}
}
// v has no unsorted predecessors: append it to the sorted list, remove it from 'remaining', and recurse from it
if (!hasNewPredecessors) {
target.add(v);
remaining.remove(v);
addNodesThatHaveNoNewPredecessors(v, target, remaining);
}
}
}
}
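A hedged usage sketch of the topological sort: build a diamond-shaped graph a -> {b, c} -> d and print the order the method returns (a first, d last; again, the connect overload may differ between Flink versions):

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class TopoSortSketch {
    public static void main(String[] args) {
        JobVertex a = new JobVertex("a");
        JobVertex b = new JobVertex("b");
        JobVertex c = new JobVertex("c");
        JobVertex d = new JobVertex("d");
        b.connectNewDataSetAsInput(a, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        c.connectNewDataSetAsInput(a, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        d.connectNewDataSetAsInput(b, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        d.connectNewDataSetAsInput(c, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        JobGraph graph = new JobGraph(null, "topo demo");
        graph.addVertex(a);
        graph.addVertex(b);
        graph.addVertex(c);
        graph.addVertex(d);

        // prints a, b, c, d: 'a' is the only source, and 'd' waits until both b and c are sorted
        for (JobVertex v : graph.getVerticesSortedTopologicallyFromSources()) {
            System.out.println(v.getName());
        }
    }
}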