kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

内核协议栈tcp层的内存管理

http://simohayha.iteye.com/blog/532450

http://www.ibm.com/developerworks/cn/linux/l-hisock.html#table1

http://blog.csdn.net/russell_tao/article/details/18711023

我们先来看tcp内存管理相关的几个内核参数,这些都能通过proc文件系统来修改:

1
2
3
4
// 内核写buf的最大值.
extern __u32 sysctl_wmem_max;
// 协议栈读buf的最大值
extern __u32 sysctl_rmem_max;

这两个值在/proc/sys/net/core 下。这里要注意,这两个值的单位是字节。

它们的初始化在sk_init里面,这里可以看到这两个值的大小是依赖于num_physpages的,而这个值应该是物理页数。也就是说这两个值依赖于物理内存:

1
2
3
4
5
6
7
8
9
10
11
12
void __init sk_init(void)
{
	if (num_physpages <= 4096) {
		sysctl_wmem_max = 32767;
		sysctl_rmem_max = 32767;
		sysctl_wmem_default = 32767;
		sysctl_rmem_default = 32767;
	} else if (num_physpages >= 131072) {
		sysctl_wmem_max = 131071;
		sysctl_rmem_max = 131071;
	}
}

而我通过搜索源码,只有设置套接口选项的时候,才会用到这两个值,也就是setsockopt,optname为SO_SNDBUF或者SO_RCVBUF时,来限制设置的值:

1
2
3
case SO_SNDBUF:
		if (val > sysctl_wmem_max)
			val = sysctl_wmem_max;

接下来就是整个tcp协议栈的socket的buf限制(也就是所有的socket). 这里要注意,这个东西的单位都是以页为单位的,我们下面就会看到。

1
2
3
4
其中sysctl_tcp_mem[0]表示整个tcp sock的buf限制.
sysctl_tcp_mem[1]也就是tcp sock内存使用的警戒线.
sysctl_tcp_mem[2]也就是tcp sock内存使用的hard limit,当超过这个限制,我们就要禁止再分配buf.
extern int sysctl_tcp_mem[3];

接下来就是针对每个sock的读写buf限制。

1
2
3
// 其中依次为最小buf,中等buf,以及最大buf.
extern int sysctl_tcp_wmem[3];
extern int sysctl_tcp_rmem[3];

tcp_init

这几个值的初始化在tcp_init里面,这里就能清晰的看到sysctl_tcp_mem的单位是页。而sysctl_tcp_wmem和sysctl_tcp_rmem的单位是字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void __init tcp_init(void)
{
	.................................
	// nr_pages就是页。
	nr_pages = totalram_pages - totalhigh_pages;
	limit = min(nr_pages, 1UL<<(28-PAGE_SHIFT)) >> (20-PAGE_SHIFT);
	limit = (limit * (nr_pages >> (20-PAGE_SHIFT))) >> (PAGE_SHIFT-11);
	limit = max(limit, 128UL);
	sysctl_tcp_mem[0] = limit / 4 * 3;
	sysctl_tcp_mem[1] = limit;
	sysctl_tcp_mem[2] = sysctl_tcp_mem[0] * 2;

	/* Set per-socket limits to no more than 1/128 the pressure threshold */
	// 转换为字节。
	limit = ((unsigned long)sysctl_tcp_mem[1]) << (PAGE_SHIFT - 7);
	max_share = min(4UL*1024*1024, limit);

	sysctl_tcp_wmem[0] = SK_MEM_QUANTUM;
	sysctl_tcp_wmem[1] = 16*1024;
	sysctl_tcp_wmem[2] = max(64*1024, max_share);

	sysctl_tcp_rmem[0] = SK_MEM_QUANTUM;
	sysctl_tcp_rmem[1] = 87380;
	sysctl_tcp_rmem[2] = max(87380, max_share);
	................................
}

然后就是读写buf的最小值

1
2
#define SOCK_MIN_SNDBUF 2048
#define SOCK_MIN_RCVBUF 256

最后就是当前tcp协议栈已经分配了的buf的总大小。这里要注意,这个值也是以页为单位的。

1
atomic_t tcp_memory_allocated

而上面的这些值如何与协议栈关联起来呢,我们来看tcp_prot结构,可以看到这些值的地址都被放到对应的tcp_prot的域。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct proto tcp_prot = {
	.name = "TCP",
	.owner = THIS_MODULE,
	...................................................
	.enter_memory_pressure = tcp_enter_memory_pressure,
	.sockets_allocated = &tcp_sockets_allocated,
	.orphan_count = &tcp_orphan_count,
	.memory_allocated = &tcp_memory_allocated,
	.memory_pressure = &tcp_memory_pressure,
	.sysctl_mem = sysctl_tcp_mem,
	.sysctl_wmem = sysctl_tcp_wmem,
	.sysctl_rmem = sysctl_tcp_rmem,
	........................................................
};

而对应的sock域中的几个值,这几个域非常重要,我们来看他们表示的含义

sk_rcvbuf和sk_sndbuf,这两个值分别代表每个sock的读写buf的最大限制

sk_rmem_alloc和sk_wmem_alloc这两个值分别代表已经提交的数据包的字节数。

读buf意味着进入tcp层的数据大小,而当数据提交给用户空间之后,这个值会相应的减去提交的大小(也就类似写buf的sk_wmem_queued)。

写buf意味着提交给ip层。可以看到这个值的增加是在tcp_transmit_skb中进行的。

而sk_wmem_queued也就代表skb的写队列write_queue的大小。

还有一个sk_forward_alloc,这个值表示一个预分配置,也就是整个tcp协议栈的内存cache,第一次为一个缓冲区分配buf的时候,我们不会直接分配精确的大小,而是按页来分配,而分配的大小就是这个值,下面我们会看到这个。并且这个值初始是0.

1
2
3
4
5
6
7
8
9
10
11
struct sock {
	int sk_rcvbuf;
	atomic_t sk_rmem_alloc;
	atomic_t sk_wmem_alloc;
	int sk_forward_alloc;
	..........................
	int sk_sndbuf;
	// 这个表示写buf已经分配的字节长度
	int sk_wmem_queued;
	...........................
}

sk_sndbuf和sk_rcvbuf,这两个的初始化在这里:

1
2
3
4
5
6
7
static int tcp_v4_init_sock(struct sock *sk)
{
	..................................
	sk->sk_sndbuf = sysctl_tcp_wmem[1];
	sk->sk_rcvbuf = sysctl_tcp_rmem[1];
	..........................
}

而当进入establish状态之后,sock会自己调整sndbuf和rcvbuf.他是通过tcp_init_buffer_space来进行调整的.这个函数会调用tcp_fixup_rcvbuf和tcp_fixup_sndbuf来调整读写buf的大小.

这里有用到sk_userlock这个标记,这个标记主要就是用来标记SO_SNDBUF 和SO_RCVBUF套接口选项是否被设置。而是否设置对应的值为:

1
2
#define SOCK_SNDBUF_LOCK 1
#define SOCK_RCVBUF_LOCK  2

我们可以看下面的设置SO_SNDBUF 和SO_RCVBUF的代码片断:

1
2
3
4
5
6
// 首先设置sk_userlocks.
sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
if ((val * 2) < SOCK_MIN_SNDBUF)
	sk->sk_sndbuf = SOCK_MIN_SNDBUF;
else
	sk->sk_sndbuf = val * 2;

因此内核里面的处理是这样的,如果用户已经通过套接字选项设置了读或者写buf的大小,那么这里将不会调整读写buf的大小,否则就进入tcp_fixup_XXX来调整大小。

还有一个要注意的就是MAX_TCP_HEADER,这个值表示了TCP + IP + link layer headers 以及option的长度。

我们来看代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void tcp_init_buffer_space(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	int maxwin;

	// 判断sk_userlocks,来决定是否需要fix缓冲区大小。
	if (!(sk->sk_userlocks & SOCK_RCVBUF_LOCK))
		tcp_fixup_rcvbuf(sk);
	if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK))
		tcp_fixup_sndbuf(sk);
......................................

}

接下来来看这两个函数如何来调整读写buf的大小,不过这里还有些疑问,就是为什么是要和3sndmem以及4rcvmem:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static void tcp_fixup_sndbuf(struct sock *sk)
{
	// 首先通过mss,tcp头,以及sk_buff的大小,得到一个最小范围的sndmem。
	int sndmem = tcp_sk(sk)->rx_opt.mss_clamp + MAX_TCP_HEADER + 16 +sizeof(struct sk_buff);

	// 然后取sysctl_tcp_wmem[2]和3倍的sndmem之间的最小值。
	if (sk->sk_sndbuf < 3 * sndmem)
		sk->sk_sndbuf = min(3 * sndmem, sysctl_tcp_wmem[2]);
}

static void tcp_fixup_rcvbuf(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	// 这里和上面类似,也是先得到最小的一个rcvmem段。
	int rcvmem = tp->advmss + MAX_TCP_HEADER + 16 + sizeof(struct sk_buff);

	/* Try to select rcvbuf so that 4 mss-sized segments
	 * will fit to window and corresponding skbs will fit to our rcvbuf.
	 * (was 3; 4 is minimum to allow fast retransmit to work.)
	 */
	// 这里则是通过sysctl_tcp_adv_win_scale来调整rcvmem的值。
	while (tcp_win_from_space(rcvmem) < tp->advmss)
		rcvmem += 128;
	if (sk->sk_rcvbuf < 4 * rcvmem)
		sk->sk_rcvbuf = min(4 * rcvmem, sysctl_tcp_rmem[2]);
}

ok,看完初始化,我们来看协议栈具体如何管理内存的,先来看发送端,发送端的主要实现是在tcp_sendmsg里面,这个函数我们前面已经详细的分析过了,我们这次只分析里面几个与内存相关的东西。

