kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp

qdisc实现分析

1
2
3
4
tc qdisc show
echo pfifo > /proc/sys/net/core/default_qdisc
tc qdisc add dev eth0 root pfifo
tc qdisc del dev eth0 root

https://github.com/liucimin/Learning/blob/master/linux%E7%BD%91%E7%BB%9C%E7%9B%B8%E5%85%B3/Tc%20%E7%BD%91%E5%8D%A1%E5%A4%9A%E9%98%9F%E5%88%97%E6%97%B6%E6%AF%8F%E4%B8%AA%E9%98%9F%E5%88%97%E9%85%8D%E7%BD%AE%E5%85%AC%E5%B9%B3%E9%98%9F%E5%88%97sfq.md

http://man7.org/linux/man-pages/man8/tc-fq_codel.8.html


https://blog.csdn.net/one_clouder/article/details/52685249

二层发送中,实现qdisc的主要函数是 __dev_xmit_skb 和 net_tx_action,本篇将分析qdisc实现的原理,但是不涉及qdisc内部的算法,仅对框架进行分析。

1、__dev_xmit_skb 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
				 struct net_device *dev,
				 struct netdev_queue *txq)
{
	spinlock_t *root_lock = qdisc_lock(q);
	bool contended;
	int rc;
 
	qdisc_pkt_len_init(skb);
	qdisc_calculate_pkt_len(skb, q);
	/*
	 * Heuristic to force contended enqueues to serialize on a
	 * separate lock before trying to get qdisc main lock.
	 * This permits __QDISC___STATE_RUNNING owner to get the lock more
	 * often and dequeue packets faster.
	 */
	contended = qdisc_is_running(q);  //判断qdisc是否运行
	if (unlikely(contended))
		spin_lock(&q->busylock);
 
	spin_lock(root_lock);
	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
		kfree_skb(skb);
		rc = NET_XMIT_DROP;
	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&  //qisc没有运行,且没有缓存报文,则直接可以发送报文
		   qdisc_run_begin(q)) {
		/*
		 * This is a work-conserving queue; there are no old skbs
		 * waiting to be sent out; and the qdisc is not running -
		 * xmit the skb directly.
		 */
 
		qdisc_bstats_update(q, skb);
 
		if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       //sch_direct_xmit返回为正值,说明qdisc中有报文待发送,尝试发送缓冲区报文
		} else
			qdisc_run_end(q); //正常发送完成,qdisc停止运行
 
		rc = NET_XMIT_SUCCESS;
	} else {
		rc = q->enqueue(skb, q) & NET_XMIT_MASK;   //qdisc running或者有缓存报文, 则把报文发动qdisc队列中
		if (qdisc_run_begin(q)) {         //尝试启动qdisc,如果qisc成功启动,则尝试发送报文
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       //发送qdisc缓冲队列中的报文
		}
	}
	spin_unlock(root_lock);
	if (unlikely(contended))
		spin_unlock(&q->busylock);
	return rc;
}

2、__qdisc_run 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void __qdisc_run(struct Qdisc *q)
{
	int quota = weight_p;
	int packets;
 
	while (qdisc_restart(q, &packets)) {  //循环发送报文
		/*
		 * Ordered by possible occurrence: Postpone processing if
		 * 1. we've exceeded packet quota
		 * 2. another process needs the CPU;
		 */
		quota -= packets;
		if (quota <= 0 || need_resched()) {    //如果配额或需要调度,则触发软中断后退出
			__netif_schedule(q);
			break;
		}
	}
 
	qdisc_run_end(q); //qdisc停止
}

3、qdisc_restart函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static inline int qdisc_restart(struct Qdisc *q, int *packets)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;
	bool validate;
 
	/* Dequeue packet */
	skb = dequeue_skb(q, &validate, packets); //从缓存区中得到待发送的报文,因为流量限制原因,就算缓冲区有报文,也可能返回NULL
	if (unlikely(!skb))
		return 0;
 
	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = skb_get_tx_queue(dev, skb);
 
	return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);    //发送报文
}

4、dequeue_skb函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
				   int *packets)
{
	struct sk_buff *skb = q->gso_skb;
	const struct netdev_queue *txq = q->dev_queue;
 
	*packets = 1;
	*validate = true;
	if (unlikely(skb)) {
		/* check the reason of requeuing without tx lock first */
		txq = skb_get_tx_queue(txq->dev, skb);
		if (!netif_xmit_frozen_or_stopped(txq)) {
			q->gso_skb = NULL;
			q->q.qlen--;
		} else
			skb = NULL;
		/* skb in gso_skb were already validated */
		*validate = false;
	} else {
		if (!(q->flags & TCQ_F_ONETXQUEUE) ||
			!netif_xmit_frozen_or_stopped(txq)) {
			skb = q->dequeue(q);           //调用qdisc的dequeue函数获取skb
			if (skb && qdisc_may_bulk(q))<span style="white-space:pre">     </span>//如果还能继续获取skb,则一次性获取多个skb
				try_bulk_dequeue_skb(q, skb, txq, packets);
		}
	}
	return skb;
}

net_tx_action

net_tx_action为报文发送软中断,在处理报文发送软中断时,尝试该CPU softnet_data上所有qdisc发送报文。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static void net_tx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
 
	if (sd->completion_queue) {
		struct sk_buff *clist;
 
		local_irq_disable();
		clist = sd->completion_queue;
		sd->completion_queue = NULL;
		local_irq_enable();
 
		while (clist) {
			struct sk_buff *skb = clist;
			clist = clist->next;
 
			WARN_ON(atomic_read(&skb->users));
			if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
				trace_consume_skb(skb);
			else
				trace_kfree_skb(skb, net_tx_action);
			__kfree_skb(skb);
		}
	}
 
	if (sd->output_queue) {
		struct Qdisc *head;
 
		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		sd->output_queue_tailp = &sd->output_queue;
		local_irq_enable();
 
		while (head) {
			struct Qdisc *q = head;
			spinlock_t *root_lock;
 
			head = head->next_sched;
 
			root_lock = qdisc_lock(q);
			if (spin_trylock(root_lock)) {
				smp_mb__before_atomic();
				clear_bit(__QDISC_STATE_SCHED,
					  &q->state);
				qdisc_run(q);         //尝试启动qdisc发送报文
				spin_unlock(root_lock);
			} else {
				if (!test_bit(__QDISC_STATE_DEACTIVATED,
						  &q->state)) {
					__netif_reschedule(q);
				} else {
					smp_mb__before_atomic();
					clear_bit(__QDISC_STATE_SCHED,
						  &q->state);
				}
			}
		}
	}
}

netmap 介绍及使用

https://blog.csdn.net/fengfengdiandia/article/details/52869290

https://blog.csdn.net/liyu123__/article/details/80853150

https://www.cnblogs.com/ne-liqian/p/9294757.html

https://wenku.baidu.com/view/af41b0f065ce05087632137a

netmap官网:http://info.iet.unipi.it/~luigi/netmap/

netmap的githab网址:https://github.com/luigirizzo/netmap

netmap是一个高效的收发报文的 I/O 框架,已经集成在 FreeBSD 的内部了。 当然,也可以在 Linux 下编译使用 。

一、架构

现在的网卡都使用多个 buffer 来发送和接收 packet,并有一个叫NIC ring的环形数组。

NIC ring 是静态分配的,它的槽指向mbufs链的部分缓冲区。

netmap 内存映射网卡的packet buffer到用户态,实现了自己的发送和接收报文的circular ring来对应网卡的 ring,使用 netmap 时,程序运行在用户态,即使出了问题也不会 crash 操作系统。

下图显示了一个接口可以有多个 netmap ring。

将文件描述符绑定到 NIC 时,应用程序可以选择将所有 ring或仅一个 ring附加到文件描述符。

使用所有 ring,相同的代码可以用于单队列或多队列 NIC。

使用一个 ring,可以通过每个 ring 一个进程/CPU core 来构建高性能系统,从而在系统中并行。

netmap 使用poll等待网卡的文件描述符可接收或可发送。

netmap 会建立一个字符设备/dev/netmap,然后通过nm_open来注册网卡为 netmap 模式。

  • 注意:这里顺便提一下,网卡进入 netmap 模式后,ifconfig 是看不到网卡统计信息变化的,wireshark 也抓不到报文,因为协议栈被旁路了。

内存映射的区域里面,有网卡的收发队列,这样可以通过将接收缓冲区的地址写在发送的 ring 里面实现零拷贝(Zero-copy)。

二、性能

netmap 官网说在 10GigE 上测试,发包速率可以达到 14.88Mpps,收包的速率和发包相近。同时还支持多网卡队列。

三、编译安装

1
2
3
4
5
6
7
8
9
10
11
git clone https://github.com/luigirizzo/netmap.git
git clone https://github.com/abcdxyzk/netmap.git

