kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

TREE RCU实现

http://blog.csdn.net/junguo/article/details/8258231

http://blog.csdn.net/junguo/article/details/8258261

http://blog.csdn.net/junguo/article/details/8268277


TREE RCU实现之一 —— 数据结构

代码分布

在分析代码之前, 先看看代码的分布情况。RCU实现的代码包含在下列一些文件中,此处用到的是linux 3.6.4的代码。

< include/linux/rcupdate.h > RCU实现的头文件,所有使用RCU的代码都需要包含它
< include/rcutree.h > 包含rcupdate.h中没有包含的函数声明。
< include/rcutiny.h > 包含rcupdate.h中没有包含的函数声明。
< kernel/rcupdate.c > 包括一些RCU实现的基础函数的实现。
< kernel/rcutree.h > 包含Tree RCU用到的结构信息,TREE_RCU将所有的CPU组织成一颗树,通过层次结构来判别进程是否通过了宽限期,这种方式适用于多个CPU的系统。
< kernel/rcutree.c > 包含Tree RCU的主要实现代码。
< kernel/rcutree_plugin.h > 其实也是TREE RCU实现的一部分。主要包含了抢入式TreeRCU的代码。适用于抢入式的系统,抢入式的系统适用于需要低延迟的桌面或者嵌入式系统。
< kernel/rcutiny.c > Tiny RCU的主要实现代码,TINY_RCU适用于单个CPU,尤其是嵌入式操作系统。
< kernel/rcutiny_plugin.h > 主要包含了抢入式Tiny RCU的代码
< kernel/rcu.h > 定义了debug的接口,实现了__rcu_reclaim
< kernel/rcutorture.c> 对RCU进行稳定性测试的代码,通过配置CONFIG_RCU_TORTURE_TEST,可以在系统启动的时候运行稳定性测试。
< kernel/rcutree_trace.c> 通过配置CONFIG_RCU_TRACE,可以记录RCU的运行信息。
< include/trace/events/rcu.h> 为rcutree_trace.c定义的头文件。

RCU处理的基本流程

RCU实现的关键集中在宽限期的处理上,这个过程需要保证销毁对象前,当前系统中所有CPU上运行的进程都通过了静止状态(quiescent state)。

1, 程序调用call_rcu,将要删除的对象保存起来。并标记或者开始一个宽限期(同一时间只能运行一个宽限期,所以当已经有宽限期在运行的时候,其它的宽限期必须等待)。

2, 在读取数据开始和结尾处增加 rcu_read_lock 和 rcu_read_unlock来标记读过程。为了保证删除过程知道读过程的结束,在非抢占式RCU实现中是在rcu_read_lock开始处禁止进程抢占。这样做就可以保证再运行下一次进程切换的时候,读过程已经结束。其实系统也不会去统计各个CPU上是否存在过读线程,所以所有的CPU都会在进程切换的时候通知系统它处于进制状态。当所有的CPU都通过静止状态的时候,系统就会标记它通过了一个宽限期。

3,由于一个宽限期结束的时候,只有最后一个通过静止状态的CPU知道当前的宽限期已经结束,它并不会去通知其它CPU;同时出于性能考虑,系统也不会在宽限期结束后,马上去执行销毁过程。所以每个CPU都有一个固定的函数去检测是否有等待执行的宽限期,如果没有特别紧急的任务时,会去执行这些过程。

接下来,要分析Tree RCU的实现,先来看看它提供的一些接口函数。

1, call_rcu 与 synchronize_rcu都是删除对象时调用的函数。call_rcu将数据提交后会返回,而synchronize_rcu会调用call_rcu,并一直等待对象被删除后才返回。还有call_rcu_bh与synchronize_rcu_bh等接口函数,会在后续讲述。

2,rcu_read_lock 和 rcu_read_unlock

<linux/rcuupdate.h>

1
2
3
4
5
6
7
8
9
static inline void __rcu_read_lock(void)
{
	preempt_disable();
}

static inline void __rcu_read_unlock(void)
{
	preempt_enable();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static inline void rcu_read_lock(void)
{
	__rcu_read_lock();
	__acquire(RCU);
	rcu_lock_acquire(&rcu_lock_map);
	rcu_lockdep_assert(!rcu_is_cpu_idle(),
			"rcu_read_lock() used illegally while idle");
}
static inline void rcu_read_unlock(void)
{
	rcu_lockdep_assert(!rcu_is_cpu_idle(),
			"rcu_read_unlock() used illegally while idle");
	rcu_lock_release(&rcu_lock_map);
	__release(RCU);
	__rcu_read_unlock();
}

rcu_read_lock与rcu_read_unlock在非抢占式下的实现比较简单就是 preempt_disable与preempt_enable。这样做的目的是当调用schedule的时候,就可以肯定读的过程已经结束。其它_acquire(RCU)等函数是调试用的代码,暂不做讨论。

3, rcu_note_context_switch 在schedule中调用,每次进程切换就代表着一个静止状态。该函数会把当前的CPU状态设置为通过状态。

4, rcu_check_callbacks 在每次时钟周期里调用(update_process_times)。通过它会触发软件中断,软件中断对应着rcu_process_callbacks,这是一个真正繁忙的函数,他会检测当前CPU的状态,向父节点传递静止状态信息,调用注册函数等一系列工作。

在进一步了解这些函数之前,我们先来看看你Tree RCU的结构。

TREE RCU简介

在统计CPU的状态的时候,需要用到一个结构来存放所有CPU的状态。在早期的实现中,所有的状态都保存在一个结构中,这样做的后果是所有的CPU在更新自己状态的时候,都需要锁定该结构对象,一定程度上影响了系统性能。为了提高性能,把一定数目的CPU组成了一个节点(默认设定64个CPU为一个节点);当节点超过64个的时候,再把这些节点按64为单位划分为归属不同的父节点;如此类推,最后的一个单独的节点作为根节点。这样在更新CPU状态的时候,只需要锁定自己所属的节点就可以了。按节点设置的数目,可见这个结构只对CPU数成百上千的系统才真正起作用(我都没见过超过32个cpu的机器,不知道是啥样的感觉)。

这样所有的CPU就按层级结构组织了起来,也就是一个树结构。当一个系统的CPU数少于64个的时候,只要一个rcu_node就可以。

每个CPU在完成宽限期检测的时候,就会去更新它所属的rcu_node的值,当一个rcu_node所包含的CPU的状态都更新过以后,该node就会去更新它所属的父节点的值。直到最后一个根节点。

TREE RCU数据结构

为了实现该结构,系统提供了以下结构。

rcu_data

由于RCU需要统计每个CPU是否通过了宽限期,提供了rcu_data来保存信息。另外每个销毁的对象并不是直接删除,也保存在rcu_data中,等到合适的时机来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
struct rcu_data {  
	/* 1) 静止状态和宽限期处理: */  
	unsigned long   completed;      /* 对比 rsp->completed */  
									/* 目的是检测宽限期是否完成. */  
	unsigned long   gpnum;          /* 当前CPU上最高的宽限期数目*/  
									/* 在宽限期开始的时候设置. */  
	unsigned long   passed_quiesce_gpnum;  
									/* 已经通过的宽限期数目. */  
	bool            passed_quiesce; /* 是否通过了静止状态,在进程切换等状态会设置. */  
	bool            qs_pending;     /* 对于当前执行的宽限期,该CPU是否执行完成. */  
	bool            beenonline;     /* CPU是否在线,不在线的CPU需要特殊处理,以提高性能*/  
	bool            preemptible;    /* 是否抢占式RCU? */  
	struct rcu_node *mynode;        /* 这个CPU对应的 rcu_node */  
	unsigned long grpmask;          /* 占用1bit,对应与所属的rcu_node. */  
#ifdef CONFIG_RCU_CPU_STALL_INFO  
	unsigned long   ticks_this_gp;  /* The number of scheduling-clock */  
									/*  ticks this CPU has handled */  
									/*  during and after the last grace */  
									/* period it is aware of. */  
#endif /* #ifdef CONFIG_RCU_CPU_STALL_INFO */  
	/* 2) 批处理*/  
	/* 
	 * 
	 * 当nxtlist不为空的时候,会通过nxttail划分为以下几部分 
	 * 每一个部分为空的时候,它的指针会被设置成与它的下一部分相同 
	 * 当nxtlist为空的时候,所有的nxttail都会指向nxtlist的地址,这时候nxtlist指向NULL 
	 * 
	 * [nxtlist, *nxttail[RCU_DONE_TAIL]): 
	 *    批处理的开始节点# <= ->completed 
	 *    这些节点的宽限期已经完成,可以执行销毁操作。 
	 *    当调用rcu_process_callbacks()的时候,下一批完成宽限期的节点也会放到这儿. 
	 * [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL]): 
	 *    批处理的开始节点 # <= ->completed - 1: 等待当前的批处理完成 
	 * [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL]): 
	 *    已知的当下次宽限期开始,可以开始等待的节点。 
	 * [*nxttail[RCU_NEXT_READY_TAIL], *nxttail[RCU_NEXT_TAIL]): 
	 *    当前不确定下次宽限期开始后,是否可以开始等待状态的节点。 
	 *    *nxttail[RCU_NEXT_TAIL] 的值将永远是NULL, 
	 *    它表示nxtlist的结束. 
	 * 
	 */  
	struct rcu_head *nxtlist;  
	struct rcu_head **nxttail[RCU_NEXT_SIZE];  
	long            qlen_lazy;      /* # kfree_rcu调用的次数,kfee_rcu等同于call_rcu,只是它不需要销毁的对象提供销毁函数*/  
	long            qlen;           /* # 当前需要执行销毁操作的次数,每次call_rcu会加一,执行过后减一*/  
	long            qlen_last_fqs_check;  
									/* 对应与qlen,最后一次执行的次数*/  
	unsigned long   n_cbs_invoked;  /* 执行销毁操作的次数. */  
	unsigned long   n_cbs_orphaned; /* 统计离线后CPU上剩下的callback函数的个数 */  
	unsigned long   n_cbs_adopted;  /* 从离线后的CPU上移出的callback函数的个数 */  
	unsigned long   n_force_qs_snap;  
									/* 其它CPU是否在执行fore_qs? */  
	long            blimit;         /* nxtlist保存的上限 */  

	/* 3) 动态时钟,*/  
	struct rcu_dynticks *dynticks;  /* 每个CPU都包含一个动态时钟. */  
	int dynticks_snap;              /* 用于检测CPU是否在线. */  

	/* 4) 强制执行时候处理的CPU */  
	unsigned long dynticks_fqs;     /* 由于进入dynticks idle而被处理的CPU. */  
	unsigned long offline_fqs;      /* 由于不在在线被处理的CPU. */  

	/* 5) __rcu_pending() 的统计信息,这些信息都是在记录调用信息的时候使用. */  
	unsigned long n_rcu_pending;    /* rcu_pending() 调用次数,自从启动. */  
	unsigned long n_rp_qs_pending;  
	unsigned long n_rp_report_qs;  
	unsigned long n_rp_cb_ready;  
	unsigned long n_rp_cpu_needs_gp;  
	unsigned long n_rp_gp_completed;  
	unsigned long n_rp_gp_started;  
	unsigned long n_rp_need_fqs;  
	unsigned long n_rp_need_nothing;  

	/* 6) _rcu_barrier() 的回调函数. */  
	struct rcu_head barrier_head;  

	int cpu;  
	struct rcu_state *rsp;  
};  

1,completed ,gpnum , passed_quiesce_gpnum

gpnum表示当前正在运行的宽限期的个数,每当一个宽限期开始的时候,会设置这个值与其父节点相同。passed_quiesce_gpnum为当前CPU通过的宽限期个数,它的值在宽限期开始的时候小于gpnum,当这个CPU经过一个静止状态的时候,会把它设置成gpnum的值,通过对比它与父节点中的gpnum是否相同,可以确定该CPU是否通过了宽限期。passed_quiesce_gpnum只是表示这个CPU通过了宽限期,而completed表示所有的CPU都通过了宽限期,设置该值的同时,可以将nxtlist中等待的回调函数移动到完成队列。

2,nxtlist 与nxttail

nxtlist保存的是指向rcu_head对象,rcu_head的定义如下:

1
2
3
4
5
struct callback_head {
	struct callback_head *next;
	void (*func)(struct callback_head *head);
};
#define rcu_head callback_head

rcu_head的结构并不复杂,它包含一个回调函数指针。而next可以把rcu_head连成一个列表。

nxtlist指向一个rcu_head 列表,而nxttail的四个元素是指向指针的指针,它们指向的是rcu_head对象的next。RCU_DONE_TAIL指向的rcu_head对象之前的对象是可以销毁的对象。RCU_WAIT_TAIL指向的正在等待宽限期的元素,RCU_NEXT_READ_TAIL指向的是等待下次宽限期的元素,RCU_NEXT_TAIL指向最后一个元素,这个元素总是指向NULL。

rcu_node
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
struct rcu_node {  
	raw_spinlock_t lock;    /* rcu_node的锁,用来保护以下的一些成员*/  

	unsigned long gpnum;    /* 该节点当前的宽限期的数量 */  
							/* 该值等于或者比父节点的值小1*/  
	unsigned long completed; /* 该节点完成的宽限期数量*/  
							 /* 该值等于或者比父节点的值小1*/  
	unsigned long qsmask;   /* 标记这个节点对应的所有CPU或者子节点是否完成了当前的宽限期*/  
							/* 每一个bit对应一个cpu或者一个子节点.*/  
	unsigned long expmask;  /* 需要执行 ->blkd_tasks 的元素 */                              
							/*  (应用于TREE_PREEMPT_RCU). */  
	atomic_t wakemask;      /* 需要唤醒kthread的CPU. */  
							  
	unsigned long qsmaskinit;  
							/* 每个宽限期开始时,用它来初始化qsmask,不存在或者不在线的CPU需要清除. */  
	unsigned long grpmask;  /* 对应于父节点中的位置. */  
							/* 只是用一bit. */  
	int     grplo;          /* 该节点代表的CPU或者子节点开始的位置. */  
	int     grphi;          /* 该节点代表的CPU或者子节点结束的位置. */  
	u8      grpnum;         /* 下一级的CPU或者子节点的个数. */  
	u8      level;          /* 跟节点是 0. */  
	struct rcu_node *parent;  
	struct list_head blkd_tasks;  
							/* 阻断读关键段的任务列表 */  
							/*  */  
				 
	struct list_head *gp_tasks;  
							/* 指向第一个阻断读关键段的任务 */  
							  
							  
	struct list_head *exp_tasks;  

	/*以下为抢先式下加速RCU过程的变量*/

#ifdef CONFIG_RCU_BOOST  
	struct list_head *boost_tasks;  
							/* Pointer to first task that needs to be */  
							/*  priority boosted, or NULL if no priority */  
							/*  boosting is needed for this rcu_node */  
							/*  structure.  If there are no tasks */  
							/*  queued on this rcu_node structure that */  
							/*  are blocking the current grace period, */  
							/*  there can be no such task. */  
	unsigned long boost_time;  
							/* When to start boosting (jiffies). */  
	struct task_struct *boost_kthread_task;  
							/* kthread that takes care of priority */  
							/*  boosting for this rcu_node structure. */  
	unsigned int boost_kthread_status;  
							/* State of boost_kthread_task for tracing. */  
	unsigned long n_tasks_boosted;  
							/* Total number of tasks boosted. */  
	unsigned long n_exp_boosts;  
							/* Number of tasks boosted for expedited GP. */  
	unsigned long n_normal_boosts;  
							/* Number of tasks boosted for normal GP. */  
	unsigned long n_balk_blkd_tasks;  
							/* Refused to boost: no blocked tasks. */  
	unsigned long n_balk_exp_gp_tasks;  
							/* Refused to boost: nothing blocking GP. */  
	unsigned long n_balk_boost_tasks;  
							/* Refused to boost: already boosting. */  
	unsigned long n_balk_notblocked;  
							/* Refused to boost: RCU RS CS still running. */  
	unsigned long n_balk_notyet;  
							/* Refused to boost: not yet time. */  
	unsigned long n_balk_nos;  
							/* Refused to boost: not sure why, though. */  
							/*  This can happen due to race conditions. */  
#endif /* #ifdef CONFIG_RCU_BOOST */  
	struct task_struct *node_kthread_task;  
							/* kthread that takes care of this rcu_node */  
							/*  structure, for example, awakening the */  
							/*  per-CPU kthreads as needed. */  
	unsigned int node_kthread_status;  
							/* State of node_kthread_task for tracing. */  
} ____cacheline_internodealigned_in_smp;  

每个rcu_node代表着 一组CPU或者子节点。在非抢占式下,它的结构并不复杂。由于可能有多个CPU对它进行处理,所有进行相应操作的时候,需要lock保护。

rcu_state
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
struct rcu_state {  
	struct rcu_node node[NUM_RCU_NODES];    /* 保存了所有的节点. */  
	struct rcu_node *level[RCU_NUM_LVLS];   /* 每个层级所指向的节点. */  
	u32 levelcnt[MAX_RCU_LVLS + 1];         /* # 每一层的节点数. */  
	u8 levelspread[RCU_NUM_LVLS];           /* 每一层的CPU/节点数. */  
	struct rcu_data __percpu *rda;          /* 指向rcu_data. */  
	void (*call)(struct rcu_head *head,     /* rcu_barrier指向的回调函数. */  
				 void (*func)(struct rcu_head *head));  

	/* The following fields are guarded by the root rcu_node's lock. */  

	u8      fqs_state ____cacheline_internodealigned_in_smp;  
									      /* 调用force_quiescent_state时的状态. */  
	u8      fqs_active;                     /* force_quiescent_state() 正在运行*/  
									        
	u8      fqs_need_gp;                    /* 因为 force_quiescent_state() 正在运行*/  
									      /* 一个CPU需要运行的宽限期被阻止*/  

	u8      boost;                          /* 加速. */  
	unsigned long gpnum;                    /* 当前的宽限起数量. */  
	unsigned long completed;                /* # 最后一次完成的宽限期数量. */  

	/* 以下的成员被根rcu_node的lock保护. */  

	raw_spinlock_t onofflock;               /* 开始一个新的宽限期的时候,阻止CPU上下线*/  
									        
	struct rcu_head *orphan_nxtlist;        /* 等待宽限期的孤儿回调函数的列表 */  
									        
	struct rcu_head **orphan_nxttail;       /* 以上列表的结尾. */  
	struct rcu_head *orphan_donelist;       /* 需要执行的孤儿回调函数列表 */  
									        
	struct rcu_head **orphan_donetail;      /* 以上列表的结尾. */  
	long qlen_lazy;                         /* 懒惰回调函数的个数. */  
	long qlen;                              /* 总的回调函数的个数. */  
	struct task_struct *rcu_barrier_in_progress;  
									      /* 调用rcu_barrier()的进程, */  
									      /* 没有的话指向NULL. */  
	struct mutex barrier_mutex;             /* 执行barrier需要的互斥锁. */  
	atomic_t barrier_cpu_count;             /* # 等待barrier的CPU数 . */  
	struct completion barrier_completion;   /* 在barrier结束的时候调用. */  
	unsigned long n_barrier_done;           /* 在_rcu_barrier()开始结束处都需要调用++ */  
									        
	raw_spinlock_t fqslock;                 /* 只有一个进程能调用 force_quiescent_state().*/  
									        
	unsigned long jiffies_force_qs;         /* force_quiescent_state()开始的时间 */  
									        
	unsigned long n_force_qs;               /* 调用force_quiescent_state()的次数 */  
									        
	unsigned long n_force_qs_lh;            /* 因为lock不可用,而退出force_quiescent_state()的次数 */  
									        
	unsigned long n_force_qs_ngp;           /* 因为当前有宽限期执行,而退出force_quiescent_state()的次数*/  
									        
	unsigned long gp_start;                 /* 宽限期开始的时间*/  
									        
	unsigned long jiffies_stall;              
									     
	unsigned long gp_max;                   /*  最长的宽限的jiffie数 */  
									        
	char *name;                             /* 结构的名字. */  
	struct list_head flavors;               /* 系统中的rcu_state. */  
};  

rcu_state 保存了所有的node,宽限期的判断只要取出根节点,也就是第一个元素就可以。还有一些初始化要用到的变量。还有孤儿回调函数用于处理离线CPU遗留的信息。剩下还有很多统计信息,这些内容在讲解代码实现的时候再仔细考虑。


TREE RCU实现之二 —— 主干函数

RCU的实现集中在以下几个步骤:
1, 调用call_rcu,将回调函数增加到列表。
2, 开始一个宽限期。
3, 每个CPU报告自己的状态,直到最后一个CPU,结束一个宽限期。
4, 宽限期结束,每个CPU处理自己的回调函数。

call_rcu的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static void  
__call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),  
	   struct rcu_state *rsp, bool lazy)  
{  
	unsigned long flags;  
	struct rcu_data *rdp;  

