Note: the kernel code referenced here is from version 4.9.
The walk through the protocol stack starts with packet reception, and packet reception starts with the NIC driver.
There are two receive modes: NAPI and non-NAPI.
NAPI (New API) is an optimization in the Linux kernel for network data transfer.
Its goal is that, under heavy traffic, the packets that arrive after a hardware interrupt are processed in batches via polling, and the network device's interrupts are disabled to reduce the number of hardware interrupts (interrupt mitigation), thereby achieving higher throughput.
The key points:
1. Packet processing starts from a hardware interrupt, but the interrupt handler only raises a softirq to prepare for reception;
2. Packets are processed in the softirq via the poll method, handling multiple packets in a single polling pass;
3. Device interrupts are disabled to reduce the number of hardware interrupts: they stay off while the softirq handler processes packets and are re-enabled only when processing finishes, so one interrupt covers many packets. A sketch of this pattern follows the list.
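The canonical driver-side pattern looks roughly like the following. This is a minimal sketch of the NAPI contract, not code from any real driver; struct my_priv and the mydrv_* helpers are assumptions used only to show where the kernel NAPI APIs fit.
#include <linux/netdevice.h>
#include <linux/interrupt.h>
struct my_priv {
	struct napi_struct napi;
	/* device-specific fields omitted */
};
/* Hardware interrupt: do no packet work here, just hand off to NAPI. */
static irqreturn_t mydrv_isr(int irq, void *dev_id)
{
	struct my_priv *priv = dev_id;
	mydrv_disable_rx_irq(priv);              /* assumed helper: mask the device's RX interrupt */
	if (napi_schedule_prep(&priv->napi))     /* sets NAPI_STATE_SCHED if not already scheduled */
		__napi_schedule(&priv->napi);    /* link onto softnet_data->poll_list, raise NET_RX_SOFTIRQ */
	return IRQ_HANDLED;
}
/* Softirq context: poll at most 'budget' packets from the RX ring. */
static int mydrv_poll(struct napi_struct *napi, int budget)
{
	struct my_priv *priv = container_of(napi, struct my_priv, napi);
	int work = mydrv_clean_rx(priv, budget); /* assumed helper: process up to 'budget' packets */
	if (work < budget) {                     /* ring drained: leave polling mode */
		napi_complete_done(napi, work);
		mydrv_enable_rx_irq(priv);       /* assumed helper: unmask the device's RX interrupt */
	}
	return work;
}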
Let's start with the NAPI path.
Take Intel's e1000 driver as an example. When the driver is loaded, e1000_probe is called to identify and initialize the device; importantly, the device's napi structure and poll function are set up in this function:
- Create and initialize the protocol stack's net_device structure for the device;
- Map the device's hardware register memory, used later to set up DMA reception;
- Initialize the device's napi structure and attach the poll function (polled reception); adapter->napi is initially linked onto netdev->napi_list, and later the interrupt handler adds it to the per-CPU softnet_data poll_list and raises the softirq that polls the packets;
- Attach net_device_ops, which includes the ndo_open callback (called when the NIC is brought up); ndo_open calls e1000_request_irq, which registers the interrupt handler e1000_intr. It also includes other important callbacks such as ndo_start_xmit for transmission.
static int e1000_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
struct net_device *netdev;
struct e1000_adapter *adapter;
struct e1000_hw *hw;
......
// Allocate the net_device object for the NIC (it is registered later in probe)
netdev = alloc_etherdev(sizeof(struct e1000_adapter));
if (!netdev)
goto err_alloc_etherdev;
SET_NETDEV_DEV(netdev, &pdev->dev);
// Set up the driver-private data
pci_set_drvdata(pdev, netdev);
adapter = netdev_priv(netdev);
adapter->netdev = netdev;
adapter->pdev = pdev;
adapter->msg_enable = netif_msg_init(debug, DEFAULT_MSG_ENABLE);
adapter->bars = bars;
adapter->need_ioport = need_ioport;
hw = &adapter->hw;
hw->back = adapter;
err = -EIO;
// Map the device's register I/O region (BAR 0)
hw->hw_addr = pci_ioremap_bar(pdev, BAR_0);
if (!hw->hw_addr)
goto err_ioremap;
......
// Attach the network device operations
netdev->netdev_ops = &e1000_netdev_ops;
e1000_set_ethtool_ops(netdev);
netdev->watchdog_timeo = 5 * HZ;
// Initialize and attach the device's NAPI instance; e1000_clean is its poll function, called from the softirq to process packets
// adapter->napi is linked onto netdev->napi_list
netif_napi_add(netdev, &adapter->napi, e1000_clean, 64);
strncpy(netdev->name, pci_name(pdev), sizeof(netdev->name) - 1);
......
}
Next, the NAPI receive flow.
Unlike the non-NAPI path, before the NIC interrupt even fires, a packet that has arrived at the NIC has already been copied by DMA into a ring buffer in memory (the rx ring).
In the interrupt handler, if no NAPI task is already running, a new one is scheduled through the generic helper __napi_schedule, which links the device's napi onto the poll_list of the current CPU's softnet_data and raises the softirq.
napi_schedule_prep checks and sets the NAPI_STATE_SCHED bit of the napi_struct; when the flow finishes (after the softirq has processed the packets), __napi_complete is called to clear the state bit.
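For reference, in 4.9 that scheduling check is essentially a test-and-set of the bit (shown here approximately, as defined in include/linux/netdevice.h):
static inline bool napi_schedule_prep(struct napi_struct *n)
{
	/* schedule only if NAPI is not being disabled and SCHED was not already set */
	return !napi_disable_pending(n) &&
		!test_and_set_bit(NAPI_STATE_SCHED, &n->state);
}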
static irqreturn_t e1000_intr(int irq, void *data)
{
struct net_device *netdev = data;
struct e1000_adapter *adapter = netdev_priv(netdev);
struct e1000_hw *hw = &adapter->hw;
......
// If no NAPI task is already scheduled, schedule a new one
if (likely(napi_schedule_prep(&adapter->napi))) {
adapter->total_tx_bytes = 0;
adapter->total_tx_packets = 0;
adapter->total_rx_bytes = 0;
adapter->total_rx_packets = 0;
// Call the generic NAPI scheduling helper
__napi_schedule(&adapter->napi);
} else {
/* this really should not happen! if it does it is basically a
* bug, but not a hard error, so enable ints and continue
*/
if (!test_bit(__E1000_DOWN, &adapter->flags))
e1000_irq_enable(adapter);
}
return IRQ_HANDLED;
}
void __napi_schedule(struct napi_struct *n)
{
unsigned long flags;
local_irq_save(flags);
____napi_schedule(this_cpu_ptr(&softnet_data), n);
local_irq_restore(flags);
}
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
// Link the device's napi onto sd's poll_list
list_add_tail(&napi->poll_list, &sd->poll_list);
// Raise the softirq
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}
The receive softirq handler is net_rx_action, registered when the network device subsystem initializes. open_softirq installs the handler for a given softirq number in the softirq vector table.
net_dev_init:
open_softirq(NET_TX_SOFTIRQ, net_tx_action);
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
kernel/softirq.c
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
softirq_vec[nr].action = action;
}
The kernel executes softirqs in the following places:
- On the exit path of a hardware interrupt handler, with limits on execution: no more than 2 ms per pass, restarted at most 10 times.
- In the per-CPU softirq kernel thread (ksoftirqd), whose scheduling policy is SCHED_NORMAL and priority is 120.
- When softirqs are re-enabled via local_bh_enable() (see the sketch below).
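For the third case, local_bh_enable() ends up in __local_bh_enable_ip(), which runs any pending softirqs once the softirq count drops back to zero. The excerpt below is abridged from kernel/softirq.c and shown only to illustrate the call path:
void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)
{
	......
	/* keep preemption disabled until softirq processing is done */
	preempt_count_sub(cnt - 1);
	if (unlikely(!in_interrupt() && local_softirq_pending())) {
		/* run the pending softirqs in process context */
		do_softirq();
	}
	preempt_count_dec();
	......
}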
For the first case, on the exit path of an interrupt the kernel calls irq_exit() to leave interrupt context and handle pending softirqs. Here is an example:
void __irq_entry do_IRQ(unsigned int irq)
{
irq_enter();
check_stack_overflow();
generic_handle_irq(irq);
irq_exit();
}
kernel/softirq.c
void irq_exit(void)
{
…
preempt_count_sub(HARDIRQ_OFFSET);
if (!in_interrupt() && local_softirq_pending())
invoke_softirq();
…
}
If interrupt threading is not forced, __do_softirq() is called to run the softirqs.
static inline void invoke_softirq(void)
{
if (ksoftirqd_running(local_softirq_pending()))
return;
if (!force_irqthreads) {
#ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
/*
* We can safely execute softirq on the current stack if
* it is the irq stack, because it should be near empty
* at this stage.
*/
__do_softirq();
#else
/*
* Otherwise, irq_exit() is called on the task stack that can
* be potentially deep already. So call softirq in its own stack
* to prevent from any overrun.
*/
do_softirq_own_stack();
#endif
} else {
wakeup_softirqd();
}
}
__do_softirq determines which softirqs are pending, accounts the time spent in softirq for statistics, updates the related counters, and runs the handler of each pending softirq.
asmlinkage __visible void __softirq_entry __do_softirq(void)
{
unsigned long end = jiffies + MAX_SOFTIRQ_TIME;
unsigned long old_flags = current->flags;
int max_restart = MAX_SOFTIRQ_RESTART;
struct softirq_action *h;
bool in_hardirq;
__u32 pending;
int softirq_bit;
/*
* Mask out PF_MEMALLOC s current task context is borrowed for the
* softirq. A softirq handled such as network RX might set PF_MEMALLOC
* again if the socket is related to swap
*/
current->flags &= ~PF_MEMALLOC;
//get this CPU's pending-softirq bitmask (__softirq_pending) into the local variable pending
pending = local_softirq_pending();
account_irq_enter_time(current);
__local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET); //bump the softirq count in preempt_count to mark that we are now in softirq context
in_hardirq = lockdep_softirq_start();
restart:
/* Reset the pending bitmask before enabling irqs */
set_softirq_pending(0); //clear the pending-softirq bitmask
local_irq_enable(); //re-enable local interrupts: this code can be interrupted by hard irqs but is never re-entered by softirqs, because irq_exit() checks in_interrupt() and will not start a nested softirq run; so when a softirq run is interrupted and the hard-irq top half returns, no new softirq run starts, and softirqs always execute serially on a given CPU
h = softirq_vec; //point at the first entry of softirq_vec, i.e. the handler slot for HI_SOFTIRQ
while ((softirq_bit = ffs(pending))) { //ffs() returns the 1-based index of the lowest set bit in pending; low bits have higher priority and run first. It returns 0 when nothing is left, ending the loop.
unsigned int vec_nr;
int prev_count;
h += softirq_bit - 1; //advance to the softirq_action entry selected by softirq_bit, i.e. its handler
vec_nr = h - softirq_vec;
prev_count = preempt_count();
kstat_incr_softirqs_this_cpu(vec_nr);
trace_softirq_entry(vec_nr);
h->action(h); //run the corresponding softirq handler
trace_softirq_exit(vec_nr);
if (unlikely(prev_count != preempt_count())) {
pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
vec_nr, softirq_to_name[vec_nr], h->action,
prev_count, preempt_count());
preempt_count_set(prev_count);
}
h++;
pending >>= softirq_bit;
}
rcu_bh_qs();
local_irq_disable(); //disable local interrupts
pending = local_softirq_pending(); //check again: new softirqs may have been raised since the last check
//All three conditions must hold to restart the loop: 1. total processing time has not exceeded MAX_SOFTIRQ_TIME (2 ms); 2. no task needs to be scheduled, i.e. !need_resched(); 3. the loop has restarted fewer than MAX_SOFTIRQ_RESTART (10) times.
if (pending) {
if (time_before(jiffies, end) && !need_resched() &&
--max_restart)
goto restart;
wakeup_softirqd(); //otherwise wake the ksoftirqd kernel thread to handle the remaining softirqs
}
lockdep_softirq_end(in_hardirq);
account_irq_exit_time(current);
__local_bh_enable(SOFTIRQ_OFFSET);
WARN_ON_ONCE(in_interrupt());
tsk_restore_flags(current, old_flags, PF_MEMALLOC);
}
The ksoftirqd/%u kernel threads (%u is the CPU id) handle softirqs on their CPU. During initialization the kernel spawns one such thread per CPU.
They end up calling the same TX/RX softirq handlers registered above.
ksoftirqd is only woken when the interrupt-exit pass could not finish the pending work; it does not handle all packets.
// Registration
static struct smp_hotplug_thread softirq_threads = {
.store = &ksoftirqd,
.thread_should_run = ksoftirqd_should_run,
.thread_fn = run_ksoftirqd,
.thread_comm = "ksoftirqd/%u",
};
static __init int spawn_ksoftirqd(void)
{
register_cpu_notifier(&cpu_nfb);
BUG_ON(smpboot_register_percpu_thread(&softirq_threads));
return 0;
}
early_initcall(spawn_ksoftirqd);
// The ksoftirqd thread function
static void run_ksoftirqd(unsigned int cpu)
{
local_irq_disable();
if (local_softirq_pending()) {
/*
* We can safely run softirq on inline stack, as we are not deep
* in the task stack here.
*/
__do_softirq();
local_irq_enable();
cond_resched_rcu_qs();
return;
}
local_irq_enable();
}
asmlinkage __visible void __softirq_entry __do_softirq(void)
{
...
while ((softirq_bit = ffs(pending))) {
unsigned int vec_nr;
int prev_count;
h += softirq_bit - 1;
vec_nr = h - softirq_vec;
...
h->action(h);
...
h++;
pending >>= softirq_bit;
}
...
}
The receive softirq handler eventually calls the device's poll function to process packets.
static __latent_entropy void net_rx_action(struct softirq_action *h)
{
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget;
LIST_HEAD(list);
LIST_HEAD(repoll);
local_irq_disable();
list_splice_init(&sd->poll_list, &list);
local_irq_enable();
for (;;) {
struct napi_struct *n;
if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
return;
break;
}
// Always take the first entry; unlinking the node after processing is done in napi_poll.
n = list_first_entry(&list, struct napi_struct, poll_list);
// napi_poll mainly calls the poll function registered by the device; if packets remain (judged against the poll function's weight) the device's napi is recorded on repoll and later re-linked onto sd.
budget -= napi_poll(n, &repoll);
/* If softirq window is exhausted then punt.
* Allow this to run for 2 jiffies since which will allow
* an average latency of 1.5/HZ.
*/
// budget is the quota for one softirq run; stop when it is used up or when polling has taken more than 2 jiffies
if (unlikely(budget <= 0 ||
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}
__kfree_skb_flush();
local_irq_disable();
// Merge the leftover entries, any napi added to sd->poll_list in the meantime, and the repoll list, then splice them back onto sd->poll_list to be polled in the next softirq run
list_splice_tail_init(&sd->poll_list, &list);
list_splice_tail(&repoll, &list);
list_splice(&list, &sd->poll_list);
if (!list_empty(&sd->poll_list))
// Work remains: raise the softirq again so it gets handled next time
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
net_rps_action_and_irq_enable(sd);
}
static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
void *have;
int work, weight;
// Unlink the node from the poll list
list_del_init(&n->poll_list);
have = netpoll_poll_lock(n);
// The quota (weight) for the poll function
weight = n->weight;
/* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidentally calling ->poll() when NAPI is not scheduled.
*/
work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
// Call the device's poll function to process packets
work = n->poll(n, weight);
trace_napi_poll(n, work, weight);
}
WARN_ON_ONCE(work > weight);
// Quota not exhausted: this device is done for now
if (likely(work < weight))
goto out_unlock;
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
if (unlikely(napi_disable_pending(n))) {
napi_complete(n);
goto out_unlock;
}
if (n->gro_list) {
/* flush too old packets
* If HZ < 1000, flush all packets.
*/
napi_gro_flush(n, HZ >= 1000);
}
/* Some drivers may have called napi_schedule
* prior to exhausting their budget.
*/
if (unlikely(!list_empty(&n->poll_list))) {
pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
n->dev ? n->dev->name : "backlog");
goto out_unlock;
}
// Quota exhausted, so packets remain; record the napi so the next softirq run polls it again.
list_add_tail(&n->poll_list, repoll);
out_unlock:
netpoll_poll_unlock(have);
return work;
}
The device's poll function is e1000_clean. adapter->clean_rx is e1000_clean_rx_irq, attached in e1000_open; it loops over the packets in the device's rx ring and hands each one to e1000_receive_skb.
e1000_clean_rx_irq walks the packets in rx_ring. Note that the device's ndo_open path has already allocated an skb for every buffer_info in the ring and DMA-mapped the region to skb->data, so by the time the poll runs the packet data is already in the skb.
static int e1000_clean(struct napi_struct *napi, int budget)
{
struct e1000_adapter *adapter = container_of(napi, struct e1000_adapter,
napi);
int tx_clean_complete = 0, work_done = 0;
tx_clean_complete = e1000_clean_tx_irq(adapter, &adapter->tx_ring[0]);
adapter->clean_rx(adapter, &adapter->rx_ring[0], &work_done, budget);
if (!tx_clean_complete)
work_done = budget;
/* If budget not fully consumed, exit the polling mode */
if (work_done < budget) {
if (likely(adapter->itr_setting & 3))
e1000_set_itr(adapter);
napi_complete_done(napi, work_done);
if (!test_bit(__E1000_DOWN, &adapter->flags))
e1000_irq_enable(adapter);
}
return work_done;
}
static void e1000_alloc_rx_buffers(struct e1000_ring *rx_ring,
int cleaned_count, gfp_t gfp) {
......
skb = __netdev_alloc_skb_ip_align(netdev, bufsz, gfp);
if (!skb) {
/* Better luck next round */
adapter->alloc_rx_buff_failed++;
break;
}
buffer_info->skb = skb;
map_skb:
buffer_info->dma = dma_map_single(&pdev->dev, skb->data,
adapter->rx_buffer_len,
DMA_FROM_DEVICE);
......
}
static bool e1000_clean_rx_irq(struct e1000_ring *rx_ring, int *work_done,
int work_to_do)
{
struct e1000_adapter *adapter = rx_ring->adapter;
struct net_device *netdev = adapter->netdev;
struct pci_dev *pdev = adapter->pdev;
struct e1000_hw *hw = &adapter->hw;
union e1000_rx_desc_extended *rx_desc, *next_rxd;
struct e1000_buffer *buffer_info, *next_buffer;
u32 length, staterr;
unsigned int i;
int cleaned_count = 0;
bool cleaned = false;
unsigned int total_rx_bytes = 0, total_rx_packets = 0;
i = rx_ring->next_to_clean;
rx_desc = E1000_RX_DESC_EXT(*rx_ring, i);
staterr = le32_to_cpu(rx_desc->wb.upper.status_error);
buffer_info = &rx_ring->buffer_info[i];
while (staterr & E1000_RXD_STAT_DD) {
struct sk_buff *skb;
if (*work_done >= work_to_do)
break;
(*work_done)++;
dma_rmb(); /* read descriptor and rx_buffer_info after status DD */
skb = buffer_info->skb;
buffer_info->skb = NULL;
prefetch(skb->data - NET_IP_ALIGN);
......
e1000_receive_skb(adapter, netdev, skb, staterr,
rx_desc->wb.upper.vlan);
......
staterr = le32_to_cpu(rx_desc->wb.upper.status_error);
}
rx_ring->next_to_clean = i;
......
return cleaned;
}
e1000_receive_skb parses the Ethernet header to determine the upper-layer protocol and set skb->pkt_type, then calls napi_gro_receive: with GRO enabled it tries the GRO receive path, otherwise the data is handed up to the protocol stack.
The main idea of GRO (generic receive offload) is to merge similar packets (matched on certain header fields) into one large packet (a single skb) before feeding it to the protocol stack, mainly by using scatter-gather I/O, i.e. the skb's struct skb_shared_info, to combine the data.
static void e1000_receive_skb(struct e1000_adapter *adapter, u8 status,
__le16 vlan, struct sk_buff *skb)
{
// Parse the Ethernet header, determine the upper-layer protocol, and set skb->pkt_type
skb->protocol = eth_type_trans(skb, adapter->netdev);
if (status & E1000_RXD_STAT_VP) {
u16 vid = le16_to_cpu(vlan) & E1000_RXD_SPC_VLAN_MASK;
__vlan_hwaccel_put_tag(skb, htons(ETH_P_8021Q), vid);
}
napi_gro_receive(&adapter->napi, skb);
}
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
skb_mark_napi_id(skb, napi);
trace_napi_gro_receive_entry(skb);
skb_gro_reset_offset(skb);
return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}
EXPORT_SYMBOL(napi_gro_receive);
// Handle the packet according to dev_gro_receive's return value
static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
{
switch (ret) {
// Not merged by GRO: pass it up the protocol stack
case GRO_NORMAL:
if (netif_receive_skb_internal(skb))
ret = GRO_DROP;
break;
case GRO_DROP:
kfree_skb(skb);
break;
case GRO_MERGED_FREE:
/* The skb can be freed: GRO has already merged its data and keeps it elsewhere. */
if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
napi_skb_free_stolen_head(skb);
else
__kfree_skb(skb);
break;
// The packet has been stored (held) without merging; the skb is taken over by GRO and must not be freed here
case GRO_HELD:
case GRO_MERGED:
break;
}
return ret;
}
Delivery into the protocol stack ultimately goes through netif_receive_skb_internal. If RPS is configured, the RPS path is taken: a CPU is selected and the packet then follows the same backlog path as non-NAPI reception (not detailed here). Otherwise __netif_receive_skb is called to hand the packet to the upper-layer protocols.
RPS (Receive Packet Steering) balances packets across CPUs in software and improves the cache hit rate of packet processing; RFS (Receive Flow Steering) is a related mechanism. RPS/RFS exist mainly for two reasons; a simplified sketch of the CPU-selection step follows the list:
1. On multi-queue NICs, a mismatch between the number of hardware receive queues and CPU cores leads to uneven packet distribution across CPUs.
2. On single-queue NICs, RPS/RFS can spread packets evenly across multiple CPUs in software.
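The CPU-selection step inside get_rps_cpu boils down to hashing the flow and indexing the per-queue CPU map configured via sysfs. The sketch below ignores RFS/rps_dev_flow entirely; rps_pick_cpu_sketch is a hypothetical helper, while skb_get_hash, reciprocal_scale and struct rps_map are the real kernel facilities (available under CONFIG_RPS).
#include <linux/kernel.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
/* Illustrative sketch of the mapping step in get_rps_cpu (RFS left out). */
static int rps_pick_cpu_sketch(struct sk_buff *skb, struct rps_map *map)
{
	u32 hash;
	if (!map || !map->len)
		return -1;                       /* no RPS CPUs configured for this queue */
	hash = skb_get_hash(skb);                /* flow hash, from hardware or computed */
	return map->cpus[reciprocal_scale(hash, map->len)];  /* scale the hash into the CPU map */
}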
static int netif_receive_skb_internal(struct sk_buff *skb)
{
int ret;
net_timestamp_check(netdev_tstamp_prequeue, skb);
if (skb_defer_rx_timestamp(skb))
return NET_RX_SUCCESS;
rcu_read_lock();
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
// Pick a CPU (and the rps_dev_flow) based on the packet and its ingress device
int cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu >= 0) {
/* This is the same backlog path used by non-NAPI reception: once a CPU has been chosen, the packet is queued on that
CPU's softnet_data input_pkt_queue. softnet_data is a per-CPU object and carries its own napi_struct member, backlog;
enqueue_to_backlog links backlog onto softnet_data's poll list and raises the softirq. In the softirq handler net_rx_action(),
the poll function of this CPU-shared backlog device, process_backlog (set up in net_dev_init), is called,
and it in turn calls __netif_receive_skb() to deliver the packet to the upper protocol layers.
*/
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
return ret;
}
}
#endif
ret = __netif_receive_skb(skb);
rcu_read_unlock();
return ret;
}
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;
sd = &per_cpu(softnet_data, cpu);
local_irq_save(flags);
rps_lock(sd);
if (!netif_running(skb->dev))
goto drop;
qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
if (qlen) {
enqueue:
// Queue the packet on input_pkt_queue
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);
local_irq_restore(flags);
return NET_RX_SUCCESS;
}
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
*/
// Check whether a backlog NAPI instance is already scheduled
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))
// Link sd's own napi_struct (backlog) onto the poll list; it is device-independent and shared by this CPU
____napi_schedule(sd, &sd->backlog);
}
goto enqueue;
}
drop:
sd->dropped++;
rps_unlock(sd);
local_irq_restore(flags);
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}
From here the packet is handed to the receive handler of its upper-layer protocol type, which is where the upper protocol stack proper begins. Whether or not RPS is enabled, __netif_receive_skb is eventually called, which calls __netif_receive_skb_core. Its main steps are:
1. ptype_all handling, e.g. packet capture programs and raw sockets;
2. Kernel ingress handling, in two parts: first tc ingress processing, where BPF ingress programs attached via tc are invoked, then nf_ingress, entering the netfilter ingress hook;
3. If a VLAN header is present, VLAN handling (vlan_do_receive): parse the VLAN info, look up the VLAN sub-interface by VLAN id, and if found replace skb->dev and jump back to another_round to reprocess the packet, as if the VLAN sub-interface itself had received it. Stacked VLANs (QinQ) go through this loop multiple times;
4. Special device handling via rx_handler, e.g. OVS, Linux bridge, bonding, macvlan, ipvlan;
5. ptype_base handling: hand the packet to the registered protocol handlers, e.g. IP, ARP.
static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
struct packet_type *ptype, *pt_prev;
rx_handler_func_t *rx_handler;
struct net_device *orig_dev;
bool deliver_exact = false;
int ret = NET_RX_DROP;
__be16 type;
net_timestamp_check(!netdev_tstamp_prequeue, skb);
// Tracepoint instrumentation (can be attached from BPF)
trace_netif_receive_skb(skb);
orig_dev = skb->dev;
skb_reset_network_header(skb);
if (!skb_transport_header_was_set(skb))
skb_reset_transport_header(skb);
skb_reset_mac_len(skb);
pt_prev = NULL;
another_round:
// Record the ingress ifindex before handing the skb up the stack
skb->skb_iif = skb->dev->ifindex;
__this_cpu_inc(softnet_data.processed);
if (skb->protocol == cpu_to_be16(ETH_P_8021Q) ||
skb->protocol == cpu_to_be16(ETH_P_8021AD)) {
// Parse the VLAN header here: record the VLAN id and upper protocol in the skb, and advance skb->data
skb = skb_vlan_untag(skb);
if (unlikely(!skb))
goto out;
}
#ifdef CONFIG_NET_CLS_ACT
if (skb->tc_verd & TC_NCLS) {
skb->tc_verd = CLR_TC_NCLS(skb->tc_verd);
goto ncls;
}
#endif
if (pfmemalloc)
goto skip_taps;
// ptype_all taps such as tcpdump: every packet is passed to the registered handlers
list_for_each_entry_rcu(ptype, &ptype_all, list) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
// Taps are split into global ones and per-device ones
list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
skip_taps:
#ifdef CONFIG_NET_INGRESS
if (static_key_false(&ingress_needed)) {
skb = sch_handle_ingress(skb, &pt_prev, &ret, orig_dev);
if (!skb)
goto out;
if (nf_ingress(skb, &pt_prev, &ret, orig_dev) < 0)
goto out;
}
#endif
#ifdef CONFIG_NET_CLS_ACT
skb->tc_verd = 0;
ncls:
#endif
if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
goto drop;
if (skb_vlan_tag_present(skb)) {
if (pt_prev) {
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
// Update skb->dev to the device matching the VLAN id (the VLAN sub-interface), then run receive processing again.
if (vlan_do_receive(&skb))
goto another_round;
else if (unlikely(!skb))
goto out;
}
/*
Bridge and OVS ports both come through here.
If a dev has been added to a bridge (as a bridge port), that device's rx_handler is set to
br_handle_frame; this is done in br_add_if (net/bridge/br_if.c) when a port is added to the
bridge device. Entering br_handle_frame means entering the bridge logic. */
rx_handler = rcu_dereference(skb->dev->rx_handler);
if (rx_handler) {
if (pt_prev) {
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
switch (rx_handler(&skb)) {
case RX_HANDLER_CONSUMED: // the packet has been consumed, stop processing
ret = NET_RX_SUCCESS;
goto out;
case RX_HANDLER_ANOTHER: // skb->dev was changed, run another round
goto another_round;
case RX_HANDLER_EXACT: /* deliver only to handlers with ptype->dev == skb->dev */
deliver_exact = true;
case RX_HANDLER_PASS:
break;
default:
BUG();
}
}
if (unlikely(skb_vlan_tag_present(skb))) {
// A VLAN tag is still present, meaning no device matched the VLAN id; treat it as a packet destined for another host
if (skb_vlan_tag_get_id(skb))
skb->pkt_type = PACKET_OTHERHOST;
/* Note: we might in the future use prio bits
* and set skb->priority like in vlan_do_receive()
* For the time being, just ignore Priority Code Point
*/
skb->vlan_tci = 0;
}
type = skb->protocol;
/* deliver only exact match when indicated */
/* Deliver by the layer-3 protocol type below: the handler registered for it, e.g. ip_rcv, is finally called */
if (likely(!deliver_exact)) {
deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
&ptype_base[ntohs(type) &
PTYPE_HASH_MASK]);
}
deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
&orig_dev->ptype_specific);
if (unlikely(skb->dev != orig_dev)) {
deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
&skb->dev->ptype_specific);
}
if (pt_prev) {
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
goto drop;
else
ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
} else {
drop:
if (!deliver_exact)
atomic_long_inc(&skb->dev->rx_dropped);
else
atomic_long_inc(&skb->dev->rx_nohandler);
kfree_skb(skb);
/* Jamal, now you will not able to escape explaining
* me how you were going to use this. :-)
*/
ret = NET_RX_DROP;
}
out:
return ret;
}
static inline int deliver_skb(struct sk_buff *skb,
struct packet_type *pt_prev,
struct net_device *orig_dev)
{
if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
return -ENOMEM;
atomic_inc(&skb->users);
return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}
Each protocol's handler is registered when that protocol module initializes; for example, IPv4 below registers ip_rcv. A minimal illustrative example of the same mechanism follows the snippet.
static struct packet_type ip_packet_type __read_mostly = {
.type = cpu_to_be16(ETH_P_IP),
.func = ip_rcv,
};
static int __init inet_init(void)
{
...
dev_add_pack(&ip_packet_type);
...
}
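For illustration, a hypothetical out-of-tree module could register its own packet_type the same way and have its func called from the deliver loop in __netif_receive_skb_core. This is only a sketch (demo_rcv and the module names are made up); dev_add_pack/dev_remove_pack and the handler signature are the real kernel API.
#include <linux/module.h>
#include <linux/netdevice.h>
#include <linux/skbuff.h>
#include <linux/if_ether.h>
/* Hypothetical handler: log the frame and consume the skb reference we were given. */
static int demo_rcv(struct sk_buff *skb, struct net_device *dev,
		    struct packet_type *pt, struct net_device *orig_dev)
{
	pr_info_ratelimited("demo: got %u bytes on %s\n", skb->len, dev->name);
	kfree_skb(skb);        /* the handler owns this reference and must release it */
	return NET_RX_SUCCESS;
}
static struct packet_type demo_packet_type __read_mostly = {
	.type = cpu_to_be16(ETH_P_IP),   /* same ethertype as ip_packet_type above */
	.func = demo_rcv,
};
static int __init demo_init(void)
{
	dev_add_pack(&demo_packet_type); /* adds the handler to the ptype_base hash */
	return 0;
}
static void __exit demo_exit(void)
{
	dev_remove_pack(&demo_packet_type);
}
module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");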
Addendum: functionality added to __netif_receive_skb_core by kernel 5.15
- Before anything else, do_xdp_generic runs generic XDP, invoking the XDP program attached to the device; processing continues only if it returns XDP_PASS, otherwise the packet has been taken over by XDP and the function returns;
- sch_handle_ingress handles the device's ingress hook and can now request another round via the 'another' flag.
static int __netif_receive_skb_core(struct sk_buff **pskb, bool pfmemalloc,
struct packet_type **ppt_prev)
{
struct packet_type *ptype, *pt_prev;
rx_handler_func_t *rx_handler;
struct sk_buff *skb = *pskb;
struct net_device *orig_dev;
bool deliver_exact = false;
int ret = NET_RX_DROP;
__be16 type;
net_timestamp_check(!netdev_tstamp_prequeue, skb);
trace_netif_receive_skb(skb);
orig_dev = skb->dev;
skb_reset_network_header(skb);
if (!skb_transport_header_was_set(skb))
skb_reset_transport_header(skb);
skb_reset_mac_len(skb);
pt_prev = NULL;
another_round:
skb->skb_iif = skb->dev->ifindex;
__this_cpu_inc(softnet_data.processed);
if (static_branch_unlikely(&generic_xdp_needed_key)) {
int ret2;
migrate_disable();
ret2 = do_xdp_generic(rcu_dereference(skb->dev->xdp_prog), skb);
migrate_enable();
if (ret2 != XDP_PASS) {
ret = NET_RX_DROP;
goto out;
}
}
if (eth_type_vlan(skb->protocol)) {
skb = skb_vlan_untag(skb);
if (unlikely(!skb))
goto out;
}
if (skb_skip_tc_classify(skb))
goto skip_classify;
if (pfmemalloc)
goto skip_taps;
list_for_each_entry_rcu(ptype, &ptype_all, list) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
if (pt_prev)
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = ptype;
}
skip_taps:
#ifdef CONFIG_NET_INGRESS
if (static_branch_unlikely(&ingress_needed_key)) {
bool another = false;
skb = sch_handle_ingress(skb, &pt_prev, &ret, orig_dev,
&another);
if (another)
goto another_round;
if (!skb)
goto out;
if (nf_ingress(skb, &pt_prev, &ret, orig_dev) < 0)
goto out;
}
#endif
skb_reset_redirect(skb);
skip_classify:
if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
goto drop;
if (skb_vlan_tag_present(skb)) {
if (pt_prev) {
ret = deliver_skb(skb, pt_prev, orig_dev);
pt_prev = NULL;
}
if (vlan_do_receive(&skb))
goto another_round;
else if (unlikely(!skb))
goto out;
}
......
return ret;
}
The non-NAPI path
For non-NAPI, the typical flow (for an Ethernet interface) is:
1. The device driver calls netdev_alloc_skb to allocate an sk_buff and copies the packet data into it;
2. It calls eth_type_trans to parse the Ethernet header and set the upper protocol type and pkt_type;
3. It calls netif_rx --> netif_rx_internal --> enqueue_to_backlog --> ____napi_schedule, which queues the packet on softnet_data's input_pkt_queue, links softnet_data's backlog napi_struct onto softnet_data's poll list, and raises the softirq;
4. In the softirq, the backlog's poll function, process_backlog, is called to handle the packets on input_pkt_queue.
Note that the packet is already on input_pkt_queue before the softirq runs, and by the time the softirq executes, input_pkt_queue may hold packets from several NICs; process_backlog is a per-CPU, device-independent poll function that handles all of them. A sketch of steps 1-3 from the driver side follows.
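A minimal sketch of steps 1-3 in a legacy driver's receive path; this is not a real driver, and struct my_priv plus the mydrv_* helpers are assumptions used only to show where the kernel APIs fit.
#include <linux/netdevice.h>
#include <linux/etherdevice.h>
#include <linux/skbuff.h>
/* Legacy (non-NAPI) receive, typically run directly from the interrupt handler. */
static void mydrv_rx_one(struct net_device *netdev, struct my_priv *priv)
{
	unsigned int len = mydrv_rx_frame_len(priv);        /* assumed helper: frame length from the device */
	struct sk_buff *skb;
	skb = netdev_alloc_skb(netdev, len + NET_IP_ALIGN); /* step 1: allocate the sk_buff */
	if (!skb) {
		netdev->stats.rx_dropped++;
		return;
	}
	skb_reserve(skb, NET_IP_ALIGN);                     /* keep the IP header aligned */
	mydrv_copy_rx(priv, skb_put(skb, len), len);        /* assumed helper: copy the frame into skb->data */
	skb->protocol = eth_type_trans(skb, netdev);        /* step 2: parse the Ethernet header */
	netif_rx(skb);  /* step 3: enqueue on input_pkt_queue, schedule the backlog napi, raise NET_RX_SOFTIRQ */
}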
static int process_backlog(struct napi_struct *napi, int quota)
{
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
bool again = true;
int work = 0;
/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}
napi->weight = weight_p;
while (again) {
struct sk_buff *skb;
// The key part: dequeue and call __netif_receive_skb to hand the packet to the stack, same as the NAPI path above.
while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
__netif_receive_skb(skb);
rcu_read_unlock();
input_queue_head_incr(sd);
if (++work >= quota)
return work;
}
local_irq_disable();
rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
again = false;
} else {
skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
}
rps_unlock(sd);
local_irq_enable();
}
return work;
}