来看代码片断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int tcp_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
		size_t size)
{
	..................................

	if (copy <= 0) {
new_segment:
		if (!sk_stream_memory_free(sk))
			goto wait_for_sndbuf;

		skb = sk_stream_alloc_skb(sk, select_size(sk),
		sk->sk_allocation);
		if (sk->sk_route_caps & NETIF_F_ALL_CSUM)
			skb->ip_summed = CHECKSUM_PARTIAL;

		skb_entail(sk, skb);
		copy = size_goal;
		max = size_goal;
	..................
}

可以看到这里第一个sk_stream_memory_free用来判断是否还有空间来供我们分配,如果没有则跳到wait_for_sndbuf来等待buf的释放。

然后如果有空间供我们分配,则调用sk_stream_alloc_skb来分配一个skb,然后这个大小的选择是通过select_size。

最后调用skb_entail来更新相关的域。

现在我们就来详细看上面的四个函数,先来看第一个:

1
2
3
4
static inline int sk_stream_memory_free(struct sock *sk)
{
	return sk->sk_wmem_queued < sk->sk_sndbuf;
}

sk_stream_memory_free实现很简单,就是判断当前已经分配的写缓冲区的大小(sk_wmem_queued)是否小于当前写缓冲区(sk_sndbuf)的最大限制。

然后是skb_entail,这个函数主要是当我们分配完buf后,进行一些相关域的更新,以及添加skb到writequeue。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static inline void skb_entail(struct sock *sk, struct sk_buff *skb)
{
	struct tcp_sock *tp = tcp_sk(sk);
	struct tcp_skb_cb *tcb = TCP_SKB_CB(skb);
	............................
	skb_header_release(skb);
	tcp_add_write_queue_tail(sk, skb);
	// 增加sk_wmem_queued.
	sk->sk_wmem_queued += skb->truesize;
	// 这里调整sk_forward_alloc的大小,也就是预分配buf的大小(减小).
	sk_mem_charge(sk, skb->truesize);
	if (tp->nonagle & TCP_NAGLE_PUSH)
		tp->nonagle &= ~TCP_NAGLE_PUSH;
}
// 这个函数很简单,就是将sk_forward_alloc - size.
static inline void sk_mem_charge(struct sock *sk, int size)
{
	if (!sk_has_account(sk))
		return;
	sk->sk_forward_alloc -= size;
}

然后是select_size,在看这个之前我们先来坎SKB_MAX_HEAD的实现. SKB_MAX_HEAD主要是得到要分配的tcp数据段(不包括头)在一页中最大为多少。

1
2
3
4
5
#define SKB_WITH_OVERHEAD(X) \
	((X) - SKB_DATA_ALIGN(sizeof(struct skb_shared_info)))
#define SKB_MAX_ORDER(X, ORDER) \
	SKB_WITH_OVERHEAD((PAGE_SIZE << (ORDER)) - (X))
#define SKB_MAX_HEAD(X)    (SKB_MAX_ORDER((X), 0))

我们带入代码来看,我们下面的代码是SKB_MAX_HEAD(MAX_TCP_HEADER),展开这个宏可以看到就是PAGE_SIZE-MAX_TCP_HEADER-SKB_DATA_ALIGN(sizeof(struct skb_shared_info).其实也就是一页还能容纳多少tcp的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static inline int select_size(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	// 首先取得存储的mss。
	int tmp = tp->mss_cache;

	// 然后判断是否使用scatter–gather(前面blog有介绍)
	if (sk->sk_route_caps & NETIF_F_SG) {
		if (sk_can_gso(sk))
			tmp = 0;
		else {
			// 然后开始计算buf的长度。
			int pgbreak = SKB_MAX_HEAD(MAX_TCP_HEADER);

			// 如果mss大于pgbreak,那么说明我们一页放不下当前需要的tcp数据,因此我们将会在skb的页区域分配,而skb的页区域是有限制的,因此tmp必须小于这个值。
			if (tmp >= pgbreak &&
					tmp <= pgbreak + (MAX_SKB_FRAGS - 1) * PAGE_SIZE)
				tmp = pgbreak;
		}
	}

	return tmp;
}

sk_stream_alloc_skb

接下来来看sk_stream_alloc_skb的实现。

1 它会调用alloc_skb_fclone来分配内存,这个函数就不详细分析了,我们只需要知道它会从slab里分配一块内存,而大小为size+max_header(上面的分析我们知道slect_size只计算数据段).

2 如果分配成功,则调用sk_wmem_schedule来判断我们所分配的skb的大小是否精确,是的话,就调整指针,然后返回。

3 否则调用tcp_enter_memory_pressure设置标志进入TCP memory pressure zone。然后再调用sk_stream_moderate_sndbuf调整sndbuf(缩小sndbuf)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct sk_buff *sk_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
{
	struct sk_buff *skb;

	// 4字节对其
	size = ALIGN(size, 4);
	// 分配skb。
	skb = alloc_skb_fclone(size + sk->sk_prot->max_header, gfp);
	if (skb) {
		// 得到精确的大小。
		if (sk_wmem_schedule(sk, skb->truesize)) {
			// 返回skb。
			skb_reserve(skb, skb_tailroom(skb) - size);
				return skb;
		}
		__kfree_skb(skb);
	} else {
		// 否则设置全局标记进入pressure zone
		sk->sk_prot->enter_memory_pressure(sk);
		sk_stream_moderate_sndbuf(sk);
	}
	return NULL;
}

ok,现在就来看上面的几个函数的实现。先来看几个简单的。

首先是tcp_enter_memory_pressure,这个函数很简单,就是判断全局标记tcp_memory_pressure,然后设置这个标记。这个标记主要是用来通知其他模块调整的,比如窗口大小等等,详细的话自己搜索这个值,就知道了。

1
2
3
4
5
6
7
8
void tcp_enter_memory_pressure(struct sock *sk)
{
	if (!tcp_memory_pressure) {
		NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPMEMORYPRESSURES);
		// 设置压力标志。
		tcp_memory_pressure = 1;
	}
}

然后是sk_stream_moderate_sndbuf,这个函数也是要使用sk_userlocks,来判断是否已经被用户设置了。可以看到如果我们自己设置过了snd_buf的话,内核就不会帮我们调整它的大小了。

1
2
3
4
5
6
7
8
static inline void sk_stream_moderate_sndbuf(struct sock *sk)
{
	if (!(sk->sk_userlocks & SOCK_SNDBUF_LOCK)) {
		// 它的大小调整为大于最小值,小于sk->sk_wmem_queued >> 1。
		sk->sk_sndbuf = min(sk->sk_sndbuf, sk->sk_wmem_queued >> 1);
		sk->sk_sndbuf = max(sk->sk_sndbuf, SOCK_MIN_SNDBUF);
	}
}

sk_wmem_schedule

最后来看最核心的一个函数sk_wmem_schedule,这个函数只是对__sk_mem_schedule的简单封装。这里要知道传递进来的size是skb->truesize,也就是所分配的skb的真实大小。并且第一次进入这个函数,也就是分配第一个缓冲区包时,sk_forward_alloc是为0的,也就是说,第一次必然会执行__sk_mem_schedule函数。

1
2
3
4
5
6
7
8
static inline int sk_wmem_schedule(struct sock *sk, int size)
{
	if (!sk_has_account(sk))
		return 1;
	// 先比较size(也就是skb->truesize)和预分配的内存大小。如果小于等于预分配的大小,则直接返回,否则调用__sk_mem_schedule进行调整。
	return size <= sk->sk_forward_alloc ||
		__sk_mem_schedule(sk, size, SK_MEM_SEND);
}

来看__sk_mem_schedule,这个函数的功能注释写的很清楚:

increase sk_forward_alloc and memory_allocated

然后来看源码。这里在看之前,我们要知道,协议栈通过读写buf的使用量,划分了3个区域,或者说标志。不同标志进行不同处理。这里的区域的划分是通过sysctl_tcp_mem,也就是prot->sysctl_mem这个数组进行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// 页的大小
#define SK_MEM_QUANTUM ((int)PAGE_SIZE)

int __sk_mem_schedule(struct sock *sk, int size, int kind)
{
	struct proto *prot = sk->sk_prot;
	// 首先得到size占用几个内存页。
	int amt = sk_mem_pages(size);
	int allocated;
	// 更新sk_forward_alloc,可以看到这个值是页的大小的倍数。
	sk->sk_forward_alloc += amt * SK_MEM_QUANTUM;

	// amt+memory_allocated也就是当前的总得内存使用量加上将要分配的内存的话,现在的tcp协议栈的总得内存使用量。(可以看到是以页为单位的。
	allocated = atomic_add_return(amt, prot->memory_allocated);

	// 然后开始判断,将会落入哪一个区域。通过上面的分析我们知道sysctl_mem也就是sysctl_tcp_mem.

	// 先判断是否小于等于内存最小使用限额。
	if (allocated <= prot->sysctl_mem[0]) {
		// 这里取消memory_pressure,然后返回。
		if (prot->memory_pressure && *prot->memory_pressure)
			*prot->memory_pressure = 0;
		return 1;
	}

	// 然后判断Under pressure。
	if (allocated > prot->sysctl_mem[1])
		// 大于sysctl_mem[1]说明,已经进入pressure,一次你需要调用tcp_enter_memory_pressure来设置标志。
		if (prot->enter_memory_pressure)
			prot->enter_memory_pressure(sk);

	// 如果超过的hard limit。则进入另外的处理。
	if (allocated > prot->sysctl_mem[2])
		goto suppress_allocation;

	// 判断类型,这里只有两种类型,读和写。总的内存大小判断完,这里开始判断单独的sock的读写内存。
	if (kind == SK_MEM_RECV) {
		if (atomic_read(&sk->sk_rmem_alloc) < prot->sysctl_rmem[0])
			return 1;
	} else { /* SK_MEM_SEND */
		// 这里当为tcp的时候,写队列的大小只有当对端数据确认后才会更新,因此我们要用sk_wmem_queued来判断。
		if (sk->sk_type == SOCK_STREAM) {
			if (sk->sk_wmem_queued < prot->sysctl_wmem[0])
				return 1;
		} else if (atomic_read(&sk->sk_wmem_alloc) <
			   prot->sysctl_wmem[0])
				return 1;
	}

	// 程序到达这里说明总的内存大小在sysctl_mem[0]和sysctl_mem[2]之间,因此我们再次判断memory_pressure
	if (prot->memory_pressure) {
		int alloc;

		// 如果没有在memory_pressure区域,则我们直接返回1。
		if (!*prot->memory_pressure)
			return 1;
		// 这个其实也就是计算整个系统分配的socket的多少。
		alloc = percpu_counter_read_positive(prot->sockets_allocated);
		// 这里假设其余的每个sock所占用的buf都和当前的sock一样大的时候,如果他们的总和小于sysctl_mem[2],也就是hard limit。那么我们也认为这次内存请求是成功的。
		if (prot->sysctl_mem[2] > alloc *
			sk_mem_pages(sk->sk_wmem_queued +
			 atomic_read(&sk->sk_rmem_alloc) +
				 sk->sk_forward_alloc))
			return 1;
	}

suppress_allocation:

	// 到达这里说明,我们超过了hard limit或者说处于presure 区域。
	if (kind == SK_MEM_SEND && sk->sk_type == SOCK_STREAM) {
		// 调整sk_sndbuf(减小).这个函数前面已经分析过了。
		sk_stream_moderate_sndbuf(sk);
		// 然后比较和sk_sndbuf的大小,如果大于的话,就说明下次我们再次要分配buf的时候会在tcp_memory_free阻塞住,因此这次我们返回1.
		if (sk->sk_wmem_queued + size >= sk->sk_sndbuf)
			return 1;
	}

	/* Alas. Undo changes. */
	// 到达这里说明,请求内存是不被接受的,因此undo所有的操作。然后返回0.
	sk->sk_forward_alloc -= amt * SK_MEM_QUANTUM;
	atomic_sub(amt, prot->memory_allocated);
	return 0;
}

接下来来看个很重要的函数skb_set_owner_w。

顾名思义,这个函数也就是将一个skb和scok关联起来。只不过关联的时候更新sock相应的域。我们来看源码:

1
2
3
4
5
6
7
8
9
10
static inline void skb_set_owner_w(struct sk_buff *skb, struct sock *sk)
{
	skb_orphan(skb);
	// 与传递进来的sock关联起来
	skb->sk = sk;
	// 设置skb的析构函数
	skb->destructor = sock_wfree;
	// 更新sk_wmem_alloc域,就是sk_wmem_alloc+truesize.
	atomic_add(skb->truesize, &sk->sk_wmem_alloc);
}

ok,接下来来看个scok_wfree函数,这个函数做得基本和上面函数相反。这个函数都是被kfree_skb自动调用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
void sock_wfree(struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	int res;

	// 更新sk_wmem_alloc,减去skb的大小。
	res = atomic_sub_return(skb->truesize, &sk->sk_wmem_alloc);
	if (!sock_flag(sk, SOCK_USE_WRITE_QUEUE))
	// 唤醒等待队列,也就是唤醒等待内存分配。
		sk->sk_write_space(sk);
	if (res == 0)
		__sk_free(sk);
}

而skb_set_owner_w是什么时候被调用呢,我们通过搜索代码可以看到,它是在tcp_transmit_skb中被调用的。而tcp_transmit_skb我们知道是传递数据包到ip层的函数。

而kfree_skb被调用也就是在对端已经确认完我们发送的包后才会被调用来释放skb。

tcp_rcv_established

接下来来看接收数据的内存管理。我们主要来看tcp_rcv_established这个函数,我前面的blog已经断断续续的分析过了,因此这里我们只看一些重要的代码片断。

这里我们要知道,代码能到达下面的位置,则说明,数据并没有直接拷贝到用户空间。否则的话,是不会进入下面的片断的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
if (!eaten) {
	..........................................

	// 如果skb的大小大于预分配的值,如果大于则要另外处理。
	if ((int)skb->truesize > sk->sk_forward_alloc)
			goto step5;
	__skb_pull(skb, tcp_header_len);
	__skb_queue_tail(&sk->sk_receive_queue, skb);
	// 这里关联skb和对应的sk,并且更新相关的域,我们下面会分析这个函数。
	skb_set_owner_r(skb, sk);
	tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
}
...............................................

step5:
	if (th->ack && tcp_ack(sk, skb, FLAG_SLOWPATH) < 0)
		goto discard;

	tcp_rcv_rtt_measure_ts(sk, skb);

	/* Process urgent data. */
	tcp_urg(sk, skb, th);

	/* step 7: process the segment text */
	// 最核心的函数就是这个。我们接下来会详细分析这个函数。
	tcp_data_queue(sk, skb);

	tcp_data_snd_check(sk);
	tcp_ack_snd_check(sk);
	return 0;

先来看skb_set_owner_r函数,这个函数关联skb和sk其实它和skb_set_owner_w类似:

1
2
3
4
5
6
7
8
9
10
11
12
static inline void skb_set_owner_r(struct sk_buff *skb, struct sock *sk)
{
	skb_orphan(skb);
	// 关联sk
	skb->sk = sk;
	// 设置析构函数
	skb->destructor = sock_rfree;
	// 更新rmem_alloc
	atomic_add(skb->truesize, &sk->sk_rmem_alloc);
	// 改变forward_alloc.
	sk_mem_charge(sk, skb->truesize);
}

tcp_data_queue

然后是tcp_data_queue,这个函数主要用来排队接收数据,并update相关的读buf。由于这个函数比较复杂,我们只关心我们感兴趣的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static void tcp_data_queue(struct sock *sk, struct sk_buff *skb)
{
	struct tcphdr *th = tcp_hdr(skb);
	struct tcp_sock *tp = tcp_sk(sk);
	int eaten = -1;
	.......................................
	// 首先判断skb的开始序列号和我们想要接收的序列号。如果相等开始处理这个数据包(也就是拷贝到用户空间).
	if (TCP_SKB_CB(skb)->seq == tp->rcv_nxt) {
		if (tcp_receive_window(tp) == 0)
			goto out_of_window;

		// tp的ucopy我前面的blog已经详细分析过了。这里就不解释了。
		if (tp->ucopy.task == current &&
			tp->copied_seq == tp->rcv_nxt && tp->ucopy.len &&sock_owned_by_user(sk) && !tp->urg_data)
		{
			// 计算将要拷贝给用户空间的大小。
			int chunk = min_t(unsigned int, skb->len,tp->ucopy.len);

			// 设置状态,说明我们处于进程上下文。
			__set_current_state(TASK_RUNNING);

			local_bh_enable();
			// 拷贝skb
			if (!skb_copy_datagram_iovec(skb, 0, tp->ucopy.iov, chunk)) {
				tp->ucopy.len -= chunk;
				tp->copied_seq += chunk;
				// 更新eaten,它的默认值为-1.
				eaten = (chunk == skb->len && !th->fin);
				tcp_rcv_space_adjust(sk);
			}
			local_bh_disable();
		}

		// 如果小于0则说明没有拷贝成功,或者说就没有进行拷贝。此时需要更新sock的相关域。
		if (eaten <= 0) {
queue_and_out:
			// 最关键的tcp_try_rmem_schedule函数。接下来会详细分析。
			if (eaten < 0 &&
		          tcp_try_rmem_schedule(sk, skb->truesize))
				goto drop;

			// 关联skb和sk。到达这里说明tcp_try_rmem_schedule成功,也就是返回0.
			skb_set_owner_r(skb, sk);
			// 加skb到receive_queue.
			__skb_queue_tail(&sk->sk_receive_queue, skb);
		}
		// 更新期待序列号。
		tp->rcv_nxt = TCP_SKB_CB(skb)->end_seq;
		..............................................

		.....................................

		tcp_fast_path_check(sk);

		if (eaten > 0)
			__kfree_skb(skb);
		else if (!sock_flag(sk, SOCK_DEAD))
			sk->sk_data_ready(sk, 0);
		return;
	}
	// 下面就是处理乱序包。以后会详细分析。
	......................................
}

tcp_try_rmem_schedule

接下来我们就来看tcp_try_rmem_schedule这个函数,这个函数如果返回0则说明sk_rmem_schedule返回1,而sk_rmem_schedule和sk_wmem_schedule是一样的。也就是看当前的skb加入后有没有超过读buf的限制。并更新相关的域。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static inline int tcp_try_rmem_schedule(struct sock *sk, unsigned int size)
{
	// 首先判断rmem_alloc(当前的读buf字节数)是否大于最大buf字节数,如果大于则调用tcp_prune_queue调整分配的buf。否则调用sk_rmem_schedule来调整相关域(sk_forward_alloc)。
	if (atomic_read(&sk->sk_rmem_alloc) > sk->sk_rcvbuf ||!sk_rmem_schedule(sk, size)) {

		// 调整分配的buf。
		if (tcp_prune_queue(sk) < 0)
			return -1;
		// 更新sk的相关域。
		if (!sk_rmem_schedule(sk, size)) {
			if (!tcp_prune_ofo_queue(sk))
				return -1;

			if (!sk_rmem_schedule(sk, size))
				return -1;
		}
	}
	return 0;
}

来看sk_rmem_schedule,这个函数很简单,就是封装了__sk_mem_schedule。而这个函数我们上面已经分析过了。

1
2
3
4
5
6
7
static inline int sk_rmem_schedule(struct sock *sk, int size)
{
	if (!sk_has_account(sk))
		return 1;
	return size <= sk->sk_forward_alloc ||
		__sk_mem_schedule(sk, size, SK_MEM_RECV);
}

tcp_prune_queue

最后是tcp_prune_queue,这个函数主要是用来丢掉一些skb,因为到这个函数就说明我们的内存使用已经到极限了,因此我们要合并一些buf。这个合并也就是将序列号连续的段进行合并。

这里我们要知道tcp的包是有序的,因此内核中tcp专门有一个队列来保存那些Out of order segments。因此我们这里会先处理这个队列里面的skb。

然后调用tcp_collapse来处理接收队列里面的skb。和上面的类似。

这里要注意,合并的话都是按页来合并,也就是先分配一页大小的内存,然后将老的skb复制进去,最后free掉老的buf。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static int tcp_prune_queue(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	..................................
	// 如果rmem_alloc过于大,则重新计算窗口的大小。一半都会缩小窗口。
	if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf)
		tcp_clamp_window(sk);
	// 如果处于pressure区域,则调整窗口大小。这里也是缩小窗口。
	else if (tcp_memory_pressure)
		tp->rcv_ssthresh = min(tp->rcv_ssthresh, 4U * tp->advmss);

	// 处理ofo队列。
	tcp_collapse_ofo_queue(sk);
	// 如果接收队列为非空,则调用tcp_collapse来处理sk_receive_queue
	if (!skb_queue_empty(&sk->sk_receive_queue))
		tcp_collapse(sk, &sk->sk_receive_queue,
				 skb_peek(&sk->sk_receive_queue),
				 NULL,
				 tp->copied_seq, tp->rcv_nxt);
	// 更新全局的已分配内存的大小,也就是memory_allocated,接下来会详细介绍这个函数。
	sk_mem_reclaim(sk);

	// 如果调整后小于sk_rcvbuf,则返回0.
	if (atomic_read(&sk->sk_rmem_alloc) <= sk->sk_rcvbuf)
		return 0;

	......................................
	return -1;
}

tcp_collapse_ofo_queue 尝试减小ofo queue占内存的大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/* Collapse ofo queue. Algorithm: select contiguous sequence of skbs
 * and tcp_collapse() them until all the queue is collapsed.
 */
static void tcp_collapse_ofo_queue(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	struct sk_buff *skb = skb_peek(&tp->out_of_order_queue);
	struct sk_buff *head;
	u32 start, end;

	if (skb == NULL)
		return;

	start = TCP_SKB_CB(skb)->seq;
	end = TCP_SKB_CB(skb)->end_seq;
	head = skb;

	for (;;) {
		struct sk_buff *next = NULL;

		if (!skb_queue_is_last(&tp->out_of_order_queue, skb))
			next = skb_queue_next(&tp->out_of_order_queue, skb);
		skb = next;

		/* Segment is terminated when we see gap or when
		 * we are at the end of all the queue. */
		if (!skb ||
			after(TCP_SKB_CB(skb)->seq, end) ||
			before(TCP_SKB_CB(skb)->end_seq, start)) {  // 找到ofo queue中连续的一段skb,即 prev->end_seq >= next->seq
			tcp_collapse(sk, &tp->out_of_order_queue,
					 head, skb, start, end);            // 尝试减小这一段连续skb占用的内存
			head = skb;
			if (!skb)
				break;
			/* Start new segment */
			start = TCP_SKB_CB(skb)->seq;               // 下个skb就是新的一段的开始
			end = TCP_SKB_CB(skb)->end_seq;
		} else {
			if (before(TCP_SKB_CB(skb)->seq, start))    // 这种情况只可能是tcp_collapse中大包拆成小包,拆到一半内存不够,没拆完导致。
				start = TCP_SKB_CB(skb)->seq;
			if (after(TCP_SKB_CB(skb)->end_seq, end))
				end = TCP_SKB_CB(skb)->end_seq;
		}
	}
}

tcp_collapse,gro上来的包有可能是大于4k的包,所以这个函数有时是在拆包,利弊难定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// 删除一个skb,返回下个skb
static struct sk_buff *tcp_collapse_one(struct sock *sk, struct sk_buff *skb,
					struct sk_buff_head *list)
{
	struct sk_buff *next = NULL;

	if (!skb_queue_is_last(list, skb))
		next = skb_queue_next(list, skb);

	__skb_unlink(skb, list);
	__kfree_skb(skb);
	NET_INC_STATS_BH(sock_net(sk), LINUX_MIB_TCPRCVCOLLAPSED);

	return next;
}

/* Collapse contiguous sequence of skbs head..tail with
 * sequence numbers start..end.
 *
 * If tail is NULL, this means until the end of the list.
 *
 * Segments with FIN/SYN are not collapsed (only because this
 * simplifies code)
 */
static void
tcp_collapse(struct sock *sk, struct sk_buff_head *list,
		 struct sk_buff *head, struct sk_buff *tail,
		 u32 start, u32 end)
{
	struct sk_buff *skb, *n;
	bool end_of_skbs;

	/* First, check that queue is collapsible and find
	 * the point where collapsing can be useful. */
	skb = head;
restart:
	end_of_skbs = true;
	skb_queue_walk_from_safe(list, skb, n) {
		if (skb == tail)
			break;
		/* No new bits? It is possible on ofo queue. */
		if (!before(start, TCP_SKB_CB(skb)->end_seq)) { // 这种情况现在是不会出现的,以前代码有可能出现??
			skb = tcp_collapse_one(sk, skb, list);
			if (!skb)
				break;
			goto restart;
		}

		/* The first skb to collapse is:
		 * - not SYN/FIN and
		 * - bloated or contains data before "start" or
		 *   overlaps to the next one.
		 */
		if (!tcp_hdr(skb)->syn && !tcp_hdr(skb)->fin &&         // SYN,FIN 不合并,简化操作
			(tcp_win_from_space(skb->truesize) > skb->len ||    // 合并后可能减小空间的情况才合并
			 before(TCP_SKB_CB(skb)->seq, start))) {            // seq到start的数据已经被读走了,有减小空间的可能
			end_of_skbs = false;
			break;
		}

		if (!skb_queue_is_last(list, skb)) {
			struct sk_buff *next = skb_queue_next(list, skb);
			if (next != tail &&
				TCP_SKB_CB(skb)->end_seq != TCP_SKB_CB(next)->seq) { // 两个skb之间有交集,有减小空间可能
				end_of_skbs = false;
				break;
			}
		}

		/* Decided to skip this, advance start seq. */
		start = TCP_SKB_CB(skb)->end_seq;     // 否则向后继续找可能减小空间的第一个skb
	}
	if (end_of_skbs || tcp_hdr(skb)->syn || tcp_hdr(skb)->fin)
		return;

	while (before(start, end)) {  // 落在在start到end的包就是这次要合并的
		struct sk_buff *nskb;
		unsigned int header = skb_headroom(skb); // skb中协议头的大小
		int copy = SKB_MAX_ORDER(header, 0);     // 一个页(4k)中出去协议头空间的大小,也就是能容下的数据大小

		/* Too big header? This can happen with IPv6. */
		if (copy < 0)
			return;
		if (end - start < copy)
			copy = end - start;
		nskb = alloc_skb(copy + header, GFP_ATOMIC);
		if (!nskb)
			return;

		skb_set_mac_header(nskb, skb_mac_header(skb) - skb->head);
		skb_set_network_header(nskb, (skb_network_header(skb) -
						  skb->head));
		skb_set_transport_header(nskb, (skb_transport_header(skb) -
						skb->head));
		skb_reserve(nskb, header);
		memcpy(nskb->head, skb->head, header);
		memcpy(nskb->cb, skb->cb, sizeof(skb->cb));
		TCP_SKB_CB(nskb)->seq = TCP_SKB_CB(nskb)->end_seq = start;
		__skb_queue_before(list, skb, nskb);
		skb_set_owner_r(nskb, sk);

		/* Copy data, releasing collapsed skbs. */
		while (copy > 0) {    // 如果copy = 0,这里就会出BUG,但如果没有认为改,是不会的。ipv6会吗???。后面版本改进这函数了,也不会出现copy=0了
			int offset = start - TCP_SKB_CB(skb)->seq;
			int size = TCP_SKB_CB(skb)->end_seq - start;

			BUG_ON(offset < 0);
			if (size > 0) { // copy旧的skb数据到新的skb上
				size = min(copy, size);
				if (skb_copy_bits(skb, offset, skb_put(nskb, size), size))
					BUG();
				TCP_SKB_CB(nskb)->end_seq += size;
				copy -= size;
				start += size;
			}
			if (!before(start, TCP_SKB_CB(skb)->end_seq)) { // 旧的skb被copy完了就删掉
				skb = tcp_collapse_one(sk, skb, list);
				if (!skb ||
					skb == tail ||
					tcp_hdr(skb)->syn ||
					tcp_hdr(skb)->fin)
					return;
			}
		}
	}
}

