kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp

relay 数据传输

https://www.ibm.com/developerworks/cn/linux/l-cn-relay/

Relay 要解决的问题

对于任何在内核工作的程序而言,如何把大量的调试信息从内核空间传输到用户空间都是一个大麻烦,对于运行中的内核更是如此。特别是对于哪些用于调试内核性能的工具,更是如此。

对于这种大量数据需要在内核中缓存并传输到用户空间需求,很多传统的方法都已到达了极限,例如内核程序员很熟悉的 printk() 调用。此外,如果不同的内核子系统都开发自己的缓存和传输代码,造成很大的代码冗余,而且也带来维护上的困难。

这些,都要求开发一套能够高效可靠地将数据从内核空间转发到用户空间的系统,而且这个系统应该独立于各个调试子系统。

这样就诞生了 RelayFS。

Relay的发展历史

Relay 的前身是 RelayFS,即作为 Linux 的一个新型文件系统。2003年3月,RelayFS的第一个版本的代码被开发出来,在7月14日,第一个针对2.6内核的版本也开始提供下载。经过广泛的试用和改进,直到2005年9月,RelayFS才被加入mainline内核(2.6.14)。同时,RelayFS也被移植到2.4内核中。在2006年2月,从2.6.17开始,RelayFS不再作为单独的文件系统存在,而是成为内核的一部分。它的源码也从fs/目录下转移到kernel/relay.c中,名称中也从RelayFS改成了Relay。

RelayFS目前已经被越来越多的内核工具使用,包括内核调试工具SystemTap、LTT,以及一些特殊的文件系统例如DebugFS。

Relay的基本原理

总的说来,Relay提供了一种机制,使得内核空间的程序能够通过用户定义的relay通道(channel)将大量数据高效的传输到用户空间。

一个relay通道由一组和CPU一一对应的内核缓冲区组成。这些缓冲区又被称为relay缓冲区(buffer),其中的每一个在用户空间都用一个常规文件来表示,这被叫做relay文件(file)。内核空间的用户可以利用relay提供的API接口来写入数据,这些数据会被自动的写入当前的CPU id对应的那个relay缓冲区;同时,这些缓冲区从用户空间看来,是一组普通文件,可以直接使用read()进行读取,也可以使用mmap()进行映射。Relay并不关心数据的格式和内容,这些完全依赖于使用relay的用户程序。Relay的目的是提供一个足够简单的接口,从而使得基本操作尽可能的高效。

Relay将数据的读和写分离,使得突发性大量数据写入的时候,不需要受限于用户空间相对较慢的读取速度,从而大大提高了效率。Relay作为写入和读取的桥梁,也就是将内核用户写入的数据缓存并转发给用户空间的程序。这种转发机制也正是Relay这个名称的由来。

下面这个图给出了Relay的基本结构和典型操作:

Relay的基本结构和典型操作

可以看到,这里的relay通道由四个relay缓冲区(kbuf0到kbuf3)组成,分别对应于系统中的cpu0到cpu1。每个CPU上的代码调用relay_write()的时候将数据写入自己对应的relay缓冲区内。每个relay缓冲区称一个relay文件,即/cpu0到/cpu3。当文件系统被mount到/mnt/以后,这个relay文件就被映射成映射到用户空间的地址空间。一旦数据可用,用户程序就可以把它的数据读出来写入到硬盘上的文件中,即cpu0.out到cpu3.out。

Relay的主要API

前面提到的 relay_write() 就是 relay API 之一。除此以外,Relay 还提供了更多的 API来支持用户程序完整的使用 relay。这些 API,主要按照面向用户空间和面向内核空间分为两大类,下面我们来分别进行介绍。

面向用户空间的 API

这些 Relay 编程接口向用户空间程序提供了访问 relay 通道缓冲区数据的基本操作的入口,包括:

1
2
3
4
5
6
open() - 允许用户打开一个已经存在的通道缓冲区
mmap() - 使通道缓冲区被映射到位于用户空间的调用者的地址空间。要特别注意的是,我们不能仅对局部区域进行映射。也就是说,必须映射整个缓冲区文件,其大小是 CPU的个数和单个 CPU 缓冲区大小的乘积
read() - 读取通道缓冲区的内容。这些数据一旦被读出,就意味着他们被用户空间的程序消费掉了,也就不能被之后的读操作看到
sendfile() - 将数据从通道缓冲区传输到一个输出文件描述符。其中可能的填充字符会被自动去掉,不会被用户看到
poll() - 支持 POLLIN/POLLRDNORM/POLLERR 信号。每次子缓冲区的边界被越过时,等待着的用户空间程序会得到通知
close() - 将通道缓冲区的引用数减1。当引用数减为0时,表明没有进程或者内核用户需要打开它,从而这个通道缓冲区被释放。
面向内核空间的 API

这些API接口向位于内核空间的用户提供了管理relay通道、数据写入等功能。下面介绍其中主要的部分,完整的API接口列表请参见这里。

1
2
3
4
relay_open() - 创建一个relay通道,包括创建每个CPU对应的relay缓冲区。
relay_close() - 关闭一个relay通道,包括释放所有的relay缓冲区,在此之前会调用relay_switch()来处理这些relay缓冲区以保证已读取但是未满的数据不会丢失
relay_write() - 将数据写入到当前CPU对应的relay缓冲区内。由于它使用了local_irqsave()保护,因此也可以在中断上下文中使用。
relay_reserve() - 在relay通道中保留一块连续的区域来留给未来的写入操作。这通常用于那些希望直接写入到relay缓冲区的用户。考虑到性能或者其它因素,这些用户不希望先把数据写到一个临时缓冲区中,然后再通过relay_write()进行写入。

Relay的例子

我们用一个最简单的例子来介绍怎么使用Relay。这个例子由两部分组成:一部分是位于内核空间将数据写入relay文件的程序,使用时需要作为一个内核模块被加载;另一部分是位于用户空间从relay文件中读取数据的程序,使用时作为普通用户态程序运行。

内核空间的程序主要操作是:
加载模块时,打开一个relay通道,并且往打开的relay通道中写入消息;
卸载模块时,关闭relay通道。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include <linux/module.h>
#include <linux/relay.h>
#include <linux/debugfs.h>

static struct dentry *create_buf_file_handler(const char *filename, struct dentry *parent, int mode, struct rchan_buf *buf, int *is_global)
{
	return debugfs_create_file(filename, mode, parent, buf, &relay_file_operations);
}

static int remove_buf_file_handler(struct dentry *dentry)
{
	debugfs_remove(dentry);
	return 0;
}

static struct rchan_callbacks relay_callbacks =
{
	.create_buf_file = create_buf_file_handler,
	.remove_buf_file = remove_buf_file_handler,
};

static struct rchan *hello_rchan;
struct dentry *dir;

int init_module(void)
{
	const char *msg="Hello world\n";
	dir = debugfs_create_dir("test", NULL);
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,32))
	hello_rchan = relay_open("cpu", dir, 8192, 2, &relay_callbacks, NULL);
#else   
	hello_rchan = relay_open("cpu", dir, 8192, 2, &relay_callbacks);
#endif  
	if(!hello_rchan){
		printk("relay_open() failed.\n");
		return -ENOMEM;
	}
	relay_write(hello_rchan, msg, strlen(msg));
	return 0;
}

查看输出

1
2
mount -t debugfs debugfs /media
cat /media/test/cpu*


http://www.cnblogs.com/hoys/archive/2011/04/10/2011270.html

用户空间与内核空间数据交换的方式(4)——relayfs

relayfs是一个快速的转发(relay)数据的文件系统,它以其功能而得名。它为那些需要从内核空间转发大量数据到用户空间的工具和应用提供了快速有效的转发机制。

Channel是relayfs文件系统定义的一个主要概念,每一个channel由一组内核缓存组成,每一个CPU有一个对应于该channel 的内核缓存,每一个内核缓存用一个在relayfs文件系统中的文件文件表示,内核使用relayfs提供的写函数把需要转发给用户空间的数据快速地写入当前CPU上的channel内核缓存,用户空间应用通过标准的文件I/O函数在对应的channel文件中可以快速地取得这些被转发出的数据mmap 来。写入到channel中的数据的格式完全取决于内核中创建channel的模块或子系统。

relayfs的用户空间API:

relayfs实现了四个标准的文件I/O函数,open、mmap、poll和close.

open(),打开一个channel在某一个CPU上的缓存对应的文件。

mmap(),把打开的channel缓存映射到调用者进程的内存空间。

read (),读取channel缓存,随后的读操作将看不到被该函数消耗的字节,如果channel的操作模式为非覆盖写,那么用户空间应用在有内核模块写时仍 可以读取,但是如果channel的操作模式为覆盖式,那么在读操作期间如果有内核模块进行写,结果将无法预知,因此对于覆盖式写的channel,用户 应当在确认在channel的写完全结束后再进行读。

poll(),用于通知用户空间应用转发数据跨越了子缓存的边界,支持的轮询标志有POLLIN、POLLRDNORM和POLLERR。

close(),关闭open函数返回的文件描述符,如果没有进程或内核模块打开该channel缓存,close函数将释放该channel缓存。

注意:用户态应用在使用上述API时必须保证已经挂载了relayfs文件系统,但内核在创建和使用channel时不需要relayfs已经挂载。下面命令将把relayfs文件系统挂载到/mnt/relay。

1
mount -t relayfs relayfs /mnt/relay

relayfs内核API:

relayfs提供给内核的API包括四类:channel管理、写函数、回调函数和辅助函数。

Channel管理函数包括:

1
2
3
4
5
6
7
8
relay_open(base_filename, parent, subbuf_size, n_subbufs, overwrite, callbacks)
relay_close(chan)
relay_flush(chan)
relay_reset(chan)
relayfs_create_dir(name, parent)
relayfs_remove_dir(dentry)
relay_commit(buf, reserved, count)
relay_subbufs_consumed(chan, cpu, subbufs_consumed)

写函数包括:

1
2
3
relay_write(chan, data, length)
__relay_write(chan, data, length)
relay_reserve(chan, length)

回调函数包括:

1
2
3
subbuf_start(buf, subbuf, prev_subbuf_idx, prev_subbuf)
buf_mapped(buf, filp)
buf_unmapped(buf, filp)

辅助函数包括:

1
2
relay_buf_full(buf)
subbuf_start_reserve(buf, length)

前面已经讲过,每一个channel由一组channel缓存组成,每个CPU对应一个该channel的缓存,每一个缓存又由一个或多个子缓存组成,每一个缓存是子缓存组成的一个环型缓存。