	WARN_ON_ONCE((unsigned long)head & 0x3); /* 检测head在内存中是否对齐! */  
	debug_rcu_head_queue(head);  
	head->func = func;  
	head->next = NULL;  

	smp_mb(); /* Ensure RCU update seen before callback registry. */  

	/* 
	 * 这是一个检测宽限期开始或者结束的机会。 
	 * 当我们看到一个结束的时候,可能还会看到一个开始。 
	 * 反过来,看到一个开始的时候,不一定能看到一个结束, 
	 * 因为宽限期结束需要一定时间。 
	 */  
	local_irq_save(flags);  
	rdp = this_cpu_ptr(rsp->rda);  

	/* 将要增加callback到nxtlist. */  
	ACCESS_ONCE(rdp->qlen)++;  
	if (lazy)  
		rdp->qlen_lazy++;  
	else  
		rcu_idle_count_callbacks_posted();  
	smp_mb();  /* Count before adding callback for rcu_barrier(). */  
	*rdp->nxttail[RCU_NEXT_TAIL] = head;  
	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;  

	if (__is_kfree_rcu_offset((unsigned long)func))  
		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,  
									 rdp->qlen_lazy, rdp->qlen);  
	else  
		trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);  

	/* 去处理rcu_core。 */  
	__call_rcu_core(rsp, rdp, head, flags);  
	local_irq_restore(flags);  
}  

call_rcu中最主要的工作,就是将回调函数加入到CPU的nxtlist列表。这里用到了指针处理的小技巧,我们来看看。首先看看nxttail的初始化:

1
2
3
4
5
6
7
8
static void init_callback_list(struct rcu_data *rdp)  
{  
	int i;  

	rdp->nxtlist = NULL;  
	for (i = 0; i < RCU_NEXT_SIZE; i++)  
		rdp->nxttail[i] = &rdp->nxtlist;  
}  

我们看到nxttail的全部成员都指向了nxtlist的地址。当nxtlist为空的时候,也是这个情形。

1
*rdp->nxttail[RCU_NEXT_TAIL] = head;       

当nxtlist为空的时候, *rdp->nxttail[RCU_NEXT_TAIL] 得到的其实就是nxtlist,将head的值赋予它。

1
rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

之后 RCU_NEXT_TAIL指向 head的next指针。这样当再有一个节点加入的时候,*rdp->nxttail[RCU_NEXT_TAIL]得到的其实就是前一次加入的head的next指针,它将指向新加入的值。如此,nxtlist就成为了一个链表。或者这样理解,rdp->nxttail[RCU_NEXT_TAIL] 指向的就是nxtlist中最后一个节点的 next指针。

除了将回调函数插入,该函数其它代码多为检查代码。而最后要调用__call_rcu_core,该函数的功用主要是在回调函数太多或者等待时间过长的状态下,强制执行RCU状态更新。我们暂时不关注。

开始一个宽限期

在一个宽限期结束,或者当一个CPU检测到自身有需要一个宽限期的时候会开始一个新的宽限期,开始宽限期的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
static void  
rcu_start_gp(struct rcu_state *rsp, unsigned long flags)  
	__releases(rcu_get_root(rsp)->lock)  
{  
	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);  
	struct rcu_node *rnp = rcu_get_root(rsp);  

	if (!rcu_scheduler_fully_active ||  
			!cpu_needs_another_gp(rsp, rdp)) {  
		/* 
		 * 如果scheduler 还没有启动non-idle任务 
		 * 或者不需要启动一个新的宽限期则退出。 
		 * 需要再次判断cpu_needs_another_gp, 
		 * 是因为可能有多个CPU执行这个过程。 
		 */  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		return;  
	}  

	if (rsp->fqs_active) {  
		/* 
		 * 这个CPU需要一个宽限期,而force_quiescent_state() 
		 * 正在运行,告诉它开始一个。 
		 */  
		rsp->fqs_need_gp = 1;  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		return;  
	}  

	/* 开始一个新的宽限期并且初始化。 */  
	rsp->gpnum++;  
	trace_rcu_grace_period(rsp->name, rsp->gpnum, "start");  
	WARN_ON_ONCE(rsp->fqs_state == RCU_GP_INIT);  
	rsp->fqs_state = RCU_GP_INIT; /* 阻止 force_quiescent_state。 */  
	rsp->jiffies_force_qs = jiffies + RCU_JIFFIES_TILL_FORCE_QS;  
	record_gp_stall_check_time(rsp);  
	raw_spin_unlock(&rnp->lock);  /* leave irqs disabled. */  

	/* 排除CPU的热插拔。*/  
	raw_spin_lock(&rsp->onofflock);  /* irqs already disabled. */  

	/* 
	 * 从父节点开始以广度优先的方式,遍历所有的节点,设置qsmask的值, 
	 * 所有在线CPU所在bit都将被设置成1。 
	 * 通过遍历rsp->node[]数组就可以达到这个目的。 
	 * 其它CPU在自己所属的节点还没有被设置前,只有可能访问这个节点, 
	 * 因为它所作的判断是宽限期还没有开始。 
	 * 此外,我们排除了CPU热插拔。 
	 *  
	 * 直到初始化过程完成之前,这个宽限期不可能完成,因为至少当前的 
	 * CPU所属的bit将不会被设置。这个是因为我们启动了禁止中断,所以 
	 * 这个CPU不会调用到宽限期检测代码。 
	 */  
	rcu_for_each_node_breadth_first(rsp, rnp) {  
		raw_spin_lock(&rnp->lock);      /* irqs already disabled. */  
		rcu_preempt_check_blocked_tasks(rnp);  
		rnp->qsmask = rnp->qsmaskinit;  
		rnp->gpnum = rsp->gpnum;  
		rnp->completed = rsp->completed;  
		if (rnp == rdp->mynode)  
			rcu_start_gp_per_cpu(rsp, rnp, rdp);  
		rcu_preempt_boost_start_gp(rnp);  
		trace_rcu_grace_period_init(rsp->name, rnp->gpnum,  
							rnp->level, rnp->grplo,  
							rnp->grphi, rnp->qsmask);  
		raw_spin_unlock(&rnp->lock);    /* irqs remain disabled. */  
	}

	rnp = rcu_get_root(rsp);  
	raw_spin_lock(&rnp->lock);              /* irqs already disabled. */  
	rsp->fqs_state = RCU_SIGNAL_INIT; /* force_quiescent_state now OK. */  
	raw_spin_unlock(&rnp->lock);            /* irqs remain disabled. */  
	raw_spin_unlock_irqrestore(&rsp->onofflock, flags);  
}  

标记一个新的宽限期开始,rcu_state要做的就是将gp_num加1。然后再设置所有node,qsmask被设置成qsmasinit,qsmask每个bit代表一个CPU,所有在线的CPU都将被设置成1;gpnum将被设置成新值。嗯,一个新宽限期的开始只需要设置这些标记位。

CPU的宽限期检测

当一个宽限期开始后,每个CPU都需要检测自己的状态,如果已经通过静止状态,那么就向上一级node进行报告。

这个处理过程,可以分为两个步骤:
1, 检测新的处理过程开始,设置rcu_data中的gpnum和passed_quiesce,另外用qs_pending标记一个待处理的新宽限期的开始。
2, 一个静止状态结束,向上一级node报告这个过程。

这两个过程通过rcu_check_quiescent_state()来实现,需要注意的是这个函数隔一段时间调用一次,并不只调用一次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/* 
* 检测这个CPU是否还不知道一个新宽限期开始,如果是设置它的变量。 
* 否则检查它是不是第一次通过静止状态,如果是,向上报告。 
*/  
static void  
rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	/* 如果有新的宽限期开始,记录它并返回。*/  
	if (check_for_new_grace_period(rsp, rdp))  
		return;  

	/* 
	 * 这个CPU是否已经处理过它的宽限期?如果是返回。 
	 */  
	if (!rdp->qs_pending)  
		return;  

	/* 
	 * 是否通过了静止状态?如果没有,返回。 
	 */  
	if (!rdp->passed_quiesce)  
		return;  

	/* 
	 * 向所属的node报告。(但rcu_report_qs_rdp() 仍然会去判断它)。 
	 */  
	rcu_report_qs_rdp(rdp->cpu, rsp, rdp, rdp->passed_quiesce_gpnum);  
}  

A, CPU检测新宽限期的开始
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/*  
 * 为当前CPU,更新rcu_data的状态,去标记一个新宽限期的开始 
 * 如果当前CPU启动了一个宽限期或者检测到一个新的宽限期开始, 
 * 都需要调用这个函数。这个过程必须锁定父节点的lock,另外需 
 * 要禁止中断 
 */  
static void __note_new_gpnum(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)  
{  
	if (rdp->gpnum != rnp->gpnum) {  
		/* 
		 * 如果当前的宽限期需要处理这个CPU的状态,设置并 
		 * 去检测它的静止状态。否则不要去管它。 
		 */          
		rdp->gpnum = rnp->gpnum;  
		trace_rcu_grace_period(rsp->name, rdp->gpnum, "cpustart");  
		if (rnp->qsmask & rdp->grpmask) {  
			rdp->qs_pending = 1;  
			rdp->passed_quiesce = 0;  
		} else {  
			rdp->qs_pending = 0;  
		}  
		zero_cpu_stall_ticks(rdp);  
	}  
}  

static void note_new_gpnum(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	struct rcu_node *rnp;  

	local_irq_save(flags);  
	rnp = rdp->mynode;  
	if (rdp->gpnum == ACCESS_ONCE(rnp->gpnum) || /* outside lock. */  
			!raw_spin_trylock(&rnp->lock)) { /* irqs already off, so later. */  
		local_irq_restore(flags);  
		return;  
	}  
	__note_new_gpnum(rsp, rnp, rdp);  
	raw_spin_unlock_irqrestore(&rnp->lock, flags);  
}  