来看sk_mem_reclaim函数,它只是简单的封装了__sk_mem_reclaim

1
2
3
4
5
6
7
8
static inline void sk_mem_reclaim(struct sock *sk)
{
	if (!sk_has_account(sk))
		return;
	// 如果sk_forward_alloc大于1页则调用__sk_mem_reclaim,我们知道sk_forward_alloc是以页为单位的,因此这里也就是和大于0一样。
	if (sk->sk_forward_alloc >= SK_MEM_QUANTUM)
		__sk_mem_reclaim(sk);
}

__sk_mem_reclaim就是真正操作的函数,它会更新memory_allocated:

1
2
3
4
5
6
7
8
9
10
11
12
void __sk_mem_reclaim(struct sock *sk)
{
	struct proto *prot = sk->sk_prot;
	// 更新memory_allocated,这里我们知道memory_allocated也是以页为单位的,因此需要将sk_forward_alloc转化为页。
	atomic_sub(sk->sk_forward_alloc >> SK_MEM_QUANTUM_SHIFT,prot->memory_allocated);

	// 更新这个sk的sk_forward_alloc为一页。
	sk->sk_forward_alloc &= SK_MEM_QUANTUM - 1;
	// 判断是否处于pressure区域,是的话更新memory_pressure变量。
	if (prot->memory_pressure && *prot->memory_pressure &&(atomic_read(prot->memory_allocated) < (prot->sysctl_mem[0]))
		*prot->memory_pressure = 0;
}

最后看一下读buf的释放。这个函数会在kfree_skb中被调用。

1
2
3
4
5
6
7
8
9
void sock_rfree(struct sk_buff *skb)
{

	struct sock *sk = skb->sk;
	// 更新rmem_alloc
	atomic_sub(skb->truesize, &sk->sk_rmem_alloc);
	// 更新forward_alloc.
	sk_mem_uncharge(skb->sk, skb->truesize);
}

TREE RCU实现

http://blog.csdn.net/junguo/article/details/8258231

http://blog.csdn.net/junguo/article/details/8258261

http://blog.csdn.net/junguo/article/details/8268277


TREE RCU实现之一 —— 数据结构

代码分布

在分析代码之前, 先看看代码的分布情况。RCU实现的代码包含在下列一些文件中,此处用到的是linux 3.6.4的代码。

< include/linux/rcupdate.h > RCU实现的头文件,所有使用RCU的代码都需要包含它
< include/rcutree.h > 包含rcupdate.h中没有包含的函数声明。
< include/rcutiny.h > 包含rcupdate.h中没有包含的函数声明。
< kernel/rcupdate.c > 包括一些RCU实现的基础函数的实现。
< kernel/rcutree.h > 包含Tree RCU用到的结构信息,TREE_RCU将所有的CPU组织成一颗树,通过层次结构来判别进程是否通过了宽限期,这种方式适用于多个CPU的系统。
< kernel/rcutree.c > 包含Tree RCU的主要实现代码。
< kernel/rcutree_plugin.h > 其实也是TREE RCU实现的一部分。主要包含了抢入式TreeRCU的代码。适用于抢入式的系统,抢入式的系统适用于需要低延迟的桌面或者嵌入式系统。
< kernel/rcutiny.c > Tiny RCU的主要实现代码,TINY_RCU适用于单个CPU,尤其是嵌入式操作系统。
< kernel/rcutiny_plugin.h > 主要包含了抢入式Tiny RCU的代码
< kernel/rcu.h > 定义了debug的接口,实现了__rcu_reclaim
< kernel/rcutorture.c> 对RCU进行稳定性测试的代码,通过配置CONFIG_RCU_TORTURE_TEST,可以在系统启动的时候运行稳定性测试。
< kernel/rcutree_trace.c> 通过配置CONFIG_RCU_TRACE,可以记录RCU的运行信息。
< include/trace/events/rcu.h> 为rcutree_trace.c定义的头文件。

RCU处理的基本流程

RCU实现的关键集中在宽限期的处理上,这个过程需要保证销毁对象前,当前系统中所有CPU上运行的进程都通过了静止状态(quiescent state)。

1, 程序调用call_rcu,将要删除的对象保存起来。并标记或者开始一个宽限期(同一时间只能运行一个宽限期,所以当已经有宽限期在运行的时候,其它的宽限期必须等待)。

2, 在读取数据开始和结尾处增加 rcu_read_lock 和 rcu_read_unlock来标记读过程。为了保证删除过程知道读过程的结束,在非抢占式RCU实现中是在rcu_read_lock开始处禁止进程抢占。这样做就可以保证再运行下一次进程切换的时候,读过程已经结束。其实系统也不会去统计各个CPU上是否存在过读线程,所以所有的CPU都会在进程切换的时候通知系统它处于进制状态。当所有的CPU都通过静止状态的时候,系统就会标记它通过了一个宽限期。

3,由于一个宽限期结束的时候,只有最后一个通过静止状态的CPU知道当前的宽限期已经结束,它并不会去通知其它CPU;同时出于性能考虑,系统也不会在宽限期结束后,马上去执行销毁过程。所以每个CPU都有一个固定的函数去检测是否有等待执行的宽限期,如果没有特别紧急的任务时,会去执行这些过程。

接下来,要分析Tree RCU的实现,先来看看它提供的一些接口函数。

1, call_rcu 与 synchronize_rcu都是删除对象时调用的函数。call_rcu将数据提交后会返回,而synchronize_rcu会调用call_rcu,并一直等待对象被删除后才返回。还有call_rcu_bh与synchronize_rcu_bh等接口函数,会在后续讲述。

2,rcu_read_lock 和 rcu_read_unlock

<linux/rcuupdate.h>

1
2
3
4
5
6
7
8
9
static inline void __rcu_read_lock(void)
{
	preempt_disable();
}

static inline void __rcu_read_unlock(void)
{
	preempt_enable();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static inline void rcu_read_lock(void)
{
	__rcu_read_lock();
	__acquire(RCU);
	rcu_lock_acquire(&rcu_lock_map);
	rcu_lockdep_assert(!rcu_is_cpu_idle(),
			"rcu_read_lock() used illegally while idle");
}
static inline void rcu_read_unlock(void)
{
	rcu_lockdep_assert(!rcu_is_cpu_idle(),
			"rcu_read_unlock() used illegally while idle");
	rcu_lock_release(&rcu_lock_map);
	__release(RCU);
	__rcu_read_unlock();
}

rcu_read_lock与rcu_read_unlock在非抢占式下的实现比较简单就是 preempt_disable与preempt_enable。这样做的目的是当调用schedule的时候,就可以肯定读的过程已经结束。其它_acquire(RCU)等函数是调试用的代码,暂不做讨论。

3, rcu_note_context_switch 在schedule中调用,每次进程切换就代表着一个静止状态。该函数会把当前的CPU状态设置为通过状态。

4, rcu_check_callbacks 在每次时钟周期里调用(update_process_times)。通过它会触发软件中断,软件中断对应着rcu_process_callbacks,这是一个真正繁忙的函数,他会检测当前CPU的状态,向父节点传递静止状态信息,调用注册函数等一系列工作。

在进一步了解这些函数之前,我们先来看看你Tree RCU的结构。

TREE RCU简介

在统计CPU的状态的时候,需要用到一个结构来存放所有CPU的状态。在早期的实现中,所有的状态都保存在一个结构中,这样做的后果是所有的CPU在更新自己状态的时候,都需要锁定该结构对象,一定程度上影响了系统性能。为了提高性能,把一定数目的CPU组成了一个节点(默认设定64个CPU为一个节点);当节点超过64个的时候,再把这些节点按64为单位划分为归属不同的父节点;如此类推,最后的一个单独的节点作为根节点。这样在更新CPU状态的时候,只需要锁定自己所属的节点就可以了。按节点设置的数目,可见这个结构只对CPU数成百上千的系统才真正起作用(我都没见过超过32个cpu的机器,不知道是啥样的感觉)。

这样所有的CPU就按层级结构组织了起来,也就是一个树结构。当一个系统的CPU数少于64个的时候,只要一个rcu_node就可以。

每个CPU在完成宽限期检测的时候,就会去更新它所属的rcu_node的值,当一个rcu_node所包含的CPU的状态都更新过以后,该node就会去更新它所属的父节点的值。直到最后一个根节点。

TREE RCU数据结构

为了实现该结构,系统提供了以下结构。

rcu_data

由于RCU需要统计每个CPU是否通过了宽限期,提供了rcu_data来保存信息。另外每个销毁的对象并不是直接删除,也保存在rcu_data中,等到合适的时机来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
struct rcu_data {  
	/* 1) 静止状态和宽限期处理: */  
	unsigned long   completed;      /* 对比 rsp->completed */  
									/* 目的是检测宽限期是否完成. */  
	unsigned long   gpnum;          /* 当前CPU上最高的宽限期数目*/  
									/* 在宽限期开始的时候设置. */  
	unsigned long   passed_quiesce_gpnum;  
									/* 已经通过的宽限期数目. */  
	bool            passed_quiesce; /* 是否通过了静止状态,在进程切换等状态会设置. */  
	bool            qs_pending;     /* 对于当前执行的宽限期,该CPU是否执行完成. */  
	bool            beenonline;     /* CPU是否在线,不在线的CPU需要特殊处理,以提高性能*/  
	bool            preemptible;    /* 是否抢占式RCU? */  
	struct rcu_node *mynode;        /* 这个CPU对应的 rcu_node */  
	unsigned long grpmask;          /* 占用1bit,对应与所属的rcu_node. */  
#ifdef CONFIG_RCU_CPU_STALL_INFO  
	unsigned long   ticks_this_gp;  /* The number of scheduling-clock */  
									/*  ticks this CPU has handled */  
									/*  during and after the last grace */  
									/* period it is aware of. */  
#endif /* #ifdef CONFIG_RCU_CPU_STALL_INFO */  
	/* 2) 批处理*/  
	/* 
	 * 
	 * 当nxtlist不为空的时候,会通过nxttail划分为以下几部分 
	 * 每一个部分为空的时候,它的指针会被设置成与它的下一部分相同 
	 * 当nxtlist为空的时候,所有的nxttail都会指向nxtlist的地址,这时候nxtlist指向NULL 
	 * 
	 * [nxtlist, *nxttail[RCU_DONE_TAIL]): 
	 *    批处理的开始节点# <= ->completed 
	 *    这些节点的宽限期已经完成,可以执行销毁操作。 
	 *    当调用rcu_process_callbacks()的时候,下一批完成宽限期的节点也会放到这儿. 
	 * [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL]): 
	 *    批处理的开始节点 # <= ->completed - 1: 等待当前的批处理完成 
	 * [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL]): 
	 *    已知的当下次宽限期开始,可以开始等待的节点。 
	 * [*nxttail[RCU_NEXT_READY_TAIL], *nxttail[RCU_NEXT_TAIL]): 
	 *    当前不确定下次宽限期开始后,是否可以开始等待状态的节点。 
	 *    *nxttail[RCU_NEXT_TAIL] 的值将永远是NULL, 
	 *    它表示nxtlist的结束. 
	 * 
	 */  
	struct rcu_head *nxtlist;  
	struct rcu_head **nxttail[RCU_NEXT_SIZE];  
	long            qlen_lazy;      /* # kfree_rcu调用的次数,kfee_rcu等同于call_rcu,只是它不需要销毁的对象提供销毁函数*/  
	long            qlen;           /* # 当前需要执行销毁操作的次数,每次call_rcu会加一,执行过后减一*/  
	long            qlen_last_fqs_check;  
									/* 对应与qlen,最后一次执行的次数*/  
	unsigned long   n_cbs_invoked;  /* 执行销毁操作的次数. */  
	unsigned long   n_cbs_orphaned; /* 统计离线后CPU上剩下的callback函数的个数 */  
	unsigned long   n_cbs_adopted;  /* 从离线后的CPU上移出的callback函数的个数 */  
	unsigned long   n_force_qs_snap;  
									/* 其它CPU是否在执行fore_qs? */  
	long            blimit;         /* nxtlist保存的上限 */  

	/* 3) 动态时钟,*/  
	struct rcu_dynticks *dynticks;  /* 每个CPU都包含一个动态时钟. */  
	int dynticks_snap;              /* 用于检测CPU是否在线. */  

	/* 4) 强制执行时候处理的CPU */  
	unsigned long dynticks_fqs;     /* 由于进入dynticks idle而被处理的CPU. */  
	unsigned long offline_fqs;      /* 由于不在在线被处理的CPU. */  

	/* 5) __rcu_pending() 的统计信息,这些信息都是在记录调用信息的时候使用. */  
	unsigned long n_rcu_pending;    /* rcu_pending() 调用次数,自从启动. */  
	unsigned long n_rp_qs_pending;  
	unsigned long n_rp_report_qs;  
	unsigned long n_rp_cb_ready;  
	unsigned long n_rp_cpu_needs_gp;  
	unsigned long n_rp_gp_completed;  
	unsigned long n_rp_gp_started;  
	unsigned long n_rp_need_fqs;  
	unsigned long n_rp_need_nothing;  

	/* 6) _rcu_barrier() 的回调函数. */  
	struct rcu_head barrier_head;  

	int cpu;  
	struct rcu_state *rsp;  
};  

1,completed ,gpnum , passed_quiesce_gpnum

gpnum表示当前正在运行的宽限期的个数,每当一个宽限期开始的时候,会设置这个值与其父节点相同。passed_quiesce_gpnum为当前CPU通过的宽限期个数,它的值在宽限期开始的时候小于gpnum,当这个CPU经过一个静止状态的时候,会把它设置成gpnum的值,通过对比它与父节点中的gpnum是否相同,可以确定该CPU是否通过了宽限期。passed_quiesce_gpnum只是表示这个CPU通过了宽限期,而completed表示所有的CPU都通过了宽限期,设置该值的同时,可以将nxtlist中等待的回调函数移动到完成队列。

2,nxtlist 与nxttail

nxtlist保存的是指向rcu_head对象,rcu_head的定义如下:

1
2
3
4
5
struct callback_head {
	struct callback_head *next;
	void (*func)(struct callback_head *head);
};
#define rcu_head callback_head

rcu_head的结构并不复杂,它包含一个回调函数指针。而next可以把rcu_head连成一个列表。

nxtlist指向一个rcu_head 列表,而nxttail的四个元素是指向指针的指针,它们指向的是rcu_head对象的next。RCU_DONE_TAIL指向的rcu_head对象之前的对象是可以销毁的对象。RCU_WAIT_TAIL指向的正在等待宽限期的元素,RCU_NEXT_READ_TAIL指向的是等待下次宽限期的元素,RCU_NEXT_TAIL指向最后一个元素,这个元素总是指向NULL。

rcu_node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
struct rcu_node {  
	raw_spinlock_t lock;    /* rcu_node的锁,用来保护以下的一些成员*/  

	unsigned long gpnum;    /* 该节点当前的宽限期的数量 */  
							/* 该值等于或者比父节点的值小1*/  
	unsigned long completed; /* 该节点完成的宽限期数量*/  
							 /* 该值等于或者比父节点的值小1*/  
	unsigned long qsmask;   /* 标记这个节点对应的所有CPU或者子节点是否完成了当前的宽限期*/  
							/* 每一个bit对应一个cpu或者一个子节点.*/  
	unsigned long expmask;  /* 需要执行 ->blkd_tasks 的元素 */                              
							/*  (应用于TREE_PREEMPT_RCU). */  
	atomic_t wakemask;      /* 需要唤醒kthread的CPU. */  
							  
	unsigned long qsmaskinit;  
							/* 每个宽限期开始时,用它来初始化qsmask,不存在或者不在线的CPU需要清除. */  
	unsigned long grpmask;  /* 对应于父节点中的位置. */  
							/* 只是用一bit. */  
	int     grplo;          /* 该节点代表的CPU或者子节点开始的位置. */  
	int     grphi;          /* 该节点代表的CPU或者子节点结束的位置. */  
	u8      grpnum;         /* 下一级的CPU或者子节点的个数. */  
	u8      level;          /* 跟节点是 0. */  
	struct rcu_node *parent;  
	struct list_head blkd_tasks;  
							/* 阻断读关键段的任务列表 */  
							/*  */  
				 
	struct list_head *gp_tasks;  
							/* 指向第一个阻断读关键段的任务 */  
							  
							  
	struct list_head *exp_tasks;  

	/*以下为抢先式下加速RCU过程的变量*/

#ifdef CONFIG_RCU_BOOST  
	struct list_head *boost_tasks;  
							/* Pointer to first task that needs to be */  
							/*  priority boosted, or NULL if no priority */  
							/*  boosting is needed for this rcu_node */  
							/*  structure.  If there are no tasks */  
							/*  queued on this rcu_node structure that */  
							/*  are blocking the current grace period, */  
							/*  there can be no such task. */  
	unsigned long boost_time;  
							/* When to start boosting (jiffies). */  
	struct task_struct *boost_kthread_task;  
							/* kthread that takes care of priority */  
							/*  boosting for this rcu_node structure. */  
	unsigned int boost_kthread_status;  
							/* State of boost_kthread_task for tracing. */  
	unsigned long n_tasks_boosted;  
							/* Total number of tasks boosted. */  
	unsigned long n_exp_boosts;  
							/* Number of tasks boosted for expedited GP. */  
	unsigned long n_normal_boosts;  
							/* Number of tasks boosted for normal GP. */  
	unsigned long n_balk_blkd_tasks;  
							/* Refused to boost: no blocked tasks. */  
	unsigned long n_balk_exp_gp_tasks;  
							/* Refused to boost: nothing blocking GP. */  
	unsigned long n_balk_boost_tasks;  
							/* Refused to boost: already boosting. */  
	unsigned long n_balk_notblocked;  
							/* Refused to boost: RCU RS CS still running. */  
	unsigned long n_balk_notyet;  
							/* Refused to boost: not yet time. */  
	unsigned long n_balk_nos;  
							/* Refused to boost: not sure why, though. */  
							/*  This can happen due to race conditions. */  
#endif /* #ifdef CONFIG_RCU_BOOST */  
	struct task_struct *node_kthread_task;  
							/* kthread that takes care of this rcu_node */  
							/*  structure, for example, awakening the */  
							/*  per-CPU kthreads as needed. */  
	unsigned int node_kthread_status;  
							/* State of node_kthread_task for tracing. */  
} ____cacheline_internodealigned_in_smp;  

每个rcu_node代表着 一组CPU或者子节点。在非抢占式下,它的结构并不复杂。由于可能有多个CPU对它进行处理,所有进行相应操作的时候,需要lock保护。

rcu_state
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
struct rcu_state {  
	struct rcu_node node[NUM_RCU_NODES];    /* 保存了所有的节点. */  
	struct rcu_node *level[RCU_NUM_LVLS];   /* 每个层级所指向的节点. */  
	u32 levelcnt[MAX_RCU_LVLS + 1];         /* # 每一层的节点数. */  
	u8 levelspread[RCU_NUM_LVLS];           /* 每一层的CPU/节点数. */  
	struct rcu_data __percpu *rda;          /* 指向rcu_data. */  
	void (*call)(struct rcu_head *head,     /* rcu_barrier指向的回调函数. */  
				 void (*func)(struct rcu_head *head));  

	/* The following fields are guarded by the root rcu_node's lock. */  

	u8      fqs_state ____cacheline_internodealigned_in_smp;  
									      /* 调用force_quiescent_state时的状态. */  
	u8      fqs_active;                     /* force_quiescent_state() 正在运行*/  
									        
	u8      fqs_need_gp;                    /* 因为 force_quiescent_state() 正在运行*/  
									      /* 一个CPU需要运行的宽限期被阻止*/  

	u8      boost;                          /* 加速. */  
	unsigned long gpnum;                    /* 当前的宽限起数量. */  
	unsigned long completed;                /* # 最后一次完成的宽限期数量. */  

	/* 以下的成员被根rcu_node的lock保护. */  

	raw_spinlock_t onofflock;               /* 开始一个新的宽限期的时候,阻止CPU上下线*/  
									        
	struct rcu_head *orphan_nxtlist;        /* 等待宽限期的孤儿回调函数的列表 */  
									        
	struct rcu_head **orphan_nxttail;       /* 以上列表的结尾. */  
	struct rcu_head *orphan_donelist;       /* 需要执行的孤儿回调函数列表 */  
									        
	struct rcu_head **orphan_donetail;      /* 以上列表的结尾. */  
	long qlen_lazy;                         /* 懒惰回调函数的个数. */  
	long qlen;                              /* 总的回调函数的个数. */  
	struct task_struct *rcu_barrier_in_progress;  
									      /* 调用rcu_barrier()的进程, */  
									      /* 没有的话指向NULL. */  
	struct mutex barrier_mutex;             /* 执行barrier需要的互斥锁. */  
	atomic_t barrier_cpu_count;             /* # 等待barrier的CPU数 . */  
	struct completion barrier_completion;   /* 在barrier结束的时候调用. */  
	unsigned long n_barrier_done;           /* 在_rcu_barrier()开始结束处都需要调用++ */  
									        
	raw_spinlock_t fqslock;                 /* 只有一个进程能调用 force_quiescent_state().*/  
									        
	unsigned long jiffies_force_qs;         /* force_quiescent_state()开始的时间 */  
									        
	unsigned long n_force_qs;               /* 调用force_quiescent_state()的次数 */  
									        
	unsigned long n_force_qs_lh;            /* 因为lock不可用,而退出force_quiescent_state()的次数 */  
									        
	unsigned long n_force_qs_ngp;           /* 因为当前有宽限期执行,而退出force_quiescent_state()的次数*/  
									        
	unsigned long gp_start;                 /* 宽限期开始的时间*/  
									        
	unsigned long jiffies_stall;              
									     
	unsigned long gp_max;                   /*  最长的宽限的jiffie数 */  
									        
	char *name;                             /* 结构的名字. */  
	struct list_head flavors;               /* 系统中的rcu_state. */  
};  

rcu_state 保存了所有的node,宽限期的判断只要取出根节点,也就是第一个元素就可以。还有一些初始化要用到的变量。还有孤儿回调函数用于处理离线CPU遗留的信息。剩下还有很多统计信息,这些内容在讲解代码实现的时候再仔细考虑。


TREE RCU实现之二 —— 主干函数

RCU的实现集中在以下几个步骤:
1, 调用call_rcu,将回调函数增加到列表。
2, 开始一个宽限期。
3, 每个CPU报告自己的状态,直到最后一个CPU,结束一个宽限期。
4, 宽限期结束,每个CPU处理自己的回调函数。

call_rcu的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static void  
__call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),  
	   struct rcu_state *rsp, bool lazy)  
{  
	unsigned long flags;  
	struct rcu_data *rdp;  