函数relay_open用于创建一个channel并分配对应于每一个CPU的缓存,用户空间应用通过在relayfs文件系统中对应的文件可以 访问channel缓存,参数base_filename用于指定channel的文件名,relay_open函数将在relayfs文件系统中创建 base_filename0..base_filenameN-1,即每一个CPU对应一个channel文件,其中N为CPU数,缺省情况下,这些文件将建立在relayfs文件系统的根目录下,但如果参数parent非空,该函数将把channel文件创建于parent目录下,parent目录使 用函数relay_create_dir创建,函数relay_remove_dir用于删除由函数relay_create_dir创建的目录,谁创建的目录,谁就负责在不用时负责删除。参数subbuf_size用于指定channel缓存中每一个子缓存的大小,参数n_subbufs用于指定 channel缓存包含的子缓存数,因此实际的channel缓存大小为(subbuf_size x n_subbufs),参数overwrite用于指定该channel的操作模式,relayfs提供了两种写模式,一种是覆盖式写,另一种是非覆盖式 写。使用哪一种模式完全取决于函数subbuf_start的实现,覆盖写将在缓存已满的情况下无条件地继续从缓存的开始写数据,而不管这些数据是否已经 被用户应用读取,因此写操作决不失败。在非覆盖写模式下,如果缓存满了,写将失败,但内核将在用户空间应用读取缓存数据时通过函数 relay_subbufs_consumed()通知relayfs。如果用户空间应用没来得及消耗缓存中的数据或缓存已满,两种模式都将导致数据丢失,唯一的区别是,前者丢失数据在缓存开头,而后者丢失数据在缓存末尾。一旦内核再次调用函数relay_subbufs_consumed(),已满的缓存将不再满,因而可以继续写该缓存。当缓存满了以后,relayfs将调用回调函数buf_full()来通知内核模块或子系统。当新的数据太大无法写 入当前子缓存剩余的空间时,relayfs将调用回调函数subbuf_start()来通知内核模块或子系统将需要使用新的子缓存。内核模块需要在该回调函数中实现下述功能:

初始化新的子缓存;

如果1正确,完成当前子缓存;

如果2正确,返回是否正确完成子缓存切换;

在非覆盖写模式下,回调函数subbuf_start()应该如下实现:

1
2
3
4
5
6
7
8
9
10
11
static int subbuf_start(struct rchan_buf *buf, void *subbuf, void *prev_subbuf, unsigned intprev_padding)
{
	if (prev_subbuf)
		*((unsigned *)prev_subbuf) = prev_padding;

	if (relay_buf_full(buf))
		return 0;

	subbuf_start_reserve(buf, sizeof(unsigned int));
	return 1;
}

如果当前缓存满,即所有的子缓存都没读取,该函数返回0,指示子缓存切换没有成功。当子缓存通过函数relay_subbufs_consumed ()被读取后,读取者将负责通知relayfs,函数relay_buf_full()在已经有读者读取子缓存数据后返回0,在这种情况下,子缓存切换成 功进行。

在覆盖写模式下, subbuf_start()的实现与非覆盖模式类似:

1
2
3
4
5
6
7
8
9
static int subbuf_start(struct rchan_buf *buf, void *subbuf, void *prev_subbuf, unsigned int prev_padding)
{
	if (prev_subbuf)
		*((unsigned *)prev_subbuf) = prev_padding;

	subbuf_start_reserve(buf, sizeof(unsigned int));

	return 1;
}

只是不做relay_buf_full()检查,因为此模式下,缓存是环行的,可以无条件地写。因此在此模式下,子缓存切换必定成功,函数 relay_subbufs_consumed() 也无须调用。如果channel写者没有定义subbuf_start(),缺省的实现将被使用。 可以通过在回调函数subbuf_start()中调用辅助函数subbuf_start_reserve()在子缓存中预留头空间,预留空间可以保存任 何需要的信息,如上面例子中,预留空间用于保存子缓存填充字节数,在subbuf_start()实现中,前一个子缓存的填充值被设置。前一个子缓存的填 充值和指向前一个子缓存的指针一道作为subbuf_start()的参数传递给subbuf_start(),只有在子缓存完成后,才能知道填充值。 subbuf_start()也被在channel创建时分配每一个channel缓存的第一个子缓存时调用,以便预留头空间,但在这种情况下,前一个子 缓存指针为NULL。

内核模块使用函数relay_write()或__relay_write()往channel缓存中写需要转发的数据,它们的区别是前者失效了本 地中断,而后者只抢占失效,因此前者可以在任何内核上下文安全使用,而后者应当在没有任何中断上下文将写channel缓存的情况下使用。这两个函数没有 返回值,因此用户不能直接确定写操作是否失败,在缓存满且写模式为非覆盖模式时,relayfs将通过回调函数buf_full来通知内核模块。

函数relay_reserve()用于在channel缓存中预留一段空间以便以后写入,在那些没有临时缓存而直接写入channel缓存的内核 模块可能需要该函数,使用该函数的内核模块在实际写这段预留的空间时可以通过调用relay_commit()来通知relayfs。当所有预留的空间全 部写完并通过relay_commit通知relayfs后,relayfs将调用回调函数deliver()通知内核模块一个完整的子缓存已经填满。由于预留空间的操作并不在写channel的内核模块完全控制之下,因此relay_reserve()不能很好地保护缓存,因此当内核模块调用 relay_reserve()时必须采取恰当的同步机制。

当内核模块结束对channel的使用后需要调用relay_close() 来关闭channel,如果没有任何用户在引用该channel,它将和对应的缓存全部被释放。

函数relay_flush()强制在所有的channel缓存上做一个子缓存切换,它在channel被关闭前使用来终止和处理最后的子缓存。

函数relay_reset()用于将一个channel恢复到初始状态,因而不必释放现存的内存映射并重新分配新的channel缓存就可以使用channel,但是该调用只有在该channel没有任何用户在写的情况下才可以安全使用。

回调函数buf_mapped() 在channel缓存被映射到用户空间时被调用。

回调函数buf_unmapped()在释放该映射时被调用。内核模块可以通过它们触发一些内核操作,如开始或结束channel写操作。

在源代码包中给出了一个使用relayfs的示例程序relayfs_exam.c,它只包含一个内核模块,对于复杂的使用,需要应用程序配合。该模块实现了类似于文章中seq_file示例实现的功能。

当然为了使用relayfs,用户必须让内核支持relayfs,并且要mount它,下面是作者系统上的使用该模块的输出信息:

1
2
3
4
5
6
$ mkdir -p /relayfs
$ insmod ./relayfs-exam.ko
$ mount -t relayfs relayfs /relayfs
$ cat /relayfs/example0
$

relayfs是一种比较复杂的内核态与用户态的数据交换方式,本例子程序只提供了一个较简单的使用方式,对于复杂的使用,请参考relayfs用例页面http://relayfs.sourceforge.net/examples.html%E3%80%82

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
//kernel module: relayfs-exam.c
#include <linux/module.h>
#include <linux/relayfs_fs.h>
#include <linux/string.h>
#include <linux/sched.h>

#define WRITE_PERIOD (HZ * 60)
static struct rchan * chan;
static size_t subbuf_size = 65536;
static size_t n_subbufs = 4;
static char buffer[256];

void relayfs_exam_write(unsigned long data);

static DEFINE_TIMER(relayfs_exam_timer, relayfs_exam_write, 0, 0);

void relayfs_exam_write(unsigned long data)
{
	int len;
	task_t * p = NULL;

	len = sprintf(buffer, "Current all the processes:\n");
	len += sprintf(buffer + len, "process name\t\tpid\n");
	relay_write(chan, buffer, len);

	for_each_process(p) {
		len = sprintf(buffer, "%s\t\t%d\n", p->comm, p->pid);
		relay_write(chan, buffer, len);
	}
	len = sprintf(buffer, "\n\n");
	relay_write(chan, buffer, len);

	relayfs_exam_timer.expires = jiffies + WRITE_PERIOD;
	add_timer(&relayfs_exam_timer);
}


/*
* subbuf_start() relayfs callback.
*
* Defined so that we can 1) reserve padding counts in the sub-buffers, and
* 2) keep a count of events dropped due to the buffer-full condition.
*/
static int subbuf_start(struct rchan_buf *buf,
				void *subbuf,
				void *prev_subbuf,
				unsigned int prev_padding)
{
	if (prev_subbuf)
		*((unsigned *)prev_subbuf) = prev_padding;

	if (relay_buf_full(buf))
		return 0;

	subbuf_start_reserve(buf, sizeof(unsigned int));

	return 1;
}

/*
* relayfs callbacks
*/
static struct rchan_callbacks relayfs_callbacks =
{
	.subbuf_start = subbuf_start,
};

/**
* module init - creates channel management control files
*
* Returns 0 on success, negative otherwise.
*/
static int init(void)
{

	chan = relay_open("example", NULL, subbuf_size,
	n_subbufs, &relayfs_callbacks);

	if (!chan) {
		printk("relay channel creation failed.\n");
		return 1;
	}
	relayfs_exam_timer.expires = jiffies + WRITE_PERIOD;
	add_timer(&relayfs_exam_timer);

	return 0;
}

static void cleanup(void)
{
	del_timer_sync(&relayfs_exam_timer);
	if (chan) {
		relay_close(chan);
		chan = NULL;
	}
}

module_init(init);
module_exit(cleanup);
MODULE_LICENSE("GPL");

cgroups介绍、使用

http://blog.csdn.net/jesseyoung/article/details/39077829

http://tech.meituan.com/cgroups.html

http://www.cnblogs.com/lisperl/tag/%E8%99%9A%E6%8B%9F%E5%8C%96%E6%8A%80%E6%9C%AF/

1 cgroup简介

Cgroups是control groups的缩写,是Linux内核提供的一种可以限制、记录、隔离进程组(process groups)所使用的物理资源(如:cpu,memory,IO等等)的机制。最初由google的工程师提出,后来被整合进Linux内核。也是目前轻量级虚拟化技术 lxc (linux container)的基础之一。

2 cgroup作用

Cgroups最初的目标是为资源管理提供的一个统一的框架,既整合现有的cpuset等子系统,也为未来开发新的子系统提供接口。现在的cgroups适用于多种应用场景,从单个进程的资源控制,到实现操作系统层次的虚拟化(OS Level Virtualization)。Cgroups提供了以下功能:

1.限制进程组可以使用的资源数量(Resource limiting )。比如:memory子系统可以为进程组设定一个memory使用上限,一旦进程组使用的内存达到限额再申请内存,就会出发OOM(out of memory)。

2.进程组的优先级控制(Prioritization )。比如:可以使用cpu子系统为某个进程组分配特定cpu share。

3.记录进程组使用的资源数量(Accounting )。比如:可以使用cpuacct子系统记录某个进程组使用的cpu时间

4.进程组隔离(Isolation)。比如:使用ns子系统可以使不同的进程组使用不同的namespace,以达到隔离的目的,不同的进程组有各自的进程、网络、文件系统挂载空间。

5.进程组控制(Control)。比如:使用freezer子系统可以将进程组挂起和恢复。

3 cgroup相关概念

3.1 相关概念

1.任务(task)。在cgroups中,任务就是系统的一个进程。

2.控制族群(control group)。控制族群就是一组按照某种标准划分的进程。Cgroups中的资源控制都是以控制族群为单位实现。一个进程可以加入到某个控制族群,也从一个进程组迁移到另一个控制族群。一个进程组的进程可以使用cgroups以控制族群为单位分配的资源,同时受到cgroups以控制族群为单位设定的限制。

3.层级(hierarchy)。控制族群可以组织成hierarchical的形式,既一颗控制族群树。控制族群树上的子节点控制族群是父节点控制族群的孩子,继承父控制族群的特定的属性。

4.子系统(subsystem)。一个子系统就是一个资源控制器,比如cpu子系统就是控制cpu时间分配的一个控制器。子系统必须附加(attach)到一个层级上才能起作用,一个子系统附加到某个层级以后,这个层级上的所有控制族群都受到这个子系统的控制。

