kk Blog —— General Basics


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp
sar -n DEV 1

How a qdisc is created

http://blog.chinaunix.net/uid-26902809-id-4106161.html

register_netdevice initializes the netdev's Tx queueing discipline; by default it attaches noop_qdisc.

register_netdevice
--->dev_init_scheduler


void dev_init_scheduler(struct net_device *dev)
{
	dev->qdisc = &noop_qdisc; // attach noop_qdisc to the device by default
	netdev_for_each_tx_queue(dev, dev_init_scheduler_queue, &noop_qdisc); // and to every Tx queue by default
	if (dev_ingress_queue(dev))
		dev_init_scheduler_queue(dev, dev_ingress_queue(dev), &noop_qdisc);

	setup_timer(&dev->watchdog_timer, dev_watchdog, (unsigned long)dev);
}
dev_open
--->__dev_open
---->dev_activate
---->attach_default_qdiscs
---->attach_one_default_qdisc
For a single-queue device, this creates a pfifo_fast qdisc (a device whose tx_queue_len is 0 keeps noqueue_qdisc, as the code below shows):

static void attach_one_default_qdisc(struct net_device *dev,
				struct netdev_queue *dev_queue,
				void *_unused)
{
	struct Qdisc *qdisc = &noqueue_qdisc;

	if (dev->tx_queue_len) {
		qdisc = qdisc_create_dflt(dev_queue,
				&pfifo_fast_ops, TC_H_ROOT);
		if (!qdisc) {
			netdev_info(dev, "activation failed\n");
			return;
		}
	}
	dev_queue->qdisc_sleeping = qdisc;
}
dev_open
--->__dev_open
---->dev_activate
---->attach_default_qdiscs
---->qdisc_create_dflt
For a multi-queue device, create an mq qdisc; once it is created, its init hook (mq_init in mq_qdisc_ops) creates a pfifo_fast qdisc for every Tx queue.

struct Qdisc *qdisc_create_dflt(struct netdev_queue *dev_queue,
			struct Qdisc_ops *ops, unsigned int parentid)
{
	struct Qdisc *sch;

	sch = qdisc_alloc(dev_queue, ops);
	if (IS_ERR(sch))
		goto errout;
	sch->parent = parentid;

	if (!ops->init || ops->init(sch, NULL) == 0)
		return sch;

	qdisc_destroy(sch);
errout:
	return NULL;
}
EXPORT_SYMBOL(qdisc_create_dflt);
dev_open
--->__dev_open
---->dev_activate
---->attach_default_qdiscs
static void attach_default_qdiscs(struct net_device *dev)
{
	struct netdev_queue *txq;
	struct Qdisc *qdisc;

	txq = netdev_get_tx_queue(dev, 0);

	if (!netif_is_multiqueue(dev) || dev->tx_queue_len == 0) {
		netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
		dev->qdisc = txq->qdisc_sleeping;
		atomic_inc(&dev->qdisc->refcnt);
	} else {
		qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);
		if (qdisc) {
			qdisc->ops->attach(qdisc);
			dev->qdisc = qdisc;
		}
	}
}
To summarize, dev_open calls dev_activate, which:
a. creates a pfifo_fast qdisc for a single-queue device;
b. creates an mq qdisc for a multi-queue device, then calls the mq_init function (via mq_qdisc_ops) to create a pfifo_fast qdisc for each Tx queue.
dev_open
--->__dev_open
---->dev_activate

void dev_activate(struct net_device *dev)
{
	int need_watchdog;

	/* No queueing discipline is attached to device;
	   create default one i.e. pfifo_fast for devices,
	   which need queueing and noqueue_qdisc for
	   virtual interfaces
	*/

	if (dev->qdisc == &noop_qdisc)
		attach_default_qdiscs(dev);

	if (!netif_carrier_ok(dev))
	/* Delay activation until next carrier-on event */
		return;

	need_watchdog = 0;
	netdev_for_each_tx_queue(dev, transition_one_qdisc, &need_watchdog);
	if (dev_ingress_queue(dev))
		transition_one_qdisc(dev, dev_ingress_queue(dev), NULL);

	if (need_watchdog) {
		dev->trans_start = jiffies;
		dev_watchdog_up(dev);
	}
}

Analysis of the qdisc implementation

tc qdisc show
echo pfifo > /proc/sys/net/core/default_qdisc
tc qdisc add dev eth0 root pfifo
tc qdisc del dev eth0 root

https://github.com/liucimin/Learning/blob/master/linux%E7%BD%91%E7%BB%9C%E7%9B%B8%E5%85%B3/Tc%20%E7%BD%91%E5%8D%A1%E5%A4%9A%E9%98%9F%E5%88%97%E6%97%B6%E6%AF%8F%E4%B8%AA%E9%98%9F%E5%88%97%E9%85%8D%E7%BD%AE%E5%85%AC%E5%B9%B3%E9%98%9F%E5%88%97sfq.md

http://man7.org/linux/man-pages/man8/tc-fq_codel.8.html


https://blog.csdn.net/one_clouder/article/details/52685249

On the layer-2 transmit path, the main functions that implement qdisc processing are __dev_xmit_skb and net_tx_action. This section analyzes how the qdisc machinery works; it does not cover the algorithms inside individual qdiscs, only the framework.

1. The __dev_xmit_skb function

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
				 struct net_device *dev,
				 struct netdev_queue *txq)
{
	spinlock_t *root_lock = qdisc_lock(q);
	bool contended;
	int rc;
 
	qdisc_pkt_len_init(skb);
	qdisc_calculate_pkt_len(skb, q);
	/*
	 * Heuristic to force contended enqueues to serialize on a
	 * separate lock before trying to get qdisc main lock.
	 * This permits __QDISC___STATE_RUNNING owner to get the lock more
	 * often and dequeue packets faster.
	 */
	contended = qdisc_is_running(q);  // check whether the qdisc is already running
	if (unlikely(contended))
		spin_lock(&q->busylock);
 
	spin_lock(root_lock);
	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
		kfree_skb(skb);
		rc = NET_XMIT_DROP;
	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&  // qdisc not running and nothing queued: we may transmit directly
		   qdisc_run_begin(q)) {
		/*
		 * This is a work-conserving queue; there are no old skbs
		 * waiting to be sent out; and the qdisc is not running -
		 * xmit the skb directly.
		 */
 
		qdisc_bstats_update(q, skb);
 
		if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       // sch_direct_xmit returned nonzero: the qdisc still has packets queued, try to send them
		} else
			qdisc_run_end(q); // transmit finished normally, mark the qdisc as no longer running
 
		rc = NET_XMIT_SUCCESS;
	} else {
		rc = q->enqueue(skb, q) & NET_XMIT_MASK;   // qdisc is running or has queued packets: enqueue this one
		if (qdisc_run_begin(q)) {         // try to start the qdisc; if we got it, drain the queue
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       // send the packets queued in the qdisc
		}
	}
	spin_unlock(root_lock);
	if (unlikely(contended))
		spin_unlock(&q->busylock);
	return rc;
}
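
Both the bypass branch and the enqueue branch above hinge on qdisc_run_begin/qdisc_run_end, which flip a RUNNING flag so that only one CPU drains a given qdisc at a time. For reference, this is roughly how those helpers looked in kernels of this era (include/net/sch_generic.h); treat this as a sketch, since the exact form varies by kernel version:

static inline bool qdisc_is_running(const struct Qdisc *qdisc)
{
	return (qdisc->__state & __QDISC___STATE_RUNNING) ? true : false;
}

static inline bool qdisc_run_begin(struct Qdisc *qdisc)
{
	if (qdisc_is_running(qdisc))
		return false;	/* another CPU is already draining this qdisc */
	qdisc->__state |= __QDISC___STATE_RUNNING;
	return true;
}

static inline void qdisc_run_end(struct Qdisc *qdisc)
{
	qdisc->__state &= ~__QDISC___STATE_RUNNING;
}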

2. The __qdisc_run function

void __qdisc_run(struct Qdisc *q)
{
	int quota = weight_p;
	int packets;
 
	while (qdisc_restart(q, &packets)) {  // keep sending packets in a loop
		/*
		 * Ordered by possible occurrence: Postpone processing if
		 * 1. we've exceeded packet quota
		 * 2. another process needs the CPU;
		 */
		quota -= packets;
		if (quota <= 0 || need_resched()) {    // quota exhausted or another task needs the CPU: raise the Tx softirq and stop
			__netif_schedule(q);
			break;
		}
	}
 
	qdisc_run_end(q); // mark the qdisc as no longer running
}

3. The qdisc_restart function

static inline int qdisc_restart(struct Qdisc *q, int *packets)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;
	bool validate;
 
	/* Dequeue packet */
	skb = dequeue_skb(q, &validate, packets); // fetch a packet to send; rate limiting may make this return NULL even when packets are queued
	if (unlikely(!skb))
		return 0;
 
	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = skb_get_tx_queue(dev, skb);
 
	return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);    // transmit the packet
}

4. The dequeue_skb function

static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
				   int *packets)
{
	struct sk_buff *skb = q->gso_skb;
	const struct netdev_queue *txq = q->dev_queue;
 
	*packets = 1;
	*validate = true;
	if (unlikely(skb)) {
		/* check the reason of requeuing without tx lock first */
		txq = skb_get_tx_queue(txq->dev, skb);
		if (!netif_xmit_frozen_or_stopped(txq)) {
			q->gso_skb = NULL;
			q->q.qlen--;
		} else
			skb = NULL;
		/* skb in gso_skb were already validated */
		*validate = false;
	} else {
		if (!(q->flags & TCQ_F_ONETXQUEUE) ||
			!netif_xmit_frozen_or_stopped(txq)) {
			skb = q->dequeue(q);           // call the qdisc's dequeue hook to get an skb
			if (skb && qdisc_may_bulk(q))  // if bulking is allowed, grab several skbs at once
				try_bulk_dequeue_skb(q, skb, txq, packets);
		}
	}
	return skb;
}

5. The net_tx_action function

net_tx_action is the transmit softirq handler. When it runs, it frees the skbs on this CPU's completion queue and tries to run every qdisc scheduled on this CPU's softnet_data output queue.

static void net_tx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
 
	if (sd->completion_queue) {
		struct sk_buff *clist;
 
		local_irq_disable();
		clist = sd->completion_queue;
		sd->completion_queue = NULL;
		local_irq_enable();
 
		while (clist) {
			struct sk_buff *skb = clist;
			clist = clist->next;
 
			WARN_ON(atomic_read(&skb->users));
			if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
				trace_consume_skb(skb);
			else
				trace_kfree_skb(skb, net_tx_action);
			__kfree_skb(skb);
		}
	}
 
	if (sd->output_queue) {
		struct Qdisc *head;
 
		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		sd->output_queue_tailp = &sd->output_queue;
		local_irq_enable();
 
		while (head) {
			struct Qdisc *q = head;
			spinlock_t *root_lock;
 
			head = head->next_sched;
 
			root_lock = qdisc_lock(q);
			if (spin_trylock(root_lock)) {
				smp_mb__before_atomic();
				clear_bit(__QDISC_STATE_SCHED,
					  &q->state);
				qdisc_run(q);         // try to run the qdisc and transmit its packets
				spin_unlock(root_lock);
			} else {
				if (!test_bit(__QDISC_STATE_DEACTIVATED,
						  &q->state)) {
					__netif_reschedule(q);
				} else {
					smp_mb__before_atomic();
					clear_bit(__QDISC_STATE_SCHED,
						  &q->state);
				}
			}
		}
	}
}
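
Note that net_tx_action calls qdisc_run rather than __qdisc_run. In kernels of this era it is a thin inline wrapper (roughly as below, from include/net/pkt_sched.h; a sketch, details vary by version) that only enters __qdisc_run after winning the RUNNING flag, so a qdisc scheduled by both the softirq and a concurrent sender is still drained by exactly one CPU:

static inline void qdisc_run(struct Qdisc *q)
{
	if (qdisc_run_begin(q))
		__qdisc_run(q);
}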

netmap: introduction and usage

https://blog.csdn.net/fengfengdiandia/article/details/52869290

https://blog.csdn.net/liyu123__/article/details/80853150

https://www.cnblogs.com/ne-liqian/p/9294757.html

https://wenku.baidu.com/view/af41b0f065ce05087632137a

netmap homepage: http://info.iet.unipi.it/~luigi/netmap/

netmap on GitHub: https://github.com/luigirizzo/netmap

netmap is a high-performance packet I/O framework. It is integrated into FreeBSD, and it can also be built and used on Linux.

1. Architecture

Modern NICs use multiple buffers to send and receive packets, managed through a circular array called the NIC ring.

The NIC ring is statically allocated; its slots point to packet buffers (on FreeBSD, parts of mbuf chains).

netmap memory-maps the NIC's packet buffers into user space and implements its own circular rings that mirror the NIC's rings. A netmap program runs entirely in user space, so even if it misbehaves it cannot crash the operating system.

An interface can expose several netmap rings.

When binding a file descriptor to a NIC, an application can attach either all of the rings or just a single ring to that descriptor.

With all rings attached, the same code works for both single-queue and multi-queue NICs.

With one ring per descriptor, a high-performance system can be built by dedicating one process/CPU core to each ring, giving parallelism across the system.

netmap uses poll to wait until the NIC's file descriptor is ready for receiving or sending.

netmap creates a character device, /dev/netmap; calling nm_open then switches a NIC into netmap mode.

  • Note: once a NIC is in netmap mode, ifconfig no longer shows its counters changing and wireshark cannot capture its traffic, because the kernel network stack is bypassed.

The memory-mapped region contains the NIC's receive and transmit queues, so zero-copy forwarding is possible by writing a receive buffer's address into a slot of a transmit ring.
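
To make the zero-copy idea concrete, here is a minimal sketch of a one-direction forwarder that swaps buffer indices between an RX slot and a TX slot instead of copying payloads. It is modeled on netmap's example programs; the interface names eth1/eth2, the single-ring pairing, and the error handling are illustrative assumptions, not code from the original post:

/* fwd.c - one-direction zero-copy forwarder sketch (eth1 -> eth2) */
#include <stdio.h>
#include <stdlib.h>
#include <poll.h>

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>

/* move packets from rx to tx by swapping buffer indices, never copying */
static void forward_ring(struct netmap_ring *rx, struct netmap_ring *tx)
{
	while (!nm_ring_empty(rx) && nm_ring_space(tx)) {
		struct netmap_slot *rs = &rx->slot[rx->cur];
		struct netmap_slot *ts = &tx->slot[tx->cur];
		uint32_t idx = ts->buf_idx;

		ts->buf_idx = rs->buf_idx;	/* hand the RX buffer to TX */
		rs->buf_idx = idx;		/* recycle the old TX buffer */
		ts->len = rs->len;
		ts->flags |= NS_BUF_CHANGED;	/* tell the kernel both slots */
		rs->flags |= NS_BUF_CHANGED;	/* now point at new buffers  */

		rx->head = rx->cur = nm_ring_next(rx, rx->cur);
		tx->head = tx->cur = nm_ring_next(tx, tx->cur);
	}
}

int main(void)
{
	struct nm_desc *in, *out;
	struct pollfd fds[2];

	in = nm_open("netmap:eth1", NULL, 0, NULL);
	if (in == NULL) {
		fprintf(stderr, "nm_open(eth1) failed\n");
		exit(1);
	}
	/* the second port must share the first port's memory region for
	 * buffer swapping to be legal, hence NM_OPEN_NO_MMAP */
	out = nm_open("netmap:eth2", NULL, NM_OPEN_NO_MMAP, in);
	if (out == NULL) {
		fprintf(stderr, "nm_open(eth2) failed\n");
		exit(1);
	}

	fds[0].fd = in->fd;  fds[0].events = POLLIN;
	fds[1].fd = out->fd; fds[1].events = POLLOUT;

	for (;;) {
		if (poll(fds, 2, 1000) < 0) {
			perror("poll()");
			exit(1);
		}
		/* pair the first ring of each port, for simplicity */
		forward_ring(NETMAP_RXRING(in->nifp, in->first_rx_ring),
			     NETMAP_TXRING(out->nifp, out->first_tx_ring));
	}
}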

2. Performance

The netmap site reports that on a 10GigE NIC the transmit rate reaches 14.88 Mpps (line rate for minimum-size frames), and the receive rate is similar. Multiple NIC queues are also supported.

3. Building and installing

# clone netmap (upstream, or the fork below)
git clone https://github.com/luigirizzo/netmap.git
git clone https://github.com/abcdxyzk/netmap.git

cd netmap/LINUX
./configure --drivers=ixgbe --kernel-sources=/usr/src/linux-headers-4.15.18/ --kernel-dir=/usr/src/linux-headers-4.15.18/

# unload the stock driver, then load netmap.ko and the netmap-patched driver
rmmod ixgbe

insmod netmap.ko
insmod ixgbe/ixgbe.ko

4. Sending and receiving

Sending

./build-apps/pkt-gen/pkt-gen -i enp3s0 -f tx -c 1 -p 1 -z -d 12.0.0.100:80

Receiving

gcc rcv.c -I../sys

cat rcv.c

#include <stdio.h>
#include <stdlib.h>
#include <poll.h>

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>

unsigned long pps = 0;

static void receive_packets(struct netmap_ring *ring)
{
	int i;
	char *buf;

	while (!nm_ring_empty(ring)) {
		i   = ring->cur;
		buf = NETMAP_BUF(ring, ring->slot[i].buf_idx); /* packet payload; a real app would parse it here */
		(void)buf;
		pps++;

		/* return the slot to the kernel and advance */
		ring->head = ring->cur = nm_ring_next(ring, i);
	}
}

int main(void)
{
	struct nm_desc *d;
	struct pollfd fds;
	struct netmap_ring *ring;
	int i;

	d = nm_open("netmap:eth1", NULL, 0, NULL);
	if (d == NULL) {
		fprintf(stderr, "nm_open failed\n");
		exit(1);
	}

	fds.fd     = d->fd;
	fds.events = POLLIN;

	while (1) {
		if (poll(&fds, 1, 1) < 0) {
			perror("poll()");
			exit(1);
		}

		/* drain every RX ring bound to this descriptor */
		for (i = d->first_rx_ring; i <= d->last_rx_ring; i++) {
			ring = NETMAP_RXRING(d->nifp, i);
			receive_packets(ring);
		}
	}

	return 0;
}
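
For completeness, a transmit counterpart to rcv.c can be sketched against the same API (pkt-gen above is the tool actually used for this). The interface name eth1, the dummy broadcast frame, and the single-ring use are illustrative assumptions:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>

int main(void)
{
	struct nm_desc *d;
	struct pollfd fds;
	struct netmap_ring *ring;
	/* 60-byte dummy frame: broadcast dst, locally administered src,
	 * experimental ethertype 0x88b5; the rest stays zero */
	unsigned char frame[60] = {
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0x02, 0x00, 0x00, 0x00, 0x00, 0x01,
		0x88, 0xb5
	};

	d = nm_open("netmap:eth1", NULL, 0, NULL);
	if (d == NULL) {
		fprintf(stderr, "nm_open failed\n");
		exit(1);
	}

	fds.fd     = d->fd;
	fds.events = POLLOUT;	/* poll also triggers the TX sync */

	for (;;) {
		if (poll(&fds, 1, 1000) < 0) {
			perror("poll()");
			exit(1);
		}

		ring = NETMAP_TXRING(d->nifp, d->first_tx_ring);
		while (!nm_ring_empty(ring)) {	/* while free TX slots remain */
			struct netmap_slot *slot = &ring->slot[ring->cur];
			char *buf = NETMAP_BUF(ring, slot->buf_idx);

			memcpy(buf, frame, sizeof(frame));
			slot->len = sizeof(frame);
			ring->head = ring->cur = nm_ring_next(ring, ring->cur);
		}
	}
}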