	WARN_ON_ONCE((unsigned long)head & 0x3); /* 检测head在内存中是否对齐! */  
	debug_rcu_head_queue(head);  
	head->func = func;  
	head->next = NULL;  

	smp_mb(); /* Ensure RCU update seen before callback registry. */  

	/* 
	 * 这是一个检测宽限期开始或者结束的机会。 
	 * 当我们看到一个结束的时候,可能还会看到一个开始。 
	 * 反过来,看到一个开始的时候,不一定能看到一个结束, 
	 * 因为宽限期结束需要一定时间。 
	 */  
	local_irq_save(flags);  
	rdp = this_cpu_ptr(rsp->rda);  

	/* 将要增加callback到nxtlist. */  
	ACCESS_ONCE(rdp->qlen)++;  
	if (lazy)  
		rdp->qlen_lazy++;  
	else  
		rcu_idle_count_callbacks_posted();  
	smp_mb();  /* Count before adding callback for rcu_barrier(). */  
	*rdp->nxttail[RCU_NEXT_TAIL] = head;  
	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;  

	if (__is_kfree_rcu_offset((unsigned long)func))  
		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,  
									 rdp->qlen_lazy, rdp->qlen);  
	else  
		trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);  

	/* 去处理rcu_core。 */  
	__call_rcu_core(rsp, rdp, head, flags);  
	local_irq_restore(flags);  
}  

call_rcu中最主要的工作,就是将回调函数加入到CPU的nxtlist列表。这里用到了指针处理的小技巧,我们来看看。首先看看nxttail的初始化:

1
2
3
4
5
6
7
8
static void init_callback_list(struct rcu_data *rdp)  
{  
	int i;  

	rdp->nxtlist = NULL;  
	for (i = 0; i < RCU_NEXT_SIZE; i++)  
		rdp->nxttail[i] = &rdp->nxtlist;  
}  

我们看到nxttail的全部成员都指向了nxtlist的地址。当nxtlist为空的时候,也是这个情形。

1
*rdp->nxttail[RCU_NEXT_TAIL] = head;       

当nxtlist为空的时候, *rdp->nxttail[RCU_NEXT_TAIL] 得到的其实就是nxtlist,将head的值赋予它。

1
rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

之后 RCU_NEXT_TAIL指向 head的next指针。这样当再有一个节点加入的时候,*rdp->nxttail[RCU_NEXT_TAIL]得到的其实就是前一次加入的head的next指针,它将指向新加入的值。如此,nxtlist就成为了一个链表。或者这样理解,rdp->nxttail[RCU_NEXT_TAIL] 指向的就是nxtlist中最后一个节点的 next指针。

除了将回调函数插入,该函数其它代码多为检查代码。而最后要调用__call_rcu_core,该函数的功用主要是在回调函数太多或者等待时间过长的状态下,强制执行RCU状态更新。我们暂时不关注。

开始一个宽限期

在一个宽限期结束,或者当一个CPU检测到自身有需要一个宽限期的时候会开始一个新的宽限期,开始宽限期的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
static void  
rcu_start_gp(struct rcu_state *rsp, unsigned long flags)  
	__releases(rcu_get_root(rsp)->lock)  
{  
	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);  
	struct rcu_node *rnp = rcu_get_root(rsp);  

	if (!rcu_scheduler_fully_active ||  
			!cpu_needs_another_gp(rsp, rdp)) {  
		/* 
		 * 如果scheduler 还没有启动non-idle任务 
		 * 或者不需要启动一个新的宽限期则退出。 
		 * 需要再次判断cpu_needs_another_gp, 
		 * 是因为可能有多个CPU执行这个过程。 
		 */  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		return;  
	}  

	if (rsp->fqs_active) {  
		/* 
		 * 这个CPU需要一个宽限期,而force_quiescent_state() 
		 * 正在运行,告诉它开始一个。 
		 */  
		rsp->fqs_need_gp = 1;  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		return;  
	}  

	/* 开始一个新的宽限期并且初始化。 */  
	rsp->gpnum++;  
	trace_rcu_grace_period(rsp->name, rsp->gpnum, "start");  
	WARN_ON_ONCE(rsp->fqs_state == RCU_GP_INIT);  
	rsp->fqs_state = RCU_GP_INIT; /* 阻止 force_quiescent_state。 */  
	rsp->jiffies_force_qs = jiffies + RCU_JIFFIES_TILL_FORCE_QS;  
	record_gp_stall_check_time(rsp);  
	raw_spin_unlock(&rnp->lock);  /* leave irqs disabled. */  

	/* 排除CPU的热插拔。*/  
	raw_spin_lock(&rsp->onofflock);  /* irqs already disabled. */  

	/* 
	 * 从父节点开始以广度优先的方式,遍历所有的节点,设置qsmask的值, 
	 * 所有在线CPU所在bit都将被设置成1。 
	 * 通过遍历rsp->node[]数组就可以达到这个目的。 
	 * 其它CPU在自己所属的节点还没有被设置前,只有可能访问这个节点, 
	 * 因为它所作的判断是宽限期还没有开始。 
	 * 此外,我们排除了CPU热插拔。 
	 *  
	 * 直到初始化过程完成之前,这个宽限期不可能完成,因为至少当前的 
	 * CPU所属的bit将不会被设置。这个是因为我们启动了禁止中断,所以 
	 * 这个CPU不会调用到宽限期检测代码。 
	 */  
	rcu_for_each_node_breadth_first(rsp, rnp) {  
		raw_spin_lock(&rnp->lock);      /* irqs already disabled. */  
		rcu_preempt_check_blocked_tasks(rnp);  
		rnp->qsmask = rnp->qsmaskinit;  
		rnp->gpnum = rsp->gpnum;  
		rnp->completed = rsp->completed;  
		if (rnp == rdp->mynode)  
			rcu_start_gp_per_cpu(rsp, rnp, rdp);  
		rcu_preempt_boost_start_gp(rnp);  
		trace_rcu_grace_period_init(rsp->name, rnp->gpnum,  
							rnp->level, rnp->grplo,  
							rnp->grphi, rnp->qsmask);  
		raw_spin_unlock(&rnp->lock);    /* irqs remain disabled. */  
	}

	rnp = rcu_get_root(rsp);  
	raw_spin_lock(&rnp->lock);              /* irqs already disabled. */  
	rsp->fqs_state = RCU_SIGNAL_INIT; /* force_quiescent_state now OK. */  
	raw_spin_unlock(&rnp->lock);            /* irqs remain disabled. */  
	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);  
}  

标记一个新的宽限期开始,rcu_state要做的就是将gp_num加1。然后再设置所有node,qsmask被设置成qsmasinit,qsmask每个bit代表一个CPU,所有在线的CPU都将被设置成1;gpnum将被设置成新值。嗯,一个新宽限期的开始只需要设置这些标记位。

CPU的宽限期检测

当一个宽限期开始后,每个CPU都需要检测自己的状态,如果已经通过静止状态,那么就向上一级node进行报告。

这个处理过程,可以分为两个步骤:
1, 检测新的处理过程开始,设置rcu_data中的gpnum和passed_quiesce,另外用qs_pending标记一个待处理的新宽限期的开始。
2, 一个静止状态结束,向上一级node报告这个过程。

这两个过程通过rcu_check_quiescent_state()来实现,需要注意的是这个函数隔一段时间调用一次,并不只调用一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* 
* 检测这个CPU是否还不知道一个新宽限期开始,如果是设置它的变量。 
* 否则检查它是不是第一次通过静止状态,如果是,向上报告。 
*/  
static void  
rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	/* 如果有新的宽限期开始,记录它并返回。*/  
	if (check_for_new_grace_period(rsp, rdp))  
		return;  

	/* 
	 * 这个CPU是否已经处理过它的宽限期?如果是返回。 
	 */  
	if (!rdp->qs_pending)  
		return;  

	/* 
	 * 是否通过了静止状态?如果没有,返回。 
	 */  
	if (!rdp->passed_quiesce)  
		return;  

	/* 
	 * 向所属的node报告。(但rcu_report_qs_rdp() 仍然会去判断它)。 
	 */  
	rcu_report_qs_rdp(rdp->cpu, rsp, rdp, rdp->passed_quiesce_gpnum);  
}  

A, CPU检测新宽限期的开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*  
 * 为当前CPU,更新rcu_data的状态,去标记一个新宽限期的开始 
 * 如果当前CPU启动了一个宽限期或者检测到一个新的宽限期开始, 
 * 都需要调用这个函数。这个过程必须锁定父节点的lock,另外需 
 * 要禁止中断 
 */  
static void __note_new_gpnum(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)  
{  
	if (rdp->gpnum != rnp->gpnum) {  
		/* 
		 * 如果当前的宽限期需要处理这个CPU的状态,设置并 
		 * 去检测它的静止状态。否则不要去管它。 
		 */          
		rdp->gpnum = rnp->gpnum;  
		trace_rcu_grace_period(rsp->name, rdp->gpnum, "cpustart");  
		if (rnp->qsmask & rdp->grpmask) {  
			rdp->qs_pending = 1;  
			rdp->passed_quiesce = 0;  
		} else {  
			rdp->qs_pending = 0;  
		}  
		zero_cpu_stall_ticks(rdp);  
	}  
}  

static void note_new_gpnum(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	struct rcu_node *rnp;  

	local_irq_save(flags);  
	rnp = rdp->mynode;  
	if (rdp->gpnum == ACCESS_ONCE(rnp->gpnum) || /* outside lock. */  
			!raw_spin_trylock(&rnp->lock)) { /* irqs already off, so later. */  
		local_irq_restore(flags);  
		return;  
	}  
	__note_new_gpnum(rsp, rnp, rdp);  
	raw_spin_unlock_irqrestore(&rnp->lock, flags);  
}  

/* 
 * 在我们的上次检测之后,其它CPU启动了一个新的宽限期? 
 * 如果是更新相应的rcu_data的状态。 
 * 必须是在rdp对应的CPU上执行。 
 */  
static int  
check_for_new_grace_period(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	int ret = 0;  

	local_irq_save(flags);  
	if (rdp->gpnum != rsp->gpnum) {  
		note_new_gpnum(rsp, rdp);  
		ret = 1;  
	}  
	local_irq_restore(flags);  
	return ret;  
}  

check_for_new_grace_period 和 note_new_gpnum分别用来检测rdp的gpnum与rsp已经对应的rnp的值是否相同,来确定是否有一个新的宽限期开始。之所以需要检测两次,是因为在rsp设置以后,rnp可能并没有设置完成。

__note_new_gpnum 将设置gpnum的值。另外设置 qs_pending为1,该标记位代表该节点还没有向父节点报告自己的状态;passed_quiesce为0,表示需要一个静止状态,设置该位是因为下次调用rcu_check_quiescent_state()可能是在一个读过程还没有结束的时候。

qs_pending的状态有可能为0,这只在以下情形下出现:当前CPU在宽限期开始的时候实在离线状态,而现在变成了在线。

我们注意到在 check_for_new_grace_period检测到有新的宽限期开始后,rcu_check_quiescent_state将直接返回,因为这个宽限期可能是在该CPU的上一个静止状态之前已经开始,所以需要等待下一个静止状态。

