基础服务组件两大性能杀手:锁和缓存失效,dpvs 里面处处体现这两点的优化。再次强调一下,dpvs 是 dpdk 程序,特点是每个核尽可能不与其它核交互,这就要求共享数据都有一份拷贝,或是数据私有。举个例子,流表 (session) 保存连接信息,每个核独有。但这里有个问题,full-nat 模式下,返程数据 outbound packet 也必须分配到同一个 cpu,否则在流表中找不到 conn. 那么这块 dpvs 是如何优化的呢?
数据亲和性问题
如图所示,dpvs 机器有两块网卡,nic1 是 wan 外网网卡, nic0 是 lan 内网网卡。 当 client 发送 packet 时,网卡一般由 rss 来选择数据发送到哪个队列,一般这个队列都会绑定到某个核心 lcore. rss 一般根据四元组<dport, dip, sport, sip>来分配网卡队列。
但是,当 packet 由 rs 返回 dpvs 时,如果还是根据四元组来做 rss, 那么得到的队列必然无法对应到正确的 lcore. 这就会引起流表数据 miss, 如果再从其它 lcore 查表,必然会引起共享数据加锁,和 cpu cache 失效问题。怎么解决呢?
网卡数据导流
网卡流入数据,分配到哪个队列,哪个 cpu, 现在有很多实现方案。是否支持硬件队列又有不同方案,有点乱。归根结底,目的就是均匀分配流量到不同 cpu
上古时代网卡是没有硬件队列的,中断都打在 cpu0 上,所以有了穷人的方案 rps, 软件层面将中断分在不同核上。后来有了硬件队列,直接上 rss 将数据导向不同队列,再绑定 cpu 即可。再后来发现光有 rss 还不够,如果中断在 cpu0 上,处理数据的反而在 cpu1 那就会产生 cache miss, 所以有了 rfs,中断和处理都在同一个核。
但是这些没有解决 dpvs full-nat 返程数据的问题,这就引入了 flow director 机制,精准的分配网卡流量,而不是简单的四元组哈希。
dpvs解决方案
引入 fdir 机制,这里有两种方案。每个 lcore 分配一个 lip 本地地址,fdir 根据 lip 就会分配到正确的核。另外一种是 lip 都是同一个,根据 lport 本地端口,来分配正确的核。由于 ip 受限等原因,不可能一个 dpvs 上有几十个本地地址,所以 dpvs 采用第二种。
如上图所示,本地可用端口,根据 cpu 个数做掩码。将端口固定到某个 lcore. 举个例子,如果网卡有 16 个队列,那么就配置 16 个 cpu, 掩码是 0x0F, 端口根据掩码取余,就会对应到指定的队列和cpu
说完原理直接搂源码~~
配置文件 fdir
先看一下配置文件,只看 net device dpdk0 就可以
<init> device dpdk0 {
rx {
queue_number 8
descriptor_number 1024
rss all
}
tx {
queue_number 8
descriptor_number 1024
}
fdir {
mode perfect
pballoc 64k
status matched
}
! promisc_mode
kni_name dpdk0.kni
}
可以看到,rx 队列配置了 rss. 并且 dpdk0 网卡配置了 fdir, 具体细节暂不看。
默认 fdir 配置
在对网卡初始化时,会用到 default_port_conf,这里有关于网卡 fdir 配置
static struct rte_eth_conf default_port_conf = {
.rxmode = {
.mq_mode = ETH_MQ_RX_RSS,
.max_rx_pkt_len = ETHER_MAX_LEN,
.split_hdr_size = 0,
.header_split = 0,
.hw_ip_checksum = 1,
.hw_vlan_filter = 0,
.jumbo_frame = 0,
.hw_strip_crc = 0,
},
.rx_adv_conf = {
.rss_conf = {
.rss_key = NULL,
.rss_hf = /*ETH_RSS_IP*/ ETH_RSS_TCP,
},
},
.txmode = {
.mq_mode = ETH_MQ_TX_NONE,
},
.fdir_conf = {
.mode = RTE_FDIR_MODE_PERFECT,
.pballoc = RTE_FDIR_PBALLOC_64K,
.status = RTE_FDIR_REPORT_STATUS/*_ALWAYS*/,
.mask = {
.vlan_tci_mask = 0x0,
.ipv4_mask = {
.src_ip = 0x00000000,
.dst_ip = 0xFFFFFFFF,
},
.ipv6_mask = {
.src_ip = { 0, 0, 0, 0 },
.dst_ip = { 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF },
},
.src_port_mask = 0x0000,
/* to be changed according to slave lcore number in use */
.dst_port_mask = 0x00F8,
.mac_addr_byte_mask = 0x00,
.tunnel_type_mask = 0,
.tunnel_id_mask = 0,
},
.drop_queue = 127,
.flex_conf = {
.nb_payloads = 0,
.nb_flexmasks = 0,
},
},
};
这里有两点,rx_adv_conf 关于 rss 配置,默认是 ETH_RSS_TCP. 最重要的是 fdir_conf,可以看到 mode, pballoc, status 等。这里关注 mask 即可,fdir 支持不同层的导流,ipv4_mask.src_ip 掩码是 0,ipv4_mask.dst_ip 位全置1,所以 fdir 只看目地 ip 不看源 ip,src_port_mask 是 0,dst_port_mask 非 0,也就是说 dpvs fdir 只根据 dst_ip, dst_port_mask 计算,也就是对应 <lip, lport>, 由于 lip 只有一个,所以等同于只看 lport, 那么如何设置正确的 dst_port_mask 掩码呢?
sa_pool 与 fdir 实始化
每个 lcore 有自己的 sa_pool, 用于管理本地分配的 <lip, lport>, 假如当前启用了 64 个 lcore, 一共有 65535-1024 可用端口,那么每个 lcore 在同一个 lip 上最多使用 (65535-1024)/64 个地址。
程序初始化时调用 sa_pool_init
初始化全局 fdir 表
int sa_pool_init(void)
{
int shift, err;
lcoreid_t cid;
uint16_t port_base;
// slave_lcore_nb 是核的个数,sa_lcore_mask 核对应的bit置为1的 mask
/* enabled lcore should not change after init */
netif_get_slave_lcores(&sa_nlcore, &sa_lcore_mask);
/* how many mask bits needed ? */
for (shift = 0; (0x1<<shift) < sa_nlcore; shift++)
;
if (shift >= 16)
return EDPVS_INVAL; /* bad config */
port_base = 0;
for (cid = 0; cid < RTE_MAX_LCORE; cid++) {
if (cid > 64 || !(sa_lcore_mask & (1L << cid)))
continue;
assert(rte_lcore_is_enabled(cid) && cid != rte_get_master_lcore());
sa_fdirs[cid].mask = ~((~0x0) << shift);
sa_fdirs[cid].lcore = cid;
sa_fdirs[cid].port_base = htons(port_base);
sa_fdirs[cid].soft_id = 0;
port_base++;
}
err = msg_type_mc_register(&sa_stats_msg);
return err;
}
-
netif_get_slave_lcores
获取当前启用的 lcore 个数,并生成掩码 - for 循环为每个核初始化全局 fdir 配置, mask 掩码由上一步获取。其中最重要的是 port_base, fdir 计算时,lport 经过掩码后,得到的值如果等于 port_base 就会分配到这个核心
在使用 ipvsadmin 添加 lip 时,ifa_add_set 调用 sa_pool_create
初始化 sa_pool
int sa_pool_create(struct inet_ifaddr *ifa, uint16_t low, uint16_t high)
{
struct sa_pool *ap;
int err;
lcoreid_t cid;
low = low ? : DEF_MIN_PORT;
high = high ? : DEF_MAX_PORT;
if (!ifa || low > high || low == 0 || high >= MAX_PORT) {
RTE_LOG(ERR, SAPOOL, "%s: bad arguments\n", __func__);
return EDPVS_INVAL;
}
for (cid = 0; cid < RTE_MAX_LCORE; cid++) {
uint32_t filtids[MAX_FDIR_PROTO];
struct sa_fdir *fdir = &sa_fdirs[cid];
/* skip master and unused cores */
if (cid > 64 || !(sa_lcore_mask & (1L << cid)))
continue;
assert(rte_lcore_is_enabled(cid) && cid != rte_get_master_lcore());
ap = rte_zmalloc(NULL, sizeof(struct sa_pool), 0);
if (!ap) {
err = EDPVS_NOMEM;
goto errout;
}
ap->ifa = ifa;
ap->low = low;
ap->high = high;
rte_atomic32_set(&ap->refcnt, 0);
err = sa_pool_alloc_hash(ap, sa_pool_hash_size, fdir);
if (err != EDPVS_OK) {
rte_free(ap);
goto errout;
}
/* if add filter failed, waste some soft-id is acceptable. */
filtids[0] = fdir->soft_id++;
filtids[1] = fdir->soft_id++;
err = sa_add_filter(ifa->af, ifa->idev->dev, cid, &ifa->addr,
fdir->port_base, filtids);
if (err != EDPVS_OK) {
sa_pool_free_hash(ap);
rte_free(ap);
goto errout;
}
ap->filter_id[0] = filtids[0];
ap->filter_id[1] = filtids[1];
ifa->sa_pools[cid] = ap;
}
return EDPVS_OK;
errout:
sa_pool_destroy(ifa);
return err;
}
- low, high 端口默认分别是 1025,65535
- for 循环,为每个 lcore 初始化 sa_pool, 并设置 fdir
-
rte_zmalloc
分配 sa_pool 结构体,并赋初值 ifa, low, high.sa_pool_alloc_hash
分配 socket 地址的哈希表,根据 fdir->mask &&
((uint16_t)port & fdir->mask) != ntohs(fdir->port_base) 为当前 lcore 分配地址。 -
sa_add_filter
调用__add_del_filter
增加 fdir filter, 用于 fdir 匹配
static int __add_del_filter(int af, struct netif_port *dev, lcoreid_t cid,
const union inet_addr *dip, __be16 dport,
uint32_t filter_id[MAX_FDIR_PROTO], bool add)
{
struct rte_eth_fdir_filter filt[MAX_FDIR_PROTO] = {
{
.action.behavior = RTE_ETH_FDIR_ACCEPT,
.action.report_status = RTE_ETH_FDIR_REPORT_ID,
.soft_id = filter_id[0],
},
{
.action.behavior = RTE_ETH_FDIR_ACCEPT,
.action.report_status = RTE_ETH_FDIR_REPORT_ID,
.soft_id = filter_id[1],
},
};
if (af == AF_INET) {
filt[0].input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_TCP;
filt[0].input.flow.tcp4_flow.ip.dst_ip = dip->in.s_addr;
filt[0].input.flow.tcp4_flow.dst_port = dport;
filt[1].input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV4_UDP;
filt[1].input.flow.udp4_flow.ip.dst_ip = dip->in.s_addr;
filt[1].input.flow.udp4_flow.dst_port = dport;
} else if (af == AF_INET6) {
filt[0].input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV6_TCP;
memcpy(filt[0].input.flow.ipv6_flow.dst_ip, &dip->in6, sizeof(struct in6_addr));
filt[0].input.flow.tcp6_flow.dst_port = dport;
filt[1].input.flow_type = RTE_ETH_FLOW_NONFRAG_IPV6_UDP;
memcpy(filt[1].input.flow.ipv6_flow.dst_ip, &dip->in6, sizeof(struct in6_addr));
filt[1].input.flow.udp6_flow.dst_port = dport;
} else {
return EDPVS_NOTSUPP;
}
queueid_t queue;
int err;
enum rte_filter_op op, rop;
#ifdef CONFIG_DPVS_SAPOOL_DEBUG
char ipaddr[64];
#endif
if (dev->netif_ops && dev->netif_ops->op_filter_supported) {
if (dev->netif_ops->op_filter_supported(dev, RTE_ETH_FILTER_FDIR) < 0) {
if (dev->nrxq <= 1)
return EDPVS_OK;
RTE_LOG(ERR, SAPOOL, "%s: FDIR is not supported by device %s. Only"
" single rxq can be configured.\n", __func__, dev->name);
return EDPVS_NOTSUPP;
}
} else {
RTE_LOG(ERR, SAPOOL, "%s: FDIR support of device %s is not known.\n",
__func__, dev->name);
return EDPVS_INVAL;
}
err = netif_get_queue(dev, cid, &queue);
if (err != EDPVS_OK)
return err;
filt[0].action.rx_queue = filt[1].action.rx_queue = queue;
op = add ? RTE_ETH_FILTER_ADD : RTE_ETH_FILTER_DELETE;
netif_mask_fdir_filter(af, dev, &filt[0]);
netif_mask_fdir_filter(af, dev, &filt[1]);
err = netif_fdir_filter_set(dev, op, &filt[0]);
if (err != EDPVS_OK)
return err;
err = netif_fdir_filter_set(dev, op, &filt[1]);
if (err != EDPVS_OK) {
rop = add ? RTE_ETH_FILTER_DELETE : RTE_ETH_FILTER_ADD;
netif_fdir_filter_set(dev, rop, &filt[0]);
return err;
}
return err;
}
- struct rte_eth_fdir_filter filt 定义 fdir 过滤条件结构体
- 分别针对 ipv4, ipv6 设置 filter
- 并不是所有网卡都支持 fdir, 调用
op_filter_supported
查看是否支持 - netif_get_queue(dev, cid, &queue) 获取当前 lcore 所绑定的队列 queue
- filt[0].action.rx_queue = filt[1].action.rx_queue = queue 绑定对应的网卡硬件队列
-
netif_mask_fdir_filter
获取当前 fdir 配置,然后添加到 filter 结构体里 -
netif_fdir_filter_set
将 filter 过滤条件更新到网卡,由于 netif_ops 是个函数指针结构体,根据是否 bond 网卡操作不同。最终都是调用 dpdk 提供的 apirte_eth_dev_filter_ctrl
实现。
流表如何使用 fdir
由于 dpdk 封装了回包时,根据 fdir filter 分配队列的逻辑。我们在 dpvs 代码只能看如何从 sa_pool 中分配本地地址。所有新建立的连接,都会调用 dp_vs_conn_new
注册流表
/* FNAT only: select and bind local address/port */
if (dest->fwdmode == DPVS_FWD_MODE_FNAT) {
if ((err = dp_vs_laddr_bind(new, dest->svc)) != EDPVS_OK)
goto unbind_dest;
}
看代码得知,只有 full-nat 才支持 local address/port
int dp_vs_laddr_bind(struct dp_vs_conn *conn, struct dp_vs_service *svc)
{
struct dp_vs_laddr *laddr = NULL;
int i;
uint16_t sport = 0;
struct sockaddr_storage dsin, ssin;
if (!conn || !conn->dest || !svc)
return EDPVS_INVAL;
if (svc->proto != IPPROTO_TCP && svc->proto != IPPROTO_UDP)
return EDPVS_NOTSUPP;
if (conn->flags & DPVS_CONN_F_TEMPLATE)
return EDPVS_OK;
/*
* some time allocate lport fails for one laddr,
* but there's also some resource on another laddr.
* use write lock since
* 1. __get_laddr will change svc->laddr_curr;
* 2. we uses svc->num_laddrs;
*/
rte_rwlock_write_lock(&svc->laddr_lock);
for (i = 0; i < dp_vs_laddr_max_trails && i < svc->num_laddrs; i++) {
/* select a local IP from service */
laddr = __get_laddr(svc);
if (!laddr) {
RTE_LOG(ERR, IPVS, "%s: no laddr available.\n", __func__);
rte_rwlock_write_unlock(&svc->laddr_lock);
return EDPVS_RESOURCE;
}
memset(&dsin, 0, sizeof(struct sockaddr_storage));
memset(&ssin, 0, sizeof(struct sockaddr_storage));
if (laddr->af == AF_INET) {
struct sockaddr_in *daddr, *saddr;
daddr = (struct sockaddr_in *)&dsin;
daddr->sin_family = laddr->af;
daddr->sin_addr = conn->daddr.in;
daddr->sin_port = conn->dport;
saddr = (struct sockaddr_in *)&ssin;
saddr->sin_family = laddr->af;
saddr->sin_addr = laddr->addr.in;
} else {
struct sockaddr_in6 *daddr, *saddr;
daddr = (struct sockaddr_in6 *)&dsin;
daddr->sin6_family = laddr->af;
daddr->sin6_addr = conn->daddr.in6;
daddr->sin6_port = conn->dport;
saddr = (struct sockaddr_in6 *)&ssin;
saddr->sin6_family = laddr->af;
saddr->sin6_addr = laddr->addr.in6;
}
if (sa_fetch(laddr->af, laddr->iface, &dsin, &ssin) != EDPVS_OK) {
char buf[64];
if (inet_ntop(laddr->af, &laddr->addr, buf, sizeof(buf)) == NULL)
snprintf(buf, sizeof(buf), "::");
#ifdef CONFIG_DPVS_IPVS_DEBUG
RTE_LOG(ERR, IPVS, "%s: [%d] no lport available on %s, "
"try next laddr.\n", __func__, rte_lcore_id(), buf);
#endif
put_laddr(laddr);
continue;
}
sport = (laddr->af == AF_INET ? (((struct sockaddr_in *)&ssin)->sin_port)
: (((struct sockaddr_in6 *)&ssin)->sin6_port));
break;
}
rte_rwlock_write_unlock(&svc->laddr_lock);
if (!laddr || sport == 0) {
#ifdef CONFIG_DPVS_IPVS_DEBUG
RTE_LOG(ERR, IPVS, "%s: [%d] no lport available !!\n",
__func__, rte_lcore_id());
#endif
if (laddr)
put_laddr(laddr);
return EDPVS_RESOURCE;
}
rte_atomic32_inc(&laddr->conn_counts);
/* overwrite related fields in out-tuplehash and conn */
conn->laddr = laddr->addr;
conn->lport = sport;
tuplehash_out(conn).daddr = laddr->addr;
tuplehash_out(conn).dport = sport;
conn->local = laddr;
return EDPVS_OK;
}
- svc 代表后端服务,由于所有核会修改 svc->laddr_curr, 所以需要加锁。这块大并发会不会有问题,违背了 dpdk 的原则呢?如果 laddr_curr 不在 svc 里是不是可以避免?
- for 循环开始偿试获取 laddr,如果成功,最后设置到 conn->laddr 和 conn->lport
-
__get_laddr
获取本地 lip, 因为内网网卡可能绑定很多个 lip, 所以选取时也会有一些负载均衡策略。轮循的话,是否可以省去 svc->laddr_lock 这个锁?好像不行,ipvsadmin 增删改 lip 的话也会有问题 -
sa_fetch
获取端口,来完整填充 dsin, ssin 地址,暂时只看 ipv4 的实现
static int sa4_fetch(struct netif_port *dev,
const struct sockaddr_in *daddr,
struct sockaddr_in *saddr)
{
struct inet_ifaddr *ifa;
struct flow4 fl;
struct route_entry *rt;
int err;
assert(saddr);
if (saddr && saddr->sin_addr.s_addr != INADDR_ANY && saddr->sin_port != 0)
return EDPVS_OK; /* everything is known, why call this function ? */
/* if source IP is assiged, we can find ifa->this_sa_pool
* without @daddr and @dev. */
if (saddr->sin_addr.s_addr) {
ifa = inet_addr_ifa_get(AF_INET, dev, (union inet_addr*)&saddr->sin_addr);
if (!ifa)
return EDPVS_NOTEXIST;
if (!ifa->this_sa_pool) {
RTE_LOG(WARNING, SAPOOL, "%s: fetch addr on IP without pool.", __func__);
inet_addr_ifa_put(ifa);
return EDPVS_INVAL;
}
err = sa_pool_fetch(sa_pool_hash(ifa->this_sa_pool,
(struct sockaddr_storage *)daddr),
(struct sockaddr_storage *)saddr);
if (err == EDPVS_OK)
rte_atomic32_inc(&ifa->this_sa_pool->refcnt);
inet_addr_ifa_put(ifa);
return err;
}
/* try to find source ifa by @dev and @daddr */
memset(&fl, 0, sizeof(struct flow4));
fl.fl4_oif = dev;
fl.fl4_daddr.s_addr = daddr ? daddr->sin_addr.s_addr : htonl(INADDR_ANY);
fl.fl4_saddr.s_addr = saddr ? saddr->sin_addr.s_addr : htonl(INADDR_ANY);
rt = route4_output(&fl);
if (!rt)
return EDPVS_NOROUTE;;
/* select source address. */
if (!rt->src.s_addr) {
inet_addr_select(AF_INET, rt->port, (union inet_addr *)&rt->dest,
RT_SCOPE_UNIVERSE, (union inet_addr *)&rt->src);
}
ifa = inet_addr_ifa_get(AF_INET, rt->port, (union inet_addr *)&rt->src);
if (!ifa) {
route4_put(rt);
return EDPVS_NOTEXIST;
}
route4_put(rt);
if (!ifa->this_sa_pool) {
RTE_LOG(WARNING, SAPOOL, "%s: fetch addr on IP without pool.",
__func__);
inet_addr_ifa_put(ifa);
return EDPVS_INVAL;
}
/* do fetch socket address */
err = sa_pool_fetch(sa_pool_hash(ifa->this_sa_pool,
(struct sockaddr_storage *)daddr),
(struct sockaddr_storage *)saddr);
if (err == EDPVS_OK)
rte_atomic32_inc(&ifa->this_sa_pool->refcnt);
inet_addr_ifa_put(ifa);
return err;
}
- saddr->sin_addr.s_addr 如果设置了源地址,也就是 lip, 那么自然 ifa 网络接口就确认了,直接从 ifa 对应的 sa_pool 分配地址即可。
- 如果没有源地址,那么就要根据路由,自动选择一个 lip,然后走同样的逻辑
static inline int sa_pool_fetch(struct sa_entry_pool *pool,
struct sockaddr_storage *ss)
{
assert(pool && ss);
struct sa_entry *ent;
struct sockaddr_in *sin = (struct sockaddr_in *)ss;
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ss;
ent = list_first_entry_or_null(&pool->free_enties, struct sa_entry, list);
if (!ent) {
pool->miss_cnt++;
return EDPVS_RESOURCE;
}
if (ss->ss_family == AF_INET) {
sin->sin_family = AF_INET;
sin->sin_addr.s_addr = ent->addr.in.s_addr;
sin->sin_port = ent->port;
} else if (ss->ss_family == AF_INET6) {
sin6->sin6_family = AF_INET6;
sin6->sin6_addr = ent->addr.in6;
sin6->sin6_port = ent->port;
} else {
return EDPVS_NOTSUPP;
}
ent->flags |= SA_F_USED;
list_move_tail(&ent->list, &pool->used_enties);
rte_atomic16_inc(&pool->used_cnt);
rte_atomic16_dec(&pool->free_cnt);
return EDPVS_OK;
}
- list_first_entry_or_null 从 lcore 本地 sa_pool 的 free 队列里取出第一个元素,就是可用的地址资源
- 更新 sin 的地址和端口
- 将当前资源标记为使用
- 将当前资源添加到 used 列表中
- 增减统计计数信息
这里是不是有问题?资源并没有从 free_enties 列表中移除?下一次请求还会复用?我去给官方提个 pr...
如果连接释放了,那么资源也会回收到 sa_pool 中。代码实现在 sa_pool_release
里,同样资源也没从 used_entries 中释放出来... 好奇怪
更新2018-11-19: 学艺不精,list_move_tail
会从原有的队列中删除,再添加到新队列...
总结
dpvs 实现的细节还是很多的,代码在不断升级更新,就在上周刚刚支持了 ipv6... 社区还是很伟大的,希望 iqiyi 能一直开源这个项目~~