kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp
sar -n DEV 1

socket接收连接 sys_accept

http://linux.chinaunix.net/techdoc/net/

http://linux.chinaunix.net/techdoc/net/2008/12/30/1055672.shtml

这一节我们开始分析如何接收TCP的socket的连接请求,象以前的分析章节一样我们先看练习中的用户界面

1
accept(server_sockfd, (struct sockaddr *)&client_address, client_len);

还是以前的分析方法,这里要注意第二个参数,client_address,它是在我们的测试程序中另外声明用于保存客户端socket地址的数据结构变量。其他二个参数无需多说。还是按照以前的方式我们直接看sys_socketcall()函数的代码部分

1
2
3
4
case SYS_ACCEPT:
	err = sys_accept(a0, (struct sockaddr __user *)a1,
		 (int __user *)a[2]);
	break;

显然是进入sys_accept()这个函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
sys_socketcall()-->sys_accept()
asmlinkage long sys_accept(int fd, struct sockaddr __user *upeer_sockaddr,
			 int __user *upeer_addrlen)
{
	struct socket *sock, *newsock;
	struct file *newfile;
	int err, len, newfd, fput_needed;
	char address[MAX_SOCK_ADDR];
	sock = sockfd_lookup_light(fd, &err, &fput_needed);
	if (!sock)
		goto out;
	err = -ENFILE;
	if (!(newsock = sock_alloc()))
		goto out_put;
	newsock->type = sock->type;
	newsock->ops = sock->ops;
	/*
	 * We don't need try_module_get here, as the listening socket (sock)
	 * has the protocol module (sock->ops->owner) held.qinjian
	 */
	__module_get(newsock->ops->owner);
	newfd = sock_alloc_fd(&newfile);
	if (unlikely(newfd  0)) {
		err = newfd;
		sock_release(newsock);
		goto out_put;
	}
	err = sock_attach_fd(newsock, newfile);
	if (err  0)
		goto out_fd_simple;
	err = security_socket_accept(sock, newsock);
	if (err)
		goto out_fd;
	err = sock->ops->accept(sock, newsock, sock->file->f_flags);
	if (err  0)
		goto out_fd;
	if (upeer_sockaddr) {
		if (newsock->ops->getname(newsock, (struct sockaddr *)address,
					 &len, 2)  0) {
			err = -ECONNABORTED;
			goto out_fd;
		}
		err = move_addr_to_user(address, len, upeer_sockaddr,
					upeer_addrlen);
		if (err  0)
			goto out_fd;
	}
	/* File flags are not inherited via accept() unlike another OSes.QJ */
	fd_install(newfd, newfile);
	err = newfd;
	security_socket_post_accept(sock, newsock);
out_put:
	fput_light(sock->file, fput_needed);
out:
	return err;
out_fd_simple:
	sock_release(newsock);
	put_filp(newfile);
	put_unused_fd(newfd);
	goto out_put;
out_fd:
	fput(newfile);
	put_unused_fd(newfd);
	goto out_put;
}

这个函数总的作用就是使服务端的socket能够创建与客户端连接的“子连接”,也就是会利用服务器端的socket创建一个新的能与客户端建立连接的socket,而且会把新连接的socket的id号,返回到我们测试程序中的client_sockfd,同时也把客户端的socket地址保存在client_address中,函数中首先会进入sockfd_lookup_light()中找到我们服务器端的socket,这个函数前面章节中用到多次了不再进入细细分析了,接着函数中调用sock_alloc()函数创建一个新的socket,此后为这个新创建的socket分配一个可用的文件号,然后能过sock_attach_fd使其与文件号挂钩。最重要的当属这句代码

1
err = sock->ops->accept(sock, newsock, sock->file->f_flags);

这部分开始入手分析TCP的socket是如何执行的,这里会进入inet_stream_ops中执行,可能有些朋友是直接阅读本文的,最好是看一下前面的章节理清是如何进入这个函数的,我们这里不再重复了。

1
2
3
4
5
const struct proto_ops inet_stream_ops = {
	。。。。。。
	.accept         = inet_accept,
	。。。。。。
};

我们再次看一下af_inet.c中的这个数据结构,很显然进入了inet_accept()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sys_socketcall()-->sys_accept()-->inet_accept()
int inet_accept(struct socket *sock, struct socket *newsock, int flags)
{
	struct sock *sk1 = sock->sk;
	int err = -EINVAL;
	struct sock *sk2 = sk1->sk_prot->accept(sk1, flags, &err);
	if (!sk2)
		goto do_err;
	lock_sock(sk2);
	BUG_TRAP((1  sk2->sk_state) &
		 (TCPF_ESTABLISHED | TCPF_CLOSE_WAIT | TCPF_CLOSE));
	sock_graft(sk2, newsock);
	newsock->state = SS_CONNECTED;
	err = 0;
	release_sock(sk2);
do_err:
	return err;
}

进入这个函数的时候已经找到了我们前面建立的socket结构,而newsock是我们新分配建立的socket结构,我们看到上面函数中执行了

1
struct sock *sk2 = sk1->sk_prot->accept(sk1, flags, &err);

进而进入了钩子函数中执行,那里的struct proto tcp_prot结构变量可以看到

1
2
3
4
5
struct proto tcp_prot = {
	。。。。。。
	.accept            = inet_csk_accept,
	。。。。。。
};

很显然是执行的inet_csk_accept()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
sys_socketcall()-->sys_accept()-->inet_accept()-->inet_csk_accept()
struct sock *inet_csk_accept(struct sock *sk, int flags, int *err)
{
	struct inet_connection_sock *icsk = inet_csk(sk);
	struct sock *newsk;
	int error;
	lock_sock(sk);
	/* We need to make sure that this socket is listening,
	 * and that it has something pending.qinjian
	 */
	error = -EINVAL;
	if (sk->sk_state != TCP_LISTEN)
		goto out_err;
	/* Find already established connection */
	if (reqsk_queue_empty(&icsk->icsk_accept_queue)) {
		long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
		/* If this is a non blocking socket don't sleep */
		error = -EAGAIN;
		if (!timeo)
			goto out_err;
		error = inet_csk_wait_for_connect(sk, timeo);
		if (error)
			goto out_err;
	}
	newsk = reqsk_queue_get_child(&icsk->icsk_accept_queue, sk);
	BUG_TRAP(newsk->sk_state != TCP_SYN_RECV);
out:
	release_sock(sk);
	return newsk;
out_err:
	newsk = NULL;
	*err = error;
	goto out;
}

象往常叙述的一样首先是在sock中取得struct inet_connection_sock结构,然后判断一下sock的状态是否已经处于监听状态,如果没有处于监听状态的话就不能接收了,只好出错返回了。接着是检查icsk中的icsk_accept_queue请求队列是否为空,因为我们练习中还未启动客户端程序,所以此时还没有连接请求到来,这个队列现在是空的,所以进入if语句,sock_rcvtimeo()是根据是否允许“阻塞”即等待,而取得sock结构中的sk_rcvtimeo时间值,然后根据这个值进入inet_csk_wait_for_connect()函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
sys_socketcall()-->sys_accept()-->inet_accept()-->inet_csk_accept()-->inet_csk_wait_for_connect()
static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
	struct inet_connection_sock *icsk = inet_csk(sk);
	DEFINE_WAIT(wait);
	int err;
	/*
	 * True wake-one mechanism for incoming connections: only
	 * one process gets woken up, not the 'whole herd'.
	 * Since we do not 'race & poll' for established sockets
	 * anymore, the common case will execute the loop only once.
	 *
	 * Subtle issue: "add_wait_queue_exclusive()" will be added
	 * after any current non-exclusive waiters, and we know that
	 * it will always _stay_ after any new non-exclusive waiters
	 * because all non-exclusive waiters are added at the
	 * beginning of the wait-queue. As such, it's ok to "drop"
	 * our exclusiveness temporarily when we get woken up without
	 * having to remove and re-insert us on the wait queue.wumingxiaozu
	 */
	for (;;) {
		prepare_to_wait_exclusive(sk->sk_sleep, &wait,
					 TASK_INTERRUPTIBLE);
		release_sock(sk);
		if (reqsk_queue_empty(&icsk->icsk_accept_queue))
			timeo = schedule_timeout(timeo);
		lock_sock(sk);
		err = 0;
		if (!reqsk_queue_empty(&icsk->icsk_accept_queue))
			break;
		err = -EINVAL;
		if (sk->sk_state != TCP_LISTEN)
			break;
		err = sock_intr_errno(timeo);
		if (signal_pending(current))
			break;
		err = -EAGAIN;
		if (!timeo)
			break;
	}
	finish_wait(sk->sk_sleep, &wait);
	return err;
}

函数首先是调用了宏来声明一个等待队列

1
2
3
4
5
6
#define DEFINE_WAIT(name)                                \
wait_queue_t name = {                                    \
	.private      = current,                             \
	.func         = autoremove_wake_function,            \
	.task_list    = LIST_HEAD_INIT((name).task_list),    \
}