/* 
 * 在我们的上次检测之后,其它CPU启动了一个新的宽限期? 
 * 如果是更新相应的rcu_data的状态。 
 * 必须是在rdp对应的CPU上执行。 
 */  
static int  
check_for_new_grace_period(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	int ret = 0;  

	local_irq_save(flags);  
	if (rdp->gpnum != rsp->gpnum) {  
		note_new_gpnum(rsp, rdp);  
		ret = 1;  
	}  
	local_irq_restore(flags);  
	return ret;  
}  

check_for_new_grace_period 和 note_new_gpnum分别用来检测rdp的gpnum与rsp已经对应的rnp的值是否相同,来确定是否有一个新的宽限期开始。之所以需要检测两次,是因为在rsp设置以后,rnp可能并没有设置完成。

__note_new_gpnum 将设置gpnum的值。另外设置 qs_pending为1,该标记位代表该节点还没有向父节点报告自己的状态;passed_quiesce为0,表示需要一个静止状态,设置该位是因为下次调用rcu_check_quiescent_state()可能是在一个读过程还没有结束的时候。

qs_pending的状态有可能为0,这只在以下情形下出现:当前CPU在宽限期开始的时候实在离线状态,而现在变成了在线。

我们注意到在 check_for_new_grace_period检测到有新的宽限期开始后,rcu_check_quiescent_state将直接返回,因为这个宽限期可能是在该CPU的上一个静止状态之前已经开始,所以需要等待下一个静止状态。

B,CPU报告静止状态

当再一次调用到rcu_check_quiescent_state()的时候,check_for_new_grace_period()将返回FALSE,接着运行后面的函数来判断 qs_pending 和 passed_quiesce 的值来决定是否调用rcu_report_qs_rdp。需要判断qs_peding是因为当这次rcu_report_qs_rdp调用成功的时候,下次再运行rcu_check_quiescent_state()则不需要继续运行后续函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void  
rcu_report_qs_rdp(int cpu, struct rcu_state *rsp, struct rcu_data *rdp, long lastgp)  
{  
	unsigned long flags;  
	unsigned long mask;  
	struct rcu_node *rnp;  

	rnp = rdp->mynode;  
	raw_spin_lock_irqsave(&rnp->lock, flags);  
	if (lastgp != rnp->gpnum || rnp->completed == rnp->gpnum) {  
		/* 
		 * 如果宽限期的处理已经完成,那么返回。 
		 */          
		rdp->passed_quiesce = 0; /* need qs for new gp. */  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		return;  
	}  
	mask = rdp->grpmask;  
	if ((rnp->qsmask & mask) == 0) {  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
	} else {  
		rdp->qs_pending = 0;  
		/* 
		 *  可以确定这个宽限期还没有结束,所以可以确定当前CPU上的 
		 *  所有回调函数可以在下次宽限期结束后处理。 
		 */  
		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];  

		rcu_report_qs_rnp(mask, rsp, rnp, flags); /* rlses rnp->lock */  
	}  
}  

从我看来,这个函数只会调用到最后一个else分支,而之前的连个if分支都不会调用到。因为在调用该函数前,代码已经做了必要的检测。

以此来看,这个函数的功用就是设置qs_pending的值,阻止这次宽限期没有完成之前再次调用掉该函数;设置nxttail,决定下次宽限期后可以执行的回调函数;然后向父节点报告静止状态完成。

C,向上报告
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void  
rcu_report_qs_rnp(unsigned long mask, struct rcu_state *rsp,  
	  struct rcu_node *rnp, unsigned long flags)  
	__releases(rnp->lock)  
{  
	struct rcu_node *rnp_c;  

	/* 向上遍历所有层级 */  
	for (;;) {  
		if (!(rnp->qsmask & mask)) {  
			/* 这个CPU的标记已经被清除,证明已经处理过了,返回 */  
			raw_spin_unlock_irqrestore(&rnp->lock, flags);  
			return;  
		}  
		rnp->qsmask &= ~mask;  
		trace_rcu_quiescent_state_report(rsp->name, rnp->gpnum,  
					 mask, rnp->qsmask, rnp->level,  
					 rnp->grplo, rnp->grphi,  
					 !!rnp->gp_tasks);  
		if (rnp->qsmask != 0 || rcu_preempt_blocked_readers_cgp(rnp)) {  
			/* 这个节点中还有其它CPU没有处理完成,那么返回 */  
			raw_spin_unlock_irqrestore(&rnp->lock, flags);  
			return;  
		}  
		mask = rnp->grpmask;  
		if (rnp->parent == NULL) {  
			/* 到这儿,已经到了根节点 */  
			break;  
		}  
		raw_spin_unlock_irqrestore(&rnp->lock, flags);  
		rnp_c = rnp;  
		rnp = rnp->parent;  
		raw_spin_lock_irqsave(&rnp->lock, flags);  
		WARN_ON_ONCE(rnp_c->qsmask);  
	}  
	/* 
	 *  程序运行到这儿,说明所有的CPU都通过了宽限期, 
	 *  那么调用rcu_report_qs_rsp()来结束这个宽限期。 
	 */   
	rcu_report_qs_rsp(rsp, flags); /* releases rnp->lock. */  
}  

这个过程并不复杂,清理rnp中qsmask对应该CPU的bit。然后判断该节点是否处理完成,如果是则继续向上调用,否则就退出函数。最后一个CPU调用后,可以调用到rcu_report_qs_rsp()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
static void rcu_report_qs_rsp(struct rcu_state *rsp, unsigned long flags)  
	__releases(rcu_get_root(rsp)->lock)  
{  
	unsigned long gp_duration;  
	struct rcu_node *rnp = rcu_get_root(rsp);  
	struct rcu_data *rdp = this_cpu_ptr(rsp->rda);  

	WARN_ON_ONCE(!rcu_gp_in_progress(rsp));  

	/* 
	 * Ensure that all grace-period and pre-grace-period activity 
	 * is seen before the assignment to rsp->completed. 
	 */  
	smp_mb(); /* See above block comment. */  
	gp_duration = jiffies - rsp->gp_start;  
	if (gp_duration > rsp->gp_max)  
		rsp->gp_max = gp_duration;  

	/* 
	 * 当前CPU知道宽限期已经结束,不过其它CPU都认为它还在运行。 
	 * 由于completed还没有设置,其它CPU都不会对父node进行处理。 
	 * 所以这时候将各个node标记为完成是安全的。 
	 *  
	 * 不过当前CPU有等待下一次宽限期的回调函数的时候,我们会 
	 * 先去处理下一个宽限期。 
	 * 这儿使用RCU_WAIT_TAIL代替了RCU_DONE_TAIL,这是因为当前 
	 * CPU还没有进一步处理完成状态,当前RCU_WAIT_TAIL状态的元 
	 * 素其实在这次宽限期结束后,已经可以执行了。 
	 *  
	 */  
	if (*rdp->nxttail[RCU_WAIT_TAIL] == NULL) {  
		raw_spin_unlock(&rnp->lock);  /* irqs remain disabled. */  

		/* 
		 * 设置 rnp->completed的值,避免这个过程要等到下一次宽限期开始。          
		 */  
		rcu_for_each_node_breadth_first(rsp, rnp) {  
			raw_spin_lock(&rnp->lock); /* irqs already disabled. */  
			rnp->completed = rsp->gpnum;  
			raw_spin_unlock(&rnp->lock); /* irqs remain disabled. */  
		}  
		rnp = rcu_get_root(rsp);  
		raw_spin_lock(&rnp->lock); /* irqs already disabled. */  
	}  

	rsp->completed = rsp->gpnum;  /* Declare the grace period complete. */  
	trace_rcu_grace_period(rsp->name, rsp->completed, "end");  
	rsp->fqs_state = RCU_GP_IDLE;  
	rcu_start_gp(rsp, flags);  /* releases root node's rnp->lock. */  
}  

这个过程最主要的内容就是设置rsp->completed的值,中间多了对node的处理。因为在rcu_start_gp中也会对node进行处理,当前CPU无法判断其它CPU是否需要一个宽限期,但它自身还有等待宽限期的回调函数的时候,它确定会有一个新的宽限期马上开始,所以忽略这个过程。

CPU的宽限期结束处理

这个过程也可以分为两个步骤,第一步是检查宽限期是否结束,第二步是调用已完成的回调函数。

A, CPU检测宽限期的结束

每个CPU都会定期检查当前的宽限期是否结束,如果结束将处理自身状态已经nxtlist表。rcu_process_gp_end就是用来做这个事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void  
rcu_process_gp_end(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	struct rcu_node *rnp;  

	local_irq_save(flags);  
	rnp = rdp->mynode;  
	if (rdp->completed == ACCESS_ONCE(rnp->completed) || /* outside lock. */  
			!raw_spin_trylock(&rnp->lock)) { /* irqs already off, so later. */  
		local_irq_restore(flags);  
		return;  
	}  
	__rcu_process_gp_end(rsp, rnp, rdp);  
	raw_spin_unlock_irqrestore(&rnp->lock, flags);  
}  

当 rdp->completed与rnp->completed的值不同的时候,会调用__rcu_process_gp_end来完成具体的工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static void  
__rcu_process_gp_end(struct rcu_state *rsp, struct rcu_node *rnp, struct rcu_data *rdp)  
{  
	/* 之前的宽限期是否完成? */  
	if (rdp->completed != rnp->completed) {  

		/* 推进回调函数,即使是NULL指针也没关系。 */  
		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[RCU_WAIT_TAIL];  
		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_READY_TAIL];  
		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];  

		/* 更新completed。 */  
		rdp->completed = rnp->completed;  
		trace_rcu_grace_period(rsp->name, rdp->gpnum, "cpuend");  

		/* 
	   * 如果当前的CPU在外部的静止的状态(如离线状态), 
		 * 可能已经错过了其它CPU发起的宽限期。所以需要更 
		 * 新gpnum的值,同时要注意不要错过当前正在运行的 
		 * 宽限期,所以它的值被设置成与rnp->completed相同, 
		 * 此时rnp->gpnum 可以已经加1,那么后续的调用 
		 * rcu_check_quiescent_state()会去检测新的宽限期。 
		 */       
		if (ULONG_CMP_LT(rdp->gpnum, rdp->completed))  
			rdp->gpnum = rdp->completed;  

		/* 
		 * 如果下次的宽限期不需要当前CPU报告静止状态, 
		 * 设置qs_pending为0。 
		 */  
		if ((rnp->qsmask & rdp->grpmask) == 0)  
			rdp->qs_pending = 0;  
	}  
}  

这个过程的重点是设置nxttail的值,将根据它来进行下一步的处理。

B,回调函数的调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	unsigned long flags;  
	struct rcu_head *next, *list, **tail;  
	int bl, count, count_lazy, i;  

	/* 没有回调函数,那么返回。*/  
	if (!cpu_has_callbacks_ready_to_invoke(rdp)) {  
		trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, 0);  
		trace_rcu_batch_end(rsp->name, 0, !!ACCESS_ONCE(rdp->nxtlist),  
				need_resched(), is_idle_task(current),  
				rcu_is_callbacks_kthread());  
		return;  
	}  

	/* 
	 * 提取回调函数的list,需要禁用中断,以防止调用call_rcu()。  
	 */   
	local_irq_save(flags);  
	WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));  
	bl = rdp->blimit;  
	trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, bl);  
	list = rdp->nxtlist;  
	/*  
	 * 已经将list指向了nxtlist,此时将nxtlist指向 *rdp->nxttail[RCU_DONE_TAIL]。 
	 * 由于nxttail指向的是 rcu_head中的next指针的地址,所以此处得到的就是next所 
	 * 指向的rcu_head对象。 
	 */  
	rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];  
	/*将*rdp->nxttail[RCU_DONE_TAIL]指向NULL,也就是将list中的最后一个元素的next设置成NULL*/  
	*rdp->nxttail[RCU_DONE_TAIL] = NULL;  
	/*tail指向list最后一个元素的next指针的地址*/  
	tail = rdp->nxttail[RCU_DONE_TAIL];  
	/*此时rdp->nxttail[RCU_DONE_TAIL]指向的内容已经移出,所以让它重新指向nxtlist的地址*/  
	for (i = RCU_NEXT_SIZE - 1; i >= 0; i--)  
	if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])  
		rdp->nxttail[i] = &rdp->nxtlist;  
	local_irq_restore(flags);  

	/* 调用回调函数 */  
	count = count_lazy = 0;  
	while (list) {  
		next = list->next;  
		prefetch(next);  
		debug_rcu_head_unqueue(list);  
		if (__rcu_reclaim(rsp->name, list))  
			count_lazy++;  
		list = next;  
		/* 当已经全部运行完毕或者CPU有更重要的事情的时候,退出循环。 */  
		if (++count >= bl &&  
				(need_resched() ||  
				(!is_idle_task(current) && !rcu_is_callbacks_kthread())))  
			break;  
	}  

	local_irq_save(flags);  
	trace_rcu_batch_end(rsp->name, count, !!list, need_resched(),  
			is_idle_task(current),  
			rcu_is_callbacks_kthread());  

	/* 更新数量。并将没有执行完的回调函数重新放进列表。 */  
	if (list != NULL) {  
	*tail = rdp->nxtlist;  
	rdp->nxtlist = list;  
	for (i = 0; i < RCU_NEXT_SIZE; i++)  
		if (&rdp->nxtlist == rdp->nxttail[i])  
			rdp->nxttail[i] = tail;  
		else  
			break;  
	}  
	smp_mb(); /* 为了 rcu_barrier()统计运行过的回调函数 */  
	rdp->qlen_lazy -= count_lazy;  
	ACCESS_ONCE(rdp->qlen) -= count;  
	rdp->n_cbs_invoked += count;  

	/* Reinstate batch limit if we have worked down the excess. */  
	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)  
		rdp->blimit = blimit;  

	/* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */  
	if (rdp->qlen == 0 && rdp->qlen_last_fqs_check != 0) {  
		rdp->qlen_last_fqs_check = 0;  
		rdp->n_force_qs_snap = rsp->n_force_qs;  
	} else if (rdp->qlen < rdp->qlen_last_fqs_check - qhimark)  
		rdp->qlen_last_fqs_check = rdp->qlen;  
	WARN_ON_ONCE((rdp->nxtlist == NULL) != (rdp->qlen == 0));  

	local_irq_restore(flags);  

	/* 如果还有回调函数没有执行,通知再次调用软中断 */  
	if (cpu_has_callbacks_ready_to_invoke(rdp))  
		invoke_rcu_core();  
}  

rcu_do_batch主要作用是取出nxtlist中,nxttail[RCU_DONE_TAIL]之前的元素,遍历执行它们。这时候销毁过程真正的执行了。这段函数需要仔细想想nxttail的处理。

到此RCU中涉及到的主干函数介绍完了,但是还需要与进程切换等过程交互。将在下节分析它们。


TREE RCU实现之三 —— 定期调用

上一节,介绍过了RCU实现中用到的主要函数。不过还需要定期的运行这些函数,整个机制才完整。

RCU的实现是通过在update_process_times() 中调用rcu_check_callbacks()来达到这个目的的。每个CPU都会定期的调用update_process_times()。rcu_check_callbacks()会去检查当前的RCU机制中是否有需要处理的内容,如当前CPU需要开启一个新的宽限期,当前CPU上的宽限期还没有处理完成。如果有需要处理的内容,将触发一个软件中断,真正的操作由软件中断触发的rcu_process_callbacks()来完成。