cd netmap/LINUX
./configure --drivers=ixgbe --kernel-sources=/usr/src/linux-headers-4.15.18/ --kernel-dir=/usr/src/linux-headers-4.15.18/

rmmod ixgbe

insmod netmap.ko
insmod ixgbe/ixgbe.ko

四、发送、接收

发送

1
./build-apps/pkt-gen/pkt-gen -i enp3s0 -f tx -c 1 -p 1 -z -d 12.0.0.100:80

接收

1
gcc rcv.c -I../sys

cat rcv.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#include <stdio.h>
#include <poll.h>
 
#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>
 
unsigned long pps = 0;
 
static void receive_packets(struct netmap_ring *ring)
{
	int i;
	char *buf;
 
	while (!nm_ring_empty(ring)) {
		i   = ring->cur;
		buf = NETMAP_BUF(ring, ring->slot[i].buf_idx);
		pps++;

		ring->head = ring->cur = nm_ring_next(ring, i); 
	}
}
 
int main(void)
{
	struct nm_desc *d;
	struct pollfd fds;
	struct netmap_ring *ring;
	int i; 
 
	d = nm_open("netmap:eth1", NULL, 0, 0); 
  
	fds.fd     = d->fd;
	fds.events = POLLIN;
 
	while (1) {
		if (poll(&fds, 1, 1) < 0) {
			perror("poll()");
			exit(1);
		}
 
		for (i = d->first_rx_ring; i <= d->last_rx_ring; i++) {
			ring = NETMAP_RXRING(d->nifp, i);
			receive_packets(ring);
		}
	}

	return 0;
}

ixgbe驱动初始化

https://www.cnblogs.com/scottieyuyang/p/5663213.html

首先模块加载insmod ixgbe.ko

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
module_init(ixgbe_init_module);

module_init(ixgbe_init_module);
{
	int ret;
	pr_info("%s - version %s\n", ixgbe_driver_string, ixgbe_driver_version);
	pr_info("%s\n", ixgbe_copyright);

	ixgbe_dbg_init();
     ret = pci_register_driver(&ixgbe_driver);
	if (ret) {
		ixgbe_dbg_exit();
		return ret;
	}

#ifdef CONFIG_IXGBE_DCA
	dca_register_notify(&dca_notifier);
#endif

	return 0;
}

于是看pci设备的核心结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
static struct pci_driver ixgbe_driver = {
	.name     = ixgbe_driver_name,
	.id_table = ixgbe_pci_tbl,
	.probe    = ixgbe_probe,
	.remove   = ixgbe_remove,
#ifdef CONFIG_PM
	.suspend  = ixgbe_suspend,
	.resume   = ixgbe_resume,
#endif
	.shutdown = ixgbe_shutdown,
	.sriov_configure = ixgbe_pci_sriov_configure,
	.err_handler = &ixgbe_err_handler
};

当设备加载成功后,会执行ixgbe_probe函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int ixgbe_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
{
	/*分配struct net_device *netdev 结构体*/
	netdev = alloc_etherdev_mq(sizeof(struct ixgbe_adapter), indices);

	if (!netdev) {
		err = -ENOMEM;
		goto err_alloc_etherdev;
	}

	SET_NETDEV_DEV(netdev, &pdev->dev);

	/*分配struct ixgbe_adapter *adapter结构体*/
	adapter = netdev_priv(netdev);

	/*分配dev结构体的ops函数指针集合*/
	netdev->netdev_ops = &ixgbe_netdev_ops;

	err = ixgbe_sw_init(adapter);

	err = ixgbe_init_interrupt_scheme(adapter);
	/*设备注册完毕*/<br>
	err = register_netdev(netdev);
}

重点看ixgbe_init_interrupt_scheme(adapter)函数,该函数里面会初始化adapter结构体以及napi相关的东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
int ixgbe_init_interrupt_scheme(struct ixgbe_adapter *adapter)
{

	err = ixgbe_alloc_q_vectors(adapter);

}
static int ixgbe_alloc_q_vectors(struct ixgbe_adapter *adapter)
{

	if (q_vectors >= (rxr_remaining + txr_remaining)) {
		for (; rxr_remaining; v_idx++) {
			err = ixgbe_alloc_q_vector(adapter, q_vectors, v_idx,
						   0, 0, 1, rxr_idx);

			if (err)
				goto err_out;

			/* update counts and index */
			rxr_remaining--;
			rxr_idx++;
		}
	}
}
static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter,
				int v_count, int v_idx,
				int txr_count, int txr_idx,
				int rxr_count, int rxr_idx)
{
	/* setup affinity mask and node */
	if (cpu != -1)
		cpumask_set_cpu(cpu, &q_vector->affinity_mask);
	q_vector->numa_node = node;

#ifdef CONFIG_IXGBE_DCA
	/* initialize CPU for DCA */
	q_vector->cpu = -1;

#endif
	/* initialize NAPI */
	netif_napi_add(adapter->netdev, &q_vector->napi,
			   ixgbe_poll, 64);
	napi_hash_add(&q_vector->napi);
}

到此为止,网卡设置初始化完毕  

其中涉及到如下几个结构体

ixgbe_adapter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/* board specific private data structure */
struct ixgbe_adapter {

	//发送的rings
	struct ixgbe_ring *tx_ring[MAX_TX_QUEUES] ____cacheline_aligned_in_smp;

	//接收的rings
	struct ixgbe_ring *rx_ring[MAX_RX_QUEUES];

	//这个vector里面包含了napi结构
	//应该是跟下面的entries一一对应起来做为是一个中断向量的东西吧
	struct ixgbe_q_vector *q_vector[MAX_Q_VECTORS];

	//这个里面估计是MSIX的多个中断对应的响应接口
	struct msix_entry *msix_entries;
}

struct ixgbe_q_vector {
	struct ixgbe_adapter *adapter;
ifdef CONFIG_IXGBE_DCA
	int cpu;            /* CPU for DCA */
#endif
	u16 v_idx;              /* index of q_vector within array, also used for
				 * finding the bit in EICR and friends that
				 * represents the vector for this ring */
	u16 itr;                /* Interrupt throttle rate written to EITR */
	struct ixgbe_ring_container rx, tx;

	struct napi_struct napi;/*napi结构体*/
	cpumask_t affinity_mask;
	int numa_node;
	struct rcu_head rcu;    /* to avoid race with update stats on free */
	char name[IFNAMSIZ + 9];

	/* for dynamic allocation of rings associated with this q_vector */
	struct ixgbe_ring ring[0] ____cacheline_internodealigned_in_smp;
};

struct napi_struct {
	/* The poll_list must only be managed by the entity which
	 * changes the state of the NAPI_STATE_SCHED bit.  This means
	 * whoever atomically sets that bit can add this napi_struct
	 * to the per-cpu poll_list, and whoever clears that bit
	 * can remove from the list right before clearing the bit.
	 */
	struct list_head    poll_list;

	unsigned long       state;
	int         weight;
	unsigned int        gro_count;
	int         (*poll)(struct napi_struct *, int);//poll的接口实现
#ifdef CONFIG_NETPOLL
	spinlock_t      poll_lock;
	int         poll_owner;
#endif
	struct net_device   *dev;
	struct sk_buff      *gro_list;
	struct sk_buff      *skb;
	struct list_head    dev_list;
};

然后当我们ifconfig dev up 时,会执行dev_ops->open函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static int ixgbe_open(struct net_device *netdev)
{
	/* allocate transmit descriptors */
	err = ixgbe_setup_all_tx_resources(adapter);
	if (err)
		goto err_setup_tx;

	/* allocate receive descriptors */
	err = ixgbe_setup_all_rx_resources(adapter);
	/*注册中断*/
	err = ixgbe_request_irq(adapter);
}

static int ixgbe_request_irq(struct ixgbe_adapter *adapter)
{
	struct net_device *netdev = adapter->netdev;
	int err;

	if (adapter->flags & IXGBE_FLAG_MSIX_ENABLED)
		err = ixgbe_request_msix_irqs(adapter);
	else if (adapter->flags & IXGBE_FLAG_MSI_ENABLED)
		err = request_irq(adapter->pdev->irq, ixgbe_intr, 0,
				  netdev->name, adapter);
	else
		err = request_irq(adapter->pdev->irq, ixgbe_intr, IRQF_SHARED,
				  netdev->name, adapter);

	if (err)
		e_err(probe, "request_irq failed, Error %d\n", err);

	return err;
}

static int ixgbe_request_msix_irqs(struct ixgbe_adapter *adapter)
{
	for (vector = 0; vector < adapter->num_q_vectors; vector++) {
		struct ixgbe_q_vector *q_vector = adapter->q_vector[vector];
		struct msix_entry *entry = &adapter->msix_entries[vector];

		err = request_irq(entry->vector, &ixgbe_msix_clean_rings, 0,
				  q_vector->name, q_vector);
	}
}