关于等待队列的具体概念我们留在以后专门的章节中论述,这里可以看出是根据当前进程而建立的名为wait的等待队列,接着函数中调用了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sys_socketcall()-->sys_accept()-->inet_accept()-->inet_csk_accept()-->inet_csk_wait_for_connect()-->prepare_to_wait_exclusive()
void
prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
	unsigned long flags;
	wait->flags |= WQ_FLAG_EXCLUSIVE;
	spin_lock_irqsave(&q->lock, flags);
	if (list_empty(&wait->task_list))
		__add_wait_queue_tail(q, wait);
	/*
	 * don't alter the task state if this is just going to
	  * queue an async wait queue callback wumingxiaozu
	 */
	if (is_sync_wait(wait))
		set_current_state(state);
	spin_unlock_irqrestore(&q->lock, flags);
}

接着要把这里创建的wait,即当前进程的这里的等待队列挂入sk中的sk_sleep队列,这样我们可以理解到多个进程都可以对一个socket并发的连接,这个函数与我们所说的等待队列部分内容是密切相关的,我们只简单的叙述一下,函数中主要是将我们上面建立的等待队列插入到这里的sock结构中的sk_sleep所指定的等待队列头中,此后再次调用reqsk_queue_empty()函数检查一下icsk_accept_queue是否为空,如果还为空就说明没有连接请求到来,开始睡眠等待了,schedule_timeout()这个函数与时钟密切相关,所以请朋友们参考其他资料,这里是根据我们上面得到的定时时间来进入睡眠的。

当从这个函数返回时,再次锁住sock防止其他进程打扰,然后这里还是判断一下icsk_accept_queue是否为空,如果还为空的话就要跳出for循环了,醒来后还要检查一下是否是因为信号而醒来的,如果有信号就要处理信号signal_pending(),最后如果睡眠的时间已经用完了也会跳出循环,跳出循环后就要将这里的等待队列从sock中的sk_sleep中摘链。

我们回到inet_csk_accept()函数中继续往下看,如果这时队列icsk_accept_queue不为空,即有连接请求到来怎么办呢,继续看下面的代码

1
newsk = reqsk_queue_get_child(&icsk->icsk_accept_queue, sk);

这里看到是进入了reqsk_queue_get_child函数中

1
2
3
4
5
6
7
8
9
10
11
sys_socketcall()-->sys_accept()-->inet_accept()-->inet_csk_accept()-->reqsk_queue_get_child()
static inline struct sock *reqsk_queue_get_child(struct request_sock_queue *queue,
						 struct sock *parent)
{
	struct request_sock *req = reqsk_queue_remove(queue);
	struct sock *child = req->sk;
	BUG_TRAP(child != NULL);
	sk_acceptq_removed(parent);
	__reqsk_free(req);
	return child;
}

函数中首先是调用了reqsk_queue_remove()从队列中摘下一个已经到来的request_sock结构

1
2
3
4
5
6
7
8
9
10
sys_socketcall()-->sys_accept()-->inet_accept()-->inet_csk_accept()-->reqsk_queue_get_child()-->reqsk_queue_remove()
static inline struct request_sock *reqsk_queue_remove(struct request_sock_queue *queue)
{
	struct request_sock *req = queue->rskq_accept_head;
	BUG_TRAP(req != NULL);
	queue->rskq_accept_head = req->dl_next;
	if (queue->rskq_accept_head == NULL)
		queue->rskq_accept_tail = NULL;
	return req;
}

很明显上面函数中是从队列的rskq_accept_head摘下一个已经到来的request_sock这个结构是从客户端请求连接时挂入的,reqsk_queue_get_child()函数在这里把request_sock中载运的sock结构返回到inet_csk_accept中的局部变量newsk使用。而sk_acceptq_removed是递减我们服务器端sock中的sk_ack_backlog。

然后__reqsk_free释放掉request_sock结构。回到inet_csk_accept函数中,然后返回我们间接从icsk->icsk_accept_queue队列中获得了与客户端密切相关的sock结构。这个与客户端密切相关的结构是由我们服务器端在响应底层驱动的数据包过程中建立的,我们将在后边讲解完客户端的连接请求把这一过程补上,这里假设我们已经接收到了客户端的数据包并且服务器端为此专门建了这个与客户端数据包相联系的sock结构,接着返回到inet_accept()函数中,接着调用sock_graft()函数,注意参数sock_graft(sk2, newsock);sk2是我们上边叙述的与客户端密切相关的sock结构,是从接收队列中获得的。

而newsock,则是我们服务器端为了这个代表客户端的sock结构而准备的新的socket。我们以前说过,socket结构在具体应用上分为二部分,另一部分是这里的sock结构,因为sock是与具体的协议即以前所说的规程的相关,所以变化比较大,而socket比较通用,所以我们上面通过socket_alloc()只是分配了通用部分的socket结构,并没有建立对应协议的sock结构,那么我们分配的新的socket的所需要的sock是从哪里来的呢,我们可以在代码中看到他是取的代表客户端的sock结构,与我们新建的socket挂入的,看一下这个关键的函数

1
2
3
4
5
6
7
8
9
10
sys_socketcall()-->sys_accept()-->inet_accept()-->sock_graft()
static inline void sock_graft(struct sock *sk, struct socket *parent)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_sleep = &parent->wait;
	parent->sk = sk;
	sk->sk_socket = parent;
	security_sock_graft(sk, parent);
	write_unlock_bh(&sk->sk_callback_lock);
}

上面传递的参数是

1
sock_graft(sk2, newsock);

sk2是代表我们客户端的sock,newsock是我们服务器端的新socket,可以看出上面的sock_graft,graft是嫁接的意思,从函数面上就可以理解了,然后其内部就是将服务器端新建的socket与客户端的sock“挂钩了”,从此以后,这个socket就是服务器端与客户端通讯的桥梁了。这样回到上面的inet_accept函数时,我们看到将newsock->state = SS_CONNECTED;也就是状态改变成了连接状态,而以前的服务器的socket并没有任何的状态改变,那个socket继续覆行他的使命“孵化”新的socket。回到我们的sys_accept()函数中下面接着看,我们在练习中看到需要获得客户端的地址,在那个章节中我们又走到了

1
newsock->ops->getname(newsock, (struct sockaddr )address, &len, 2)

这要看我们在sys_accpet()函数中新创建的newsock的ops钩子结构了,很明显我们在sys_accept()函数中看到了newsock->ops = sock->ops;所以newsock是使用的已经建立的服务器端的inet_stream_ops结构变量,我们可以在这个结构中看到

1
2
3
4
5
const struct proto_ops inet_stream_ops = {
	。。。。。。
	.getname     = inet_getname,
	。。。。。。
};

因此进入了inet_getname()函数,这个函数在/net/ipv4/af_inet.c中的683行处。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sys_accept()-->inet_getname()
int inet_getname(struct socket *sock, struct sockaddr *uaddr,
			int *uaddr_len, int peer)
{
	struct sock *sk        = sock->sk;
	struct inet_sock *inet    = inet_sk(sk);
	struct sockaddr_in *sin    = (struct sockaddr_in *)uaddr;
	sin->sin_family = AF_INET;
	if (peer) {
		if (!inet->dport ||
		 (((1  sk->sk_state) & (TCPF_CLOSE | TCPF_SYN_SENT)) &&
		 peer == 1))
			return -ENOTCONN;
		sin->sin_port = inet->dport;
		sin->sin_addr.s_addr = inet->daddr;
	} else {
		__be32 addr = inet->rcv_saddr;
		if (!addr)
			addr = inet->saddr;
		sin->sin_port = inet->sport;
		sin->sin_addr.s_addr = addr;
	}
	memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
	*uaddr_len = sizeof(*sin);
	return 0;
}

在上面的代码中,关键的是这二句

1
2
sin->sin_port = inet->dport;
sin->sin_addr.s_addr = inet->daddr;

这里直接将我们练习中的准备接收的数组address转换成tcp的地址结构struct sockaddr_in指针,然后直接用上面二句赋值了,我们看到他是使用的我们刚刚提到的从icsk->icsk_accept_queue接收队列中得到的sock进而得到了inet_sock专用于INET的sock结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
struct inet_sock {
	/* sk and pinet6 has to be the first two members of inet_sock */
	struct sock        sk;
#if defined(CONFIG_IPV6) || defined(CONFIG_IPV6_MODULE)
	struct ipv6_pinfo    *pinet6;
#endif
	/* Socket demultiplex comparisons on incoming packets.wumingxiaozu */
	__be32               daddr;
	__be32               rcv_saddr;
	__be16               dport;
	__u16                num;
	__be32               saddr;
	__s16                uc_ttl;
	__u16                cmsg_flags;
	struct ip_options    *opt;
	__be16               sport;
	__u16                id;
	__u8                 tos;
	__u8                 mc_ttl;
	__u8                 pmtudisc;
	__u8                 recverr:1,
	                     is_icsk:1,
	                     freebind:1,
	                     hdrincl:1,
	                     mc_loop:1;
	int                  mc_index;
	__be32               mc_addr;
	struct ip_mc_socklist    *mc_list;
	struct {
		unsigned int        flags;
		unsigned int        fragsize;
		struct ip_options   *opt;
		struct dst_entry    *dst;
		int                 length; /* Total length of all frames */
		__be32              addr;
		struct flowi        fl;
	} cork;
};

这个结构中的头一个变量就是sock结构,所以这里直接将sock的地址做为inet_sock结构的开始是完全可以的,这也就是inet_sk()这个函数的主要作用

