谢绝转载
序言
终于看完了volumeservice的初始化,铺垫了这么多,就是为了看一下卷相关方法的调用,那就快进入正题吧:
注意事项:
1.本文共有四篇,每篇都有编号,编号类似1.2.1这种,其中1是文章编号,因为后面的调用关系需要去前面篇幅中找,所以我标注了这个方便寻找.
2.我是按调用过程列出代码,如果当前函数有多个地方需要讲解,比如函数1.2中有两个地方需要讲解,那么要展开的地方便是1.2.1,1.2.2这样排列.
3.链接:
第一篇:https://www.jianshu.com/p/9900ec52f2c1 (命令的调用流程)
第二篇:https://www.jianshu.com/p/db08b7d57721 (卷服务初始化)
第三篇:https://www.jianshu.com/p/bbc73f5687a2 (plugin的管理)
第四篇:https://www.jianshu.com/p/a92b1b11c8dd (卷相关命令的执行)
卷相关命令的调用
4.1 initRouter函数
path | func name | line number |
---|---|---|
components/engine/cmd/dockerd/daemon.go | initRouter | 480 |
func initRouter(opts routerOptions) {
decoder := runconfig.ContainerDecoder{}
routers := []router.Router{
...
volume.NewRouter(opts.daemon.VolumesService()),
...
}
...
}
而volume.NewRouter方法的定义如下:
4.2 NewRouter函数
path | func name | line number |
---|---|---|
components/engine/api/server/router/volume/volume.go | NewRouter | 12 |
// NewRouter initializes a new volume router
func NewRouter(b Backend) router.Router {
r := &volumeRouter{
backend: b,
}
r.initRoutes()
return r
}
继续看路由的初始化:
4.3 initRoutes方法
path | func name | line number |
---|---|---|
components/engine/api/server/router/volume/volume.go | initRoutes | 25 |
func (r *volumeRouter) initRoutes() {
r.routes = []router.Route{
// GET
router.NewGetRoute("/volumes", r.getVolumesList),
router.NewGetRoute("/volumes/{name:.*}", r.getVolumeByName),
// POST
router.NewPostRoute("/volumes/create", r.postVolumesCreate),
router.NewPostRoute("/volumes/prune", r.postVolumesPrune, router.WithCancel),
// DELETE
router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes),
}
}
而具体的方法是通过传入的volumeservice来实现的,以List方法为例,即getVolumesList实际执行的是传入的volumeservice的List方法:
4.4 getVolumesList方法
path | func name | line number |
---|---|---|
components/engine/api/server/router/volume/volume_routes.go | getVolumesList | 17 |
func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
...
volumes, warnings, err := v.backend.List(ctx, filters)
...
}
4.5 List方法
我们就按最常规的方式,不指定具体的filter,看一下这个方法如何执行.
path | func name | line number |
---|---|---|
components/engine/volume/service/service.go | List | 226 |
func (s *VolumesService) List(ctx context.Context, filter filters.Args) (volumesOut []*types.Volume, warnings []string, err error) {
by, err := filtersToBy(filter, acceptedListFilters)
if err != nil {
return nil, nil, err
}
# 调用vs的find方法,vs是前面提到的VolumeStore
volumes, warnings, err := s.vs.Find(ctx, by)
if err != nil {
return nil, nil, err
}
return s.volumesToAPI(ctx, volumes, useCachedPath(true)), warnings, nil
}
我们按filter为空的情况继续执行s.vs.Find方法,Find方法中调用到了前面提到的ds[],也就是存储driver的Store:
4.6 Find方法
path | func name | line number |
---|---|---|
components/engine/volume/service/store.go | Find | 321 |
// Find lists volumes filtered by the past in filter.
// If a driver returns a volume that has name which conflicts with another volume from a different driver,
// the first volume is chosen and the conflicting volume is dropped.
func (s *VolumeStore) Find(ctx context.Context, by By) (vols []volume.Volume, warnings []string, err error) {
logrus.WithField("ByType", fmt.Sprintf("%T", by)).WithField("ByValue", fmt.Sprintf("%+v", by)).Debug("VolumeStore.Find")
switch f := by.(type) {
case nil, orCombinator, andCombinator, byDriver, ByReferenced, CustomFilter:
# 在filter中加载卷
warnings, err = s.filter(ctx, &vols, by)
...
}
if err != nil {
return nil, nil, &OpErr{Err: err, Op: "list"}
}
var out []volume.Volume
for _, v := range vols {
name := normalizeVolumeName(v.Name())
s.locks.Lock(name)
# 从内存中加载卷的名称
storedV, exists := s.getNamed(name)
// Note: it's not safe to populate the cache here because the volume may have been
// deleted before we acquire a lock on its name
# 如果存在并且driver不同,则保留第一个同名称的卷
if exists && storedV.DriverName() != v.DriverName() {
logrus.Warnf("Volume name %s already exists for driver %s, not including volume returned by %s", v.Name(), storedV.DriverName(), v.DriverName())
s.locks.Unlock(v.Name())
continue
}
out = append(out, v)
s.locks.Unlock(v.Name())
}
return out, warnings, nil
}
上面的方法先加载卷,然后再跟缓存中的进行对比,如果名称相同而driver不同,则跳过第二个同名称的卷. 加载卷的过程如下:
4.7 filter方法
path | func name | line number |
---|---|---|
components/engine/volume/service/store.go | filter | 222 |
func (s *VolumeStore) filter(ctx context.Context, vols *[]volume.Volume, by By) (warnings []string, err error) {
// note that this specifically does not support the `FromList` By type.
switch f := by.(type) {
case nil:
if *vols == nil {
var ls []volume.Volume
ls, warnings, err = s.list(ctx)
if err != nil {
return warnings, err
}
*vols = ls
}
...
}
return warnings, nil
}
继续看s.list方法,这个方法就会调用到我们在自定义的卷plugin中实现的接口方法:
4.8 list方法
path | func name | line number |
---|---|---|
components/engine/volume/service/store.go | list | 376 |
// list goes through each volume driver and asks for its list of volumes.
// TODO(@cpuguy83): plumb context through
func (s *VolumeStore) list(ctx context.Context, driverNames ...string) ([]volume.Volume, []string, error) {
var (
ls = []volume.Volume{} // do not return a nil value as this affects filtering
warnings []string
)
# volume.Driver是个接口
var dls []volume.Driver
# GetAllDrivers加载所有的VolumeDriver,返回值其实是一个driver的适配器
# s.drivers也就是之前讲到的ds[2.3.2.2 章节]
# 此处是重点
all, err := s.drivers.GetAllDrivers()
...
type vols struct {
vols []volume.Volume
err error
driverName string
}
chVols := make(chan vols, len(dls))
for _, vd := range dls {
go func(d volume.Driver) {
# 调用plugin中实现的List方法获取卷
# 此处是重点
vs, err := d.List()
if err != nil {
chVols <- vols{driverName: d.Name(), err: &OpErr{Err: err, Name: d.Name(), Op: "list"}}
return
}
for i, v := range vs {
s.globalLock.RLock()
vs[i] = volumeWrapper{v, s.labels[v.Name()], d.Scope(), s.options[v.Name()]}
s.globalLock.RUnlock()
}
chVols <- vols{vols: vs}
}(vd)
}
...
return ls, warnings, nil
}
上面的s.drivers.GetAllDrivers方法加载所有注册过的drivers:
4.8.1 GetAllDrivers方法
path | func name | line number |
---|---|---|
components/engine/volume/drivers/extpoint.go | GetAllDrivers | 177 |
// GetAllDrivers lists all the registered drivers
func (s *Store) GetAllDrivers() ([]volume.Driver, error) {
var plugins []getter.CompatPlugin
if s.pluginGetter != nil {
var err error
# 这里的store就是前面讲过的ds[2.3.2.3章节], ds的pluginGetter就是前面讲到的d.pluginStore
# 过滤卷driver的名称 extName = "VolumeDriver"
plugins, err = s.pluginGetter.GetAllByCap(extName)
if err != nil {
return nil, fmt.Errorf("error listing plugins: %v", err)
}
}
# 此处的volume.Driver是个接口
var ds []volume.Driver
s.mu.Lock()
defer s.mu.Unlock()
#先加缓存过的drivers
for _, d := range s.extensions {
ds = append(ds, d)
}
#加载没有缓存过的driver
for _, p := range plugins {
name := p.Name()
if _, ok := s.extensions[name]; ok {
continue
}
# 用plugin创建对应的driver适配器,适配器中的方法调用实际driver的方法
# 此处是重点
ext, err := makePluginAdapter(p)
if err != nil {
return nil, errors.Wrap(err, "error making plugin client")
}
if p.IsV1() {
s.extensions[name] = ext
}
ds = append(ds, ext)
}
return ds, nil
}
- 4.8.1.1 Driver接口
先看volume.Driver接口:
path | interface name | line number |
---|---|---|
components/engine/volume/volume.go | Driver | 19 |
// Driver is for creating and removing volumes.
type Driver interface {
// Name returns the name of the volume driver.
Name() string
// Create makes a new volume with the given name.
Create(name string, opts map[string]string) (Volume, error)
// Remove deletes the volume.
Remove(vol Volume) (err error)
// List lists all the volumes the driver has
List() ([]Volume, error)
// Get retrieves the volume with the requested name
Get(name string) (Volume, error)
// Scope returns the scope of the driver (e.g. `global` or `local`).
// Scope determines how the driver is handled at a cluster level
Scope() string
}
然后继续看makePluginAdapter方法
- 4.8.1.2 makePluginAdapter方法
可知返回值是volumeDriverAdapter,而volumeDriverAdapter结构体实现了volume.Driver接口:
path | func name | line number |
---|---|---|
components/engine/volume/drivers/extpoint.go | makePluginAdapter | 214 |
func makePluginAdapter(p getter.CompatPlugin) (*volumeDriverAdapter, error) {
# 如果是v1/http plugin client方式的plugin
if pc, ok := p.(getter.PluginWithV1Client); ok {
return &volumeDriverAdapter{name: p.Name(), scopePath: p.ScopedPath, proxy: &volumeDriverProxy{pc.Client()}}, nil
}
# 如果是PluginAddr方式的plugin
pa, ok := p.(getter.PluginAddr)
...
addr := pa.Addr()
# 此处的client与上面的pc.Client()都是同一个类型,即*plugins.client
client, err := plugins.NewClientWithTimeout(addr.Network()+"://"+addr.String(), nil, pa.Timeout())
if err != nil {
return nil, errors.Wrap(err, "error creating plugin client")
}
return &volumeDriverAdapter{name: p.Name(), scopePath: p.ScopedPath, proxy: &volumeDriverProxy{client}}, nil ...
}
不管是哪种方式的plugin,都要构造一个volumeDriverAdapter,其成员变量proxy作为plugin的客户端.
- 4.8.1.3 volumeDriverAdapter结构体
volumeDriverAdapter结构体定义如下:
path | struct name | line number |
---|---|---|
components/engine/volume/drivers/adapter.go | volumeDriverAdapter | 16 |
type volumeDriverAdapter struct {
name string
scopePath func(s string) string
capabilities *volume.Capability
proxy volumeDriver
}
其中,proxy是volumeDriver接口类型
- 4.8.1.4 volumeDriver接口
path | interface name | line number |
---|---|---|
components/engine/volume/drivers/extpoint.go | volumeDriver | 25 |
type volumeDriver interface {
// Create a volume with the given name
Create(name string, opts map[string]string) (err error)
// Remove the volume with the given name
Remove(name string) (err error)
// Get the mountpoint of the given volume
Path(name string) (mountpoint string, err error)
// Mount the given volume and return the mountpoint
Mount(name, id string) (mountpoint string, err error)
// Unmount the given volume
Unmount(name, id string) (err error)
// List lists all the volumes known to the driver
List() (volumes []*proxyVolume, err error)
// Get retrieves the volume with the requested name
Get(name string) (volume *proxyVolume, err error)
// Capabilities gets the list of capabilities of the driver
Capabilities() (capabilities volume.Capability, err error)
}
- 4.8.1.5 volumeDriverProxy结构体
新建volumeDriverAdapter的时候用volumeDriverProxy给成员变量proxy赋值,所以volumeDriverProxy中要实现volumeDriver的各种接口.
看下volumeDriverProxy的定义:
path | struct name | line number |
---|---|---|
components/engine/volume/drivers/proxy.go | volumeDriverProxy | 22 |
type volumeDriverProxy struct {
client
}
匿名成员client是一个接口:
- 4.8.1.6 client接口
path | interface name | line number |
---|---|---|
components/engine/volume/drivers/proxy.go | client | 18 |
type client interface {
CallWithOptions(string, interface{}, interface{}, ...func(*plugins.RequestOpts)) error
}
- 4.8.1.7 CallWithOptions方法
这个接口的实现即是2.2.5.3中的Client结构体,再看Client结构体的CallWithOptions方法的定义:
path | func name | line number |
---|---|---|
components/engine/pkg/plugins/client.go | CallWithOptions | 106 |
// CallWithOptions is just like call except it takes options
func (c *Client) CallWithOptions(serviceMethod string, args interface{}, ret interface{}, opts ...func(*RequestOpts)) error {
var buf bytes.Buffer
if args != nil {
if err := json.NewEncoder(&buf).Encode(args); err != nil {
return err
}
}
body, err := c.callWithRetry(serviceMethod, &buf, true, opts...)
if err != nil {
return err
}
defer body.Close()
if ret != nil {
if err := json.NewDecoder(body).Decode(&ret); err != nil {
logrus.Errorf("%s: error reading plugin resp: %v", serviceMethod, err)
return err
}
}
return nil
}
主要作用就是根据传入的serviceMethod调用对应的plugin中的方法.
4.8.2 List 方法
再回到4.8章节,去看d.List()方法调用,实际上就是volumeDriverAdapter的List方法
path | func name | line number |
---|---|---|
components/engine/volume/drivers/adapter.go | List | 43 |
func (a *volumeDriverAdapter) List() ([]volume.Volume, error) {
ls, err := a.proxy.List()
if err != nil {
return nil, err
}
var out []volume.Volume
for _, vp := range ls {
out = append(out, &volumeAdapter{
proxy: a.proxy,
name: vp.Name,
scopePath: a.scopePath,
driverName: a.name,
eMount: a.scopePath(vp.Mountpoint),
})
}
return out, nil
}
a.proxy.List()就调用到具体的你实现的plugin的List方法了.
- 4.8.2.1 List方法
path | func name | line number |
---|---|---|
components/engine/volume/drivers/proxy.go | List | 181 |
func (pp *volumeDriverProxy) List() (volumes []*proxyVolume, err error) {
var (
req volumeDriverProxyListRequest
ret volumeDriverProxyListResponse
)
if err = pp.CallWithOptions("VolumeDriver.List", req, &ret, plugins.WithRequestTimeout(shortTimeout)); err != nil {
return
}
volumes = ret.Volumes
if ret.Err != "" {
err = errors.New(ret.Err)
}
return
}
如果你使用过go-plugins-helpers的话,你应该熟悉下面的Handle方法:
path |
---|
volume/api.go |
const (
// DefaultDockerRootDirectory is the default directory where volumes will be created.
DefaultDockerRootDirectory = "/var/lib/docker-volumes"
manifest = `{"Implements": ["VolumeDriver"]}`
listPath = "/VolumeDriver.List"
...
)
func (h *Handler) initMux() {
...
h.HandleFunc(listPath, func(w http.ResponseWriter, r *http.Request) {
log.Println("Entering go-plugins-helpers listPath")
res, err := h.driver.List()
if err != nil {
sdk.EncodeResponse(w, NewErrorResponse(err.Error()), true)
return
}
sdk.EncodeResponse(w, res, false)
})
...
}
总结
以上就是volumeService的初始化以及加载对应docker volume plugin的过程,要弄懂的话根据这个流程看一遍,基本就明白了.所有的东西都是为了把源码中卷相关的操作和我们自己实现的go-plugins-helpers中的接口方法连接起来.