rcu_check_callbacks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void rcu_check_callbacks(int cpu, int user)  
{  
	trace_rcu_utilization("Start scheduler-tick");  
	increment_cpu_stall_ticks();  
	if (user || rcu_is_cpu_rrupt_from_idle()) {  
		 /* 
		  * 如果是从用户模式或者是idle模式调用该函数, 
		  * 那么这个CPU是静止状态。 
		  *  
		  * 此处不需要内存屏障。因为rcu_sched_qs()和 
		  * and rcu_bh_qs()支处理CPU自身的局部变量, 
		  * 其它CPU不会访问和修改,至少当CPU在线的时候。 
		  *  
		  */                  
		  rcu_sched_qs(cpu);  
		  rcu_bh_qs(cpu);          
	} else if (!in_softirq()) {                  
		 /* 
		  * 运行到这儿,如果不是软件中断。如果当前CPU上运行的 
		  * 软中断的读过程,肯定已经完成,所以标记它。 
		  * 
		  */                 
		 rcu_bh_qs(cpu);  
	}  
	rcu_preempt_check_callbacks(cpu); /*抢先式下的检测*/  
	if (rcu_pending(cpu))  
		invoke_rcu_core();  
	trace_rcu_utilization("End scheduler-tick");  
}  

该函数的主要功能是通过 rcu_pending()判断是否当前有需要处理的rcu内容,如果有调用invoke_rcu_core()。

1
2
3
4
5
6
7
8
9
static int rcu_pending(int cpu)  
{  
	struct rcu_state *rsp;  

	for_each_rcu_flavor(rsp)  
	if (__rcu_pending(rsp, per_cpu_ptr(rsp->rda, cpu)))  
		return 1;  
	return 0;  
}  

rcu_pending会循环所有的rcu_state,在非抢占式模式下,有rcu_sched_state 和rcu_bh_state 两个实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)  
{  
	struct rcu_node *rnp = rdp->mynode;  

	rdp->n_rcu_pending++;  

	/* Check for CPU stalls, if enabled. */  
	check_cpu_stall(rsp, rdp);  

	/*  是否宽限期在等待这个CPU去完成静止状态呢?  */  
	if (rcu_scheduler_fully_active &&  
			rdp->qs_pending && !rdp->passed_quiesce) {  

		/* 
		 * 如果force_quiescent_state() 需要马上执行,而这个CPU 
		 * 需要一个静止状态,强制执行本地进程切换。       
		 */  
		rdp->n_rp_qs_pending++;  
		if (!rdp->preemptible &&  
			ULONG_CMP_LT(ACCESS_ONCE(rsp->jiffies_force_qs) - 1,  
			 jiffies))  
		set_need_resched();  
	} else if (rdp->qs_pending && rdp->passed_quiesce) {  
		rdp->n_rp_report_qs++;  
		return 1;  
	}  

	/* 这个CPU是否有callbacks等着调用? */  
	if (cpu_has_callbacks_ready_to_invoke(rdp)) {  
		rdp->n_rp_cb_ready++;  
		return 1;  
	}  

	/* 当前CPU有需要执行的宽限期,而没有其它的宽限期在执行?  */  
	if (cpu_needs_another_gp(rsp, rdp)) {  
		rdp->n_rp_cpu_needs_gp++;  
		return 1;  
	}  

	/* 另一个CPU上执行的宽限期结束?   */  
	if (ACCESS_ONCE(rnp->completed) != rdp->completed) { /* outside lock */  
			rdp->n_rp_gp_completed++;  
		return 1;  
	}  

	/* 有新的RCU开始? */  
	if (ACCESS_ONCE(rnp->gpnum) != rdp->gpnum) { /* outside lock */  
			rdp->n_rp_gp_started++;  
		return 1;  
	}  

	/* 一个宽限期运行了太长时间,需要强制执行? */  
	if (rcu_gp_in_progress(rsp) &&  
			ULONG_CMP_LT(ACCESS_ONCE(rsp->jiffies_force_qs), jiffies)) {  
		rdp->n_rp_need_fqs++;  
		return 1;  
	}  

	/* 无事可做 */  
	rdp->n_rp_need_nothing++;  
	return 0;  
}  

__rcu_pending 判断了可能存在的各种情形,如果有需要处理的工作的话,就返回1,否则返回0。

1
2
3
4
5
6
7
8
9
static void invoke_rcu_core(void)  
{  
	raise_softirq(RCU_SOFTIRQ);  
}  


 invoke_rcu_core()的作用是开启软中断。在初始化的时候,系统已经注册了软中断。

open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
1
2
3
4
5
6
7
8
9
static void rcu_process_callbacks(struct softirq_action *unused)  
{  
	struct rcu_state *rsp;  

	trace_rcu_utilization("Start RCU core");  
	for_each_rcu_flavor(rsp)  
	__rcu_process_callbacks(rsp);  
	trace_rcu_utilization("End RCU core");  
}  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static void  
__rcu_process_callbacks(struct rcu_state *rsp)  
{  
	unsigned long flags;  
	struct rcu_data *rdp = __this_cpu_ptr(rsp->rda);  

	WARN_ON_ONCE(rdp->beenonline == 0);  

	/* 
	 * 如果一个宽限期运行了很长时间,那么强制静止状态。 
	 *  
	 */  
	if (ULONG_CMP_LT(ACCESS_ONCE(rsp->jiffies_force_qs), jiffies))  
		force_quiescent_state(rsp, 1);  

	/* 
	 * 处理宽限期结束相关内容。 
	 */  
	rcu_process_gp_end(rsp, rdp);  

	/* 检测是否有新的宽限期开始或者静止状态需要向上报告。 */  
	rcu_check_quiescent_state(rsp, rdp);  

	/* 当前CPU需要新的宽限期吗? */  
	if (cpu_needs_another_gp(rsp, rdp)) {  
		raw_spin_lock_irqsave(&rcu_get_root(rsp)->lock, flags);  
		rcu_start_gp(rsp, flags);  /* releases above lock */  
	}  

	/* 如果有等着调用的回调函数,那么调用它。 */  
	if (cpu_has_callbacks_ready_to_invoke(rdp))  
		invoke_rcu_callbacks(rsp, rdp);  
}  

软件中断其实就是调用之前提到过的函数来完成具体的任务。

Linux kernel 内存屏障在RCU上的应用

http://blog.csdn.net/jianchaolv/article/details/7527647

内存屏障主要解决的问题是编译器的优化和CPU的乱序执行。

编译器在优化的时候,生成的汇编指令可能和c语言程序的执行顺序不一样,在需要程序严格按照c语言顺序执行时,需要显式的告诉编译不需要优化,这在linux下是通过barrier()宏完成的,它依靠volidate关键字和memory关键字,前者告诉编译barrier()周围的指令不要被优化,后者作用是告诉编译器汇编代码会使内存里面的值更改,编译器应使用内存里的新值而非寄存器里保存的老值。

同样,CPU执行会通过乱序以提高性能。汇编里的指令不一定是按照我们看到的顺序执行的。linux中通过mb()系列宏来保证执行的顺序。简单的说,如果在程序某处插入了mb()/rmb()/wmb()宏,则宏之前的程序保证比宏之后的程序先执行,从而实现串行化。

即使是编译器生成的汇编码有序,处理器也不一定能保证有序。就算编译器生成了有序的汇编码,到了处理器那里也拿不准是不 是会按照代码顺序执行。所以就算编译器保证有序了,程序员也还是要往代码里面加内存屏障才能保证绝对访存有序,这倒不如编译器干脆不管算了,因为内存屏障 本身就是一个sequence point,加入后已经能够保证编译器也有序。

处理器虽然乱序执行,但最终会得出正确的结果,所以逻辑上讲程序员本不需要关心处理器乱序的问题。但是在SMP并发执行的情况下,处理器无法知道并发程序之间的逻辑,比如,在不同core上的读者和写者之间的逻辑。简单讲,处理器只保证在单个core上按照code中的顺序给出最终结果。这就要求程序员通过mb()/rmb()/wmb()/read_barrier_depends来告知处理器,从而得到正确的并发结果。内存屏障、数据依赖屏障都是为了处理SMP环境下的数据同步问题,UP根本不存在这个问题。

下面分析下内存屏障在RCU上的应用:

1
2
3
4
5
6
7
8
9
10
#define rcu_assign_pointer(p, v) ({ \
	smp_wmb();                      \
	(p)= (v);                       \
})

#define rcu_dereference(p) ({     \
	typeof(p)_________p1 = p;     \
	smp_read_barrier_depends();   \
	(_________p1);                \
}) 

rcu_assign_pointer()通常用于写者的发布,rcu_dereference()通常用于读者的订阅。

写者:

1
2
3
4
5
6
7
p->a = 1;
p->b = 2;
p->c = 3;
rcu_assign_pointer(gp, p);

// 如果gp的原值马上会被改变/释放,则需要synchronize_rcu()/synchronize_net(),
// 如: 模块的卸载, 原gp指向函数被释放

读者:

1
2
3
4
5
6
rcu_read_lock();
p = rcu_dereference(gp);
if (p != NULL) {
	do_something_with(p->a, p->b, p->c);
}
rcu_read_unlock();

rcu_assign_pointer()是说,先把那块内存写好,再把指针指过去。这里使用的内存写屏障是为了保证并发的读者读到数据一致性。在这条语句之前的读者读到旧的指针和旧的内存,这条语句之后的读者读到新的指针和新的内存。如果没有这条语句,很有可能出现读者读到新的指针和旧的内存。也就是说,这里通过内存屏障刷新了p所指向的内存的值,至于gp本身的值有没有更新还不确定。实际上,gp本身值的真正更新要等到并发的读者来促发。

rcu_dereference() 原语用的是数据依赖屏障,smp_read_barrier_dependence,它要求后面的读操作如果依赖前面的读操作,则前面的读操作需要首先完成。根据数据之间的依赖,要读p->a, p->b, p->c, 就必须先读p,要先读p,就必须先读p1,要先读p1,就必须先读gp。也就是说读者所在的core在进行后续的操作之前,gp必须是同步过的当前时刻的最新值。如果没有这个数据依赖屏障,有可能读者所在的core很长一段时间内一直用的是旧的gp值。所以,这里使用数据依赖屏障是为了督促写者将gp值准备好,是为了呼应写者,这个呼应的诉求是通过数据之间的依赖关系来促发的,也就是说到了非呼应不可的地步了。

下面看看kernel中常用的链表操作是如何使用这样的发布、订阅机制的:

写者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline void list_add_rcu(struct list_head *new, struct list_head *head)
{
	__list_add_rcu(new, head, head->next);
}

static inline void __list_add_rcu(struct list_head * new,
struct list_head * prev, struct list_head * next)
{
	new->next = next;
	new->prev = prev;
	smp_wmb();
	next->prev = new;
	prev->next = new;
}

读者:

1
2
3
4
5
#define list_for_each_entry_rcu(pos, head, member)                \
	for(pos = list_entry((head)->next, typeof(*pos), member);     \
			prefetch(rcu_dereference(pos)->member.next),          \
			&pos->member!= (head);                                \
		pos= list_entry(pos->member.next, typeof(*pos), member))

写者通过调用list_add_rcu来发布新的节点,其实是发布next->prev, prev->next这两个指针。读者通过list_for_each_entry_rcu来订阅这连个指针,我们将list_for_each_entry_rcu订阅部分简化如下:

1
2
pos = prev->next;
prefetch(rcu_dereference(pos)->next);

读者通过rcu_dereference订阅的是pos,而由于数据依赖关系,又间接订阅了prev->next指针,或者说是促发prev->next的更新。

下面介绍下其他相关链表操作的函数:

safe版本的iterate的函数?为什么就safe了?

1
2
3
4
5
6
7
#define list_for_each_safe(pos,n, head)                    \
	for(pos = (head)->next, n = pos->next; pos != (head);  \
			pos= n, n = pos->next)

#define list_for_each(pos, head)                                \
	for(pos = (head)->next; prefetch(pos->next), pos != (head); \
			pos= pos->next)

当在iterate的过程中执行删除操作的时候,比如:

1
2
list_for_each(pos,head)
	list_del(pos)

这样会断链,为了避免这种断链,增加了safe版本的iterate函数。另外,由于preftech的缘故,有可能引用一个无效的指针LIST_POISON1。这里的safe是指,为避免有些cpu的preftech的影响,干脆在iterate的过程中去掉preftech。

还有一个既有rcu+safe版本的iterative函数:

1
2
3
4
#define list_for_each_safe_rcu(pos, n, head)              \
	for(pos = (head)->next;                               \
			n= rcu_dereference(pos)->next, pos != (head); \
			pos= n)

只要用这个版本的iterate函数,就可以和多个_rcu版本的写操作(如:list_add_rcu())并发执行。

kmalloc、vmalloc、malloc的区别

blog.csdn.net/macrossdzh/article/details/5958368

简单的说:
kmalloc和vmalloc是分配的是内核的内存,malloc分配的是用户的内存
kmalloc保证分配的内存在物理上是连续的,vmalloc保证的是在虚拟地址空间上的连续,malloc不保证任何东西(这点是自己猜测的,不一定正确)
kmalloc能分配的大小有限,vmalloc和malloc能分配的大小相对较大
内存只有在要被DMA访问的时候才需要物理上连续
vmalloc比kmalloc要慢

详细的解释:
对于提供了MMU(存储管理器,辅助操作系统进行内存管理,提供虚实地址转换等硬件支持)的处理器而言,Linux提供了复杂的存储管理系统,使得进程所能访问的内存达到4GB。 进程的4GB内存空间被人为的分为两个部分–用户空间与内核空间。用户空间地址分布从0到3GB(PAGE_OFFSET,在0x86中它等于0xC0000000),3GB到4GB为内核空间。
内核空间中,从3G到vmalloc_start这段地址是物理内存映射区域(该区域中包含了内核镜像、物理页框表mem_map等等),比如我们使用 的 VMware虚拟系统内存是160M,那么3G~3G+160M这片内存就应该映射物理内存。在物理内存映射区之后,就是vmalloc区域。对于 160M的系统而言,vmalloc_start位置应在3G+160M附近(在物理内存映射区与vmalloc_start期间还存在一个8M的gap 来防止跃界),vmalloc_end的位置接近4G(最后位置系统会保留一片128k大小的区域用于专用页面映射)

kmalloc和get_free_page申请的内存位于物理内存映射区域,而且在物理上也是连续的,它们与真实的物理地址只有一个固定的偏移,因此存在较简单的转换关系,virt_to_phys()可以实现内核虚拟地址转化为物理地址:

1
2
3
4
5
#define __pa(x) ((unsigned long)(x)-PAGE_OFFSET)
extern inline unsigned long virt_to_phys(volatile void * address)
{
	return __pa(address);
}

上面转换过程是将虚拟地址减去3G(PAGE_OFFSET=0XC000000)。

与之对应的函数为phys_to_virt(),将内核物理地址转化为虚拟地址:

1
2
3
4
5
#define __va(x) ((void *)((unsigned long)(x)+PAGE_OFFSET))
extern inline void * phys_to_virt(unsigned long address)
{
	return __va(address);
}

virt_to_phys()和phys_to_virt()都定义在include/asm-i386/io.h中。

而vmalloc申请的内存则位于vmalloc_start~vmalloc_end之间,与物理地址没有简单的转换关系,虽然在逻辑上它们也是连续的,但是在物理上它们不要求连续。


blog.csdn.net/kris_fei/article/details/17243527