3.2 相互关系

1.每次在系统中创建新层级时,该系统中的所有任务都是那个层级的默认 cgroup(我们称之为 root cgroup ,此cgroup在创建层级时自动创建,后面在该层级中创建的cgroup都是此cgroup的后代)的初始成员。

2.一个子系统最多只能附加到一个层级。

3.一个层级可以附加多个子系统

4.一个任务可以是多个cgroup的成员,但是这些cgroup必须在不同的层级。

5.系统中的进程(任务)创建子进程(任务)时,该子任务自动成为其父进程所在 cgroup 的成员。然后可根据需要将该子任务移动到不同的 cgroup 中,但开始时它总是继承其父任务的cgroup。

4 cgroup子系统介绍

1
2
3
4
5
6
7
8
9
blkio   -- 这个子系统为块设备设定输入/输出限制,比如物理设备(磁盘,固态硬盘,USB 等等)。
cpu     -- 这个子系统使用调度程序提供对 CPU 的 cgroup 任务访问。
cpuacct -- 这个子系统自动生成 cgroup 中任务所使用的 CPU 报告。
cpuset  -- 这个子系统为 cgroup 中的任务分配独立 CPU(在多核系统)和内存节点。
devices -- 这个子系统可允许或者拒绝 cgroup 中的任务访问设备。
freezer -- 这个子系统挂起或者恢复 cgroup 中的任务。
memory  -- 这个子系统设定 cgroup 中任务使用的内存限制,并自动生成由那些任务使用的内存资源报告。
net_cls -- 这个子系统使用等级识别符(classid)标记网络数据包,可允许 Linux 流量控制程序(tc)识别从具体 cgroup 中生成的数据包。
ns      -- 名称空间子系统。

5 cgroup安装(centos下)

若系统未安装则进行安装,若已安装则进行更新。

1
[root@localhost ~]# yum install libcgroup  

查看运行状态,并启动服务

1
2
3
4
5
6
[root@localhost ~]# service cgconfig status  
Stopped  
[root@localhost ~]# service cgconfig start  
Starting cgconfig service:                                 [  OK  ]  
[root@localhost ~]# service cgconfig status  
Running  

6 cgroup配置

6.1 配置文件介绍

6.1.1 cgroup配置文件所在位置

1
/etc/cgconfig.conf  

6.1.2 默认配置文件内容

1
2
3
4
5
6
7
8
9
10
mount {  
	cpuset  = /cgroup/cpuset;  
	cpu     = /cgroup/cpu;  
	cpuacct = /cgroup/cpuacct;  
	memory  = /cgroup/memory;  
	devices = /cgroup/devices;  
	freezer = /cgroup/freezer;  
	net_cls = /cgroup/net_cls;  
	blkio   = /cgroup/blkio;  
}  

相当于执行命令

1
2
3
4
5
mkdir /cgroup/cpuset  
mount -t cgroup -o cpuset red /cgroup/cpuset  
……  
mkdir /cgroup/blkio  
mount -t cgroup -o cpuset red /cgroup/blkio  

6.1.3 cgroup section的语法格式如下

1
2
3
4
5
6
7
group <name> {  
	[<permissions>]  
	<controller> {  
		<param name> = <param value>;  
	}  
…}  

name: 指定cgroup的名称
permissions:可选项,指定cgroup对应的挂载点文件系统的权限,root用户拥有所有权限。
controller: 子系统的名称
param name 和 param value:子系统的属性及其属性值

7 cgroup实例分析(限制mysql资源使用)

7.1 配置对mysql实例的资源限制

前提:mysql数据库已在机器上安装

7.1.1 修改cgconfig.conf文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
mount {  
	cpuset  = /cgroup/cpuset;  
	cpu = /cgroup/cpu;  
	cpuacct = /cgroup/cpuacct;  
	memory  = /cgroup/memory;  
	blkio   = /cgroup/blkio;  
}  

group mysql_g1 {    
	cpu {  
		cpu.cfs_quota_us = 50000;  
		cpu.cfs_period_us = 100000;  
	}  
	cpuset {    
		cpuset.cpus = "3";    
		cpuset.mems = "0";    
	}    
	cpuacct{  
  
	}  
	memory {    
		memory.limit_in_bytes=104857600;  
		memory.swappiness=0;  
		# memory.max_usage_in_bytes=104857600;  
		# memory.oom_control=0;  
	}   
	blkio  {  
		blkio.throttle.read_bps_device="8:0 524288";  
		blkio.throttle.write_bps_device="8:0 524288";  
	}   
}   

7.1.2 配置文件的部分解释。

cpu:cpu使用时间限额。

cpu.cfs_period_us和cpu.cfs_quota_us来限制该组中的所有进程在单位时间里可以使用的cpu时间。这里的cfs是完全公平调度器的缩写。cpu.cfs_period_us就是时间周期(微秒),默认为100000,即百毫秒。cpu.cfs_quota_us就是在这期间内可使用的cpu时间(微秒),默认-1,即无限制。(cfs_quota_us是cfs_period_us的两倍即可限定在双核上完全使用)。

cpuset:cpu绑定

我们限制该组只能在0一共1个超线程上运行。cpuset.mems是用来设置内存节点的。

本例限制使用超线程0上的第四个cpu线程。

其实cgconfig也就是帮你把配置文件中的配置整理到/cgroup/cpuset这个目录里面,比如你需要动态设置mysql_group1/ cpuset.cpus的CPU超线程号,可以采用如下的办法。

1
[root@localhost ~]# echo "0" > mysql_group1/ cpuset.cpus  

cpuacct:cpu资源报告

memory:内存限制

内存限制我们主要限制了MySQL可以使用的内存最大大小memory.limit_in_bytes=256M。而设置swappiness为0是为了让操作系统不会将MySQL的内存匿名页交换出去。

blkio:BLOCK IO限额

blkio.throttle.read_bps_device=“8:0 524288”; #每秒读数据上限
blkio.throttle.write_bps_device=“8:0 524288”; #每秒写数据上限

其中8:0对应主设备号和副设备号,可以通过ls -l /dev/sda查看

1
2
[root@localhost /]# ls -l /dev/sda  
brw-rw----. 1 root disk 8, 0 Sep 15 04:19 /dev/sda

7.1.4 修改cgrules.conf文件

1
2
3
4
5
6
7
8
9
10
11
12
[root@localhost ~]# vi /etc/cgrules.conf  
# /etc/cgrules.conf  
#The format of this file is described in cgrules.conf(5)  
#manual page.  
#  
# Example:  
#<user>         <controllers>   <destination>  
#@student       cpu,memory      usergroup/student/  
#peter          cpu             test1/  
#%              memory          test2/  

*:/usr/local/mysql/bin/mysqld * mysql_g1  

注:共分为3个部分,分别为需要限制的实例,限制的内容(如cpu,memory),挂载目标。

7.2 使配置生效

1
2
3
4
5
6
[root@localhost ~]# /etc/init.d/cgconfig restart  
Stopping cgconfig service:                                 [  OK  ]  
Starting cgconfig service:                                 [  OK  ]  
[root@localhost ~]# /etc/init.d/cgred restart  
Stopping CGroup Rules Engine Daemon...                     [  OK  ]  
Starting CGroup Rules Engine Daemon:                       [  OK  ]  

注:重启顺序为cgconfig -> cgred ,更改配置文件后两个服务需要重启,且顺序不能错。

7.3 启动MySQL,查看MySQL是否处于cgroup的限制中
1
2
3
4
[root@localhost ~]# ps -eo pid,cgroup,cmd | grep -i mysqld  
29871 blkio:/;net_cls:/;freezer:/;devices:/;memory:/;cpuacct:/;cpu:/;cpuset:/ /bin/sh ./bin/mysqld_safe --defaults-file=/etc/my.cnf --basedir=/usr/local/mysql/ --datadir=/usr/local/mysql/data/  
30219 blkio:/;net_cls:/;freezer:/;devices:/;memory:/;cpuacct:/;cpu:/;cpuset:/mysql_g1 /usr/local/mysql/bin/mysqld --defaults-file=/etc/my.cnf --basedir=/usr/local/mysql/ --datadir=/usr/local/mysql/data/ --plugin-dir=/usr/local/mysql//lib/plugin --user=mysql --log-error=/usr/local/mysql/data//localhost.localdomain.err --pid-file=/usr/local/mysql/data//localhost.localdomain.pid --socket=/tmp/mysql.sock --port=3306  
30311 blkio:/;net_cls:/;freezer:/;devices:/;memory:/;cpuacct:/;cpu:/;cpuset:/ grep -i mysqld  


不改配置文件,用命令实时配置

比如通过命令

1
cgcreate -t sankuai:sankuai -g cpu:test

就可以在 cpu 子系统下建立一个名为 test 的节点。

当需要删除某一个 cgroups 节点的时候,可以使用 cgdelete 命令,比如要删除上述的 test 节点,可以使用 cgdelete -r cpu:test命令进行删除

然后可以通过写入需要的值到 test 下面的不同文件,来配置需要限制的资源。每个子系统下面都可以进行多种不同的配置,需要配置的参数各不相同,详细的参数设置需要参考 cgroups 手册。使用 cgset 命令也可以设置 cgroups 子系统的参数,格式为 cgset -r parameter=value path_to_cgroup。

把进程加入到 cgroups 子节点也有多种方法,可以直接把 pid 写入到子节点下面的 task 文件中。也可以通过 cgclassify 添加进程,格式为

1
cgclassify -g subsystems:path_to_cgroup pidlist

也可以直接使用 cgexec 在某一个 cgroups 下启动进程,格式为

1
gexec -g subsystems:path_to_cgroup1 -g subsystems:path_to_cgroup2 command arguments.

把任务的cpu资源使用率限制在了50%。

首先在 cpu 子系统下面创建了一个 halfapi 的子节点:

1
cgcreate abc:abc -g cpu:halfapi

然后在配置文件中写入配置数据:

1
echo 50000 > /cgroup/cpu/halfapi/cpu.cfs_quota_us

cpu.cfs_quota_us中的默认值是100000,写入50000表示只能使用50%的 cpu 运行时间。

最后在这个cgroups中启动这个任务:

1
cgexec -g "cpu:/halfapi" php halfapi.php half >/dev/null 2>&1

Linux RCU机制详解

http://wenku.baidu.com/link?url=bzayVU6qmUlc6UO9WGdgdxGDzrRBVWCiwjysigFxYuJToiZgtaXF5ss01GENBv4l4xxfyedZtm2Ehz7StlFSIECo65pdZHI3kZxuPv5zzwO

http://blog.csdn.net/lili20082008/article/details/17675093

http://blog.csdn.net/junguo/article/details/8244530

一:前言

RCU机制出现的比较早,只是在linux kernel中一直到2.5版本的时候才被采用.关于RCU机制,这里就不做过多的介绍了,网上有很多有关RCU介绍和使用的文档.请自行查阅.本文主要是从linux kernel源代码的角度.来分析RCU的实现.

在讨论RCU的实现之前.有必要重申以下几点:

1:RCU使用在读者多而写者少的情况.RCU和读写锁相似.但RCU的读者占锁没有任何的系统开销.写者与写写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源.

