谢绝转载
序言
终于看完了volumeservice的初始化,铺垫了这么多,就是为了看一下卷相关方法的调用,那就快进入正题吧:
注意事项:
1.本文共有四篇,每篇都有编号,编号类似1.2.1这种,其中1是文章编号,因为后面的调用关系需要去前面篇幅中找,所以我标注了这个方便寻找.
2.我是按调用过程列出代码,如果当前函数有多个地方需要讲解,比如函数1.2中有两个地方需要讲解,那么要展开的地方便是1.2.1,1.2.2这样排列.
3.链接:
第一篇:https://www.jianshu.com/p/9900ec52f2c1      (命令的调用流程)
第二篇:https://www.jianshu.com/p/db08b7d57721    (卷服务初始化)
第三篇:https://www.jianshu.com/p/bbc73f5687a2     (plugin的管理)
第四篇:https://www.jianshu.com/p/a92b1b11c8dd    (卷相关命令的执行)
卷相关命令的调用
4.1 initRouter函数
| path | func name | line number | 
|---|---|---|
| components/engine/cmd/dockerd/daemon.go | initRouter | 480 | 
func initRouter(opts routerOptions) {
    decoder := runconfig.ContainerDecoder{}
    routers := []router.Router{
        ...
        volume.NewRouter(opts.daemon.VolumesService()),
        ...
    }
...
}
而volume.NewRouter方法的定义如下:
4.2 NewRouter函数
| path | func name | line number | 
|---|---|---|
| components/engine/api/server/router/volume/volume.go | NewRouter | 12 | 
// NewRouter initializes a new volume router
func NewRouter(b Backend) router.Router {
    r := &volumeRouter{
        backend: b,
    }
    r.initRoutes()
    return r
}
继续看路由的初始化:
4.3 initRoutes方法
| path | func name | line number | 
|---|---|---|
| components/engine/api/server/router/volume/volume.go | initRoutes | 25 | 
func (r *volumeRouter) initRoutes() {
    r.routes = []router.Route{
        // GET
        router.NewGetRoute("/volumes", r.getVolumesList),
        router.NewGetRoute("/volumes/{name:.*}", r.getVolumeByName),
        // POST
        router.NewPostRoute("/volumes/create", r.postVolumesCreate),
        router.NewPostRoute("/volumes/prune", r.postVolumesPrune, router.WithCancel),
        // DELETE
        router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes),
    }
}
而具体的方法是通过传入的volumeservice来实现的,以List方法为例,即getVolumesList实际执行的是传入的volumeservice的List方法:
4.4 getVolumesList方法
| path | func name | line number | 
|---|---|---|
| components/engine/api/server/router/volume/volume_routes.go | getVolumesList | 17 | 
func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
    ...
    volumes, warnings, err := v.backend.List(ctx, filters)
    ...
}
4.5 List方法
我们就按最常规的方式,不指定具体的filter,看一下这个方法如何执行.
| path | func name | line number | 
|---|---|---|
| components/engine/volume/service/service.go | List | 226 | 
func (s *VolumesService) List(ctx context.Context, filter filters.Args) (volumesOut []*types.Volume, warnings []string, err error) {
    by, err := filtersToBy(filter, acceptedListFilters)
    if err != nil {
        return nil, nil, err
    }
    # 调用vs的find方法,vs是前面提到的VolumeStore
    volumes, warnings, err := s.vs.Find(ctx, by)
    if err != nil {
        return nil, nil, err
    }
    return s.volumesToAPI(ctx, volumes, useCachedPath(true)), warnings, nil
}
我们按filter为空的情况继续执行s.vs.Find方法,Find方法中调用到了前面提到的ds[],也就是存储driver的Store:
4.6 Find方法
| path | func name | line number | 
|---|---|---|
| components/engine/volume/service/store.go | Find | 321 | 
// Find lists volumes filtered by the past in filter.
// If a driver returns a volume that has name which conflicts with another volume from a different driver,
// the first volume is chosen and the conflicting volume is dropped.
func (s *VolumeStore) Find(ctx context.Context, by By) (vols []volume.Volume, warnings []string, err error) {
    logrus.WithField("ByType", fmt.Sprintf("%T", by)).WithField("ByValue", fmt.Sprintf("%+v", by)).Debug("VolumeStore.Find")
    switch f := by.(type) {
    case nil, orCombinator, andCombinator, byDriver, ByReferenced, CustomFilter:
        # 在filter中加载卷
        warnings, err = s.filter(ctx, &vols, by)
    ...
    }
    if err != nil {
        return nil, nil, &OpErr{Err: err, Op: "list"}
    }
    var out []volume.Volume
    for _, v := range vols {
        name := normalizeVolumeName(v.Name())
        s.locks.Lock(name)
        # 从内存中加载卷的名称
        storedV, exists := s.getNamed(name)
        // Note: it's not safe to populate the cache here because the volume may have been
        // deleted before we acquire a lock on its name
        # 如果存在并且driver不同,则保留第一个同名称的卷
        if exists && storedV.DriverName() != v.DriverName() {
            logrus.Warnf("Volume name %s already exists for driver %s, not including volume returned by %s", v.Name(), storedV.DriverName(), v.DriverName())
            s.locks.Unlock(v.Name())
            continue
        }
        out = append(out, v)
        s.locks.Unlock(v.Name())
    }
    return out, warnings, nil
}
上面的方法先加载卷,然后再跟缓存中的进行对比,如果名称相同而driver不同,则跳过第二个同名称的卷. 加载卷的过程如下:
4.7 filter方法
| path | func name | line number | 
|---|---|---|
| components/engine/volume/service/store.go | filter | 222 | 
func (s *VolumeStore) filter(ctx context.Context, vols *[]volume.Volume, by By) (warnings []string, err error) {
    // note that this specifically does not support the `FromList` By type.
    switch f := by.(type) {
    case nil:
        if *vols == nil {
            var ls []volume.Volume
            ls, warnings, err = s.list(ctx)
            if err != nil {
                return warnings, err
            }
            *vols = ls
        }
    ...
    }
    return warnings, nil
}
继续看s.list方法,这个方法就会调用到我们在自定义的卷plugin中实现的接口方法:
4.8 list方法
| path | func name | line number | 
|---|---|---|
| components/engine/volume/service/store.go | list | 376 | 
// list goes through each volume driver and asks for its list of volumes.
// TODO(@cpuguy83): plumb context through
func (s *VolumeStore) list(ctx context.Context, driverNames ...string) ([]volume.Volume, []string, error) {
    var (
        ls       = []volume.Volume{} // do not return a nil value as this affects filtering
        warnings []string
    )
    # volume.Driver是个接口
    var dls []volume.Driver
    # GetAllDrivers加载所有的VolumeDriver,返回值其实是一个driver的适配器
    # s.drivers也就是之前讲到的ds[2.3.2.2 章节]
    # 此处是重点
    all, err := s.drivers.GetAllDrivers()
    ...
    type vols struct {
        vols       []volume.Volume
        err        error
        driverName string
    }
    chVols := make(chan vols, len(dls))
    for _, vd := range dls {
        go func(d volume.Driver) {
            # 调用plugin中实现的List方法获取卷
            # 此处是重点
            vs, err := d.List()
            if err != nil {
                chVols <- vols{driverName: d.Name(), err: &OpErr{Err: err, Name: d.Name(), Op: "list"}}
                return
            }
            for i, v := range vs {
                s.globalLock.RLock()
                vs[i] = volumeWrapper{v, s.labels[v.Name()], d.Scope(), s.options[v.Name()]}
                s.globalLock.RUnlock()
            }
            chVols <- vols{vols: vs}
        }(vd)
    }
    ...
    return ls, warnings, nil
}
上面的s.drivers.GetAllDrivers方法加载所有注册过的drivers:
4.8.1 GetAllDrivers方法
| path | func name | line number | 
|---|---|---|
| components/engine/volume/drivers/extpoint.go | GetAllDrivers | 177 | 
// GetAllDrivers lists all the registered drivers
func (s *Store) GetAllDrivers() ([]volume.Driver, error) {
    var plugins []getter.CompatPlugin
    if s.pluginGetter != nil {
        var err error
        # 这里的store就是前面讲过的ds[2.3.2.3章节], ds的pluginGetter就是前面讲到的d.pluginStore
        # 过滤卷driver的名称 extName = "VolumeDriver"
        plugins, err = s.pluginGetter.GetAllByCap(extName)
        if err != nil {
            return nil, fmt.Errorf("error listing plugins: %v", err)
        }
    }
    # 此处的volume.Driver是个接口
    var ds []volume.Driver
    s.mu.Lock()
    defer s.mu.Unlock()
    #先加缓存过的drivers
    for _, d := range s.extensions {
        ds = append(ds, d)
    }
    #加载没有缓存过的driver
    for _, p := range plugins {
        name := p.Name()
        if _, ok := s.extensions[name]; ok {
            continue
        }
        # 用plugin创建对应的driver适配器,适配器中的方法调用实际driver的方法
        # 此处是重点
        ext, err := makePluginAdapter(p)
        if err != nil {
            return nil, errors.Wrap(err, "error making plugin client")
        }
        if p.IsV1() {
            s.extensions[name] = ext
        }
        ds = append(ds, ext)
    }
    return ds, nil
}
- 4.8.1.1 Driver接口
先看volume.Driver接口:
| path | interface name | line number | 
|---|---|---|
| components/engine/volume/volume.go | Driver | 19 | 
// Driver is for creating and removing volumes.
type Driver interface {
    // Name returns the name of the volume driver.
    Name() string
    // Create makes a new volume with the given name.
    Create(name string, opts map[string]string) (Volume, error)
    // Remove deletes the volume.
    Remove(vol Volume) (err error)
    // List lists all the volumes the driver has
    List() ([]Volume, error)
    // Get retrieves the volume with the requested name
    Get(name string) (Volume, error)
    // Scope returns the scope of the driver (e.g. `global` or `local`).
    // Scope determines how the driver is handled at a cluster level
    Scope() string
}
然后继续看makePluginAdapter方法
- 4.8.1.2 makePluginAdapter方法
可知返回值是volumeDriverAdapter,而volumeDriverAdapter结构体实现了volume.Driver接口:
| path | func name | line number | 
|---|---|---|
| components/engine/volume/drivers/extpoint.go | makePluginAdapter | 214 | 
func makePluginAdapter(p getter.CompatPlugin) (*volumeDriverAdapter, error) {
    # 如果是v1/http plugin client方式的plugin
    if pc, ok := p.(getter.PluginWithV1Client); ok {
        return &volumeDriverAdapter{name: p.Name(), scopePath: p.ScopedPath, proxy: &volumeDriverProxy{pc.Client()}}, nil
    }
    # 如果是PluginAddr方式的plugin
    pa, ok := p.(getter.PluginAddr)
    ...
    addr := pa.Addr()
    # 此处的client与上面的pc.Client()都是同一个类型,即*plugins.client
    client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, pa.Timeout())
    if err != nil {
        return nil, errors.Wrap(err, "error creating plugin client")
    }
    return &volumeDriverAdapter{name: p.Name(), scopePath: p.ScopedPath, proxy: &volumeDriverProxy{client}}, nil    ...
}
不管是哪种方式的plugin,都要构造一个volumeDriverAdapter,其成员变量proxy作为plugin的客户端.
- 4.8.1.3 volumeDriverAdapter结构体
volumeDriverAdapter结构体定义如下:
| path | struct name | line number | 
|---|---|---|
| components/engine/volume/drivers/adapter.go | volumeDriverAdapter | 16 | 
type volumeDriverAdapter struct {
    name         string
    scopePath    func(s string) string
    capabilities *volume.Capability
    proxy        volumeDriver
}
其中,proxy是volumeDriver接口类型
- 4.8.1.4 volumeDriver接口
| path | interface name | line number | 
|---|---|---|
| components/engine/volume/drivers/extpoint.go | volumeDriver | 25 | 
type volumeDriver interface {
    // Create a volume with the given name
    Create(name string, opts map[string]string) (err error)
    // Remove the volume with the given name
    Remove(name string) (err error)
    // Get the mountpoint of the given volume
    Path(name string) (mountpoint string, err error)
    // Mount the given volume and return the mountpoint
    Mount(name, id string) (mountpoint string, err error)
    // Unmount the given volume
    Unmount(name, id string) (err error)
    // List lists all the volumes known to the driver
    List() (volumes []*proxyVolume, err error)
    // Get retrieves the volume with the requested name
    Get(name string) (volume *proxyVolume, err error)
    // Capabilities gets the list of capabilities of the driver
    Capabilities() (capabilities volume.Capability, err error)
}
- 4.8.1.5 volumeDriverProxy结构体
新建volumeDriverAdapter的时候用volumeDriverProxy给成员变量proxy赋值,所以volumeDriverProxy中要实现volumeDriver的各种接口.
看下volumeDriverProxy的定义:
| path | struct name | line number | 
|---|---|---|
| components/engine/volume/drivers/proxy.go | volumeDriverProxy | 22 | 
type volumeDriverProxy struct {
    client
}
匿名成员client是一个接口:
- 4.8.1.6 client接口
| path | interface name | line number | 
|---|---|---|
| components/engine/volume/drivers/proxy.go | client | 18 | 
type client interface {
    CallWithOptions(string, interface{}, interface{}, ...func(*plugins.RequestOpts)) error
}
- 4.8.1.7 CallWithOptions方法
 这个接口的实现即是2.2.5.3中的Client结构体,再看Client结构体的CallWithOptions方法的定义:
| path | func name | line number | 
|---|---|---|
| components/engine/pkg/plugins/client.go | CallWithOptions | 106 | 
// CallWithOptions is just like call except it takes options
func (c *Client) CallWithOptions(serviceMethod string, args interface{}, ret interface{}, opts ...func(*RequestOpts)) error {
    var buf bytes.Buffer
    if args != nil {
        if err := json.NewEncoder(&buf).Encode(args); err != nil {
            return err
        }
    }
    body, err := c.callWithRetry(serviceMethod, &buf, true, opts...)
    if err != nil {
        return err
    }
    defer body.Close()
    if ret != nil {
        if err := json.NewDecoder(body).Decode(&ret); err != nil {
            logrus.Errorf("%s: error reading plugin resp: %v", serviceMethod, err)
            return err
        }
    }
    return nil
}
主要作用就是根据传入的serviceMethod调用对应的plugin中的方法.
4.8.2 List 方法
再回到4.8章节,去看d.List()方法调用,实际上就是volumeDriverAdapter的List方法
| path | func name | line number | 
|---|---|---|
| components/engine/volume/drivers/adapter.go | List | 43 | 
func (a *volumeDriverAdapter) List() ([]volume.Volume, error) {
    ls, err := a.proxy.List()
    if err != nil {
        return nil, err
    }
    var out []volume.Volume
    for _, vp := range ls {
        out = append(out, &volumeAdapter{
            proxy:      a.proxy,
            name:       vp.Name,
            scopePath:  a.scopePath,
            driverName: a.name,
            eMount:     a.scopePath(vp.Mountpoint),
        })
    }
    return out, nil
}
a.proxy.List()就调用到具体的你实现的plugin的List方法了.
- 4.8.2.1 List方法
| path | func name | line number | 
|---|---|---|
| components/engine/volume/drivers/proxy.go | List | 181 | 
func (pp *volumeDriverProxy) List() (volumes []*proxyVolume, err error) {
    var (
        req volumeDriverProxyListRequest
        ret volumeDriverProxyListResponse
    )
    if err = pp.CallWithOptions("VolumeDriver.List", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
        return
    }
    volumes = ret.Volumes
    if ret.Err != "" {
        err = errors.New(ret.Err)
    }
    return
}
如果你使用过go-plugins-helpers的话,你应该熟悉下面的Handle方法:
| path | 
|---|
| volume/api.go | 
const (
    // DefaultDockerRootDirectory is the default directory where volumes will be created.
    DefaultDockerRootDirectory = "/var/lib/docker-volumes"
    manifest         = `{"Implements": ["VolumeDriver"]}`
    listPath         = "/VolumeDriver.List"
    ...
)
func (h *Handler) initMux() {
    ...
    h.HandleFunc(listPath, func(w http.ResponseWriter, r *http.Request) {
        log.Println("Entering go-plugins-helpers listPath")
        res, err := h.driver.List()
        if err != nil {
            sdk.EncodeResponse(w, NewErrorResponse(err.Error()), true)
            return
        }
        sdk.EncodeResponse(w, res, false)
    })
    ...
}
总结
以上就是volumeService的初始化以及加载对应docker volume plugin的过程,要弄懂的话根据这个流程看一遍,基本就明白了.所有的东西都是为了把源码中卷相关的操作和我们自己实现的go-plugins-helpers中的接口方法连接起来.