B,CPU报告静止状态

当再一次调用到rcu_check_quiescent_state()的时候,check_for_new_grace_period()将返回FALSE,接着运行后面的函数来判断 qs_pending 和 passed_quiesce 的值来决定是否调用rcu_report_qs_rdp。需要判断qs_peding是因为当这次rcu_report_qs_rdp调用成功的时候,下次再运行rcu_check_quiescent_state()则不需要继续运行后续函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void  
rcu_report_qs_rdp(int cpu, struct rcu_state *rsp, struct rcu_data *rdp, long lastgp)  
{  
	unsigned long flags;  
	unsigned long mask;  
	struct rcu_node *rnp;  

	rnp = rdp->mynode;  
	raw_spin_lock_irqsave(&rnp->lock, flags);  
	if (lastgp != rnp->gpnum || rnp->completed == rnp->gpnum) {  
		/* 
		 * 如果宽限期的处理已经完成,那么返回。 
		 */          
		rdp->passed_quiesce = 0; /* need qs for new gp. */  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		return;  
	}  
	mask = rdp->grpmask;  
	if ((rnp->qsmask & mask) == 0) {  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
	} else {  
		rdp->qs_pending = 0;  
		/* 
		 *  可以确定这个宽限期还没有结束,所以可以确定当前CPU上的 
		 *  所有回调函数可以在下次宽限期结束后处理。 
		 */  
		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];  

		rcu_report_qs_rnp(mask, rsp, rnp, flags); /* rlses rnp->lock */  
	}  
}  

从我看来,这个函数只会调用到最后一个else分支,而之前的连个if分支都不会调用到。因为在调用该函数前,代码已经做了必要的检测。

以此来看,这个函数的功用就是设置qs_pending的值,阻止这次宽限期没有完成之前再次调用掉该函数;设置nxttail,决定下次宽限期后可以执行的回调函数;然后向父节点报告静止状态完成。

C,向上报告
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void  
rcu_report_qs_rnp(unsigned long mask, struct rcu_state *rsp,  
	  struct rcu_node *rnp, unsigned long flags)  
	__releases(rnp->lock)  
{  
	struct rcu_node *rnp_c;  

	/* 向上遍历所有层级 */  
	for (;;) {  
		if (!(rnp->qsmask & mask)) {  
			/* 这个CPU的标记已经被清除,证明已经处理过了,返回 */  
			raw_spin_unlock_irqrestore(&rnp->lock, flags);  
			return;  
		}  
		rnp->qsmask &= ~mask;  
		trace_rcu_quiescent_state_report(rsp->name, rnp->gpnum,  
					 mask, rnp->qsmask, rnp->level,  
					 rnp->grplo, rnp->grphi,  
					 !!rnp->gp_tasks);  
		if (rnp->qsmask != 0 || rcu_preempt_blocked_readers_cgp(rnp)) {  
			/* 这个节点中还有其它CPU没有处理完成,那么返回 */  
			raw_spin_unlock_irqrestore(&rnp->lock, flags);  
			return;  
		}  
		mask = rnp->grpmask;  
		if (rnp->parent == NULL) {  
			/* 到这儿,已经到了根节点 */  
			break;  
		}  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		rnp_c = rnp;  
		rnp = rnp->parent;  
		raw_spin_lock_irqsave(&rnp->lock, flags);  
		WARN_ON_ONCE(rnp_c->qsmask);  
	}  
	/* 
	 *  程序运行到这儿,说明所有的CPU都通过了宽限期, 
	 *  那么调用rcu_report_qs_rsp()来结束这个宽限期。 
	 */   
	rcu_report_qs_rsp(rsp, flags); /* releases rnp->lock. */  
}  

这个过程并不复杂,清理rnp中qsmask对应该CPU的bit。然后判断该节点是否处理完成,如果是则继续向上调用,否则就退出函数。最后一个CPU调用后,可以调用到rcu_report_qs_rsp()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static void rcu_report_qs_rsp(struct rcu_state *rsp, unsigned long flags)  
	__releases(rcu_get_root(rsp)->lock)  
{  
	unsigned long gp_duration;  
	struct rcu_node *rnp = rcu_get_root(rsp);  
	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);  

	WARN_ON_ONCE(!rcu_gp_in_progress(rsp));  

	/* 
	 * Ensure that all grace-period and pre-grace-period activity 
	 * is seen before the assignment to rsp->completed. 
	 */  
	smp_mb(); /* See above block comment. */  
	gp_duration = jiffies - rsp->gp_start;  
	if (gp_duration > rsp->gp_max)  
		rsp->gp_max = gp_duration;  

	/* 
	 * 当前CPU知道宽限期已经结束,不过其它CPU都认为它还在运行。 
	 * 由于completed还没有设置,其它CPU都不会对父node进行处理。 
	 * 所以这时候将各个node标记为完成是安全的。 
	 *  
	 * 不过当前CPU有等待下一次宽限期的回调函数的时候,我们会 
	 * 先去处理下一个宽限期。 
	 * 这儿使用RCU_WAIT_TAIL代替了RCU_DONE_TAIL,这是因为当前 
	 * CPU还没有进一步处理完成状态,当前RCU_WAIT_TAIL状态的元 
	 * 素其实在这次宽限期结束后,已经可以执行了。 
	 *  
	 */  
	if (*rdp->nxttail[RCU_WAIT_TAIL] == NULL) {  
		raw_spin_unlock(&rnp->lock);  /* irqs remain disabled. */  

		/* 
		 * 设置 rnp->completed的值,避免这个过程要等到下一次宽限期开始。          
		 */  
		rcu_for_each_node_breadth_first(rsp, rnp) {  
			raw_spin_lock(&rnp->lock); /* irqs already disabled. */  
			rnp->completed = rsp->gpnum;  
			raw_spin_unlock(&rnp->lock); /* irqs remain disabled. */  
		}  
		rnp = rcu_get_root(rsp);  
		raw_spin_lock(&rnp->lock); /* irqs already disabled. */  
	}  

	rsp->completed = rsp->gpnum;  /* Declare the grace period complete. */  
	trace_rcu_grace_period(rsp->name, rsp->completed, "end");  
	rsp->fqs_state = RCU_GP_IDLE;  
	rcu_start_gp(rsp, flags);  /* releases root node's rnp->lock. */  
}  

这个过程最主要的内容就是设置rsp->completed的值,中间多了对node的处理。因为在rcu_start_gp中也会对node进行处理,当前CPU无法判断其它CPU是否需要一个宽限期,但它自身还有等待宽限期的回调函数的时候,它确定会有一个新的宽限期马上开始,所以忽略这个过程。

CPU的宽限期结束处理

这个过程也可以分为两个步骤,第一步是检查宽限期是否结束,第二步是调用已完成的回调函数。

A, CPU检测宽限期的结束

每个CPU都会定期检查当前的宽限期是否结束,如果结束将处理自身状态已经nxtlist表。rcu_process_gp_end就是用来做这个事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void  
rcu_process_gp_end(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	struct rcu_node *rnp;  

	local_irq_save(flags);  
	rnp = rdp->mynode;  
	if (rdp->completed == ACCESS_ONCE(rnp->completed) || /* outside lock. */  
			!raw_spin_trylock(&rnp->lock)) { /* irqs already off, so later. */  
		local_irq_restore(flags);  
		return;  
	}  
	__rcu_process_gp_end(rsp, rnp, rdp);  
	raw_spin_unlock_irqrestore(&rnp->lock, flags);  
}  

当 rdp->completed与rnp->completed的值不同的时候,会调用__rcu_process_gp_end来完成具体的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static void  
__rcu_process_gp_end(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)  
{  
	/* 之前的宽限期是否完成? */  
	if (rdp->completed != rnp->completed) {  

		/* 推进回调函数,即使是NULL指针也没关系。 */  
		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[RCU_WAIT_TAIL];  
		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_READY_TAIL];  
		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];  

		/* 更新completed。 */  
		rdp->completed = rnp->completed;  
		trace_rcu_grace_period(rsp->name, rdp->gpnum, "cpuend");  

		/* 
	   * 如果当前的CPU在外部的静止的状态(如离线状态), 
		 * 可能已经错过了其它CPU发起的宽限期。所以需要更 
		 * 新gpnum的值,同时要注意不要错过当前正在运行的 
		 * 宽限期,所以它的值被设置成与rnp->completed相同, 
		 * 此时rnp->gpnum 可以已经加1,那么后续的调用 
		 * rcu_check_quiescent_state()会去检测新的宽限期。 
		 */       
		if (ULONG_CMP_LT(rdp->gpnum, rdp->completed))  
			rdp->gpnum = rdp->completed;  

		/* 
		 * 如果下次的宽限期不需要当前CPU报告静止状态, 
		 * 设置qs_pending为0。 
		 */  
		if ((rnp->qsmask & rdp->grpmask) == 0)  
			rdp->qs_pending = 0;  
	}  
}  

这个过程的重点是设置nxttail的值,将根据它来进行下一步的处理。

B,回调函数的调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	struct rcu_head *next, *list, **tail;  
	int bl, count, count_lazy, i;  

	/* 没有回调函数,那么返回。*/  
	if (!cpu_has_callbacks_ready_to_invoke(rdp)) {  
		trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, 0);  
		trace_rcu_batch_end(rsp->name, 0, !!ACCESS_ONCE(rdp->nxtlist),  
				need_resched(), is_idle_task(current),  
				rcu_is_callbacks_kthread());  
		return;  
	}  

	/* 
	 * 提取回调函数的list,需要禁用中断,以防止调用call_rcu()。  
	 */   
	local_irq_save(flags);  
	WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));  
	bl = rdp->blimit;  
	trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, bl);  
	list = rdp->nxtlist;  
	/*  
	 * 已经将list指向了nxtlist,此时将nxtlist指向 *rdp->nxttail[RCU_DONE_TAIL]。 
	 * 由于nxttail指向的是 rcu_head中的next指针的地址,所以此处得到的就是next所 
	 * 指向的rcu_head对象。 
	 */  
	rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];  
	/*将*rdp->nxttail[RCU_DONE_TAIL]指向NULL,也就是将list中的最后一个元素的next设置成NULL*/  
	*rdp->nxttail[RCU_DONE_TAIL] = NULL;  
	/*tail指向list最后一个元素的next指针的地址*/  
	tail = rdp->nxttail[RCU_DONE_TAIL];  
	/*此时rdp->nxttail[RCU_DONE_TAIL]指向的内容已经移出,所以让它重新指向nxtlist的地址*/  
	for (i = RCU_NEXT_SIZE - 1; i >= 0; i--)  
	if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])  
		rdp->nxttail[i] = &rdp->nxtlist;  
	local_irq_restore(flags);  

	/* 调用回调函数 */  
	count = count_lazy = 0;  
	while (list) {  
		next = list->next;  
		prefetch(next);  
		debug_rcu_head_unqueue(list);  
		if (__rcu_reclaim(rsp->name, list))  
			count_lazy++;  
		list = next;  
		/* 当已经全部运行完毕或者CPU有更重要的事情的时候,退出循环。 */  
		if (++count >= bl &&  
				(need_resched() ||  
				(!is_idle_task(current) && !rcu_is_callbacks_kthread())))  
			break;  
	}  

	local_irq_save(flags);  
	trace_rcu_batch_end(rsp->name, count, !!list, need_resched(),  
			is_idle_task(current),  
			rcu_is_callbacks_kthread());  

	/* 更新数量。并将没有执行完的回调函数重新放进列表。 */  
	if (list != NULL) {  
	*tail = rdp->nxtlist;  
	rdp->nxtlist = list;  
	for (i = 0; i < RCU_NEXT_SIZE; i++)  
		if (&rdp->nxtlist == rdp->nxttail[i])  
			rdp->nxttail[i] = tail;  
		else  
			break;  
	}  
	smp_mb(); /* 为了 rcu_barrier()统计运行过的回调函数 */  
	rdp->qlen_lazy -= count_lazy;  
	ACCESS_ONCE(rdp->qlen) -= count;  
	rdp->n_cbs_invoked += count;  

	/* Reinstate batch limit if we have worked down the excess. */  
	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)  
		rdp->blimit = blimit;  

	/* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */  
	if (rdp->qlen == 0 && rdp->qlen_last_fqs_check != 0) {  
		rdp->qlen_last_fqs_check = 0;  
		rdp->n_force_qs_snap = rsp->n_force_qs;  
	} else if (rdp->qlen < rdp->qlen_last_fqs_check - qhimark)  
		rdp->qlen_last_fqs_check = rdp->qlen;  
	WARN_ON_ONCE((rdp->nxtlist == NULL) != (rdp->qlen == 0));  

	local_irq_restore(flags);  

	/* 如果还有回调函数没有执行,通知再次调用软中断 */  
	if (cpu_has_callbacks_ready_to_invoke(rdp))  
		invoke_rcu_core();  
}  

rcu_do_batch主要作用是取出nxtlist中,nxttail[RCU_DONE_TAIL]之前的元素,遍历执行它们。这时候销毁过程真正的执行了。这段函数需要仔细想想nxttail的处理。

到此RCU中涉及到的主干函数介绍完了,但是还需要与进程切换等过程交互。将在下节分析它们。


TREE RCU实现之三 —— 定期调用

上一节,介绍过了RCU实现中用到的主要函数。不过还需要定期的运行这些函数,整个机制才完整。

RCU的实现是通过在update_process_times() 中调用rcu_check_callbacks()来达到这个目的的。每个CPU都会定期的调用update_process_times()。rcu_check_callbacks()会去检查当前的RCU机制中是否有需要处理的内容,如当前CPU需要开启一个新的宽限期,当前CPU上的宽限期还没有处理完成。如果有需要处理的内容,将触发一个软件中断,真正的操作由软件中断触发的rcu_process_callbacks()来完成。

rcu_check_callbacks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void rcu_check_callbacks(int cpu, int user)  
{  
	trace_rcu_utilization("Start scheduler-tick");  
	increment_cpu_stall_ticks();  
	if (user || rcu_is_cpu_rrupt_from_idle()) {  
		 /* 
		  * 如果是从用户模式或者是idle模式调用该函数, 
		  * 那么这个CPU是静止状态。 
		  *  
		  * 此处不需要内存屏障。因为rcu_sched_qs()和 
		  * and rcu_bh_qs()支处理CPU自身的局部变量, 
		  * 其它CPU不会访问和修改,至少当CPU在线的时候。 
		  *  
		  */                  
		  rcu_sched_qs(cpu);  
		  rcu_bh_qs(cpu);          
	} else if (!in_softirq()) {                  
		 /* 
		  * 运行到这儿,如果不是软件中断。如果当前CPU上运行的 
		  * 软中断的读过程,肯定已经完成,所以标记它。 
		  * 
		  */                 
		 rcu_bh_qs(cpu);  
	}  
	rcu_preempt_check_callbacks(cpu); /*抢先式下的检测*/  
	if (rcu_pending(cpu))  
		invoke_rcu_core();  
	trace_rcu_utilization("End scheduler-tick");  
}  

该函数的主要功能是通过 rcu_pending()判断是否当前有需要处理的rcu内容,如果有调用invoke_rcu_core()。

1
2
3
4
5
6
7
8
9
static int rcu_pending(int cpu)  
{  
	struct rcu_state *rsp;  

	for_each_rcu_flavor(rsp)  
	if (__rcu_pending(rsp, per_cpu_ptr(rsp->rda, cpu)))  
		return 1;  
	return 0;  
}  

rcu_pending会循环所有的rcu_state,在非抢占式模式下,有rcu_sched_state 和rcu_bh_state 两个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	struct rcu_node *rnp = rdp->mynode;  

	rdp->n_rcu_pending++;  

	/* Check for CPU stalls, if enabled. */  
	check_cpu_stall(rsp, rdp);  

	/*  是否宽限期在等待这个CPU去完成静止状态呢?  */  
	if (rcu_scheduler_fully_active &&  
			rdp->qs_pending && !rdp->passed_quiesce) {  

		/* 
		 * 如果force_quiescent_state() 需要马上执行,而这个CPU 
		 * 需要一个静止状态,强制执行本地进程切换。       
		 */  
		rdp->n_rp_qs_pending++;  
		if (!rdp->preemptible &&  
			ULONG_CMP_LT(ACCESS_ONCE(rsp->jiffies_force_qs) - 1,  
			 jiffies))  
		set_need_resched();  
	} else if (rdp->qs_pending && rdp->passed_quiesce) {  
		rdp->n_rp_report_qs++;  
		return 1;  
	}  

	/* 这个CPU是否有callbacks等着调用? */  
	if (cpu_has_callbacks_ready_to_invoke(rdp)) {  
		rdp->n_rp_cb_ready++;  
		return 1;  
	}  

	/* 当前CPU有需要执行的宽限期,而没有其它的宽限期在执行?  */  
	if (cpu_needs_another_gp(rsp, rdp)) {  
		rdp->n_rp_cpu_needs_gp++;  
		return 1;  
	}  

	/* 另一个CPU上执行的宽限期结束?   */  
	if (ACCESS_ONCE(rnp->completed) != rdp->completed) { /* outside lock */  
			rdp->n_rp_gp_completed++;  
		return 1;  
	}  

	/* 有新的RCU开始? */  
	if (ACCESS_ONCE(rnp->gpnum) != rdp->gpnum) { /* outside lock */  
			rdp->n_rp_gp_started++;  
		return 1;  
	}  

	/* 一个宽限期运行了太长时间,需要强制执行? */  
	if (rcu_gp_in_progress(rsp) &&  
			ULONG_CMP_LT(ACCESS_ONCE(rsp->jiffies_force_qs), jiffies)) {  
		rdp->n_rp_need_fqs++;  
		return 1;  
	}  

	/* 无事可做 */  
	rdp->n_rp_need_nothing++;  
	return 0;  
}  

__rcu_pending 判断了可能存在的各种情形,如果有需要处理的工作的话,就返回1,否则返回0。