2:RCU保护的是指针.这一点尤其重要.因为指针赋值是一条单指令.也就是说是一个原子操作.因它更改指针指向没必要考虑它的同步.只需要考虑cache的影响.

3:读者是可以嵌套的.也就是说rcu_read_lock()可以嵌套调用.

4:读者在持有rcu_read_lock()的时候,不能发生进程上下文切换.否则,因为写者需要要等待读者完成,写者进程也会一直被阻塞.

以下的代码是基于linux kernel 2.6.26

二:使用RCU的实例

Linux kernel中自己附带有详细的文档来介绍RCU,这些文档位于linux-2.6.26.3/Documentation/RCU. 这些文档值得多花点时间去仔细研读一下.

下面以whatisRCU.txt中的例子作为今天分析的起点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct foo {
	int a;
	char b;
	long c;
};
DEFINE_SPINLOCK(foo_mutex);

struct foo *gbl_foo;
void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp;

	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	spin_lock(&foo_mutex);
	old_fp = gbl_foo;
	*new_fp = *old_fp;
	new_fp->a = new_a;
	rcu_assign_pointer(gbl_foo, new_fp);
	spin_unlock(&foo_mutex);
	synchronize_rcu();
	kfree(old_fp);
}

int foo_get_a(void)
{
	int retval;

	rcu_read_lock();
	retval = rcu_dereference(gbl_foo)->a;
	rcu_read_unlock();
	return retval;
}

如上代码所示,RCU被用来保护全局指针struct foo *gbl_foo. foo_get_a()用来从RCU保护的结构中取得gbl_foo的值.而foo_update_a()用来更新被RCU保护的gbl_foo的值.

另外,我们思考一下,为什么要在foo_update_a()中使用自旋锁foo_mutex呢?

假设中间没有使用自旋锁.那foo_update_a()的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp;

	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);

	old_fp = gbl_foo;
	1:-------------------------     
	*new_fp = *old_fp;
	new_fp->a = new_a;
	rcu_assign_pointer(gbl_foo, new_fp);

	synchronize_rcu();
	kfree(old_fp);
}

假设A进程在上图—-标识处被B进程抢点.B进程也执行了goo_ipdate_a().等B执行完后,再切换回A进程.此时,A进程所持的old_fd实际上已经被B进程给释放掉了.此后A进程对old_fd的操作都是非法的.

另外,我们在上面也看到了几个有关RCU的核心API.它们为别是:

1
2
3
4
5
rcu_read_lock()
rcu_read_unlock()
synchronize_rcu()
rcu_assign_pointer()
rcu_dereference()

其中,rcu_read_lock()和rcu_read_unlock()用来保持一个读者的RCU临界区.在该临界区内不允许发生上下文切换.

rcu_dereference():读者调用它来获得一个被RCU保护的指针.

Rcu_assign_pointer():写者使用该函数来为被RCU保护的指针分配一个新的值.这样是为了安全从写者到读者更改其值.这个函数会返回一个新值

三:RCU API实现分析

Rcu_read_lock()和rcu_read_unlock()的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define rcu_read_lock() __rcu_read_lock()
#define rcu_read_unlock() __rcu_read_unlock()

#define __rcu_read_lock() \
	do { \
		preempt_disable(); \
		__acquire(RCU); \
		rcu_read_acquire(); \
	} while (0)
#define __rcu_read_unlock() \
	do { \
		rcu_read_release(); \
		__release(RCU); \
		preempt_enable(); \
	} while (0)

其中__acquire(),rcu_read_read_acquire(),rcu_read_release(),rcu_read_release()都是一些选择编译函数,可以忽略不可看.因此可以得知.rcu_read_lock(),rcu_read_unlock()只是禁止和启用抢占.因为在读者临界区,不允许发生上下文切换.

rcu_dereference()和rcu_assign_pointer()的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
#define rcu_dereference(p)     ({ \
				typeof(p) _________p1 = ACCESS_ONCE(p); \
				smp_read_barrier_depends(); \
				(_________p1); \
				})
#define rcu_assign_pointer(p, v) \
	({ \
		if (!__builtin_constant_p(v) || \
			((v) != NULL)) \
			smp_wmb(); \
		(p) = (v); \
	})

它们的实现也很简单.因为它们本身都是原子操作.因为只是为了cache一致性,插上了内存屏障.可以让其它的读者/写者可以看到保护指针的最新值.

synchronize_rcu()在RCU中是一个最核心的函数,它用来等待之前的读者全部退出.我们后面的大部份分析也是围绕着它而进行.实现如下:

1
2
3
4
5
6
7
8
9
10
11
void synchronize_rcu(void)
{
	struct rcu_synchronize rcu;

	init_completion(&rcu.completion);
	/* Will wake me after RCU finished */
	call_rcu(&rcu.head, wakeme_after_rcu);

	/* Wait for it */
	wait_for_completion(&rcu.completion);
}

我们可以看到,它初始化了一个本地变量,它的类型为struct rcu_synchronize.调用call_rcu()之后.一直等待条件变量rcu.competion的满足.

在这里看到了RCU的另一个核心API,它就是call_run().它的定义如下:

1
2
void call_rcu(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))

它用来等待之前的读者操作完成之后,就会调用函数func.

我们也可以看到,在synchronize_rcu()中,读者操作完了要调用的函数就是wakeme_after_rcu().

另外,call_rcu()用在不可睡眠的条件中,如果中断环境,禁止抢占环境等.而synchronize_rcu()用在可睡眠的环境下.先跟踪看一下wakeme_after_rcu():

1
2
3
4
5
6
7
static void wakeme_after_rcu(struct rcu_head  *head)
{
	struct rcu_synchronize *rcu;

	rcu = container_of(head, struct rcu_synchronize, head);
	complete(&rcu->completion);
}

我们可以看到,该函数将条件变量置真,然后唤醒了在条件变量上等待的进程.

看下call_rcu():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void call_rcu(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))
{
	unsigned long flags;
	struct rcu_data *rdp;

	head->func = func;
	head->next = NULL;
	local_irq_save(flags);
	rdp = &__get_cpu_var(rcu_data);
	*rdp->nxttail = head;
	rdp->nxttail = &head->next;
	if (unlikely(++rdp->qlen > qhimark)) {
		rdp->blimit = INT_MAX;
		force_quiescent_state(rdp, &rcu_ctrlblk);
	}
	local_irq_restore(flags);
}

该函数也很简单,就是将head加在了per_cpu变量rcu_data的tail链表上.

Rcu_data定义如下:

1
DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };

由此,我们可以得知,每一个CPU都有一个rcu_data.每个调用call_rcu()/synchronize_rcu()进程所代表的head都会挂到rcu_data的tail链表上.

那究竟怎么去判断当前的写者已经操作完了呢?我们在之前看到,不是读者在调用rcu_read_lock()的时候要禁止抢占么?因此,我们只需要判断如有的CPU都进过了一次上下文切换,就说明所有读者已经退出了.

http://www.ibm.com/developerworks/cn/linux/l-rcu/ 中有关这个过程的描述:

“等待适当时机的这一时期称为grace period,而CPU发生了上下文切换称为经历一个quiescent state,grace period就是所有CPU都经历一次quiescent state所需要的等待的时间。垃圾收集器就是在grace period之后调用写者注册的回调函数来完成真正的数据修改或数据释放操作的”

要彻底弄清楚这个问题,我们得从RCU的初始化说起.

四:从RCU的初始化说起

RCU的初始化位于start_kernel()àrcu_init().代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
void __init rcu_init(void)
{
	__rcu_init();
}

void __init __rcu_init(void)
{
	rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
			(void *)(long)smp_processor_id());
	/* Register notifier for non-boot CPUs */
	register_cpu_notifier(&rcu_nb);
}

Reqister_cpu_notifier()是关于通知链表的操作,可以忽略不看.

跟进rcu_cpu_notify():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
			unsigned long action, void *hcpu)
{
	long cpu = (long)hcpu;

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		rcu_online_cpu(cpu);
		break;
	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		rcu_offline_cpu(cpu);
		break;
	default:
		break;
	}
	return NOTIFY_OK;
}

注意到,在__rcu_init()中是以CPU_UP_PREPARE为参数调用此函数,对应流程转入rcu_online_cpu中:

1
2
3
4
5
6
7
8
9
static void __cpuinit rcu_online_cpu(int cpu)
{
	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
	struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);

	rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
	rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);
}

我们从这里又看到了另一个per_cpu变量,rcu_bh_data.有关bh的部份之后再来分析.在这里略过这些部份.

Rcu_init_percpu_data()如下:

1
2
3
4
5
6
7
8
9
10
11
12
static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
						struct rcu_data *rdp)
{
	memset(rdp, 0, sizeof(*rdp));
	rdp->curtail = &rdp->curlist;
	rdp->nxttail = &rdp->nxtlist;
	rdp->donetail = &rdp->donelist;
	rdp->quiescbatch = rcp->completed;
	rdp->qs_pending = 0;
	rdp->cpu = cpu;
	rdp->blimit = blimit;
}

调用这个函数的第二个参数是一个全局变量rcu_ctlblk.定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
static struct rcu_ctrlblk rcu_ctrlblk = {
	.cur = -300,
	.completed = -300,
	.lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
	.cpumask = CPU_MASK_NONE,
};
static struct rcu_ctrlblk rcu_bh_ctrlblk = {
	.cur = -300,
	.completed = -300,
	.lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
	.cpumask = CPU_MASK_NONE,
};

在rcu_init_percpu_data中,初始化了三个链表,分别是taillist,curlist和donelist.另外, 将rdp->quiescbatch 赋值为 rcp->completed.这个是一个很重要的操作.

Rdp-> quiescbatch表示rcu_data已经完成的grace period序号(在代码中也被称为了batch),rcp->completed表示全部变量rcu_ctrlblk计数已经完成的grace period序号.将rdp->quiescbatch = rcp->completed;,表示不需要等待grace period.

回到rcu_online_cpu()中:

1
open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);

初始化了RCU_SOFTIRQ类型的软中断.但这个软中断什么时候被打开,还需要之后来分析.

之后,每个CPU的初始化都会经过start_kernel()->rcu_init().相应的,也为每个CPU初始化了RCU的相关结构.

五:等待RCU读者操作完成

之前,我们看完了RCU的初始化,现在可以来看一下RCU如何来判断当前的RCU读者已经退出了.

在每一次进程切换的时候,都会调用rcu_qsctr_inc().如下代码片段如示:

1
2
3
4
5
6
7
asmlinkage void __sched schedule(void)
{
	......
	......
	rcu_qsctr_inc(cpu);
	......
}

Rcu_qsctr_inc()代码如下:

1
2
3
4
5
static inline void rcu_qsctr_inc(int cpu)
{
	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
	rdp->passed_quiesc = 1;
}

该函数将对应CPU上的rcu_data的passed_quiesc成员设为了1.

或许你已经发现了,这个过程就标识该CPU经过了一次quiescent state.没错:-)

另外,在时钟中断中,会进行以下操作:

1
2
3
4
5
6
7
8
9
10
void update_process_times(int user_tick)
{
	......
	......

	if (rcu_pending(cpu))
		rcu_check_callbacks(cpu, user_tick);
	......
	......
}

