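A Job is the fundamental object for high-performance workloads. This document describes how to define a Job in Volcano.
The Job definition follows the same conventions as Kubernetes objects (e.g. Spec and Status). The main features of Job are described below.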
-
Multiple Pod Template
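Most high-performance workloads consist of different kinds of tasks, e.g. TensorFlow (ps/worker) and Spark (driver/executor), so Job supports multiple pod templates through its list of TaskSpecs (spec.tasks), defined as follows. Policies are described in the Error Handling section.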
// JobSpec describes what the job execution will look like and when it will actually run
type JobSpec struct {
	...
	// Tasks specifies the task specification of Job
	// +optional
	Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,5,opt,name=tasks"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
	// Name specifies the name of task
	Name string `json:"name,omitempty" protobuf:"bytes,1,opt,name=name"`
	// Replicas specifies the replicas of this TaskSpec in Job
	Replicas int32 `json:"replicas,omitempty" protobuf:"bytes,2,opt,name=replicas"`
	// Specifies the pod that will be created for this TaskSpec
	// when executing a Job
	Template v1.PodTemplateSpec `json:"template,omitempty" protobuf:"bytes,3,opt,name=template"`
	// Specifies the lifecycle of tasks
	// +optional
	Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
The JobController creates pods based on the templates and replicas in spec.tasks, and sets the Job as the controller OwnerReference of each pod. The following is an example YAML with multiple pod templates.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  tasks:
  - name: "ps"
    replicas: 2
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:
      spec:
        containers:
        - name: worker
          image: worker-img
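To make the expansion from spec.tasks into pods concrete, here is a minimal, dependency-free sketch. The simplified `Job`, `TaskSpec`, `Pod` and `OwnerReference` types, the `Image` shortcut for the pod template, and the `<job>-<task>-<index>` pod naming scheme are all assumptions for illustration; the real JobController works with the Kubernetes API types.

```go
package main

import "fmt"

// Simplified stand-ins for the real API types (hypothetical, illustration only).
type TaskSpec struct {
	Name     string
	Replicas int32
	Image    string // stands in for Template.Spec.Containers[0].Image
}

type Job struct {
	Name  string
	UID   string
	Tasks []TaskSpec
}

type OwnerReference struct {
	Kind       string
	Name       string
	UID        string
	Controller bool
}

type Pod struct {
	Name   string
	Image  string
	Owners []OwnerReference
}

// expandPods creates Replicas pods per task and marks the Job as their controller owner.
func expandPods(job Job) []Pod {
	var pods []Pod
	for _, task := range job.Tasks {
		for i := int32(0); i < task.Replicas; i++ {
			pods = append(pods, Pod{
				// Assumed naming scheme: <job>-<task>-<index>.
				Name:  fmt.Sprintf("%s-%s-%d", job.Name, task.Name, i),
				Image: task.Image,
				Owners: []OwnerReference{{
					Kind: "Job", Name: job.Name, UID: job.UID, Controller: true,
				}},
			})
		}
	}
	return pods
}

func main() {
	job := Job{
		Name: "tf-job",
		UID:  "uid-1234",
		Tasks: []TaskSpec{
			{Name: "ps", Replicas: 2, Image: "ps-img"},
			{Name: "worker", Replicas: 5, Image: "worker-img"},
		},
	}
	for _, p := range expandPods(job) {
		fmt.Println(p.Name, p.Image)
	}
}
```

For the tf-job example this yields 2 ps pods and 5 worker pods, all owned by the Job, so deleting the Job lets Kubernetes garbage-collect them.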
-
Job Input/Output
Most high-performance workloads read or write data. The volumes used for a Job's input and output are defined as follows:
type VolumeSpec struct {
	MountPath string `json:"mountPath" protobuf:"bytes,1,opt,name=mountPath"`
	// defined the PVC name
	// +optional
	VolumeClaimName string `json:"volumeClaimName,omitempty" protobuf:"bytes,2,opt,name=volumeClaimName"`
	// VolumeClaim defines the PVC used by the VolumeSpec.
	// +optional
	VolumeClaim *PersistentVolumeClaim `json:"claim,omitempty" protobuf:"bytes,3,opt,name=claim"`
}
type JobSpec struct {
	...
	// The volumes mount on Job
	// +optional
	Volumes []VolumeSpec `json:"volumes,omitempty" protobuf:"bytes,1,opt,name=volumes"`
}
A Job's Volumes can be nil, which means the user manages data themselves. If *VolumeSpec.volumeClaim is nil and *VolumeSpec.volumeClaimName is empty or does not refer to an existing PersistentVolumeClaim, an emptyDir volume is used for each Task/Pod.
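The emptyDir fallback rule can be sketched as a small decision function. This is a simplified illustration: `ClaimSpec` is a hypothetical placeholder for the PVC definition, and the precedence of `VolumeClaim` over `VolumeClaimName` (creating a new claim when one is defined) is an assumption, since the document only specifies when emptyDir is used.

```go
package main

import "fmt"

// ClaimSpec is a hypothetical placeholder for a PersistentVolumeClaim definition.
type ClaimSpec struct {
	StorageRequest string
}

// VolumeSpec mirrors the fields described above (simplified).
type VolumeSpec struct {
	MountPath       string
	VolumeClaimName string
	VolumeClaim     *ClaimSpec
}

// VolumeSource describes what backs a mounted path.
type VolumeSource string

const (
	FromNewClaim      VolumeSource = "NewClaim"      // assumed: a PVC is created from VolumeClaim
	FromExistingClaim VolumeSource = "ExistingClaim" // PVC referenced by VolumeClaimName
	FromEmptyDir      VolumeSource = "EmptyDir"      // per-pod scratch space
)

// resolveVolume applies the fallback rule: with no usable claim, use emptyDir.
func resolveVolume(v VolumeSpec, existingPVCs map[string]bool) VolumeSource {
	if v.VolumeClaim != nil {
		return FromNewClaim
	}
	if v.VolumeClaimName != "" && existingPVCs[v.VolumeClaimName] {
		return FromExistingClaim
	}
	return FromEmptyDir
}

func main() {
	existing := map[string]bool{"training-data": true}
	fmt.Println(resolveVolume(VolumeSpec{MountPath: "/data", VolumeClaimName: "training-data"}, existing))
	fmt.Println(resolveVolume(VolumeSpec{MountPath: "/out", VolumeClaimName: "missing"}, existing))
	fmt.Println(resolveVolume(VolumeSpec{MountPath: "/ckpt", VolumeClaim: &ClaimSpec{StorageRequest: "10Gi"}}, existing))
}
```

Note that an emptyDir volume lives only as long as its pod, so data written there is not shared between tasks or preserved across restarts.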
-
Conditions and Phases
The following phases give a simple, high-level summary of where the Job is in its lifecycle; the conditions array and the reason and message fields contain more detail about the job's status.
type JobPhase string
const (
	// Pending is the phase in which the job is pending in the queue, waiting for a scheduling decision
	Pending JobPhase = "Pending"
	// Aborting is the phase in which the job is being aborted, waiting for its pods to be released
	Aborting JobPhase = "Aborting"
	// Aborted is the phase in which the job has been aborted by the user or by error handling
	Aborted JobPhase = "Aborted"
	// Running is the phase in which at least the minimal available tasks of the Job are running
	Running JobPhase = "Running"
	// Restarting is the phase in which the Job is restarting, waiting for pods to be released and recreated
	Restarting JobPhase = "Restarting"
	// Completed is the phase in which all tasks of the Job have completed successfully
	Completed JobPhase = "Completed"
	// Terminating is the phase in which the Job is being terminated, waiting for its pods to be released
	Terminating JobPhase = "Terminating"
	// Terminated is the phase in which the job has finished unexpectedly, e.g. because of events
	Terminated JobPhase = "Terminated"
)
// JobState contains details for the current state of the job.
type JobState struct {
	// The phase of Job.
	// +optional
	Phase JobPhase `json:"phase,omitempty" protobuf:"bytes,1,opt,name=phase"`
	// Unique, one-word, CamelCase reason for the phase's last transition.
	// +optional
	Reason string `json:"reason,omitempty" protobuf:"bytes,2,opt,name=reason"`
	// Human-readable message indicating details about last transition.
	// +optional
	Message string `json:"message,omitempty" protobuf:"bytes,3,opt,name=message"`
}
// JobStatus represents the current state of a Job
type JobStatus struct {
	// Current state of Job.
	State JobState `json:"state,omitempty" protobuf:"bytes,1,opt,name=state"`
	...
}
The following table shows the available transitions between phases; a phase cannot transition to the target phase if the corresponding cell is empty.
Restarting, Aborting and Terminating are temporary states that avoid race conditions. For example, TerminateJobAction causes several PodEvictedEvents, which should not be handled again.
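The race-avoidance role of the temporary phases can be sketched as an event filter. This is a hypothetical illustration, not the controller's actual code: it assumes that while a Job is Restarting, Aborting or Terminating, the PodEvicted events produced by the controller's own action are simply dropped.

```go
package main

import "fmt"

type JobPhase string
type Event string

const (
	Running     JobPhase = "Running"
	Completed   JobPhase = "Completed"
	Aborting    JobPhase = "Aborting"
	Restarting  JobPhase = "Restarting"
	Terminating JobPhase = "Terminating"

	PodEvictedEvent Event = "PodEvicted"
)

// isTemporary reports whether the job is in a transitional phase that is
// waiting for its pods to be released.
func isTemporary(p JobPhase) bool {
	switch p {
	case Restarting, Aborting, Terminating:
		return true
	}
	return false
}

// shouldHandle is a hypothetical filter: in a temporary phase, evictions were
// caused by the action already in progress (e.g. TerminateJobAction), so they
// must not trigger another action.
func shouldHandle(p JobPhase, e Event) bool {
	return !(isTemporary(p) && e == PodEvictedEvent)
}

func main() {
	for _, p := range []JobPhase{Running, Terminating, Restarting} {
		fmt.Printf("%s: handle PodEvicted = %v\n", p, shouldHandle(p, PodEvictedEvent))
	}
}
```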
-
Error Handling
After a Job is created, events related to it occur, e.g. a Pod succeeded or a Pod failed. LifecyclePolicy handles these events according to the user's configuration.
// Event is the type of Event related to the Job
type Event string
const (
	// AllEvents means all events
	AllEvents Event = "*"
	// PodFailedEvent is triggered if a Pod failed
	PodFailedEvent Event = "PodFailed"
	// PodEvictedEvent is triggered if a Pod was deleted
	PodEvictedEvent Event = "PodEvicted"
	// The following events can lead to the job becoming 'Unknown':
	// 1. Task Unschedulable, this is triggered when part of
	// pods can't be scheduled while some are already running in gang-scheduling case.
	JobUnknownEvent Event = "Unknown"
	// OutOfSyncEvent is triggered if Pod/Job were updated
	OutOfSyncEvent Event = "OutOfSync"
	// CommandIssuedEvent is triggered if a command is raised by user
	CommandIssuedEvent Event = "CommandIssued"
	// TaskCompletedEvent is triggered when 'Replicas' pods of a task have succeeded
	TaskCompletedEvent Event = "TaskCompleted"
)
// Action is the type of event handling
type Action string
const (
	// AbortJobAction if this action is set, the whole job will be aborted:
	// all Pods of the Job will be evicted, and no Pod will be recreated
	AbortJobAction Action = "AbortJob"
	// RestartJobAction if this action is set, the whole job will be restarted
	RestartJobAction Action = "RestartJob"
	// TerminateJobAction if this action is set, the whole job will be terminated
	// and can not be resumed: all Pods of the Job will be evicted, and no Pod will be recreated.
	TerminateJobAction Action = "TerminateJob"
	// CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
	CompleteJobAction Action = "CompleteJob"
	// ResumeJobAction is the action to resume an aborted job.
	ResumeJobAction Action = "ResumeJob"
	// SyncJobAction is the action to sync Job/Pod status.
	SyncJobAction Action = "SyncJob"
)
// LifecyclePolicy specifies the lifecycle and error handling of task and job.
type LifecyclePolicy struct {
	Event   Event            `json:"event,omitempty" protobuf:"bytes,1,opt,name=event"`
	Action  Action           `json:"action,omitempty" protobuf:"bytes,2,opt,name=action"`
	Timeout *metav1.Duration `json:"timeout,omitempty" protobuf:"bytes,3,opt,name=timeout"`
}
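One way to read a list of LifecyclePolicy entries is as a lookup from event to action, where `*` (AllEvents) acts as a catch-all. The sketch below assumes two things the document does not specify: an exact event match takes precedence over `*`, and SyncJob is used when nothing matches. The `Timeout` field is omitted for brevity.

```go
package main

import "fmt"

type Event string
type Action string

const (
	AllEvents       Event = "*"
	PodFailedEvent  Event = "PodFailed"
	PodEvictedEvent Event = "PodEvicted"

	RestartJobAction Action = "RestartJob"
	AbortJobAction   Action = "AbortJob"
	SyncJobAction    Action = "SyncJob"
)

// LifecyclePolicy is simplified: the Timeout field is omitted.
type LifecyclePolicy struct {
	Event  Event
	Action Action
}

// actionFor returns the action configured for an event. An exact match wins
// over the "*" wildcard (assumed precedence); SyncJob is the assumed fallback.
func actionFor(policies []LifecyclePolicy, e Event) Action {
	var wildcard Action
	for _, p := range policies {
		if p.Event == e {
			return p.Action
		}
		if p.Event == AllEvents && wildcard == "" {
			wildcard = p.Action
		}
	}
	if wildcard != "" {
		return wildcard
	}
	return SyncJobAction
}

func main() {
	policies := []LifecyclePolicy{
		{Event: AllEvents, Action: RestartJobAction},
		{Event: PodFailedEvent, Action: AbortJobAction},
	}
	fmt.Println(actionFor(policies, PodFailedEvent))  // exact match
	fmt.Println(actionFor(policies, PodEvictedEvent)) // matched by "*"
}
```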
Both JobSpec and TaskSpec contain lifecycle policies: the policies in JobSpec are the defaults used when a TaskSpec's policies are empty, and the policies in TaskSpec override the defaults.
// JobSpec describes what the job execution will look like and when it will actually run
type JobSpec struct {
	...
	// Specifies the default lifecycle of tasks
	// +optional
	Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,5,opt,name=policies"`
	// Tasks specifies the task specification of Job
	// +optional
	Tasks []TaskSpec `json:"tasks,omitempty" protobuf:"bytes,6,opt,name=tasks"`
}
// TaskSpec specifies the task specification of Job
type TaskSpec struct {
	...
	// Specifies the lifecycle of tasks
	// +optional
	Policies []LifecyclePolicy `json:"policies,omitempty" protobuf:"bytes,4,opt,name=policies"`
}
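The default/override relationship can be sketched as follows. This assumes a whole-list override: a task with any policies of its own ignores the job-level list entirely. A per-event merge would be another valid reading of the text. The types are simplified, and the TaskCompleted/CompleteJob pairing in `main` is only an illustrative choice.

```go
package main

import "fmt"

// Simplified types (hypothetical, illustration only).
type LifecyclePolicy struct {
	Event  string
	Action string
}

type TaskSpec struct {
	Name     string
	Policies []LifecyclePolicy
}

type JobSpec struct {
	Policies []LifecyclePolicy
	Tasks    []TaskSpec
}

// effectivePolicies returns the task's own policies if it has any;
// otherwise it falls back to the job-level defaults.
func effectivePolicies(job JobSpec, taskName string) []LifecyclePolicy {
	for _, t := range job.Tasks {
		if t.Name == taskName && len(t.Policies) > 0 {
			return t.Policies
		}
	}
	return job.Policies
}

func main() {
	job := JobSpec{
		Policies: []LifecyclePolicy{{Event: "*", Action: "RestartJob"}},
		Tasks: []TaskSpec{
			{Name: "driver", Policies: []LifecyclePolicy{{Event: "TaskCompleted", Action: "CompleteJob"}}},
			{Name: "executor"},
		},
	}
	for _, t := range job.Tasks {
		fmt.Println(t.Name, effectivePolicies(job, t.Name))
	}
}
```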
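The following examples show how LifecyclePolicy is used at the job and task level.
For ML jobs, if any task fails or is evicted, the whole job should be restarted. To simplify configuration, the job-level LifecyclePolicy is set as follows; tasks that do not set their own LifecyclePolicy use the policies in spec.policies.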
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # On any event, restart the whole job.
  policies:
  - event: "*"
    action: RestartJob
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: ps
          image: ps-img
  - name: "worker"
    replicas: 5
    template:
      spec:
        containers:
        - name: worker
          image: worker-img
...
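Some big-data frameworks such as Spark have different requirements: if the driver task fails, the whole job should be restarted, but if an executor task fails, only that task needs to be restarted. So restartPolicy: OnFailure is used for the executor, and RestartJob is set in spec.tasks.policies for the driver, as shown below.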
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  tasks:
  - name: "driver"
    replicas: 1
    policies:
    - event: "*"
      action: RestartJob
    template:
      spec:
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec:
        containers:
        - name: executor
          image: executor-img
        restartPolicy: OnFailure
-
Feature Interaction
Admission Controller
The admission controller must perform the following validations to guarantee the expected behavior:
- spec.minAvailable <= sum(spec.tasks.replicas)
- no duplicated name in the spec.tasks array
- no duplicated event handler in a LifecyclePolicy array, for both job policies and task policies
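The three admission rules can be sketched as a single validation function over simplified types. This is an illustration under assumptions: the real admission webhook works on the full Volcano API objects, and its error messages and the order of checks may differ.

```go
package main

import "fmt"

// Simplified types (hypothetical, illustration only).
type LifecyclePolicy struct {
	Event  string
	Action string
}

type TaskSpec struct {
	Name     string
	Replicas int32
	Policies []LifecyclePolicy
}

type JobSpec struct {
	MinAvailable int32
	Policies     []LifecyclePolicy
	Tasks        []TaskSpec
}

// duplicateEvent reports the first event that appears more than once.
func duplicateEvent(policies []LifecyclePolicy) (string, bool) {
	seen := map[string]bool{}
	for _, p := range policies {
		if seen[p.Event] {
			return p.Event, true
		}
		seen[p.Event] = true
	}
	return "", false
}

// validateJob applies the three admission rules and returns every violation found.
func validateJob(spec JobSpec) []string {
	var errs []string
	var total int32
	names := map[string]bool{}
	for _, t := range spec.Tasks {
		total += t.Replicas
		if names[t.Name] {
			errs = append(errs, fmt.Sprintf("duplicated task name %q", t.Name))
		}
		names[t.Name] = true
		if e, dup := duplicateEvent(t.Policies); dup {
			errs = append(errs, fmt.Sprintf("task %q: duplicated policy event %q", t.Name, e))
		}
	}
	if spec.MinAvailable > total {
		errs = append(errs, fmt.Sprintf("minAvailable %d > total replicas %d", spec.MinAvailable, total))
	}
	if e, dup := duplicateEvent(spec.Policies); dup {
		errs = append(errs, fmt.Sprintf("job: duplicated policy event %q", e))
	}
	return errs
}

func main() {
	spec := JobSpec{
		MinAvailable: 7,
		Tasks:        []TaskSpec{{Name: "ps", Replicas: 1}, {Name: "worker", Replicas: 5}},
	}
	for _, e := range validateJob(spec) {
		fmt.Println(e)
	}
}
```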
CoScheduling
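CoScheduling is also known as gang-scheduling: spec.minAvailable specifies how many pods must be scheduled together. The default value of spec.minAvailable is the sum of spec.tasks.replicas.
If spec.minAvailable > sum(spec.tasks.replicas), the job creation is rejected.
If spec.minAvailable < sum(spec.tasks.replicas), the pods of spec.tasks are created in random order.
See the Task Priority within Job section for how to create tasks in order.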
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: tf-job
spec:
  # minAvailable to run job
  minAvailable: 6
  tasks:
  - name: "ps"
    replicas: 1
    template:
      spec:
        containers:
        - name: "ps"
          image: "ps-img"
  - name: "worker"
    replicas: 5
    template:
      spec:
        containers:
        - name: "worker"
          image: "worker-img"
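The defaulting rule and the all-or-nothing gang condition can be sketched in a few lines. Two simplifications are assumptions here: an unset spec.minAvailable is represented as 0, and `placeable` stands in for the scheduler's count of pods that fit in the cluster at once.

```go
package main

import "fmt"

type TaskSpec struct {
	Name     string
	Replicas int32
}

// defaultMinAvailable returns sum(spec.tasks.replicas), the documented default.
func defaultMinAvailable(tasks []TaskSpec) int32 {
	var total int32
	for _, t := range tasks {
		total += t.Replicas
	}
	return total
}

// effectiveMinAvailable treats an unset (zero) value as the default (assumption).
func effectiveMinAvailable(minAvailable int32, tasks []TaskSpec) int32 {
	if minAvailable == 0 {
		return defaultMinAvailable(tasks)
	}
	return minAvailable
}

// gangReady is a simplified gang check: pods are bound only when at least
// minAvailable of them can be placed together; otherwise none are bound.
func gangReady(minAvailable, placeable int32) bool {
	return placeable >= minAvailable
}

func main() {
	tasks := []TaskSpec{{Name: "ps", Replicas: 1}, {Name: "worker", Replicas: 5}}
	minAvailable := effectiveMinAvailable(6, tasks)
	fmt.Println("minAvailable:", minAvailable)
	fmt.Println("5 placeable:", gangReady(minAvailable, 5))
	fmt.Println("6 placeable:", gangReady(minAvailable, 6))
}
```

With minAvailable 6 in the tf-job example, having room for only 5 pods means none of the job's pods are started, which avoids a partially running job holding resources it cannot use.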
Task Priority within Job
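In addition to multiple pod templates, tasks may also have different priorities. The PriorityClass referenced by priorityClassName in the PodTemplate defines the priority of a task within the job. The following example runs a Spark job with 1 driver and 5 executors; the driver's priority is master-pri, which is higher than that of normal pods. Because spec.minAvailable is 3, the scheduler ensures that the driver and 2 executors are scheduled if there are not enough resources for all pods.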
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClassName: "master-pri"
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec:
        containers:
        - name: executor
          image: executor-img
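The "driver plus 2 executors" outcome can be sketched by ordering pods by priority and taking the first spec.minAvailable of them. This is a hypothetical simplification of the scheduler's behavior: the numeric priorities (1000 for master-pri, 0 for executors) are assumed values, and a stable sort keeps equal-priority pods in their original order.

```go
package main

import (
	"fmt"
	"sort"
)

// PendingPod is a simplified pod with the numeric priority resolved from its PriorityClass.
type PendingPod struct {
	Name     string
	Priority int32
}

// pickGang orders pods by priority (highest first, stable for ties) and returns
// the names of the first minAvailable pods, the set to secure first.
func pickGang(pods []PendingPod, minAvailable int) []string {
	ordered := append([]PendingPod(nil), pods...)
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].Priority > ordered[j].Priority
	})
	if minAvailable > len(ordered) {
		minAvailable = len(ordered)
	}
	names := make([]string, 0, minAvailable)
	for _, p := range ordered[:minAvailable] {
		names = append(names, p.Name)
	}
	return names
}

func main() {
	// Assumed values: master-pri resolves to 1000, executors use the default 0.
	pods := []PendingPod{
		{Name: "spark-job-executor-0", Priority: 0},
		{Name: "spark-job-executor-1", Priority: 0},
		{Name: "spark-job-executor-2", Priority: 0},
		{Name: "spark-job-executor-3", Priority: 0},
		{Name: "spark-job-executor-4", Priority: 0},
		{Name: "spark-job-driver-0", Priority: 1000},
	}
	fmt.Println(pickGang(pods, 3))
	// prints [spark-job-driver-0 spark-job-executor-0 spark-job-executor-1]
}
```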
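Note: although high-priority pods are scheduled first, there is still a race condition between kubelets, so low-priority pods may start first. Job/task dependency is intended to handle this race condition.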
Resource sharing between Jobs
By default, spec.minAvailable is set to the sum of spec.tasks.replicas. If it is set to a smaller value, the pods beyond spec.minAvailable share resources with other jobs.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: spark-job
spec:
  minAvailable: 3
  tasks:
  - name: "driver"
    replicas: 1
    template:
      spec:
        priorityClassName: "master-pri"
        containers:
        - name: driver
          image: driver-img
  - name: "executor"
    replicas: 5
    template:
      spec:
        containers:
        - name: executor
          image: executor-img
Plugins for Job
Many AI framework jobs, e.g. TensorFlow, MPI and MXNet, need environment variables set, pods that can communicate with each other, and password-less SSH login. Job API plugins handle these so that users can focus on their core business logic. There are currently three plugins; each accepts parameters, and defaults are used when none are provided.
env: sets VK_TASK_INDEX in each container; the index gives each container its identity.
svc: creates a Service and *.host files so that pods can communicate with each other.
ssh: enables password-less SSH login, e.g. for commands such as mpirun or mpiexec.
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: mpi-job
spec:
  minAvailable: 2
  schedulerName: scheduler
  policies:
  - event: PodEvicted
    action: RestartJob
  plugins:
    ssh: []
    env: []
    svc: []
  tasks:
  - replicas: 1
    name: mpimaster
    template:
      spec:
        containers:
        - image: mpi-image
          name: mpimaster
  - replicas: 2
    name: mpiworker
    template:
      spec:
        containers:
        - image: mpi-image
          name: mpiworker
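To illustrate what the env and svc plugins provide for the mpi-job example, here is a hypothetical sketch of the values a pod would see. It assumes VK_TASK_INDEX is the pod's index within its task, pod hostnames follow `<job>-<task>-<index>`, and the Service is named after the job, giving a `<task>.host` file with one DNS name per line. These details are assumptions for illustration, not the plugins' confirmed output.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type TaskSpec struct {
	Name     string
	Replicas int
}

// podEnv returns the VK_TASK_INDEX value the env plugin would inject,
// assumed here to be the pod's index within its task.
func podEnv(index int) map[string]string {
	return map[string]string{"VK_TASK_INDEX": strconv.Itoa(index)}
}

// hostFile builds the assumed content of "<task>.host": one DNS name per pod,
// in the form <job>-<task>-<index>.<job>.
func hostFile(jobName string, task TaskSpec) string {
	lines := make([]string, 0, task.Replicas)
	for i := 0; i < task.Replicas; i++ {
		lines = append(lines, fmt.Sprintf("%s-%s-%d.%s", jobName, task.Name, i, jobName))
	}
	return strings.Join(lines, "\n")
}

func main() {
	tasks := []TaskSpec{{Name: "mpimaster", Replicas: 1}, {Name: "mpiworker", Replicas: 2}}
	for _, t := range tasks {
		fmt.Printf("--- %s.host ---\n%s\n", t.Name, hostFile("mpi-job", t))
		for i := 0; i < t.Replicas; i++ {
			fmt.Printf("%s-%d env: %v\n", t.Name, i, podEnv(i))
		}
	}
}
```

A file like mpiworker.host is the kind of input an MPI launcher on the master can use as its host list, which is why the svc and ssh plugins are typically enabled together.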