1
2
3
4
5
sys_accept()-->inet_getname()-->inet_sk()
static inline struct inet_sock *inet_sk(const struct sock *sk)
{
	return (struct inet_sock *)sk;
}

那么可能会有朋友问我们只是从icsk->icsk_accept_queue接收队列中间接得到了sock结构指针并没有看到inet_sock结构指针啊?请朋友们相信我们在后边叙述完了客户端的连接请求过程后会把这部分给补上的,所以这里的inet_sock肯定是在服务器的底层驱动相关的部分完成的,我们将在完成客户端的连接后分析这部分的关键内容。所以我们看到这里将inet_sock结构中的请求方即客户端的端口和地址间接设置进了应用程序的地址结构变量client_address就取得了客户端的地址,这个过程是在sys_accept()中使用

1
2
err = move_addr_to_user(address, len, upeer_sockaddr,
				upeer_addrlen);

将客户端的socket地址复制给我们的应用程序界面。我们上边已经通过inet_getname()函数复制客户端的地址到address数组中了,这样通过move_addr_to_user()函数后,我们程序界面上client_address就得到了客户端的socket地址。接着我们看到函数执行了fd_install()函数,即为新创建的socket分配一个文件号和file结构,有关没有详述的函数请朋友们参考深入理解LINUX内核第三版中的介绍,自己阅读暂且做为一种练习吧。 朋友们看到这里可以结合一下我们的地图,因为截止到现在我们都是围绕着地图中的服务器角度来分析的,接下来的章节我们将转换到客户端的角度来分析。

Receive packet steering patch详解

http://simohayha.iteye.com/blog/720850

Receive packet steering简称rps,是google贡献给linux kernel的一个patch,主要的功能是解决多核情况下,网络协议栈的软中断的负载均衡。这里的负载均衡也就是指能够将软中断均衡的放在不同的cpu核心上运行。

简介在这里:
http://lwn.net/Articles/362339/

linux现在网卡的驱动支持两种模式,一种是NAPI,一种是非NAPI模式,这两种模式的区别,我前面的blog都有介绍,这里就再次简要的介绍下。

在NAPI中,中断收到数据包后调用__napi_schedule调度软中断,然后软中断处理函数中会调用注册的poll回掉函数中调用netif_receive_skb将数据包发送到3层,没有进行任何的软中断负载均衡。

在非NAPI中,中断收到数据包后调用netif_rx,这个函数会将数据包保存到input_pkt_queue,然后调度软中断,这里为了兼容NAPI的驱动,他的poll方法默认是process_backlog,最终这个函数会从input_pkt_queue中取得数据包然后发送到3层。

通过比较我们可以看到,不管是NAPI还是非NAPI的话都无法做到软中断的负载均衡,因为软中断此时都是运行在在硬件中断相应的cpu上。也就是说如果始终是cpu0相应网卡的硬件中断,那么始终都是cpu0在处理软中断,而此时cpu1就被浪费了,因为无法并行的执行多个软中断。

google的这个patch的基本原理是这样的,根据数据包的源地址,目的地址以及目的和源端口(这里它是将两个端口组合成一个4字节的无符数进行计算的,后面会看到)计算出一个hash值,然后根据这个hash值来选择软中断运行的cpu,从上层来看,也就是说将每个连接和cpu绑定,并通过这个hash值,来均衡软中断在多个cpu上。

这个介绍比较简单,我们来看代码是如何实现的。

它这里主要是hook了两个内核的函数,一个是netif_rx主要是针对非NAPI的驱动,一个是netif_receive_skb这个主要是针对NAPI的驱动,这两个函数我前面blog都有介绍过,想了解可以看我前面的blog,现在这里我只介绍打过patch的实现。

在看netif_rx和netif_receive_skb之前,我们先来看这个patch中两个重要的函数get_rps_cpu和enqueue_to_backlog,我们一个个看。

先来看相关的两个数据结构,首先是netdev_rx_queue,它表示对应的接收队列,因为有的网卡可能硬件上就支持多队列的模式,此时对应就会有多个rx队列,这个结构是挂载在net_device中的,也就是每个网络设备最终都会有一个或者多个rx队列。这个结构在sys文件系统中的表示类似这样的/sys/class/net//queues/rx- 几个队列就是rx-n.

1
2
3
4
5
6
7
8
9
10
struct netdev_rx_queue {
	// 保存了当前队列的rps map
	struct rps_map *rps_map;
	// 对应的kobject
	struct kobject kobj;
	// 指向第一个rx队列
	struct netdev_rx_queue *first;
	// 引用计数
	atomic_t count;
} ____cacheline_aligned_in_smp;

然后就是rps_map,其实这个也就是保存了能够执行数据包的cpu。

1
2
3
4
5
6
7
8
struct rps_map {
	// cpu的个数,也就是cpus数组的个数
	unsigned int len;
	// RCU锁
	struct rcu_head rcu;
	// 保存了cpu的id.
	u16 cpus[0];
};

看完上面的结构,我们来看函数的实现。 get_rps_cpu主要是通过传递进来的skb然后来选择这个skb所应该被处理的cpu。它的逻辑很简单,就是通过skb计算hash,然后通过hash从对应的队列的rps_mapping中取得对应的cpu id。

这里有个要注意的就是这个hash值是可以交给硬件网卡去计算的,作者自己说是最好交由硬件去计算这个hash值,因为如果是软件计算的话会导致CPU 缓存不命中,带来一定的性能开销。

还有就是rps_mapping这个值是可以通过sys 文件系统设置的,位置在这里: /sys/class/net//queues/rx-/rps_cpus 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
{
	struct ipv6hdr *ip6;
	struct iphdr *ip;
	struct netdev_rx_queue *rxqueue;
	struct rps_map *map;
	int cpu = -1;
	u8 ip_proto;
	u32 addr1, addr2, ports, ihl;
	// rcu锁
	rcu_read_lock();
	// 取得设备对应的rx 队列
	if (skb_rx_queue_recorded(skb)) {
	..........................................
		rxqueue = dev->_rx + index;
	} else
		rxqueue = dev->_rx;

	if (!rxqueue->rps_map)
		goto done;
	// 如果硬件已经计算,则跳过计算过程
	if (skb->rxhash)
		goto got_hash; /* Skip hash computation on packet header */

	switch (skb->protocol) {
	case __constant_htons(ETH_P_IP):
		if (!pskb_may_pull(skb, sizeof(*ip)))
			goto done;
		// 得到计算hash的几个值
		ip = (struct iphdr *) skb->data;
		ip_proto = ip->protocol;
		// 两个地址
		addr1 = ip->saddr;
		addr2 = ip->daddr;
		// 得到ip头
		ihl = ip->ihl;
		break;
	case __constant_htons(ETH_P_IPV6):
		..........................................
		break;
	default:
		goto done;
	}
	ports = 0;
	switch (ip_proto) {
	case IPPROTO_TCP:
	case IPPROTO_UDP:
	case IPPROTO_DCCP:
	case IPPROTO_ESP:
	case IPPROTO_AH:
	case IPPROTO_SCTP:
	case IPPROTO_UDPLITE:
		if (pskb_may_pull(skb, (ihl * 4) + 4))
		// 我们知道tcp头的前4个字节就是源和目的端口,因此这里跳过ip头得到tcp头的前4个字节
			ports = *((u32 *) (skb->data + (ihl * 4)));
		break;

	default:
		break;
	}
	// 计算hash
	skb->rxhash = jhash_3words(addr1, addr2, ports, hashrnd);
	if (!skb->rxhash)
		skb->rxhash = 1;

got_hash:
	// 通过rcu得到对应rps map
	map = rcu_dereference(rxqueue->rps_map);
	if (map) {
		// 取得对应的cpu
		u16 tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];
		// 如果cpu是online的,则返回计算出的这个cpu,否则跳出循环。
		if (cpu_online(tcpu)) {
			cpu = tcpu;
			goto done;
		}
	}

done:
	rcu_read_unlock();
	// 如果上面失败,则返回-1.
	return cpu;
}

然后是enqueue_to_backlog这个方法,首先我们知道在每个cpu都有一个softnet结构,而他有一个input_pkt_queue的队列,以前这个主要是用于非NAPi的驱动的,而这个patch则将这个队列也用与NAPI的处理中了。也就是每个cpu现在都会有一个input_pkt_queue队列,用于保存需要处理的数据包队列。这个队列作用现在是,如果发现不属于当前cpu处理的数据包,则我们可以直接将数据包挂载到他所属的cpu的input_pkt_queue中。

enqueue_to_backlog接受一个skb和cpu为参数,通过cpu来判断skb如何处理。要么加入所属的input_pkt_queue中,要么schecule 软中断。

还有个要注意就是我们知道NAPI为了兼容非NAPI模式,有个backlog的napi_struct结构,也就是非NAPI驱动会schedule backlog这个napi结构,而在enqueue_to_backlog中则是利用了这个结构,也就是它会schedule backlog,因为它会将数据放到input_pkt_queue中,而backlog的pool方法process_backlog就是从input_pkt_queue中取得数据然后交给上层处理。