在每一次时钟中断,都会检查是否有需要更新的RCU需要处理,如果有,就会为其调用rcu_check_callbacks().

Rcu_pending()的代码如下:

1
2
3
4
5
int rcu_pending(int cpu)
{
	return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
		__rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
}

同上面一样,忽略bh的部份.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
{
	/* This cpu has pending rcu entries and the grace period
	 * for them has completed.
	 */
	if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
		return 1;

	/* This cpu has no pending entries, but there are new entries */
	if (!rdp->curlist && rdp->nxtlist)
		return 1;

	/* This cpu has finished callbacks to invoke */
	if (rdp->donelist)
		return 1;

	/* The rcu core waits for a quiescent state from the cpu */
	if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
		return 1;

	/* nothing to do */
	return 0;
}

上面有四种情况会返回1,分别对应:

1:该CPU上有等待处理的回调函数,且已经经过了一个batch(grace period).rdp->datch表示rdp在等待的batch序号

2:上一个等待已经处理完了,又有了新注册的回调函数.

3:等待已经完成,但尚末调用该次等待的回调函数.

4:在等待quiescent state.

关于rcp和rdp结构中成员的含义,我们等用到的时候再来分析.

如果rcu_pending返回1,就会进入到rcu_check_callbacks().代码如下:

1
2
3
4
5
6
7
8
9
10
11
void rcu_check_callbacks(int cpu, int user)
{
	if (user ||
		(idle_cpu(cpu) && !in_softirq() &&
				hardirq_count() 
		rcu_qsctr_inc(cpu);
		rcu_bh_qsctr_inc(cpu);
	} else if (!in_softirq())
		rcu_bh_qsctr_inc(cpu);
	raise_rcu_softirq();
}

如果已经CPU中运行的进程是用户空间进程或者是CPU空闲且不处于中断环境,那么,它也已经进过了一次切换.注意,RCU只能在内核空间使用.

最后调用raise_rcu_softirq()打开了软中断处理.相应的,也就调用RCU的软中断处理函数.结合上面分析的初始化流程,软中断的处理函数为rcu_process_callbacks().

代码如下:

1
2
3
4
5
static void rcu_process_callbacks(struct softirq_action *unused)
{
	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
}

在阅读__rcu_process_callbacks()之前,先来了解一下rdp中几个链表的含义:

每次新注册的回调函数,都会链入到rdp->taillist.

当前等待grace period完成的函数都会链入到rdp->curlist上.

到等待的grace period已经到来,就会将curlist上的链表移到donelist上.

当一个grace period过了之后,就会将taillist上的数据移到rdp->curlist上.之后加册的回调函数又会将其加到rdp->taillist上.

__rcu_process_callbacks()代码分段分析如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
					struct rcu_data *rdp)
{
	if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
		*rdp->donetail = rdp->curlist;
		rdp->donetail = rdp->curtail;
		rdp->curlist = NULL;
		rdp->curtail = &rdp->curlist;
	}

	如果有需要处理的回调函数,且已经经过了一次grace period.就将curlist上的数据移到donetlist上.
其中,crp->completed表示已经完成的grace period.rdp->batch表示该CPU正在等待的grace period序号.

	if (rdp->nxtlist && !rdp->curlist) {
		local_irq_disable();
		rdp->curlist = rdp->nxtlist;
		rdp->curtail = rdp->nxttail;
		rdp->nxtlist = NULL;
		rdp->nxttail = &rdp->nxtlist;
		local_irq_enable();

		/*
		 * start the next batch of callbacks
		 */

		/* determine batch number */
		rdp->batch = rcp->cur + 1;
		/* see the comment and corresponding wmb() in
		 * the rcu_start_batch()
		 */
		smp_rmb();

		if (!rcp->next_pending) {
			/* and start it/schedule start if it's a new batch */
			spin_lock(&rcp->lock);
			rcp->next_pending = 1;
			rcu_start_batch(rcp);
			spin_unlock(&rcp->lock);
		}
	}
如果上一个等待的回调函数处理完了,而且又有了新注册的回调函数.就将taillist上的数据移动到curlist上.并开启新的grace period等待.
注意里面几个变量的赋值: 
rdp->batch = rcp->cur + 1表示该CPU等待的grace period置为当前已发生grace period序号的下一个.
每次启动一个新的grace period等待之后,就会将rcp->next_pending.在启动的过程中,也就是rcu_start_batch()的过程中,会将rcp->next_pending置为1.设置这个变量主要是防止多个写者竞争的情况

	//更新相关信息
	rcu_check_quiescent_state(rcp, rdp);
	//处理等待完成的回调函数
	if (rdp->donelist)
		rcu_do_batch(rdp);
}

接着,更新相关的信息,例如,判断当前CPU是否进行了quiescent state.或者grace period是否已经完成.

最后再处理挂在rdp->donelist上的链表.

这里面有几个子函数值得好好分析,分别分析如下:

第一个要分析的是rcu_start_batch():

1
2
3
4
5
6
7
8
9
10
11
12
13
static void rcu_start_batch(struct rcu_ctrlblk *rcp)
{
	if (rcp->next_pending &&
			rcp->completed == rcp->cur) {
		rcp->next_pending = 0;
		smp_wmb();
		rcp->cur++;
		smp_mb();
		cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);

		rcp->signaled = 0;
	}
}

这个函数的代码虽然很简单,但隐藏了很多玄机.

每次启动一个新的grace period等待的时候就将rcp->cur加1,将rcp->cpumask中,将存在的CPU的位置1.

其中,if判断必须要满足二个条件:

第一:rcp->next_pending必须为1.我们把这个函数放到__rcu_process_callbacks()这个大环境中看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
					struct rcu_data *rdp)
{
	......
	......
	if (rdp->nxtlist && !rdp->curlist) {
		......
		if (!rcp->next_pending) {
			/* and start it/schedule start if it's a new batch */
			spin_lock(&rcp->lock);
			rcp->next_pending = 1;
			rcu_start_batch(rcp);
			spin_unlock(&rcp->lock);
		}
	}
}

首先,rcp->next_pending为0才会调用rcu_start_batch()启动一个新的进程.然后,将rcp->next_pending置为1,再调用rcu_start_batch().在这里要注意中间的自旋锁.然后在rcu_start_batch()中,再次判断rcp->next_pending为1后,再进行后续操作,并将rcp->next_pending置为0.

为什么这里需要这样的判断呢? 如果其它CPU正在开启一个新的grace period等待,那就用不着再次开启一个新的等待了,直接返回即可.

第二: rcu_start_batch()中if要满足的第二个条件为rcp->completed == rcp->cur.也就是说前面的grace period全部都完成了.每次开启新等待的时候都会将rcp->cur加1.每一个等待完成之后,都会将rc-> completed等于rcp->cur.

第二个要分析的函数是rcu_check_quiescent_state().代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
					struct rcu_data *rdp)
{
	if (rdp->quiescbatch != rcp->cur) {
		/* start new grace period: */
		rdp->qs_pending = 1;
		rdp->passed_quiesc = 0;
		rdp->quiescbatch = rcp->cur;
		return;
	}

	/* Grace period already completed for this cpu?
	 * qs_pending is checked instead of the actual bitmap to avoid
	 * cacheline trashing.
	 */
	if (!rdp->qs_pending)
		return;

	/*
	 * Was there a quiescent state since the beginning of the grace
	 * period? If no, then exit and wait for the next call.
	 */
	if (!rdp->passed_quiesc)
		return;
	rdp->qs_pending = 0;

	spin_lock(&rcp->lock);
	/*
	 * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
	 * during cpu startup. Ignore the quiescent state.
	 */
	if (likely(rdp->quiescbatch == rcp->cur))
		cpu_quiet(rdp->cpu, rcp);

	spin_unlock(&rcp->lock);
}

首先,如果rdp->quiescbatch != rcp->cur.则说明又开启了一个新的等待,因此需要重新处理这个等待,首先将rdp->quiescbatch 更新为rcp->cur.然后,使rdp->qs_pending为1.表示有等待需要处理. passed_quiesc也被清成了0.

然后,再判断rdp->passed_quiesc是否为真,记得我们在之前分析过,在每次进程切换或者进程切换的时候,都会调用rcu_qsctr_inc().该函数会将rdp->passed_quiesc置为1. 因此,在这里判断这个值是为了检测该CPU上是否发生了上下文切换.

之后,就是一段被rcp->lock保护的一段区域.如果还是等待没有发生改变,就会调用cpu_quiet(rdp->cpu, rcp)将该CPU位清零.如果是一个新的等待了,就用不着清了,因为需要重新判断该CPU上是否发生了上下文切换.

cpu_quiet()函数代码如下:

1
2
3
4
5
6
7
8
9
static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
{
	cpu_clear(cpu, rcp->cpumask);
	if (cpus_empty(rcp->cpumask)) {
		/* batch completed ! */
		rcp->completed = rcp->cur;
		rcu_start_batch(rcp);
	}
}

它清除当前CPU对应的位,如果CPMMASK为空,对应所有的CPU都发生了进程切换,就会将rcp->completed = rcp->cur.并且根据需要是否开始一个grace period等待.