平台: msm8x25
系统: android 4.1
内核: 3.4.0

概念

由于系统的连续物理内存有限,这使得非连续物理内存的使用在linux内核中出现,这叫vmalloc机制。和前者一样,vmalloc机制中的虚拟地址也是连续的。

Vmallocinfo

Vmalloc机制并不是狭义地指使用vmalloc函数分配,其他还有如ioremap, iotable_init等。可以从/proc/vmallocinfo获取到此信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#cat /proc/vmallocinfo
0xf3600000-0xf36ff0001044480 binder_mmap+0xb0/0x224 ioremap
………..
0xf6680000-0xf66c1000 266240 kgsl_page_alloc_map_kernel+0x98/0xe8 ioremap
0xf6700000-0xf67ff0001044480 binder_mmap+0xb0/0x224 ioremap
…………….
0xf6f00000-0xf6f41000 266240 kgsl_page_alloc_map_kernel+0x98/0xe8 ioremap
0xf7200000-0xf72ff0001044480 binder_mmap+0xb0/0x224 ioremap
0xfa000000-0xfa001000   4096 iotable_init+0x0/0xb0 phys=c0800000 ioremap
……………..
0xfa105000-0xfa106000   4096 iotable_init+0x0/0xb0 phys=a9800000 ioremap
0xfa200000-0xfa3000001048576 pmd_empty_section_gap+0x0/0x3c ioremap
0xfa300000-0xfa4000001048576 iotable_init+0x0/0xb0 phys=100000 ioremap
0xfa400000-0xfa5000001048576 iotable_init+0x0/0xb0 phys=aa500000 ioremap
0xfa500000-0xfa6000001048576 pmd_empty_section_gap+0x0/0x3c ioremap
0xfa701000-0xfa702000   4096 iotable_init+0x0/0xb0 phys=c0400000 ioremap
…………..
0xfa800000-0xfa9000001048576 pmd_empty_section_gap+0x0/0x3c ioremap
0xfa900000-0xfb60000013631488 iotable_init+0x0/0xb0 phys=ac000000 ioremap
0xfefdc000-0xff000000 147456 pcpu_get_vm_areas+0x0/0x56c vmalloc

上面的列数意思依次是:虚拟地址,分配大小,哪个函数分配的,物理地址,分配类型。

后面会提到vmalloc size的划分是按照此info来修改的。

分配标志

是否划分到vamlloc区域主要是以下重要的标志来决定的:

File: kernel/include/linux/vmalloc.h

1
2
3
4
5
6
7
8
/* bits in flags ofvmalloc's vm_struct below */
#defineVM_IOREMAP    0x00000001     /* ioremap()and friends */
#define VM_ALLOC     0x00000002     /* vmalloc() */
#defineVM_MAP        0x00000004     /* vmap()ed pages */
#defineVM_USERMAP    0x00000008     /* suitable forremap_vmalloc_range */
#defineVM_VPAGES     0x00000010     /* buffer for pages was vmalloc'ed */
#defineVM_UNLIST     0x00000020     /* vm_struct is not listed in vmlist */
/* bits [20..32]reserved for arch specific ioremap internals */

Vmallocinfo中的函数,你可以对照源码看一下,在设置flag的时候就会有VM_IOREMAP, VM_ALLOC这些标志。

Vmalloc区域

Vmalloc的区域是由两个宏变量来表示: VMALLOC_START,VMALLOC_END.

File: kernel/arch/arm/include/asm/pgtable.h

1
2
3
#defineVMALLOC_OFFSET       (8*1024*1024)
#defineVMALLOC_START        (((unsigned long)high_memory + VMALLOC_OFFSET) & ~(VMALLOC_OFFSET-1))
#defineVMALLOC_END          0xff000000UL

VMALLOC_START:看上去会随着high_memory的值变化。

VMALLOC_OFFSET:系统会在low memory和VMALLOC区域留8M,防止访问越界。因此假如理论上vmalloc size有300M,实际可用的也是只有292M。

File: kernel/Documentation/arm/memory.txt有给出更好的解释:

1
VMALLOC_START   VMALLOC_END-1    vmalloc() / ioremap() space. Memory returned byvmalloc/ioremap will be dynamically placed in this region. Machine specificstatic mappings are also located here through iotable_init(). VMALLOC_START isbased upon the value of the high_memoryvariable, and VMALLOC_END is equal to 0xff000000.

下图摘自网络,看下VMALLOC_START和VMALLOC_END的位置。0xc0000000到VMALLOC_START为low memory虚拟地址区域。

Vmallocsize 计算

有了以上知识后我们看下vmalloc size是如何分配的,目前有两种方法,kernel默认分配一个, 以及开机从cmdline分配。

1. 从cmdline分配

File: device/qcom/msm7627a/BoardConfig.mk

BOARD_KERNEL_CMDLINE := androidboot.hardware=qcom loglevel=7vmalloc=200M

上面的值在build的时候会被赋值给kernel 的cmdline。

开机的时候early_vmalloc()会去读取vmalloc这个值。

File: kernel/arch/arm/mm/mmu.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int__init early_vmalloc(char *arg)
{
	/*cmdline中的vmalloc会被解析到vmlloc_reserve中。*/
	unsigned long vmalloc_reserve = memparse(arg, NULL);

	/*小于16M则用16M。*/
	if (vmalloc_reserve < SZ_16M) {
		vmalloc_reserve = SZ_16M;
		printk(KERN_WARNING
				"vmalloc area too small, limiting to %luMB\n",
				vmalloc_reserve >> 20);
	}

	/*大于可用虚拟地址内存则使用可用地址部分再减去32M。*/
	if (vmalloc_reserve > VMALLOC_END - (PAGE_OFFSET + SZ_32M)) {
		vmalloc_reserve = VMALLOC_END - (PAGE_OFFSET + SZ_32M);
		printk(KERN_WARNING
				"vmalloc area is too big, limiting to %luMB\n",
				vmalloc_reserve >> 20);
	}

	/*计算偏移起始地址。*/
	vmalloc_min = (void *)(VMALLOC_END - vmalloc_reserve);
	return 0;
}
early_param("vmalloc",early_vmalloc);

vmalloc_min会影响arm_lowmem_limit,arm_lowmem_limit其实就是high_memory。因为此过程不是我们要分析的重点,如果有兴趣可分析kernel/arch/arm/mm/mmu.c中的sanity_check_meminfo()函数。

所以,VMALLOC_START受到了hight_memory的影响而发生了变化,最终使得vmalloc size也变化了!

2. 开机默认分配:

File: kernel/arch/arm/mm/mmu.c

1
2
static void * __initdata vmalloc_min =
	(void *)(VMALLOC_END - (240 << 20) - VMALLOC_OFFSET);

当cmdline无vmalloc参数传进来的时候,early_vmalloc()函数也不会调用到,vmalloc_min的值就会被默认传进来了,默认是240M。

后面的步骤和方法1一样了!

开机log有memory layout 信息:

1
2
3
4
5
6
7
8
9
10
11
[    0.000000] [cpuid: 0] Virtual kernelmemory layout:
[    0.000000] [cpuid:0]     vector  : 0xffff0000 - 0xffff1000  (   4 kB)
[    0.000000] [cpuid:0]     fixmap  : 0xfff00000 - 0xfffe0000   (896 kB)
[    0.000000] [cpuid:0]     vmalloc : 0xf3000000 - 0xff000000   ( 192MB)
[    0.000000] [cpuid:0]     lowmem  : 0xc0000000 - 0xf2800000   (808 MB)
[    0.000000] [cpuid:0]     pkmap   : 0xbfe00000 -0xc0000000   (   2 MB)
[    0.000000] [cpuid:0]     modules : 0xbf000000 - 0xbfe00000  (  14 MB)
[    0.000000] [cpuid:0]       .text : 0xc0008000 -0xc0893034   (8749 kB)
[    0.000000] [cpuid:0]       .init : 0xc0894000 -0xc08cdc00   ( 231 kB)
[    0.000000] [cpuid:0]       .data : 0xc08ce000 -0xc09f8eb8   (1196 kB)
[    0.000000] [cpuid:0]        .bss : 0xc0a78eec -0xc0f427a8   (4903 kB)

其中看到vmalloc为192MB , cmdline中使用vmllaoc就是200M。

Lowmem为地段内存部分,可见lowmem和vmalloc中间有8M空隙。

Vmalloc该分配多大?

Linux内核版本从3.2到3.3 默认的vmalloc size由128M 增大到了240M,3.4.0上的

修改Commit信息如下:

To accommodate all static mappings on machines withpossible highmem usage, the default vmalloc area size is changed to 240 MB sothat VMALLOC_START is no higher than 0xf0000000 by default.

看其意思是因为开机的静态映射增加了,所以要扩大。

另外3.2到3.3版本的一个重大变化是将android引入到主线内核中。我想增大vmalloc size到240M是基于此考虑吧。当然,各家厂商都也可以基于自己平台来动态修改size的。

那么如何判断当前vmalloc size不足呢?

/proc/meminfo中有vmalloc信息:
VmallocTotal: 540672 kB
VmallocUsed: 165268 kB
VmallocChunk: 118788kB

事实上这里的VmallocUsed只是表示已经被真正使用掉的vmalloc区域,但是区域之前的空隙也就是碎片没有被计算进去。

所以,回到前面说的/proc/vmallocinfo,假设我们的vmalloc size就是200M。那么区域为0xf3000000- 0xff000000,从vmallocinfo中可以看到,前面大部分虚拟地址空间都用掉了,剩下0xfb600000到0xfefdc000这57M空间,假如申请了64M,那么就会失败了。

开机分配使用掉vmalloc之后到底该剩余多少目前没有具体依据,一般来说1GB RAM可以设置为400~600M。

Linux-2.6.32 NUMA架构之内存和调度

http://blog.chinaunix.net/uid-7295895-id-3076420.html

Linux-2.6.32 NUMA架构之内存和调度

本文将以XLP832通过ICI互连形成的NUMA架构进行分析,主要包括内存管理和调度两方面,参考内核版本2.6.32.9;NUMA架构常见配置选项有:CONFIG_SMP, CONFIG_NUMA, CONFIG_NEED_MULTIPLE_NODES, CONFIG_NODES_SHIFT, CONFIG_SPARSEMEM, CONFIG_CGROUPS, CONFIG_CPUSETS, CONFIG_MIGRATION等。

本文试图从原理上介绍,尽量避免涉及代码的实现细节。

1 NUMA架构简介

NUMA(Non Uniform Memory Access)即非一致内存访问架构,市面上主要有X86_64(JASPER)和MIPS64(XLP)体系。

1.1 概念

NUMA具有多个节点(Node),每个节点可以拥有多个CPU(每个CPU可以具有多个核或线程),节点内使用共有的内存控制器,因此节点的所有内存对于本节点的所有CPU都是等同的,而对于其它节点中的所有CPU都是不同的。节点可分为本地节点(Local Node)、邻居节点(Neighbour Node)和远端节点(Remote Node)三种类型。

本地节点:对于某个节点中的所有CPU,此节点称为本地节点;
邻居节点:与本地节点相邻的节点称为邻居节点;
远端节点:非本地节点或邻居节点的节点,称为远端节点。

邻居节点和远端节点,称作非本地节点(Off Node)。

CPU访问不同类型节点内存的速度是不相同的:本地节点>邻居节点>远端节点。访问本地节点的速度最快,访问远端节点的速度最慢,即访问速度与节点的距离有关,距离越远访问速度越慢,此距离称作Node Distance。

常用的NUMA系统中:硬件设计已保证系统中所有的Cache是一致的(Cache Coherent, ccNUMA);不同类型节点间的Cache同步时间不一样,会导致资源竞争不公平,对于某些特殊的应用,可以考虑使用FIFO Spinlock保证公平性。

1.2 关键信息

1) 物理内存区域与Node号之间的映射关系;
2) 各Node之间的Node Distance;
3) 逻辑CPU号与Node号之间的映射关系。

2 XLP832 NUMA初始化

首先需要完成1.2节中描述的3个关键信息的初始化。

2.1 CPU和Node的关系

start_kernel()->setup_arch()->prom_init():

1
2
3
#ifdef CONFIG_NUMA
	build_node_cpu_map();
#endif

build_node_cpu_map()函数工作:

a) 确定CPU与Node的相互关系,做法很简单:

1
2
#define cpu_to_node(cpu)       (cpu >> 5)
#define cpumask_of_node    (NODE_CPU_MASK(node)) /* node0:0~31; node1: 32~63 */

说明:XLP832每个节点有1个物理CPU,每个物理CPU有8个核,每个核有4个超线程,因此每个节点对应32个逻辑CPU,按节点依次展开。另外,实际物理存在的CPU数目是通过DTB传递给内核的;numa_node_id()可以获取当前CPU所处的Node号。

b) 设置每个物理存在的节点的在线状态,具体是通过node_set_online()函数来设置全局变量

nodemask_t node_states[];

这样,类似于CPU号,Node号也就具有如下功能宏:

1
2
for_each_node(node);
for_each_online_node(node);

详细可参考include/linux/nodemask.h

2.2 Node Distance确立

作用:建立buddy时用,可以依此来构建zonelist,以及zone relaim(zone_reclaim_mode)使用,详见后面的4.2.2节。

2.3 内存区域与Node的关系

start_kernel()->setup_arch()->arch_mem_init->bootmem_init()->nlm_numa_bootmem_init():

nlm_get_dram_mapping();

XLP832上电后的默认memory-mapped物理地址空间分布:

其中PCIE配置空间映射地址范围为[0x1800_0000, 0x1BFF_FFFF],由寄存器ECFG_BASE和ECFG_LIMIT指定(注:但这2个寄存器本身是处于PCIE配置空间之中的)。

PCIE配置空间:
PCIE配置空间与memory-mapped物理地址的映射方式:

XLP832实现了所有设备都位于虚拟总线0上,每个节点有8个设备,按节点依次排开。

DRAM映射寄存器组:
每个节点都独立实现有几组不同类型的DRAM(每组有8个相同类型的)寄存器可以配置DRAM空间映射到物理地址空间中的基址和大小,以及所属的节点信息(这些寄存器的值事先会由bootloader设好);这组寄存器位于虚拟总线0的设备0/8/16/24(依次对应每个节点的第一个设备号)的Function0(每个设备最多可定义8个Function,每个Function有着独立的PCIE 4KB的配置空间)的PCIE配置空间中(这个配置空间实现的是DRAM/Bridge控制器)。

本小节涉及到的3组不同类型的寄存器(注:按索引对应即DRAM_BAR,DRAM_LIMIT和 DRAM_NODE_TRANSLATION描述一个内存区域属性):

第一组(DRAM空间映射物理空间基址):

1
2
3
4
5
6
7
8
DRAM_BAR0: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x54
DRAM_BAR1: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x55
DRAM_BAR2: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x56
DRAM_BAR3: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x57
DRAM_BAR4: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x58
DRAM_BAR5: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x59
DRAM_BAR6: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5A
DRAM_BAR7: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5B

第二组(DRAM空间映射物理空间长度):

1
2
3
4
5
6
7
8
DRAM_LIMIT0: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5C
DRAM_LIMIT1: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5D
DRAM_LIMIT2: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5E
DRAM_LIMIT3: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x5F
DRAM_LIMIT4: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x60
DRAM_LIMIT5: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x61
DRAM_LIMIT6: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x62
DRAM_LIMIT7: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x63

第三组(节点相关):