从上面的代码流程可以看出,最终注册的中断处理函数为ixgbe_msix_clean_rings

1
2
3
4
5
6
7
8
9
10
11
static irqreturn_t ixgbe_msix_clean_rings(int irq, void *data)
{
	struct ixgbe_q_vector *q_vector = data;

	/* EIAM disabled interrupts (on this vector) for us */

	if (q_vector->rx.ring || q_vector->tx.ring)
		napi_schedule(&q_vector->napi);

	return IRQ_HANDLED;
}

从上述代码中可以看,该中断处理函数仅仅作为napi的调度者

当数据包到来时,首先唤醒硬中断执行ixgbe_msix_clean_rings函数,最终napi_schedule会调用 __raise_softirq_irqoff 去触发一个软中断NET_RX_SOFTIRQ,然后又对应的软中断接口去实现往上的协议栈逻辑

然后看看napi 调度函数都做了些什么工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static inline void napi_schedule(struct napi_struct *n)
{
	if (napi_schedule_prep(n))
		__napi_schedule(n);
}
void __napi_schedule(struct napi_struct *n)
{
	unsigned long flags;

	local_irq_save(flags);
	____napi_schedule(this_cpu_ptr(&softnet_data), n);
	local_irq_restore(flags);
}

最终可以看出napi调度函数把napi结构体挂到了per cpu的私有数据结构softnet_data上
struct softnet_data {
	struct Qdisc        *output_queue;
	struct Qdisc        **output_queue_tailp;
	struct list_head    poll_list;
	struct sk_buff      *completion_queue;
	struct sk_buff_head process_queue;

	/* stats */
	unsigned int        processed;
	unsigned int        time_squeeze;
	unsigned int        cpu_collision;
	unsigned int        received_rps;

#ifdef CONFIG_RPS
	struct softnet_data *rps_ipi_list;

	/* Elements below can be accessed between CPUs for RPS */
	struct call_single_data csd ____cacheline_aligned_in_smp;
	struct softnet_data *rps_ipi_next;
	unsigned int        cpu;
	unsigned int        input_queue_head;
	unsigned int        input_queue_tail;
#endif
	unsigned int        dropped;
	struct sk_buff_head input_pkt_queue;
	struct napi_struct  backlog;/*napi结构体里面的双向链表中*/
};

NET_RX_SOFTIRQ是收到数据包的软中断信号对应的接口是net_rx_action

NET_TX_SOFTIRQ是发送完数据包后的软中断信号对应的接口是net_tx_action  

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static void net_rx_action(struct softirq_action *h)
{
	/* 获取每个cpu的数据*/
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
	while (!list_empty(&sd->poll_list)) {
		struct napi_struct *n;
				n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);

		if (test_bit(NAPI_STATE_SCHED, &n->state)) {
			work = n->poll(n, weight);
			trace_napi_poll(n);
		}
	}
}

于是就执行到初始化napi结构体中的poll函数,在这里为ixgbe_poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
int ixgbe_poll(struct napi_struct *napi, int budget)
{
	struct ixgbe_q_vector *q_vector =
				container_of(napi, struct ixgbe_q_vector, napi);
	struct ixgbe_adapter *adapter = q_vector->adapter;
	struct ixgbe_ring *ring;
	int per_ring_budget;
	bool clean_complete = true;

#ifdef CONFIG_IXGBE_DCA
	if (adapter->flags & IXGBE_FLAG_DCA_ENABLED)
		ixgbe_update_dca(q_vector);
#endif

	ixgbe_for_each_ring(ring, q_vector->tx)
		clean_complete &= !!ixgbe_clean_tx_irq(q_vector, ring);

	if (!ixgbe_qv_lock_napi(q_vector))
		return budget;

	/* attempt to distribute budget to each queue fairly, but don't allow
	 * the budget to go below 1 because we'll exit polling */
	if (q_vector->rx.count > 1)
		per_ring_budget = max(budget/q_vector->rx.count, 1);
	else
		per_ring_budget = budget;

	ixgbe_for_each_ring(ring, q_vector->rx)
		clean_complete &= (ixgbe_clean_rx_irq(q_vector, ring,
				   per_ring_budget) < per_ring_budget);

	ixgbe_qv_unlock_napi(q_vector);
	/* If all work not completed, return budget and keep polling */
	if (!clean_complete)
		return budget;

	/* all work done, exit the polling mode */
	napi_complete(napi);
	if (adapter->rx_itr_setting & 1)
		ixgbe_set_itr(q_vector);
	if (!test_bit(__IXGBE_DOWN, &adapter->state))
		ixgbe_irq_enable_queues(adapter, ((u64)1 << q_vector->v_idx));

	return 0;
}

static int ixgbe_clean_rx_irq(struct ixgbe_q_vector *q_vector,
				   struct ixgbe_ring *rx_ring,
				   const int budget)
{
	   ixgbe_rx_skb(q_vector, skb);
}

static void ixgbe_rx_skb(struct ixgbe_q_vector *q_vector,
			 struct sk_buff *skb)
{
	if (ixgbe_qv_busy_polling(q_vector))
		netif_receive_skb(skb);
	else
		napi_gro_receive(&q_vector->napi, skb);
}

int netif_receive_skb(struct sk_buff *skb)
{
	int ret;

	net_timestamp_check(netdev_tstamp_prequeue, skb);

	if (skb_defer_rx_timestamp(skb))
		return NET_RX_SUCCESS;

	rcu_read_lock();

#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
		/*最终协议栈开始收报*/
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}

基于82599网卡的二层网络数据包接收

https://tqr.ink/2017/04/16/intel-82599-receive-packet/

本篇文档主要描述了网络数据包在二层的接收流程,主要包括以下三个部分:

  1)、82599网卡和数据包接收相关的内容;

  2)、ixgbe网卡驱动数据包接收相关的配置;

  3)、ixgbe网卡驱动napi接口的处理。

82599网卡和数据包接收相关的内容

  这一部分要介绍的是82599网卡中和数据包接收相关的内容。网络报文接收流程所涉及的内容很多,如报文过滤、mac层卸载、报文接收描述符、校验和卸载以及分离报文有效载荷和头部等,由于篇幅原因,这里只介绍了报文接收描述符相关的内容,其他内容会在后续描述中进行穿插。  

  说到网卡报文接收,就必须得说到报文接收描述符,因为报文接收描述符承载了报文从网卡流入到主存的过程。对于网卡硬件而言,当网卡收到网络报文的时候,会往报文接收描述符中指定的地址写入报文数据,而网卡驱动则会从报文接收描述符中指定的地址读取报文,并送往上层协议栈处理。

  除了上面说到的存放报文的内存地址,报文接收描述符中还有用于存储报文信息的域。对于82599网卡而言,其支持两种格式的报文接收描述符,即传统格式和高级格式。虽然有两种不同格式的报文接收描述符,但是两种格式的报文接收描述符所占用的内存大小是一样的(目前为16字节),只是对这块内存使用有所不同。对于两种不同格式的报文接收描述符,可以在网卡驱动初始化的时候进行配置,通过设置网卡的SRRCTL寄存器的DRSCTYPE域进而选择使用某种格式的报文接收描述符。在初始化阶段,网卡驱动会申请报文描述符,并填充描述符中相关的域,然后告诉网卡该描述符可用,后续网卡接收到报文就可以用报文描述符来存储报文相关的信息,然后网卡将报文描述符回写给网卡驱动,网卡驱动从中获取所需要信息,并交由上层进行处理。

传统格式报文接收描述符

  先来看下82599网卡中对传统格式报文接收描述符的定义,如下:

图1 传统格式报文接收描述符

  从上面的图中可以看到,报文接收描述符的低八个字节存放的是用于存放报文的内存起始地址,而高八个字节存放的是网卡对报文进行预处理得到的一些信息,如报文长度,VLAN Tag以及校验和信息等,这部分信息会在网卡回写报文描述符给驱动的时候存到描述符对应的域中。对于一些比较固定的功能,比如报文相关校验和计算,VLAN头的解析等功能都可以卸载到网卡,由网卡来操作,这样可以加速报文的处理。

高级格式报文接收描述符

  相比于传统格式,高级格式的报文接收描述符可以用来支持更多的功能特性,如分离报文有效负载和报文头等。高级格式的报文描述符由于需要支持更多的功能特性,所以分为了读格式和回写格式。

  先来看下82599网卡中读格式的定义,如下图:

图2 高级格式报文接收描述符-读格式

  从图中可以看到,读格式的报文描述符中主要有四个部分,分别是报文缓冲区地址、A0位、头缓冲区地址和DD位。对于报文缓冲区地址和头缓冲区地址,顾名思义,存储的就是用来存放报文有效载荷和头部的缓冲区首地址。而对于DD位的作用,网卡驱动可以通过读取该位的值来判断该描述符对应的缓冲区中是否已经存放了网卡接收的报文。   再来看下82599网卡中回写格式的定义,如下图:

图3 高级格式报文接收描述符-回写格式

  回写代表的就是网卡往描述符对应的缓冲区中存放了报文数据,并将报文相关的元信息写入到描述符对应的域中,并设置DD位,以告诉网卡驱动该描述已经存放了报文信息。回写格式中涉及到很多和报文相关的信息,如接收报文时所使用的RSS类型,报文长度和报文接收状态等信息。这里不一一介绍,详细可以参考82599的datasheet。

报文接收描述符环形队列

  上面说到,报文接收描述符承载了报文从网卡流入到主存的过程,是网卡驱动和网卡都会操作的对象,那么自然而然会有以下几个疑问:

  1)、报文接收描述符是以何种组织形式在网卡驱动和网卡之间进行传递的?

  2)、网卡驱动怎么通知网卡报文接收描述符可用的?

  在报文接收流程中,报文接收描述符是通过环形队列来管理的,当然这个环形队列是逻辑上的,队列中的描述符在内存上是连续的。网卡或者网卡驱动在进行操作的时候,如果发现已经到达了队列的末尾,那么下次操作又会从队列头部开始,从而实现环形的操作逻辑。报文接收描述符环形队列的结构体如下:

图4 报文接收描述符环形队列结构

  对于第一和第二个问题,其中也已经在上面的描述符环形队列图中有体现。在对问题进行回答之前先要了解下82599网卡中和报文接收描述符环形队列相关的几个寄存器。

  1)、RDBA寄存器。这个寄存器存放了报文接收描述符环形队列的起始地址,也就是上图中Base指向的地址。

  2)、RDLEN寄存器。这个寄存器存放了报文接收描述符环形队列的长度,也就是接收描述符环形队列所占用的字节数,对应上图中的Size。

  3)、RDH寄存器。这个寄存器存放的是一个距离队列头部的偏移值,代表的是第一个可以被网卡用来存放报文信息的描述符。当网卡完成了将一个报文信息存放到描述符后,就会更新RDH寄存器的值,使之指向下一个即将用来存放报文信息的描述符。也就是说这个寄存器的值是由网卡来更新的,该寄存器对应上图中的Head。

  4)、RDT寄存器。这个寄存器存放的也是一个距离队列头部的偏移值,代表的是硬件可以用来存放报文信息的最后一个描述符的下一个描述符。当网卡驱动填充了报文描述中的报文缓冲区地址后就会更新该寄存器的值,使之指向下一个即将填充地址信息并给网卡使用的描述符,该寄存器对应上图中的Tail。

  在了解了这几个寄存器的作用之后,对于本节一开始提出的两个问题就比较容易知晓了。对于第一个问题,报文描述符是以环形队列的方式来组织的;对于第二个问题,因为网卡驱动在提供可用报文接收描述符给网卡后都会更新RDT寄存器的值,所以网卡可以根据RDT寄存器知道自己当前可用的描述符信息,简单来说RDH和RDT之间的描述符就是网卡可以使用的。

ixgbe网卡驱动数据包接收相关的配置

  第一部分已经讲了网卡对描述符的定义,以及网卡中用来操作描述符环形队列的几个相关的寄存器,对网卡是如何使用描述符有了一定的了解。这一部分我们一起来看下网卡驱动是如何使用描述符以及管理描述符环形队列的。

  报文接收描述符以及描述符环形队列是网卡和网卡驱动都会操作的对象,所以网卡和网卡驱动对接收报文描述符的定义也必须保持一致。与网卡相对应的,网卡驱动从软件的角度定义了接收报文描述符,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
union ixgbe_adv_rx_desc {
	struct {
		__le64 pkt_addr; /* Packet buffer address */
		__le64 hdr_addr; /* Header buffer address */
	} read;
	struct {
		struct {
			union {
				__le32 data;
				struct {
					__le16 pkt_info; /* RSS, Pkt type */
					__le16 hdr_info; /* Splithdr, hdrlen */
				} hs_rss;
			} lo_dword;
			union {
				__le32 rss; /* RSS Hash */
				struct {
					__le16 ip_id; /* IP id */
					__le16 csum; /* Packet Checksum */
				} csum_ip;
			} hi_dword;
		} lower;
		struct {
			__le32 status_error; /* ext status/error */
			__le16 length; /* Packet length */
			__le16 vlan; /* VLAN tag */
		} upper;
	} wb;  /* writeback */
};

  报文接收描述符环形队列是用做网络报文接收的,而在网卡中接收报文的最小单位是一个队列,即RX队列。所以一般来说就是一个RX队列对应一个报文接收描述符环形队列。

  从ixgbe驱动的实现可以知道,ixgbe使用一个叫做中断向量的对象来管理队列,其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct ixgbe_q_vector {
	struct ixgbe_adapter *adapter;
#ifdef CONFIG_IXGBE_DCA
	int cpu;      /* CPU for DCA */
#endif
	u16 v_idx;        /* index of q_vector within array, also used for
				 * finding the bit in EICR and friends that
				 * represents the vector for this ring */
	u16 itr;      /* Interrupt throttle rate written to EITR */

	/* 分别以链表方式管理中断向量中的rx和tx队列 */
	struct ixgbe_ring_container rx, tx;
	struct napi_struct napi;
	cpumask_t affinity_mask;
	int numa_node;
	struct rcu_head rcu;  /* to avoid race with update stats on free */
	char name[IFNAMSIZ + 9];

#ifdef CONFIG_NET_RX_BUSY_POLL
	atomic_t state;
#endif  /* CONFIG_NET_RX_BUSY_POLL */

	/* for dynamic allocation of rings associated with this q_vector */
	struct ixgbe_ring ring[0] ____cacheline_internodealigned_in_smp;
};

  在上面的定义中,struct ixgbe_q_vector对象最后一个类型为struct ixgbe_ring的柔性数组成员就是由该中断向量所管理的队列,这里包括了RX队列和TX队列。报文接收流程只需要关注其中的RX队列即可。一般来说一个中断向量会关联一个硬件中断。当网卡往中断向量中的某个RX队列的描述符中写入报文信息时,就会触发对应的硬件中断,然后中断子系统就会调用我们注册的中断处理函数来处理这个中断,在ixgbe驱动中对应的就是ixgbe_intr()(在msi-x中断模式下对应的是ixgbe_msix_clean_rings())。这里需要做一个说明,就是在legacy或者msi中断模式下,只会使用一个中断向量,对应的使用一个中断号;而在msi-x中断模式下,可能会有多个中断向量,对应的会有多个中断号,一般来说会把一个中断向量对应的中断号进行绑核处理,这样可以提高报文处理效率。而具体到某一个RX队列是如何同一个中断号进行关联的,这里还涉及到另外一个网卡寄存器,即Interrupt Vector Alloction(IVAR),这里不再详细介绍,可以参考ixgbe驱动的ixgbe_configure_msi_and_legacy()和ixgbe_configure_msix()函数,以及网卡中断部分的配置。

  在ixgbe网卡驱动的实现中,我们可以看到驱动是以一个叫做struct ixgbe_ring的对象来管理报文描述符环形队列(不管是接收还是发送),其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
struct ixgbe_ring {
	struct ixgbe_ring *next;  /* pointer to next ring in q_vector */
	struct ixgbe_q_vector *q_vector; /* backpointer to host q_vector */
	struct net_device *netdev;    /* netdev ring belongs to */
	struct device *dev;       /* device for DMA mapping */
	struct ixgbe_fwd_adapter *l2_accel_priv;

	/* 环形队列缓冲区中的报文描述符数组 */
	void *desc;           /* descriptor ring memory */

	/* 与报文描述符数组一一对应的报文缓冲区对象 */
	union {
		struct ixgbe_tx_buffer *tx_buffer_info;
		struct ixgbe_rx_buffer *rx_buffer_info;
	};
	unsigned long state;
	u8 __iomem *tail;  /* 指向RDT寄存器对应的内核虚拟地址 */

	/* 报文描述符数组对应的物理地址 */
	dma_addr_t dma;           /* phys. address of descriptor ring */
	unsigned int size;        /* length in bytes */

	/* 环形队列缓冲区中的报文描述符个数 */
	u16 count;            /* amount of descriptors */