最后一个要分析的函数是rcu_do_batch().它进行的是清尾的工作.如果等待完成了,那就必须要处理donelist链表上挂载的数据了.代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void rcu_do_batch(struct rcu_data *rdp)
{
	struct rcu_head *next, *list;
	int count = 0;

	list = rdp->donelist;
	while (list) {
		next = list->next;
		prefetch(next);
		list->func(list);
		list = next;
		if (++count >= rdp->blimit)
			break;
	}
	rdp->donelist = list;

	local_irq_disable();
	rdp->qlen -= count;
	local_irq_enable();
	if (rdp->blimit == INT_MAX && rdp->qlen 
		rdp->blimit = blimit;

	if (!rdp->donelist)
		rdp->donetail = &rdp->donelist;
	else
		raise_rcu_softirq();
}

它遍历处理挂在链表上的回调函数.在这里,注意每次调用的回调函数有最大值限制.这样做主要是防止一次调用过多的回调函数而产生不必要系统负载.如果donelist中还有没处理完的数据,打开RCU软中断,在下次软中断到来的时候接着处理.

五:几种RCU情况分析

1:如果CPU 1上有进程调用rcu_read_lock进入临界区,之后退出来,发生了进程切换,新进程又通过rcu_read­_lock进入临界区.由于RCU软中断中只判断一次上下文切换,因此,在调用回调函数的时候,仍然有进程处于RCU的读临界区,这样会不会有问题呢?

这样是不会有问题的.还是上面的例子:

1
2
3
4
5
6
7
8
	spin_lock(&foo_mutex);
	old_fp = gbl_foo;
	*new_fp = *old_fp;
	new_fp->a = new_a;
	rcu_assign_pointer(gbl_foo, new_fp);
	spin_unlock(&foo_mutex);
	synchronize_rcu();
	kfree(old_fp);

使用synchronize_rcu ()只是为了等待持有old_fd(也就是调用rcu_assign_pointer ()更新之前的gbl_foo)的进程退出.而不需要等待所有的读者全部退出.这是因为,在rcu_assign_pointer ()之后的读取取得的保护指针,已经是更新好的新值了.

2:上面分析的似乎是针对有挂载链表的CPU而言的,那对于只调用rcu_read_lock()的CPU,它们是怎么处理的呢?

首先,每次启动一次等待,肯定是会更新rcp->cur的.因此,在rcu_pending()的判断中,下面语句会被满足:

1
2
if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
	return 1;

因此会进入到RCU的软中断.在软中断处理中:

rcu_process_callbacks() -> __rcu_process_callbacks() -> rcu_check_quiescent_state()

中,如果该CPU上有进程切换,就会各新rcp中的CPU 掩码数组.

3:如果一个CPU连续调用synchronize_rcu()或者call_rcu()它们会有什么影响呢?

如果当前有请求在等待,就会新请提交的回调函数挂到taillist上,一直到前一个等待完成,再将taillist的数据移到curlist,并开启一个新的等待,因此,也就是说,在前一个等待期间提交的请求,都会放到一起处理.也就是说,他们会共同等待所有CPU切换完成.

举例说明如下:
假设grace period时间是12ms.在12ms内,先后有A,B,C进程提交请求.
那系统在等待处理完后,交A,B,C移到curlist中,开始一个新的等待.

六:有关rcu_read_lock_bh()/rcu_read_unlock_bh()/call_rcu_bh().

在上面的代码分析的时候,经常看到带有bh的RCU代码.现在来看一下这些带bh的RCU是什么样的.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define rcu_read_lock_bh() __rcu_read_lock_bh()
#define rcu_read_unlock_bh() __rcu_read_unlock_bh()

#define __rcu_read_lock_bh() \
	do { \
		local_bh_disable(); \
		__acquire(RCU_BH); \
		rcu_read_acquire(); \
	} while (0)
#define __rcu_read_unlock_bh() \
	do { \
		rcu_read_release(); \
		__release(RCU_BH); \
		local_bh_enable(); \
	} while (0)

根据上面的分析:bh RCU跟普通的RCU相比不同的是,普通RCU是禁止内核抢占,而bh RCU是禁止下半部.

其实,带bh的RCU一般在软中断使用,不过计算quiescent state并不是发生一次上下文切换.而是发生一次softirq.我们在后面的分析中可得到印证.

Call_rcu_bh()代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void call_rcu_bh(struct rcu_head *head,
				void (*func)(struct rcu_head *rcu))
{
	unsigned long flags;
	struct rcu_data *rdp;

	head->func = func;
	head->next = NULL;
	local_irq_save(flags);
	rdp = &__get_cpu_var(rcu_bh_data);
	*rdp->nxttail = head;
	rdp->nxttail = &head->next;

	if (unlikely(++rdp->qlen > qhimark)) {
		rdp->blimit = INT_MAX;
		force_quiescent_state(rdp, &rcu_bh_ctrlblk);
	}

	local_irq_restore(flags);
}

它跟call_rcu()不相同的是,rcu是取per_cpu变量rcu__data和全局变量rcu_ctrlblk.而bh RCU是取rcu_bh_data,rcu_bh_ctrlblk.他们的类型都是一样的,这样做只是为了区分BH和普通RCU的等待.

对于rcu_bh_qsctr_inc

1
2
3
4
5
static inline void rcu_bh_qsctr_inc(int cpu)
{
	struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
	rdp->passed_quiesc = 1;
}

它跟rcu_qsctr_inc()机同,也是更改对应成员.

所不同的是,调用rcu_bh_qsctr_inc()的地方发生了变化.

1
2
3
4
5
6
7
8
9
10
11
12
13
asmlinkage void __do_softirq(void)
{
	......
		do {
		if (pending & 1) {
			h->action(h);
			rcu_bh_qsctr_inc(cpu);
		}
		h++;
		pending >>= 1;
	} while (pending);
	......
}

也就是说,在发生软中断的时候,才会认为是经过了一次quiescent state.

HAProxy 研究笔记 -- epoll 事件的处理

http://blog.chinaunix.net/uid-10167808-id-3825388.html

本文介绍 HAProxy 中 epoll 事件的处理机制,版本为 1.5-dev17。

1
2
3
4
5
6
7
8
1. 背景知识
	1.1. fd 更新列表
	1.2. fdtab 数据结构
	1.3. fd event 的设置
2. _do_poll() 代码分析
	2.1. 检测 fd 更新列表
	2.2. 获取活动的 fd
	2.3. 处理活动的 fd

HAProxy 支持多种异步机制,有 select,poll,epoll,kqueue 等。本文介绍 epoll 的 相关实现,epoll 的代码在源文件 ev_epoll.c 中。epoll 的关键处理逻辑集中在函数 _do_poll() 中,下面会详细的分析该函数。

1. 背景知识

在分析 _do_poll() 实现之前,有一些关联的设计需要简单介绍一下,以便于理解该函数中 的一些代码。

1.1. fd 更新列表

见 fd.c 中的全局变量:

1
2
3
/* FD status is defined by the poller's status and by the speculative I/O list */
int fd_nbupdt = 0;             // number of updates in the list
unsigned int *fd_updt = NULL;  // FD updates list

这两个全局变量用来记录状态需要更新的 fd 的数量及具体的 fd。_do_poll() 中会根据 这些信息修改对应 fd 的 epoll 设置。

1.2. fdtab 数据结构

struct fdtab 数据结构在 include/types/fd.h 中定义,内容如下:

1
2
3
4
5
6
7
8
9
10
/* info about one given fd */
struct fdtab {
	int (*iocb)(int fd);                 /* I/O handler, returns FD_WAIT_* */
	void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
	unsigned int  spec_p;                /* speculative polling: position in spec list+1. 0=not in list. */
	unsigned char spec_e;                /* speculative polling: read and write events status. 4 bits */
	unsigned char ev;                    /* event seen in return of poll() : FD_POLL_* */
	unsigned char new:1;                 /* 1 if this fd has just been created */
	unsigned char updated:1;             /* 1 if this fd is already in the update list */
};

该结构的成员基本上都有注释,除了前两个成员,其余的都是和 fd IO 处理相关的。后面 分析代码的时候再具体的解释。

src/fd.c 中还有一个全局变量:

1
struct fdtab *fdtab = NULL;     /* array of all the file descriptors */

fdtab[] 记录了 HAProxy 所有 fd 的信息,数组的每个成员都是一个 struct fdtab, 而且成员的 index 正是 fd 的值,这样相当于 hash,可以高效的定位到某个 fd 对应的 信息。

1.3. fd event 的设置

include/proto/fd.h 中定义了一些设置 fd event 的函数:

1
2
3
4
5
6
/* event manipulation primitives for use by I/O callbacks */
static inline void fd_want_recv(int fd)
static inline void fd_stop_recv(int fd)
static inline void fd_want_send(int fd)
static inline void fd_stop_send(int fd)
static inline void fd_stop_both(int fd)

这些函数见名知义,就是用来设置 fd 启动或停止接收以及发送的。这些函数底层调用的 是一系列 fd_ev_XXX() 的函数真正的设置 fd。这里简单介绍一下 fd_ev_set() 的代码:

1
2
3
4
5
6
7
8
9
static inline void fd_ev_set(int fd, int dir)
{
	unsigned int i = ((unsigned int)fdtab[fd].spec_e) & (FD_EV_STATUS << dir);
	...
	if (i & (FD_EV_ACTIVE << dir))
		return; /* already in desired state */
	fdtab[fd].spec_e |= (FD_EV_ACTIVE << dir);
	updt_fd(fd); /* need an update entry to change the state */
}

该函数会判断一下 fd 的对应 event 是否已经设置了。没有设置的话,才重新设置。设置 的结果记录在 struct fdtab 结构的 spec_e 成员上,而且只是低 4 位上。然后调用 updt_fd() 将该 fd 放到 update list 中:

1
2
3
4
5
6
7
8
static inline void updt_fd(const int fd)
{
	if (fdtab[fd].updated)
		/* already scheduled for update */
		return;
	fdtab[fd].updated = 1;
	fd_updt[fd_nbupdt++] = fd;
}

从上面代码可以看出, struct fdtab 中的 updated 成员用来标记当前 fd 是否已经被放 到 update list 中了。没有的话,则更新设置 updated 成员,并且记录到 fd_updt[] 中, 并且增加需要跟新的 fd 的计数 fd_nbupdt。

至此,用于分析 _do_poll() 的一些背景知识介绍完毕。

2. _do_poll() 代码分析

这里将会重点的分析 _do_poll() 的实现。该函数可以粗略分为三部分:

1
2
3
检查 fd 更新列表,获取各个 fd event 的变化情况,并作 epoll 的设置
计算 epoll_wait 的 delay 时间,并调用 epoll_wait,获取活动的 fd
逐一处理所有有 IO 事件的 fd

以下将按顺序介绍这三部分的代码。

2.1. 检测 fd 更新列表

代码如下,后面会按行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
 43 /*
 44  * speculative epoll() poller
 45  */
 46 REGPRM2 static void _do_poll(struct poller *p, int exp)
 47 {
 ..     ..
 53 
 54     /* first, scan the update list to find changes */
 55     for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 56         fd = fd_updt[updt_idx];
 57         en = fdtab[fd].spec_e & 15;  /* new events */
 58         eo = fdtab[fd].spec_e >> 4;  /* previous events */
 59 
 60         if (fdtab[fd].owner && (eo ^ en)) {
 61             if ((eo ^ en) & FD_EV_POLLED_RW) {
 62                 /* poll status changed */
 63                 if ((en & FD_EV_POLLED_RW) == 0) {
 64                     /* fd removed from poll list */
 65                     opcode = EPOLL_CTL_DEL;
 66                 }
 67                 else if ((eo & FD_EV_POLLED_RW) == 0) {
 68                     /* new fd in the poll list */
 69                     opcode = EPOLL_CTL_ADD;
 70                 }
 71                 else {
 72                     /* fd status changed */
 73                     opcode = EPOLL_CTL_MOD;     
 74                 }
 75 
 76                 /* construct the epoll events based on new state */
 77                 ev.events = 0;
 78                 if (en & FD_EV_POLLED_R)
 79                     ev.events |= EPOLLIN;
 80 
 81                 if (en & FD_EV_POLLED_W)
 82                     ev.events |= EPOLLOUT;
 83 
 84                 ev.data.fd = fd;
 85                 epoll_ctl(epoll_fd, opcode, fd, &ev);
 86             }
 87 
 88             fdtab[fd].spec_e = (en << 4) + en;  /* save new events */
 89 
 90             if (!(en & FD_EV_ACTIVE_RW)) {
 91                 /* This fd doesn't use any active entry anymore, we can
 92                  * kill its entry.
 93                  */
 94                 release_spec_entry(fd);
 95             }
 96             else if ((en & ~eo) & FD_EV_ACTIVE_RW) {
 97                 /* we need a new spec entry now */
 98                 alloc_spec_entry(fd);
 99             }
100                                                             
101         }
102         fdtab[fd].updated = 0;
103         fdtab[fd].new = 0;
104     }
105     fd_nbupdt = 0;

haproxy 就是一个大的循环。每一轮循环,都顺序执行几个不同的功能。其中调用当前 poller 的 poll 方法便是其中一个环节。

55 - 56 行: 获取 fd 更新列表中的每一个 fd。 fd_updt[] 就是前面背景知识中介绍 的。haproxy 运行的不同阶段,都有可能通过调用背景知识中介绍的一些 fd event 设置函数 来更改 fd 的状态,最终会更新 fd_updt[] 和 fd_nbupdt。这里集中处理一下所有需要更新 的 fd。

57 - 58 行: 获取当前 fd 的最新事件,以及保存的上一次的事件。前面提到了,fd 的事 设置仅用 4 个 bit 就可以了。sturct fdtab 的 spec_e 成员是 unsigned char, 8 bit, 低 4 bit 保存 fd 当前最新的事件,高 4 bit 保存上一次的事件。这个做法就是为了判断 fd 的哪些事件上前面的处理中发生了变化,以便于更新。至于 fd 前一次的事件是什么时 后保存的,看后面的分析就知道了。

60 行: 主要判断 fd 记录的事件是否发生了变化。如果没有变化,就直接到 102-103 行 的处理了。这里有个小疑问,还没来及深入分析,就是哪些情况会使 fd 处于更新列表中, 但是 fd 上的事件有没有任何变化。

63 - 74 行:检测 fd 的 epoll operation 是否需要更改,比如ADD/DEL/MOD 等操作。

77 - 85 行:检测 fd 的 epoll events 的设置,并调用 epoll_ctl 设置 op 和 event

88 行:这里就是记录下 fd events 设置的最新状态。高低 4 位记录的结果相同。而在 程序运行过程中,仅修改低 4 位,这样和高 4 位一比较,就知道发生了哪些变化。

90 - 99 行:这里主要根据 fd 的新旧状态,更新 speculative I/O list。这个地方在 haproxy 的大循环中有独立的处理流程,这里不作分析。

102 - 103 行:清除 fd 的 new 和 updated 状态。new 状态通常是在新建一个 fd 时调 用 fd_insert 设置的,这里已经完成了 fd 状态的更新,因此两个成员均清零。

105 行: 整个 update list 都处理完了,fd_nbupdt 清零。haproxy 的其他处理流程会 继续更新 update list。下一次调用 _do_poll() 的时候继续处理。当然,这么说也说是 不全面的,因为接下来的处理流程也会有可能处理 fd 的 update list。但主要的处理还 是这里分析的代码块。

至此,fd 更新列表中的所有 fd 都处理完毕,该设置的也都设置了。下面就需要调用 epoll_wait 获得所有活动的 fd 了。 2.2. 获取活动的 fd

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
107     /* compute the epoll_wait() timeout */
108 
109     if (fd_nbspec || run_queue || signal_queue_len) {
...         ...
115         wait_time = 0;
116     }
117     else {
118         if (!exp)
119             wait_time = MAX_DELAY_MS;
120         else if (tick_is_expired(exp, now_ms))
121             wait_time = 0;
122         else {
123             wait_time = TICKS_TO_MS(tick_remain(now_ms, exp)) + 1;
124             if (wait_time > MAX_DELAY_MS)
125                 wait_time = MAX_DELAY_MS;
126         }
127     }
128 
129     /* now let's wait for polled events */
130 
131     fd = MIN(maxfd, global.tune.maxpollevents);
132     gettimeofday(&before_poll, NULL);
133     status = epoll_wait(epoll_fd, epoll_events, fd, wait_time);
134     tv_update_date(wait_time, status);
135     measure_idle();

107 - 127 行:主要是用来计算调用 epoll_wait 时的 timeout 参数。如果 fd_nbspec 不为 0,或 run_queue 中有任务需要运行,或者信号处理 queue 中有需要处理的,都设置 timeout 为 0,目的是希望 epoll_wait 尽快返回,程序好及时处理其他的任务。

131 - 135 行: 计算当前最多可以处理的 event 数目。这个数目也是可配置的。然后调用 epoll_wait, 所有活动 fd 的信息都保存在 epoll_events[] 数组中。

这部分代码逻辑比较简单,接下来就是处理所有活动的 fd 了。 2.3. 处理活动的 fd

逐一处理活动的 fd。这段代码也可以划分为若干个小代码,分别介绍如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
139     for (count = 0; count < status; count++) {
140         unsigned char n;
141         unsigned char e = epoll_events[count].events;
142         fd = epoll_events[count].data.fd;
143 
144         if (!fdtab[fd].owner)
145             continue;
146 
147         /* it looks complicated but gcc can optimize it away when constants
148          * have same values... In fact it depends on gcc :-(
149          */
150         fdtab[fd].ev &= FD_POLL_STICKY;
151         if (EPOLLIN == FD_POLL_IN && EPOLLOUT == FD_POLL_OUT &&
152             EPOLLPRI == FD_POLL_PRI && EPOLLERR == FD_POLL_ERR &&
153             EPOLLHUP == FD_POLL_HUP) {
154             n = e & (EPOLLIN|EPOLLOUT|EPOLLPRI|EPOLLERR|EPOLLHUP);
155         }
156         else {
157             n = ((e & EPOLLIN ) ? FD_POLL_IN  : 0) |
158                 ((e & EPOLLPRI) ? FD_POLL_PRI : 0) |
159                 ((e & EPOLLOUT) ? FD_POLL_OUT : 0) |
160                 ((e & EPOLLERR) ? FD_POLL_ERR : 0) |
161                 ((e & EPOLLHUP) ? FD_POLL_HUP : 0);
162         }
163 
164         if (!n)
165             continue;
166 
167         fdtab[fd].ev |= n;    
168

139 - 142 行: 从 epoll_events[] 中取出一个活动 fd 及其对应的 event。

150 行: fdtab[fd].ev 仅保留 FD_POLL_STICKY 设置,即 FD_POLL_ERR | FD_POLL_HUP, 代表仅保留 fd 原先 events 设置中的错误以及 hang up 的标记位,不管 epoll_wait 中 是否设置了该 fd 的这两个 events。

151 - 162 行: 这段代码的功能主要就是根据 epoll_wait 返回的 fd 的 events 设置情 况,正确的设置 fdtab[fd].ev。之所以代码还要加上条件判断,是因为 haproxy 自己也 用了一套标记 fd 的 events 的宏定义 FD_POLL_XXX,而 epoll_wait 返回的则是系统中 的 EPOLLXXX。因此,这里就涉及到系统标准的 events 转换到 haproxy 自定义 events 的过程。其中,151-154 行代表 haproxy 自定义的关于 fd 的 events 和系统标准的 完全一致,157-161 行代表 haproxy 自定义的和系统标准的不一致,因此需要一个一个 标记位判断,然后转换成 haproxy 自定义的。

167 行: 将转换后的 events 记录到 fdtab[fd].ev。因此,haproxy 中对于 fd events 的记录,始终是采用 haproxy 自定义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
169         if (fdtab[fd].iocb) {
170             int new_updt, old_updt;
171 
172             /* Mark the events as speculative before processing
173              * them so that if nothing can be done we don't need
174              * to poll again.
175              */
176             if (fdtab[fd].ev & FD_POLL_IN)
177                 fd_ev_set(fd, DIR_RD);
178 
179             if (fdtab[fd].ev & FD_POLL_OUT)
180                 fd_ev_set(fd, DIR_WR);
181 
182             if (fdtab[fd].spec_p) {
183                 /* This fd was already scheduled for being called as a speculative I/O */
184                 continue;
185             }
186 
187             /* Save number of updates to detect creation of new FDs. */
188             old_updt = fd_nbupdt;
189             fdtab[fd].iocb(fd);

169 行: 正常情况下, fdtab[fd] 的 iocb 方法指向 conn_fd_handler,该函数负责处 理 fd 上的 IO 事件。

176 - 180 行: 根据前面设置的 fd 的 events,通过调用 fd_ev_set() 更新 fdtab 结构 的 spec_e 成员。也就是说,在调用 fd_ev_clr() 清理对应 event 之前,就不需要再次设 置 fd 的 event。因为 haproxy 认为仍然需要处理 fd 的 IO。fdtab 的 ev 成员是从 epoll_wait 返回的 events 转换后的结果,而 spec_e 成员则是 haproxy 加入了一些对 fd IO 事件可能性判断的结果。

188 - 189 行: 保存一下当前的 fd update list 的数目,接着调用 fd 的 iocb 方法, 也就是 conn_fd_handler()。之所以要保存当前的 fd update list 数目,是因为 conn_fd_handler() 执行时,如果接受了新的连接,则会有新的 fd 生成,这时也会更新 fd_nbupdt。记录下旧值,就是为了方便知道在 conn_fd_handler 执行之后,有哪些 fd 是新生成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
...             ...
200             for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
201                 fd = fd_updt[new_updt - 1];
202                 if (!fdtab[fd].new)
203                     continue;
204 
205                 fdtab[fd].new = 0;
206                 fdtab[fd].ev &= FD_POLL_STICKY;
207 
208                 if ((fdtab[fd].spec_e & FD_EV_STATUS_R) == FD_EV_ACTIVE_R)
209                     fdtab[fd].ev |= FD_POLL_IN;
210 
211                 if ((fdtab[fd].spec_e & FD_EV_STATUS_W) == FD_EV_ACTIVE_W)
212                     fdtab[fd].ev |= FD_POLL_OUT;
213 
214                 if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
215                     fdtab[fd].iocb(fd);
216 
217                 /* we can remove this update entry if it's the last one and is
218                  * unused, otherwise we don't touch anything.
219                  */
220                 if (new_updt == fd_nbupdt && fdtab[fd].spec_e == 0) {
221                     fdtab[fd].updated = 0;
222                     fd_nbupdt--;
223                 }
224             }
225         }
226     }
227 
228     /* the caller will take care of speculative events */
229 }  

