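Developing a container network plugin can be very simple or quite complex. Two parts are mandatory:
- The CNI implementation, which interfaces with the container runtime and attaches each pod to the container network when the pod is created;
- Host interconnection. No container network plugin runs on a single machine only, so cross-host connectivity between pods is something every plugin has to implement.
Optional parts:
- Service: for performance and other reasons, some network solutions reimplement Service handling to replace k8s kube-proxy, for example cilium, the calico eBPF datapath, and kube-ovn;
- NetworkPolicy: network policies;
- QoS, such as bandwidth management and traffic priority;
- Visualization, such as cilium hubble.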
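CNI plugin
A k8s pod is not the same thing as a container. The pod, not the container, is the smallest scheduling unit in k8s, and a pod can contain multiple containers. A pod is implemented with the help of an intermediate container called the Infra container. Within a pod, the Infra container is always the first container created, and the user-defined containers are then associated with it by joining its Network Namespace. In other words, every container in the pod shares the Network Namespace held by the Infra container.
The first step in creating a pod is therefore to create and start an Infra container that "holds" the pod's Network Namespace. The design idea of CNI is this: once Kubernetes has started the Infra container, it can call the CNI network plugin directly to configure the expected network stack for that Infra container's Network Namespace.
The Infra container is also called the pause container. When you list containers with the docker command, entries like the following are pause containers.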
🐳 → docker ps
CONTAINER ID IMAGE COMMAND ...
3b45e983c859 gcr.io/google_containers/pause-amd64:3.1 "/pause"
dbfc35b00062 gcr.io/google_containers/pause-amd64:3.1 "/pause"
c4e998ec4d5d gcr.io/google_containers/pause-amd64:3.1 "/pause"
508102acf1e7 gcr.io/google_containers/pause-amd64:3.1 "/pause"
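In addition, in the k8s code, creating a pod goes through the CRI to create a Sandbox. The Sandbox is essentially the Infra/pause container: it acts as the "parent container" of all containers in the pod and provides the following to each workload container:
- It is the basis for sharing Linux namespaces (Network, UTS, and so on) within the pod;
- With PID namespace sharing enabled, it serves as PID 1 for the pod and reaps zombie processes inside the pod.
CNI plugins and configuration files
CNI plugin binaries are stored under /opt/cni/bin/: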
root@master:~# ls /opt/cni/bin/
bandwidth bridge cilium-cni dhcp dummy firewall flannel host-device host-local ipvlan loopback macvlan portmap ptp sbr static tuning vlan vrf
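These basic CNI executables fall into three categories by function:
- The first category is Main plugins, the binaries that create concrete network devices, for example bridge (bridge device), ipvlan, loopback (lo device), macvlan, ptp (veth pair device), and vlan. Projects such as Flannel and Weave, which I mentioned earlier, are "bridge"-type CNI plugins, so their implementations usually call the bridge binary. I will walk through this flow in detail shortly.
- The second category is IPAM (IP Address Management) plugins, the binaries responsible for allocating IP addresses. For example, dhcp sends requests to a DHCP server; host-local allocates from a preconfigured IP address range; static assigns a fixed IP address to the container and is mainly used for debugging.
- The third category is the built-in CNI plugins maintained by the CNI community, for example tuning, a binary that adjusts network device parameters via sysctl; portmap, a binary that configures port mappings via iptables; and bandwidth, a binary that rate-limits with a Token Bucket Filter (TBF). The listing above also contains cilium-cni and flannel, which are the CNI plugins of the cilium and flannel container network solutions.
CNI configuration files are stored under /etc/cni/net.d/. Below there are two configuration files, one for cilium and one for flannel. Because cilium replaced flannel on this cluster at some point, the flannel configuration file has been renamed to a backup (the .cilium_bak suffix):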
root@master:~# ls /etc/cni/net.d/
05-cilium.conf 10-flannel.conflist.cilium_bak
root@master:~# cat /etc/cni/net.d//05-cilium.conf
{
"cniVersion": "0.3.1",
"name": "cilium",
"type": "cilium-cni",
"enable-debug": false,
"log-file": "/var/run/cilium/cilium-cni.log"
}
root@master:~# cat /etc/cni/net.d/10-flannel.conflist.cilium_bak
{
"name": "cbr0",
"cniVersion": "0.3.1",
"plugins": [
{
"type": "flannel",
"delegate": {
"hairpinMode": true,
"isDefaultGateway": true
}
},
{
"type": "portmap",
"capabilities": {
"portMappings": true
}
}
]
}
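How CNI plugins work
In Kubernetes, container networking logic is not executed in the kubelet's main code path; it is carried out in the specific CRI (Container Runtime Interface) implementation. For Docker, the CRI implementation is called dockershim, and you can find it in the kubelet code.
Before reading the code, it helps to understand how the CNI interface is implemented. Unlike inter-component interfaces built on HTTP REST or gRPC, CNI works by invoking (exec) an executable, the CNI plugin. The container runtime executes the CNI plugin, passes runtime information through environment variables, passes the configuration through the plugin's standard input (stdin), and receives the plugin's result on standard output (stdout). The implementation in the code therefore amounts to assembling the configuration and environment variables, invoking the plugin binary, and handling the returned result.
As a concrete example, to call the bridge plugin and attach a container to a host bridge, the command looks like this:
# CNI_COMMAND=ADD means create, as the name suggests.
# XXX=XXX stands for the other parameters, which are defined below.
# < config.json passes the configuration file on standard input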
CNI_COMMAND=ADD XXX=XXX ./bridge < config.json
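Seen from the plugin's side, the same protocol is just "read CNI_COMMAND from the environment, read the config from stdin, print JSON on stdout". The following is a minimal sketch of that dispatch step in Go, not the code of any real plugin: the `handle` function, the `netConf` and `addResult` types, and the placeholder result are all assumptions made for illustration, and no network device is actually created.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
)

// netConf holds the few config fields this sketch reads from stdin.
type netConf struct {
	CNIVersion string `json:"cniVersion"`
	Name       string `json:"name"`
	Type       string `json:"type"`
}

// addResult is a placeholder result; a real plugin also reports
// interfaces, ips and routes.
type addResult struct {
	CNIVersion string   `json:"cniVersion"`
	DNS        struct{} `json:"dns"`
}

// handle is a hypothetical dispatcher: it takes the CNI_COMMAND value and the
// raw stdin bytes and returns the JSON text to print on stdout.
func handle(command string, stdin []byte) (string, error) {
	var conf netConf
	if err := json.Unmarshal(stdin, &conf); err != nil {
		return "", fmt.Errorf("parse network config: %w", err)
	}
	switch command {
	case "ADD":
		// A real plugin would configure the interface inside CNI_NETNS here.
		out, err := json.Marshal(addResult{CNIVersion: conf.CNIVersion})
		return string(out), err
	case "DEL":
		// A real plugin would tear down what ADD created; nothing is printed.
		return "", nil
	default:
		return "", fmt.Errorf("unsupported CNI_COMMAND %q", command)
	}
}

func main() {
	command := os.Getenv("CNI_COMMAND")
	var stdin []byte
	if command == "" {
		// Not invoked by a runtime: fall back to a sample request for demonstration.
		command = "ADD"
		stdin = []byte(`{"cniVersion":"0.3.1","name":"mynet","type":"bridge"}`)
	} else {
		var err error
		if stdin, err = io.ReadAll(os.Stdin); err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
	}
	out, err := handle(command, stdin)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	if out != "" {
		fmt.Println(out)
	}
}
```

Keeping the dispatch in a pure function like `handle` makes it possible to exercise the ADD and DEL paths without a runtime or a real network namespace.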
As mentioned above, creating a pod goes through the CRI to create a Sandbox. The call chain during Sandbox creation is:
kubeGenericRuntimeManager.SyncPod -->
kubeGenericRuntimeManager.createPodSandbox -->
PodSandboxManager.RunPodSandbox -->
cniNetworkPlugin.SetUpPod
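Containers are created through the CRI. For docker the runtime is dockershim, whose code lives in pkg/kubelet/dockershim. In the code below, the runtimeService object is the dockerService object, so the call ends up in dockerService.RunPodSandbox().
Note: dockershim was removed from k8s in version 1.24 and is now maintained separately as cri-dockerd. The code here is from version 1.23.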
func (m *kubeGenericRuntimeManager) createPodSandbox(ctx context.Context, pod *v1.Pod, attempt uint32) (string, string, error) {
......
// #
podSandBoxID, err := m.runtimeService.RunPodSandbox(ctx, podSandboxConfig, runtimeHandler)
......
return podSandBoxID, "", nil
}
func (ds *dockerService) RunPodSandbox(ctx context.Context, r *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) {
......
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations, networkOptions)
......
}
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations, options map[string]string) error {
if err := plugin.checkInitialized(); err != nil {
return err
}
netnsPath, err := plugin.host.GetNetNS(id.ID)
if err != nil {
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
}
// Todo get the timeout from parent ctx
cniTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), network.CNITimeoutSec*time.Second)
defer cancelFunc()
// Windows doesn't have loNetwork. It comes only with Linux
if plugin.loNetwork != nil {
if _, err = plugin.addToNetwork(cniTimeoutCtx, plugin.loNetwork, name, namespace, id, netnsPath, annotations, options); err != nil {
return err
}
}
_, err = plugin.addToNetwork(cniTimeoutCtx, plugin.getDefaultNetwork(), name, namespace, id, netnsPath, annotations, options)
return err
}
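Focusing only on the CNI call flow: the code above calls plugin.addToNetwork to add the pod to the container network, and that network is the default container network returned by plugin.getDefaultNetwork(). The default container network is obtained as follows: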
type cniNetwork struct {
name string
NetworkConfig *libcni.NetworkConfigList
CNIConfig libcni.CNI
Capabilities []string
}
type NetworkConfig struct {
Network *types.NetConf
Bytes []byte
}
type NetworkConfigList struct {
Name string
CNIVersion string
DisableCheck bool
Plugins []*NetworkConfig
Bytes []byte
}
type NetConf struct {
CNIVersion string `json:"cniVersion,omitempty"`
Name string `json:"name,omitempty"`
Type string `json:"type,omitempty"`
Capabilities map[string]bool `json:"capabilities,omitempty"`
IPAM IPAM `json:"ipam,omitempty"`
DNS DNS `json:"dns"`
RawPrevResult map[string]interface{} `json:"prevResult,omitempty"`
PrevResult Result `json:"-"`
}
func getDefaultCNINetwork(confDir string, binDirs []string) (*cniNetwork, error) {
files, err := libcni.ConfFiles(confDir, []string{".conf", ".conflist", ".json"})
switch {
case err != nil:
return nil, err
case len(files) == 0:
return nil, fmt.Errorf("no networks found in %s", confDir)
}
cniConfig := &libcni.CNIConfig{Path: binDirs}
sort.Strings(files)
for _, confFile := range files {
var confList *libcni.NetworkConfigList
if strings.HasSuffix(confFile, ".conflist") {
confList, err = libcni.ConfListFromFile(confFile)
if err != nil {
klog.InfoS("Error loading CNI config list file", "path", confFile, "err", err)
continue
}
} else {
conf, err := libcni.ConfFromFile(confFile)
if err != nil {
klog.InfoS("Error loading CNI config file", "path", confFile, "err", err)
continue
}
// Ensure the config has a "type" so we know what plugin to run.
// Also catches the case where somebody put a conflist into a conf file.
if conf.Network.Type == "" {
klog.InfoS("Error loading CNI config file: no 'type'; perhaps this is a .conflist?", "path", confFile)
continue
}
confList, err = libcni.ConfListFromConf(conf)
if err != nil {
klog.InfoS("Error converting CNI config file to list", "path", confFile, "err", err)
continue
}
}
if len(confList.Plugins) == 0 {
klog.InfoS("CNI config list has no networks, skipping", "configList", string(confList.Bytes[:maxStringLengthInLog(len(confList.Bytes))]))
continue
}
// Before using this CNI config, we have to validate it to make sure that
// all plugins of this config exist on disk
caps, err := cniConfig.ValidateNetworkList(context.TODO(), confList)
if err != nil {
klog.InfoS("Error validating CNI config list", "configList", string(confList.Bytes[:maxStringLengthInLog(len(confList.Bytes))]), "err", err)
continue
}
klog.V(4).InfoS("Using CNI configuration file", "path", confFile)
return &cniNetwork{
name: confList.Name,
NetworkConfig: confList,
CNIConfig: cniConfig,
Capabilities: caps,
}, nil
}
return nil, fmt.Errorf("no valid networks found in %s", confDir)
}
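The function walks the configuration files under /etc/cni/net.d/ that end in ".conf", ".conflist", or ".json", sorts them by name, and uses the first valid one. This is why the files in that directory carry numeric prefixes: the prefix controls the ordering. Validation requires the configuration file to contain the mandatory fields, and the plugin named by each "type" must exist under /opt/cni/bin/ before the configuration can be used.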
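The extension filter and the name-based ordering can be sketched in a few lines of Go. This is an illustration of the selection rule only, not libcni's code: `candidateConfs` is a hypothetical helper, and the validity checks on file contents and plugin binaries are left out.

```go
package main

import (
	"fmt"
	"path/filepath"
	"sort"
)

// candidateConfs keeps only file names with a CNI config extension and
// returns them in lexical order, the order in which they would be tried.
func candidateConfs(names []string) []string {
	allowed := map[string]bool{".conf": true, ".conflist": true, ".json": true}
	var out []string
	for _, n := range names {
		// filepath.Ext returns the suffix starting at the last dot.
		if allowed[filepath.Ext(n)] {
			out = append(out, n)
		}
	}
	sort.Strings(out)
	return out
}

func main() {
	// File names taken from the directory listing shown earlier.
	names := []string{"10-flannel.conflist.cilium_bak", "05-cilium.conf"}
	fmt.Println(candidateConfs(names)) // prints [05-cilium.conf]
}
```

The backed-up flannel file drops out because its extension is ".cilium_bak", which is why renaming a file is enough to disable it.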
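Besides the network plugin configuration, a runtime configuration, libcni.RuntimeConf, has to be assembled before "Adding pod to network". It contains information about the pod and the container, the ports that the applications in the container expose, the pod's ingress and egress bandwidth declared through annotations, the node's PodCIDR, DNS settings, and so on. All of this is pod configuration or runtime information and is independent of the network plugin.
### Container ID and container netns path when the runtime is docker
contid=$(docker run -d --net=none --name nginx nginx) # container ID
pid=$(docker inspect -f '{{ .State.Pid }}' $contid) # PID of the container process
netnspath=/proc/$pid/ns/net # network namespace path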
type RuntimeConf struct {
ContainerID string // podSandboxID.ID
NetNS string // podNetnsPath
IfName string // network.DefaultInterfaceName
Args [][2]string //
// A dictionary of capability-specific data passed by the runtime
// to plugins as top-level keys in the 'runtimeConfig' dictionary
// of the plugin's stdin data. libcni will ensure that only keys
// in this map which match the capabilities of the plugin are passed
// to the plugin
CapabilityArgs map[string]interface{}
// DEPRECATED. Will be removed in a future release.
CacheDir string
}
func (plugin *cniNetworkPlugin) buildCNIRuntimeConf(podName string, podNs string, podSandboxID kubecontainer.ContainerID, podNetnsPath string, annotations, options map[string]string) (*libcni.RuntimeConf, error) {
rt := &libcni.RuntimeConf{
ContainerID: podSandboxID.ID,
NetNS: podNetnsPath,
IfName: network.DefaultInterfaceName,
CacheDir: plugin.cacheDir,
Args: [][2]string{
{"IgnoreUnknown", "1"},
{"K8S_POD_NAMESPACE", podNs},
{"K8S_POD_NAME", podName},
{"K8S_POD_INFRA_CONTAINER_ID", podSandboxID.ID},
},
}
// port mappings are a cni capability-based args, rather than parameters
// to a specific plugin
portMappings, err := plugin.host.GetPodPortMappings(podSandboxID.ID)
if err != nil {
return nil, fmt.Errorf("could not retrieve port mappings: %v", err)
}
portMappingsParam := make([]cniPortMapping, 0, len(portMappings))
for _, p := range portMappings {
if p.HostPort <= 0 {
continue
}
portMappingsParam = append(portMappingsParam, cniPortMapping{
HostPort: p.HostPort,
ContainerPort: p.ContainerPort,
Protocol: strings.ToLower(string(p.Protocol)),
HostIP: p.HostIP,
})
}
rt.CapabilityArgs = map[string]interface{}{
portMappingsCapability: portMappingsParam,
}
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations)
if err != nil {
return nil, fmt.Errorf("failed to get pod bandwidth from annotations: %v", err)
}
if ingress != nil || egress != nil {
bandwidthParam := cniBandwidthEntry{}
if ingress != nil {
// see: https://github.com/containernetworking/cni/blob/master/CONVENTIONS.md and
// https://github.com/containernetworking/plugins/blob/master/plugins/meta/bandwidth/README.md
// Rates are in bits per second, burst values are in bits.
bandwidthParam.IngressRate = int(ingress.Value())
// Limit IngressBurst to math.MaxInt32, in practice limiting to 2Gbit is the equivalent of setting no limit
bandwidthParam.IngressBurst = math.MaxInt32
}
if egress != nil {
bandwidthParam.EgressRate = int(egress.Value())
// Limit EgressBurst to math.MaxInt32, in practice limiting to 2Gbit is the equivalent of setting no limit
bandwidthParam.EgressBurst = math.MaxInt32
}
rt.CapabilityArgs[bandwidthCapability] = bandwidthParam
}
// Set the PodCIDR
rt.CapabilityArgs[ipRangesCapability] = [][]cniIPRange{{{Subnet: plugin.podCidr}}}
// Set dns capability args.
if dnsOptions, ok := options["dns"]; ok {
dnsConfig := runtimeapi.DNSConfig{}
err := json.Unmarshal([]byte(dnsOptions), &dnsConfig)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal dns config %q: %v", dnsOptions, err)
}
if dnsParam := buildDNSCapabilities(&dnsConfig); dnsParam != nil {
rt.CapabilityArgs[dnsCapability] = *dnsParam
}
}
return rt, nil
}
func (plugin *cniNetworkPlugin) addToNetwork(ctx context.Context, network *cniNetwork, podName string, podNamespace string, podSandboxID kubecontainer.ContainerID, podNetnsPath string, annotations, options map[string]string) (cnitypes.Result, error) {
rt, err := plugin.buildCNIRuntimeConf(podName, podNamespace, podSandboxID, podNetnsPath, annotations, options)
if err != nil {
klog.ErrorS(err, "Error adding network when building cni runtime conf")
return nil, err
}
netConf, cniNet := network.NetworkConfig, network.CNIConfig
klog.V(4).InfoS("Adding pod to network", "pod", klog.KRef(podNamespace, podName), "podSandboxID", podSandboxID, "podNetnsPath", podNetnsPath, "networkType", netConf.Plugins[0].Network.Type, "networkName", netConf.Name)
res, err := cniNet.AddNetworkList(ctx, netConf, rt)
if err != nil {
klog.ErrorS(err, "Error adding pod to network", "pod", klog.KRef(podNamespace, podName), "podSandboxID", podSandboxID, "podNetnsPath", podNetnsPath, "networkType", netConf.Plugins[0].Network.Type, "networkName", netConf.Name)
return nil, err
}
klog.V(4).InfoS("Added pod to network", "pod", klog.KRef(podNamespace, podName), "podSandboxID", podSandboxID, "networkName", netConf.Name, "response", res)
return res, nil
}
func (c *CNIConfig) AddNetworkList(ctx context.Context, list *NetworkConfigList, rt *RuntimeConf) (types.Result, error) {
var err error
var result types.Result
for _, net := range list.Plugins {
result, err = c.addNetwork(ctx, list.Name, list.CNIVersion, net, result, rt)
if err != nil {
return nil, err
}
}
if err = c.cacheAdd(result, list.Bytes, list.Name, rt); err != nil {
return nil, fmt.Errorf("failed to set network %q cached result: %v", list.Name, err)
}
return result, nil
}
func (c *CNIConfig) addNetwork(ctx context.Context, name, cniVersion string, net *NetworkConfig, prevResult types.Result, rt *RuntimeConf) (types.Result, error) {
c.ensureExec()
pluginPath, err := c.exec.FindInPath(net.Network.Type, c.Path)
if err != nil {
return nil, err
}
if err := utils.ValidateContainerID(rt.ContainerID); err != nil {
return nil, err
}
if err := utils.ValidateNetworkName(name); err != nil {
return nil, err
}
if err := utils.ValidateInterfaceName(rt.IfName); err != nil {
return nil, err
}
newConf, err := buildOneConfig(name, cniVersion, net, prevResult, rt)
if err != nil {
return nil, err
}
return invoke.ExecPluginWithResult(ctx, pluginPath, newConf.Bytes, c.args("ADD", rt), c.exec)
}
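Everything ends in ExecPluginWithResult. The netconf parameter carries the plugin configuration, and args wraps the RuntimeConf information, the plugin action (currently ADD, DEL, CHECK, and VERSION), and the search path for plugin binaries. When ExecPlugin runs the plugin binary, args is converted into the corresponding environment variables. The plugin configuration is thus passed on standard input and args as environment variables: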
c.Env = environ
c.Stdin = bytes.NewBuffer(stdinData)
func (c *CNIConfig) args(action string, rt *RuntimeConf) *invoke.Args {
return &invoke.Args{
Command: action,
ContainerID: rt.ContainerID,
NetNS: rt.NetNS,
PluginArgs: rt.Args,
IfName: rt.IfName,
Path: strings.Join(c.Path, string(os.PathListSeparator)),
}
}
func (args *Args) AsEnv() []string {
env := os.Environ()
pluginArgsStr := args.PluginArgsStr
if pluginArgsStr == "" {
pluginArgsStr = stringify(args.PluginArgs)
}
// Duplicated values which come first will be overridden, so we must put the
// custom values in the end to avoid being overridden by the process environments.
env = append(env,
"CNI_COMMAND="+args.Command,
"CNI_CONTAINERID="+args.ContainerID,
"CNI_NETNS="+args.NetNS,
"CNI_ARGS="+pluginArgsStr,
"CNI_IFNAME="+args.IfName,
"CNI_PATH="+args.Path,
)
return dedupEnv(env)
}
func ExecPluginWithResult(ctx context.Context, pluginPath string, netconf []byte, args CNIArgs, exec Exec) (types.Result, error) {
if exec == nil {
exec = defaultExec
}
stdoutBytes, err := exec.ExecPlugin(ctx, pluginPath, netconf, args.AsEnv())
if err != nil {
return nil, err
}
// Plugin must return result in same version as specified in netconf
versionDecoder := &version.ConfigDecoder{}
confVersion, err := versionDecoder.Decode(netconf)
if err != nil {
return nil, err
}
return version.NewResult(confVersion, stdoutBytes)
}
func (e *RawExec) ExecPlugin(ctx context.Context, pluginPath string, stdinData []byte, environ []string) ([]byte, error) {
stdout := &bytes.Buffer{}
stderr := &bytes.Buffer{}
c := exec.CommandContext(ctx, pluginPath)
c.Env = environ
c.Stdin = bytes.NewBuffer(stdinData)
c.Stdout = stdout
c.Stderr = stderr
// Retry the command on "text file busy" errors
for i := 0; i <= 5; i++ {
err := c.Run()
// Command succeeded
if err == nil {
break
}
// If the plugin is currently about to be written, then we wait a
// second and try it again
if strings.Contains(err.Error(), "text file busy") {
time.Sleep(time.Second)
continue
}
// All other errors except than the busy text file
return nil, e.pluginErr(err, stdout.Bytes(), stderr.Bytes())
}
// Copy stderr to caller's buffer in case plugin printed to both
// stdout and stderr for some reason. Ignore failures as stderr is
// only informational.
if e.Stderr != nil && stderr.Len() > 0 {
_, _ = stderr.WriteTo(e.Stderr)
}
return stdout.Bytes(), nil
}
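To see what the environment handed to a plugin actually looks like, here is a small standalone sketch that assembles the CNI_* variables for one ADD call. `joinArgs` and `cniEnv` are hypothetical helpers written for this illustration, and the container ID and netns path are made-up values; the pairs in CNI_ARGS are joined as KEY=VALUE separated by semicolons.

```go
package main

import (
	"fmt"
	"strings"
)

// joinArgs renders key/value pairs in the "K1=V1;K2=V2" form used for CNI_ARGS.
func joinArgs(pairs [][2]string) string {
	parts := make([]string, 0, len(pairs))
	for _, kv := range pairs {
		parts = append(parts, kv[0]+"="+kv[1])
	}
	return strings.Join(parts, ";")
}

// cniEnv builds the CNI_* environment entries for a single plugin invocation.
func cniEnv(command, containerID, netns, ifName, path string, args [][2]string) []string {
	return []string{
		"CNI_COMMAND=" + command,
		"CNI_CONTAINERID=" + containerID,
		"CNI_NETNS=" + netns,
		"CNI_ARGS=" + joinArgs(args),
		"CNI_IFNAME=" + ifName,
		"CNI_PATH=" + path,
	}
}

func main() {
	// Illustrative values only; a runtime fills these in from the sandbox.
	env := cniEnv("ADD", "abc123", "/proc/1234/ns/net", "eth0", "/opt/cni/bin",
		[][2]string{
			{"IgnoreUnknown", "1"},
			{"K8S_POD_NAMESPACE", "default"},
			{"K8S_POD_NAME", "nginx"},
		})
	for _, e := range env {
		fmt.Println(e)
	}
}
```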
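Invoking CNI manually
Here is an example found online that calls CNI by hand to add a container to a container network.
- Create a container with docker using the none network
contid=$(docker run -d --net=none --name nginx nginx) # container ID
pid=$(docker inspect -f '{{ .State.Pid }}' $contid) # PID of the container process
netnspath=/proc/$pid/ns/net # network namespace path
While starting the container, record the container ID and the network namespace path so they can be passed to the CNI plugin later. Once the container is running, it has no network configuration other than the lo interface: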
nsenter -t $pid -n ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
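- Use the bridge plugin to create a network interface for the container and connect it to a host bridge. Create a bridge.json configuration file with the following content: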
{
"cniVersion": "0.4.0",
"name": "mynet",
"type": "bridge",
"bridge": "mynet0",
"isDefaultGateway": true,
"forceAddress": false,
"ipMasq": true,
"hairpinMode": true,
"ipam": {
"type": "host-local",
"subnet": "10.10.0.0/16"
}
}
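- Call the bridge plugin's ADD operation with the required environment variables, feeding bridge.json on standard input
CNI_COMMAND=ADD CNI_CONTAINERID=$contid CNI_NETNS=$netnspath CNI_IFNAME=eth0 CNI_PATH=~/cni/bin ~/cni/bin/bridge < bridge.json
If the call succeeds, it prints a result similar to: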
{
"cniVersion": "0.4.0",
"interfaces": [
....
],
"ips": [
{
"version": "4",
"interface": 2,
"address": "10.10.0.2/16", // IP address assigned to the container
"gateway": "10.10.0.1"
}
],
"routes": [
.....
],
"dns": {}
}
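A caller that wants the assigned address has to decode this JSON from the plugin's stdout. The sketch below shows one way to do that with the Go standard library. It is an assumption-laden illustration: `ipConfig`, `addResult`, and `firstAddress` are names invented here, only the fields shown in the output above are modeled, and the sample string is a trimmed copy of that output with the elided parts and the inline comment left out so that it is valid JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// ipConfig models one entry of the "ips" array in the result shown above.
type ipConfig struct {
	Version string `json:"version"`
	Address string `json:"address"`
	Gateway string `json:"gateway"`
}

// addResult models only the parts of the ADD result this sketch needs.
type addResult struct {
	CNIVersion string     `json:"cniVersion"`
	IPs        []ipConfig `json:"ips"`
}

// firstAddress returns the first address (in CIDR form) found in the result.
func firstAddress(stdout []byte) (string, error) {
	var r addResult
	if err := json.Unmarshal(stdout, &r); err != nil {
		return "", fmt.Errorf("decode result: %w", err)
	}
	if len(r.IPs) == 0 {
		return "", fmt.Errorf("result contains no ips")
	}
	return r.IPs[0].Address, nil
}

// sample is a trimmed, comment-free copy of the output above.
const sample = `{"cniVersion":"0.4.0","ips":[{"version":"4","interface":2,"address":"10.10.0.2/16","gateway":"10.10.0.1"}],"dns":{}}`

func main() {
	addr, err := firstAddress([]byte(sample))
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(addr) // prints 10.10.0.2/16
}
```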
- Check the network configuration inside the container again:
nsenter -t $pid -n ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
5: eth0@if40: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default
link/ether c2:8f:ea:1b:7f:85 brd ff:ff:ff:ff:ff:ff link-netnsid 0
inet 10.10.0.2/16 brd 10.10.255.255 scope global eth0
valid_lft forever preferred_lft forever