2. NAPI机制

2. NAPI机制

上一篇博客已经分析了,内核是如何通过驱动收包的。但其中说到软中断时,对使用的NAPI接口的描述不是很清楚。所以这篇就来看看NAPI是何方圣神。

一, 框架

随着网络带宽的发展,网速越来越快,之前的中断收包模式已经无法适应目前千兆,万兆的带宽了。如果每个数据包大小等于MTU大小1460字节。当驱动以千兆网速收包时,CPU将每秒被中断91829次。在以MTU收包的情况下都会出现每秒被中断10万次的情况。过多的中断会引起一个问题,CPU一直陷入硬中断而没有时间来处理别的事情了。为了解决这个问题,内核在2.6中引入了NAPI机制。

NAPI就是混合中断和轮询的方式来收包,当有中断来了,驱动关闭中断,通知内核收包,内核软中断轮询当前网卡,在规定时间尽可能多的收包。时间用尽或者没有数据可收,内核再次开启中断,准备下一次收包。

二, NAPI接口

struct napi_struct 是内核处理软中断的入口,每个net_device都对应一个napi_struct,驱动在硬中断中将自己的napi_struct挂载到CPU的收包队列softnet_data。内核在软中断中轮询该队列,并执行napi_sturct中的回调函数int(*poll)(struct napi_struct *, int);,在poll函数中,驱动将网卡数据转换成skb_buff形式,最终发往协议栈。也就是说,协议栈对数据包的处理,使用的是软中断的时间片。如果协议栈处理耗费了过多的CPU时间的化,会直接影响到设备的网络性能。

/*
 * Structure for NAPI scheduling similar to tasklet but with weighting
 */
struct napi_struct {
    /* The poll_list must only be managed by the entity which
     * changes the state of the NAPI_STATE_SCHED bit.  This means
     * whoever atomically sets that bit can add this napi_struct
     * to the per-CPU poll_list, and whoever clears that bit
     * can remove from the list right before clearing the bit.
     */
    struct list_head    poll_list;

    unsigned long       state;//设备状态
    int         weight; //每次轮询最大处理数据包数量
    unsigned int        gro_count;
    int         (*poll)(struct napi_struct *, int);//轮询设备的回调函数
#ifdef CONFIG_NETPOLL
    int         poll_owner;
#endif
    struct net_device   *dev;
    struct sk_buff      *gro_list;
    struct sk_buff      *skb;
    struct hrtimer      timer;
    struct list_head    dev_list;
    struct hlist_node   napi_hash_node;
    unsigned int        napi_id;
};

有了保存数据的结构体,让我们在看看为它配套提供的接口函数吧:

netif_napi_add函数

驱动在初始化net_device时通过这函数将通过这个函数绑定一个napi_struct结构。驱动需要在这里注册软中断中用于轮询的网卡的poll函数。

void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
            int (*poll)(struct napi_struct *, int), int weight)
{
    INIT_LIST_HEAD(&napi->poll_list);
    hrtimer_init(&napi->timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL_PINNED);
    napi->timer.function = napi_watchdog;
    napi->gro_count = 0;
    napi->gro_list = NULL;
    napi->skb = NULL;
    napi->poll = poll;
    if (weight > NAPI_POLL_WEIGHT)
        pr_err_once("netif_napi_add() called with weight %d on device %s\n",
                weight, dev->name);
    napi->weight = weight;
    list_add(&napi->dev_list, &dev->napi_list);
    napi->dev = dev;
#ifdef CONFIG_NETPOLL
    napi->poll_owner = -1;
#endif
    set_bit(NAPI_STATE_SCHED, &napi->state);
    napi_hash_add(napi);
}

__napi_schedule函数

__napi_schedule函数,为驱动硬件中断提供的接口,驱动在硬件中断中,将自己的napi_struct挂载到当前CPU的softnet_data上。

/**
 * __napi_schedule - schedule for receive
 * @n: entry to schedule
 *
 * The entry's receive function will be scheduled to run.
 * Consider using __napi_schedule_irqoff() if hard irqs are masked.
 */
void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    local_irq_save(flags);
    ____napi_schedule(this_cpu_ptr(&softnet_data), n);
    local_irq_restore(flags);
}

/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
                     struct napi_struct *napi)
{
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ); //设置了软中断接收标志位
}

napi_schedule_prep函数

napi_schedule_prep函数是上面__napi_schedule的配套函数,用于__napi_schedule调用前对napi_struct进行检查。前面博文e1000网卡的中断函数就是这样调用的。

if (likely(napi_schedule_prep(&adapter->napi))) {
        adapter->total_tx_bytes = 0;
        adapter->total_tx_packets = 0;
        adapter->total_rx_bytes = 0;
        adapter->total_rx_packets = 0;
        __napi_schedule(&adapter->napi);
 }

判断NAPI是否可以调度。如果NAPI没有被禁止,且不存在已被调度的NAPI,则允许调度NAPI,因为同一时刻只允许有一个NAPI poll instance。测试napi.state字段,只有当其不是NAPI_STATE_SCHED时,返回真,并设置为NAPI_STATE_SCHED.

/**
 *  napi_schedule_prep - check if napi can be scheduled
 *  @n: napi context
 *
 * Test if NAPI routine is already running, and if not mark
 * it as running.  This is used as a condition variable
 * insure only one NAPI poll instance runs.  We also make
 * sure there is no pending NAPI disable.
 */