	/*
	 * 环形队列缓冲区关联的rx队列索引,这个索引是用来在adapter->rx数组索引环形队列缓冲区的
	 */
	u8 queue_index; /* needed for multiqueue queue management */
	u8 reg_idx;           /* holds the special value that gets
					 * the hardware register offset
					 * associated with this ring, which is
					 * different for DCB and RSS modes
					 */
	/*
	 * next_to_use是环形队列缓冲区中将要提供给硬件使用的第一个报文描述符的索引,对应的就是RDT寄存器
	 * next_to_clean是环形队列缓冲区中驱动将要处理的第一个报文描述符的索引
	 */
	u16 next_to_use;
	u16 next_to_clean;

	unsigned long last_rx_timestamp;

	union {
		u16 next_to_alloc;
		struct {
			u8 atr_sample_rate;
			u8 atr_count;
		};
	};

	u8 dcb_tc;
	struct ixgbe_queue_stats stats;
	struct u64_stats_sync syncp;
	union {
		struct ixgbe_tx_queue_stats tx_stats;
		struct ixgbe_rx_queue_stats rx_stats;
	};
} ____cacheline_internodealigned_in_smp;

  struct ixgbe_ring对象中最重要的几个成员都已经做了注解,其中的desc成员就是报文描述符队列,从这里的实现也可以看出,报文描述符队列实际上是线性的,其逻辑上的环形操作是通过struct ixgbe_ring对象中的成员,如next_to_clean、next_to_alloc和next_to_use等来实现的。另外,struct ixgbe_ring对象中还有一个类型为dma_addr_t的dma成员,该成员就是desc成员对应的物理地址,有desc成员的内核虚拟地址进行一致性dma映射得到。这样ixgbe驱动可以通过desc来操作描述符环形队列,而网卡可以通过dma成员来操作描述符环形队列。

  下面一起来看下ixgbe驱动是如何建立一个描述符环形队列管理对象的。其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
int ixgbe_setup_rx_resources(struct ixgbe_ring *rx_ring)
{
	struct device *dev = rx_ring->dev;
	int orig_node = dev_to_node(dev);
	int ring_node = -1;
	int size;

	size = sizeof(struct ixgbe_rx_buffer) * rx_ring->count;

	if (rx_ring->q_vector)
		ring_node = rx_ring->q_vector->numa_node;

	rx_ring->rx_buffer_info = vzalloc_node(size, ring_node);
	if (!rx_ring->rx_buffer_info)
		rx_ring->rx_buffer_info = vzalloc(size);
	if (!rx_ring->rx_buffer_info)
		goto err;

	u64_stats_init(&rx_ring->syncp);

	/* Round up to nearest 4K */
	rx_ring->size = rx_ring->count * sizeof(union ixgbe_adv_rx_desc);
	rx_ring->size = ALIGN(rx_ring->size, 4096);

	set_dev_node(dev, ring_node);
	rx_ring->desc = dma_alloc_coherent(dev,
					   rx_ring->size,
					   &rx_ring->dma,
					   GFP_KERNEL);
	set_dev_node(dev, orig_node);
	if (!rx_ring->desc)
		rx_ring->desc = dma_alloc_coherent(dev, rx_ring->size,
						   &rx_ring->dma, GFP_KERNEL);
	if (!rx_ring->desc)
		goto err;

	rx_ring->next_to_clean = 0;
	rx_ring->next_to_use = 0;

	return 0;
err:
	vfree(rx_ring->rx_buffer_info);
	rx_ring->rx_buffer_info = NULL;
	dev_err(dev, "Unable to allocate memory for the Rx descriptor ring\n");
	return -ENOMEM;
}

  函数ixgbe_setup_rx_resources()处理流程很清晰:

  1)、根据之前配置好的环形队列中报文接收描述符个数申请报文描述符数组所需要的内存,以及对应的用来管理报文缓冲区地址信息的缓冲区对象,这个时候缓冲区对象中用来存放报文内容的地址仍然是无效的,因为还没有申请内存,在函数ixgbe_alloc_rx_buffers()处理完成之后,缓冲区对象中存放报文内容的地址就是有效的,可以提供给网卡用来存放报文数据。此外,对报文接收描述符数组内存进行一致性dma映射,获取对应的物理地址,网卡需要使用物理地址,而不是虚拟地址。

  2)、初始化描述符环形队列操作所涉及到的索引成员,包括next_to_use和next_to_clean。

  经过ixgbe_setup_rx_resources()函数的处理,就已经成功创建了一个描述符环形的管理对象。接下来就需要告诉网卡这个描述符环形队列的信息,这个就是函数ixgbe_configure_rx_ring()所要做的事情了,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
void ixgbe_configure_rx_ring(struct ixgbe_adapter *adapter,
			     struct ixgbe_ring *ring)
{
	struct ixgbe_hw *hw = &adapter->hw;

	/* 环形队列缓冲区中报文描述符数组对应的物理地址 */
	u64 rdba = ring->dma;
	u32 rxdctl;
	u8 reg_idx = ring->reg_idx;

	/* disable queue to avoid issues while updating state */
	rxdctl = IXGBE_READ_REG(hw, IXGBE_RXDCTL(reg_idx));
	ixgbe_disable_rx_queue(adapter, ring);

	/*
	 * 将报文描述符数组的首地址写入到RDBAH和RDBAL寄存器中,并将描述符数组的长度
	 * 写入到RDLEN寄存器中,这样网卡芯片就知道了报文描述符的信息,后续可以收到
	 * 合适的网络报文后,就会将报文存放到描述符里面的dma地址中,并递增内部的
	 * head寄存器值
	 */
	IXGBE_WRITE_REG(hw, IXGBE_RDBAL(reg_idx), (rdba & DMA_BIT_MASK(32)));
	IXGBE_WRITE_REG(hw, IXGBE_RDBAH(reg_idx), (rdba >> 32));
	IXGBE_WRITE_REG(hw, IXGBE_RDLEN(reg_idx),
			ring->count * sizeof(union ixgbe_adv_rx_desc));
	/* Force flushing of IXGBE_RDLEN to prevent MDD */
	IXGBE_WRITE_FLUSH(hw);

	/*
	 * 初始状态下,网卡芯片的head和tail指针都为0,表示网卡没有可用的报文描述符
	 * 等后面驱动申请了n个报文描述符中的dma地址后,就会将tail寄存器值设置为n,
	 * 表示目前网卡可用的报文描述符数量为n个。这样,等网卡收到了合适的报文之后
	 * 就会存到报文描述符中的dma地址处。
	 */
	IXGBE_WRITE_REG(hw, IXGBE_RDH(reg_idx), 0);
	IXGBE_WRITE_REG(hw, IXGBE_RDT(reg_idx), 0);
	ring->tail = adapter->io_addr + IXGBE_RDT(reg_idx);

	ixgbe_configure_srrctl(adapter, ring);
	ixgbe_configure_rscctl(adapter, ring);

	if (hw->mac.type == ixgbe_mac_82598EB) {
		/*
		 * enable cache line friendly hardware writes:
		 * PTHRESH=32 descriptors (half the internal cache),
		 * this also removes ugly rx_no_buffer_count increment
		 * HTHRESH=4 descriptors (to minimize latency on fetch)
		 * WTHRESH=8 burst writeback up to two cache lines
		 */
		rxdctl &= ~0x3FFFFF;
		rxdctl |=  0x080420;
	}

	/* enable receive descriptor ring */
	rxdctl |= IXGBE_RXDCTL_ENABLE;
	IXGBE_WRITE_REG(hw, IXGBE_RXDCTL(reg_idx), rxdctl);

	ixgbe_rx_desc_queue_enable(adapter, ring);

	/* 申请报文描述符中用于存储报文数据的内存 */
	ixgbe_alloc_rx_buffers(ring, ixgbe_desc_unused(ring));
}

  从该函数的实现就可以看到,网卡驱动就是通过将接收报文描述符数组对应的物理地址写入到RDBA寄存器,并初始化RDH和RDT寄存器。通过写RDBA、RDH和RDT寄存器,网卡就知道了当前的描述符环形队列的信息。接着调用函数ixgbe_alloc_rx_buffers()申请用来存放报文数据的内存,并将对应的物理地址保存到接收描述符中,然后设置RDT寄存器,这样网卡就可以使用RDH和RDT之间的描述符进行接收报文处理了,ixgbe_alloc_rx_buffers()函数的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