1
2
3
4
5
6
7
8
DRAM_NODE_TRANSLATION0: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x64
DRAM_NODE_TRANSLATION1: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x65
DRAM_NODE_TRANSLATION2: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x66
DRAM_NODE_TRANSLATION3: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x67
DRAM_NODE_TRANSLATION4: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x68
DRAM_NODE_TRANSLATION5: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x69
DRAM_NODE_TRANSLATION6: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x6A
DRAM_NODE_TRANSLATION7: PCIe Bus 0, Device 0/8/16/24, Function 0, Register 0x6B

根据上述的PCIE配置空间memory-mapped映射方式便可直接获取寄存器中的值,就可以建立各个节点中的所有内存区域(最多8个区域)信息。关于这些寄存器的使用可以参考“XLP® Processor Family Programming Reference Manual”的“Chapter 7 Memory and I/O Subsystem”。

3 Bootmem初始化

bootmem_init()->…->init_bootmem_node()->init_bootmem_core():

每个节点拥有各自的bootmem管理(code&data之前可以为空闲页面)。

4 Buddy初始化

初始化流程最后会设置全局struct node_active_region early_node_map[]用于初始化Buddy系统,for_each_online_node()遍历所有在线节点调用free_area_init_node()初始化,主要初始化每个zone的大小和所涉及页面的struct page结构(flags中初始化有所属zone和node信息,由set_page_links()函数设置)等。

4.1 NUMA带来的变化

1) pglist_data

1
2
3
4
5
6
7
8
9
10
11
12
13
typedef struct pglist_data {
	struct zone node_zones[MAX_NR_ZONES];
	struct zonelist node_zonelists[MAX_ZONELISTS];
	int nr_zones;
	struct bootmem_data *bdata;
	unsigned long node_start_pfn;
	unsigned long node_present_pages; /* total number of physical pages */
	unsigned long node_spanned_pages; /* total size of physical pagerange, including holes */
	int node_id;
	wait_queue_head_t kswapd_wait;
	struct task_struct *kswapd;
	int kswapd_max_order;
} pg_data_t;

a)上节的bootmem结构的描述信息存放在NODE_DATA(node)-> bdata中;NODE_DATA(i)宏返回节点i的struct pglist_data结构,需要在架构相关的mmzone.h中实现;
b) #define MAX_ZONELISTS 2,请参考后面的“zonelist初始化”。

2) zone

1
2
3
4
5
6
7
8
9
10
11
12
struct zone {
#ifdef CONFIG_NUMA
	int node;
	/*
	 * zone reclaim becomes active if more unmapped pages exist.
	 */
	unsigned long        min_unmapped_pages;
	unsigned long        min_slab_pages;
	struct per_cpu_pageset   *pageset[NR_CPUS];
#else
	… …
};

a)最终调用kmalloc_node()为pageset成员在每个CPU的对应的内存节点分配内存;
b)min_unmapped_pages 对应/proc/sys/vm/min_unmapped_ratio,默认值为1;
min_slab_pages对应/proc/sys/vm/min_slab_ratio,默认值为5;
作用:当剩余可回收的非文件映射和SLAB页面超过这2个值时,才激活当前zone回收;

c) 增加了zone对应的节点号。

4.2 zonelist初始化

本节讲述zonelist的构建方式,实现位于start_kernel()->build_all_zonelists()中,zonelist的组织方式非常关键(这一点与以前的2.6.21内核版本不一样,2.6.32组织得更清晰)。

4.2.1 zonelist order

NUMA系统中存在多个节点,每个节点对应一个struct pglist_data结构,此结构中可以包含多个zone,如:ZONE_DMA, ZONE_NORMAL,这样就产生几种排列顺序,以2个节点2个zone为例(zone从高到低排列, ZONE_DMA0表示节点0的ZONE_DMA,其它类似):

a) Legacy方式

每个节点只排列自己的zone;

b)Node方式

按节点顺序依次排列,先排列本地节点的所有zone,再排列其它节点的所有zone。

c) Zone方式

按zone类型从高到低依次排列各节点的同相类型zone。

可通过启动参数“numa_zonelist_order”来配置zonelist order,内核定义了3种配置:

1
2
3
#define ZONELIST_ORDER_DEFAULT  0 /* 智能选择Node或Zone方式 */
#define ZONELIST_ORDER_NODE     1 /* 对应Node方式 */
#define ZONELIST_ORDER_ZONE     2 /* 对应Zone方式 */

默认配置为ZONELIST_ORDER_DEFAULT,由内核通过一个算法来判断选择Node或Zone方式,算法思想:

a) alloc_pages()分配内存是按照ZONE从高到低的顺序进行的,例如上节“Node方式”的图示中,从ZONE_NORMAL0中分配内存时,ZONE_NORMAL0中无内存时将落入较低的ZONE_DMA0中分配,这样当ZONE_DMA0比较小的时候,很容易将ZONE_DMA0中的内存耗光,这样是很不理智的,因为还有更好的分配方式即从ZONE_NORMAL1中分配;

b) 内核会检测各ZONE的页面数来选择Zone组织方式,当ZONE_DMA很小时,选择ZONELIST_ORDER_DEFAULT时,内核将倾向于选择ZONELIST_ORDER_ZONE方式,否则选择ZONELIST_ORDER_NODE方式。

另外,可以通过/proc/sys/vm/numa_zonelist_order动态改变zonelist order的分配方式。

4.2.2 Node Distance

上节中的例子是以2个节点为例,如果有>2个节点存在,就需要考虑不同节点间的距离来安排节点,例如以4个节点2个ZONE为例,各节点的布局(如4个XLP832物理CPU级联)值如下:

上图中,Node0和Node2的Node Distance为25,Node1和Node3的Node Distance为25,其它的Node Distance为15。

4.2.2.1 优先进行Zone Reclaim

另外,当Node Distance超过20的时候,内核会在某个zone分配内存不足的时候,提前激活本zone的内存回收工作,由全局变量zone_reclaim_mode控制,build_zonelists()中:

1
2
3
4
5
6
/*
 * If another node is sufficiently far away then it is better
 * to reclaim pages in a zone before going off node.
 */
if (distance > RECLAIM_DISTANCE)
	zone_reclaim_mode = 1;

通过/proc/sys/vm/zone_reclaim_mode可以动态调整zone_reclaim_mode的值来控制回收模式,含义如下:

1
2
3
4
#define RECLAIM_OFF    0
#define RECLAIM_ZONE  (1<<0)     /* Run shrink_inactive_list on the zone */
#define RECLAIM_WRITE (1<<1)     /* Writeout pages during reclaim */
#define RECLAIM_SWAP  (1<<2)     /* Swap pages out during reclaim */
4.2.2.2 影响zonelist方式

采用Node方式组织的zonelist为:

即各节点按照与本节点的Node Distance距离大小来排序,以达到更优的内存分配。

4.2.3 zonelist[2]

配置NUMA后,每个节点将关联2个zonelist:
1) zonelist[0]中存放以Node方式或Zone方式组织的zonelist,包括所有节点的zone;
2) zonelist[1]中只存放本节点的zone即Legacy方式;

zonelist[1]用来实现仅从节点自身zone中的内存分配(参考__GFP_THISNODE标志)。

5 SLAB初始化

配置NUMA后对SLAB(本文不涉及SLOB或SLUB)的初始化影响不大,只是在分配一些变量采用类似Buddy系统的per_cpu_pageset(单面页缓存)在CPU本地节点进行内存分配。

5.1 NUMA带来的变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
struct kmem_cache {
	struct array_cache *array[NR_CPUS];
	… …
	struct kmem_list3 *nodelists[MAX_NUMNODES];
};

struct kmem_list3 {
	… …
	struct array_cache *shared;    /* shared per node */
	struct array_cache **alien;    /* on other nodes */
	… …
};

struct slab {
	… …
	unsigned short nodeid;
	… …
};

上面的4种类型的指针变量在SLAB初始化完毕后将改用kmalloc_node()分配的内存。具体实现请参考enable_cpucache(),此函数最终调用alloc_arraycache()和alloc_kmemlist()来分配这些变量代表的空间。

nodelists[MAX_NUMNODES]存放的是所有节点对应的相关数据,本文称作SLAB节点。每个节点拥有各自的数据;

注:有些非NUMA系统比如非连续内存系统可能根据不同的内存区域定义多个节点(实际上Node Distance都是0即物理内存访问速度相同),所以这些变量并没有采用CONFIG_NUMA宏来控制,本文暂称为NUMA带来的变化。

5.2 SLAB缓存

配置NUMA后,SLAB将有三种类型的缓存:本地缓存(当前CPU的缓存),共享缓存(节点内的缓存)和外部缓存(节点间的缓存)。

SLAB系统分配对象时,先从本地缓存中查找,如果本地缓存为空,则将共享缓存中的缓存搬运本地缓存中,重新从本地缓存中分配;如果共享缓存为空,则从SLAB中进行分配;如果SLAB中已经无空闲对象,则分配新的SLAB后重新分配本地缓存。

SLAB系统释放对象时,先不归还给SLAB (简化分配流程,也可充分利用CPU Cache),如果是同节点的SLAB对象先放入本地缓存中,如果本地缓存溢出(满),则转移一部分(以batch为单位)至共享缓存中;如果是跨节点释放,则先放入外部缓存中,如果外部缓存溢出,则转移一部分至共享缓存中,以供后续分配时使用;如果共享缓存溢出,则调用free_block()函数释放溢出的缓存对象。

关于这三种类型缓存的大小以及参数设置,不在本文的讨论范围。

本地缓存
kmem_cache-> array[] 中缓存每个CPU的SLAB cached objects;

共享缓存
kmem_list3[]->shared(如果存在shared缓存)中缓存与当前CPU同节点的所有CPU (如XLP832 NUMA系统中的Node0包含为CPU0~CPU31) 本地缓存溢出的缓存,详细实现请参考cache_flusharray();另外,大对象SLAB不存在共享缓存。

外部缓存
kmem_list3[]->alien中存放其它节点的SLAB cached objects,当在某个节点上分配的SLAB 的object在另外一个节点上被释放的时候(即slab->nodeid与numa_node_id()当前节点不相等时),将加入到对象所在节点的alien缓存中(如果不存在此alien缓存,此对象不会被缓存,而是直接释放给此对象所属SLAB),否则加入本地缓存或共享缓存(本地缓存溢出且存在shared缓存时);当alien缓存满的时候,会调用cache_free_alien()搬迁至shared缓存中(如果不存在shared缓存,直接释放给SLAB);

slab->nodeid记录本SLAB内存块(若干个页面)所在的节点。

示例

例如2个节点,CPU0~31位于Node0,CPU32~CPU63位于Node1:

64个(依次对应于CPU0~CPU63)本地缓存
kmem_cache->array[0~31]:在Node0分配“array_cache结构+cached Objs指针”;
kmem_cache->array[32~63]:在Node1分配“array_cache结构+cached Objs指针”;

2个SLAB节点
kmem_cache->nodelists[0]:在Node0分配“kmem_list3结构”;
kmem_cache->nodelists[1]:在Node1分配“kmem_list3结构”;

SLAB节点0(CPU0~CPU31)共享缓存和外部缓存alien[1]
kmem_cache->nodelists[0]->shared:在Node0分配“array_cache结构+cached Objs指针”;
kmem_cache->nodelists[0]->alien:在Node0分配“节点数sizeof(void)”;
kmem_cache->nodelists[0]->alien[0]:置为NULL;
kmem_cache->nodelists[0]->alien[1]:在Node0分配“array_cache结构+cached Objs指针”;

SLAB节点1(CPU32~CPU63)共享缓存和外部缓存alien[0]
kmem_cache->nodelists[1]->shared:在Node1分配“array_cache结构+cached Objs指针”;
kmem_cache->nodelists[1]->alien:在Node1分配“节点数sizeof(void)”;
kmem_cache->nodelists[1]->alien[0]:在Node1分配“array_cache结构+cached Objs指针”;
kmem_cache->nodelists[1]->alien[1]:置为NULL;

另外,可以用内核启动参数“use_alien_caches”来控制是否开启alien缓存:默认值为1,当系统中的节点数目为1时,use_alien_caches初始化为0;use_alien_caches目的是用于某些多节点非连续内存(访问速度相同)的非NUMA系统。

由上可见,随着节点个数的增加,SLAB明显会开销越来越多的缓存,这也是SLUB涎生的一个重要原因。

5.3 __GFP_THISNODE

SLAB在某个节点创建新的SLAB时,都会置__GFP_THISNODE标记向Buddy系统提交页面申请,Buddy系统中看到此标记,选用申请节点的Legacy zonelist[1],仅从申请节点的zone中分配内存,并且不会走内存不足流程,也不会重试或告警,这一点需要引起注意。

SLAB在申请页面的时候会置GFP_THISNODE标记后调用cache_grow()来增长SLAB;

GFP_THISNODE定义如下:

1
2
#ifdef CONFIG_NUMA
#define GFP_THISNODE     (__GFP_THISNODE | __GFP_NOWARN | __GFP_NORETRY)

6 调度初始化

配置NUMA后负载均衡会多一层NUMA调度域,根据需要在topology.h中定义,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define SD_NODE_INIT (struct sched_domain) {                 \
	.parent             = NULL,                              \
	.child              = NULL,                              \
	.groups             = NULL,                              \
	.min_interval       = 8,                                 \
	.max_interval       = 32,                                \
	.busy_factor        = 32,                                \
	.imbalance_pct      = 125,                               \
	.cache_nice_tries   = 1,                                 \
	.flags              = SD_LOAD_BALANCE | SD_BALANCE_EXEC, \
	.last_balance       = jiffies,                           \
	.balance_interval   = 1,                                 \
	.nr_balance_failed  = 0,                                 \
}

顺便提一下,2.6.32对于实时任务不走负载均衡流程,采用了全局优先级调度的思想,保证实时任务的及时运行;这样的做法同时也解决了低版本内核在处理同一个逻辑CPU上相同最高优先级实时任务的负载均衡的时延。

7 NUMA内存分配

Zonelist[2]组织方式在NUMA内存分配过程中起着至关重要的作用,它决定了整个页面在不同节点间的申请顺序和流程。

7.1显式分配

显式分配即指定节点的分配函数,此类基础分配函数主要有2个:Buddy系统的 alloc_pages_node()和SLAB系统的kmem_cache_alloc_node(),其它的函数都可以从这2个派生出来。

例如,kmalloc_node()最终调用kmem_cache_alloc_node()进行分配。

7.1.1 Buddy显式分配

alloc_pages_node(node, gfp_flags, order)分配流程:
1) 如果node小于0,node取本地节点号(node = numa_node_id());
2) NODE_DATA(node)得到node对应的struct pglist_data结构,从而得到zonelist[2];
3) 如果gfp_flags含有__GFP_THISNODE标志,仅在此节点分配内存,使用node节点的Legacy zonelist[1],否则使用其包含所有节点zone的zonelist[0] (见4.2.2.3节);
4) 遍历确定出来的zonelist结构中包含的每一个符合要求的zone,gfp_flags指定了本次分配中的最高的zone,如__GFP_HIGHMEM表示最高的zone为ZONE_HIGH;
5) 分配结束。

7.1.2 SLAB显式分配