这里还有一个会用到结构就是 rps_remote_softirq_cpus,它主要是保存了当前cpu上需要去另外的cpu schedule 软中断的cpu 掩码。因为我们可能将要处理的数据包放到了另外的cpu的input queue上,因此我们需要schedule 另外的cpu上的napi(也就是软中断),所以我们需要保存对应的cpu掩码,以便于后面遍历,然后schedule。

而这里为什么mask有两个元素,注释写的很清楚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
 * This structure holds the per-CPU mask of CPUs for which IPIs are scheduled
 * to be sent to kick remote softirq processing.  There are two masks since
 * the sending of IPIs must be done with interrupts enabled.  The select field
 * indicates the current mask that enqueue_backlog uses to schedule IPIs.
 * select is flipped before net_rps_action is called while still under lock,
 * net_rps_action then uses the non-selected mask to send the IPIs and clears
 * it without conflicting with enqueue_backlog operation.
 */
struct rps_remote_softirq_cpus {
	// 对应的cpu掩码
	cpumask_t mask[2];
	// 表示应该使用的数组索引
	int select;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static int enqueue_to_backlog(struct sk_buff *skb, int cpu)
{
	struct softnet_data *queue;
	unsigned long flags;
	// 取出传递进来的cpu的softnet-data结构
	queue = &per_cpu(softnet_data, cpu);

	local_irq_save(flags);
	__get_cpu_var(netdev_rx_stat).total++;
	// 自旋锁
	spin_lock(&queue->input_pkt_queue.lock);
	// 如果保存的队列还没到上限
	if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
	// 如果当前队列的输入队列长度不为空
		if (queue->input_pkt_queue.qlen) {
enqueue:
			// 将数据包加入到input_pkt_queue中,这里会有一个小问题,我们后面再说。
			__skb_queue_tail(&queue->input_pkt_queue, skb);
			spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
				flags);
			return NET_RX_SUCCESS;
		}

		/* Schedule NAPI for backlog device */
		// 如果可以调度软中断
		if (napi_schedule_prep(&queue->backlog)) {
			// 首先判断数据包该不该当前的cpu处理
			if (cpu != smp_processor_id()) {
				// 如果不该,
				struct rps_remote_softirq_cpus *rcpus =
					&__get_cpu_var(rps_remote_softirq_cpus);

				cpu_set(cpu, rcpus->mask[rcpus->select]);
				__raise_softirq_irqoff(NET_RX_SOFTIRQ);
			} else
				// 如果就是应该当前cpu处理,则直接schedule 软中断,这里可以看到传递进去的是backlog
				__napi_schedule(&queue->backlog);
		}
		goto enqueue;
	}

	spin_unlock(&queue->input_pkt_queue.lock);

	__get_cpu_var(netdev_rx_stat).dropped++;
	local_irq_restore(flags);

	kfree_skb(skb);
	return NET_RX_DROP;
}

这里会有一个小问题,那就是假设此时一个属于cpu0的包进入处理,此时我们运行在cpu1,此时将数据包加入到input队列,然后cpu0上面刚好又来了一个cpu0需要处理的数据包,此时由于qlen不为0则又将数据包加入到input队列中,我们会发现cpu0上的napi没机会进行调度了。

google的patch对这个是这样处理的,在软中断处理函数中当数据包处理完毕,会调用net_rps_action来调度前面保存到其他cpu上的input队列。

下面就是代码片断(net_rx_action)

1
2
3
4
5
6
7
8
9
10
// 得到对应的rcpus.
rcpus = &__get_cpu_var(rps_remote_softirq_cpus);
	select = rcpus->select;
	// 翻转select,防止和enqueue_backlog冲突
	rcpus->select ^= 1;

	// 打开中断,此时下面的调度才会起作用.
	local_irq_enable();
	// 这个函数里面调度对应的远程cpu的napi.
	net_rps_action(&rcpus->mask[select]);

然后就是net_rps_action,这个函数很简单,就是遍历所需要处理的cpu,然后调度napi

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void net_rps_action(cpumask_t *mask)
{
	int cpu;

	/* Send pending IPI's to kick RPS processing on remote cpus. */
	// 遍历
	for_each_cpu_mask_nr(cpu, *mask) {
		struct softnet_data *queue = &per_cpu(softnet_data, cpu);
		if (cpu_online(cpu))
			// 到对应的cpu调用csd方法。
			__smp_call_function_single(cpu, &queue->csd, 0);
	}
	// 清理mask
	cpus_clear(*mask);
}

上面我们看到会调用csd方法,而上面的csd回掉就是被初始化为trigger_softirq函数。

1
2
3
4
5
6
7
static void trigger_softirq(void *data)
{
	struct softnet_data *queue = data;
	// 调度napi可以看到依旧是backlog 这个napi结构体。
	__napi_schedule(&queue->backlog);
	__get_cpu_var(netdev_rx_stat).received_rps++;
}

上面的函数都分析完毕了,剩下的就很简单了。

首先来看netif_rx如何被修改的,它被修改的很简单,首先是得到当前skb所应该被处理的cpu id,然后再通过比较这个cpu和当前正在处理的cpu id进行比较来做不同的处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int netif_rx(struct sk_buff *skb)
{
	int cpu;

	/* if netpoll wants it, pretend we never saw it */
	if (netpoll_rx(skb))
		return NET_RX_DROP;

	if (!skb->tstamp.tv64)
		net_timestamp(skb);
	// 得到cpu id。
	cpu = get_rps_cpu(skb->dev, skb);
	if (cpu < 0)
		cpu = smp_processor_id();
	// 通过cpu进行队列不同的处理
	return enqueue_to_backlog(skb, cpu);
}

然后是netif_receive_skb,这里patch将内核本身的这个函数改写为__netif_receive_skb。然后当返回值小于0,则说明不需要对队列进行处理,此时直接发送到3层。

1
2
3
4
5
6
7
8
9
10
11
int netif_receive_skb(struct sk_buff *skb)
{
	int cpu;

	cpu = get_rps_cpu(skb->dev, skb);

	if (cpu < 0)
		return __netif_receive_skb(skb);
	else
		return enqueue_to_backlog(skb, cpu);
}

最后来总结一下,可以看到input_pkt_queue是一个FIFO的队列,而且如果当qlen有值的时候,也就是在另外的cpu有数据包放到input_pkt_queue中,则当前cpu不会调度napi,而是将数据包放到input_pkt_queue中,然后等待trigger_softirq来调度napi。

因此这个patch完美的解决了软中断在多核下的均衡问题,并且没有由于是同一个连接会map到相同的cpu,并且input_pkt_queue的使用,因此乱序的问题也不会出现。

内核协议栈tcp层的内存管理

http://simohayha.iteye.com/blog/532450

http://www.ibm.com/developerworks/cn/linux/l-hisock.html#table1

http://blog.csdn.net/russell_tao/article/details/18711023

我们先来看tcp内存管理相关的几个内核参数,这些都能通过proc文件系统来修改:

1
2
3
4
// 内核写buf的最大值.
extern __u32 sysctl_wmem_max;
// 协议栈读buf的最大值
extern __u32 sysctl_rmem_max;

这两个值在/proc/sys/net/core 下。这里要注意,这两个值的单位是字节。

它们的初始化在sk_init里面,这里可以看到这两个值的大小是依赖于num_physpages的,而这个值应该是物理页数。也就是说这两个值依赖于物理内存:

1
2
3
4
5
6
7
8
9
10
11
12
void __init sk_init(void)
{
	if (num_physpages <= 4096) {
		sysctl_wmem_max = 32767;
		sysctl_rmem_max = 32767;
		sysctl_wmem_default = 32767;
		sysctl_rmem_default = 32767;
	} else if (num_physpages >= 131072) {
		sysctl_wmem_max = 131071;
		sysctl_rmem_max = 131071;
	}
}

而我通过搜索源码,只有设置套接口选项的时候,才会用到这两个值,也就是setsockopt,optname为SO_SNDBUF或者SO_RCVBUF时,来限制设置的值:

1
2
3
case SO_SNDBUF:
		if (val > sysctl_wmem_max)
			val = sysctl_wmem_max;

接下来就是整个tcp协议栈的socket的buf限制(也就是所有的socket). 这里要注意,这个东西的单位都是以页为单位的,我们下面就会看到。

1
2
3
4
其中sysctl_tcp_mem[0]表示整个tcp sock的buf限制.
sysctl_tcp_mem[1]也就是tcp sock内存使用的警戒线.
sysctl_tcp_mem[2]也就是tcp sock内存使用的hard limit,当超过这个限制,我们就要禁止再分配buf.
extern int sysctl_tcp_mem[3];

接下来就是针对每个sock的读写buf限制。

1
2
3
// 其中依次为最小buf,中等buf,以及最大buf.
extern int sysctl_tcp_wmem[3];
extern int sysctl_tcp_rmem[3];

tcp_init