void ixgbe_alloc_rx_buffers(struct ixgbe_ring *rx_ring, u16 cleaned_count)
{
	union ixgbe_adv_rx_desc *rx_desc;
	struct ixgbe_rx_buffer *bi;
	u16 i = rx_ring->next_to_use;

	/* nothing to do */
	if (!cleaned_count)
		return;

	/*
	 * 获取下一个将要提供给硬件使用的报文描述符(对应的索引为rx_ring->next_to_use),
	 * 以及报文描述符对应的缓冲区对象,缓冲区对象中保存了用于存放报文数据的内存地址信息,
	 * 当然用于存放报文的内存对应的物理地址也会保存到报文描述符中。
	 */
	rx_desc = IXGBE_RX_DESC(rx_ring, i);
	bi = &rx_ring->rx_buffer_info[i];

	/*
	 * 这个地方执行这个计算的目的是什么呢?我们知道报文描述符队列在逻辑上是环形的(
	 * 实际上是线性的,因为内存地址是线性分布的),当我们操作这个队列到达末尾的时候,
	 * 通过将索引重新指向队列开头来实现环形操作。所以呢,在计算之后,i表示的就是
	 * 目前位置距离队列末尾之间还没有提供给硬件使用的报文描述符个数的相反数,也就是
	 * 当前处理位置和队列末尾距离。
	 * 在下面的循环中,每处理一个报文描述符(申请用于存放报文数据的内存)都会将i递增,
	 * 当i等于0的时候,说明达到了队列的末尾,下次处理就要从队列头开始了,从而实现
	 * 队列的环形操作。
	 */
	i -= rx_ring->count;

	do {
		/*
		 * 申请用于存放报文数据的内存,并进行dma流式映射
		 */
		if (!ixgbe_alloc_mapped_page(rx_ring, bi))
			break;

		/*
		 * Refresh the desc even if buffer_addrs didn't change
		 * because each write-back erases this info.
		 */
		/* rx_desc->read.pkt_addr存放的地址就是用于存放报文的dma起始地址 */
		rx_desc->read.pkt_addr = cpu_to_le64(bi->dma + bi->page_offset);

		/* rx_desc和bi递增,指向下一个描述符和对应的缓冲区对象 */
		rx_desc++;
		bi++;
		i++;
		/*
		 * 如果i == 0,说明操作环形队列缓冲区已经转了一圈了,这个时候就需要重新让
		 * rx_desc和bi分别指向描述符数组和缓冲区数组的起始位置,从头开始处理,当然
		 * 对应的i值也就要重新计算了,此时的值为队列中描述符个数的相反数。
		 */
		if (unlikely(!i)) {
			/*
			 * 考虑下为什么描述符环形队列中已经被网卡使用过的描述符中存放报文内容的
			 * 内存需要重新申请并进行流式dma映射呢?我们知道,一个描述符中用来存放
			 * 报文的内存(实际上是一个页),接收完报文后如果空间足够,有可能被其他描述符
			 * 重用,或者报文较大而产生分片,这个时候并不会从描述符中存放报文的内存中
			 * 将报文数据拷贝到skb->data中,而是将描述符中存放报文的页内存挂载到
			 * skb_shinfo(skb)->frags数组中,无论前面的哪种情况,本描述符中用于
			 * 存放报文数据的内存在本描述符用于接收报文之后都不能再被该描述符继续使用了,
			 * 所需每次都需要重新申请内存,或者重用之前的报文描述符的页内存。这也是为什么
			 * 在函数ixgbe_fetch_rx_buffer()末尾会将rx_buffer->page置空的原因。
			 */
			rx_desc = IXGBE_RX_DESC(rx_ring, 0);
			bi = rx_ring->rx_buffer_info;
			i -= rx_ring->count;
		}

		/* clear the status bits for the next_to_use descriptor */
		rx_desc->wb.upper.status_error = 0;

		cleaned_count--;
	} while (cleaned_count);

	/*
	 * i加上rx_ring->count之后指向的就是最后一个可用(对网卡芯片来说)的报文描述符的
	 * 下一个位置,,这个时候需要将这个索引值i写入到网卡芯片的tail寄存器中,让网卡
	 * 芯片知道目前可用的报文描述数量(tail - head)
	 */
	i += rx_ring->count;

	if (rx_ring->next_to_use != i) {
		/*
		 * 因为i指向的是最后一个可用报文描述符的下一个位置,这个位置也是下一次要
		 * 提供给网卡芯片使用的报文描述符的位置
		 */
		rx_ring->next_to_use = i;

		/* update next to alloc since we have filled the ring */
		rx_ring->next_to_alloc = i;

		/* Force memory writes to complete before letting h/w
		 * know there are new descriptors to fetch.  (Only
		 * applicable for weak-ordered memory model archs,
		 * such as IA-64).
		 */
		wmb();
		/* 将i值写入到tail寄存器中 */
		writel(i, rx_ring->tail);
	}
}

  补充说明:RDT寄存器由网卡驱动在提供报文接收描述符给网卡之后更新,而RDH寄存器由网卡在回写一个报文接收描述符给驱动之后更新。

ixgbe网卡驱动napi接口的处理

  NAPI是Linux中综合了中断和轮询方式的网卡数据处理API。下面描述下ixgbe中是如何使用NAPI方式来进行收包处理的。

NAPI对象

  在Linux中,NAPI接口提供了一个NAPI对象,这个是设备使用NAPI接口进行数据包处理的必要条件,先来看下其定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct napi_struct {
	/* The poll_list must only be managed by the entity which
	 * changes the state of the NAPI_STATE_SCHED bit.  This means
	 * whoever atomically sets that bit can add this napi_struct
	 * to the per-CPU poll_list, and whoever clears that bit
	 * can remove from the list right before clearing the bit.
	 */
	struct list_head  poll_list;

	unsigned long     state;
	int           weight;
	unsigned int      gro_count;
	int           (*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLL
	spinlock_t        poll_lock;
	int           poll_owner;
#endif
	struct net_device *dev;
	struct sk_buff        *gro_list;
	struct sk_buff        *skb;
	struct hrtimer        timer;
	struct list_head  dev_list;
	struct hlist_node napi_hash_node;
	unsigned int      napi_id;
};

  一般来说,如果某个设备要使用NAPI接口进行数据包处理,那么该设备会在自己的设备对象中定义一个struct napi_struct类型的对象成员。在第二部分讲到过,ixgbe驱动中每个中断向量会关联一个中断号,从而在硬中断处理函数能获取到中断向量,而如果利用NAPI进行数据包处理的话,也就必须要获取到对应的struct napi_struct类型的对象,所以自然而然地ixgbe驱动将struct napi_struct类型的对象定义在了中断向量中。

  下面对其中的部分重要成员进行简单的介绍:

  1)、 poll_list。用于将本设备加入到cpu私有数据中类型为struct softnet_data的对象的待轮询设备链表中。

  2)、state。设备的状态,有如下几种:

1
2
3
4
5
6
7
enum {
	NAPI_STATE_SCHED, /* Poll is scheduled */
	NAPI_STATE_DISABLE,   /* Disable pending */
	NAPI_STATE_NPSVC, /* Netpoll - don't dequeue from poll_list */
	NAPI_STATE_HASHED,    /* In NAPI hash (busy polling possible) */
	NAPI_STATE_NO_BUSY_POLL,/* Do not add in napi_hash, no busy polling */
};

  3)、weight。设备每次轮询所能处理的包的最大数量。

  4)、poll。设备注册的轮询回调,在该回调中一般会遍历设备的所有rx队列,取出报文,送往上层处理。

NAPI初始化

  从驱动实现我们知道,ixgbe驱动在中断向量中定义了一个类型为struct napi_struct的NAPI实例。在ixgbe驱动初始化的时候,会在创建中断向量的时候初始化其对应NAPI实例,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static int ixgbe_alloc_q_vector(struct ixgbe_adapter *adapter,
				int v_count, int v_idx,
				int txr_count, int txr_idx,
				int rxr_count, int rxr_idx)
{
	struct ixgbe_q_vector *q_vector;
	struct ixgbe_ring *ring;
	int node = NUMA_NO_NODE;
	int cpu = -1;
	int ring_count, size;
	u8 tcs = netdev_get_num_tc(adapter->netdev);

	/* 计算这个中断向量所需要申请的环形队列缓冲区的总数量 */
	ring_count = txr_count + rxr_count;

	/* 申请中断向量内存以及环形队列缓冲区对应的柔性数组内存。 */
	size = sizeof(struct ixgbe_q_vector) +
	       (sizeof(struct ixgbe_ring) * ring_count);
	……
	/* allocate q_vector and rings */
	/* numa架构下,在cpu所在的本地内存申请中断向量所需内存 */
	q_vector = kzalloc_node(size, GFP_KERNEL, node);
	if (!q_vector)
		q_vector = kzalloc(size, GFP_KERNEL);
	if (!q_vector)
		return -ENOMEM;

	/* initialize NAPI */
	/* 初始化napi收包方式 */
	netif_napi_add(adapter->netdev, &q_vector->napi,
		       ixgbe_poll, 64);
	……
	return 0;
}

  从函数ixgbe_alloc_q_vector()调用netif_napi_add()初始化NAPI对象可以看到,ixgbe驱动注册的poll回调钩子是ixgbe_poll(),而每次轮询最大可处理的数据包为64个。