1
2
3
4
5
6
7
8
9
static void invoke_rcu_core(void)  
{  
	raise_softirq(RCU_SOFTIRQ);  
}  


 invoke_rcu_core()的作用是开启软中断。在初始化的时候,系统已经注册了软中断。

open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
1
2
3
4
5
6
7
8
9
static void rcu_process_callbacks(struct softirq_action *unused)  
{  
	struct rcu_state *rsp;  

	trace_rcu_utilization("Start RCU core");  
	for_each_rcu_flavor(rsp)  
	__rcu_process_callbacks(rsp);  
	trace_rcu_utilization("End RCU core");  
}  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static void  
__rcu_process_callbacks(struct rcu_state *rsp)  
{  
	unsigned long flags;  
	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);  

	WARN_ON_ONCE(rdp->beenonline == 0);  

	/* 
	 * 如果一个宽限期运行了很长时间,那么强制静止状态。 
	 *  
	 */  
	if (ULONG_CMP_LT(ACCESS_ONCE(rsp->jiffies_force_qs), jiffies))  
		force_quiescent_state(rsp, 1);  

	/* 
	 * 处理宽限期结束相关内容。 
	 */  
	rcu_process_gp_end(rsp, rdp);  

	/* 检测是否有新的宽限期开始或者静止状态需要向上报告。 */  
	rcu_check_quiescent_state(rsp, rdp);  

	/* 当前CPU需要新的宽限期吗? */  
	if (cpu_needs_another_gp(rsp, rdp)) {  
		raw_spin_lock_irqsave(&rcu_get_root(rsp)->lock, flags);  
		rcu_start_gp(rsp, flags);  /* releases above lock */  
	}  

	/* 如果有等着调用的回调函数,那么调用它。 */  
	if (cpu_has_callbacks_ready_to_invoke(rdp))  
		invoke_rcu_callbacks(rsp, rdp);  
}  

软件中断其实就是调用之前提到过的函数来完成具体的任务。

Linux kernel 内存屏障在RCU上的应用

http://blog.csdn.net/jianchaolv/article/details/7527647

内存屏障主要解决的问题是编译器的优化和CPU的乱序执行。

编译器在优化的时候,生成的汇编指令可能和c语言程序的执行顺序不一样,在需要程序严格按照c语言顺序执行时,需要显式的告诉编译不需要优化,这在linux下是通过barrier()宏完成的,它依靠volidate关键字和memory关键字,前者告诉编译barrier()周围的指令不要被优化,后者作用是告诉编译器汇编代码会使内存里面的值更改,编译器应使用内存里的新值而非寄存器里保存的老值。

同样,CPU执行会通过乱序以提高性能。汇编里的指令不一定是按照我们看到的顺序执行的。linux中通过mb()系列宏来保证执行的顺序。简单的说,如果在程序某处插入了mb()/rmb()/wmb()宏,则宏之前的程序保证比宏之后的程序先执行,从而实现串行化。

即使是编译器生成的汇编码有序,处理器也不一定能保证有序。就算编译器生成了有序的汇编码,到了处理器那里也拿不准是不 是会按照代码顺序执行。所以就算编译器保证有序了,程序员也还是要往代码里面加内存屏障才能保证绝对访存有序,这倒不如编译器干脆不管算了,因为内存屏障 本身就是一个sequence point,加入后已经能够保证编译器也有序。

处理器虽然乱序执行,但最终会得出正确的结果,所以逻辑上讲程序员本不需要关心处理器乱序的问题。但是在SMP并发执行的情况下,处理器无法知道并发程序之间的逻辑,比如,在不同core上的读者和写者之间的逻辑。简单讲,处理器只保证在单个core上按照code中的顺序给出最终结果。这就要求程序员通过mb()/rmb()/wmb()/read_barrier_depends来告知处理器,从而得到正确的并发结果。内存屏障、数据依赖屏障都是为了处理SMP环境下的数据同步问题,UP根本不存在这个问题。

下面分析下内存屏障在RCU上的应用:

1
2
3
4
5
6
7
8
9
10
#define rcu_assign_pointer(p, v) ({ \
	smp_wmb();                      \
	(p)= (v);                       \
})

#define rcu_dereference(p) ({     \
	typeof(p)_________p1 = p;     \
	smp_read_barrier_depends();   \
	(_________p1);                \
}) 

rcu_assign_pointer()通常用于写者的发布,rcu_dereference()通常用于读者的订阅。

写者:

1
2
3
4
5
6
7
p->a = 1;
p->b = 2;
p->c = 3;
rcu_assign_pointer(gp, p);

// 如果gp的原值马上会被改变/释放,则需要synchronize_rcu()/synchronize_net(),
// 如: 模块的卸载, 原gp指向函数被释放

读者:

1
2
3
4
5
6
rcu_read_lock();
p = rcu_dereference(gp);
if (p != NULL) {
	do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

rcu_assign_pointer()是说,先把那块内存写好,再把指针指过去。这里使用的内存写屏障是为了保证并发的读者读到数据一致性。在这条语句之前的读者读到旧的指针和旧的内存,这条语句之后的读者读到新的指针和新的内存。如果没有这条语句,很有可能出现读者读到新的指针和旧的内存。也就是说,这里通过内存屏障刷新了p所指向的内存的值,至于gp本身的值有没有更新还不确定。实际上,gp本身值的真正更新要等到并发的读者来促发。

rcu_dereference() 原语用的是数据依赖屏障,smp_read_barrier_dependence,它要求后面的读操作如果依赖前面的读操作,则前面的读操作需要首先完成。根据数据之间的依赖,要读p->a, p->b, p->c, 就必须先读p,要先读p,就必须先读p1,要先读p1,就必须先读gp。也就是说读者所在的core在进行后续的操作之前,gp必须是同步过的当前时刻的最新值。如果没有这个数据依赖屏障,有可能读者所在的core很长一段时间内一直用的是旧的gp值。所以,这里使用数据依赖屏障是为了督促写者将gp值准备好,是为了呼应写者,这个呼应的诉求是通过数据之间的依赖关系来促发的,也就是说到了非呼应不可的地步了。

下面看看kernel中常用的链表操作是如何使用这样的发布、订阅机制的:

写者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline void list_add_rcu(struct list_head *new, struct list_head *head)
{
	__list_add_rcu(new, head, head->next);
}

static inline void __list_add_rcu(struct list_head * new,
struct list_head * prev, struct list_head * next)
{
	new->next = next;
	new->prev = prev;
	smp_wmb();
	next->prev = new;
	prev->next = new;
}

读者:

1
2
3
4
5
#define list_for_each_entry_rcu(pos, head, member)                \
	for(pos = list_entry((head)->next, typeof(*pos), member);     \
			prefetch(rcu_dereference(pos)->member.next),          \
			&pos->member!= (head);                                \
		pos= list_entry(pos->member.next, typeof(*pos), member))

写者通过调用list_add_rcu来发布新的节点,其实是发布next->prev, prev->next这两个指针。读者通过list_for_each_entry_rcu来订阅这连个指针,我们将list_for_each_entry_rcu订阅部分简化如下:

1
2
pos = prev->next;
prefetch(rcu_dereference(pos)->next);

读者通过rcu_dereference订阅的是pos,而由于数据依赖关系,又间接订阅了prev->next指针,或者说是促发prev->next的更新。

下面介绍下其他相关链表操作的函数:

safe版本的iterate的函数?为什么就safe了?

1
2
3
4
5
6
7
#define list_for_each_safe(pos,n, head)                    \
	for(pos = (head)->next, n = pos->next; pos != (head);  \
			pos= n, n = pos->next)

#define list_for_each(pos, head)                                \
	for(pos = (head)->next; prefetch(pos->next), pos != (head); \
			pos= pos->next)

当在iterate的过程中执行删除操作的时候,比如:

1
2
list_for_each(pos,head)
	list_del(pos)

这样会断链,为了避免这种断链,增加了safe版本的iterate函数。另外,由于preftech的缘故,有可能引用一个无效的指针LIST_POISON1。这里的safe是指,为避免有些cpu的preftech的影响,干脆在iterate的过程中去掉preftech。

还有一个既有rcu+safe版本的iterative函数:

1
2
3
4
#define list_for_each_safe_rcu(pos, n, head)              \
	for(pos = (head)->next;                               \
			n= rcu_dereference(pos)->next, pos != (head); \
			pos= n)

只要用这个版本的iterate函数,就可以和多个_rcu版本的写操作(如:list_add_rcu())并发执行。

kmalloc、vmalloc、malloc的区别

blog.csdn.net/macrossdzh/article/details/5958368

简单的说:
kmalloc和vmalloc是分配的是内核的内存,malloc分配的是用户的内存
kmalloc保证分配的内存在物理上是连续的,vmalloc保证的是在虚拟地址空间上的连续,malloc不保证任何东西(这点是自己猜测的,不一定正确)
kmalloc能分配的大小有限,vmalloc和malloc能分配的大小相对较大
内存只有在要被DMA访问的时候才需要物理上连续
vmalloc比kmalloc要慢

详细的解释:
对于提供了MMU(存储管理器,辅助操作系统进行内存管理,提供虚实地址转换等硬件支持)的处理器而言,Linux提供了复杂的存储管理系统,使得进程所能访问的内存达到4GB。 进程的4GB内存空间被人为的分为两个部分–用户空间与内核空间。用户空间地址分布从0到3GB(PAGE_OFFSET,在0x86中它等于0xC0000000),3GB到4GB为内核空间。
内核空间中,从3G到vmalloc_start这段地址是物理内存映射区域(该区域中包含了内核镜像、物理页框表mem_map等等),比如我们使用 的 VMware虚拟系统内存是160M,那么3G~3G+160M这片内存就应该映射物理内存。在物理内存映射区之后,就是vmalloc区域。对于 160M的系统而言,vmalloc_start位置应在3G+160M附近(在物理内存映射区与vmalloc_start期间还存在一个8M的gap 来防止跃界),vmalloc_end的位置接近4G(最后位置系统会保留一片128k大小的区域用于专用页面映射)

kmalloc和get_free_page申请的内存位于物理内存映射区域,而且在物理上也是连续的,它们与真实的物理地址只有一个固定的偏移,因此存在较简单的转换关系,virt_to_phys()可以实现内核虚拟地址转化为物理地址:

1
2
3
4
5
#define __pa(x) ((unsigned long)(x)-PAGE_OFFSET)
extern inline unsigned long virt_to_phys(volatile void * address)
{
	return __pa(address);
}

上面转换过程是将虚拟地址减去3G(PAGE_OFFSET=0XC000000)。

与之对应的函数为phys_to_virt(),将内核物理地址转化为虚拟地址:

1
2
3
4
5
#define __va(x) ((void *)((unsigned long)(x)+PAGE_OFFSET))
extern inline void * phys_to_virt(unsigned long address)
{
	return __va(address);
}

virt_to_phys()和phys_to_virt()都定义在include/asm-i386/io.h中。

而vmalloc申请的内存则位于vmalloc_start~vmalloc_end之间,与物理地址没有简单的转换关系,虽然在逻辑上它们也是连续的,但是在物理上它们不要求连续。


blog.csdn.net/kris_fei/article/details/17243527

平台: msm8x25
系统: android 4.1
内核: 3.4.0

概念

由于系统的连续物理内存有限,这使得非连续物理内存的使用在linux内核中出现,这叫vmalloc机制。和前者一样,vmalloc机制中的虚拟地址也是连续的。

Vmallocinfo

Vmalloc机制并不是狭义地指使用vmalloc函数分配,其他还有如ioremap, iotable_init等。可以从/proc/vmallocinfo获取到此信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#cat /proc/vmallocinfo
0xf3600000-0xf36ff0001044480 binder_mmap+0xb0/0x224 ioremap
………..
0xf6680000-0xf66c1000 266240 kgsl_page_alloc_map_kernel+0x98/0xe8 ioremap
0xf6700000-0xf67ff0001044480 binder_mmap+0xb0/0x224 ioremap
…………….
0xf6f00000-0xf6f41000 266240 kgsl_page_alloc_map_kernel+0x98/0xe8 ioremap
0xf7200000-0xf72ff0001044480 binder_mmap+0xb0/0x224 ioremap
0xfa000000-0xfa001000   4096 iotable_init+0x0/0xb0 phys=c0800000 ioremap
……………..
0xfa105000-0xfa106000   4096 iotable_init+0x0/0xb0 phys=a9800000 ioremap
0xfa200000-0xfa3000001048576 pmd_empty_section_gap+0x0/0x3c ioremap
0xfa300000-0xfa4000001048576 iotable_init+0x0/0xb0 phys=100000 ioremap
0xfa400000-0xfa5000001048576 iotable_init+0x0/0xb0 phys=aa500000 ioremap
0xfa500000-0xfa6000001048576 pmd_empty_section_gap+0x0/0x3c ioremap
0xfa701000-0xfa702000   4096 iotable_init+0x0/0xb0 phys=c0400000 ioremap
…………..
0xfa800000-0xfa9000001048576 pmd_empty_section_gap+0x0/0x3c ioremap
0xfa900000-0xfb60000013631488 iotable_init+0x0/0xb0 phys=ac000000 ioremap
0xfefdc000-0xff000000 147456 pcpu_get_vm_areas+0x0/0x56c vmalloc

上面的列数意思依次是:虚拟地址,分配大小,哪个函数分配的,物理地址,分配类型。

后面会提到vmalloc size的划分是按照此info来修改的。

分配标志

是否划分到vamlloc区域主要是以下重要的标志来决定的:

File: kernel/include/linux/vmalloc.h

1
2
3
4
5
6
7
8
/* bits in flags ofvmalloc's vm_struct below */
#defineVM_IOREMAP    0x00000001     /* ioremap()and friends */
#define VM_ALLOC     0x00000002     /* vmalloc() */
#defineVM_MAP        0x00000004     /* vmap()ed pages */
#defineVM_USERMAP    0x00000008     /* suitable forremap_vmalloc_range */
#defineVM_VPAGES     0x00000010     /* buffer for pages was vmalloc'ed */
#defineVM_UNLIST     0x00000020     /* vm_struct is not listed in vmlist */
/* bits [20..32]reserved for arch specific ioremap internals */

Vmallocinfo中的函数,你可以对照源码看一下,在设置flag的时候就会有VM_IOREMAP, VM_ALLOC这些标志。

Vmalloc区域

Vmalloc的区域是由两个宏变量来表示: VMALLOC_START,VMALLOC_END.

File: kernel/arch/arm/include/asm/pgtable.h

1
2
3
#defineVMALLOC_OFFSET       (8*1024*1024)
#defineVMALLOC_START        (((unsigned long)high_memory + VMALLOC_OFFSET) & ~(VMALLOC_OFFSET-1))
#defineVMALLOC_END          0xff000000UL

VMALLOC_START:看上去会随着high_memory的值变化。

VMALLOC_OFFSET:系统会在low memory和VMALLOC区域留8M,防止访问越界。因此假如理论上vmalloc size有300M,实际可用的也是只有292M。

File: kernel/Documentation/arm/memory.txt有给出更好的解释:

1
VMALLOC_START   VMALLOC_END-1    vmalloc() / ioremap() space. Memory returned byvmalloc/ioremap will be dynamically placed in this region. Machine specificstatic mappings are also located here through iotable_init(). VMALLOC_START isbased upon the value of the high_memoryvariable, and VMALLOC_END is equal to 0xff000000.

下图摘自网络,看下VMALLOC_START和VMALLOC_END的位置。0xc0000000到VMALLOC_START为low memory虚拟地址区域。

Vmallocsize 计算

有了以上知识后我们看下vmalloc size是如何分配的,目前有两种方法,kernel默认分配一个, 以及开机从cmdline分配。

1. 从cmdline分配

File: device/qcom/msm7627a/BoardConfig.mk

BOARD_KERNEL_CMDLINE := androidboot.hardware=qcom loglevel=7vmalloc=200M

上面的值在build的时候会被赋值给kernel 的cmdline。

开机的时候early_vmalloc()会去读取vmalloc这个值。

File: kernel/arch/arm/mm/mmu.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int__init early_vmalloc(char *arg)
{
	/*cmdline中的vmalloc会被解析到vmlloc_reserve中。*/
	unsigned long vmalloc_reserve = memparse(arg, NULL);

	/*小于16M则用16M。*/
	if (vmalloc_reserve < SZ_16M) {
		vmalloc_reserve = SZ_16M;
		printk(KERN_WARNING
				"vmalloc area too small, limiting to %luMB\n",
				vmalloc_reserve >> 20);
	}

	/*大于可用虚拟地址内存则使用可用地址部分再减去32M。*/
	if (vmalloc_reserve > VMALLOC_END - (PAGE_OFFSET + SZ_32M)) {
		vmalloc_reserve = VMALLOC_END - (PAGE_OFFSET + SZ_32M);
		printk(KERN_WARNING
				"vmalloc area is too big, limiting to %luMB\n",
				vmalloc_reserve >> 20);
	}

	/*计算偏移起始地址。*/
	vmalloc_min = (void *)(VMALLOC_END - vmalloc_reserve);
	return 0;
}
early_param("vmalloc",early_vmalloc);

vmalloc_min会影响arm_lowmem_limit,arm_lowmem_limit其实就是high_memory。因为此过程不是我们要分析的重点,如果有兴趣可分析kernel/arch/arm/mm/mmu.c中的sanity_check_meminfo()函数。

所以,VMALLOC_START受到了hight_memory的影响而发生了变化,最终使得vmalloc size也变化了!

2. 开机默认分配:

File: kernel/arch/arm/mm/mmu.c

1
2
static void * __initdata vmalloc_min =
	(void *)(VMALLOC_END - (240 << 20) - VMALLOC_OFFSET);

当cmdline无vmalloc参数传进来的时候,early_vmalloc()函数也不会调用到,vmalloc_min的值就会被默认传进来了,默认是240M。

后面的步骤和方法1一样了!

开机log有memory layout 信息:

1
2
3
4
5
6
7
8
9
10
11
[    0.000000] [cpuid: 0] Virtual kernelmemory layout:
[    0.000000] [cpuid:0]     vector  : 0xffff0000 - 0xffff1000  (   4 kB)
[    0.000000] [cpuid:0]     fixmap  : 0xfff00000 - 0xfffe0000   (896 kB)
[    0.000000] [cpuid:0]     vmalloc : 0xf3000000 - 0xff000000   ( 192MB)
[    0.000000] [cpuid:0]     lowmem  : 0xc0000000 - 0xf2800000   (808 MB)
[    0.000000] [cpuid:0]     pkmap   : 0xbfe00000 -0xc0000000   (   2 MB)
[    0.000000] [cpuid:0]     modules : 0xbf000000 - 0xbfe00000  (  14 MB)
[    0.000000] [cpuid:0]       .text : 0xc0008000 -0xc0893034   (8749 kB)
[    0.000000] [cpuid:0]       .init : 0xc0894000 -0xc08cdc00   ( 231 kB)
[    0.000000] [cpuid:0]       .data : 0xc08ce000 -0xc09f8eb8   (1196 kB)
[    0.000000] [cpuid:0]        .bss : 0xc0a78eec -0xc0f427a8   (4903 kB)

其中看到vmalloc为192MB , cmdline中使用vmllaoc就是200M。

Lowmem为地段内存部分,可见lowmem和vmalloc中间有8M空隙。

Vmalloc该分配多大?

Linux内核版本从3.2到3.3 默认的vmalloc size由128M 增大到了240M,3.4.0上的

修改Commit信息如下:

To accommodate all static mappings on machines withpossible highmem usage, the default vmalloc area size is changed to 240 MB sothat VMALLOC_START is no higher than 0xf0000000 by default.

看其意思是因为开机的静态映射增加了,所以要扩大。

另外3.2到3.3版本的一个重大变化是将android引入到主线内核中。我想增大vmalloc size到240M是基于此考虑吧。当然,各家厂商都也可以基于自己平台来动态修改size的。

那么如何判断当前vmalloc size不足呢?

/proc/meminfo中有vmalloc信息:
VmallocTotal: 540672 kB
VmallocUsed: 165268 kB
VmallocChunk: 118788kB

事实上这里的VmallocUsed只是表示已经被真正使用掉的vmalloc区域,但是区域之前的空隙也就是碎片没有被计算进去。

所以,回到前面说的/proc/vmallocinfo,假设我们的vmalloc size就是200M。那么区域为0xf3000000- 0xff000000,从vmallocinfo中可以看到,前面大部分虚拟地址空间都用掉了,剩下0xfb600000到0xfefdc000这57M空间,假如申请了64M,那么就会失败了。

开机分配使用掉vmalloc之后到底该剩余多少目前没有具体依据,一般来说1GB RAM可以设置为400~600M。

Linux-2.6.32 NUMA架构之内存和调度

http://blog.chinaunix.net/uid-7295895-id-3076420.html

Linux-2.6.32 NUMA架构之内存和调度

本文将以XLP832通过ICI互连形成的NUMA架构进行分析,主要包括内存管理和调度两方面,参考内核版本2.6.32.9;NUMA架构常见配置选项有:CONFIG_SMP, CONFIG_NUMA, CONFIG_NEED_MULTIPLE_NODES, CONFIG_NODES_SHIFT, CONFIG_SPARSEMEM, CONFIG_CGROUPS, CONFIG_CPUSETS, CONFIG_MIGRATION等。

本文试图从原理上介绍,尽量避免涉及代码的实现细节。

1 NUMA架构简介

NUMA(Non Uniform Memory Access)即非一致内存访问架构,市面上主要有X86_64(JASPER)和MIPS64(XLP)体系。

1.1 概念

NUMA具有多个节点(Node),每个节点可以拥有多个CPU(每个CPU可以具有多个核或线程),节点内使用共有的内存控制器,因此节点的所有内存对于本节点的所有CPU都是等同的,而对于其它节点中的所有CPU都是不同的。节点可分为本地节点(Local Node)、邻居节点(Neighbour Node)和远端节点(Remote Node)三种类型。

本地节点:对于某个节点中的所有CPU,此节点称为本地节点;
邻居节点:与本地节点相邻的节点称为邻居节点;
远端节点:非本地节点或邻居节点的节点,称为远端节点。

邻居节点和远端节点,称作非本地节点(Off Node)。

CPU访问不同类型节点内存的速度是不相同的:本地节点>邻居节点>远端节点。访问本地节点的速度最快,访问远端节点的速度最慢,即访问速度与节点的距离有关,距离越远访问速度越慢,此距离称作Node Distance。

常用的NUMA系统中:硬件设计已保证系统中所有的Cache是一致的(Cache Coherent, ccNUMA);不同类型节点间的Cache同步时间不一样,会导致资源竞争不公平,对于某些特殊的应用,可以考虑使用FIFO Spinlock保证公平性。

1.2 关键信息

1) 物理内存区域与Node号之间的映射关系;
2) 各Node之间的Node Distance;
3) 逻辑CPU号与Node号之间的映射关系。

