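Introduction to Kubeflow
Kubeflow ships a large set of components covering nearly every stage of the machine learning workflow, but model training is its most important capability. Kubeflow supports training for many machine learning frameworks by defining an Operator for each one. Each Operator is, in essence, built on a Kubernetes Custom Resource.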
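Kubeflow Trainer Source Code Analysis
Kubeflow Trainer extends the Kubernetes API with Custom Resource Definitions (CRDs) to define, schedule, and monitor training jobs. It uses a modular, plugin-based architecture that separates infrastructure management from the logic specific to each machine learning framework, and it unifies the per-framework v1 jobs such as PyTorchJob and MPIJob.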

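Kubeflow Trainer Architecture
Reconcile is the core operation through which a Kubernetes controller manages its resources: it compares the desired state with the actual state and acts to bring them in line.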
pkg/controller/trainjob_controller.go
func (r *TrainJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	var trainJob trainer.TrainJob
	//...
	err = r.reconcileObjects(ctx, runtime, &trainJob)
	//...
}

func (r *TrainJobReconciler) reconcileObjects(ctx context.Context, runtime jobruntimes.Runtime, trainJob *trainer.TrainJob) error {
	// Ask the runtime for the objects this TrainJob should own.
	objects, err := runtime.NewObjects(ctx, trainJob)
	//...
	for _, object := range objects {
		//...
		// Server-side apply the object with "trainer" as the field owner.
		if err := r.client.Patch(ctx, obj, client.Apply, client.FieldOwner("trainer"), client.ForceOwnership); err != nil {
			return err
		}
		var gvk schema.GroupVersionKind
		if gvk, err = apiutil.GVKForObject(obj.DeepCopyObject(), r.client.Scheme()); err != nil {
			return err
		}
		logKeysAndValues := []any{
			"groupVersionKind", gvk.String(),
			"namespace", obj.GetNamespace(),
			"name", obj.GetName(),
		}
		//...
	}
	//...
}
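The pattern in reconcileObjects (compute the desired objects, then apply each one) can be illustrated without a cluster. The following is a minimal sketch using only the standard library. The Object, Cluster, and staticRuntime types, as well as the object kinds and specs, are hypothetical stand-ins for illustration and are not part of Kubeflow Trainer. The in-memory Apply only approximates the effect of a forced server-side apply.

```go
package main

import "fmt"

// Object is a hypothetical stand-in for a Kubernetes object.
type Object struct {
	Kind, Namespace, Name string
	Spec                  string
}

// Runtime plays the role of the runtime in the excerpt above:
// it turns a job into the list of objects that should exist.
type Runtime interface {
	NewObjects(job string) ([]Object, error)
}

// Cluster is an in-memory stand-in for the API server.
type Cluster map[string]Object

func key(o Object) string { return o.Kind + "/" + o.Namespace + "/" + o.Name }

// Apply creates the object or overwrites the stored copy, and
// reports whether the stored state changed.
func (c Cluster) Apply(o Object) bool {
	if cur, ok := c[key(o)]; ok && cur == o {
		return false
	}
	c[key(o)] = o
	return true
}

// reconcileObjects applies every desired object and returns how many
// objects were created or updated.
func reconcileObjects(c Cluster, rt Runtime, job string) (int, error) {
	objects, err := rt.NewObjects(job)
	if err != nil {
		return 0, err
	}
	changed := 0
	for _, o := range objects {
		if c.Apply(o) {
			changed++
		}
	}
	return changed, nil
}

// staticRuntime returns a fixed, made-up set of objects per job.
type staticRuntime struct{}

func (staticRuntime) NewObjects(job string) ([]Object, error) {
	return []Object{
		{Kind: "JobSet", Namespace: "default", Name: job, Spec: "replicas=2"},
		{Kind: "ConfigMap", Namespace: "default", Name: job + "-config", Spec: "k=v"},
	}, nil
}

func main() {
	c := Cluster{}
	first, _ := reconcileObjects(c, staticRuntime{}, "demo")
	second, _ := reconcileObjects(c, staticRuntime{}, "demo")
	fmt.Println(first, second) // prints "2 0"
}
```

The second pass changes nothing because the desired state already matches the stored state. This idempotence is what allows a controller to run Reconcile repeatedly without side effects.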
The runtime implements a plugin mechanism:
pkg/runtime/framework/core/framework.go
type Framework struct {
	registry                     fwkplugins.Registry
	plugins                      map[string]framework.Plugin
	enforceMLPlugins             []framework.EnforceMLPolicyPlugin
	enforcePodGroupPolicyPlugins []framework.EnforcePodGroupPolicyPlugin
	customValidationPlugins      []framework.CustomValidationPlugin
	watchExtensionPlugins        []framework.WatchExtensionPlugin
	podNetworkPlugins            []framework.PodNetworkPlugin
	componentBuilderPlugins      []framework.ComponentBuilderPlugin
	terminalConditionPlugins     []framework.TerminalConditionPlugin
}
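The plugin for each machine learning framework implements one or more of the plugin interfaces above, depending on what it needs. For example, the Torch plugin implements the CustomValidationPlugin and EnforceMLPolicyPlugin interfaces.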
pkg/runtime/framework/plugins/torch/torch.go
type Torch struct{}

// Compile-time checks that Torch satisfies both plugin interfaces.
var _ framework.EnforceMLPolicyPlugin = (*Torch)(nil)
var _ framework.CustomValidationPlugin = (*Torch)(nil)

func (t *Torch) Validate(runtimeInfo *runtime.Info, _, newObj *trainer.TrainJob) (admission.Warnings, field.ErrorList) {
	//...
}

func (t *Torch) EnforceMLPolicy(info *runtime.Info, trainJob *trainer.TrainJob) error {
	//...
}
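To make the plugin mechanism concrete, here is a minimal, self-contained sketch of how a framework can sort registered plugins into per-interface slices with Go type assertions and then call them in order. It is a simplified illustration, not the Kubeflow Trainer implementation: the Job type, the New and Run functions, the Logger plugin, and the NNODES variable name are all assumptions made for this example, and the method signatures are deliberately reduced.

```go
package main

import "fmt"

// Job is a hypothetical, heavily simplified stand-in for a TrainJob.
type Job struct {
	NumNodes int
	Env      map[string]string
}

// Plugin is the base interface every plugin implements.
type Plugin interface{ Name() string }

// EnforceMLPolicyPlugin and CustomValidationPlugin are simplified
// versions of two of the extension points listed above.
type EnforceMLPolicyPlugin interface {
	Plugin
	EnforceMLPolicy(job *Job) error
}

type CustomValidationPlugin interface {
	Plugin
	Validate(job *Job) []string
}

// Framework keeps all plugins by name plus one slice per extension point.
type Framework struct {
	plugins                 map[string]Plugin
	enforceMLPlugins        []EnforceMLPolicyPlugin
	customValidationPlugins []CustomValidationPlugin
}

// New registers each plugin under every extension point it implements.
func New(registry []Plugin) *Framework {
	f := &Framework{plugins: map[string]Plugin{}}
	for _, p := range registry {
		f.plugins[p.Name()] = p
		if ep, ok := p.(EnforceMLPolicyPlugin); ok {
			f.enforceMLPlugins = append(f.enforceMLPlugins, ep)
		}
		if vp, ok := p.(CustomValidationPlugin); ok {
			f.customValidationPlugins = append(f.customValidationPlugins, vp)
		}
	}
	return f
}

// Run validates the job first and enforces ML policies only if it is valid.
func (f *Framework) Run(job *Job) ([]string, error) {
	var problems []string
	for _, p := range f.customValidationPlugins {
		problems = append(problems, p.Validate(job)...)
	}
	if len(problems) > 0 {
		return problems, nil
	}
	for _, p := range f.enforceMLPlugins {
		if err := p.EnforceMLPolicy(job); err != nil {
			return nil, err
		}
	}
	return nil, nil
}

// Torch implements both extension points.
type Torch struct{}

var _ EnforceMLPolicyPlugin = (*Torch)(nil)
var _ CustomValidationPlugin = (*Torch)(nil)

func (*Torch) Name() string { return "Torch" }

func (*Torch) Validate(job *Job) []string {
	if job.NumNodes < 1 {
		return []string{"numNodes must be at least 1"}
	}
	return nil
}

func (*Torch) EnforceMLPolicy(job *Job) error {
	if job.Env == nil {
		job.Env = map[string]string{}
	}
	job.Env["NNODES"] = fmt.Sprint(job.NumNodes) // illustrative variable name
	return nil
}

// Logger implements only the base interface, so it joins no extension slice.
type Logger struct{}

func (*Logger) Name() string { return "Logger" }

func main() {
	f := New([]Plugin{&Torch{}, &Logger{}})
	job := &Job{NumNodes: 2}
	problems, err := f.Run(job)
	fmt.Println(problems, err, job.Env["NNODES"])
}
```

One benefit of this design is that adding a framework only requires writing a plugin that implements the relevant interfaces. The framework's dispatch code does not need to know about it.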