上面这段代码就是执行完毕当前活动 fd 的 iocb 之后,发现有若干个新的 fd 生成,通常 发生在接收新建连接的情况。这种情况,haproxy 认为有必要立即执行这些新的 fd 的 iocb 方法。因为通常一旦客户端新建连接的话,都会尽快发送数据的。这么做就不必等到 下次 epoll_wait 返回之后才处理新的 fd,提高了效率。

至此,haproxy epoll 的事件处理机制粗略分析完毕。这里还有一个 speculative events 的逻辑,本文分析中全都跳过了,随后再完善。

HAProxy 研究笔记 -- HTTP请求处理-2-解析

http://blog.chinaunix.net/uid-10167808-id-3819702.html

本文继续分析 1.5-dev17 中接收到 client 数据之后的处理。

haproxy-1.5-dev17 中接收 client 发送的请求数据流程见文档: HTTP请求处理-1-接收

1. haproxy 主循环的处理流程

主循环处理流程见文档 主循环简介

请求数据的解析工作在主循环 process_runnable_tasks() 中执行。

2. 执行 run queue 中的任务

HTTP请求处理-1-接收 中分析到 session 建立之后,一来会将 session 的 task 放入 runqueue,该 task 会 在下一轮遍历可以运行的 task 中出现,并得到执行。二是立即调用 conn_fd_handler 去 接收 client 发送的数据。

