Overview
Using the v1.14.4 code as an example, this post explains how Cilium, when deployed in Chain mode, creates the relevant resources.
TL;DR
In Chain mode, Cilium is not responsible for the container network plumbing itself; it can be thought of as handling only the creation of the Endpoint, CiliumEndpoint, Identity, and Policy resources.
cmdAdd // plugins/cilium-cni/main.go
|-loadNetConf(args.StdinData) // plugins/cilium-cni/main.go
|-parsePrevResult(plugin) // plugins/cilium-cni/types/types.go
|-chainAction.Add
|-GenericVethChainer.Add // plugins/cilium-cni/chaining/generic-veth/generic-veth.go
|-ns.GetNS(pluginCtx.Args.Netns) // plugins/cilium-cni/chaining/generic-veth/generic-veth.go
|-ep := &models.EndpointChangeRequest // plugins/cilium-cni/chaining/generic-veth/generic-veth.go
|-EndpointCreate(ep) // plugins/cilium-cni/chaining/generic-veth/generic-veth.go
|-EndpointCreate // pkg/client/endpoint.go
|-PutEndpointID // api/v1/client/endpoint/endpoint_client.go
/\
||
\/
ServeHTTP // api/server/restapi/endpoint/put_endpoint_id.go
|-Handler.Handle() // api/server/restapi/endpoint/put_endpoint_id.go
|- Handle() // daemon/cmd/endpoint.go
|-createEndpoint // daemon/cmd/endpoint.go
|-NewEndpointFromChangeModel // pkg/endpoint/api.go
|-endpointmanager.AddEndpoint // daemon/cmd/endpoint.go
| |-Expose // pkg/endpointmanager/manager.go
| |-AllocateID // pkg/endpoint/manager.go
| |-RunK8sCiliumEndpointSync(e) // pkg/k8s/watchers/endpointsynchronizer.go
|-ep.UpdateLabels // pkg/endpoint/endpoint.go
| |-replaceInformationLabels // pkg/endpoint/endpoint.go
| |-ReplaceIdentityLabels // pkg/endpoint/endpoint.go
| |-RunIdentityResolver // pkg/endpoint/endpoint.go
| |-identityLabelsChanged // pkg/endpoint/endpoint.go
| |-AllocateIdentity // pkg/identity/cache/allocator.go
| |-forcePolicyComputation // pkg/endpoint/endpoint.go
| |-SetIdentity // pkg/identity/cache/allocator.go
| |-runIPIdentitySync // pkg/endpoint/policy.go
| |-UpsertIPToKVStore // pkg/ipcache/kvstore.go
|-Regenerate // pkg/endpoint/policy.go
|-regenerate // pkg/endpoint/policy.go
|-regenerateBPF // pkg/endpoint/bpf.go
|-runPreCompilationSteps
| |-regeneratePolicy
| |-writeHeaderfile
|-realizeBPFState
|-CompileAndLoad // pkg/datapath/loader/loader.go
|-compileAndLoad // pkg/datapath/loader/loader.go
|-compileDatapath // pkg/datapath/loader/loader.go
| |-compile // pkg/datapath/loader/compile.go
| |-compileAndLink // pkg/datapath/loader/compile.go
|-reloadDatapath // pkg/datapath/loader/loader.go
|-replaceDatapath // pkg/datapath/loader/netlink.go
loadNetConf()
The configuration it reads is shown below. The flow relies on CNI chained invocation: kubelet calls contivk8s.bin first and then cilium-cni. The subsequent flow of creating resources such as the Endpoint is similar to that of a non-Chain-mode CNI.
{
  "name": "generic-veth",
  "cniVersion": "0.1.0",
  "plugins": [
    {
      "type": "contivk8s.bin"
    },
    {
      "type": "cilium-cni"
    }
  ]
}
Inspecting the interface confirms it is an ordinary veth device:
[root@ns-k8s-noah-staging001-node-s0092 ~]# ethtool -i vvport16894
driver: veth
version: 1.0
firmware-version:
expansion-rom-version:
bus-info:
supports-statistics: yes
supports-test: no
supports-eeprom-access: no
supports-register-dump: no
supports-priv-flags: no
Code Implementation
A chaining plugin must implement the following three methods:
type ChainingPlugin interface {
	// Add is called on CNI ADD. It is given the plugin context from the
	// previous plugin. It must return a CNI result or an error.
	Add(ctx context.Context, pluginContext PluginContext, client *client.Client) (res *cniTypesVer.Result, err error)

	// Delete is called on CNI DELETE. It is given the plugin context from
	// the previous plugin.
	Delete(ctx context.Context, pluginContext PluginContext, delClient *lib.DeletionFallbackClient) (err error)

	// Check is called on CNI CHECK. The plugin should verify (to the best of
	// its ability) that everything is reasonably configured, else return error.
	Check(ctx context.Context, pluginContext PluginContext, client *client.Client) error
}
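Each chainer registers itself by name in an init function so that the main plugin can look it up. Paraphrased from plugins/cilium-cni/chaining/generic-veth/generic-veth.go (the exact wiring may differ slightly):

// The chainer registers itself under the name "generic-veth"; the lookup
// side is shown in the getChainedAction sketch further below.
type GenericVethChainer struct{}

func init() {
	chainingapi.Register("generic-veth", &GenericVethChainer{})
}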
cmdAdd
func cmdAdd(args *skel.CmdArgs) (err error) {
	n, err := types.LoadNetConf(args.StdinData)
	if err != nil {
		return fmt.Errorf("unable to parse CNI configuration \"%s\": %s", args.StdinData, err)
	}

	if err = setupLogging(n); err != nil {
		return fmt.Errorf("unable to setup logging: %w", err)
	}

	logger := log.WithField("eventUUID", uuid.New())

	if n.EnableDebug {
		if err := gops.Listen(gops.Options{}); err != nil {
			log.WithError(err).Warn("Unable to start gops")
		} else {
			defer gops.Close()
		}
	}
	logger.Debugf("Processing CNI ADD request %#v", args)

	logger.Debugf("CNI NetConf: %#v", n)
	if n.PrevResult != nil {
		logger.Debugf("CNI Previous result: %#v", n.PrevResult)
	}

	cniArgs := types.ArgsSpec{}
	if err = cniTypes.LoadArgs(args.Args, &cniArgs); err != nil {
		return fmt.Errorf("unable to extract CNI arguments: %s", err)
	}
	logger.Debugf("CNI Args: %#v", cniArgs)

	c, err := client.NewDefaultClientWithTimeout(defaults.ClientConnectTimeout)
	if err != nil {
		return fmt.Errorf("unable to connect to Cilium daemon: %s", client.Hint(err))
	}

	// If CNI ADD gives us a PrevResult, we're a chained plugin and *must* detect a
	// valid chained mode. If no chained mode we understand is specified, error out.
	// Otherwise, continue with normal plugin execution.
	if len(n.NetConf.RawPrevResult) != 0 {
		if chainAction, err := getChainedAction(n, logger); chainAction != nil {
			var (
				res *cniTypesV1.Result
				ctx = chainingapi.PluginContext{
					Logger:  logger,
					Args:    args,
					CniArgs: cniArgs,
					NetConf: n,
				}
			)

			res, err = chainAction.Add(context.TODO(), ctx, c)
			if err != nil {
				logger.WithError(err).Warn("Chained ADD failed")
				return err
			}
			logger.Debugf("Returning result %#v", res)
			return cniTypes.PrintResult(res, n.CNIVersion)
		} else if err != nil {
			logger.WithError(err).Error("Invalid chaining mode")
			return err
		} else {
			// no chained action supplied; this is an error
			logger.Error("CNI PrevResult supplied, but not in chaining mode -- this is invalid, please set chaining-mode in CNI configuration")
			return fmt.Errorf("CNI PrevResult supplied, but not in chaining mode -- this is invalid, please set chaining-mode in CNI configuration")
		}
	}

	// Non-Chain-mode CNI path; not our focus here.
	// ...
}
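How does cmdAdd decide which chainer to run? getChainedAction (same file) resolves it from the configuration. Paraphrased below, with minor details possibly differing from the actual v1.14.4 source:

func getChainedAction(n *types.NetConf, logger *logrus.Entry) (chainingapi.ChainingPlugin, error) {
	// An explicit chaining-mode in the CNI config wins.
	if n.ChainingMode != "" {
		chainAction := chainingapi.Lookup(n.ChainingMode)
		if chainAction == nil {
			return nil, fmt.Errorf("invalid chaining-mode %s", n.ChainingMode)
		}
		logger.Infof("Using chained plugin %s", n.ChainingMode)
		return chainAction, nil
	}

	// Otherwise fall back to the network name; this is why naming the
	// conflist "generic-veth" selects the GenericVethChainer.
	if chainAction := chainingapi.Lookup(n.Name); chainAction != nil {
		logger.Infof("Using chained plugin %s", n.Name)
		return chainAction, nil
	}

	// nil, nil means: a PrevResult was supplied but no chainer matched.
	return nil, nil
}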
GenericVethChainer.Add
func (f *GenericVethChainer) Add(ctx context.Context, pluginCtx chainingapi.PluginContext, cli *client.Client) (res *cniTypesVer.Result, err error) {
	err = cniVersion.ParsePrevResult(&pluginCtx.NetConf.NetConf)
	if err != nil {
		err = fmt.Errorf("unable to understand network config: %s", err)
		return
	}

	var prevRes *cniTypesVer.Result
	prevRes, err = cniTypesVer.NewResultFromResult(pluginCtx.NetConf.PrevResult)
	if err != nil {
		err = fmt.Errorf("unable to get previous network result: %s", err)
		return
	}

	defer func() {
		if err != nil {
			pluginCtx.Logger.WithError(err).
				WithFields(logrus.Fields{"cni-pre-result": pluginCtx.NetConf.PrevResult}).
				Errorf("Unable to create endpoint")
		}
	}()
	var (
		hostMac, vethHostName, vethLXCMac, vethIP, vethIPv6 string
		vethHostIdx, peerIndex                              int
		peer                                                netlink.Link
		netNs                                               ns.NetNS
	)

	netNs, err = ns.GetNS(pluginCtx.Args.Netns)
	if err != nil {
		err = fmt.Errorf("failed to open netns %q: %s", pluginCtx.Args.Netns, err)
		return
	}
	defer netNs.Close()

	if err = netNs.Do(func(_ ns.NetNS) error {
		links, err := netlink.LinkList()
		if err != nil {
			return err
		}

		for _, link := range links {
			pluginCtx.Logger.Debugf("Found interface in container %+v", link.Attrs())

			// only veth links are handled
			if link.Type() != "veth" {
				continue
			}

			// all of this information describes what the previous plugin
			// set up (i.e. the prevResult)
			vethLXCMac = link.Attrs().HardwareAddr.String()

			veth, ok := link.(*netlink.Veth)
			if !ok {
				return fmt.Errorf("link %s is not a veth interface", vethHostName)
			}

			peerIndex, err = netlink.VethPeerIndex(veth)
			if err != nil {
				return fmt.Errorf("unable to retrieve index of veth peer %s: %s", vethHostName, err)
			}

			addrs, err := netlink.AddrList(link, netlink.FAMILY_V4)
			if err == nil && len(addrs) > 0 {
				vethIP = addrs[0].IPNet.IP.String()
			} else if err != nil {
				pluginCtx.Logger.WithError(err).WithFields(logrus.Fields{
					logfields.Interface: link.Attrs().Name}).Warn("No valid IPv4 address found")
			}

			addrsv6, err := netlink.AddrList(link, netlink.FAMILY_V6)
			if err == nil && len(addrsv6) > 0 {
				vethIPv6 = addrsv6[0].IPNet.IP.String()
			} else if err != nil {
				pluginCtx.Logger.WithError(err).WithFields(logrus.Fields{
					logfields.Interface: link.Attrs().Name}).Warn("No valid IPv6 address found")
			}

			return nil
		}

		return fmt.Errorf("no link found inside container")
	}); err != nil {
		return
	}

	peer, err = netlink.LinkByIndex(peerIndex)
	if err != nil {
		err = fmt.Errorf("unable to lookup link %d: %s", peerIndex, err)
		return
	}

	hostMac = peer.Attrs().HardwareAddr.String()
	vethHostName = peer.Attrs().Name
	vethHostIdx = peer.Attrs().Index

	switch {
	case vethHostName == "":
		err = errors.New("unable to determine name of veth pair on the host side")
		return
	case vethLXCMac == "":
		err = errors.New("unable to determine MAC address of veth pair on the container side")
		return
	case vethIP == "" && vethIPv6 == "":
		err = errors.New("unable to determine IP address of the container")
		return
	case vethHostIdx == 0:
		err = errors.New("unable to determine index interface of veth pair on the host side")
		return
	}

	var disabled = false
	ep := &models.EndpointChangeRequest{
		Addressing: &models.AddressPair{
			IPV4: vethIP,
			IPV6: vethIPv6,
		},
		ContainerID:       pluginCtx.Args.ContainerID,
		State:             models.EndpointStateWaitingDashForDashIdentity.Pointer(),
		HostMac:           hostMac,
		InterfaceIndex:    int64(vethHostIdx),
		Mac:               vethLXCMac,
		InterfaceName:     vethHostName,
		K8sPodName:        string(pluginCtx.CniArgs.K8S_POD_NAME),
		K8sNamespace:      string(pluginCtx.CniArgs.K8S_POD_NAMESPACE),
		SyncBuildEndpoint: true,
		DatapathConfiguration: &models.EndpointDatapathConfiguration{
			// aws-cni requires ARP passthrough between Linux and the pod
			RequireArpPassthrough: true,

			// The route is pointing directly into the veth of the pod, install
			// a host-facing egress program to implement ingress policy and to
			// provide reverse NAT
			RequireEgressProg: true,

			// The IP is managed by the aws-cni plugin, no need for Cilium to
			// manage any aspect of addressing
			ExternalIpam: true,

			// All routing is performed by the Linux stack
			RequireRouting: &disabled,
		},
	}

	err = cli.EndpointCreate(ep)
	if err != nil {
		pluginCtx.Logger.WithError(err).WithFields(logrus.Fields{
			logfields.ContainerID: ep.ContainerID}).Warn("Unable to create endpoint")
		err = fmt.Errorf("unable to create endpoint: %s", err)
		return
	}

	pluginCtx.Logger.WithFields(logrus.Fields{
		logfields.ContainerID: ep.ContainerID}).Debug("Endpoint successfully created")

	res = prevRes

	return
}
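From here the call tree at the top takes over: cli.EndpointCreate in pkg/client/endpoint.go is only a thin wrapper around the generated PUT /endpoint/{id} API client. Paraphrased (helper names may differ slightly from the actual source):

// EndpointCreate issues PUT /endpoint/{id} against the cilium-agent API
// over the unix socket.
func (c *Client) EndpointCreate(ep *models.EndpointChangeRequest) error {
	id := pkgEndpointID.NewCiliumID(ep.ID)
	params := endpoint.NewPutEndpointIDParams().WithID(id).WithEndpoint(ep).WithTimeout(api.ClientTimeout)
	_, err := c.Endpoint.PutEndpointID(params)
	return Hint(err)
}

On the server side, the generated ServeHTTP handler dispatches into Handle() in daemon/cmd/endpoint.go, which drives the Endpoint/Identity/Policy machinery listed in the TL;DR call tree.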
About compile
cilium-1.14.4/pkg/datapath/loader/compile.go
Run the following command from inside a cilium-agent container to check the compiler:
root@ns-k8s-noah-staging001-node-s0093:/home/cilium# clang --version
clang version 10.0.0 (https://github.com/llvm/llvm-project.git 0598a534371d5fd6debd129b1378b39b923b9787)
Target: x86_64-unknown-linux-gnu
Thread model: posix
InstalledDir: /usr/local/bin
From the agent logs you can find the actual compile invocation Cilium uses:
clang -emit-llvm -g -O2 --target=bpf -std=gnu89 -nostdinc -D__NR_CPUS__=24 -Wall -Wextra -Werror -Wshadow -Wno-address-of-packed-member -Wno-unknown-warning-option -Wno-gnu-variable-sized-type-not-at-end -Wdeclaration-after-statement -Wimplicit-int-conversion -Wenum-conversion -I/var/run/cilium/state/globals -I/var/run/cilium/state/templates/46b04c5af0a1cda6e6efde8d345f9cfee515c727b888a49f38393b6cb0c84e2f -I/var/lib/cilium/bpf -I/var/lib/cilium/bpf/include -c /var/lib/cilium/bpf/bpf_host.c -o -
This command uses the clang compiler to build a C source file named bpf_host.c, located under /var/lib/cilium/bpf. It is compiling an eBPF program; programs of this kind are typically used inside the Linux kernel for tasks such as network processing, monitoring, and security-policy enforcement. Each option and argument has a specific purpose:
- -emit-llvm: tells clang to emit LLVM intermediate representation (IR) instead of machine code. LLVM IR is a low-level, richly typed assembly-like language that carries the information the LLVM optimization and code-generation toolchain needs
- -g: include debug information in the output, which helps with later debugging
- -O2: enable optimization level 2, the compiler's standard optimization level, which aims to improve performance without excessive compile time
- --target=bpf: set the target architecture to BPF; this is required when compiling eBPF programs, since they execute inside the kernel
- -std=gnu89: compile the C code against the GNU89 standard, a variant of ANSI C (C89) that includes some GNU extensions
- -nostdinc: do not search the standard system directories for header files; typically used to guarantee that only the explicitly specified headers are used rather than anything on the system paths
- -D__NR_CPUS__=24: define the macro __NR_CPUS__ with the value 24, presumably tied to the number of CPU cores the eBPF program has to handle
- -Wall -Wextra -Werror: enable all (-Wall) and extra (-Wextra) warnings, and treat every warning as an error (-Werror), which helps enforce code quality
- -Wshadow: warn when a local variable shadows another variable
- -Wno-address-of-packed-member: do not warn when taking the address of a member of a packed struct (one declared with __attribute__((packed)))
- -Wno-unknown-warning-option: do not warn about unknown warning options
- -Wno-gnu-variable-sized-type-not-at-end: do not warn about the GNU extension that allows a variable-sized type somewhere other than the end of a struct
- -Wdeclaration-after-statement: warn about variable declarations that appear after a statement (which C89 does not allow)
- -Wimplicit-int-conversion: warn about implicit conversions between integer types
- -Wenum-conversion: warn about conversions from an enum type to a different type
- -I: each -I adds an include directory; clang searches these paths for included headers
- -c: compile only, without linking
- /var/lib/cilium/bpf/bpf_host.c: the source file to compile
- -o -: write the output (here, the LLVM IR) to standard output (stdout) instead of a file; this is typically used to pipe the output into another command
In summary, this command compiles an eBPF program and emits LLVM intermediate representation, with debug information included and optimizations applied. It uses strict warning options to enforce code quality, avoids the standard include paths, and adds Cilium-specific include paths instead.
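As the last option suggests, the IR written to stdout is consumed by a second stage: the compileAndLink step in compile.go turns it into the final BPF object file. Conceptually this is a clang-to-llc pipe (illustrative; the exact llc flags such as -mcpu depend on how the agent probed the kernel):

clang -emit-llvm [flags as above] -c /var/lib/cilium/bpf/bpf_host.c -o - | \
  llc -march=bpf -mcpu=v2 -filetype=obj -o bpf_host.o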
Where the compilation inputs come from
// epInfoCache describes the set of lxcmap entries necessary to describe an Endpoint
// in the BPF maps. It is generated while holding the Endpoint lock, then used
// after releasing that lock to push the entries into the datapath.
// Functions below implement the EndpointFrontend interface with this cached information.
type epInfoCache struct {
	// revision is used by the endpoint regeneration code to determine
	// whether this cache is out-of-date wrt the underlying endpoint.
	revision uint64

	// For datapath.loader.endpoint
	epdir  string
	id     uint64
	ifName string

	// For datapath.EndpointConfiguration
	identity                               identity.NumericIdentity
	mac                                    mac.MAC
	ipv4                                   netip.Addr
	ipv6                                   netip.Addr
	conntrackLocal                         bool
	requireARPPassthrough                  bool
	requireEgressProg                      bool
	requireRouting                         bool
	requireEndpointRoute                   bool
	policyVerdictLogFilter                 uint32
	cidr4PrefixLengths, cidr6PrefixLengths []int
	options                                *option.IntOptions
	lxcMAC                                 mac.MAC
	ifIndex                                int

	// endpoint is used to get the endpoint's logger.
	//
	// Do NOT use this for fetching endpoint data directly; this structure
	// is intended as a safe cache of endpoint data that is assembled while
	// holding the endpoint lock, for use beyond the holding of that lock.
	// Dereferencing fields in this endpoint is not guaranteed to be safe.
	endpoint *Endpoint
}
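The point of this structure is the snapshot-under-lock pattern: every field is copied while the Endpoint lock is held, and the slow compile/load path then works off the copy without holding the lock. A minimal, self-contained sketch of the pattern (names here are hypothetical, not Cilium's):

package main

import (
	"fmt"
	"sync"
)

type endpoint struct {
	mu       sync.RWMutex
	revision uint64
	ifName   string
}

// epSnapshot mirrors the role of epInfoCache: an immutable copy of the
// fields the datapath loader needs.
type epSnapshot struct {
	revision uint64
	ifName   string
}

// snapshot copies the fields under the read lock so callers can use the
// result after the lock has been released.
func (e *endpoint) snapshot() *epSnapshot {
	e.mu.RLock()
	defer e.mu.RUnlock()
	return &epSnapshot{revision: e.revision, ifName: e.ifName}
}

func main() {
	e := &endpoint{revision: 3, ifName: "lxc12345"}
	s := e.snapshot()
	fmt.Println(s.revision, s.ifName) // safe: no endpoint lock held here
}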
tc operations
pkg/datapath/loader/netlink.go
This file contains the tc-related operations. Compilation and loading are split into two separate steps:
func (l *Loader) compileAndLoad(ctx context.Context, ep datapath.Endpoint, dirs *directoryInfo, stats *metrics.SpanStat) error {
	stats.BpfCompilation.Start()
	// compile first...
	err := compileDatapath(ctx, dirs, ep.IsHost(), ep.Logger(Subsystem))
	stats.BpfCompilation.End(err == nil)
	if err != nil {
		return err
	}

	stats.BpfLoadProg.Start()
	// ...then load
	err = l.reloadDatapath(ctx, ep, dirs)
	stats.BpfLoadProg.End(err == nil)
	return err
}
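reloadDatapath eventually reaches replaceDatapath in netlink.go, which attaches the freshly compiled object to the endpoint's interfaces via netlink tc filters. What it does is conceptually equivalent to the classic tc command below (illustrative only; Cilium talks netlink directly, and the device and section names vary per endpoint and version):

tc filter replace dev lxc12345 ingress prio 1 handle 1 bpf da obj bpf_lxc.o sec from-container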
Cleanup
When you run helm delete, the network interfaces and related artifacts are removed. A preStop hook also runs an uninstall script, so that components from a previous installation do not leave residue behind:
preStop:
  exec:
    command:
    - /cni-uninstall.sh
Looking at the script's contents, it essentially just cleans up the CNI configuration files:
#!/bin/bash

set -e

HOST_PREFIX=${HOST_PREFIX:-/host}
BIN_NAME=cilium-cni
CNI_DIR=${CNI_DIR:-${HOST_PREFIX}/opt/cni}
CNI_CONF_DIR=${CNI_CONF_DIR:-${HOST_PREFIX}/etc/cni/net.d}
CILIUM_CUSTOM_CNI_CONF=${CILIUM_CUSTOM_CNI_CONF:-false}

if [[ "$(cat /tmp/cilium/config-map/cni-uninstall 2>/dev/null || true)" != "true" ]]; then
	echo "cni-uninstall disabled, not removing CNI configuration"
	exit
fi

# Do not interact with the host's CNI directory when the user specified they
# are managing CNI configs externally.
if [ "${CILIUM_CUSTOM_CNI_CONF}" != "true" ]; then
	# .conf/.conflist/.json (undocumented) are read by kubelet/dockershim's CNI implementation.
	# Remove any active Cilium CNI configurations to prevent scheduling Pods during agent
	# downtime. Configs belonging to other CNI implementations have already been renamed
	# to *.cilium_bak during agent startup.
	echo "Removing active Cilium CNI configurations from ${CNI_CONF_DIR}..."
	find "${CNI_CONF_DIR}" -maxdepth 1 -type f \
		-name '*cilium*' -and \( \
		-name '*.conf' -or \
		-name '*.conflist' \
		\) -delete
fi
RESTful API
The cilium API server handles requests such as the Endpoint creation above; by default it communicates over a UNIX socket. In the same way, the cilium CLI talks to this HTTP service through that unix socket to read live data from the cilium-agent.
By default, only unix-socket communication is supported; exposing an HTTP port would require modifying the source (not recommended).
curl --unix-socket /var/run/cilium/cilium.sock http://localhost/v1/cluster/nodes
curl --unix-socket /var/run/cilium/cilium.sock http://localhost/v1/config
curl --unix-socket /var/run/cilium/cilium.sock http://localhost/v1/healthz
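The curl commands translate directly to any HTTP client that can dial a unix socket. A self-contained Go equivalent using only the standard library:

package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
)

func main() {
	// Route every connection of this client to the cilium-agent unix
	// socket; the host part of the URL is then irrelevant.
	tr := &http.Transport{
		DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
			var d net.Dialer
			return d.DialContext(ctx, "unix", "/var/run/cilium/cilium.sock")
		},
	}
	client := &http.Client{Transport: tr}

	resp, err := client.Get("http://localhost/v1/healthz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}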
Cilium Operator
The Cilium Operator is not even responsible for creating CEPs (the agent does that, via RunK8sCiliumEndpointSync in the call tree above); it only assists with garbage collection of Cilium resources.