
qdisc Implementation Analysis

tc qdisc show
echo pfifo > /proc/sys/net/core/default_qdisc
tc qdisc add dev eth0 root pfifo
tc qdisc del dev eth0 root

https://github.com/liucimin/Learning/blob/master/linux%E7%BD%91%E7%BB%9C%E7%9B%B8%E5%85%B3/Tc%20%E7%BD%91%E5%8D%A1%E5%A4%9A%E9%98%9F%E5%88%97%E6%97%B6%E6%AF%8F%E4%B8%AA%E9%98%9F%E5%88%97%E9%85%8D%E7%BD%AE%E5%85%AC%E5%B9%B3%E9%98%9F%E5%88%97sfq.md

http://man7.org/linux/man-pages/man8/tc-fq_codel.8.html


https://blog.csdn.net/one_clouder/article/details/52685249

On the layer-2 transmit path, the two main functions that implement qdisc processing are __dev_xmit_skb and net_tx_action. This post analyzes how the qdisc framework works; it does not cover the queueing algorithms inside individual qdiscs.
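
For context, __dev_xmit_skb is entered from __dev_queue_xmit. A simplified sketch (abridged, not verbatim kernel source) of the hand-off:

static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{
	struct net_device *dev = skb->dev;
	struct netdev_queue *txq;
	struct Qdisc *q;

	/* pick a TX queue and fetch its root qdisc */
	txq = netdev_pick_tx(dev, skb, accel_priv);
	q = rcu_dereference_bh(txq->qdisc);

	if (q->enqueue)
		/* the device has a queueing discipline: go through it */
		return __dev_xmit_skb(skb, q, dev, txq);

	/* queueless devices (e.g. loopback) bypass the qdisc and are
	 * transmitted directly via dev_hard_start_xmit() (not shown) */
	return 0;
}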

1. The __dev_xmit_skb function

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
				 struct net_device *dev,
				 struct netdev_queue *txq)
{
	spinlock_t *root_lock = qdisc_lock(q);
	bool contended;
	int rc;
 
	qdisc_pkt_len_init(skb);
	qdisc_calculate_pkt_len(skb, q);
	/*
	 * Heuristic to force contended enqueues to serialize on a
	 * separate lock before trying to get qdisc main lock.
	 * This permits __QDISC___STATE_RUNNING owner to get the lock more
	 * often and dequeue packets faster.
	 */
	contended = qdisc_is_running(q);  // check whether the qdisc is already running
	if (unlikely(contended))
		spin_lock(&q->busylock);
 
	spin_lock(root_lock);
	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
		kfree_skb(skb);
		rc = NET_XMIT_DROP;
	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&  // qdisc not running and no queued packets: transmit directly
		   qdisc_run_begin(q)) {
		/*
		 * This is a work-conserving queue; there are no old skbs
		 * waiting to be sent out; and the qdisc is not running -
		 * xmit the skb directly.
		 */
 
		qdisc_bstats_update(q, skb);
 
		if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       // sch_direct_xmit returned nonzero: the qdisc still holds packets, try to send them
		} else
			qdisc_run_end(q); // transmission finished normally; mark the qdisc as not running
 
		rc = NET_XMIT_SUCCESS;
	} else {
		rc = q->enqueue(skb, q) & NET_XMIT_MASK;   // qdisc is running or has queued packets: enqueue the skb
		if (qdisc_run_begin(q)) {         // try to start the qdisc; if it starts, try to transmit
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       // transmit the packets queued in the qdisc
		}
	}
	spin_unlock(root_lock);
	if (unlikely(contended))
		spin_unlock(&q->busylock);
	return rc;
}
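
sch_direct_xmit is not listed above. An abridged sketch (based on a kernel of the same era, not verbatim) shows why a nonzero return means "more packets pending":

int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
		    struct net_device *dev, struct netdev_queue *txq,
		    spinlock_t *root_lock, bool validate)
{
	int ret = NETDEV_TX_BUSY;

	spin_unlock(root_lock);           /* drop the qdisc lock for the hw xmit */

	if (validate)                     /* GSO segmentation / checksum if needed */
		skb = validate_xmit_skb_list(skb, dev);

	if (likely(skb)) {
		HARD_TX_LOCK(dev, txq, smp_processor_id());
		if (!netif_xmit_frozen_or_stopped(txq))
			skb = dev_hard_start_xmit(skb, dev, txq, &ret);
		HARD_TX_UNLOCK(dev, txq);
	}
	spin_lock(root_lock);             /* re-take the qdisc lock */

	if (dev_xmit_complete(ret))
		ret = qdisc_qlen(q);      /* >0 means the qdisc still holds packets */
	else
		ret = dev_requeue_skb(skb, q);  /* driver busy: requeue the skb */

	return ret;
}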

2. The __qdisc_run function

void __qdisc_run(struct Qdisc *q)
{
	int quota = weight_p;
	int packets;
 
	while (qdisc_restart(q, &packets)) {  // loop, transmitting packets
		/*
		 * Ordered by possible occurrence: Postpone processing if
		 * 1. we've exceeded packet quota
		 * 2. another process needs the CPU;
		 */
		quota -= packets;
		if (quota <= 0 || need_resched()) {    // quota used up or reschedule needed: raise the TX softirq and stop
			__netif_schedule(q);
			break;
		}
	}
 
	qdisc_run_end(q); // mark the qdisc as no longer running
}

3. The qdisc_restart function

static inline int qdisc_restart(struct Qdisc *q, int *packets)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;
	bool validate;
 
	/* Dequeue packet */
	skb = dequeue_skb(q, &validate, packets); // get the next packet; may be NULL even when packets are queued (e.g. rate limiting)
	if (unlikely(!skb))
		return 0;
 
	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = skb_get_tx_queue(dev, skb);
 
	return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);    // transmit the packet
}

4. The dequeue_skb function

static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
				   int *packets)
{
	struct sk_buff *skb = q->gso_skb;
	const struct netdev_queue *txq = q->dev_queue;
 
	*packets = 1;
	*validate = true;
	if (unlikely(skb)) {
		/* check the reason of requeuing without tx lock first */
		txq = skb_get_tx_queue(txq->dev, skb);
		if (!netif_xmit_frozen_or_stopped(txq)) {
			q->gso_skb = NULL;
			q->q.qlen--;
		} else
			skb = NULL;
		/* skb in gso_skb were already validated */
		*validate = false;
	} else {
		if (!(q->flags & TCQ_F_ONETXQUEUE) ||
			!netif_xmit_frozen_or_stopped(txq)) {
			skb = q->dequeue(q);           // call the qdisc's dequeue to get an skb
			if (skb && qdisc_may_bulk(q))  // if allowed, dequeue several more skbs in one go
				try_bulk_dequeue_skb(q, skb, txq, packets);
		}
	}
	return skb;
}
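
try_bulk_dequeue_skb, called above, chains additional skbs onto the first one while staying within the queue's byte budget. A sketch from a kernel of the same era:

static void try_bulk_dequeue_skb(struct Qdisc *q, struct sk_buff *skb,
				 const struct netdev_queue *txq,
				 int *packets)
{
	int bytelimit = qdisc_avail_bulklimit(txq) - skb->len;

	while (bytelimit > 0) {
		struct sk_buff *nskb = q->dequeue(q);

		if (!nskb)
			break;

		bytelimit -= nskb->len;   /* covers GSO length too */
		skb->next = nskb;         /* chain onto the list */
		skb = nskb;
		(*packets)++;
	}
	skb->next = NULL;
}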

5. The net_tx_action function

net_tx_action is the transmit softirq handler. When the TX softirq runs, it frees completed skbs and tries to transmit packets from every qdisc queued on this CPU's softnet_data.

static void net_tx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
 
	if (sd->completion_queue) {
		struct sk_buff *clist;
 
		local_irq_disable();
		clist = sd->completion_queue;
		sd->completion_queue = NULL;
		local_irq_enable();
 
		while (clist) {
			struct sk_buff *skb = clist;
			clist = clist->next;
 
			WARN_ON(atomic_read(&skb->users));
			if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
				trace_consume_skb(skb);
			else
				trace_kfree_skb(skb, net_tx_action);
			__kfree_skb(skb);
		}
	}
 
	if (sd->output_queue) {
		struct Qdisc *head;
 
		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		sd->output_queue_tailp = &sd->output_queue;
		local_irq_enable();
 
		while (head) {
			struct Qdisc *q = head;
			spinlock_t *root_lock;
 
			head = head->next_sched;
 
			root_lock = qdisc_lock(q);
			if (spin_trylock(root_lock)) {
				smp_mb__before_atomic();
				clear_bit(__QDISC_STATE_SCHED,
					  &q->state);
				qdisc_run(q);         // try to start the qdisc and transmit
				spin_unlock(root_lock);
			} else {
				if (!test_bit(__QDISC_STATE_DEACTIVATED,
						  &q->state)) {
					__netif_reschedule(q);
				} else {
					smp_mb__before_atomic();
					clear_bit(__QDISC_STATE_SCHED,
						  &q->state);
				}
			}
		}
	}
}
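
Qdiscs land on sd->output_queue via __netif_schedule/__netif_reschedule, which link the qdisc into the per-CPU list and raise the TX softirq (same-era kernel source, lightly annotated):

void __netif_schedule(struct Qdisc *q)
{
	/* schedule only once until net_tx_action clears the bit */
	if (!test_and_set_bit(__QDISC_STATE_SCHED, &q->state))
		__netif_reschedule(q);
}

static void __netif_reschedule(struct Qdisc *q)
{
	struct softnet_data *sd;
	unsigned long flags;

	local_irq_save(flags);
	sd = this_cpu_ptr(&softnet_data);
	q->next_sched = NULL;
	*sd->output_queue_tailp = q;           /* append to output_queue */
	sd->output_queue_tailp = &q->next_sched;
	raise_softirq_irqoff(NET_TX_SOFTIRQ);  /* net_tx_action will run */
	local_irq_restore(flags);
}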

netmap: Introduction and Usage

https://blog.csdn.net/fengfengdiandia/article/details/52869290

https://blog.csdn.net/liyu123__/article/details/80853150

https://www.cnblogs.com/ne-liqian/p/9294757.html

https://wenku.baidu.com/view/af41b0f065ce05087632137a

netmap homepage: http://info.iet.unipi.it/~luigi/netmap/

netmap GitHub repo: https://github.com/luigirizzo/netmap

netmap is a high-performance packet I/O framework. It is integrated into FreeBSD, and it can also be built and used on Linux.

1. Architecture

Modern NICs use multiple buffers to send and receive packets, organized in a circular array called the NIC ring.

The NIC ring is statically allocated; its slots point at buffers belonging to mbuf chains.

netmap memory-maps the NIC's packet buffers into user space and implements its own circular rings that mirror the NIC rings. Applications run entirely in user space, so even a buggy program cannot crash the operating system.

An interface can have multiple netmap rings (shown as a diagram in the original post).

When binding a file descriptor to a NIC, the application can choose to attach either all rings or just a single ring to the descriptor.

With all rings attached, the same code works for single-queue and multi-queue NICs.

With one ring per descriptor, a high-performance system can dedicate one process/CPU core to each ring and parallelize across the machine.

netmap uses poll to wait for the NIC's file descriptor to become readable or writable.

netmap creates a character device /dev/netmap; nm_open registers a NIC into netmap mode.

  • Note: once a NIC is in netmap mode, ifconfig no longer shows its counters changing and wireshark captures nothing, because the kernel protocol stack is bypassed.

The memory-mapped region contains both receive and transmit rings, so zero-copy (Zero-copy) forwarding is possible: write a receive buffer's index into a transmit slot and the payload is never copied.
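
As an illustration of that idiom, here is a minimal hypothetical forwarding helper: the ring arguments are placeholders, but the buf_idx swap plus NS_BUF_CHANGED is the standard netmap zero-copy technique (see the bridge example shipped with netmap):

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>

/* move one packet from an RX ring to a TX ring without copying */
static void zero_copy_one(struct netmap_ring *rxring, struct netmap_ring *txring)
{
	struct netmap_slot *rs = &rxring->slot[rxring->cur];
	struct netmap_slot *ts = &txring->slot[txring->cur];
	uint32_t idx = ts->buf_idx;

	ts->buf_idx = rs->buf_idx;   /* TX slot now owns the received buffer */
	rs->buf_idx = idx;           /* RX slot gets the spare buffer back   */
	ts->len = rs->len;

	/* tell the kernel the buffers under these slots changed */
	ts->flags |= NS_BUF_CHANGED;
	rs->flags |= NS_BUF_CHANGED;

	rxring->head = rxring->cur = nm_ring_next(rxring, rxring->cur);
	txring->head = txring->cur = nm_ring_next(txring, txring->cur);
}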

2. Performance

The netmap site reports that, tested on 10GigE, the transmit rate reaches 14.88 Mpps (line rate for minimum-size frames), with receive rates close to transmit. Multiple NIC queues are also supported.

3. Build and Install

git clone https://github.com/luigirizzo/netmap.git
git clone https://github.com/abcdxyzk/netmap.git

cd netmap/LINUX
./configure --drivers=ixgbe --kernel-sources=/usr/src/linux-headers-4.15.18/ --kernel-dir=/usr/src/linux-headers-4.15.18/

rmmod ixgbe

insmod netmap.ko
insmod ixgbe/ixgbe.ko

4. Transmit and Receive

Transmit

./build-apps/pkt-gen/pkt-gen -i enp3s0 -f tx -c 1 -p 1 -z -d 12.0.0.100:80

Receive

gcc rcv.c -I../sys

cat rcv.c

#include <stdio.h>
#include <stdlib.h>	/* exit() */
#include <poll.h>
 
#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>
 
unsigned long pps = 0;
 
static void receive_packets(struct netmap_ring *ring)
{
	int i;
	char *buf;
 
	while (!nm_ring_empty(ring)) {
		i   = ring->cur;
		buf = NETMAP_BUF(ring, ring->slot[i].buf_idx);
		pps++;

		ring->head = ring->cur = nm_ring_next(ring, i); 
	}
}
 
int main(void)
{
	struct nm_desc *d;
	struct pollfd fds;
	struct netmap_ring *ring;
	int i; 
 
	d = nm_open("netmap:eth1", NULL, 0, 0);   /* put eth1 into netmap mode */
	if (d == NULL) {
		fprintf(stderr, "nm_open() failed\n");
		exit(1);
	}
  
	fds.fd     = d->fd;
	fds.events = POLLIN;
 
	while (1) {
		if (poll(&fds, 1, 1) < 0) {
			perror("poll()");
			exit(1);
		}
 
		for (i = d->first_rx_ring; i <= d->last_rx_ring; i++) {
			ring = NETMAP_RXRING(d->nifp, i);
			receive_packets(ring);
		}
	}

	return 0;
}

ixgbe Driver Initialization

https://www.cnblogs.com/scottieyuyang/p/5663213.html

First, the module is loaded with insmod ixgbe.ko:

module_init(ixgbe_init_module);

static int __init ixgbe_init_module(void)
{
	int ret;
	pr_info("%s - version %s\n", ixgbe_driver_string, ixgbe_driver_version);
	pr_info("%s\n", ixgbe_copyright);

	ixgbe_dbg_init();
	ret = pci_register_driver(&ixgbe_driver);
	if (ret) {
		ixgbe_dbg_exit();
		return ret;
	}

#ifdef CONFIG_IXGBE_DCA
	dca_register_notify(&dca_notifier);
#endif

	return 0;
}

Next, look at the driver's core PCI structure:

static struct pci_driver ixgbe_driver = {
	.name     = ixgbe_driver_name,
	.id_table = ixgbe_pci_tbl,
	.probe    = ixgbe_probe,
	.remove   = ixgbe_remove,
#ifdef CONFIG_PM
	.suspend  = ixgbe_suspend,
	.resume   = ixgbe_resume,
#endif
	.shutdown = ixgbe_shutdown,
	.sriov_configure = ixgbe_pci_sriov_configure,
	.err_handler = &ixgbe_err_handler
};

Once a matching device is found, ixgbe_probe runs:

static int ixgbe_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
	/* allocate the struct net_device (multi-queue ethernet device) */
	netdev = alloc_etherdev_mq(sizeof(struct ixgbe_adapter), indices);

	if (!netdev) {
		err = -ENOMEM;
		goto err_alloc_etherdev;
	}

	SET_NETDEV_DEV(netdev, &pdev->dev);

	/* the struct ixgbe_adapter lives in netdev's private area */
	adapter = netdev_priv(netdev);

	/* install the netdev_ops function table */
	netdev->netdev_ops = &ixgbe_netdev_ops;

	err = ixgbe_sw_init(adapter);

	err = ixgbe_init_interrupt_scheme(adapter);
	/* register the net device */
	err = register_netdev(netdev);
}

The key call is ixgbe_init_interrupt_scheme(adapter), which initializes the adapter's queue vectors and the NAPI-related state:

int ixgbe_init_interrupt_scheme(struct ixgbe_adapter *adapter)
{

	err = ixgbe_alloc_q_vectors(adapter);

}
static int ixgbe_alloc_q_vectors(struct ixgbe_adapter *adapter)
{

	if (q_vectors >= (rxr_remaining + txr_remaining)) {
		for (; rxr_remaining; v_idx++) {
			err = ixgbe_alloc_q_vector(adapter, q_vectors, v_idx,
						   0, 0, 1, rxr_idx);

			if (err)
				goto err_out;

			/* update counts and index */
			rxr_remaining--;
			rxr_idx++;
		}
	}
}
static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter,
				int v_count, int v_idx,
				int txr_count, int txr_idx,
				int rxr_count, int rxr_idx)
{
	/* setup affinity mask and node */
	if (cpu != -1)
		cpumask_set_cpu(cpu, &q_vector->affinity_mask);
	q_vector->numa_node = node;

#ifdef CONFIG_IXGBE_DCA
	/* initialize CPU for DCA */
	q_vector->cpu = -1;

#endif
	/* initialize NAPI */
	netif_napi_add(adapter->netdev, &q_vector->napi,
			   ixgbe_poll, 64);
	napi_hash_add(&q_vector->napi);
}

At this point, NIC initialization is complete.

The following structures are involved:

ixgbe_adapter
/* board specific private data structure */
struct ixgbe_adapter {

	// TX rings
	struct ixgbe_ring *tx_ring[MAX_TX_QUEUES] ____cacheline_aligned_in_smp;

	// RX rings
	struct ixgbe_ring *rx_ring[MAX_RX_QUEUES];

	// each q_vector embeds a napi struct; presumably it pairs one-to-one
	// with the msix_entries below, acting as a single interrupt vector
	struct ixgbe_q_vector *q_vector[MAX_Q_VECTORS];

	// presumably the MSI-X entries whose interrupts these vectors service
	struct msix_entry *msix_entries;
}

struct ixgbe_q_vector {
	struct ixgbe_adapter *adapter;
#ifdef CONFIG_IXGBE_DCA
	int cpu;            /* CPU for DCA */
#endif
	u16 v_idx;              /* index of q_vector within array, also used for
				 * finding the bit in EICR and friends that
				 * represents the vector for this ring */
	u16 itr;                /* Interrupt throttle rate written to EITR */
	struct ixgbe_ring_container rx, tx;

	struct napi_struct napi;	/* the NAPI instance for this vector */
	cpumask_t affinity_mask;
	int numa_node;
	struct rcu_head rcu;    /* to avoid race with update stats on free */
	char name[IFNAMSIZ + 9];

	/* for dynamic allocation of rings associated with this q_vector */
	struct ixgbe_ring ring[0] ____cacheline_internodealigned_in_smp;
};

struct napi_struct {
	/* The poll_list must only be managed by the entity which
	 * changes the state of the NAPI_STATE_SCHED bit.  This means
	 * whoever atomically sets that bit can add this napi_struct
	 * to the per-cpu poll_list, and whoever clears that bit
	 * can remove from the list right before clearing the bit.
	 */
	struct list_head    poll_list;

	unsigned long       state;
	int         weight;
	unsigned int        gro_count;
	int         (*poll)(struct napi_struct *, int); // the driver's poll callback
#ifdef CONFIG_NETPOLL
	spinlock_t      poll_lock;
	int         poll_owner;
#endif
	struct net_device   *dev;
	struct sk_buff      *gro_list;
	struct sk_buff      *skb;
	struct list_head    dev_list;
};

Then, when the interface is brought up with ifconfig dev up, the netdev_ops->ndo_open callback runs; for ixgbe that is ixgbe_open.
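
The ndo_open mapping comes from the ops table installed during probe; an abridged excerpt from the ixgbe source:

static const struct net_device_ops ixgbe_netdev_ops = {
	.ndo_open		= ixgbe_open,
	.ndo_stop		= ixgbe_close,
	.ndo_start_xmit		= ixgbe_xmit_frame,
	/* ... many more callbacks ... */
};

ixgbe_open itself: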

static int ixgbe_open(struct net_device *netdev)
{
	/* allocate transmit descriptors */
	err = ixgbe_setup_all_tx_resources(adapter);
	if (err)
		goto err_setup_tx;

	/* allocate receive descriptors */
	err = ixgbe_setup_all_rx_resources(adapter);
	/* register the interrupt handlers */
	err = ixgbe_request_irq(adapter);
}

static int ixgbe_request_irq(struct ixgbe_adapter *adapter)
{
	struct net_device *netdev = adapter->netdev;
	int err;

	if (adapter->flags & IXGBE_FLAG_MSIX_ENABLED)
		err = ixgbe_request_msix_irqs(adapter);
	else if (adapter->flags & IXGBE_FLAG_MSI_ENABLED)
		err = request_irq(adapter->pdev->irq, ixgbe_intr, 0,
				  netdev->name, adapter);
	else
		err = request_irq(adapter->pdev->irq, ixgbe_intr, IRQF_SHARED,
				  netdev->name, adapter);

	if (err)
		e_err(probe, "request_irq failed, Error %d\n", err);

	return err;
}

static int ixgbe_request_msix_irqs(struct ixgbe_adapter *adapter)
{
	for (vector = 0; vector < adapter->num_q_vectors; vector++) {
		struct ixgbe_q_vector *q_vector = adapter->q_vector[vector];
		struct msix_entry *entry = &adapter->msix_entries[vector];

		err = request_irq(entry->vector, &ixgbe_msix_clean_rings, 0,
				  q_vector->name, q_vector);
	}
}

From the flow above, the interrupt handler that finally gets registered (in the MSI-X case) is ixgbe_msix_clean_rings:

static irqreturn_t ixgbe_msix_clean_rings(int irq, void *data)
{
	struct ixgbe_q_vector *q_vector = data;

	/* EIAM disabled interrupts (on this vector) for us */

	if (q_vector->rx.ring || q_vector->tx.ring)
		napi_schedule(&q_vector->napi);

	return IRQ_HANDLED;
}

As the code above shows, this interrupt handler does nothing but schedule NAPI.

When a packet arrives, the hardware interrupt fires and runs ixgbe_msix_clean_rings; napi_schedule then (via __raise_softirq_irqoff) raises the NET_RX_SOFTIRQ softirq, whose handler carries the packet up into the protocol stack.

Next, look at what the NAPI scheduling functions actually do:

static inline void napi_schedule(struct napi_struct *n)
{
	if (napi_schedule_prep(n))
		__napi_schedule(n);
}
void __napi_schedule(struct napi_struct *n)
{
	unsigned long flags;

	local_irq_save(flags);
	____napi_schedule(this_cpu_ptr(&softnet_data), n);
	local_irq_restore(flags);
}

Ultimately, the NAPI scheduling path hangs the napi_struct off the per-CPU private structure softnet_data:

struct softnet_data {
	struct Qdisc        *output_queue;
	struct Qdisc        **output_queue_tailp;
	struct list_head    poll_list;
	struct sk_buff      *completion_queue;
	struct sk_buff_head process_queue;

	/* stats */
	unsigned int        processed;
	unsigned int        time_squeeze;
	unsigned int        cpu_collision;
	unsigned int        received_rps;

#ifdef CONFIG_RPS
	struct softnet_data *rps_ipi_list;

	/* Elements below can be accessed between CPUs for RPS */
	struct call_single_data csd ____cacheline_aligned_in_smp;
	struct softnet_data *rps_ipi_next;
	unsigned int        cpu;
	unsigned int        input_queue_head;
	unsigned int        input_queue_tail;
#endif
	unsigned int        dropped;
	struct sk_buff_head input_pkt_queue;
	struct napi_struct  backlog;	/* backlog napi, linked into poll_list like any other */
};
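
The actual hook-up happens in ____napi_schedule, called by __napi_schedule above (same-era kernel source):

static inline void ____napi_schedule(struct softnet_data *sd,
				     struct napi_struct *napi)
{
	/* hang this napi off the per-CPU poll list ... */
	list_add_tail(&napi->poll_list, &sd->poll_list);
	/* ... and mark the RX softirq pending */
	__raise_softirq_irqoff(NET_RX_SOFTIRQ);
}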

NET_RX_SOFTIRQ is the softirq raised for received packets; its handler is net_rx_action.

NET_TX_SOFTIRQ is the softirq raised around packet transmission/completion; its handler is net_tx_action.
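
Both handlers are registered once at boot, in net_dev_init() (net/core/dev.c):

	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);

net_rx_action looks like this: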

static void net_rx_action(struct softirq_action *h)
{
	/* this CPU's softnet_data */
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
	while (!list_empty(&sd->poll_list)) {
		struct napi_struct *n;
		n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);

		if (test_bit(NAPI_STATE_SCHED, &n->state)) {
			work = n->poll(n, weight);
			trace_napi_poll(n);
		}
	}
}

Execution then reaches the poll callback installed into the napi struct at initialization, which here is ixgbe_poll:

int ixgbe_poll(struct napi_struct *napi, int budget)
{
	struct ixgbe_q_vector *q_vector =
				container_of(napi, struct ixgbe_q_vector, napi);
	struct ixgbe_adapter *adapter = q_vector->adapter;
	struct ixgbe_ring *ring;
	int per_ring_budget;
	bool clean_complete = true;

#ifdef CONFIG_IXGBE_DCA
	if (adapter->flags & IXGBE_FLAG_DCA_ENABLED)
		ixgbe_update_dca(q_vector);
#endif

	ixgbe_for_each_ring(ring, q_vector->tx)
		clean_complete &= !!ixgbe_clean_tx_irq(q_vector, ring);

	if (!ixgbe_qv_lock_napi(q_vector))
		return budget;

	/* attempt to distribute budget to each queue fairly, but don't allow
	 * the budget to go below 1 because we'll exit polling */
	if (q_vector->rx.count > 1)
		per_ring_budget = max(budget/q_vector->rx.count, 1);
	else
		per_ring_budget = budget;

	ixgbe_for_each_ring(ring, q_vector->rx)
		clean_complete &= (ixgbe_clean_rx_irq(q_vector, ring,
				   per_ring_budget) < per_ring_budget);

	ixgbe_qv_unlock_napi(q_vector);
	/* If all work not completed, return budget and keep polling */
	if (!clean_complete)
		return budget;

	/* all work done, exit the polling mode */
	napi_complete(napi);
	if (adapter->rx_itr_setting & 1)
		ixgbe_set_itr(q_vector);
	if (!test_bit(__IXGBE_DOWN, &adapter->state))
		ixgbe_irq_enable_queues(adapter, ((u64)1 << q_vector->v_idx));

	return 0;
}

static int ixgbe_clean_rx_irq(struct ixgbe_q_vector *q_vector,
				   struct ixgbe_ring *rx_ring,
				   const int budget)
{
	ixgbe_rx_skb(q_vector, skb);
}

static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
			 struct sk_buff *skb)
{
	if (ixgbe_qv_busy_polling(q_vector))
		netif_receive_skb(skb);
	else
		napi_gro_receive(&q_vector->napi, skb);
}

int netif_receive_skb(struct sk_buff *skb)
{
	int ret;

	net_timestamp_check(netdev_tstamp_prequeue, skb);

	if (skb_defer_rx_timestamp(skb))
		return NET_RX_SUCCESS;

	rcu_read_lock();

#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
	/* finally, hand the packet to the protocol stack */
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}