这几个值的初始化在tcp_init里面,这里就能清晰的看到sysctl_tcp_mem的单位是页。而sysctl_tcp_wmem和sysctl_tcp_rmem的单位是字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void __init tcp_init(void)
{
	.................................
	// nr_pages就是页。
	nr_pages = totalram_pages - totalhigh_pages;
	limit = min(nr_pages, 1UL<<(28-PAGE_SHIFT)) >> (20-PAGE_SHIFT);
	limit = (limit * (nr_pages >> (20-PAGE_SHIFT))) >> (PAGE_SHIFT-11);
	limit = max(limit, 128UL);
	sysctl_tcp_mem[0] = limit / 4 * 3;
	sysctl_tcp_mem[1] = limit;
	sysctl_tcp_mem[2] = sysctl_tcp_mem[0] * 2;

	/* Set per-socket limits to no more than 1/128 the pressure threshold */
	// 转换为字节。
	limit = ((unsigned long)sysctl_tcp_mem[1]) << (PAGE_SHIFT - 7);
	max_share = min(4UL*1024*1024, limit);

	sysctl_tcp_wmem[0] = SK_MEM_QUANTUM;
	sysctl_tcp_wmem[1] = 16*1024;
	sysctl_tcp_wmem[2] = max(64*1024, max_share);

	sysctl_tcp_rmem[0] = SK_MEM_QUANTUM;
	sysctl_tcp_rmem[1] = 87380;
	sysctl_tcp_rmem[2] = max(87380, max_share);
	................................
}

然后就是读写buf的最小值

1
2
#define SOCK_MIN_SNDBUF 2048
#define SOCK_MIN_RCVBUF 256

最后就是当前tcp协议栈已经分配了的buf的总大小。这里要注意,这个值也是以页为单位的。

1
atomic_t tcp_memory_allocated

而上面的这些值如何与协议栈关联起来呢,我们来看tcp_prot结构,可以看到这些值的地址都被放到对应的tcp_prot的域。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct proto tcp_prot = {
	.name = "TCP",
	.owner = THIS_MODULE,
	...................................................
	.enter_memory_pressure = tcp_enter_memory_pressure,
	.sockets_allocated = &tcp_sockets_allocated,
	.orphan_count = &tcp_orphan_count,
	.memory_allocated = &tcp_memory_allocated,
	.memory_pressure = &tcp_memory_pressure,
	.sysctl_mem = sysctl_tcp_mem,
	.sysctl_wmem = sysctl_tcp_wmem,
	.sysctl_rmem = sysctl_tcp_rmem,
	........................................................
};

而对应的sock域中的几个值,这几个域非常重要,我们来看他们表示的含义

sk_rcvbuf和sk_sndbuf,这两个值分别代表每个sock的读写buf的最大限制

sk_rmem_alloc和sk_wmem_alloc这两个值分别代表已经提交的数据包的字节数。

读buf意味着进入tcp层的数据大小,而当数据提交给用户空间之后,这个值会相应的减去提交的大小(也就类似写buf的sk_wmem_queued)。

写buf意味着提交给ip层。可以看到这个值的增加是在tcp_transmit_skb中进行的。

而sk_wmem_queued也就代表skb的写队列write_queue的大小。

还有一个sk_forward_alloc,这个值表示一个预分配置,也就是整个tcp协议栈的内存cache,第一次为一个缓冲区分配buf的时候,我们不会直接分配精确的大小,而是按页来分配,而分配的大小就是这个值,下面我们会看到这个。并且这个值初始是0.

1
2
3
4
5
6
7
8
9
10
11
struct sock {
	int sk_rcvbuf;
	atomic_t sk_rmem_alloc;
	atomic_t sk_wmem_alloc;
	int sk_forward_alloc;
	..........................
	int sk_sndbuf;
	// 这个表示写buf已经分配的字节长度
	int sk_wmem_queued;
	...........................
}

sk_sndbuf和sk_rcvbuf,这两个的初始化在这里:

1
2
3
4
5
6
7
static int tcp_v4_init_sock(struct sock *sk)
{
	..................................
	sk->sk_sndbuf = sysctl_tcp_wmem[1];
	sk->sk_rcvbuf = sysctl_tcp_rmem[1];
	..........................
}

而当进入establish状态之后,sock会自己调整sndbuf和rcvbuf.他是通过tcp_init_buffer_space来进行调整的.这个函数会调用tcp_fixup_rcvbuf和tcp_fixup_sndbuf来调整读写buf的大小.

这里有用到sk_userlock这个标记,这个标记主要就是用来标记SO_SNDBUF 和SO_RCVBUF套接口选项是否被设置。而是否设置对应的值为:

1
2
#define SOCK_SNDBUF_LOCK 1
#define SOCK_RCVBUF_LOCK  2

我们可以看下面的设置SO_SNDBUF 和SO_RCVBUF的代码片断:

1
2
3
4
5
6
// 首先设置sk_userlocks.
sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
if ((val * 2) < SOCK_MIN_SNDBUF)
	sk->sk_sndbuf = SOCK_MIN_SNDBUF;
else
	sk->sk_sndbuf = val * 2;

因此内核里面的处理是这样的,如果用户已经通过套接字选项设置了读或者写buf的大小,那么这里将不会调整读写buf的大小,否则就进入tcp_fixup_XXX来调整大小。

还有一个要注意的就是MAX_TCP_HEADER,这个值表示了TCP + IP + link layer headers 以及option的长度。

我们来看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void tcp_init_buffer_space(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	int maxwin;

	// 判断sk_userlocks,来决定是否需要fix缓冲区大小。
	if (!(sk->sk_userlocks & SOCK_RCVBUF_LOCK))
		tcp_fixup_rcvbuf(sk);
	if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK))
		tcp_fixup_sndbuf(sk);
......................................

}

接下来来看这两个函数如何来调整读写buf的大小,不过这里还有些疑问,就是为什么是要和3sndmem以及4rcvmem:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static void tcp_fixup_sndbuf(struct sock *sk)
{
	// 首先通过mss,tcp头,以及sk_buff的大小,得到一个最小范围的sndmem。
	int sndmem = tcp_sk(sk)->rx_opt.mss_clamp + MAX_TCP_HEADER + 16 +sizeof(struct sk_buff);

	// 然后取sysctl_tcp_wmem[2]和3倍的sndmem之间的最小值。
	if (sk->sk_sndbuf < 3 * sndmem)
		sk->sk_sndbuf = min(3 * sndmem, sysctl_tcp_wmem[2]);
}

static void tcp_fixup_rcvbuf(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	// 这里和上面类似,也是先得到最小的一个rcvmem段。
	int rcvmem = tp->advmss + MAX_TCP_HEADER + 16 + sizeof(struct sk_buff);

	/* Try to select rcvbuf so that 4 mss-sized segments
	 * will fit to window and corresponding skbs will fit to our rcvbuf.
	 * (was 3; 4 is minimum to allow fast retransmit to work.)
	 */
	// 这里则是通过sysctl_tcp_adv_win_scale来调整rcvmem的值。
	while (tcp_win_from_space(rcvmem) < tp->advmss)
		rcvmem += 128;
	if (sk->sk_rcvbuf < 4 * rcvmem)
		sk->sk_rcvbuf = min(4 * rcvmem, sysctl_tcp_rmem[2]);
}

ok,看完初始化,我们来看协议栈具体如何管理内存的,先来看发送端,发送端的主要实现是在tcp_sendmsg里面,这个函数我们前面已经详细的分析过了,我们这次只分析里面几个与内存相关的东西。