NAPI调度

  在ixgbe驱动中因为使用了NAPI接口进行数据包处理,所以对应的上半部实现就变成了当硬中断触发后,在硬中断处理函数中调用NAPI的调度接口napi_schedule_irqoff()将设备加入到cpu私有数据中类型为struct softnet_data的对象的待轮询设备链表中,并触发软中断。以msi-x中断模式为例,其对应的具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
static irqreturn_t ixgbe_msix_clean_rings(int irq, void *data)
{
	struct ixgbe_q_vector *q_vector = data;

	/* EIAM disabled interrupts (on this vector) for us */

	if (q_vector->rx.ring || q_vector->tx.ring)
		napi_schedule_irqoff(&q_vector->napi);

	return IRQ_HANDLED;
}

  而下半部的处理就是在网络子系统的软中断处理函数net_rx_action()中遍历cpu私有数据中类型为struct softnet_data的对象中的待轮询设备链表,依次调用每个设备注册的poll回调钩子进行报文接收处理,其对应的具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static __latent_entropy void net_rx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
	unsigned long time_limit = jiffies + 2;
	int budget = netdev_budget;
	LIST_HEAD(list);
	LIST_HEAD(repoll);

	local_irq_disable();
	list_splice_init(&sd->poll_list, &list);
	local_irq_enable();

	for (;;) {
		struct napi_struct *n;

		if (list_empty(&list)) {
			if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
				return;
			break;
		}

		n = list_first_entry(&list, struct napi_struct, poll_list);
		budget -= napi_poll(n, &repoll);

		/* If softirq window is exhausted then punt.
		 * Allow this to run for 2 jiffies since which will allow
		 * an average latency of 1.5/HZ.
		 */
		if (unlikely(budget <= 0 ||
			     time_after_eq(jiffies, time_limit))) {
			sd->time_squeeze++;
			break;
		}
	}

	__kfree_skb_flush();
	local_irq_disable();

	list_splice_tail_init(&sd->poll_list, &list);
	list_splice_tail(&repoll, &list);
	list_splice(&list, &sd->poll_list);
	if (!list_empty(&sd->poll_list))
		__raise_softirq_irqoff(NET_RX_SOFTIRQ);

	net_rps_action_and_irq_enable(sd);
}

  上面说到过,在下半部的软中断处理函数中会调用设备注册的回调函数poll进行收包处理,而ixgbe驱动中对应的轮询回调函数就是ixgbe_poll()。在这个函数中会遍历NAPI对象关联的中断向量中的所有RX队列,将收到的每一个报文通过调用函数__netif_receive_skb()送往上层协议栈进行处理,具体处理细节可以参考驱动实现。

  通过上面对ixgbe驱动中使用NAPI接口的描述,我们可以总结出NAPI接口的数据包接收流程如下:

图5 NAPI调度流程

  注:上面的流程图中NAPI假设上层会关闭和打开的硬中断

基于82599网卡的二层网络数据包发送

https://tqr.ink/2017/05/01/intel-82599-transmit-packet/

这篇文档主要介绍了网络数据包在二层的发送流程。网络数据包在二层的发送主要包括了网络设备层和驱动层两个部分,所以下面将会从这两个方面讲述报文在二层的发送流程。

网络设备层在报文发送时的处理流程

  当网络协议栈上层准备好了待发送的报文,即构造了一个管理着待发送报文数据的skb对象之后,便会调用网络设备层的主入口函数dev_queue_xmit()进行后续的发送处理。

  当上层已经准备好的skb对象达到网络设备层之后,一般来说并不是直接交给网卡驱动的(在没有设置TCQ_F_CAN_BYPASS的情况下),而是会用类型为struct netdev_queue的发送队列先将skb对象缓存起来,接着依次处理发送队列中的skb对象,将其中的报文数据交给网卡发送出去。而在类型为struct netdev_queue的发送队列中,真正用来缓存skb对象的则是类型为struct Qdisc的实例,该类型的实例通常会实现一组出入队列的回调函数,来实现skb的缓存,重传和移除等操作。当发送队列中struct Qdisc的实例设置了TCQ_F_CAN_BYPASS标志的时候,会将上层下发的skb对象直接通过网卡驱动交给网卡进行发送。

  下图是网络设备层的报文发送主要流程:

图1 网络设备层报文发送流程图

  从上图中我们可以看到在网络设备层用struct Qdisc队列实例来缓存skb对象时,会调用函数 __qdisc_run() 来处理struct Qdisc队列实例中的skb对象,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void __qdisc_run(struct Qdisc *q)
{
  int quota = weight_p;
  int packets;

  /*
   * 循环发送qdisc队列中的报文,直到达到了发送阈值,或者队列中的报文发送完毕,
   * 或者时间片到了,其他进程需要使用cpu
   */
  while (qdisc_restart(q, &packets)) {
      /*
       * Ordered by possible occurrence: Postpone processing if
       * 1. we've exceeded packet quota
       * 2. another process needs the CPU;
       */
      quota -= packets;

      /*
       * 如果quota <= 0,说明qdisc队列中仍然有报文没有发送完,这个时候需要触发
       * 软中断,在软中断处理函数中发送剩余报文
       */
      if (quota <= 0 || need_resched()) {
          __netif_schedule(q);
          break;
      }
  }

  qdisc_run_end(q);
}

  从 __qdisc_run() 函数的实现我们可以看到会有如下两种情况发生:

  1) 一次性将struct Qdisc队列实例中所有的skb对象通过网卡驱动交给网卡发送出去。

  2) 在某次处理struct Qdisc队列实例中的skb对象时,由于某些原因中途停止了,队列实例中可能还有skb对象没有处理完。

  当struct Qdisc队列实例中还有skb对象没有处理完时,就会调用netif_schedule()函数触发一次发送软中断(NET_TX_SOFTIRQ),并将struct Qdisc队列实例加入到cpu私有数据对象softnet_data的output_queue链表成员中,在软中断中会遍历output_queue,继续处理其中的struct Qdisc队列实例剩余的skb对象。发送软中断处理函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static __latent_entropy void net_tx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);

	……
	/*
	 * softnet_data->output_queue链表不为空,说明其中存有数据没有发送完毕的qdisc
	 * 队列,那么这个时候需要调用qdisc_run()尝试将队列中的报文发送出去
	 */
	if (sd->output_queue) {
		struct Qdisc *head;

		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		sd->output_queue_tailp = &sd->output_queue;
		local_irq_enable();

		while (head) {
			struct Qdisc *q = head;
			spinlock_t *root_lock;

			head = head->next_sched;

			root_lock = qdisc_lock(q);
			spin_lock(root_lock);
			/* We need to make sure head->next_sched is read
			 * before clearing __QDISC_STATE_SCHED
			 */
			smp_mb__before_atomic();
			clear_bit(__QDISC_STATE_SCHED, &q->state);
			qdisc_run(q);
			spin_unlock(root_lock);
		}
	}
}

  在net_tx_action()函数的实现中可以看到,其间接又调用了 __qdisc_run() 函数,这说明只要struct Qdisc队列实例中有skb对象没有处理完,就会继续触发发送软中断直到所有的队列中所有的skb对象都被处理完。

ixgbe驱动中和数据发送相关的内容

  在二层网络报文接收流程中,接收报文描述符承载了报文从网卡到主存的过程,与之相对应的,发送报文描述符则承载了报文从主存到网卡的过程。对于网卡驱动而言,当收到来自协议栈上层下发的网络报文时,网卡驱动会将存放着报文数据的地址写入到报文发送描述符中,并将填充了报文地址信息的描述符传递给网卡,而网卡则从报文发送描述符中存放的地址中读取报文数据。

报文发送描述符

  对于82599网卡而言,其支持两种格式的报文发送描述符,即传统格式和高级格式。虽然有两种不同格式的报文发送描述符,但是两种格式的报文发送描述符所占用的内存大小是一样的(目前为16字节),只是对这块内存使用有所不同。对于两种不同格式的报文发送描述符,可以通过设置报文发送描述符中的TDESC.DEXT位进行区分,当该位设置为0的时候,表明使用的是传统格式;当该位设置为1的时候,表明使用的是高级格式。下面介绍高级格式的报文发送描述符。

  相比于传统格式,高级格式的报文发送描述符可以用来支持更多的功能特性。高级格式的报文描述符由于需要支持更多的功能特性,所以分为了读格式和回写格式。

  先来看下82599网卡中读格式的定义,如下图:

图2 高级格式报文发送描述符-读格式

  从图中可以看到,读格式的报文发送描述符中主要包含了报文数据所在的内存地址和一些报文元信息,如报文长度等。这里不再详述,详细可以参考82599网卡的datasheet。

  再来看下82599网卡中回写格式的定义:

图3 高级格式报文发送描述符-回写格式

  从图中可以看到,回写格式的报文发送描述符中有效的域很少,只有STA,而STA中有效位有只有DD位,网卡驱动可以通过该位是否被置位来判断报文发送描述符对应的报文数据是否已经被网卡处理过了。

  在初始化阶段,网卡驱动会申请一定数量的报文发送描述符,并将这些内存进行dma一致性映射,获取对应的物理地址,并写入到网卡的寄存器中,这样网卡驱动和网卡就能同时操作这些报文发送描述符了。当网卡驱动收到协议栈下发的skb对象后,会将skb对象中存放的报文数据进行dma映射,获取对应的物理地址,并存放到报文发送描述符中对应的成员中,这样网卡就能从报文发送描述符中获取存放了报文数据的物理地址,然后从该地址中读取报文数据,并发送到网络中去。

报文发送描述符环形队列

  上面说到,报文发送描述符承载了报文从主存流入到网卡的过程,是网卡驱动和网卡都会操作的对象,那么自然而然会有以下几个疑问:

  1)、报文发送描述符是以何种组织形式在网卡驱动和网卡之间进行传递的?

  2)、网卡驱动怎么通知网卡报文发送描述符可用的?

  在报文发送流程中,报文发送描述符是通过环形队列来管理的,当然这个环形队列是逻辑上的,队列中的描述符在内存上是连续的。网卡或者网卡驱动在进行操作的时候,如果发现已经到达了队列的末尾,那么下次操作又会从队列头部开始,从而实现环形的操作逻辑。报文发送描述符环形队列的结构体如下:

图4 报文发送描述符环形队列结构

  对于第一和第二个问题,其中也已经在上面的描述符环形队列图中有体现。在对问题进行回答之前先要了解下82599网卡中和报文发送描述符环形队列相关的几个寄存器。

  1)、TDBA寄存器。这个寄存器存放了报文发送描述符环形队列的起始地址,也就是上图中Base指向的地址。

  2)、TDLEN寄存器。这个寄存器存放了报文发送描述符环形队列的长度,也就是报文发送描述符环形队列所占用的字节数,对应上图中的Size。

  3)、TDH寄存器。这个寄存器存放的是一个距离队列头部的偏移值,代表的是第一个填充了报文地址信息的描述符。当网卡处理完一个描述符对应的报文数据后,就会更新TDH寄存器的值,使之指向下一个填充了报文地址信息的描述符。也就是说这个寄存器的值是由网卡来更新的,该寄存器对应上图中的Head。

  4)、TDT寄存器。这个寄存器存放的也是一个距离队列头部的偏移值,代表的是最后一个存放了报文地址信息的描述符的下一个描述符。当网卡驱动将一个skb对象中所有的数据分段对应的物理地址都填充到了对应的报文发送描述符中后,就会更新该寄存器的值,使之指向下一个即将被填充报文地址信息并给网卡使用的描述符,该寄存器对应上图中的Tail。

  在了解了这几个寄存器的作用之后,对于本节一开始提出的两个问题就比较容易知晓了。对于第一个问题,报文描述符是以环形队列的方式来组织的;对于第二个问题,因为网卡驱动在填充完一个skb对象中数据分段的报文地址信息到报文发送描述符后,网卡驱动都会更新TDT寄存器的值,所以网卡可以根据TDT寄存器知道自己当前可用的描述符信息,简单来说TDH和TDT之间的描述符就是网卡可以使用的。

数据分段和报文发送描述符关系

  上面说到网卡驱动会将skb对象中存放了报文数据的内存进行dma映射,并将得到的物理地址存放到报文发送描述符中,而一个skb对象中可能存有多个数据分段,对于这种情况,网卡驱动则会将一个数据分段对应一个报文发送描述符,其对应关系如下图:

图5 数据分段与报文发送描述符对应关系

报文发送描述符的回收

当网卡完成报文发送之后,就会触发硬件中断。这里需要注意的是,新的数据包达到或者外发数据包的传输已经完成所触发的中断对应的中断号是同一个,所以在这个中断号对应的中断处理函数中需要考虑到是新的数据包达到所产生的中断,还是外发数据包的传输已经完成触发的中断。

  另外,ixgbe驱动中因为使用了NAPI的收包方式,所以在中断处理函数中只是调用NAPI模块调度接口napi_schedule_irqoff()将设备加入到cpu私有数据中类型为struct softnet_data的对象的待轮询设备链表中,并触发软中断,而在软中断处理函数net_rx_action()中又只会调用设备注册的NAPI回调函数poll。所以无论是新的数据包达到,或者是外发数据包的传输已经完成所触发的中断,最终都会调用设备注册给NAPI接口的poll回调函数,因此报文发送描述符的回收也是在这个函数中完成的,在ixgbe驱动中,poll回调函数就是ixgbe_poll()。在ixgbe_poll()函数中又会调用ixgbe_clean_tx_irq()函数来完成报文发送描述符的回收,该函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
static bool ixgbe_clean_tx_irq(struct ixgbe_q_vector *q_vector,
			       struct ixgbe_ring *tx_ring, int napi_budget)
{
	struct ixgbe_adapter *adapter = q_vector->adapter;
	struct ixgbe_tx_buffer *tx_buffer;
	union ixgbe_adv_tx_desc *tx_desc;
	unsigned int total_bytes = 0, total_packets = 0;
	unsigned int budget = q_vector->tx.work_limit;
   
	/* 获取第一个可以被网卡驱动处理的描述符索引 */
	unsigned int i = tx_ring->next_to_clean;

	if (test_bit(__IXGBE_DOWN, &adapter->state))
		return true;

	tx_buffer = &tx_ring->tx_buffer_info[i];
	tx_desc = IXGBE_TX_DESC(tx_ring, i);
	i -= tx_ring->count;

	do {

		/*
		 * tx_buffer->next_to_watch保存的是环形队列中第一个没有存放某个报文数据
		 * 的报文发送描述符
		 */
		union ixgbe_adv_tx_desc *eop_desc = tx_buffer->next_to_watch;

		/* if next_to_watch is not set then there is no work pending */
		/*
		 * 如果某个报文发送描述符对应的报文缓冲区的next_to_watch成员没有设置,
		 * 说明这个缓冲区对象并不是某个报文对应的第一个缓冲区(当报文以共享方式
		 * 存放的时候,一个报文可能会对应多个缓冲区)。
		 */
		if (!eop_desc)
			break;

		read_barrier_depends();

		/*
		 * 如果eop_desc描述符对应的dd为没有被设置,说明网卡还没有处理完属于该报文
		 * 对应的所有缓冲区,所以就暂时不处理对应的描述符了,而是要等到属于该报文
		 * 的所有描述符都被网卡处理完了之后才去处理属于该报文的所有描述符
		 */
		if (!(eop_desc->wb.status & cpu_to_le32(IXGBE_TXD_STAT_DD)))
			break;

		tx_buffer->next_to_watch = NULL;

		total_bytes += tx_buffer->bytecount;
		total_packets += tx_buffer->gso_segs;

		napi_consume_skb(tx_buffer->skb, napi_budget);

		/* 取消skb->data指向内存的dma映射,以让cpu可以使用该块内存 */
		dma_unmap_single(tx_ring->dev,
				 dma_unmap_addr(tx_buffer, dma),
				 dma_unmap_len(tx_buffer, len),
				 DMA_TO_DEVICE);

		tx_buffer->skb = NULL;
		dma_unmap_len_set(tx_buffer, len, 0);

		/* 取消skb中分片数据对应内存的dma映射,以让cpu可以使用该块内存 */
		while (tx_desc != eop_desc) {
			tx_buffer++;
			tx_desc++;
			i++;
			if (unlikely(!i)) {
				i -= tx_ring->count;
				tx_buffer = tx_ring->tx_buffer_info;
				tx_desc = IXGBE_TX_DESC(tx_ring, 0);
			}

			/* unmap any remaining paged data */
			if (dma_unmap_len(tx_buffer, len)) {
				dma_unmap_page(tx_ring->dev,
					       dma_unmap_addr(tx_buffer, dma),
					       dma_unmap_len(tx_buffer, len),
					       DMA_TO_DEVICE);
				dma_unmap_len_set(tx_buffer, len, 0);
			}
		}

		/* move us one more past the eop_desc for start of next pkt */
		tx_buffer++;
		tx_desc++;
		i++;
		if (unlikely(!i)) {
			i -= tx_ring->count;
			tx_buffer = tx_ring->tx_buffer_info;
			tx_desc = IXGBE_TX_DESC(tx_ring, 0);
		}

		/* issue prefetch for next Tx descriptor */
		prefetch(tx_desc);

		/* update budget accounting */
		budget--;
	} while (likely(budget));

	/* 更新环形队列中的next_to_clean */
	i += tx_ring->count;
	tx_ring->next_to_clean = i;
	……

	return !!budget;
}

  到这里,报文在二层的发送流程就介绍完了。