2 XLP832 NUMA初始化

首先需要完成1.2节中描述的3个关键信息的初始化。

2.1 CPU和Node的关系

start_kernel()->setup_arch()->prom_init():

1
2
3
#ifdef CONFIG_NUMA
	build_node_cpu_map();
#endif

build_node_cpu_map()函数工作:

a) 确定CPU与Node的相互关系,做法很简单:

1
2
#define cpu_to_node(cpu)       (cpu >> 5)
#define cpumask_of_node    (NODE_CPU_MASK(node)) /* node0:0~31; node1: 32~63 */

说明:XLP832每个节点有1个物理CPU,每个物理CPU有8个核,每个核有4个超线程,因此每个节点对应32个逻辑CPU,按节点依次展开。另外,实际物理存在的CPU数目是通过DTB传递给内核的;numa_node_id()可以获取当前CPU所处的Node号。

b) 设置每个物理存在的节点的在线状态,具体是通过node_set_online()函数来设置全局变量

nodemask_t node_states[];

这样,类似于CPU号,Node号也就具有如下功能宏:

1
2
for_each_node(node);
for_each_online_node(node);

详细可参考include/linux/nodemask.h

2.2 Node Distance确立

作用:建立buddy时用,可以依此来构建zonelist,以及zone relaim(zone_reclaim_mode)使用,详见后面的4.2.2节。

2.3 内存区域与Node的关系

start_kernel()->setup_arch()->arch_mem_init->bootmem_init()->nlm_numa_bootmem_init():

nlm_get_dram_mapping();

XLP832上电后的默认memory-mapped物理地址空间分布:

其中PCIE配置空间映射地址范围为[0x1800_0000, 0x1BFF_FFFF],由寄存器ECFG_BASE和ECFG_LIMIT指定(注:但这2个寄存器本身是处于PCIE配置空间之中的)。

PCIE配置空间:
PCIE配置空间与memory-mapped物理地址的映射方式:

XLP832实现了所有设备都位于虚拟总线0上,每个节点有8个设备,按节点依次排开。

DRAM映射寄存器组:
每个节点都独立实现有几组不同类型的DRAM(每组有8个相同类型的)寄存器可以配置DRAM空间映射到物理地址空间中的基址和大小,以及所属的节点信息(这些寄存器的值事先会由bootloader设好);这组寄存器位于虚拟总线0的设备0/8/16/24(依次对应每个节点的第一个设备号)的Function0(每个设备最多可定义8个Function,每个Function有着独立的PCIE 4KB的配置空间)的PCIE配置空间中(这个配置空间实现的是DRAM/Bridge控制器)。

本小节涉及到的3组不同类型的寄存器(注:按索引对应即DRAM_BAR,DRAM_LIMIT和 DRAM_NODE_TRANSLATION描述一个内存区域属性):

第一组(DRAM空间映射物理空间基址):

1
2
3
4
5
6
7
8
DRAM_BAR0: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x54
DRAM_BAR1: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x55
DRAM_BAR2: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x56
DRAM_BAR3: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x57
DRAM_BAR4: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x58
DRAM_BAR5: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x59
DRAM_BAR6: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5A
DRAM_BAR7: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5B

第二组(DRAM空间映射物理空间长度):

1
2
3
4
5
6
7
8
DRAM_LIMIT0: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5C
DRAM_LIMIT1: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5D
DRAM_LIMIT2: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5E
DRAM_LIMIT3: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5F
DRAM_LIMIT4: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x60
DRAM_LIMIT5: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x61
DRAM_LIMIT6: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x62
DRAM_LIMIT7: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x63

第三组(节点相关):

1
2
3
4
5
6
7
8
DRAM_NODE_TRANSLATION0: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x64
DRAM_NODE_TRANSLATION1: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x65
DRAM_NODE_TRANSLATION2: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x66
DRAM_NODE_TRANSLATION3: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x67
DRAM_NODE_TRANSLATION4: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x68
DRAM_NODE_TRANSLATION5: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x69
DRAM_NODE_TRANSLATION6: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x6A
DRAM_NODE_TRANSLATION7: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x6B

根据上述的PCIE配置空间memory-mapped映射方式便可直接获取寄存器中的值,就可以建立各个节点中的所有内存区域(最多8个区域)信息。关于这些寄存器的使用可以参考“XLP® Processor Family Programming Reference Manual”的“Chapter 7 Memory and I/O Subsystem”。

3 Bootmem初始化

bootmem_init()->…->init_bootmem_node()->init_bootmem_core():

每个节点拥有各自的bootmem管理(code&data之前可以为空闲页面)。

4 Buddy初始化

初始化流程最后会设置全局struct node_active_region early_node_map[]用于初始化Buddy系统,for_each_online_node()遍历所有在线节点调用free_area_init_node()初始化,主要初始化每个zone的大小和所涉及页面的struct page结构(flags中初始化有所属zone和node信息,由set_page_links()函数设置)等。

4.1 NUMA带来的变化

1) pglist_data

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct pglist_data {
	struct zone node_zones[MAX_NR_ZONES];
	struct zonelist node_zonelists[MAX_ZONELISTS];
	int nr_zones;
	struct bootmem_data *bdata;
	unsigned long node_start_pfn;
	unsigned long node_present_pages; /* total number of physical pages */
	unsigned long node_spanned_pages; /* total size of physical pagerange, including holes */
	int node_id;
	wait_queue_head_t kswapd_wait;
	struct task_struct *kswapd;
	int kswapd_max_order;
} pg_data_t;

a)上节的bootmem结构的描述信息存放在NODE_DATA(node)-> bdata中;NODE_DATA(i)宏返回节点i的struct pglist_data结构,需要在架构相关的mmzone.h中实现;
b) #define MAX_ZONELISTS 2,请参考后面的“zonelist初始化”。

2) zone

1
2
3
4
5
6
7
8
9
10
11
12
struct zone {
#ifdef CONFIG_NUMA
	int node;
	/*
	 * zone reclaim becomes active if more unmapped pages exist.
	 */
	unsigned long        min_unmapped_pages;
	unsigned long        min_slab_pages;
	struct per_cpu_pageset   *pageset[NR_CPUS];
#else
	… …
};

a)最终调用kmalloc_node()为pageset成员在每个CPU的对应的内存节点分配内存;
b)min_unmapped_pages 对应/proc/sys/vm/min_unmapped_ratio,默认值为1;
min_slab_pages对应/proc/sys/vm/min_slab_ratio,默认值为5;
作用:当剩余可回收的非文件映射和SLAB页面超过这2个值时,才激活当前zone回收;

c) 增加了zone对应的节点号。

4.2 zonelist初始化

本节讲述zonelist的构建方式,实现位于start_kernel()->build_all_zonelists()中,zonelist的组织方式非常关键(这一点与以前的2.6.21内核版本不一样,2.6.32组织得更清晰)。

4.2.1 zonelist order

NUMA系统中存在多个节点,每个节点对应一个struct pglist_data结构,此结构中可以包含多个zone,如:ZONE_DMA, ZONE_NORMAL,这样就产生几种排列顺序,以2个节点2个zone为例(zone从高到低排列, ZONE_DMA0表示节点0的ZONE_DMA,其它类似):

a) Legacy方式

每个节点只排列自己的zone;

b)Node方式

按节点顺序依次排列,先排列本地节点的所有zone,再排列其它节点的所有zone。

c) Zone方式

按zone类型从高到低依次排列各节点的同相类型zone。

可通过启动参数“numa_zonelist_order”来配置zonelist order,内核定义了3种配置:

1
2
3
#define ZONELIST_ORDER_DEFAULT  0 /* 智能选择Node或Zone方式 */
#define ZONELIST_ORDER_NODE     1 /* 对应Node方式 */
#define ZONELIST_ORDER_ZONE     2 /* 对应Zone方式 */

默认配置为ZONELIST_ORDER_DEFAULT,由内核通过一个算法来判断选择Node或Zone方式,算法思想:

a) alloc_pages()分配内存是按照ZONE从高到低的顺序进行的,例如上节“Node方式”的图示中,从ZONE_NORMAL0中分配内存时,ZONE_NORMAL0中无内存时将落入较低的ZONE_DMA0中分配,这样当ZONE_DMA0比较小的时候,很容易将ZONE_DMA0中的内存耗光,这样是很不理智的,因为还有更好的分配方式即从ZONE_NORMAL1中分配;

b) 内核会检测各ZONE的页面数来选择Zone组织方式,当ZONE_DMA很小时,选择ZONELIST_ORDER_DEFAULT时,内核将倾向于选择ZONELIST_ORDER_ZONE方式,否则选择ZONELIST_ORDER_NODE方式。

另外,可以通过/proc/sys/vm/numa_zonelist_order动态改变zonelist order的分配方式。

4.2.2 Node Distance

上节中的例子是以2个节点为例,如果有>2个节点存在,就需要考虑不同节点间的距离来安排节点,例如以4个节点2个ZONE为例,各节点的布局(如4个XLP832物理CPU级联)值如下:

上图中,Node0和Node2的Node Distance为25,Node1和Node3的Node Distance为25,其它的Node Distance为15。

4.2.2.1 优先进行Zone Reclaim

另外,当Node Distance超过20的时候,内核会在某个zone分配内存不足的时候,提前激活本zone的内存回收工作,由全局变量zone_reclaim_mode控制,build_zonelists()中:

1
2
3
4
5
6
/*
 * If another node is sufficiently far away then it is better
 * to reclaim pages in a zone before going off node.
 */
if (distance > RECLAIM_DISTANCE)
	zone_reclaim_mode = 1;

通过/proc/sys/vm/zone_reclaim_mode可以动态调整zone_reclaim_mode的值来控制回收模式,含义如下:

1
2
3
4
#define RECLAIM_OFF    0
#define RECLAIM_ZONE  (1<<0)     /* Run shrink_inactive_list on the zone */
#define RECLAIM_WRITE (1<<1)     /* Writeout pages during reclaim */
#define RECLAIM_SWAP  (1<<2)     /* Swap pages out during reclaim */
4.2.2.2 影响zonelist方式

采用Node方式组织的zonelist为:

即各节点按照与本节点的Node Distance距离大小来排序,以达到更优的内存分配。

4.2.3 zonelist[2]

配置NUMA后,每个节点将关联2个zonelist:
1) zonelist[0]中存放以Node方式或Zone方式组织的zonelist,包括所有节点的zone;
2) zonelist[1]中只存放本节点的zone即Legacy方式;

zonelist[1]用来实现仅从节点自身zone中的内存分配(参考__GFP_THISNODE标志)。

5 SLAB初始化

配置NUMA后对SLAB(本文不涉及SLOB或SLUB)的初始化影响不大,只是在分配一些变量采用类似Buddy系统的per_cpu_pageset(单面页缓存)在CPU本地节点进行内存分配。

5.1 NUMA带来的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct kmem_cache {
	struct array_cache *array[NR_CPUS];
	… …
	struct kmem_list3 *nodelists[MAX_NUMNODES];
};

struct kmem_list3 {
	… …
	struct array_cache *shared;    /* shared per node */
	struct array_cache **alien;    /* on other nodes */
	… …
};

struct slab {
	… …
	unsigned short nodeid;
	… …
};

上面的4种类型的指针变量在SLAB初始化完毕后将改用kmalloc_node()分配的内存。具体实现请参考enable_cpucache(),此函数最终调用alloc_arraycache()和alloc_kmemlist()来分配这些变量代表的空间。

nodelists[MAX_NUMNODES]存放的是所有节点对应的相关数据,本文称作SLAB节点。每个节点拥有各自的数据;

注:有些非NUMA系统比如非连续内存系统可能根据不同的内存区域定义多个节点(实际上Node Distance都是0即物理内存访问速度相同),所以这些变量并没有采用CONFIG_NUMA宏来控制,本文暂称为NUMA带来的变化。

5.2 SLAB缓存

配置NUMA后,SLAB将有三种类型的缓存:本地缓存(当前CPU的缓存),共享缓存(节点内的缓存)和外部缓存(节点间的缓存)。

SLAB系统分配对象时,先从本地缓存中查找,如果本地缓存为空,则将共享缓存中的缓存搬运本地缓存中,重新从本地缓存中分配;如果共享缓存为空,则从SLAB中进行分配;如果SLAB中已经无空闲对象,则分配新的SLAB后重新分配本地缓存。

SLAB系统释放对象时,先不归还给SLAB (简化分配流程,也可充分利用CPU Cache),如果是同节点的SLAB对象先放入本地缓存中,如果本地缓存溢出(满),则转移一部分(以batch为单位)至共享缓存中;如果是跨节点释放,则先放入外部缓存中,如果外部缓存溢出,则转移一部分至共享缓存中,以供后续分配时使用;如果共享缓存溢出,则调用free_block()函数释放溢出的缓存对象。

关于这三种类型缓存的大小以及参数设置,不在本文的讨论范围。