来看代码片断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int tcp_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t size)
{
	..................................

	if (copy <= 0) {
new_segment:
		if (!sk_stream_memory_free(sk))
			goto wait_for_sndbuf;

		skb = sk_stream_alloc_skb(sk, select_size(sk),
		sk->sk_allocation);
		if (sk->sk_route_caps & NETIF_F_ALL_CSUM)
			skb->ip_summed = CHECKSUM_PARTIAL;

		skb_entail(sk, skb);
		copy = size_goal;
		max = size_goal;
	..................
}

可以看到这里第一个sk_stream_memory_free用来判断是否还有空间来供我们分配,如果没有则跳到wait_for_sndbuf来等待buf的释放。

然后如果有空间供我们分配,则调用sk_stream_alloc_skb来分配一个skb,然后这个大小的选择是通过select_size。

最后调用skb_entail来更新相关的域。

现在我们就来详细看上面的四个函数,先来看第一个:

1
2
3
4
static inline int sk_stream_memory_free(struct sock *sk)
{
	return sk->sk_wmem_queued < sk->sk_sndbuf;
}

sk_stream_memory_free实现很简单,就是判断当前已经分配的写缓冲区的大小(sk_wmem_queued)是否小于当前写缓冲区(sk_sndbuf)的最大限制。

然后是skb_entail,这个函数主要是当我们分配完buf后,进行一些相关域的更新,以及添加skb到writequeue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static inline void skb_entail(struct sock *sk, struct sk_buff *skb)
{
	struct tcp_sock *tp = tcp_sk(sk);
	struct tcp_skb_cb *tcb = TCP_SKB_CB(skb);
	............................
	skb_header_release(skb);
	tcp_add_write_queue_tail(sk, skb);
	// 增加sk_wmem_queued.
	sk->sk_wmem_queued += skb->truesize;
	// 这里调整sk_forward_alloc的大小,也就是预分配buf的大小(减小).
	sk_mem_charge(sk, skb->truesize);
	if (tp->nonagle & TCP_NAGLE_PUSH)
		tp->nonagle &= ~TCP_NAGLE_PUSH;
}
// 这个函数很简单,就是将sk_forward_alloc - size.
static inline void sk_mem_charge(struct sock *sk, int size)
{
	if (!sk_has_account(sk))
		return;
	sk->sk_forward_alloc -= size;
}

然后是select_size,在看这个之前我们先来坎SKB_MAX_HEAD的实现. SKB_MAX_HEAD主要是得到要分配的tcp数据段(不包括头)在一页中最大为多少。

1
2
3
4
5
#define SKB_WITH_OVERHEAD(X) \
	((X) - SKB_DATA_ALIGN(sizeof(struct skb_shared_info)))
#define SKB_MAX_ORDER(X, ORDER) \
	SKB_WITH_OVERHEAD((PAGE_SIZE << (ORDER)) - (X))
#define SKB_MAX_HEAD(X)    (SKB_MAX_ORDER((X), 0))

我们带入代码来看,我们下面的代码是SKB_MAX_HEAD(MAX_TCP_HEADER),展开这个宏可以看到就是PAGE_SIZE-MAX_TCP_HEADER-SKB_DATA_ALIGN(sizeof(struct skb_shared_info).其实也就是一页还能容纳多少tcp的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static inline int select_size(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	// 首先取得存储的mss。
	int tmp = tp->mss_cache;

	// 然后判断是否使用scatter–gather(前面blog有介绍)
	if (sk->sk_route_caps & NETIF_F_SG) {
		if (sk_can_gso(sk))
			tmp = 0;
		else {
			// 然后开始计算buf的长度。
			int pgbreak = SKB_MAX_HEAD(MAX_TCP_HEADER);

			// 如果mss大于pgbreak,那么说明我们一页放不下当前需要的tcp数据,因此我们将会在skb的页区域分配,而skb的页区域是有限制的,因此tmp必须小于这个值。
			if (tmp >= pgbreak &&
					tmp <= pgbreak + (MAX_SKB_FRAGS - 1) * PAGE_SIZE)
				tmp = pgbreak;
		}
	}

	return tmp;
}

sk_stream_alloc_skb

接下来来看sk_stream_alloc_skb的实现。

1 它会调用alloc_skb_fclone来分配内存,这个函数就不详细分析了,我们只需要知道它会从slab里分配一块内存,而大小为size+max_header(上面的分析我们知道slect_size只计算数据段).

2 如果分配成功,则调用sk_wmem_schedule来判断我们所分配的skb的大小是否精确,是的话,就调整指针,然后返回。

3 否则调用tcp_enter_memory_pressure设置标志进入TCP memory pressure zone。然后再调用sk_stream_moderate_sndbuf调整sndbuf(缩小sndbuf)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct sk_buff *sk_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
{
	struct sk_buff *skb;

	// 4字节对其
	size = ALIGN(size, 4);
	// 分配skb。
	skb = alloc_skb_fclone(size + sk->sk_prot->max_header, gfp);
	if (skb) {
		// 得到精确的大小。
		if (sk_wmem_schedule(sk, skb->truesize)) {
			// 返回skb。
			skb_reserve(skb, skb_tailroom(skb) - size);
				return skb;
		}
		__kfree_skb(skb);
	} else {
		// 否则设置全局标记进入pressure zone
		sk->sk_prot->enter_memory_pressure(sk);
		sk_stream_moderate_sndbuf(sk);
	}
	return NULL;
}

ok,现在就来看上面的几个函数的实现。先来看几个简单的。

首先是tcp_enter_memory_pressure,这个函数很简单,就是判断全局标记tcp_memory_pressure,然后设置这个标记。这个标记主要是用来通知其他模块调整的,比如窗口大小等等,详细的话自己搜索这个值,就知道了。

1
2
3
4
5
6
7
8
void tcp_enter_memory_pressure(struct sock *sk)
{
	if (!tcp_memory_pressure) {
		NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPMEMORYPRESSURES);
		// 设置压力标志。
		tcp_memory_pressure = 1;
	}
}

然后是sk_stream_moderate_sndbuf,这个函数也是要使用sk_userlocks,来判断是否已经被用户设置了。可以看到如果我们自己设置过了snd_buf的话,内核就不会帮我们调整它的大小了。

1
2
3
4
5
6
7
8
static inline void sk_stream_moderate_sndbuf(struct sock *sk)
{
	if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK)) {
		// 它的大小调整为大于最小值,小于sk->sk_wmem_queued >> 1。
		sk->sk_sndbuf = min(sk->sk_sndbuf, sk->sk_wmem_queued >> 1);
		sk->sk_sndbuf = max(sk->sk_sndbuf, SOCK_MIN_SNDBUF);
	}
}

sk_wmem_schedule

最后来看最核心的一个函数sk_wmem_schedule,这个函数只是对__sk_mem_schedule的简单封装。这里要知道传递进来的size是skb->truesize,也就是所分配的skb的真实大小。并且第一次进入这个函数,也就是分配第一个缓冲区包时,sk_forward_alloc是为0的,也就是说,第一次必然会执行__sk_mem_schedule函数。

1
2
3
4
5
6
7
8
static inline int sk_wmem_schedule(struct sock *sk, int size)
{
	if (!sk_has_account(sk))
		return 1;
	// 先比较size(也就是skb->truesize)和预分配的内存大小。如果小于等于预分配的大小,则直接返回,否则调用__sk_mem_schedule进行调整。
	return size <= sk->sk_forward_alloc ||
		__sk_mem_schedule(sk, size, SK_MEM_SEND);
}

来看__sk_mem_schedule,这个函数的功能注释写的很清楚:

increase sk_forward_alloc and memory_allocated

然后来看源码。这里在看之前,我们要知道,协议栈通过读写buf的使用量,划分了3个区域,或者说标志。不同标志进行不同处理。这里的区域的划分是通过sysctl_tcp_mem,也就是prot->sysctl_mem这个数组进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// 页的大小
#define SK_MEM_QUANTUM ((int)PAGE_SIZE)

int __sk_mem_schedule(struct sock *sk, int size, int kind)
{
	struct proto *prot = sk->sk_prot;
	// 首先得到size占用几个内存页。
	int amt = sk_mem_pages(size);
	int allocated;
	// 更新sk_forward_alloc,可以看到这个值是页的大小的倍数。
	sk->sk_forward_alloc += amt * SK_MEM_QUANTUM;

	// amt+memory_allocated也就是当前的总得内存使用量加上将要分配的内存的话,现在的tcp协议栈的总得内存使用量。(可以看到是以页为单位的。
	allocated = atomic_add_return(amt, prot->memory_allocated);

	// 然后开始判断,将会落入哪一个区域。通过上面的分析我们知道sysctl_mem也就是sysctl_tcp_mem.

	// 先判断是否小于等于内存最小使用限额。
	if (allocated <= prot->sysctl_mem[0]) {
		// 这里取消memory_pressure,然后返回。
		if (prot->memory_pressure && *prot->memory_pressure)
			*prot->memory_pressure = 0;
		return 1;
	}

	// 然后判断Under pressure。
	if (allocated > prot->sysctl_mem[1])
		// 大于sysctl_mem[1]说明,已经进入pressure,一次你需要调用tcp_enter_memory_pressure来设置标志。
		if (prot->enter_memory_pressure)
			prot->enter_memory_pressure(sk);

	// 如果超过的hard limit。则进入另外的处理。
	if (allocated > prot->sysctl_mem[2])
		goto suppress_allocation;

	// 判断类型,这里只有两种类型,读和写。总的内存大小判断完,这里开始判断单独的sock的读写内存。
	if (kind == SK_MEM_RECV) {
		if (atomic_read(&sk->sk_rmem_alloc) < prot->sysctl_rmem[0])
			return 1;
	} else { /* SK_MEM_SEND */
		// 这里当为tcp的时候,写队列的大小只有当对端数据确认后才会更新,因此我们要用sk_wmem_queued来判断。
		if (sk->sk_type == SOCK_STREAM) {
			if (sk->sk_wmem_queued < prot->sysctl_wmem[0])
				return 1;
		} else if (atomic_read(&sk->sk_wmem_alloc) <
			   prot->sysctl_wmem[0])
				return 1;
	}

	// 程序到达这里说明总的内存大小在sysctl_mem[0]和sysctl_mem[2]之间,因此我们再次判断memory_pressure
	if (prot->memory_pressure) {
		int alloc;

		// 如果没有在memory_pressure区域,则我们直接返回1。
		if (!*prot->memory_pressure)
			return 1;
		// 这个其实也就是计算整个系统分配的socket的多少。
		alloc = percpu_counter_read_positive(prot->sockets_allocated);
		// 这里假设其余的每个sock所占用的buf都和当前的sock一样大的时候,如果他们的总和小于sysctl_mem[2],也就是hard limit。那么我们也认为这次内存请求是成功的。
		if (prot->sysctl_mem[2] > alloc *
			sk_mem_pages(sk->sk_wmem_queued +
			 atomic_read(&sk->sk_rmem_alloc) +
				 sk->sk_forward_alloc))
			return 1;
	}

suppress_allocation:

	// 到达这里说明,我们超过了hard limit或者说处于presure 区域。
	if (kind == SK_MEM_SEND && sk->sk_type == SOCK_STREAM) {
		// 调整sk_sndbuf(减小).这个函数前面已经分析过了。
		sk_stream_moderate_sndbuf(sk);
		// 然后比较和sk_sndbuf的大小,如果大于的话,就说明下次我们再次要分配buf的时候会在tcp_memory_free阻塞住,因此这次我们返回1.
		if (sk->sk_wmem_queued + size >= sk->sk_sndbuf)
			return 1;
	}

	/* Alas. Undo changes. */
	// 到达这里说明,请求内存是不被接受的,因此undo所有的操作。然后返回0.
	sk->sk_forward_alloc -= amt * SK_MEM_QUANTUM;
	atomic_sub(amt, prot->memory_allocated);
	return 0;
}

