需求背景:
k8s为了和存储厂商解耦,定义了csi接口规范,存储厂商按照csi规范实现接。csi最核心的几个接口:创建卷、进行卷挂载。创建卷即访问第三方存储服务创建存储卷,卷挂载即在容器启动时找到卷挂载进容器运行目录。这样容器进程在运行中就可以像本地写文件一样在存储卷中写数据了。那么存储卷能不能挂载到宿主机的任意目录呢?如共享存储cephfs的存储卷, 一端挂载到容器做数据推理,一端挂载到slurm集群用户的的工作目录。则slurm用户在工作目录下可以实时读到推理数据的结果进一步使用slurm脚本做训练。
流程图:
代码流程
创建csi客户端
func newCsiDriverClient(driverName csiDriverName, endpoint string) (*csiDriverClient, error) {
if driverName == "" {
return nil, fmt.Errorf("driver name is empty")
}
nodeV1ClientCreator := newV1NodeClient
return &csiDriverClient{
driverName: driverName,
addr: csiAddr(endpoint),
nodeV1ClientCreator: nodeV1ClientCreator,
}, nil
}
csiClient, err := newCsiDriverClient("cephfs.csi.ceph.com", "/var/lib/kubelet/plugins/cephfs.csi.ceph.com/csi.sock")
从从pvc找到pv,从pv详情中找到存储卷id,以及挂载属性
volumeId := pv.Spec.CSI.VolumeHandle
volumeContext := map[string]string{}
for k, v := range pv.Spec.CSI.VolumeAttributes {
volumeContext[k] = v
}
将存储卷挂载到csi目录
func getStageMountPath(pv api.PersistentVolume) string {
result := sha256.Sum256([]byte(fmt.Sprintf("%s", &pv.Spec.CSI.VolumeHandle)))
volSha := fmt.Sprintf("%x", result)
driver := "cephfs.csi.ceph.com"
return filepath.Join("/var/lib/kubelet/plugins/kubernetes.io/csi/", driver, volSha, globalMountInGlobalPath)
}
tageVolumePath := getStageMountPath(pv)
// 创建存储卷csi挂载目录
if err = MkdirAllWithPathCheck(stageVolumePath, 0750); err != nil {
log.Infof("mkdir stageVolumePath err, %v", err)
return err
}
// 将存储卷挂载到csi目录(身份信息从csi对接secret进行获取)
err = csiClient.NodeStageVolume(
context.Background(),
volumeId,
map[string]string{},
stageVolumePath,
"",
api.ReadWriteMany,
map[string]string{"adminID": "admin", "adminKey": "AQCXnExlOddSCBAA5H2Ig0SFekW5dkRo+InHVQ==", "userID": "admin", "userKey": "AQCXnExlOddSCBAA5H2Ig0SFekW5dkRo+InHVQ=="}, // secretes
volumeContext,
[]string{},
nil,
)
将存储卷挂载到目标目录
// 和容器的差别就在这里,容器的mountpath是容器的运行目录,如:/var/lib/kubelet/pods/146d22f8-721f-4a05-afd0-12bc8b67be41/volumes,我们是自定义用户数据目录
mountPath := "/home/demo"
err = csiClient.NodePublishVolume(
context.Background(),
volumeId,
false,
stageVolumePath,
mountPath,
api.ReadWriteMany,
map[string]string{},
volumeContext,
map[string]string{},
"",
[]string{},
nil,
)