数据接收流程结束后(注意,这并不代表接收到了完整的 client 请求,因为也可能暂时 读取不到 client 的数据退出接收),haproxy 调度执行下一轮循环,调用 process_runnable_tasks() 处理所有在 runqueue 中的 task:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void process_runnable_tasks(int *next)
{
	...
	eb = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
	while (max_processed--) {
		...
		t = eb32_entry(eb, struct task, rq);
		eb = eb32_next(eb);
		__task_unlink_rq(t);

		t->state |= TASK_RUNNING;
		/* This is an optimisation to help the processor's branch
		 * predictor take this most common call.
		 */
		t->calls++;
		if (likely(t->process == process_session))
			t = process_session(t);
		else
			t = t->process(t);
		...
	}
}

大多数情况下,task 的 proecss 都指向 process_session() 函数。该函数就是负责解析 已接收到的数据,选择 backend server,以及 session 状态的变化等等。

3. session 的处理:process_session()

下面介绍 process_session() 函数的实现。该函数代码比较庞大,超过一千行,这里仅 介绍与 HTTP 请求处理的逻辑,采用代码块的逻辑介绍。

处理 HTTP 请求的逻辑代码集中在 label resync_request 处。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct task *process_session(struct task *t)
{
	...
 resync_request:
	/* Analyse request */
	if (((s->req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
		((s->req->flags ^ rqf_last) & CF_MASK_STATIC) ||
		s->si[0].state != rq_prod_last ||
		s->si[1].state != rq_cons_last) {
		unsigned int flags = s->req->flags;

		if (s->req->prod->state >= SI_ST_EST) {
			ana_list = ana_back = s->req->analysers;
			while (ana_list && max_loops--) {
				/* 这段代码中逐一的列举出了所有的 analysers 对应的处理函数
				 * 这里不一一列出,等待下文具体分析
				 */
				...
			}
		}
		rq_prod_last = s->si[0].state;
		rq_cons_last = s->si[1].state;
		s->req->flags &= ~CF_WAKE_ONCE;
		rqf_last = s->req->flags;

		if ((s->req->flags ^ flags) & CF_MASK_STATIC)
			goto resync_request;
	}

首先要判断 s->req->prod->state 的状态是否已经完成建连,根据之前的初始化动作, se->req->prod 指向 s->si[0],即标识与 client 端连接的相关信息。正确建连成功之 后,会更改 si 的状态的,具体代码在 session_complete() 中:

1
2
3
4
s->si[0].state     = s->si[0].prev_state = SI_ST_EST;
...
s->req->prod = &s->si[0];
s->req->cons = &s->si[1];

只有 frontend 连接建立成功,才具备处理 client 发送请求数据的基础。上一篇文章中 已经接收到了 client 发送的数据。这里就是需要根据 s->req->analysers 的值,确定 while 循环中哪些函数处理当前的数据。

补充介绍一下 s->req->analysers 的赋值。 同样是在 session_complete 中初始化的

1
2
/* activate default analysers enabled for this listener */
s->req->analysers = l->analysers;

可见,其直接使用 session 所在的 listener 的 analyser。 listener 中该数值的初始化 是在 check_config_validity() 中完成的:

1
		listener->analysers |= curproxy->fe_req_ana;

而归根结蒂还是来源于 listener 所在的 proxy 上的 fe_req_ana, proxy 上的 fe_req_ana 的初始化同样是在 check_config_validity(),且是在给 listener->analysers 赋值之前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	if (curproxy->cap & PR_CAP_FE) {
		if (!curproxy->accept)
			curproxy->accept = frontend_accept;

		if (curproxy->tcp_req.inspect_delay ||
			!LIST_ISEMPTY(&curproxy->tcp_req.inspect_rules))
			curproxy->fe_req_ana |= AN_REQ_INSPECT_FE;

		if (curproxy->mode == PR_MODE_HTTP) {
			curproxy->fe_req_ana |= AN_REQ_WAIT_HTTP | AN_REQ_HTTP_PROCESS_FE;
			curproxy->fe_rsp_ana |= AN_RES_WAIT_HTTP | AN_RES_HTTP_PROCESS_FE;
		}

		/* both TCP and HTTP must check switching rules */
		curproxy->fe_req_ana |= AN_REQ_SWITCHING_RULES;
	}

从上面代码可以看出,一个 HTTP 模式的 proxy,至少有三个标记位会被置位: AN_REQ_WAIT_HTTP, AN_REQ_HTTP_PROCESS_FE, AN_REQ_SWITCHING_RULES。也就是说, s->req->analysers 由以上三个标记置位。那么随后处理 HTTP REQ 的循环中,就要经过 这三个标记位对应的 analyser 的处理。

接着回到 resync_request 标签下的那个 while 循环,就是逐个判断 analysers 的设置, 并调用对应的函数处理。需要启用那些 analysers,是和 haproxy 的配置相对应的。本文 使用最简单的配置,下面仅列出配置所用到的几个处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
		while (ana_list && max_loops--) {
			/* Warning! ensure that analysers are always placed in ascending order! */

			if (ana_list & AN_REQ_INSPECT_FE) {
				if (!tcp_inspect_request(s, s->req, AN_REQ_INSPECT_FE))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_INSPECT_FE);
			}
		
			if (ana_list & AN_REQ_WAIT_HTTP) {
				if (!http_wait_for_request(s, s->req, AN_REQ_WAIT_HTTP))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_WAIT_HTTP);
			}

			if (ana_list & AN_REQ_HTTP_PROCESS_FE) {
				if (!http_process_req_common(s, s->req, AN_REQ_HTTP_PROCESS_FE, s->fe))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_HTTP_PROCESS_FE);
			}

			if (ana_list & AN_REQ_SWITCHING_RULES) {
				if (!process_switching_rules(s, s->req, AN_REQ_SWITCHING_RULES))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_SWITCHING_RULES);
			}
			...
		}

analysers 的处理也是有顺序的。其中处理请求的第一个函数是 tcp_inspect_request()。 该函数主要是在于如果配置了这里先介绍 http_wait_for_request() 函数的实现。 顾名思义,该函数主要是配置中启用 inspect_rules 时,会调用到该函数。否则的话, 处理 HTTP Req 的第一个函数就是 http_wait_for_request().

顾名思义,http_wait_for_request() 该函数分析所解析的 HTTP Requset 不一定是一个 完整的请求。上篇文章分析读取 client 请求数据的实现中,已经提到,只要不能从 socket 读到更多的数据,就会结束数据的接收。一个请求完全完全有可能因为一些异常原因,或者 请求长度本身就比较大而被拆分到不同的 IP 报文中,一次 read 系统调用可能只读取到其 中的一部分内容。因此,该函数会同时分析已经接收到的数据,并确认是否已经接收到了 完整的 HTTP 请求。只有接收到了完整的 HTTP 请求,该函数处理完,才会交给下一个 analyser 处理,否则只能结束请求的处理,等待接收跟多的数据,解析出一个完成的 HTTP 请求才行。

4. 解析接收到的 http 请求数据: http_wait_for_request()

以下是 http_wait_for_request() 的简要分析:

1.调用 http_msg_analyzer,解析 s->req->buf 中新读取到的数据。该函数会按照 HTTP 协议, 解析 HTTP request 和 response 的头部数据,并记录到数据结构 struct http_msg 中。

2.如果开启了 debug,并且已经完整的解析了 header,则 header 内容打印出来

3.尚未读取到完整的 request 的处理,分作以下几种情形处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (unlikely(msg->msg_state < HTTP_MSG_BODY)) {
	/*
	 * First, let's catch bad requests.
	 */

解析到 header 内容中有不符合 HTTP 协议的情形 HTTP_MSG_ERROR,应答 400 bad request 处理
req->buf 满了,甚至加入 maxrewrite 的空间仍然不够用,应答 400 bad request
读取错误 CF_READ_ERROR 发生,比如 client 发送 RST 断开连接, 应答 400 bad request
读取超时,client 超时未发送完整的请求,应答 408 Request Timeout
client 主动关闭,发送 FIN 包,实际上是所谓的 half-close,同样应答 400 bad request
如果以上情况都不满足,则意味着还可以继续尝试读取新数据,设置一下超时

	/* just set the request timeout once at the beginning of the request */
	if (!tick_isset(req->analyse_exp)) {
		if ((msg->msg_state == HTTP_MSG_RQBEFORE) &&
			(txn->flags & TX_WAIT_NEXT_RQ) &&
			tick_isset(s->be->timeout.httpka))
			req->analyse_exp = tick_add(now_ms, s->be->timeout.httpka);
		else
			req->analyse_exp = tick_add_ifset(now_ms, s->be->timeout.httpreq);
	}

根据以上代码,在等待 http request 期间,有两种 timeout 可以设置: 当是http 连接 Keep-Alive 时,并且处理完了头一个请求之后,等待第二个请求期间,设置 httpka 的超 时,超过设定时间不发送新的请求,将会超时;否则,将设置 http 的 request timeout。

因此,在不启用 http ka timeout 时,http request 同时承担起 http ka timeout 的 功能。在有 http ka timeout 时,这两者各自作用的时间段没有重叠。

满足该环节的请求都终止处理,不再继续了。

4.2. 处理完整的 http request

这里处理的都是已经解析到完整 http request header 的情况,并且所有 header 都被 索引化了,便于快速查找。根据已经得到的 header 的信息,设置 session 和 txn 的 相关成员,相当于汇总一下 header 的摘要信息,便于随后处理之用。流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
更新 session 和 proxy 的统计计数
删除 http ka timeout 的超时处理。可能在上一个请求处理完之后,设置了 http ka 的 timeout,因为这里已经得到完整的请求,因此需要停止该 timeout 的处理逻辑
确认 METHOD,并设置 session 的标记位 s->flags |= SN_REDIRECTABLE,只有 GET 和 HEAD 请求可以被重定向
检测 URI 是否是配置的要做 monitor 的 URI,是的话,则执行对应 ACL,并设置应答
检测如果开启 log 功能的话,要给 txn->uri 分配内存,用于记录 URI
检测 HTTP version
	将 0.9 版本的升级为 1.0
	1.1 及其以上的版本都当做 1.1 处理
初始化用于标识 Connection header 的标记位
如果启用了 capture header 配置,调用 capture_headers() 记录下对应的 header
处理 Transfer-Encoding/Content-Length 等 header
最后一步,清理 req->analysers 的标记位 AN_REQ_WAIT_HTTP,因为本函数已经成功处理完毕,可以进行下一个 analyser 的处理了。

至此,http_wait_for_request() 的处理已经结束。

5. 其他对 HTTP 请求的处理逻辑

按照我们前面分析的,随后应该还有两个 analyser 要处理,简单介绍一下:

1
2
3
4
5
6
AN_REQ_HTTP_PROCESS_FE 对应的 http_process_req_common()
	对 frontend 中 req 配置的常见处理,比如 block ACLs, filter, reqadd 等
	设置 Connection mode, 主要是 haproxy 到 server 采用什么连接方式,tunnel 或者 按照 transcation 处理的短连接
AN_REQ_SWITCHING_RULES 对应的 process_switching_rules()
	如果配置了选择 backend 的 rules,比如用 use_backend,则查询规则为 session 分配一个 backend
	处理 persist_rules,一旦设置了 force-persist, 则不管 server 是否 down,都要保证 session 分配给 persistence 中记录的 server。

以上两个函数,不再具体分析。待以后需要时再完善。

至此,client 端 http 请求已经完成解析和相关设置,并且给 session 指定了将来选择 server 所属的 backend。

下一篇文章就分析选择 server 的流程。