接下来来看个很重要的函数skb_set_owner_w。

顾名思义,这个函数也就是将一个skb和scok关联起来。只不过关联的时候更新sock相应的域。我们来看源码:

1
2
3
4
5
6
7
8
9
10
static inline void skb_set_owner_w(struct sk_buff *skb, struct sock *sk)
{
	skb_orphan(skb);
	// 与传递进来的sock关联起来
	skb->sk = sk;
	// 设置skb的析构函数
	skb->destructor = sock_wfree;
	// 更新sk_wmem_alloc域,就是sk_wmem_alloc+truesize.
	atomic_add(skb->truesize, &sk->sk_wmem_alloc);
}

ok,接下来来看个scok_wfree函数,这个函数做得基本和上面函数相反。这个函数都是被kfree_skb自动调用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void sock_wfree(struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	int res;

	// 更新sk_wmem_alloc,减去skb的大小。
	res = atomic_sub_return(skb->truesize, &sk->sk_wmem_alloc);
	if (!sock_flag(sk, SOCK_USE_WRITE_QUEUE))
	// 唤醒等待队列,也就是唤醒等待内存分配。
		sk->sk_write_space(sk);
	if (res == 0)
		__sk_free(sk);
}

而skb_set_owner_w是什么时候被调用呢,我们通过搜索代码可以看到,它是在tcp_transmit_skb中被调用的。而tcp_transmit_skb我们知道是传递数据包到ip层的函数。

而kfree_skb被调用也就是在对端已经确认完我们发送的包后才会被调用来释放skb。

tcp_rcv_established

接下来来看接收数据的内存管理。我们主要来看tcp_rcv_established这个函数,我前面的blog已经断断续续的分析过了,因此这里我们只看一些重要的代码片断。

这里我们要知道,代码能到达下面的位置,则说明,数据并没有直接拷贝到用户空间。否则的话,是不会进入下面的片断的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if (!eaten) {
	..........................................

	// 如果skb的大小大于预分配的值,如果大于则要另外处理。
	if ((int)skb->truesize > sk->sk_forward_alloc)
			goto step5;
	__skb_pull(skb, tcp_header_len);
	__skb_queue_tail(&sk->sk_receive_queue, skb);
	// 这里关联skb和对应的sk,并且更新相关的域,我们下面会分析这个函数。
	skb_set_owner_r(skb, sk);
	tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
}
...............................................

step5:
	if (th->ack && tcp_ack(sk, skb, FLAG_SLOWPATH) < 0)
		goto discard;

	tcp_rcv_rtt_measure_ts(sk, skb);

	/* Process urgent data. */
	tcp_urg(sk, skb, th);

	/* step 7: process the segment text */
	// 最核心的函数就是这个。我们接下来会详细分析这个函数。
	tcp_data_queue(sk, skb);

	tcp_data_snd_check(sk);
	tcp_ack_snd_check(sk);
	return 0;

先来看skb_set_owner_r函数,这个函数关联skb和sk其实它和skb_set_owner_w类似:

1
2
3
4
5
6
7
8
9
10
11
12
static inline void skb_set_owner_r(struct sk_buff *skb, struct sock *sk)
{
	skb_orphan(skb);
	// 关联sk
	skb->sk = sk;
	// 设置析构函数
	skb->destructor = sock_rfree;
	// 更新rmem_alloc
	atomic_add(skb->truesize, &sk->sk_rmem_alloc);
	// 改变forward_alloc.
	sk_mem_charge(sk, skb->truesize);
}

tcp_data_queue

然后是tcp_data_queue,这个函数主要用来排队接收数据,并update相关的读buf。由于这个函数比较复杂,我们只关心我们感兴趣的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
{
	struct tcphdr *th = tcp_hdr(skb);
	struct tcp_sock *tp = tcp_sk(sk);
	int eaten = -1;
	.......................................
	// 首先判断skb的开始序列号和我们想要接收的序列号。如果相等开始处理这个数据包(也就是拷贝到用户空间).
	if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) {
		if (tcp_receive_window(tp) == 0)
			goto out_of_window;

		// tp的ucopy我前面的blog已经详细分析过了。这里就不解释了。
		if (tp->ucopy.task == current &&
			tp->copied_seq == tp->rcv_nxt && tp->ucopy.len &&sock_owned_by_user(sk) && !tp->urg_data)
		{
			// 计算将要拷贝给用户空间的大小。
			int chunk = min_t(unsigned int, skb->len,tp->ucopy.len);

			// 设置状态,说明我们处于进程上下文。
			__set_current_state(TASK_RUNNING);

			local_bh_enable();
			// 拷贝skb
			if (!skb_copy_datagram_iovec(skb, 0, tp->ucopy.iov, chunk)) {
				tp->ucopy.len -= chunk;
				tp->copied_seq += chunk;
				// 更新eaten,它的默认值为-1.
				eaten = (chunk == skb->len && !th->fin);
				tcp_rcv_space_adjust(sk);
			}
			local_bh_disable();
		}

		// 如果小于0则说明没有拷贝成功,或者说就没有进行拷贝。此时需要更新sock的相关域。
		if (eaten <= 0) {
queue_and_out:
			// 最关键的tcp_try_rmem_schedule函数。接下来会详细分析。
			if (eaten < 0 &&
		          tcp_try_rmem_schedule(sk, skb->truesize))
				goto drop;

			// 关联skb和sk。到达这里说明tcp_try_rmem_schedule成功,也就是返回0.
			skb_set_owner_r(skb, sk);
			// 加skb到receive_queue.
			__skb_queue_tail(&sk->sk_receive_queue, skb);
		}
		// 更新期待序列号。
		tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
		..............................................

		.....................................

		tcp_fast_path_check(sk);

		if (eaten > 0)
			__kfree_skb(skb);
		else if (!sock_flag(sk, SOCK_DEAD))
			sk->sk_data_ready(sk, 0);
		return;
	}
	// 下面就是处理乱序包。以后会详细分析。
	......................................
}

tcp_try_rmem_schedule

接下来我们就来看tcp_try_rmem_schedule这个函数,这个函数如果返回0则说明sk_rmem_schedule返回1,而sk_rmem_schedule和sk_wmem_schedule是一样的。也就是看当前的skb加入后有没有超过读buf的限制。并更新相关的域。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static inline int tcp_try_rmem_schedule(struct sock *sk, unsigned int size)
{
	// 首先判断rmem_alloc(当前的读buf字节数)是否大于最大buf字节数,如果大于则调用tcp_prune_queue调整分配的buf。否则调用sk_rmem_schedule来调整相关域(sk_forward_alloc)。
	if (atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||!sk_rmem_schedule(sk, size)) {

		// 调整分配的buf。
		if (tcp_prune_queue(sk) < 0)
			return -1;
		// 更新sk的相关域。
		if (!sk_rmem_schedule(sk, size)) {
			if (!tcp_prune_ofo_queue(sk))
				return -1;

			if (!sk_rmem_schedule(sk, size))
				return -1;
		}
	}
	return 0;
}

来看sk_rmem_schedule,这个函数很简单,就是封装了__sk_mem_schedule。而这个函数我们上面已经分析过了。

1
2
3
4
5
6
7
static inline int sk_rmem_schedule(struct sock *sk, int size)
{
	if (!sk_has_account(sk))
		return 1;
	return size <= sk->sk_forward_alloc ||
		__sk_mem_schedule(sk, size, SK_MEM_RECV);
}

tcp_prune_queue

最后是tcp_prune_queue,这个函数主要是用来丢掉一些skb,因为到这个函数就说明我们的内存使用已经到极限了,因此我们要合并一些buf。这个合并也就是将序列号连续的段进行合并。

这里我们要知道tcp的包是有序的,因此内核中tcp专门有一个队列来保存那些Out of order segments。因此我们这里会先处理这个队列里面的skb。

然后调用tcp_collapse来处理接收队列里面的skb。和上面的类似。

这里要注意,合并的话都是按页来合并,也就是先分配一页大小的内存,然后将老的skb复制进去,最后free掉老的buf。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int tcp_prune_queue(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	..................................
	// 如果rmem_alloc过于大,则重新计算窗口的大小。一半都会缩小窗口。
	if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf)
		tcp_clamp_window(sk);
	// 如果处于pressure区域,则调整窗口大小。这里也是缩小窗口。
	else if (tcp_memory_pressure)
		tp->rcv_ssthresh = min(tp->rcv_ssthresh, 4U * tp->advmss);

	// 处理ofo队列。
	tcp_collapse_ofo_queue(sk);
	// 如果接收队列为非空,则调用tcp_collapse来处理sk_receive_queue
	if (!skb_queue_empty(&sk->sk_receive_queue))
		tcp_collapse(sk, &sk->sk_receive_queue,
				 skb_peek(&sk->sk_receive_queue),
				 NULL,
				 tp->copied_seq, tp->rcv_nxt);
	// 更新全局的已分配内存的大小,也就是memory_allocated,接下来会详细介绍这个函数。
	sk_mem_reclaim(sk);

	// 如果调整后小于sk_rcvbuf,则返回0.
	if (atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf)
		return 0;

	......................................
	return -1;
}