bool napi_schedule_prep(struct napi_struct *n)
{
    unsigned long val, new;

    do {
        val = READ_ONCE(n->state);
        if (unlikely(val & NAPIF_STATE_DISABLE))
            return false;
        new = val | NAPIF_STATE_SCHED;

        /* Sets STATE_MISSED bit if STATE_SCHED was already set
         * This was suggested by Alexander Duyck, as compiler
         * emits better code than :
         * if (val & NAPIF_STATE_SCHED)
         *     new |= NAPIF_STATE_MISSED;
         */
        new |= (val & NAPIF_STATE_SCHED) / NAPIF_STATE_SCHED *
                           NAPIF_STATE_MISSED;
    } while (cmpxchg(&n->state, val, new) != val);

    return !(val & NAPIF_STATE_SCHED);
}

上面的三个函数netif_napi_add,__napi_schedule,napi_schedule_prep是驱动使用NAPI收包机制的接口,下面再看看内核软中断使用NAPI的接口函数吧。

napi_poll函数

这函数是被软中断处理函数net_rx_action调用的。这个函数将在napi_struct.weight规定的时间内,被net_rx_action循环调用,直到时间片用尽或者网卡当前DMA中所有缓存的数据包被处理完。如果是由于时间片用尽而退出的的话,napi_struct会重新挂载到softnet_data上,而如果是所有数据包处理完退出的,napi_struct会从softnet_data上移除并重新打开网卡硬件中断。

static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
    void *have;
    int work, weight;

    list_del_init(&n->poll_list);

    have = netpoll_poll_lock(n);

    weight = n->weight;

    /* This NAPI_STATE_SCHED test is for avoiding a race
     * with netpoll's poll_napi().  Only the entity which
     * obtains the lock and sees NAPI_STATE_SCHED set will
     * actually make the ->poll() call.  Therefore we avoid
     * accidentally calling ->poll() when NAPI is not scheduled.
     */
    work = 0;
    if (test_bit(NAPI_STATE_SCHED, &n->state)) {
        work = n->poll(n, weight);  //调用网卡注册的poll函数
        trace_napi_poll(n, work, weight);
    }

    WARN_ON_ONCE(work > weight);

    if (likely(work < weight))
        goto out_unlock;

    /* Drivers must not modify the NAPI state if they
     * consume the entire weight.  In such cases this code
     * still "owns" the NAPI instance and therefore can
     * move the instance around on the list at-will.
     */
    if (unlikely(napi_disable_pending(n))) {
        napi_complete(n);
        goto out_unlock;
    }

    if (n->gro_list) {
        /* flush too old packets
         * If HZ < 1000, flush all packets.
         */
        napi_gro_flush(n, HZ >= 1000);
    }

    /* Some drivers may have called napi_schedule
     * prior to exhausting their budget.
     */
    if (unlikely(!list_empty(&n->poll_list))) {
        pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
                 n->dev ? n->dev->name : "backlog");
        goto out_unlock;
    }

    list_add_tail(&n->poll_list, repoll);

out_unlock:
    netpoll_poll_unlock(have);

    return work;
}

napi_gro_receive函数

准确来说napi_gro_receive函数是驱动通过poll注册,内核调用的函数。通过这函数的的调用,skb将会传给协议栈的入口函数__netif_receive_skbdev_gro_receive函数用于对数据包的合并,他将合并napi_struct.gro_list链表上的skb。GRO是一个网络子系统的另一套机制,以后再看。

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
    skb_mark_napi_id(skb, napi);
    trace_napi_gro_receive_entry(skb);

    skb_gro_reset_offset(skb);

    return napi_skb_finish(dev_gro_receive(napi, skb), skb);
}

总结

netif_napi_add:驱动初始时向内核注册软软中断处理回调poll函数

__napi_schedule:网卡硬件中断用来触发软中断

napi_poll:软中断处理函数net_rx_action用来回调上面驱动初始化是通过netif_napi_add注册的回调收包poll函数

napi_gro_receivepoll函数用来将网卡上的数据包发给协议栈处理。

到这,NAPI机制下的收包处理流程就很清晰了。IRQ->__napi_schedule->进入软中断->net_rx_action->napi_poll->驱动注册的poll->napi_gro_receive

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 网卡收包 内核网络模块如何初始化? 内核如何通过网卡驱动收发数据包? 驱动收到的数据怎么交给协议栈处理? 一,框架...
    霜晨月_ScY阅读 7,340评论 0 5
  • 1.Libpcap 的工作原理 Libpcap的工作原理可以描述为,当一个数据包到达网卡时,通过网络分接口(即旁路...
    shaarawy18阅读 19,317评论 1 15
  • 背景 2017年年初以来,随着Redis产品的用户量越来越大,接入服务越来越多,再加上美团点评Memcache和R...
    SithCait阅读 3,964评论 0 0
  • NAPI真的是kernel开发者词穷想的名字吧,你看看kernel里面各种名字,不知道为啥就不能起个好听点的。 言...
    ybzhao阅读 14,703评论 3 4
  • 本来想继续写 socket 实现,发现网络栈是一个整体,不搞懂网卡与内核的交互,就缺少了最重要的一环。参考内核 4...
    董泽润阅读 11,183评论 0 5

友情链接更多精彩内容