本地缓存
kmem_cache-> array[] 中缓存每个CPU的SLAB cached objects;

共享缓存
kmem_list3[]->shared(如果存在shared缓存)中缓存与当前CPU同节点的所有CPU (如XLP832 NUMA系统中的Node0包含为CPU0~CPU31) 本地缓存溢出的缓存,详细实现请参考cache_flusharray();另外,大对象SLAB不存在共享缓存。

外部缓存
kmem_list3[]->alien中存放其它节点的SLAB cached objects,当在某个节点上分配的SLAB 的object在另外一个节点上被释放的时候(即slab->nodeid与numa_node_id()当前节点不相等时),将加入到对象所在节点的alien缓存中(如果不存在此alien缓存,此对象不会被缓存,而是直接释放给此对象所属SLAB),否则加入本地缓存或共享缓存(本地缓存溢出且存在shared缓存时);当alien缓存满的时候,会调用cache_free_alien()搬迁至shared缓存中(如果不存在shared缓存,直接释放给SLAB);

slab->nodeid记录本SLAB内存块(若干个页面)所在的节点。

示例

例如2个节点,CPU0~31位于Node0,CPU32~CPU63位于Node1:

64个(依次对应于CPU0~CPU63)本地缓存
kmem_cache->array[0~31]:在Node0分配“array_cache结构+cached Objs指针”;
kmem_cache->array[32~63]:在Node1分配“array_cache结构+cached Objs指针”;

2个SLAB节点
kmem_cache->nodelists[0]:在Node0分配“kmem_list3结构”;
kmem_cache->nodelists[1]:在Node1分配“kmem_list3结构”;

SLAB节点0(CPU0~CPU31)共享缓存和外部缓存alien[1]
kmem_cache->nodelists[0]->shared:在Node0分配“array_cache结构+cached Objs指针”;
kmem_cache->nodelists[0]->alien:在Node0分配“节点数sizeof(void)”;
kmem_cache->nodelists[0]->alien[0]:置为NULL;
kmem_cache->nodelists[0]->alien[1]:在Node0分配“array_cache结构+cached Objs指针”;

SLAB节点1(CPU32~CPU63)共享缓存和外部缓存alien[0]
kmem_cache->nodelists[1]->shared:在Node1分配“array_cache结构+cached Objs指针”;
kmem_cache->nodelists[1]->alien:在Node1分配“节点数sizeof(void)”;
kmem_cache->nodelists[1]->alien[0]:在Node1分配“array_cache结构+cached Objs指针”;
kmem_cache->nodelists[1]->alien[1]:置为NULL;

另外,可以用内核启动参数“use_alien_caches”来控制是否开启alien缓存:默认值为1,当系统中的节点数目为1时,use_alien_caches初始化为0;use_alien_caches目的是用于某些多节点非连续内存(访问速度相同)的非NUMA系统。

由上可见,随着节点个数的增加,SLAB明显会开销越来越多的缓存,这也是SLUB涎生的一个重要原因。

5.3 __GFP_THISNODE

SLAB在某个节点创建新的SLAB时,都会置__GFP_THISNODE标记向Buddy系统提交页面申请,Buddy系统中看到此标记,选用申请节点的Legacy zonelist[1],仅从申请节点的zone中分配内存,并且不会走内存不足流程,也不会重试或告警,这一点需要引起注意。

SLAB在申请页面的时候会置GFP_THISNODE标记后调用cache_grow()来增长SLAB;

GFP_THISNODE定义如下:

1
2
#ifdef CONFIG_NUMA
#define GFP_THISNODE     (__GFP_THISNODE | __GFP_NOWARN | __GFP_NORETRY)

6 调度初始化

配置NUMA后负载均衡会多一层NUMA调度域,根据需要在topology.h中定义,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define SD_NODE_INIT (struct sched_domain) {                 \
	.parent             = NULL,                              \
	.child              = NULL,                              \
	.groups             = NULL,                              \
	.min_interval       = 8,                                 \
	.max_interval       = 32,                                \
	.busy_factor        = 32,                                \
	.imbalance_pct      = 125,                               \
	.cache_nice_tries   = 1,                                 \
	.flags              = SD_LOAD_BALANCE | SD_BALANCE_EXEC, \
	.last_balance       = jiffies,                           \
	.balance_interval   = 1,                                 \
	.nr_balance_failed  = 0,                                 \
}

顺便提一下,2.6.32对于实时任务不走负载均衡流程,采用了全局优先级调度的思想,保证实时任务的及时运行;这样的做法同时也解决了低版本内核在处理同一个逻辑CPU上相同最高优先级实时任务的负载均衡的时延。

7 NUMA内存分配

Zonelist[2]组织方式在NUMA内存分配过程中起着至关重要的作用,它决定了整个页面在不同节点间的申请顺序和流程。

7.1显式分配

显式分配即指定节点的分配函数,此类基础分配函数主要有2个:Buddy系统的 alloc_pages_node()和SLAB系统的kmem_cache_alloc_node(),其它的函数都可以从这2个派生出来。

例如,kmalloc_node()最终调用kmem_cache_alloc_node()进行分配。

7.1.1 Buddy显式分配

alloc_pages_node(node, gfp_flags, order)分配流程:
1) 如果node小于0,node取本地节点号(node = numa_node_id());
2) NODE_DATA(node)得到node对应的struct pglist_data结构,从而得到zonelist[2];
3) 如果gfp_flags含有__GFP_THISNODE标志,仅在此节点分配内存,使用node节点的Legacy zonelist[1],否则使用其包含所有节点zone的zonelist[0] (见4.2.2.3节);
4) 遍历确定出来的zonelist结构中包含的每一个符合要求的zone,gfp_flags指定了本次分配中的最高的zone,如__GFP_HIGHMEM表示最高的zone为ZONE_HIGH;
5) 分配结束。

7.1.2 SLAB显式分配

kmem_cache_alloc_node(cachep, gfp_flags, node)分配流程:
1) 如果node值为-1,node取本地节点号(node = numa_node_id());
2) 如果node < -1,则执行fall back行为,此行为与用户策略有关,有点类似隐式分配:
a) 根据用户策略(包括CPUSET和内存策略)依次选取节点,根据gfp_flags选取合适的zonelist进行分配;
b) 如果内存不足分配失败,则跳过内存策略直接进行隐式Buddy页面分配(仍受CPUSET的限定,关于CPUSET和内存策略后面会介绍),最终构建成新的SLAB并完成本次分配;转5);
3) 如果node是正常节点号,则先在node节点上根据gfp_flags选取合适的zonelist进行分配;
4) 如果3)中node节点内存不足分配失败,转2) a)执行fall back行为。
5) 分配结束。

注:fall back行为指的是某个节点上内存不足时会落到此节点的zonelist[0]中定义的其它节点zone分配。

7.1.3 设备驱动

配置CONFIG_NUMA后,设备会关联一个NUMA节点信息,struct device结构中会多一个numa_node字段记录本设备所在的节点,这个结构嵌套在各种类型的驱动中,如struct net_device结构。

1
2
3
4
5
6
7
struct device {
	… …
	#ifdef CONFIG_NUMA
		int          numa_node;    /* NUMA node this device is close to */
	#endif
	… …
}

__netdev_alloc_skb()的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct sk_buff *__netdev_alloc_skb(struct net_device *dev,
		unsigned int length, gfp_t gfp_mask)
{
	int node = dev->dev.parent ? dev_to_node(dev->dev.parent) : -1;
	struct sk_buff *skb;

	skb = __alloc_skb(length + NET_SKB_PAD, gfp_mask, 0, node);
	if (likely(skb)) {
		skb_reserve(skb, NET_SKB_PAD);
		skb->dev = dev;
	}
	return skb;
}

__alloc_skb()最终调用kmem_cache_alloc_node()和kmalloc_node()在此node上分配内存。

7.2 隐式分配和内存策略

隐式分配即不指定节点的分配函数,此类基础分配函数主要有2个:Buddy系统的 alloc_pages()和SLAB系统的kmem_cache_alloc(),其它的函数都可以从这2个派生出来。

隐式分配涉及到NUMA内存策略(Memory Policy),内核定义了四种内存策略。

注:隐式分配还涉及到CPUSET,本文后面会介绍。

7.2.1 内存策略

内核mm/mempolicy.c中实现了NUMA内存的四种内存分配策略:MPOL_DEFAULT, MPOL_PREFERRED, MPOL_INTERLEAVE和MPOL_BIND,内存策略会从父进程继承。

MPOL_DEFAULT:使用本地节点的zonelist;
MPOL_PREFERRED:使用指定节点的zonelist;
MPOL_BIND: 设置一个节点集合,只能从这个集合中节点的zone申请内存:

1)无__GFP_THISNODE申请标记,使用本地节点的zonelist[0];
2)置有__GFP_THISNODE申请标记,如果本地节点:
a)在集合中,使用本地节点的zonelist[1];
b)不在集合中,使用集合中最小节点号的zonelist[1];

MPOL_INTERLEAVE:采用Round-Robin方式从设定的节点集合中选出某个节点,使用此节点的zonelist;

内核实现的内存策略,用struct mempolicy结构来描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct mempolicy {
	atomic_t refcnt;
	unsigned short mode;              /* See MPOL_* above */
	unsigned short flags;             /* See set_mempolicy() MPOL_F_* above */
	union {
		short         preferred_node; /* preferred */
		nodemask_t    nodes;          /* interleave/bind */
		/* undefined for default */
	} v;
	union {
		nodemask_t cpuset_mems_allowed;     /* relative to these nodes */
		nodemask_t user_nodemask;           /* nodemask passed by user */
	} w;
};

成员mode表示使用四种分配策略中的哪一种,联合体v根据不同的分配策略记录相应的分配信息。

另外,MPOL_PREFERRED策略有一种特殊的模式,当其flags置上MPOL_F_LOCAL标志后,将等同于MPOL_DEFAULT策略,内核默认使用此种策略,见全局变量default_policy。

内存策略涉及的分配函数有2个:alloc_pages_current()和alloc_page_vma(),可以分别为不同任务以及任务的不同VMA设置内存策略。

7.2.2 Buddy隐式分配

以默认的NUMA内存策略为例讲解,alloc_pages(gfp_flags, order)分配流程:
1) 得到本地节点对应的struct pglist_data结构,从而得到zonelist[2];
2) 如果gfp_flags含有__GFP_THISNODE标志,仅在此节点分配内存即使用本地节点的Legacy zonelist[1],否则使用zonelist[0] (见4.2.2.3节);
3) 遍历确定出来的zonelist结构中包含的每一个符合要求的zone,gfp_flags指定了本次分配中的最高的zone,如__GFP_HIGHMEM表示最高的zone为ZONE_HIGH;
4) 分配结束。

7.2.3 SLAB隐式分配

以默认的NUMA内存策略为例讲解,kmem_cache_alloc(cachep, gfp_flags)分配流程:
1) 调用____cache_alloc()函数在本地节点local_node分配,此函数无fall back行为;
2) 如果1)中本地节点内存不足分配失败,调用____cache_alloc_node(cachep, gfp_flags,local_node)再次尝试在本地节点分配,如果还失败此函数会进行fall back行为;
3) 分配结束。

7.3 小结

上文提到的所有的内存分配函数都允许fall back行为,但有2种情况例外:
1) __GFP_THISNODE分配标记限制了只能从某一个节点上分配内存;
2) MPOL_BIND策略,限制了只能从一个节点集合中的节点上分配内存;
(gfp_zone(gfp_flags) < policy_zone的情况,MPOL_BIND不限制节点)。

注:还有一种情况,CPUSET限制的内存策略,后面会介绍。

8 CPUSET

CPUSET基于CGROUP的框架构建的子系统,有如下特点:
1) 限定一组任务所允许使用的内存Node和CPU资源;
2) CPUSET在内核各子系统中添加的检测代码很少,对内核没有性能影响;
3) CPUSET的限定优先级高于内存策略(针对于Node)和绑定(针对于CPU);
4) 没有额外实现系统调用接口,只能通过/proc文件系统和用户交互。

本节只讲述CPUSET的使用方法和说明。

8.1 创建CPUSET

因为CPUSET只能使用/proc文件系统访问,所以第一步就要先mount cpuset文件系统,配置CONFIG_CGROUPS和CONFIG_CPUSETS后/proc/filesystems中将有这个文件系统。

CPUSET是分层次的,可以在cpuset文件系统根目录是最顶层的CPUSET,可以在其下创建CPUSET子项,创建方式很简单即创建一个新的目录。

mount命令:mount nodev –t cpuset /your_dir或mount nodev –t cgroup –o cpuset /your_dir

Mount成功后,进入mount目录,这个就是最顶层的CPUSET了(top_cpuset),下面附一个演示例子:

8.2 CPUSET文件

介绍几个重要的CPUSET文件:
1) tasks,实际上是CGROUPS文件,为此CPUSET包含的线程pid集合;
echo 100 > tasks

2) cgroup.procs是CGROUPS文件,为此CPUSET包含的线程组tgid集合;
echo 100 > cgroup.procs

3) cpus是CPUSET文件,表示此CPUSET允许的CPU;
echo 0-8 > cpus

4) mems是CPUSET文件,表示此CPUSET允许的内存节点; echo 0-1 > mems (对应于struct task_struct中的mems_allowed字段)

5) sched_load_balance,为CPUSET文件,设置cpus集合的CPU是否参与负载均衡; echo 0 > sched_load_balance (禁止负载均衡);默认值为1表示开启负载均衡;

6) sched_relax_domain_level,为CPUSET文件,数值代表某个调度域级别,大于此级别的调度域层次将禁用闲时均衡和唤醒均衡,而其余级别的调度域都开启; 也可以通过启动参数“relax_domain_level”设置,其值含义:
-1 : 无效果,此为默认值
0 - 设置此值会禁用所有调度域的闲时均衡和唤醒均衡
1 - 超线程域
2 - 核域
3 - 物理域
4 - NUMA域
5 - ALLNODES模式的NUMA域

7) mem_exclusive和mem_hardwall,为CPUSET文件,表示内存硬墙标记;默认为0,表示软墙;有关CPUSET的内存硬墙(HardWall)和内存软墙(SoftWall),下文会介绍;

8) memory_spread_page和memory_spread_slab,为CPUSET文件,设定CPUSET中的任务PageCache和SLAB(创建时置有SLAB_MEM_SPREAD)以Round-Robin方式使用内存节点(类似于MPOL_INTERLEAVE);默认为0,表示未开启;struct task_struct结构中增加成员cpuset_mem_spread_rotor记录下次使用的节点号;

9) memory_migrate,为CPUSET文件,表明开启此CPUSET的内存迁移,默认为0;

当一个任务从一个CPUSET1(mems值为0)迁移至另一个CPUSET2(mems值为1)的时候,此任务在节点0上分配的页面内容将迁移至节点1上分配新的页面(将数据同步到新页面),这样就避免了此任务的非本地节点的内存访问。

上图为单Node,8个CPU的系统。

1) 顶层CPUSET包含了系统中的所有CPU以及Node,而且是只读的,不能更改;
2) 顶层CPUSET包含了系统中的所有任务,可以更改;
3) child为新创建的子CPUSET,子CPUSET的资源不能超过父CPUSET的资源;
4) 新创建的CPUSET的mems和cpus都是空的,使用前必须先初始化;
5) 添加任务:设置tasks和cgroup.procs文件;
6) 删除任务:将任务重新添加至其它CPUSET(如顶层)就可以从本CPUSET删除任务。

8.3 利用CPUSET限定CPU和Node

设置步骤:
1) 在某个父CPUSET中创建子CPUSET;
2) 在子CPUSET目录下,输入指定的Node号至mems文件;
3) 在子CPUSET目录下,输入指定的Node号至mems文件;
4) 在子CPUSET目录下,设定任务至tasks或group.procs文件;
5) 还可以设置memory_migrate为1,激活内存页面的迁移功能。

这样限定后,此CPUSET中所有的任务都将使用限定的CPU和Node,但毕竟系统中的任务并不能完全孤立,比如还是可能会全局共享Page Cache,动态库等资源,因此内核在某些情况下还是可以允许打破这个限制,如果不允许内核打破这个限制,需要设定CPUSET的内存硬墙标志即mem_exclusive或mem_hardwall置1即可;CPUSET默认是软墙。

硬软墙用于Buddy系统的页面分配,优先级高于内存策略,请参考内核函数:

cpuset_zone_allowed_hardwall()和cpuset_zone_allowed_softwall()

另外,当内核分不到内存将导致Oops的时候,CPUSET所有规则将被打破,毕竟一个系统的正常运行才是最重要的:
1) __GFP_THISNODE标记分配内存的时候(通常是SLAB系统);
2) 中断中分配内存的时候;
3) 任务置有TIF_MEMDIE标记即被内核OOM杀死的任务。

8.4 利用CPUSET动态改变调度域结构

利用sched_load_balance文件可以禁用掉某些CPU的负载均衡,同时重新构建调度域,此功能类似启动参数“isolcpus”的功能。

8个CPU的系统中,系统中存在一个物理域,现需要禁掉CPU4~CPU7的负载均衡,配置步骤为:
1) “mkdir child”在顶层CPUSET中创建子CPUSET,记为child;
2) “echo 0-3 > child/cpus ”(新建CPUSET的sched_load_balance默认是是打开的);
3) “echo 0 > sched_load_balance”关闭顶层CPUSET的负载均衡。

操作过程见下图:

由图可见,CPU4~CPU7的调度域已经不存在了,具体效果是将CPU4~CPU7从负载均衡中隔离出来。

9 NUMA杂项

1) /sys/devices/system/node/中记录有系统中的所有内存节点信息;
2)任务额外关联一个/proc//numa_smaps文件信息;
3) tmpfs可以指定在某个Node上创建;
4) libnuma库和其numactl小工具可以方便操作NUMA内存;
5) … …

10 参考资料

  1. www.kernel.org
  2. ULK3
  3. XLP® Processor Family Programming Reference Manual