kk Blog - General Fundamentals

date [-d @int|str] [+%s|"+%F %T"]

Using reuseport

https://www.cnblogs.com/Anker/p/7076537.html

What problems does SO_REUSEPORT solve

SO_REUSEPORT lets multiple processes or threads bind to the same port, improving server performance. It addresses the following:

Multiple sockets can bind()/listen() on the same TCP/UDP port
Each thread/process gets its own listening socket
There is no lock contention on a shared listening socket

Load balancing is done in the kernel

For security, all sockets listening on the same port must belong to the same user

The core of the implementation has three parts:

Extend the socket options with a new SO_REUSEPORT option to enable reuseport.

Modify the bind() system call so that several sockets can bind to the same IP and port.

Modify new-connection handling so that the listener lookup can choose evenly among the multiple sockets listening on the same IP and port.

With SO_REUSEPORT, each process creates its own socket and performs bind, listen, and accept on the same address and port; the sockets are independent and equal. Multiple processes can therefore listen on the same port, each with its own accept socket fd. When a new connection arrives, the kernel wakes only one process to accept it, and the wake-ups are balanced across the processes.
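
Stripped of the CPU pinning and per-worker statistics in the full program below, the per-worker setup is essentially the following minimal sketch (the helper name is illustrative):

#define _GNU_SOURCE
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>

/* Each worker calls this independently with the same address/port. */
static int make_reuseport_listener(const struct sockaddr_in *addr)
{
	int one = 1;
	int fd = socket(AF_INET, SOCK_STREAM, 0);

	if (fd < 0)
		return -1;
	/* SO_REUSEPORT must be set on every socket, before bind() */
	if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0 ||
	    bind(fd, (const struct sockaddr *)addr, sizeof(*addr)) < 0 ||
	    listen(fd, 128) < 0) {
		close(fd);
		return -1;
	}
	return fd;	/* each worker then accept()s on its own fd */
}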

Possible optimization??

In __inet_lookup_listener -> compute_score(), could each CPU pick only a particular sk, so as to reduce lock contention and improve cache locality???

Code

#define _GNU_SOURCE

#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <sys/wait.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <fcntl.h>


#include <sched.h>
#include <pthread.h>

#include <netinet/tcp.h>

#define IP        "192.168.3.6"
#define PORT      80
#define WORKER        8
#define MAXLINE       4096


int worker(int i)
{
	struct sockaddr_in address;
	bzero(&address, sizeof(address));
	address.sin_family = AF_INET;
	inet_pton( AF_INET, IP, &address.sin_addr);
	address.sin_port = htons(PORT);

	pid_t pid = getpid();
	unsigned cc = 0;
	cpu_set_t mask;
	CPU_ZERO(&mask);
	CPU_SET(i, &mask);

	printf("pid=%d %d\n", pid, i);
	/* pin this worker process to CPU i */
	if (sched_setaffinity(pid, sizeof(mask), &mask) < 0) {
		printf("sched_setaffinity err\n");
	}

	int listenfd = socket(PF_INET, SOCK_STREAM, 0);
	assert(listenfd >= 0);

	int val =1;
	/*set SO_REUSEPORT*/
	if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &val, sizeof(val)) < 0) {
		perror("setsockopt()");
	}

	val = 1000 + i;
	if (setsockopt(listenfd, SOL_TCP, TCP_MAXSEG, &val, sizeof(val))<0) {
		perror("setsockopt()");
	}

	int ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd, 5);
	assert(ret != -1);
	while (1) {
		cc ++;
		if (cc % 100 == 0)
			printf("thread=%d cc=%d\n", i, cc);
//        printf("I am worker %d, begin to accept connection.\n", i);
		struct sockaddr_in client_addr;
		socklen_t client_addrlen = sizeof( client_addr );
		int connfd = accept(listenfd, (struct sockaddr*)&client_addr, &client_addrlen);
		if (connfd != -1) {
//            printf("worker %d accept a connection success. ip:%s, prot:%d\n", i, inet_ntoa(client_addr.sin_addr), client_addr.sin_port);
		} else {
//            printf("worker %d accept a connection failed,error:%s", i, strerror(errno));
		}
		char buffer[MAXLINE];
//        int nbytes = read(connfd, buffer, MAXLINE);
//        printf("read from client is:%s\n", buffer);
//        write(connfd, buffer, nbytes);
		if (connfd != -1)
			close(connfd);
	}
	return 0;
}

int main()
{
	int i = 0;
	for (i = 0; i < WORKER; i++) {
		printf("Create worker %d\n", i);
		pid_t pid = fork();
		/*child  process */
		if (pid == 0) {
			worker(i);
		}
		if (pid < 0) {
			printf("fork error");
		}
	}
	/*wait child process*/
	while (wait(NULL) > 0)
		;
	if (errno == ECHILD) {
		fprintf(stderr, "wait error:%s\n", strerror(errno));
	}
	return 0;
}

The qdisc of veth virtual network devices

http://hustcat.github.io/veth/

http://hustcat.github.io/vlan-performance-problem/

A veth device has a qdisc queue, whereas loopback and bridge devices do not; see the br_dev_setup function.

Kernel implementation

When a device is registered (created), its qdisc is set to noop_qdisc: register_netdevice -> dev_init_scheduler

void dev_init_scheduler(struct net_device *dev)
{
	dev->qdisc = &noop_qdisc;
	netdev_for_each_tx_queue(dev, dev_init_scheduler_queue, &noop_qdisc);
	dev_init_scheduler_queue(dev, &dev->rx_queue, &noop_qdisc);

	setup_timer(&dev->watchdog_timer, dev_watchdog, (unsigned long)dev);
}

When the device is brought up, if no qdisc has been configured, the default pfifo_fast queue is attached: dev_open -> dev_activate,

void dev_activate(struct net_device *dev)
{
	int need_watchdog;

	/* No queueing discipline is attached to device;
	   create default one i.e. pfifo_fast for devices,
	   which need queueing and noqueue_qdisc for
	   virtual interfaces
	 */

	if (dev->qdisc == &noop_qdisc)
		attach_default_qdiscs(dev);
...
}

static void attach_default_qdiscs(struct net_device *dev)
{
	struct netdev_queue *txq;
	struct Qdisc *qdisc;

	txq = netdev_get_tx_queue(dev, 0);

	if (!netif_is_multiqueue(dev) || dev->tx_queue_len == 0) {
		netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
		dev->qdisc = txq->qdisc_sleeping;
		atomic_inc(&dev->qdisc->refcnt);
	} else {///multi queue
		qdisc = qdisc_create_dflt(dev, txq, &mq_qdisc_ops, TC_H_ROOT);
		if (qdisc) {
			qdisc->ops->attach(qdisc);
			dev->qdisc = qdisc;
		}
	}
}

static void attach_one_default_qdisc(struct net_device *dev,
				     struct netdev_queue *dev_queue,
				     void *_unused)
{
	struct Qdisc *qdisc;

	if (dev->tx_queue_len) {
		qdisc = qdisc_create_dflt(dev, dev_queue,
					  &pfifo_fast_ops, TC_H_ROOT);
		if (!qdisc) {
			printk(KERN_INFO "%s: activation failed\n", dev->name);
			return;
		}

		/* Can by-pass the queue discipline for default qdisc */
		qdisc->flags |= TCQ_F_CAN_BYPASS;
	} else {
		qdisc =  &noqueue_qdisc;
	}
	dev_queue->qdisc_sleeping = qdisc;
}

Creating noqueue

At first I tried to delete the device's default pfifo_fast queue directly, which fails:

# tc qdisc del dev vethd4ea root
RTNETLINK answers: No such file or directory
# tc  -s qdisc ls dev vethd4ea
qdisc pfifo_fast 0: root refcnt 2 bands 3 priomap  1 2 2 2 1 2 0 0 1 1 1 1 1 1 1 1
 Sent 29705382 bytes 441562 pkt (dropped 0, overlimits 0 requeues 0) 
 backlog 0b 0p requeues 0 

Later I saw that Jesper Brouer describes a way to replace the default queue; I tried it and it worked.

Replacing the default qdisc

# tc qdisc replace dev vethd4ea root pfifo limit 100
# tc  -s qdisc ls dev vethd4ea                      
qdisc pfifo 8001: root refcnt 2 limit 100p
 Sent 264 bytes 4 pkt (dropped 0, overlimits 0 requeues 0) 
 backlog 0b 0p requeues 0 
# ip link show vethd4ea
9: vethd4ea: <BROADCAST,UP,LOWER_UP> mtu 1500 qdisc pfifo master docker0 state UP mode DEFAULT qlen 1000
link/ether 3a:15:3b:e1:d7:6d brd ff:ff:ff:ff:ff:ff

Changing the queue length

# ifconfig vethd4ea txqueuelen 0

Deleting the qdisc

# tc qdisc del dev vethd4ea root                    
# ip link show vethd4ea                
9: vethd4ea: <BROADCAST,UP,LOWER_UP> mtu 1500 qdisc noqueue master docker0 state UP mode DEFAULT 
link/ether 3a:15:3b:e1:d7:6d brd ff:ff:ff:ff:ff:ff

As the output shows, the UP veth device was successfully switched to noqueue.

How a qdisc is created

http://blog.chinaunix.net/uid-26902809-id-4106161.html

register_netdevice initializes the netdev's TX queueing discipline, using noop_qdisc by default.

register_netdevice
--->dev_init_scheduler


void dev_init_scheduler(struct net_device *dev)
{
	dev->qdisc = &noop_qdisc; // default the device qdisc to noop_qdisc
	netdev_for_each_tx_queue(dev, dev_init_scheduler_queue, &noop_qdisc); // default each TX queue to noop_qdisc
	if (dev_ingress_queue(dev))
		dev_init_scheduler_queue(dev, dev_ingress_queue(dev), &noop_qdisc);

	setup_timer(&dev->watchdog_timer, dev_watchdog, (unsigned long)dev);
}
dev_open
--->__dev_open
---->dev_activate
---->attach_default_qdiscs
---->attach_one_default_qdisc
For a single-queue device, create a pfifo_fast qdisc:

static void attach_one_default_qdisc(struct net_device *dev,
				struct netdev_queue *dev_queue,
				void *_unused)
{
	struct Qdisc *qdisc = &noqueue_qdisc;

	if (dev->tx_queue_len) {
		qdisc = qdisc_create_dflt(dev_queue,
				&pfifo_fast_ops, TC_H_ROOT);
		if (!qdisc) {
			netdev_info(dev, "activation failed\n");
			return;
		}
	}
	dev_queue->qdisc_sleeping = qdisc;
}
dev_open
--->__dev_open
---->dev_activate
---->attach_default_qdiscs
---->qdisc_create_dflt
For a multi-queue device, create an mq_qdisc; after it is created, mq_qdisc_ops->init (mq_init) is called to create a pfifo_fast qdisc for each queue:

struct Qdisc *qdisc_create_dflt(struct netdev_queue *dev_queue,
			struct Qdisc_ops *ops, unsigned int parentid)
{
	struct Qdisc *sch;

	sch = qdisc_alloc(dev_queue, ops);
	if (IS_ERR(sch))
		goto errout;
	sch->parent = parentid;

	if (!ops->init || ops->init(sch, NULL) == 0)
		return sch;

	qdisc_destroy(sch);
errout:
	return NULL;
}
EXPORT_SYMBOL(qdisc_create_dflt);
dev_open
--->__dev_open
---->dev_activate
---->attach_default_qdiscs
static void attach_default_qdiscs(struct net_device *dev)
{
	struct netdev_queue *txq;
	struct Qdisc *qdisc;

	txq = netdev_get_tx_queue(dev, 0);

	if (!netif_is_multiqueue(dev) || dev->tx_queue_len == 0) {
		netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
		dev->qdisc = txq->qdisc_sleeping;
		atomic_inc(&dev->qdisc->refcnt);
	} else {
		qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);
		if (qdisc) {
			qdisc->ops->attach(qdisc);
			dev->qdisc = qdisc;
		}
	}
}
dev_open calls dev_activate, which:
a. creates a pfifo_fast qdisc for a single-queue device
b. creates an mq_qdisc for a multi-queue device, then calls mq_qdisc_ops->init (mq_init) to create a pfifo_fast qdisc for each queue
dev_open
--->__dev_open
---->dev_activate

void dev_activate(struct net_device *dev)
{
	int need_watchdog;

	/* No queueing discipline is attached to device;
	   create default one i.e. pfifo_fast for devices,
	   which need queueing and noqueue_qdisc for
	   virtual interfaces
	*/

	if (dev->qdisc == &noop_qdisc)
		attach_default_qdiscs(dev);

	if (!netif_carrier_ok(dev))
	/* Delay activation until next carrier-on event */
		return;

	need_watchdog = 0;
	netdev_for_each_tx_queue(dev, transition_one_qdisc, &need_watchdog);
	if (dev_ingress_queue(dev))
		transition_one_qdisc(dev, dev_ingress_queue(dev), NULL);

	if (need_watchdog) {
		dev->trans_start = jiffies;
		dev_watchdog_up(dev);
	}
}

Analysis of the qdisc implementation

tc qdisc show
echo pfifo > /proc/sys/net/core/default_qdisc
tc qdisc add dev eth0 root pfifo
tc qdisc del dev eth0 root

https://github.com/liucimin/Learning/blob/master/linux%E7%BD%91%E7%BB%9C%E7%9B%B8%E5%85%B3/Tc%20%E7%BD%91%E5%8D%A1%E5%A4%9A%E9%98%9F%E5%88%97%E6%97%B6%E6%AF%8F%E4%B8%AA%E9%98%9F%E5%88%97%E9%85%8D%E7%BD%AE%E5%85%AC%E5%B9%B3%E9%98%9F%E5%88%97sfq.md

http://man7.org/linux/man-pages/man8/tc-fq_codel.8.html


https://blog.csdn.net/one_clouder/article/details/52685249

In layer-2 transmission, the main functions that implement the qdisc path are __dev_xmit_skb and net_tx_action. This part analyzes how the qdisc machinery is implemented; it covers only the framework, not the algorithms inside individual qdiscs.

1. The __dev_xmit_skb function

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
				 struct net_device *dev,
				 struct netdev_queue *txq)
{
	spinlock_t *root_lock = qdisc_lock(q);
	bool contended;
	int rc;
 
	qdisc_pkt_len_init(skb);
	qdisc_calculate_pkt_len(skb, q);
	/*
	 * Heuristic to force contended enqueues to serialize on a
	 * separate lock before trying to get qdisc main lock.
	 * This permits __QDISC___STATE_RUNNING owner to get the lock more
	 * often and dequeue packets faster.
	 */
	contended = qdisc_is_running(q);  // check whether the qdisc is already running
	if (unlikely(contended))
		spin_lock(&q->busylock);
 
	spin_lock(root_lock);
	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
		kfree_skb(skb);
		rc = NET_XMIT_DROP;
	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&  // qdisc not running and nothing queued: the skb can be sent directly
		   qdisc_run_begin(q)) {
		/*
		 * This is a work-conserving queue; there are no old skbs
		 * waiting to be sent out; and the qdisc is not running -
		 * xmit the skb directly.
		 */
 
		qdisc_bstats_update(q, skb);
 
		if (sch_direct_xmit(skb, q, dev, txq, root_lock, true)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       // sch_direct_xmit returned non-zero: packets remain queued, try to send them
		} else
			qdisc_run_end(q); // transmission completed normally, stop the qdisc
 
		rc = NET_XMIT_SUCCESS;
	} else {
		rc = q->enqueue(skb, q) & NET_XMIT_MASK;   // qdisc running or packets queued: enqueue the skb on the qdisc
		if (qdisc_run_begin(q)) {         // try to start the qdisc; if it starts, try to transmit
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);       // send the packets queued on the qdisc
		}
	}
	spin_unlock(root_lock);
	if (unlikely(contended))
		spin_unlock(&q->busylock);
	return rc;
}

2. The __qdisc_run function

void __qdisc_run(struct Qdisc *q)
{
	int quota = weight_p;
	int packets;
 
	while (qdisc_restart(q, &packets)) {  // keep sending packets in a loop
		/*
		 * Ordered by possible occurrence: Postpone processing if
		 * 1. we've exceeded packet quota
		 * 2. another process needs the CPU;
		 */
		quota -= packets;
		if (quota <= 0 || need_resched()) {    // quota exhausted or reschedule needed: raise the TX softirq and stop
			__netif_schedule(q);
			break;
		}
	}
 
	qdisc_run_end(q); // mark the qdisc as no longer running
}

3. The qdisc_restart function

static inline int qdisc_restart(struct Qdisc *q, int *packets)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;
	bool validate;
 
	/* Dequeue packet */
	skb = dequeue_skb(q, &validate, packets); // fetch the next packet to send; because of traffic shaping this may return NULL even when packets are queued
	if (unlikely(!skb))
		return 0;
 
	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = skb_get_tx_queue(dev, skb);
 
	return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);    // transmit the packet
}

4. The dequeue_skb function

static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
				   int *packets)
{
	struct sk_buff *skb = q->gso_skb;
	const struct netdev_queue *txq = q->dev_queue;
 
	*packets = 1;
	*validate = true;
	if (unlikely(skb)) {
		/* check the reason of requeuing without tx lock first */
		txq = skb_get_tx_queue(txq->dev, skb);
		if (!netif_xmit_frozen_or_stopped(txq)) {
			q->gso_skb = NULL;
			q->q.qlen--;
		} else
			skb = NULL;
		/* skb in gso_skb were already validated */
		*validate = false;
	} else {
		if (!(q->flags & TCQ_F_ONETXQUEUE) ||
			!netif_xmit_frozen_or_stopped(txq)) {
			skb = q->dequeue(q);           // call the qdisc's dequeue to fetch an skb
			if (skb && qdisc_may_bulk(q))     // if more skbs can be fetched, dequeue several at once
				try_bulk_dequeue_skb(q, skb, txq, packets);
		}
	}
	return skb;
}

net_tx_action

net_tx_action is the packet-transmit softirq handler; when the softirq runs, it tries to transmit from every qdisc queued on this CPU's softnet_data.

static void net_tx_action(struct softirq_action *h)
{
	struct softnet_data *sd = this_cpu_ptr(&softnet_data);
 
	if (sd->completion_queue) {
		struct sk_buff *clist;
 
		local_irq_disable();
		clist = sd->completion_queue;
		sd->completion_queue = NULL;
		local_irq_enable();
 
		while (clist) {
			struct sk_buff *skb = clist;
			clist = clist->next;
 
			WARN_ON(atomic_read(&skb->users));
			if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED))
				trace_consume_skb(skb);
			else
				trace_kfree_skb(skb, net_tx_action);
			__kfree_skb(skb);
		}
	}
 
	if (sd->output_queue) {
		struct Qdisc *head;
 
		local_irq_disable();
		head = sd->output_queue;
		sd->output_queue = NULL;
		sd->output_queue_tailp = &sd->output_queue;
		local_irq_enable();
 
		while (head) {
			struct Qdisc *q = head;
			spinlock_t *root_lock;
 
			head = head->next_sched;
 
			root_lock = qdisc_lock(q);
			if (spin_trylock(root_lock)) {
				smp_mb__before_atomic();
				clear_bit(__QDISC_STATE_SCHED,
					  &q->state);
				qdisc_run(q);         // try to run the qdisc to transmit packets
				spin_unlock(root_lock);
			} else {
				if (!test_bit(__QDISC_STATE_DEACTIVATED,
						  &q->state)) {
					__netif_reschedule(q);
				} else {
					smp_mb__before_atomic();
					clear_bit(__QDISC_STATE_SCHED,
						  &q->state);
				}
			}
		}
	}
}

Introduction to netmap and how to use it

https://blog.csdn.net/fengfengdiandia/article/details/52869290

https://blog.csdn.net/liyu123__/article/details/80853150

https://www.cnblogs.com/ne-liqian/p/9294757.html

https://wenku.baidu.com/view/af41b0f065ce05087632137a

netmap website: http://info.iet.unipi.it/~luigi/netmap/

netmap GitHub repository: https://github.com/luigirizzo/netmap

netmap is a high-performance framework for sending and receiving packets; it is already integrated into FreeBSD, and it can also be built and used on Linux.

1. Architecture

Modern NICs use multiple buffers to send and receive packets, organized in a circular array called the NIC ring.

The NIC ring is statically allocated; its slots point to buffers in the mbuf chains.

netmap memory-maps the NIC's packet buffers into user space and implements its own transmit and receive circular rings that correspond to the NIC rings. With netmap the program runs in user space, so even if something goes wrong it cannot crash the operating system.

An interface can have multiple netmap rings.

When binding a file descriptor to a NIC, the application can choose to attach all rings, or just a single ring, to the descriptor.

With all rings attached, the same code can serve single-queue and multi-queue NICs.

With a single ring per descriptor, a high-performance system can be built with one process/CPU core per ring, running in parallel across the system.

netmap uses poll to wait until the NIC's file descriptor is ready for receiving or sending.

netmap creates a character device, /dev/netmap, and the NIC is switched into netmap mode via nm_open.

  • Note: once a NIC is in netmap mode, ifconfig no longer shows its counters changing and wireshark cannot capture its packets, because the kernel network stack is bypassed.

The memory-mapped region contains the NIC's receive and transmit queues, so zero-copy forwarding can be achieved by writing the (index of the) receive buffer into the transmit ring; a sketch of this swap is shown below.
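
The buffer swap can be sketched roughly as follows, modeled on netmap's bridge example; the ring arguments and the helper name are illustrative and not part of the programs in this post:

#include <stdint.h>

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>

/* Zero-copy forward of one packet: hand the RX slot's buffer to a TX slot
 * by swapping buffer indices, instead of copying the payload. */
static void forward_one(struct netmap_ring *rx, struct netmap_ring *tx)
{
	struct netmap_slot *rs = &rx->slot[rx->cur];
	struct netmap_slot *ts = &tx->slot[tx->cur];
	uint32_t idx = ts->buf_idx;

	ts->buf_idx = rs->buf_idx;      /* TX slot now points at the received data */
	rs->buf_idx = idx;              /* recycle the old TX buffer for future RX */
	ts->len = rs->len;
	ts->flags |= NS_BUF_CHANGED;    /* tell the kernel the buffers were swapped */
	rs->flags |= NS_BUF_CHANGED;

	rx->head = rx->cur = nm_ring_next(rx, rx->cur);
	tx->head = tx->cur = nm_ring_next(tx, tx->cur);
}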

2. Performance

According to the netmap website, tests on 10GigE reach a transmit rate of 14.88 Mpps, with receive rates close to the transmit rate. Multiple NIC queues are also supported.

3. Building and installing

git clone https://github.com/luigirizzo/netmap.git
git clone https://github.com/abcdxyzk/netmap.git

cd netmap/LINUX
./configure --drivers=ixgbe --kernel-sources=/usr/src/linux-headers-4.15.18/ --kernel-dir=/usr/src/linux-headers-4.15.18/

rmmod ixgbe

insmod netmap.ko
insmod ixgbe/ixgbe.ko

4. Transmitting and receiving

Transmit

./build-apps/pkt-gen/pkt-gen -i enp3s0 -f tx -c 1 -p 1 -z -d 12.0.0.100:80

Receive

gcc rcv.c -I../sys

cat rcv.c

#include <stdio.h>
#include <stdlib.h>
#include <poll.h>
 
#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>
 
unsigned long pps = 0;
 
static void receive_packets(struct netmap_ring *ring)
{
	int i;
	char *buf;
 
	while (!nm_ring_empty(ring)) {
		i   = ring->cur;
		buf = NETMAP_BUF(ring, ring->slot[i].buf_idx);
		pps++;

		ring->head = ring->cur = nm_ring_next(ring, i); 
	}
}
 
int main(void)
{
	struct nm_desc *d;
	struct pollfd fds;
	struct netmap_ring *ring;
	int i; 
 
	d = nm_open("netmap:eth1", NULL, 0, 0); 
  
	fds.fd     = d->fd;
	fds.events = POLLIN;
 
	while (1) {
		if (poll(&fds, 1, 1) < 0) {
			perror("poll()");
			exit(1);
		}
 
		for (i = d->first_rx_ring; i <= d->last_rx_ring; i++) {
			ring = NETMAP_RXRING(d->nifp, i);
			receive_packets(ring);
		}
	}

	return 0;
}
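
For completeness, the transmit side follows the same pattern as rcv.c above. Below is a rough sketch under the same assumptions (interface netmap:eth1; the 60-byte frame contents are a placeholder); frames placed on the rings are pushed out by the next poll() or NIOCTXSYNC:

#include <poll.h>
#include <stdio.h>
#include <string.h>

#define NETMAP_WITH_LIBS
#include <net/netmap_user.h>

int main(void)
{
	unsigned char frame[60] = {0};          /* placeholder Ethernet frame */
	struct nm_desc *d = nm_open("netmap:eth1", NULL, 0, 0);
	struct pollfd fds;
	int i;

	if (d == NULL) {
		perror("nm_open");
		return 1;
	}
	fds.fd     = d->fd;
	fds.events = POLLOUT;

	while (1) {
		/* poll() also syncs the TX rings, flushing frames queued below */
		if (poll(&fds, 1, 1000) <= 0)
			continue;

		for (i = d->first_tx_ring; i <= d->last_tx_ring; i++) {
			struct netmap_ring *ring = NETMAP_TXRING(d->nifp, i);
			struct netmap_slot *slot;
			char *buf;

			if (nm_ring_empty(ring))        /* no free TX slots on this ring */
				continue;
			slot = &ring->slot[ring->cur];
			buf  = NETMAP_BUF(ring, slot->buf_idx);
			memcpy(buf, frame, sizeof(frame));
			slot->len = sizeof(frame);
			ring->head = ring->cur = nm_ring_next(ring, ring->cur);
		}
	}

	return 0;
}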