tcp_collapse_ofo_queue 尝试减小ofo queue占内存的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/* Collapse ofo queue. Algorithm: select contiguous sequence of skbs
 * and tcp_collapse() them until all the queue is collapsed.
 */
static void tcp_collapse_ofo_queue(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	struct sk_buff *skb = skb_peek(&tp->out_of_order_queue);
	struct sk_buff *head;
	u32 start, end;

	if (skb == NULL)
		return;

	start = TCP_SKB_CB(skb)->seq;
	end = TCP_SKB_CB(skb)->end_seq;
	head = skb;

	for (;;) {
		struct sk_buff *next = NULL;

		if (!skb_queue_is_last(&tp->out_of_order_queue, skb))
			next = skb_queue_next(&tp->out_of_order_queue, skb);
		skb = next;

		/* Segment is terminated when we see gap or when
		 * we are at the end of all the queue. */
		if (!skb ||
			after(TCP_SKB_CB(skb)->seq, end) ||
			before(TCP_SKB_CB(skb)->end_seq, start)) {  // 找到ofo queue中连续的一段skb,即 prev->end_seq >= next->seq
			tcp_collapse(sk, &tp->out_of_order_queue,
					 head, skb, start, end);            // 尝试减小这一段连续skb占用的内存
			head = skb;
			if (!skb)
				break;
			/* Start new segment */
			start = TCP_SKB_CB(skb)->seq;               // 下个skb就是新的一段的开始
			end = TCP_SKB_CB(skb)->end_seq;
		} else {
			if (before(TCP_SKB_CB(skb)->seq, start))    // 这种情况只可能是tcp_collapse中大包拆成小包,拆到一半内存不够,没拆完导致。
				start = TCP_SKB_CB(skb)->seq;
			if (after(TCP_SKB_CB(skb)->end_seq, end))
				end = TCP_SKB_CB(skb)->end_seq;
		}
	}
}

tcp_collapse,gro上来的包有可能是大于4k的包,所以这个函数有时是在拆包,利弊难定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// 删除一个skb,返回下个skb
static struct sk_buff *tcp_collapse_one(struct sock *sk, struct sk_buff *skb,
					struct sk_buff_head *list)
{
	struct sk_buff *next = NULL;

	if (!skb_queue_is_last(list, skb))
		next = skb_queue_next(list, skb);

	__skb_unlink(skb, list);
	__kfree_skb(skb);
	NET_INC_STATS_BH(sock_net(sk), LINUX_MIB_TCPRCVCOLLAPSED);

	return next;
}

/* Collapse contiguous sequence of skbs head..tail with
 * sequence numbers start..end.
 *
 * If tail is NULL, this means until the end of the list.
 *
 * Segments with FIN/SYN are not collapsed (only because this
 * simplifies code)
 */
static void
tcp_collapse(struct sock *sk, struct sk_buff_head *list,
		 struct sk_buff *head, struct sk_buff *tail,
		 u32 start, u32 end)
{
	struct sk_buff *skb, *n;
	bool end_of_skbs;

	/* First, check that queue is collapsible and find
	 * the point where collapsing can be useful. */
	skb = head;
restart:
	end_of_skbs = true;
	skb_queue_walk_from_safe(list, skb, n) {
		if (skb == tail)
			break;
		/* No new bits? It is possible on ofo queue. */
		if (!before(start, TCP_SKB_CB(skb)->end_seq)) { // 这种情况现在是不会出现的,以前代码有可能出现??
			skb = tcp_collapse_one(sk, skb, list);
			if (!skb)
				break;
			goto restart;
		}

		/* The first skb to collapse is:
		 * - not SYN/FIN and
		 * - bloated or contains data before "start" or
		 *   overlaps to the next one.
		 */
		if (!tcp_hdr(skb)->syn && !tcp_hdr(skb)->fin &&         // SYN,FIN 不合并,简化操作
			(tcp_win_from_space(skb->truesize) > skb->len ||    // 合并后可能减小空间的情况才合并
			 before(TCP_SKB_CB(skb)->seq, start))) {            // seq到start的数据已经被读走了,有减小空间的可能
			end_of_skbs = false;
			break;
		}

		if (!skb_queue_is_last(list, skb)) {
			struct sk_buff *next = skb_queue_next(list, skb);
			if (next != tail &&
				TCP_SKB_CB(skb)->end_seq != TCP_SKB_CB(next)->seq) { // 两个skb之间有交集,有减小空间可能
				end_of_skbs = false;
				break;
			}
		}

		/* Decided to skip this, advance start seq. */
		start = TCP_SKB_CB(skb)->end_seq;     // 否则向后继续找可能减小空间的第一个skb
	}
	if (end_of_skbs || tcp_hdr(skb)->syn || tcp_hdr(skb)->fin)
		return;

	while (before(start, end)) {  // 落在在start到end的包就是这次要合并的
		struct sk_buff *nskb;
		unsigned int header = skb_headroom(skb); // skb中协议头的大小
		int copy = SKB_MAX_ORDER(header, 0);     // 一个页(4k)中出去协议头空间的大小,也就是能容下的数据大小

		/* Too big header? This can happen with IPv6. */
		if (copy < 0)
			return;
		if (end - start < copy)
			copy = end - start;
		nskb = alloc_skb(copy + header, GFP_ATOMIC);
		if (!nskb)
			return;

		skb_set_mac_header(nskb, skb_mac_header(skb) - skb->head);
		skb_set_network_header(nskb, (skb_network_header(skb) -
						  skb->head));
		skb_set_transport_header(nskb, (skb_transport_header(skb) -
						skb->head));
		skb_reserve(nskb, header);
		memcpy(nskb->head, skb->head, header);
		memcpy(nskb->cb, skb->cb, sizeof(skb->cb));
		TCP_SKB_CB(nskb)->seq = TCP_SKB_CB(nskb)->end_seq = start;
		__skb_queue_before(list, skb, nskb);
		skb_set_owner_r(nskb, sk);

		/* Copy data, releasing collapsed skbs. */
		while (copy > 0) {    // 如果copy = 0,这里就会出BUG,但如果没有认为改,是不会的。ipv6会吗???。后面版本改进这函数了,也不会出现copy=0了
			int offset = start - TCP_SKB_CB(skb)->seq;
			int size = TCP_SKB_CB(skb)->end_seq - start;

			BUG_ON(offset < 0);
			if (size > 0) { // copy旧的skb数据到新的skb上
				size = min(copy, size);
				if (skb_copy_bits(skb, offset, skb_put(nskb, size), size))
					BUG();
				TCP_SKB_CB(nskb)->end_seq += size;
				copy -= size;
				start += size;
			}
			if (!before(start, TCP_SKB_CB(skb)->end_seq)) { // 旧的skb被copy完了就删掉
				skb = tcp_collapse_one(sk, skb, list);
				if (!skb ||
					skb == tail ||
					tcp_hdr(skb)->syn ||
					tcp_hdr(skb)->fin)
					return;
			}
		}
	}
}

来看sk_mem_reclaim函数,它只是简单的封装了__sk_mem_reclaim

1
2
3
4
5
6
7
8
static inline void sk_mem_reclaim(struct sock *sk)
{
	if (!sk_has_account(sk))
		return;
	// 如果sk_forward_alloc大于1页则调用__sk_mem_reclaim,我们知道sk_forward_alloc是以页为单位的,因此这里也就是和大于0一样。
	if (sk->sk_forward_alloc >= SK_MEM_QUANTUM)
		__sk_mem_reclaim(sk);
}

__sk_mem_reclaim就是真正操作的函数,它会更新memory_allocated:

1
2
3
4
5
6
7
8
9
10
11
12
void __sk_mem_reclaim(struct sock *sk)
{
	struct proto *prot = sk->sk_prot;
	// 更新memory_allocated,这里我们知道memory_allocated也是以页为单位的,因此需要将sk_forward_alloc转化为页。
	atomic_sub(sk->sk_forward_alloc >> SK_MEM_QUANTUM_SHIFT,prot->memory_allocated);

	// 更新这个sk的sk_forward_alloc为一页。
	sk->sk_forward_alloc &= SK_MEM_QUANTUM - 1;
	// 判断是否处于pressure区域,是的话更新memory_pressure变量。
	if (prot->memory_pressure && *prot->memory_pressure &&(atomic_read(prot->memory_allocated) < (prot->sysctl_mem[0]))
		*prot->memory_pressure = 0;
}

最后看一下读buf的释放。这个函数会在kfree_skb中被调用。

1
2
3
4
5
6
7
8
9
void sock_rfree(struct sk_buff *skb)
{

	struct sock *sk = skb->sk;
	// 更新rmem_alloc
	atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
	// 更新forward_alloc.
	sk_mem_uncharge(skb->sk, skb->truesize);
}