kmem_cache_alloc_node(cachep, gfp_flags, node)分配流程:
1) 如果node值为-1,node取本地节点号(node = numa_node_id());
2) 如果node < -1,则执行fall back行为,此行为与用户策略有关,有点类似隐式分配:
a) 根据用户策略(包括CPUSET和内存策略)依次选取节点,根据gfp_flags选取合适的zonelist进行分配;
b) 如果内存不足分配失败,则跳过内存策略直接进行隐式Buddy页面分配(仍受CPUSET的限定,关于CPUSET和内存策略后面会介绍),最终构建成新的SLAB并完成本次分配;转5);
3) 如果node是正常节点号,则先在node节点上根据gfp_flags选取合适的zonelist进行分配;
4) 如果3)中node节点内存不足分配失败,转2) a)执行fall back行为。
5) 分配结束。

注:fall back行为指的是某个节点上内存不足时会落到此节点的zonelist[0]中定义的其它节点zone分配。

7.1.3 设备驱动

配置CONFIG_NUMA后,设备会关联一个NUMA节点信息,struct device结构中会多一个numa_node字段记录本设备所在的节点,这个结构嵌套在各种类型的驱动中,如struct net_device结构。

1
2
3
4
5
6
7
struct device {
	… …
	#ifdef CONFIG_NUMA
		int          numa_node;    /* NUMA node this device is close to */
	#endif
	… …
}

__netdev_alloc_skb()的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
struct sk_buff *__netdev_alloc_skb(struct net_device *dev,
		unsigned int length, gfp_t gfp_mask)
{
	int node = dev->dev.parent ? dev_to_node(dev->dev.parent) : -1;
	struct sk_buff *skb;

	skb = __alloc_skb(length + NET_SKB_PAD, gfp_mask, 0, node);
	if (likely(skb)) {
		skb_reserve(skb, NET_SKB_PAD);
		skb->dev = dev;
	}
	return skb;
}

__alloc_skb()最终调用kmem_cache_alloc_node()和kmalloc_node()在此node上分配内存。

7.2 隐式分配和内存策略

隐式分配即不指定节点的分配函数,此类基础分配函数主要有2个:Buddy系统的 alloc_pages()和SLAB系统的kmem_cache_alloc(),其它的函数都可以从这2个派生出来。

隐式分配涉及到NUMA内存策略(Memory Policy),内核定义了四种内存策略。

注:隐式分配还涉及到CPUSET,本文后面会介绍。

7.2.1 内存策略

内核mm/mempolicy.c中实现了NUMA内存的四种内存分配策略:MPOL_DEFAULT, MPOL_PREFERRED, MPOL_INTERLEAVE和MPOL_BIND,内存策略会从父进程继承。

MPOL_DEFAULT:使用本地节点的zonelist;
MPOL_PREFERRED:使用指定节点的zonelist;
MPOL_BIND: 设置一个节点集合,只能从这个集合中节点的zone申请内存:

1)无__GFP_THISNODE申请标记,使用本地节点的zonelist[0];
2)置有__GFP_THISNODE申请标记,如果本地节点:
a)在集合中,使用本地节点的zonelist[1];
b)不在集合中,使用集合中最小节点号的zonelist[1];

MPOL_INTERLEAVE:采用Round-Robin方式从设定的节点集合中选出某个节点,使用此节点的zonelist;

内核实现的内存策略,用struct mempolicy结构来描述:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct mempolicy {
	atomic_t refcnt;
	unsigned short mode;              /* See MPOL_* above */
	unsigned short flags;             /* See set_mempolicy() MPOL_F_* above */
	union {
		short         preferred_node; /* preferred */
		nodemask_t    nodes;          /* interleave/bind */
		/* undefined for default */
	} v;
	union {
		nodemask_t cpuset_mems_allowed;     /* relative to these nodes */
		nodemask_t user_nodemask;           /* nodemask passed by user */
	} w;
};

成员mode表示使用四种分配策略中的哪一种,联合体v根据不同的分配策略记录相应的分配信息。

另外,MPOL_PREFERRED策略有一种特殊的模式,当其flags置上MPOL_F_LOCAL标志后,将等同于MPOL_DEFAULT策略,内核默认使用此种策略,见全局变量default_policy。

内存策略涉及的分配函数有2个:alloc_pages_current()和alloc_page_vma(),可以分别为不同任务以及任务的不同VMA设置内存策略。

7.2.2 Buddy隐式分配

以默认的NUMA内存策略为例讲解,alloc_pages(gfp_flags, order)分配流程:
1) 得到本地节点对应的struct pglist_data结构,从而得到zonelist[2];
2) 如果gfp_flags含有__GFP_THISNODE标志,仅在此节点分配内存即使用本地节点的Legacy zonelist[1],否则使用zonelist[0] (见4.2.2.3节);
3) 遍历确定出来的zonelist结构中包含的每一个符合要求的zone,gfp_flags指定了本次分配中的最高的zone,如__GFP_HIGHMEM表示最高的zone为ZONE_HIGH;
4) 分配结束。

7.2.3 SLAB隐式分配

以默认的NUMA内存策略为例讲解,kmem_cache_alloc(cachep, gfp_flags)分配流程:
1) 调用____cache_alloc()函数在本地节点local_node分配,此函数无fall back行为;
2) 如果1)中本地节点内存不足分配失败,调用____cache_alloc_node(cachep, gfp_flags,local_node)再次尝试在本地节点分配,如果还失败此函数会进行fall back行为;
3) 分配结束。

7.3 小结

上文提到的所有的内存分配函数都允许fall back行为,但有2种情况例外:
1) __GFP_THISNODE分配标记限制了只能从某一个节点上分配内存;
2) MPOL_BIND策略,限制了只能从一个节点集合中的节点上分配内存;
(gfp_zone(gfp_flags) < policy_zone的情况,MPOL_BIND不限制节点)。

注:还有一种情况,CPUSET限制的内存策略,后面会介绍。

8 CPUSET

CPUSET基于CGROUP的框架构建的子系统,有如下特点:
1) 限定一组任务所允许使用的内存Node和CPU资源;
2) CPUSET在内核各子系统中添加的检测代码很少,对内核没有性能影响;
3) CPUSET的限定优先级高于内存策略(针对于Node)和绑定(针对于CPU);
4) 没有额外实现系统调用接口,只能通过/proc文件系统和用户交互。

本节只讲述CPUSET的使用方法和说明。

8.1 创建CPUSET

因为CPUSET只能使用/proc文件系统访问,所以第一步就要先mount cpuset文件系统,配置CONFIG_CGROUPS和CONFIG_CPUSETS后/proc/filesystems中将有这个文件系统。

CPUSET是分层次的,可以在cpuset文件系统根目录是最顶层的CPUSET,可以在其下创建CPUSET子项,创建方式很简单即创建一个新的目录。

mount命令:mount nodev –t cpuset /your_dir或mount nodev –t cgroup –o cpuset /your_dir

Mount成功后,进入mount目录,这个就是最顶层的CPUSET了(top_cpuset),下面附一个演示例子:

8.2 CPUSET文件

介绍几个重要的CPUSET文件:
1) tasks,实际上是CGROUPS文件,为此CPUSET包含的线程pid集合;
echo 100 > tasks

2) cgroup.procs是CGROUPS文件,为此CPUSET包含的线程组tgid集合;
echo 100 > cgroup.procs

3) cpus是CPUSET文件,表示此CPUSET允许的CPU;
echo 0-8 > cpus

4) mems是CPUSET文件,表示此CPUSET允许的内存节点; echo 0-1 > mems (对应于struct task_struct中的mems_allowed字段)

5) sched_load_balance,为CPUSET文件,设置cpus集合的CPU是否参与负载均衡; echo 0 > sched_load_balance (禁止负载均衡);默认值为1表示开启负载均衡;

6) sched_relax_domain_level,为CPUSET文件,数值代表某个调度域级别,大于此级别的调度域层次将禁用闲时均衡和唤醒均衡,而其余级别的调度域都开启; 也可以通过启动参数“relax_domain_level”设置,其值含义:
-1 : 无效果,此为默认值
0 - 设置此值会禁用所有调度域的闲时均衡和唤醒均衡
1 - 超线程域
2 - 核域
3 - 物理域
4 - NUMA域
5 - ALLNODES模式的NUMA域

7) mem_exclusive和mem_hardwall,为CPUSET文件,表示内存硬墙标记;默认为0,表示软墙;有关CPUSET的内存硬墙(HardWall)和内存软墙(SoftWall),下文会介绍;

8) memory_spread_page和memory_spread_slab,为CPUSET文件,设定CPUSET中的任务PageCache和SLAB(创建时置有SLAB_MEM_SPREAD)以Round-Robin方式使用内存节点(类似于MPOL_INTERLEAVE);默认为0,表示未开启;struct task_struct结构中增加成员cpuset_mem_spread_rotor记录下次使用的节点号;

9) memory_migrate,为CPUSET文件,表明开启此CPUSET的内存迁移,默认为0;

当一个任务从一个CPUSET1(mems值为0)迁移至另一个CPUSET2(mems值为1)的时候,此任务在节点0上分配的页面内容将迁移至节点1上分配新的页面(将数据同步到新页面),这样就避免了此任务的非本地节点的内存访问。

上图为单Node,8个CPU的系统。

1) 顶层CPUSET包含了系统中的所有CPU以及Node,而且是只读的,不能更改;
2) 顶层CPUSET包含了系统中的所有任务,可以更改;
3) child为新创建的子CPUSET,子CPUSET的资源不能超过父CPUSET的资源;
4) 新创建的CPUSET的mems和cpus都是空的,使用前必须先初始化;
5) 添加任务:设置tasks和cgroup.procs文件;
6) 删除任务:将任务重新添加至其它CPUSET(如顶层)就可以从本CPUSET删除任务。

8.3 利用CPUSET限定CPU和Node

设置步骤:
1) 在某个父CPUSET中创建子CPUSET;
2) 在子CPUSET目录下,输入指定的Node号至mems文件;
3) 在子CPUSET目录下,输入指定的Node号至mems文件;
4) 在子CPUSET目录下,设定任务至tasks或group.procs文件;
5) 还可以设置memory_migrate为1,激活内存页面的迁移功能。

这样限定后,此CPUSET中所有的任务都将使用限定的CPU和Node,但毕竟系统中的任务并不能完全孤立,比如还是可能会全局共享Page Cache,动态库等资源,因此内核在某些情况下还是可以允许打破这个限制,如果不允许内核打破这个限制,需要设定CPUSET的内存硬墙标志即mem_exclusive或mem_hardwall置1即可;CPUSET默认是软墙。

硬软墙用于Buddy系统的页面分配,优先级高于内存策略,请参考内核函数:

cpuset_zone_allowed_hardwall()和cpuset_zone_allowed_softwall()

另外,当内核分不到内存将导致Oops的时候,CPUSET所有规则将被打破,毕竟一个系统的正常运行才是最重要的:
1) __GFP_THISNODE标记分配内存的时候(通常是SLAB系统);
2) 中断中分配内存的时候;
3) 任务置有TIF_MEMDIE标记即被内核OOM杀死的任务。

8.4 利用CPUSET动态改变调度域结构

利用sched_load_balance文件可以禁用掉某些CPU的负载均衡,同时重新构建调度域,此功能类似启动参数“isolcpus”的功能。

8个CPU的系统中,系统中存在一个物理域,现需要禁掉CPU4~CPU7的负载均衡,配置步骤为:
1) “mkdir child”在顶层CPUSET中创建子CPUSET,记为child;
2) “echo 0-3 > child/cpus ”(新建CPUSET的sched_load_balance默认是是打开的);
3) “echo 0 > sched_load_balance”关闭顶层CPUSET的负载均衡。

操作过程见下图:

由图可见,CPU4~CPU7的调度域已经不存在了,具体效果是将CPU4~CPU7从负载均衡中隔离出来。

9 NUMA杂项

1) /sys/devices/system/node/中记录有系统中的所有内存节点信息;
2)任务额外关联一个/proc//numa_smaps文件信息;
3) tmpfs可以指定在某个Node上创建;
4) libnuma库和其numactl小工具可以方便操作NUMA内存;
5) … …

10 参考资料

  1. www.kernel.org
  2. ULK3
  3. XLP® Processor Family Programming Reference Manual

Linux内存管理

http://blog.csdn.net/myarrow/article/details/8624687

http://blog.csdn.net/myarrow/article/details/8682819

1. Linux物理内存三级架构

对于内存管理,Linux采用了与具体体系架构不相关的设计模型,实现了良好的可伸缩性。它主要由内存节点node、内存区域zone和物理页框page三级架构组成。

内存节点node

内存节点node是计算机系统中对物理内存的一种描述方法,一个总线主设备访问位于同一个节点中的任意内存单元所花的代价相同,而访问任意两个不同节点中的内存单元所花的代价不同。在一致存储结构(Uniform Memory Architecture,简称UMA)计算机系统中只有一个节点,而在非一致性存储结构(NUMA)计算机系统中有多个节点。Linux内核中使用数据结构pg_data_t来表示内存节点node。如常用的ARM架构为UMA架构。

内存区域zone

内存区域位于同一个内存节点之内,由于各种原因它们的用途和使用方法并不一样。如基于IA32体系结构的个人计算机系统中,由于历史原因使得ISA设备只能使用最低16MB来进行DMA传输。又如,由于Linux内核采用

• 物理页框page

2. Linux虚拟内存三级页表

Linux虚拟内存三级管理由以下三级组成:
• PGD: Page Global Directory (页目录)
• PMD: Page Middle Directory (页目录)
• PTE: Page Table Entry (页表项)

每一级有以下三个关键描述宏:
• SHIFT
• SIZE
• MASK

如页的对应描述为:

1
2
3
4
/* PAGE_SHIFT determines the page size  asm/page.h */  
#define PAGE_SHIFT      12  
#define PAGE_SIZE       (_AC(1,UL) << PAGE_SHIFT)  
#define PAGE_MASK       (~(PAGE_SIZE-1))  

数据结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* asm/page.h */  
typedef unsigned long pteval_t;  

typedef pteval_t pte_t;  
typedef unsigned long pmd_t;  
typedef unsigned long pgd_t[2];  
typedef unsigned long pgprot_t;  

#define pte_val(x)      (x)  
#define pmd_val(x)      (x)  
#define pgd_val(x)  ((x)[0])  
#define pgprot_val(x)   (x)  

#define __pte(x)        (x)  
#define __pmd(x)        (x)  
#define __pgprot(x)     (x)  
2.1 Page Directory (PGD and PMD)

每个进程有它自己的PGD( Page Global Directory),它是一个物理页,并包含一个pgd_t数组。其定义见<asm/page.h>。 进程的pgd_t数据见 task_struct -> mm_struct -> pgd_t * pgd;

ARM架构的PGD和PMD的定义如下<arch/arm/include/asm/pgtable.h>:

1
2
3
4
5
6
7
8
9
10
11
#define PTRS_PER_PTE  512    // PTE中可包含的指针<u32>数 (21-12=9bit)  
#define PTRS_PER_PMD  1  
#define PTRS_PER_PGD  2048   // PGD中可包含的指针<u32>数 (32-21=11bit)</p><p>#define PTE_HWTABLE_PTRS (PTRS_PER_PTE)  
#define PTE_HWTABLE_OFF  (PTE_HWTABLE_PTRS * sizeof(pte_t))  
#define PTE_HWTABLE_SIZE (PTRS_PER_PTE * sizeof(u32))
/*  
 * PMD_SHIFT determines the size of the area a second-level page table can map  
 * PGDIR_SHIFT determines what a third-level page table entry can map  
 */  
#define PMD_SHIFT  21  
#define PGDIR_SHIFT  21

虚拟地址SHIFT宏图:

虚拟地址MASK和SIZE宏图:

2.2 Page Table Entry

PTEs, PMDs和PGDs分别由pte_t, pmd_t 和pgd_t来描述。为了存储保护位,pgprot_t被定义,它拥有相关的flags并经常被存储在page table entry低位(lower bits),其具体的存储方式依赖于CPU架构。

每个pte_t指向一个物理页的地址,并且所有的地址都是页对齐的。因此在32位地址中有PAGE_SHIFT(12)位是空闲的,它可以为PTE的状态位。

PTE的保护和状态位如下图所示:

2.3 如何通过3级页表访问物理内存

为了通过PGD、PMD和PTE访问物理内存,其相关宏在asm/pgtable.h中定义。

• pgd_offset

根据当前虚拟地址和当前进程的mm_struct获取pgd项的宏定义如下:

1
2
3
4
5
6
7
/* to find an entry in a page-table-directory */  
#define pgd_index(addr)     ((addr) >> PGDIR_SHIFT)  //获得在pgd表中的索引  

#define pgd_offset(mm, addr)    ((mm)->pgd + pgd_index(addr)) //获得pmd表的起始地址  

/* to find an entry in a kernel page-table-directory */  
#define pgd_offset_k(addr)  pgd_offset(&init_mm, addr)  

• pmd_offset

根据通过pgd_offset获取的pgd 项和虚拟地址,获取相关的pmd项(即pte表的起始地址)

1
2
/* Find an entry in the second-level page table.. */  
#define pmd_offset(dir, addr)   ((pmd_t *)(dir))   //即为pgd项的值  

• pte_offset

根据通过pmd_offset获取的pmd项和虚拟地址,获取相关的pte项(即物理页的起始地址)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef CONFIG_HIGHPTE  
#define __pte_map(pmd)      pmd_page_vaddr(*(pmd))  
#define __pte_unmap(pte)    do { } while (0)  
#else  
#define __pte_map(pmd)      (pte_t *)kmap_atomic(pmd_page(*(pmd)))  
#define __pte_unmap(pte)    kunmap_atomic(pte)  
#endif  

#define pte_index(addr)     (((addr) >> PAGE_SHIFT) & (PTRS_PER_PTE - 1))  

#define pte_offset_kernel(pmd,addr) (pmd_page_vaddr(*(pmd)) + pte_index(addr))  

#define pte_offset_map(pmd,addr)    (__pte_map(pmd) + pte_index(addr))  
#define pte_unmap(pte)          __pte_unmap(pte)  

#define pte_pfn(pte)        (pte_val(pte) >> PAGE_SHIFT)  
#define pfn_pte(pfn,prot)   __pte(__pfn_to_phys(pfn) | pgprot_val(prot))  

#define pte_page(pte)       pfn_to_page(pte_pfn(pte))  
#define mk_pte(page,prot)   pfn_pte(page_to_pfn(page), prot)  

#define set_pte_ext(ptep,pte,ext) cpu_set_pte_ext(ptep,pte,ext)  
#define pte_clear(mm,addr,ptep) set_pte_ext(ptep, __pte(0), 0)  

其示意图如下图所示:

2.4 根据虚拟地址获取物理页的示例代码

根据虚拟地址获取物理页的示例代码详见<mm/memory.c中的函数follow_page>。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/** 
 * follow_page - look up a page descriptor from a user-virtual address 
 * @vma: vm_area_struct mapping @address 
 * @address: virtual address to look up 
 * @flags: flags modifying lookup behaviour 
 * 
 * @flags can have FOLL_ flags set, defined in <linux/mm.h> 
 * 
 * Returns the mapped (struct page *), %NULL if no mapping exists, or 
 * an error pointer if there is a mapping to something not represented 
 * by a page descriptor (see also vm_normal_page()). 
 */  
struct page *follow_page(struct vm_area_struct *vma, unsigned long address,  
		    unsigned int flags)  
{  
	pgd_t *pgd;  
	pud_t *pud;  
	pmd_t *pmd;  
	pte_t *ptep, pte;  
	spinlock_t *ptl;  
	struct page *page;  
	struct mm_struct *mm = vma->vm_mm;  

	page = follow_huge_addr(mm, address, flags & FOLL_WRITE);  
	if (!IS_ERR(page)) {  
		BUG_ON(flags & FOLL_GET);  
		goto out;  
	}  

	page = NULL;  
	pgd = pgd_offset(mm, address);  
	if (pgd_none(*pgd) || unlikely(pgd_bad(*pgd)))  
		goto no_page_table;  

	pud = pud_offset(pgd, address);  
	if (pud_none(*pud))  
		goto no_page_table;  
	if (pud_huge(*pud) && vma->vm_flags & VM_HUGETLB) {  
		BUG_ON(flags & FOLL_GET);  
		page = follow_huge_pud(mm, address, pud, flags & FOLL_WRITE);  
		goto out;  
	}  
	if (unlikely(pud_bad(*pud)))  
		goto no_page_table;  

	pmd = pmd_offset(pud, address);  
	if (pmd_none(*pmd))  
		goto no_page_table;  
	if (pmd_huge(*pmd) && vma->vm_flags & VM_HUGETLB) {  
		BUG_ON(flags & FOLL_GET);  
		page = follow_huge_pmd(mm, address, pmd, flags & FOLL_WRITE);  
		goto out;  
	}  
	if (pmd_trans_huge(*pmd)) {  
		if (flags & FOLL_SPLIT) {  
		    split_huge_page_pmd(mm, pmd);  
		    goto split_fallthrough;  
		}  
		spin_lock(&mm->page_table_lock);  
		if (likely(pmd_trans_huge(*pmd))) {  
		    if (unlikely(pmd_trans_splitting(*pmd))) {  
		        spin_unlock(&mm->page_table_lock);  
		        wait_split_huge_page(vma->anon_vma, pmd);  
		    } else {  
		        page = follow_trans_huge_pmd(mm, address,  
		                         pmd, flags);  
		        spin_unlock(&mm->page_table_lock);  
		        goto out;  
		    }  
		} else  
		    spin_unlock(&mm->page_table_lock);  
		/* fall through */  
	}  
split_fallthrough:  
	if (unlikely(pmd_bad(*pmd)))  
		goto no_page_table;  

	ptep = pte_offset_map_lock(mm, pmd, address, &ptl);  

	pte = *ptep;  
	if (!pte_present(pte))  
		goto no_page;  
	if ((flags & FOLL_WRITE) && !pte_write(pte))  
		goto unlock;  

	page = vm_normal_page(vma, address, pte);  
	if (unlikely(!page)) {  
		if ((flags & FOLL_DUMP) ||  
		    !is_zero_pfn(pte_pfn(pte)))  
		    goto bad_page;  
		page = pte_page(pte);  
	}  

	if (flags & FOLL_GET)  
		get_page(page);  
	if (flags & FOLL_TOUCH) {  
		if ((flags & FOLL_WRITE) &&  
		    !pte_dirty(pte) && !PageDirty(page))  
		    set_page_dirty(page);  
		/* 
		 * pte_mkyoung() would be more correct here, but atomic care 
		 * is needed to avoid losing the dirty bit: it is easier to use 
		 * mark_page_accessed(). 
		 */  
		mark_page_accessed(page);  
	}  
	if ((flags & FOLL_MLOCK) && (vma->vm_flags & VM_LOCKED)) {  
		/* 
		 * The preliminary mapping check is mainly to avoid the 
		 * pointless overhead of lock_page on the ZERO_PAGE 
		 * which might bounce very badly if there is contention. 
		 * 
		 * If the page is already locked, we don't need to 
		 * handle it now - vmscan will handle it later if and 
		 * when it attempts to reclaim the page. 
		 */  
		if (page->mapping && trylock_page(page)) {  
		    lru_add_drain();  /* push cached pages to LRU */  
		    /* 
		     * Because we lock page here and migration is 
		     * blocked by the pte's page reference, we need 
		     * only check for file-cache page truncation. 
		     */  
		    if (page->mapping)  
		        mlock_vma_page(page);  
		    unlock_page(page);  
		}  
	}  
unlock:  
	pte_unmap_unlock(ptep, ptl);  
out:  
	return page;  

bad_page:  
	pte_unmap_unlock(ptep, ptl);  
	return ERR_PTR(-EFAULT);  

no_page:  
	pte_unmap_unlock(ptep, ptl);  
	if (!pte_none(pte))  
		return page;  

no_page_table:  
	/* 
	 * When core dumping an enormous anonymous area that nobody 
	 * has touched so far, we don't want to allocate unnecessary pages or 
	 * page tables.  Return error instead of NULL to skip handle_mm_fault, 
	 * then get_dump_page() will return NULL to leave a hole in the dump. 
	 * But we can only make this optimization where a hole would surely 
	 * be zero-filled if handle_mm_fault() actually did handle it. 
	 */  
	if ((flags & FOLL_DUMP) &&  
		(!vma->vm_ops || !vma->vm_ops->fault))  
		return ERR_PTR(-EFAULT);  
	return page;  
}

1. First Fit分配器

First Fit分配器是最基本的内存分配器,它使用bitmap而不是空闲块列表来表示内存。在bitmap中,如果page对应位为1,则表示此page已经被分配,为0则表示此page没有被分配。为了分配小于一个page的内存块,First Fit分配器记录了最后被分配的PFN (Page Frame Number)和分配的结束地址在页内的偏移量。随后小的内存分配被Merge到一起并存储到同一页中。

First Fit分配器不会造成严重的内存碎片,但其效率较低,由于内存经常通过线性地址进行search,而First Fit中的小块内存经常在物理内存的开始处,为了分配大块内存而不得不扫描前面大量的内存。

2. Boot Memory分配器

物理内存分配器如何分配内存来初始化其自己呢?

答案是:通过Boot Memory分配器来实现,而Boot Memory分配器则通过最基本的First Fit分配器来实现。

2.1 Boot Map定义

Boot Map通过数据结构bootmem_data来定义,详见<linux/bootmem.h>,其定义如下所示:

1
2
3
4
5
6
7
8
typedef struct bootmem_data {  
	unsigned long node_boot_start; // 描述的物理内存的起始地址  
	unsigned long node_low_pfn;    // 结束物理地址,即ZONE_NORMAL的结束  
	void *node_bootmem_map;        // 描述“使用或空闲的位图”的地址  
	unsigned long last_offset;     // 最后被分配的页内偏移量,即在llast_pos描述的物理页中,  
		                         // 从last_offset开始,没有被分配   
	unsigned long last_pos;        // 最后被分配的页的PFN  
} bootmem_data_t;  

所有bootmem_data被放于全局变量bdata_list中。

2.2 Boot Memory分配器初始化

每一个CPU架构被要求提供setup_arch函数,它负责获取初始化boot memory分配器的必要参数。不同的CPU架构通过不同的函数来实现,如ARM通过bootmem_init来实现。它负责获取以下参数:

1
2
3
4
5
• min_low_pfn: 系统中可获得的最小的PFN,装载kernel image结束之后的第一页,在mm/bootmem.c中定义
• max_low_pfn:低端内存(ZONE_NORMAL)中可获得的最大PFN
• highstart_pfn:高端内存(ZONE_HIGHMEM)的起始PFN
• highend_pfn:高端内存(ZONE_HIGHMEM)的结束PFN
• max_pfn:系统中可获得的最大的PFN, 在mm/bootmem.c中定义

PFN是在物理内存map的偏移量,以page为单位。Kernel可直接访问ZONE_NORMAL,其偏移量为:PAGE_OFFSET。

通过以上5个参数明确了可用物理内存之后,调用init_bootmem->init_bootmem_core来初始化contig_page_data。它主要完成以下两件事:
1) 将把与此node对应pgdat_data_t插入到pgdat_list中
2) 初始化bootmem_data_t的中参数,并分配表示页分配状态的bitmap,其大小为: (end_pfn-start_pfn+7)/8

bitmap的物理地址为:bootmem_data_t->node_boot_start
bitmap的虚拟地直为:bootmem_data_t->node_bootmem_map

2.3 分配内存

• reserve_bootmem:用于预留物理页面。但用于通用的内存分配是低率的,它主要用于各种驱动(如:Video Codec)预留内存。

常用的内存分配函数如下(in UMA架构,我们常的ARM架构为UMA架构):

1
2
3
4
• alloc_bootmem
• alloc_bootmem_low
• alloc_bootmem_pages
• alloc_bootmem_low_pages

其调用关系如下图所示:

2.3.1 __alloc_bootmem

__alloc_bootmem() 需要以下参数:
• pgdat 用于分配内存块的节点,在UMA架构中,它被忽略,因为它总是为:contig_page_data
• size 指定请求分配的内存大小,以字节为单位
• align 请求以多少字节对齐,地于小块内存分配,一般以SMP_CACHE_BYTES对齐,如在X86上,与L1硬件cache对齐
• goal 偏好的分配内存的起始地址,

2.3.2 __alloc_bootmem_core

它从goal指定的地址开始,线性地扫描内存,以寻找可以满足内存分配要求的内存块。它的另外一项功能是决定是否需要把新分配的内存块与以前已经分配的内存块merge到一起。

分配内存常用函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#ifdef CONFIG_NO_BOOTMEM  
/* We are using top down, so it is safe to use 0 here */  
#define BOOTMEM_LOW_LIMIT 0  
#else  
#define BOOTMEM_LOW_LIMIT __pa(MAX_DMA_ADDRESS)  
#endif  

#define alloc_bootmem(x) \  
	__alloc_bootmem(x, SMP_CACHE_BYTES, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_align(x, align) \  
	__alloc_bootmem(x, align, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_nopanic(x) \  
	__alloc_bootmem_nopanic(x, SMP_CACHE_BYTES, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_pages(x) \  
	__alloc_bootmem(x, PAGE_SIZE, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_pages_nopanic(x) \  
	__alloc_bootmem_nopanic(x, PAGE_SIZE, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_node(pgdat, x) \  
	__alloc_bootmem_node(pgdat, x, SMP_CACHE_BYTES, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_node_nopanic(pgdat, x) \  
	__alloc_bootmem_node_nopanic(pgdat, x, SMP_CACHE_BYTES, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_pages_node(pgdat, x) \  
	__alloc_bootmem_node(pgdat, x, PAGE_SIZE, BOOTMEM_LOW_LIMIT)  
#define alloc_bootmem_pages_node_nopanic(pgdat, x) \  
	__alloc_bootmem_node_nopanic(pgdat, x, PAGE_SIZE, BOOTMEM_LOW_LIMIT)  

#define alloc_bootmem_low(x) \  
	__alloc_bootmem_low(x, SMP_CACHE_BYTES, 0)  
#define alloc_bootmem_low_pages(x) \  
	__alloc_bootmem_low(x, PAGE_SIZE, 0)  
#define alloc_bootmem_low_pages_node(pgdat, x) \  
	__alloc_bootmem_low_node(pgdat, x, PAGE_SIZE, 0)  
2.4 释放内存

调用free_bootmem来释放内存。

1
2
3
4
5
6
7
8
9
10
11
void __init free_bootmem(unsigned long addr, unsigned long size)  
{  
	unsigned long start, end;  

	kmemleak_free_part(__va(addr), size);  

	start = PFN_UP(addr);  
	end = PFN_DOWN(addr + size);  

	mark_bootmem(start, end, 0, 0);  
}