kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

信号量内核源码

https://blog.csdn.net/u012603457/article/details/52971894

之前的一片博客介绍了用于Linux内核同步的自旋锁,即使用自旋锁来保护共享资源,今天介绍另外一种Linux内核同步机制——信号量。信号量在内核中的使用非常广泛,用于对各种共享资源的保护。信号量与自旋锁的实现机制是不一样的,用处也是不一样的。首先,自旋锁和信号量都使用了计数器来表示允许同时访问共享资源的最大进程数,但自旋锁的共享计数值是1,也就是说任意时刻只有一个进程在共享代码区运行;信号量却允许使用大于1的共享计数,即共享资源允许被多个不同的进程同时访问,当然,信号量的计数器也能设为1,这时信号量也称为互斥量。其次,自旋锁用于保护短时间能够完成操作的共享资源,使用期间不允许进程睡眠和进程切换;信号量常用于暂时无法获取的共享资源,如果获取失败则进程进入不可中断的睡眠状态,只能由释放资源的进程来唤醒。最后,自旋锁可以用于中断服务程序之中;信号量不能在中断服务程序中使用,因为中断服务程序是不允许进程睡眠的。关于信号量的基本知识已经讲解完毕,接下来看看信号量在内核里面的实现,本文讲解的内核版本是linux-2.6.24。

1 数据结构

1
2
3
4
5
struct semaphore {
	atomic_t count;
	int sleepers;
	wait_queue_head_t wait;
};

信号量使用的数据结构是struct semaphore,包含三个数据成员:count是共享计数值、sleepers是等待当前信号量进入睡眠的进程个数、wait是当前信号量的等待队列。

2 信号量使用

使用信号量之前要进行初始化,其实只是简单的设置共享计数和等待队列,睡眠进程数一开始是0。本文重点讲解信号量的使用和实现。信号量操作的API:

1
2
static inline void down(struct semaphore * sem)//获取信号量,获取失败则进入睡眠状态
static inline void up(struct semaphore * sem)//释放信号量,并唤醒等待队列中的第一个进程

信号量的使用方式如下:

1
2
3
down(sem);
...临界区...
up(sem);

内核保证正在访问临界区的进程数小于或等于初始化的共享计数值,获取信号量失败的进程将进入不可中断的睡眠状态,在信号量的等待队列中进行等待。当进程释放信号量的时候就会唤醒等待队列中的第一个进程。

3 信号量的实现

3.1 down(sem)

首先看函数的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline void down(struct semaphore * sem)
{
	might_sleep();
	__asm__ __volatile__(
		"# atomic down operation\n\t"
		LOCK_PREFIX "decl %0\n\t"  /* --sem->count */
		"jns 2f\n"
		"\tlea %0,%%eax\n\t"
		"call __down_failed\n"
		"2:"
		:"+m" (sem->count)
		:
		:"memory","ax");
}

这里面包含了一些汇编代码,%0代表sem->count。也就是说先将sem->count减1,LOCK_PREFIX表示执行这条指令时将总线锁住,保证减1操作是原子的。减1之后如果大于或等于0就转到标号2处执行,也就跳过了down_failed函数直接到函数尾部并返回,成功获取信号量;否则减1之后sem->count小于0则顺序执行后面的down_failed函数。接下来看__down_failed函数的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ENTRY(__down_failed)
	CFI_STARTPROC
	FRAME
	pushl %edx
	CFI_ADJUST_CFA_OFFSET 4
	CFI_REL_OFFSET edx,0
	pushl %ecx
	CFI_ADJUST_CFA_OFFSET 4
	CFI_REL_OFFSET ecx,0
	call __down
	popl %ecx
	CFI_ADJUST_CFA_OFFSET -4
	CFI_RESTORE ecx
	popl %edx
	CFI_ADJUST_CFA_OFFSET -4
	CFI_RESTORE edx
	ENDFRAME
	ret
	CFI_ENDPROC
	END(__down_failed)

pushl和popl是用于保存和恢复寄存器的,CFI前缀的指令用于指令对齐调整。重点在函数__down,下面来看该函数的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
fastcall void __sched __down(struct semaphore * sem)
{
	struct task_struct *tsk = current;
	DECLARE_WAITQUEUE(wait, tsk);
	unsigned long flags;

	tsk->state = TASK_UNINTERRUPTIBLE;
	spin_lock_irqsave(&sem->wait.lock, flags);
	add_wait_queue_exclusive_locked(&sem->wait, &wait);

	sem->sleepers++;
	for (;;) {
		int sleepers = sem->sleepers;

		/*
		 * Add "everybody else" into it. They aren't
		 * playing, because we own the spinlock in
		 * the wait_queue_head.
		 */
		if (!atomic_add_negative(sleepers - 1, &sem->count)) {
			sem->sleepers = 0;
			break;
		}
		sem->sleepers = 1;  /* us - see -1 above */
		spin_unlock_irqrestore(&sem->wait.lock, flags);

		schedule();

		spin_lock_irqsave(&sem->wait.lock, flags);
		tsk->state = TASK_UNINTERRUPTIBLE;
	}
	remove_wait_queue_locked(&sem->wait, &wait);
	wake_up_locked(&sem->wait);
	spin_unlock_irqrestore(&sem->wait.lock, flags);
	tsk->state = TASK_RUNNING;
}

fastcall表示一种快速调用方式,函数的前两个参数由寄存器ecx和edx来传递,其余参数仍使用堆栈传递。首先将进程设为不可中断睡眠状态,即不能通过信号来唤醒,只能是内核亲自唤醒。同时将进程的TASK_EXCLUSIVE标志设为1,则wake_up()只会唤醒等待队列中的第一个进程。然后将睡眠等待数加1,之后进入for循环。函数atomic_add_negative(sleepers - 1, &sem->count)将相当于sem->count += sleepers-1,然后返回sem->count,通过该函数进行信号量获取情况测试,返回结果为0则获取资源,小于0则没有获取。这段代码使用sleepers和sem->count共同表示当前资源的使用情况。进入for循环后有两种情况,一种是atomic_add_negative执行结果为0,即获取了信号量,此时将sleepers设为0并退出循环,同时唤醒等待队列的第一个进程进行信号量获取测试;另一种是没有获取信号量,将sleepers设为1并运行schedule()进入睡眠,被唤醒之后继续执行for循环进行信号量获取测试。

注意,运行完执行一遍for指令后sleepers的值有两种结果,一种是0,一种是1。如果0则表示有一个进程通过了信号量获取的测试,则atomic_add_negative(sleepers - 1, &sem->count)实际上是将sem->count执行了减1操作,这个操作会在下一个进程进行信号量获取测试的时候执行。如果是1则表示进程没有通过信号领获取的测试,则atomic_add_negative(sleepers - 1, &sem->count)操作不会影响sem->count的值。也就是说,当进程进入__down时,sleepers只会有两个值,一个是0,一个是1。0表示之前的进程获取了信号量,1表示之前的进程没有获取信号量。如果之前进程获取了信号量,执行atomic_add_negative(sleepers - 1, &sem->count)时就会将sem->count的值减1;否则sem->count的值将保持不变。但是这个减1操作延迟到了下一个进程的执行期间,考虑到获取信号量之后进程会唤醒等待队列里的第一个进程,这个减1操作应该会很快就得到执行。

细心地小伙伴可能会注意到,首次获取信号量失败的进程不是会执行sem->sleepers++操作吗,这样不就改变了sem->count的值了吗?仔细回想获取信号量的过程,获取失败的时候会执行sem->count–操作的,因此刚好和sem->sleeper++相互呼应,结果就是不会改变sem->count的结果。即只有进程获取信号量后才会对sem->count进行减1操作,这个操作并不是马上执行,而是后续进程进行信号量获取检测的时候进行的

3.2 up(sem)

先看函数定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void up(struct semaphore * sem)
{
	__asm__ __volatile__(
		"# atomic up operation\n\t"
		LOCK_PREFIX "incl %0\n\t"  /* ++sem->count */
		"jg 1f\n\t"
		"lea %0,%%eax\n\t"
		"call __up_wakeup\n"
		"1:"
		:"+m" (sem->count)
		:
		:"memory","ax");
}

首先将sem->count加1,是原子操作,如果加1后sem->count大于0则说明没有进程在等待信号量资源,无须唤醒队列中进程,直接跳转到标号1处返回;否则运行__up_wakeup唤醒等待队列中的进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ENTRY(__up_wakeup)
	CFI_STARTPROC
	FRAME
	pushl %edx
	CFI_ADJUST_CFA_OFFSET 4
	CFI_REL_OFFSET edx,0
	pushl %ecx
	CFI_ADJUST_CFA_OFFSET 4
	CFI_REL_OFFSET ecx,0
	call __up
	popl %ecx
	CFI_ADJUST_CFA_OFFSET -4
	CFI_RESTORE ecx
	popl %edx
	CFI_ADJUST_CFA_OFFSET -4
	CFI_RESTORE edx
	ENDFRAME
	ret
	CFI_ENDPROC
	END(__up_wakeup)

同样,我们只关注函数__up的定义:

1
2
3
4
fastcall void __up(struct semaphore *sem)
{
	wake_up(&sem->wait);
}

可以看到,__up的的工作就是唤醒等待队列中的所有进程,但是由于sem等待队列中的进程 的TASK_EXCLUSIVE标志为 1,因此不会唤醒后续进程了。也就是说up(sem)操作实际上是将sem->count自增1,然后唤醒等待队列中的第一个进程(如果有的话)。 4 小结 信号量作为一种基础的内核同步机制,使用非常广泛。本文基于linux-2.6.24内核版本介绍了信号量使用的数据结构和实现机制,同时介绍了信号量与自旋锁的区别。

进程通信--命令行ipcs,ipcrm

https://www.jb51.net/article/40805.htm

ipcs用法

1
2
3
4
ipcs -a  是默认的输出信息 打印出当前系统中所有的进程间通信方式的信息
ipcs -m  打印出使用共享内存进行进程间通信的信息
ipcs -q  打印出使用消息队列进行进程间通信的信息
ipcs -s  打印出使用信号进行进程间通信的信息

输出格式的控制

1
2
3
4
5
ipcs -t  输出信息的详细变化时间
ipcs -p  输出ipc方式的进程ID
ipcs -c  输出ipc方式的创建者/拥有者
ipcs -c  输出ipc各种方式的在该系统下的限制条件信息
ipcs -u  输出当前系统下ipc各种方式的状态信息(共享内存,消息队列,信号)

ipcrm 命令

移除一个消息对象。或者共享内存段,或者一个信号集,同时会将与ipc对象相关链的数据也一起移除。当然,只有超级管理员,或者ipc对象的创建者才有这项权利啦

ipcrm用法

1
2
3
4
5
6
ipcrm -M shmkey   移除用shmkey创建的共享内存段
ipcrm -m shmid    移除用shmid标识的共享内存段
ipcrm -Q msgkey   移除用msqkey创建的消息队列
ipcrm -q msqid    移除用msqid标识的消息队列
ipcrm -S semkey   移除用semkey创建的信号
ipcrm -s semid    移除用semid标识的信号

进程通信--信号量

https://blog.csdn.net/guoping16/article/details/6584043/

https://www.cnblogs.com/fangshenghui/p/4039946.html

一 为什么要使用信号量

为了防止出现因多个程序同时访问一个共享资源而引发的一系列问题,我们需要一种方法,它可以通过生成并使用令牌来授权,在任一时刻只能有一个执行线程访问 代码的临界区域。临界区域是指执行数据更新的代码需要独占式地执行。而信号量就可以提供这样的一种访问机制,让一个临界区同一时间只有一个线程在访问它, 也就是说信号量是用来调协进程对共享资源的访问的。其中共享内存的使用就要用到信号量。

二 信号量的工作原理

由于信号量只能进行两种操作等待和发送信号,即P(sv)和V(sv),他们的行为是这样的:

P(sv):如果sv的值大于零,就给它减1;如果它的值为零,就挂起该进程的执行

V(sv):如果有其他进程因等待sv而被挂起,就让它恢复运行,如果没有进程因等待sv而挂起,就给它加1.

三 信号量的使用

1、创建信号量

semget函数创建一个信号量集或访问一个已存在的信号量集。

1
2
#include <sys/sem.h>
int semget (key_t key, int nsem, int oflag);

返回值是一个称为信号量标识符的整数,semop和semctl函数将使用它。

参数nsem指定集合中的信号量数。(若用于访问一个已存在的集合,那就可以把该参数指定为0)

参数oflag可以是SEM_R(read)和SEM_A(alter)常值的组合。(打开时用到),也可以是IPC_CREAT或IPC_EXCL;

2、打开信号量

使用semget打开一个信号量集后,对其中一个或多个信号量的操作就使用semop(op–operate)函数来执行。

1
2
#include <sys/sem.h>
int semop (int semid, struct sembuf * opsptr, size_t nops);

参数opsptr是一个指针,它指向一个信号量操作数组,信号量操作由sembuf结构表示:

1
2
3
4
5
6
7
struct sembuf {
	short sem_num;    // 除非使用一组信号量,否则它为0
	short sem_op; // 信号量在一次操作中需要改变的数据,通常是两个数,
			// 一个是-1,即P(等待)操作,一个是+1,即V(发送信号)操作
	short sem_flg;    // 通常为SEM_UNDO,使操作系统跟踪信号,并在进程没有释放该信号量而终止时,
			// 操作系统释放信号量
};

参数nops规定opsptr数组中元素个数。

sem_op值: (1)若sem_op为正,这对应于进程释放占用的资源数。sem_op值加到信号量的值上。(V操作)
(2)若sem_op为负,这表示要获取该信号量控制的资源数。信号量值减去sem_op的绝对值。(P操作)
(3)若sem_op为0,这表示调用进程希望等待到该信号量值变成0

如果信号量值小于sem_op的绝对值(资源不能满足要求),则:
(1)若指定了IPC_NOWAIT,则semop()出错返回EAGAIN。
(2)若未指定IPC_NOWAIT,则信号量的semncnt值加1(因为调用进程将进 入休眠状态),然后调用进程被挂起直至:①此信号量变成大于或等于sem_op的绝对值;②从系统中删除了此信号量,返回EIDRM;③进程捕捉到一个信 号,并从信号处理程序返回,返回EINTR。(与消息队列的阻塞处理方式 很相似)

3、信号量是操作

semctl函数对一个信号量执行各种控制操作。

1
2
#include <sys/sem.h>
int semctl (int semid, int semnum, int cmd, union semun arg);
参数semid 信号量集标识符
参数semnum 信号量集数组上的下标,表示某一个信号量
参数cmd指定以下10种命令中的一种,在semid指定的信号量集合上执行此命令。
1
2
3
4
5
6
7
8
9
10
IPC_STAT    读取一个信号量集的数据结构semid_ds,并将其存储在semun中的buf参数中。
IPC_SET     设置信号量集的数据结构semid_ds中的元素ipc_perm,其值取自semun中的buf参数。
IPC_RMID    将信号量集从内存中删除。
GETALL      用于读取信号量集中的所有信号量的值。
GETNCNT     返回正在等待资源的进程数目。
GETPID      返回最后一个执行semop操作的进程的PID。
GETVAL      返回信号量集中的一个单个的信号量的值。
GETZCNT     返回这在等待完全空闲的资源的进程数目。
SETALL      设置信号量集中的所有的信号量的值。
SETVAL      设置信号量集中的一个单独的信号量的值。
参数 arg
1
2
3
4
5
6
union semun {
	short val;             /* SETVAL用的值 */
	struct semid_ds* buf;  /* IPC_STAT、IPC_SET用的semid_ds结构 */
	unsigned short* array; /* SETALL、GETALL用的数组值 */
	struct seminfo *buf;   /* 为控制IPC_INFO提供的缓存 */
} arg;

四 信号量值的初始化

semget并不初始化各个信号量的值,这个初始化必须通过以SETVAL命令(设置集合中的一个值)或SETALL命令(设置集合中的所有值) 调用semctl来完成。

SystemV信号量的设计中,创建一个信号量集并将它初始化需两次函数调用是一个致命的缺陷。一个不完备的解决方案是:在调用semget时指定IPC_CREAT | IPC_EXCL标志,这样只有一个进程(首先调用semget的那个进程)创建所需信号量,该进程随后初始化该信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <assert.h>
#include <time.h>
#include <unistd.h>
#include <sys/wait.h>

#define MAX_SEMAPHORE 10
#define SEMKEY    1

union semun {
	int val;
	struct semid_ds *buf;
	unsigned short *array;
	struct seminfo *_buf;
} arg;

int main()
{
	key_t key;
	int semid ,ret, i;
	unsigned short buf[MAX_SEMAPHORE];
	struct sembuf sb[MAX_SEMAPHORE];
	pid_t pid;

	semid = semget(SEMKEY, MAX_SEMAPHORE, IPC_CREAT|0666);
	if (semid == -1) {
		fprintf(stderr, "Error in semget: %s\n", strerror(errno));
		return -1;
	}
	printf("Semaphore get, ID is: %d\n",semid);

	for (i = 0; i < MAX_SEMAPHORE; i++)
		buf[i] = 0;

	arg.array = buf;
	ret = semctl(semid, 0, SETALL, arg);
	if (ret == -1) {
		fprintf(stderr, "Error in semctl: %s\n", strerror(errno));
		return -2;
	}
	printf("Semaphore Init!\n");

	pid = fork();
	if (pid < 0) {
		fprintf(stderr, "Create Process Error!: %s\n",strerror(errno));
		return -3;
	}

	if (pid == 0) {
		sleep(5);
		// 子进程产生信号
		printf("  child wake up.\n");
		for (i = 0; i < MAX_SEMAPHORE; i ++) {
			sb[i].sem_num = i;
			sb[i].sem_op = +1;
			sb[i].sem_flg = 0;
		}

		printf("  child start to inc resource.\n");
		ret = semop(semid, sb, 10);
		if (ret == -1) {
			fprintf(stderr, "子进程产生信号失败: %s\n", strerror(errno));
			exit(-1);
		}
		printf("  child exiting successfully.\n");
		exit(0);
	}

	printf("parent wake up....\n");
	// 此时父进程的阻塞,因为初始化为0
	for (i = 0; i < MAX_SEMAPHORE; i++) {
		sb[i].sem_num = i;
		sb[i].sem_op = -1;
		sb[i].sem_flg = 0;
	}
	printf("parent is asking for resource...\n");
	ret = semop(semid, sb, 10); //p()
	if (ret == 0) {
		printf("parent got the resource!\n");
	}

	// 父进程等待子进程退出
	waitpid(pid, NULL, 0);

	ret = semctl(semid, 0, IPC_RMID);
	if (ret == -1) {
		fprintf(stderr, "semaphore删除失败: %s\n", strerror(errno));
		return -4;
	}

	printf("parent exiting.\n");
	return 0;
}

进程通信--共享内存

https://blog.csdn.net/xy913741894/article/details/72571260

https://www.cnblogs.com/fangshenghui/p/4039720.html

1. 共享内存,存在于每个进程的进程地址空间中,通过页表+MMU机制映射为同一块物理内存,因此,它属于每个进程,由于它并不需要系统调用干预和数据复制,它的效率是非常高的,它比我们所学的几种IPC机制(信号量,管道,消息队列)都要快。

2. 既然它性能最好,为什么还需要有其他的IPC机制?直接用它不就好了吗?它虽然很快,但是它不提供同步互斥机制,这样子一来就需要我们程序员来提供,带来了编程的难度。

Linux下相关函数

1、shmget函数

该函数用来创建共享内存,它的原型为:

1
int shmget(key_ t key, size_t size, int shmflg);

第一个参数,与信号量的semget函数一样,程序需要提供一个参数key(非0整数),它有效地为共享内存段命名,shmget函数成功时返回一个与key相关的共享内存标识符(非负整数),用于后续的共享内存函数。调用失败返回-1.

第二个参数,size以字节为单位指定需要共享的内存容量

第三个参数,shmflg是权限标志,它的作用与open函数的mode参数一样,如果要想在key标识的共享内存不存在时,创建它的话,可以与IPC_CREAT做或操作。共享内存的权限标志与文件的读写权限一样,举例来说,0644,它表示允许一个进程创建的共享内存被内存创建者所拥有的进程向共享内存读取和写入数据,同时其他用户创建的进程只能读取共享内存。

2、shmat函数

第一次创建完共享内存时,它还不能被任何进程访问,shmat函数的作用就是用来启动对该共享内存的访问,并把共享内存连接到当前进程的地址空间。它的原型如下:

1
void *shmat(int shm_id, const void *shm_addr, int shmflg);

第一个参数,shm_id是由shmget函数返回的共享内存标识。

第二个参数,shm_addr指定共享内存连接到当前进程中的地址位置,通常为空,表示让系统来选择共享内存的地址。

第三个参数,shm_flg是一组标志位,通常为0。

3、shmdt函数

该函数用于将共享内存从当前进程中分离。注意,将共享内存分离并不是删除它,只是使该共享内存对当前进程不再可用。它的原型如下:

1
int shmdt(const void *shmaddr);

参数shmaddr是shmat函数返回的地址指针,调用成功时返回0,失败时返回-1.

4、shmctl函数

与信号量的semctl函数一样,用来控制共享内存,它的原型如下:

1
int shmctl(int shm_id, int command, struct shmid_ds *buf);

第一个参数,shm_id是shmget函数返回的共享内存标识符。

第二个参数,command是要采取的操作,它可以取下面的三个值 :

IPC_STAT:把shmid_ds结构中的数据设置为共享内存的当前关联值,即用共享内存的当前关联值覆盖shmid_ds的值。
IPC_SET: 如果进程有足够的权限,就把共享内存的当前关联值设置为shmid_ds结构中给出的值
IPC_RMID:删除共享内存段

第三个参数,buf是一个结构指针,它指向共享内存模式和访问权限的结构。

简单互斥机制的样例

确定两个线程的可以用,更多线程需要信号量或互斥锁

shmread.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<sys/shm.h>

#define TEXT_SZ 2048
struct shared_use_st {
	int written;/* 作为一个标志,非0:表示可读,0表示可写 */
	char text[TEXT_SZ];/* 记录写入和读取的文本 */
};

#define MEM_KEY (1234)

int main()
{
	int running = 1; //程序是否继续运行的标志
	void *shm = NULL; //分配的共享内存的原始首地址
	struct shared_use_st *shared;//指向shm
	int shmid; //共享内存标识符
	//创建共享内存
	shmid = shmget((key_t)MEM_KEY, sizeof(struct shared_use_st), 0666|IPC_CREAT);
	if (shmid == -1) {
		fprintf(stderr,"shmget failed\n");
		return -1;
	}
	//将共享内存连接到当前进程的地址空间
	shm = shmat(shmid, 0, 0);
	if (shm == (void*)-1) {
		fprintf(stderr,"shmat failed\n");
		return -2;
	}
	printf("\nMemory attached at shmid=%d shm=%p\n", shmid, shm);
	//设置共享内存
	shared = (struct shared_use_st*)shm;
	shared->written = 0;
	//读取共享内存中的数据
	while (running) {
		//没有进程向共享内存定数据有数据可读取
		while (shared->written == 0) {
			sleep(1); //有其他进程在写数据,不能读取数据
		}
		printf("You wrote: %s", shared->text);
		sleep(rand() % 3);
		//输入了end,退出循环(程序)
		if (strncmp(shared->text, "end", 3) == 0)
			running = 0;
		//读取完数据,设置written使共享内存段可写
		shared->written = 0;
	}
	//把共享内存从当前进程中分离
	if (shmdt(shm) == -1) {
		fprintf(stderr,"shmdt failed\n");
		return -3;
	}
	//删除共享内存
	if (shmctl(shmid, IPC_RMID, 0) == -1) {
		fprintf(stderr,"shmctl(IPC_RMID) failed\n");
		return -4;
	}
	return 0;
}

shmwrite.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<sys/shm.h>

#define TEXT_SZ 2048
struct shared_use_st {
	int written;/* 作为一个标志,非0:表示可读,0表示可写 */
	char text[TEXT_SZ];/* 记录写入和读取的文本 */
};

#define MEM_KEY (1234)

int main()
{
	int running = 1;
	void *shm = NULL;
	struct shared_use_st *shared = NULL;
	char buffer[BUFSIZ +1];//用于保存输入的文本
	int shmid;
	//创建共享内存
	shmid = shmget((key_t)MEM_KEY, sizeof(struct shared_use_st), 0666|IPC_CREAT);
	if (shmid == -1) {
		fprintf(stderr,"shmget failed\n");
		return -1;
	}
	//将共享内存连接到当前进程的地址空间
	shm = shmat(shmid, (void*)0, 0);
	if (shm == (void*)-1) {
		fprintf(stderr,"shmat failed\n");
		return -2;
	}
	printf("Memory attached at shmid=%d shm=%p\n", shmid, shm);
	//设置共享内存
	shared = (struct shared_use_st*)shm;
	//向共享内存中写数据
	while (running) {
		//数据还没有被读取,则等待数据被读取,不能向共享内存中写入文本
		while (shared->written == 1) {
			sleep(1);
			printf("Waiting...\n");
		}
		//向共享内存中写入数据
		printf("Enter some text: ");
		fgets(buffer, BUFSIZ, stdin);
		strncpy(shared->text, buffer, TEXT_SZ);
		//写完数据,设置written使共享内存段可读
		shared->written = 1;
		//输入了end,退出循环(程序)
		if (strncmp(buffer, "end", 3) == 0)
			running = 0;
	}
	//把共享内存从当前进程中分离
	if (shmdt(shm) == -1) {
		fprintf(stderr,"shmdt failed\n");
		return -3;
	}
	return 0;
}

select,poll,epoll内核实现

https://blog.csdn.net/lishenglong666/article/details/45536611

poll/select/epoll的实现都是基于文件提供的poll方法(f_op->poll), 该方法利用poll_table提供的qproc方法向文件内部事件掩码key对应的的一个或多个等待队列(wait_queue_head_t)上添加包含唤醒函数(wait_queue_t.func)的节点(wait_queue_t),并检查文件当前就绪的状态返回给poll的调用者(依赖于文件的实现)。 当文件的状态发生改变时(例如网络数据包到达),文件就会遍历事件对应的等待队列并调用回调函数(wait_queue_t.func)唤醒等待线程。

通常的file.f_ops.poll实现及相关结构体如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
struct file {  
	const struct file_operations    *f_op;  
	spinlock_t          f_lock;  
	// 文件内部实现细节  
	void               *private_data;  
#ifdef CONFIG_EPOLL  
	/* Used by fs/eventpoll.c to link all the hooks to this file */  
	struct list_head    f_ep_links;  
	struct list_head    f_tfile_llink;  
#endif /* #ifdef CONFIG_EPOLL */  
	// 其他细节....  
};  
  
// 文件操作  
struct file_operations {  
	// 文件提供给poll/select/epoll  
	// 获取文件当前状态, 以及就绪通知接口函数  
	unsigned int (*poll) (struct file *, struct poll_table_struct *);  
	// 其他方法read/write 等... ...  
};  
  
// 通常的file.f_ops.poll 方法的实现  
unsigned int file_f_op_poll (struct file *filp, struct poll_table_struct *wait)  
{  
	unsigned int mask = 0;  
	wait_queue_head_t * wait_queue;  
  
	//1. 根据事件掩码wait->key_和文件实现filep->private_data 取得事件掩码对应的一个或多个wait queue head  
	some_code();  
  
	// 2. 调用poll_wait 向获得的wait queue head 添加节点  
	poll_wait(filp, wait_queue, wait);  
  
	// 3. 取得当前就绪状态保存到mask  
	some_code();  
  
	return mask;  
}  
  
// select/poll/epoll 向文件注册就绪后回调节点的接口结构  
typedef struct poll_table_struct {  
	// 向wait_queue_head 添加回调节点(wait_queue_t)的接口函数  
	poll_queue_proc _qproc;  
	// 关注的事件掩码, 文件的实现利用此掩码将等待队列传递给_qproc  
	unsigned long   _key;  
} poll_table;  
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);  
  
  
// 通用的poll_wait 函数, 文件的f_ops->poll 通常会调用此函数  
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)  
{  
	if (p && p->_qproc && wait_address) {  
		// 调用_qproc 在wait_address 上添加节点和回调函数  
		// 调用 poll_table_struct 上的函数指针向wait_address添加节点, 并设置节点的func  
		// (如果是select或poll 则是 __pollwait, 如果是 epoll 则是 ep_ptable_queue_proc),  
		p->_qproc(filp, wait_address, p);  
	}  
}  
  
  
// wait_queue 头节点  
typedef struct __wait_queue_head wait_queue_head_t;  
struct __wait_queue_head {  
	spinlock_t lock;  
	struct list_head task_list;  
};  
  
// wait_queue 节点  
typedef struct __wait_queue wait_queue_t;  
struct __wait_queue {  
	unsigned int flags;  
#define WQ_FLAG_EXCLUSIVE   0x01  
	void *private;  
	wait_queue_func_t func;  
	struct list_head task_list;  
};  
typedef int (*wait_queue_func_t)(wait_queue_t *wait, unsigned mode, int flags, void *key);  
  
  
// 当文件的状态发生改变时, 文件会调用此函数,此函数通过调用wait_queue_t.func通知poll的调用者  
// 其中key是文件当前的事件掩码  
void __wake_up(wait_queue_head_t *q, unsigned int mode,  
			   int nr_exclusive, void *key)  
{  
	unsigned long flags;  
  
	spin_lock_irqsave(&q->lock, flags);  
	__wake_up_common(q, mode, nr_exclusive, 0, key);  
	spin_unlock_irqrestore(&q->lock, flags);  
}  
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,  
							 int nr_exclusive, int wake_flags, void *key)  
{  
	wait_queue_t *curr, *next;  
	// 遍历并调用func 唤醒, 通常func会唤醒调用poll的线程  
	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {  
		unsigned flags = curr->flags;  
  
		if (curr->func(curr, mode, wake_flags, key) &&  
				(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) {  
			break;  
		}  
	}  
}  

poll 和 select

poll和select的实现基本上是一致的,只是传递参数有所不同,他们的基本流程如下:

  1. 复制用户数据到内核空间

  2. 估计超时时间

  3. 遍历每个文件并调用f_op->poll 取得文件当前就绪状态, 如果前面遍历的文件都没有就绪,向文件插入wait_queue节点

  4. 遍历完成后检查状态:

a). 如果已经有就绪的文件转到5;

b). 如果有信号产生,重启poll或select(转到 1或3);

c). 否则挂起进程等待超时或唤醒,超时或被唤醒后再次遍历所有文件取得每个文件的就绪状态

  1. 将所有文件的就绪状态复制到用户空间

  2. 清理申请的资源

关键结构体

下面是poll/select共用的结构体及其相关功能:

poll_wqueues 是 select/poll 对poll_table接口的具体化实现,其中的table, inline_index和inline_entries都是为了管理内存。 poll_table_entry 与一个文件相关联,用于管理插入到文件的wait_queue节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// select/poll 对poll_table的具体化实现  
struct poll_wqueues {  
	poll_table pt;  
	struct poll_table_page *table;     // 如果inline_entries 空间不足, 从poll_table_page 中分配  
	struct task_struct *polling_task;  // 调用poll 或select 的进程  
	int triggered;                     // 已触发标记  
	int error;  
	int inline_index;                  // 下一个要分配的inline_entrie 索引  
	struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];//  
};  
// 帮助管理select/poll  申请的内存  
struct poll_table_page {  
	struct poll_table_page  * next;       // 下一个 page  
	struct poll_table_entry * entry;      // 指向第一个entries  
	struct poll_table_entry entries[0];  
};  
// 与一个正在poll /select 的文件相关联,  
struct poll_table_entry {  
	struct file *filp;               // 在poll/select中的文件  
	unsigned long key;  
	wait_queue_t wait;               // 插入到wait_queue_head_t 的节点  
	wait_queue_head_t *wait_address; // 文件上的wait_queue_head_t 地址  
};  

公共函数

下面是poll/select公用的一些函数,这些函数实现了poll和select的核心功能。

poll_initwait 用于初始化poll_wqueues,

__pollwait 实现了向文件中添加回调节点的逻辑,

pollwake 当文件状态发生改变时,由文件调用,用来唤醒线程,

poll_get_entry,free_poll_entry,poll_freewait用来申请释放poll_table_entry 占用的内存,并负责释放文件上的wait_queue节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// poll_wqueues 的初始化:  
// 初始化 poll_wqueues , __pollwait会在文件就绪时被调用  
void poll_initwait(struct poll_wqueues *pwq)  
{  
	// 初始化poll_table, 相当于调用基类的构造函数  
	init_poll_funcptr(&pwq->pt, __pollwait);  
	/* 
	 * static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc) 
	 * { 
	 *     pt->_qproc = qproc; 
	 *     pt->_key   = ~0UL; 
	 * } 
	 */  
	pwq->polling_task = current;  
	pwq->triggered = 0;  
	pwq->error = 0;  
	pwq->table = NULL;  
	pwq->inline_index = 0;  
}  
  
  
// wait_queue设置函数  
// poll/select 向文件wait_queue中添加节点的方法  
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,  
					   poll_table *p)  
{  
	struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);  
	struct poll_table_entry *entry = poll_get_entry(pwq);  
	if (!entry) {  
		return;  
	}  
	get_file(filp); //put_file() in free_poll_entry()  
	entry->filp = filp;  
	entry->wait_address = wait_address; // 等待队列头  
	entry->key = p->key;  
	// 设置回调为 pollwake  
	init_waitqueue_func_entry(&entry->wait, pollwake);  
	entry->wait.private = pwq;  
	// 添加到等待队列  
	add_wait_queue(wait_address, &entry->wait);  
}  
  
// 在等待队列(wait_queue_t)上回调函数(func)  
// 文件就绪后被调用,唤醒调用进程,其中key是文件提供的当前状态掩码  
static int pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)  
{  
	struct poll_table_entry *entry;  
	// 取得文件对应的poll_table_entry  
	entry = container_of(wait, struct poll_table_entry, wait);  
	// 过滤不关注的事件  
	if (key && !((unsigned long)key & entry->key)) {  
		return 0;  
	}  
	// 唤醒  
	return __pollwake(wait, mode, sync, key);  
}  
static int __pollwake(wait_queue_t *wait, unsigned mode, int sync, void *key)  
{  
	struct poll_wqueues *pwq = wait->private;  
	// 将调用进程 pwq->polling_task 关联到 dummy_wait  
	DECLARE_WAITQUEUE(dummy_wait, pwq->polling_task);  
	smp_wmb();  
	pwq->triggered = 1;// 标记为已触发  
	// 唤醒调用进程  
	return default_wake_function(&dummy_wait, mode, sync, key);  
}  
  
// 默认的唤醒函数,poll/select 设置的回调函数会调用此函数唤醒  
// 直接唤醒等待队列上的线程,即将线程移到运行队列(rq)  
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,  
						  void *key)  
{  
	// 这个函数比较复杂, 这里就不具体分析了  
	return try_to_wake_up(curr->private, mode, wake_flags);  
}  

poll,select对poll_table_entry的申请和释放采用的是类似内存池的管理方式,先使用预分配的空间,预分配的空间不足时,分配一个内存页,使用内存页上的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 分配或使用已先前申请的 poll_table_entry,  
static struct poll_table_entry *poll_get_entry(struct poll_wqueues *p) {  
	struct poll_table_page *table = p->table;  
  
	if (p->inline_index < N_INLINE_POLL_ENTRIES) {  
		return p->inline_entries + p->inline_index++;  
	}  
  
	if (!table || POLL_TABLE_FULL(table)) {  
		struct poll_table_page *new_table;  
		new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);  
		if (!new_table) {  
			p->error = -ENOMEM;  
			return NULL;  
		}  
		new_table->entry = new_table->entries;  
		new_table->next = table;  
		p->table = new_table;  
		table = new_table;  
	}  
	return table->entry++;  
}  
  
// 清理poll_wqueues 占用的资源  
void poll_freewait(struct poll_wqueues *pwq)  
{  
	struct poll_table_page * p = pwq->table;  
	// 遍历所有已分配的inline poll_table_entry  
	int i;  
	for (i = 0; i < pwq->inline_index; i++) {  
		free_poll_entry(pwq->inline_entries + i);  
	}  
	// 遍历在poll_table_page上分配的inline poll_table_entry  
	// 并释放poll_table_page  
	while (p) {  
		struct poll_table_entry * entry;  
		struct poll_table_page *old;  
		entry = p->entry;  
		do {  
			entry--;  
			free_poll_entry(entry);  
		} while (entry > p->entries);  
		old = p;  
		p = p->next;  
		free_page((unsigned long) old);  
	}  
}  
static void free_poll_entry(struct poll_table_entry *entry)  
{  
	// 从等待队列中删除, 释放文件引用计数  
	remove_wait_queue(entry->wait_address, &entry->wait);  
	fput(entry->filp);  
}  

poll/select核心结构关系

下图是 poll/select 实现公共部分的关系图,包含了与文件直接的关系,以及函数之间的依赖。

poll的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
// poll 使用的结构体  
struct pollfd {  
	int fd;        // 描述符  
	short events;  // 关注的事件掩码  
	short revents; // 返回的事件掩码  
};  
// long sys_poll(struct pollfd *ufds, unsigned int nfds, long timeout_msecs)  
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,  
				long, timeout_msecs)  
{  
	struct timespec end_time, *to = NULL;  
	int ret;  
	if (timeout_msecs >= 0) {  
		to = &end_time;  
		// 将相对超时时间msec 转化为绝对时间  
		poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,  
								NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));  
	}  
	// do sys poll  
	ret = do_sys_poll(ufds, nfds, to);  
	// do_sys_poll 被信号中断, 重新调用, 对使用者来说 poll 是不会被信号中断的.  
	if (ret == -EINTR) {  
		struct restart_block *restart_block;  
		restart_block = ¤t_thread_info()->restart_block;  
		restart_block->fn = do_restart_poll; // 设置重启的函数  
		restart_block->poll.ufds = ufds;  
		restart_block->poll.nfds = nfds;  
		if (timeout_msecs >= 0) {  
			restart_block->poll.tv_sec = end_time.tv_sec;  
			restart_block->poll.tv_nsec = end_time.tv_nsec;  
			restart_block->poll.has_timeout = 1;  
		} else {  
			restart_block->poll.has_timeout = 0;  
		}  
		// ERESTART_RESTARTBLOCK 不会返回给用户进程,  
		// 而是会被系统捕获, 然后调用 do_restart_poll,  
		ret = -ERESTART_RESTARTBLOCK;  
	}  
	return ret;  
}  
int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,  
				struct timespec *end_time)  
{  
	struct poll_wqueues table;  
	int err = -EFAULT, fdcount, len, size;  
	/* 首先使用栈上的空间,节约内存,加速访问 */  
	long stack_pps[POLL_STACK_ALLOC/sizeof(long)];  
	struct poll_list *const head = (struct poll_list *)stack_pps;  
	struct poll_list *walk = head;  
	unsigned long todo = nfds;  
	if (nfds > rlimit(RLIMIT_NOFILE)) {  
		// 文件描述符数量超过当前进程限制  
		return -EINVAL;  
	}  
	// 复制用户空间数据到内核  
	len = min_t(unsigned int, nfds, N_STACK_PPS);  
	for (;;) {  
		walk->next = NULL;  
		walk->len = len;  
		if (!len) {  
			break;  
		}  
		// 复制到当前的 entries  
		if (copy_from_user(walk->entries, ufds + nfds-todo,  
						   sizeof(struct pollfd) * walk->len)) {  
			goto out_fds;  
		}  
		todo -= walk->len;  
		if (!todo) {  
			break;  
		}  
		// 栈上空间不足,在堆上申请剩余部分  
		len = min(todo, POLLFD_PER_PAGE);  
		size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;  
		walk = walk->next = kmalloc(size, GFP_KERNEL);  
		if (!walk) {  
			err = -ENOMEM;  
			goto out_fds;  
		}  
	}  
	// 初始化 poll_wqueues 结构, 设置函数指针_qproc  为__pollwait  
	poll_initwait(&table);  
	// poll  
	fdcount = do_poll(nfds, head, &table, end_time);  
	// 从文件wait queue 中移除对应的节点, 释放entry.  
	poll_freewait(&table);  
	// 复制结果到用户空间  
	for (walk = head; walk; walk = walk->next) {  
		struct pollfd *fds = walk->entries;  
		int j;  
		for (j = 0; j < len; j++, ufds++)  
			if (__put_user(fds[j].revents, &ufds->revents)) {  
				goto out_fds;  
			}  
	}  
	err = fdcount;  
out_fds:  
	// 释放申请的内存  
	walk = head->next;  
	while (walk) {  
		struct poll_list *pos = walk;  
		walk = walk->next;  
		kfree(pos);  
	}  
	return err;  
}  
// 真正的处理函数  
static int do_poll(unsigned int nfds,  struct poll_list *list,  
				   struct poll_wqueues *wait, struct timespec *end_time)  
{  
	poll_table* pt = &wait->pt;  
	ktime_t expire, *to = NULL;  
	int timed_out = 0, count = 0;  
	unsigned long slack = 0;  
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {  
		// 已经超时,直接遍历所有文件描述符, 然后返回  
		pt = NULL;  
		timed_out = 1;  
	}  
	if (end_time && !timed_out) {  
		// 估计进程等待时间,纳秒  
		slack = select_estimate_accuracy(end_time);  
	}  
	// 遍历文件,为每个文件的等待队列添加唤醒函数(pollwake)  
	for (;;) {  
		struct poll_list *walk;  
		for (walk = list; walk != NULL; walk = walk->next) {  
			struct pollfd * pfd, * pfd_end;  
			pfd = walk->entries;  
			pfd_end = pfd + walk->len;  
			for (; pfd != pfd_end; pfd++) {  
				// do_pollfd 会向文件对应的wait queue 中添加节点  
				// 和回调函数(如果 pt 不为空)  
				// 并检查当前文件状态并设置返回的掩码  
				if (do_pollfd(pfd, pt)) {  
					// 该文件已经准备好了.  
					// 不需要向后面文件的wait queue 中添加唤醒函数了.  
					count++;  
					pt = NULL;  
				}  
			}  
		}  
		// 下次循环的时候不需要向文件的wait queue 中添加节点,  
		// 因为前面的循环已经把该添加的都添加了  
		pt = NULL;  
  
		// 第一次遍历没有发现ready的文件  
		if (!count) {  
			count = wait->error;  
			// 有信号产生  
			if (signal_pending(current)) {  
				count = -EINTR;  
			}  
		}  
  
		// 有ready的文件或已经超时  
		if (count || timed_out) {  
			break;  
		}  
		// 转换为内核时间  
		if (end_time && !to) {  
			expire = timespec_to_ktime(*end_time);  
			to = &expire;  
		}  
		// 等待事件就绪, 如果有事件发生或超时,就再循  
		// 环一遍,取得事件状态掩码并计数,  
		// 注意此次循环中, 文件 wait queue 中的节点依然存在  
		if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) {  
			timed_out = 1;  
		}  
	}  
	return count;  
}  
  
  
static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait)  
{  
	unsigned int mask;  
	int fd;  
	mask = 0;  
	fd = pollfd->fd;  
	if (fd >= 0) {  
		int fput_needed;  
		struct file * file;  
		// 取得fd对应的文件结构体  
		file = fget_light(fd, &fput_needed);  
		mask = POLLNVAL;  
		if (file != NULL) {  
			// 如果没有 f_op 或 f_op->poll 则认为文件始终处于就绪状态.  
			mask = DEFAULT_POLLMASK;  
			if (file->f_op && file->f_op->poll) {  
				if (pwait) {  
					// 设置关注的事件掩码  
					pwait->key = pollfd->events | POLLERR | POLLHUP;  
				}  
				// 注册回调函数,并返回当前就绪状态,就绪后会调用pollwake  
				mask = file->f_op->poll(file, pwait);  
			}  
			mask &= pollfd->events | POLLERR | POLLHUP; // 移除不需要的状态掩码  
			fput_light(file, fput_needed);// 释放文件  
		}  
	}  
	pollfd->revents = mask; // 更新事件状态  
	return mask;  
}  
  
  
static long do_restart_poll(struct restart_block *restart_block)  
{  
	struct pollfd __user *ufds = restart_block->poll.ufds;  
	int nfds = restart_block->poll.nfds;  
	struct timespec *to = NULL, end_time;  
	int ret;  
	if (restart_block->poll.has_timeout) {  
		// 获取先前的超时时间  
		end_time.tv_sec = restart_block->poll.tv_sec;  
		end_time.tv_nsec = restart_block->poll.tv_nsec;  
		to = &end_time;  
	}  
	ret = do_sys_poll(ufds, nfds, to); // 重新调用 do_sys_poll  
	if (ret == -EINTR) {  
		// 又被信号中断了, 再次重启  
		restart_block->fn = do_restart_poll;  
		ret = -ERESTART_RESTARTBLOCK;  
	}  
	return ret;  
}  

select 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
typedef struct {  
	unsigned long *in, *out, *ex;  
	unsigned long *res_in, *res_out, *res_ex;  
} fd_set_bits;  
//  long sys_select(int n, fd_set *inp, fd_set *outp, fd_set *exp, struct timeval *tvp)  
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,  
				fd_set __user *, exp, struct timeval __user *, tvp)  
{  
	struct timespec end_time, *to = NULL;  
	struct timeval tv;  
	int ret;  
	if (tvp) {  
		if (copy_from_user(&tv, tvp, sizeof(tv))) {  
			return -EFAULT;  
		}  
		// 计算超时时间  
		to = &end_time;  
		if (poll_select_set_timeout(to,  
									tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),  
									(tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC)) {  
			return -EINVAL;  
		}  
	}  
	ret = core_sys_select(n, inp, outp, exp, to);  
	// 复制剩余时间到用户空间  
	ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);  
	return ret;  
}  
  
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,  
					fd_set __user *exp, struct timespec *end_time)  
{  
	fd_set_bits fds;  
	void *bits;  
	int ret, max_fds;  
	unsigned int size;  
	struct fdtable *fdt;  
	//小对象使用栈上的空间,节约内存, 加快访问速度  
	long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];  
  
	ret = -EINVAL;  
	if (n < 0) {  
		goto out_nofds;  
	}  
  
	rcu_read_lock();  
	// 取得进程对应的 fdtable  
	fdt = files_fdtable(current->files);  
	max_fds = fdt->max_fds;  
	rcu_read_unlock();  
	if (n > max_fds) {  
		n = max_fds;  
	}  
  
	size = FDS_BYTES(n);  
	bits = stack_fds;  
	if (size > sizeof(stack_fds) / 6) {  
		// 栈上的空间不够, 申请内存, 全部使用堆上的空间  
		ret = -ENOMEM;  
		bits = kmalloc(6 * size, GFP_KERNEL);  
		if (!bits) {  
			goto out_nofds;  
		}  
	}  
	fds.in     = bits;  
	fds.out    = bits +   size;  
	fds.ex     = bits + 2*size;  
	fds.res_in  = bits + 3*size;  
	fds.res_out = bits + 4*size;  
	fds.res_ex  = bits + 5*size;  
  
	// 复制用户空间到内核  
	if ((ret = get_fd_set(n, inp, fds.in)) ||  
			(ret = get_fd_set(n, outp, fds.out)) ||  
			(ret = get_fd_set(n, exp, fds.ex))) {  
		goto out;  
	}  
	// 初始化fd set  
	zero_fd_set(n, fds.res_in);  
	zero_fd_set(n, fds.res_out);  
	zero_fd_set(n, fds.res_ex);  
  
	ret = do_select(n, &fds, end_time);  
  
	if (ret < 0) {  
		goto out;  
	}  
	if (!ret) {  
		// 该返回值会被系统捕获, 并以同样的参数重新调用sys_select()  
		ret = -ERESTARTNOHAND;  
		if (signal_pending(current)) {  
			goto out;  
		}  
		ret = 0;  
	}  
  
	// 复制到用户空间  
	if (set_fd_set(n, inp, fds.res_in) ||  
			set_fd_set(n, outp, fds.res_out) ||  
			set_fd_set(n, exp, fds.res_ex)) {  
		ret = -EFAULT;  
	}  
  
out:  
	if (bits != stack_fds) {  
		kfree(bits);  
	}  
out_nofds:  
	return ret;  
}  
  
int do_select(int n, fd_set_bits *fds, struct timespec *end_time)  
{  
	ktime_t expire, *to = NULL;  
	struct poll_wqueues table;  
	poll_table *wait;  
	int retval, i, timed_out = 0;  
	unsigned long slack = 0;  
  
	rcu_read_lock();  
	// 检查fds中fd的有效性, 并获取当前最大的fd  
	retval = max_select_fd(n, fds);  
	rcu_read_unlock();  
  
	if (retval < 0) {  
		return retval;  
	}  
	n = retval;  
  
	// 初始化 poll_wqueues 结构, 设置函数指针_qproc    为__pollwait  
	poll_initwait(&table);  
	wait = &table.pt;  
	if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {  
		wait = NULL;  
		timed_out = 1;  
	}  
  
	if (end_time && !timed_out) {  
		// 估计需要等待的时间.  
		slack = select_estimate_accuracy(end_time);  
	}  
  
	retval = 0;  
	for (;;) {  
		unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;  
  
		inp = fds->in;  
		outp = fds->out;  
		exp = fds->ex;  
		rinp = fds->res_in;  
		routp = fds->res_out;  
		rexp = fds->res_ex;  
		// 遍历所有的描述符, i 文件描述符  
		for (i = 0; i < n; ++rinp, ++routp, ++rexp) {  
			unsigned long in, out, ex, all_bits, bit = 1, mask, j;  
			unsigned long res_in = 0, res_out = 0, res_ex = 0;  
			const struct file_operations *f_op = NULL;  
			struct file *file = NULL;  
			// 检查当前的 slot 中的描述符  
			in = *inp++;  
			out = *outp++;  
			ex = *exp++;  
			all_bits = in | out | ex;  
			if (all_bits == 0) { // 没有需要监听的描述符, 下一个slot  
				i += __NFDBITS;  
				continue;  
			}  
  
			for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {  
				int fput_needed;  
				if (i >= n) {  
					break;  
				}  
				// 不需要监听描述符 i  
				if (!(bit & all_bits)) {  
					continue;  
				}  
				// 取得文件结构  
				file = fget_light(i, &fput_needed);  
				if (file) {  
					f_op = file->f_op;  
					// 没有 f_op 的话就认为一直处于就绪状态  
					mask = DEFAULT_POLLMASK;  
					if (f_op && f_op->poll) {  
						// 设置等待事件的掩码  
						wait_key_set(wait, in, out, bit);  
						/* 
						static inline void wait_key_set(poll_table *wait, unsigned long in, 
						unsigned long out, unsigned long bit) 
						{ 
						wait->_key = POLLEX_SET;// (POLLPRI) 
						if (in & bit) 
						wait->_key |= POLLIN_SET;//(POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR) 
						if (out & bit) 
						wait->_key |= POLLOUT_SET;//POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) 
						} 
						*/  
						// 获取当前的就绪状态, 并添加到文件的对应等待队列中  
						mask = (*f_op->poll)(file, wait);  
						// 和poll完全一样  
					}  
					fput_light(file, fput_needed);  
					// 释放文件  
					// 检查文件 i 是否已有事件就绪,  
					if ((mask & POLLIN_SET) && (in & bit)) {  
						res_in |= bit;  
						retval++;  
						// 如果已有就绪事件就不再向其他文件的  
						// 等待队列中添加回调函数  
						wait = NULL;  
					}  
					if ((mask & POLLOUT_SET) && (out & bit)) {  
						res_out |= bit;  
						retval++;  
						wait = NULL;  
					}  
					if ((mask & POLLEX_SET) && (ex & bit)) {  
						res_ex |= bit;  
						retval++;  
						wait = NULL;  
					}  
				}  
			}  
			if (res_in) {  
				*rinp = res_in;  
			}  
			if (res_out) {  
				*routp = res_out;  
			}  
			if (res_ex) {  
				*rexp = res_ex;  
			}  
			cond_resched();  
		}  
		wait = NULL; // 该添加回调函数的都已经添加了  
		if (retval || timed_out || signal_pending(current)) {  
			break;   // 信号发生,监听事件就绪或超时  
		}  
		if (table.error) {  
			retval = table.error; // 产生错误了  
			break;  
		}  
		// 转换到内核时间  
		if (end_time && !to) {  
			expire = timespec_to_ktime(*end_time);  
			to = &expire;  
		}  
		// 等待直到超时, 或由回调函数唤醒, 超时后会再次遍历文件描述符  
		if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,  
								   to, slack)) {  
			timed_out = 1;  
		}  
	}  
  
	poll_freewait(&table);  
  
	return retval;  
}  

epoll实现

epoll 的实现比poll/select 复杂一些,这是因为:

  1. epoll_wait, epoll_ctl 的调用完全独立开来,内核需要锁机制对这些操作进行保护,并且需要持久的维护添加到epoll的文件
  2. epoll本身也是文件,也可以被poll/select/epoll监视,这可能导致epoll之间循环唤醒的问题
  3. 单个文件的状态改变可能唤醒过多监听在其上的epoll,产生唤醒风暴

epoll各个功能的实现要非常小心面对这些问题,使得复杂度大大增加。

epoll的核心数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// epoll的核心实现对应于一个epoll描述符  
struct eventpoll {  
	spinlock_t lock;  
	struct mutex mtx;  
	wait_queue_head_t wq; // sys_epoll_wait() 等待在这里  
	// f_op->poll()  使用的, 被其他事件通知机制利用的wait_address  
	wait_queue_head_t poll_wait;  
	/* 已就绪的需要检查的epitem 列表 */  
	struct list_head rdllist;  
	/* 保存所有加入到当前epoll的文件对应的epitem*/  
	struct rb_root rbr;  
	// 当正在向用户空间复制数据时, 产生的可用文件  
	struct epitem *ovflist;  
	/* The user that created the eventpoll descriptor */  
	struct user_struct *user;  
	struct file *file;  
	/*优化循环检查,避免循环检查中重复的遍历 */  
	int visited;  
	struct list_head visited_list_link;  
}  
  
// 对应于一个加入到epoll的文件  
struct epitem {  
	// 挂载到eventpoll 的红黑树节点  
	struct rb_node rbn;  
	// 挂载到eventpoll.rdllist 的节点  
	struct list_head rdllink;  
	// 连接到ovflist 的指针  
	struct epitem *next;  
	/* 文件描述符信息fd + file, 红黑树的key */  
	struct epoll_filefd ffd;  
	/* Number of active wait queue attached to poll operations */  
	int nwait;  
	// 当前文件的等待队列(eppoll_entry)列表  
	// 同一个文件上可能会监视多种事件,  
	// 这些事件可能属于不同的wait_queue中  
	// (取决于对应文件类型的实现),  
	// 所以需要使用链表  
	struct list_head pwqlist;  
	// 当前epitem 的所有者  
	struct eventpoll *ep;  
	/* List header used to link this item to the "struct file" items list */  
	struct list_head fllink;  
	/* epoll_ctl 传入的用户数据 */  
	struct epoll_event event;  
};  
  
struct epoll_filefd {  
	struct file *file;  
	int fd;  
};  
  
// 与一个文件上的一个wait_queue_head 相关联,因为同一文件可能有多个等待的事件,这些事件可能使用不同的等待队列  
struct eppoll_entry {  
	// List struct epitem.pwqlist  
	struct list_head llink;  
	// 所有者  
	struct epitem *base;  
	// 添加到wait_queue 中的节点  
	wait_queue_t wait;  
	// 文件wait_queue 头  
	wait_queue_head_t *whead;  
};  
  
// 用户使用的epoll_event  
struct epoll_event {  
	__u32 events;  
	__u64 data;  
} EPOLL_PACKED;  

文件系统初始化和epoll_create

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// epoll 文件系统的相关实现  
// epoll 文件系统初始化, 在系统启动时会调用  
  
static int __init eventpoll_init(void)  
{  
	struct sysinfo si;  
  
	si_meminfo(&si);  
	// 限制可添加到epoll的最多的描述符数量  
  
	max_user_watches = (((si.totalram - si.totalhigh) / 25) << PAGE_SHIFT) /  
					   EP_ITEM_COST;  
	BUG_ON(max_user_watches < 0);  
  
	// 初始化递归检查队列  
   ep_nested_calls_init(&poll_loop_ncalls);  
	ep_nested_calls_init(&poll_safewake_ncalls);  
	ep_nested_calls_init(&poll_readywalk_ncalls);  
	// epoll 使用的slab分配器分别用来分配epitem和eppoll_entry  
	epi_cache = kmem_cache_create("eventpoll_epi", sizeof(struct epitem),  
								  0, SLAB_HWCACHE_ALIGN | SLAB_PANIC, NULL);  
	pwq_cache = kmem_cache_create("eventpoll_pwq",  
								  sizeof(struct eppoll_entry), 0, SLAB_PANIC, NULL);  
  
	return 0;  
}  
  
  
SYSCALL_DEFINE1(epoll_create, int, size)  
{  
	if (size <= 0) {  
		return -EINVAL;  
	}  
  
	return sys_epoll_create1(0);  
}  
  
SYSCALL_DEFINE1(epoll_create1, int, flags)  
{  
	int error, fd;  
	struct eventpoll *ep = NULL;  
	struct file *file;  
  
	/* Check the EPOLL_* constant for consistency.  */  
	BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);  
  
	if (flags & ~EPOLL_CLOEXEC) {  
		return -EINVAL;  
	}  
	/* 
	 * Create the internal data structure ("struct eventpoll"). 
	 */  
	error = ep_alloc(&ep);  
	if (error < 0) {  
		return error;  
	}  
	/* 
	 * Creates all the items needed to setup an eventpoll file. That is, 
	 * a file structure and a free file descriptor. 
	 */  
	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));  
	if (fd < 0) {  
		 error = fd;  
		 goto out_free_ep;  
	  }  
	  // 设置epfd的相关操作,由于epoll也是文件也提供了poll操作  
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,  
							  O_RDWR | (flags & O_CLOEXEC));  
	if (IS_ERR(file)) {  
		error = PTR_ERR(file);  
		goto out_free_fd;  
	}  
	fd_install(fd, file);  
	ep->file = file;  
	return fd;  
  
out_free_fd:  
	put_unused_fd(fd);  
out_free_ep:  
	ep_free(ep);  
	return error;  
}  

epoll中的递归死循环和深度检查

递归深度检测(ep_call_nested)

epoll本身也是文件,也可以被poll/select/epoll监视,如果epoll之间互相监视就有可能导致死循环。epoll的实现中,所有可能产生递归调用的函数都由函函数ep_call_nested进行包裹,递归调用过程中出现死循环或递归过深就会打破死循环和递归调用直接返回。该函数的实现依赖于一个外部的全局链表nested_call_node(不同的函数调用使用不同的节点),每次调用可能发生递归的函数(nproc)就向链表中添加一个包含当前函数调用上下文ctx(进程,CPU,或epoll文件)和处理的对象标识cookie的节点,通过检测是否有相同的节点就可以知道是否发生了死循环,检查链表中同一上下文包含的节点个数就可以知道递归的深度。以下就是这一过程的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
struct nested_call_node {  
	struct list_head llink;  
	void *cookie;   // 函数运行标识, 任务标志  
	void *ctx;      // 运行环境标识  
};  
struct nested_calls {  
	struct list_head tasks_call_list;  
	spinlock_t lock;  
};  
  
// 全局的不同调用使用的链表  
// 死循环检查和唤醒风暴检查链表  
static nested_call_node poll_loop_ncalls;  
// 唤醒时使用的检查链表  
static nested_call_node poll_safewake_ncalls;  
// 扫描readylist 时使用的链表  
static nested_call_node poll_readywalk_ncalls;  
  
  
// 限制epoll 中直接或间接递归调用的深度并防止死循环  
// ctx: 任务运行上下文(进程, CPU 等)  
// cookie: 每个任务的标识  
// priv: 任务运行需要的私有数据  
// 如果用面向对象语言实现应该就会是一个wapper类  
static int ep_call_nested(struct nested_calls *ncalls, int max_nests,  
						  int (*nproc)(void *, void *, int), void *priv,  
						  void *cookie, void *ctx)  
{  
	int error, call_nests = 0;  
	unsigned long flags;  
	struct list_head *lsthead = &ncalls->tasks_call_list;  
	struct nested_call_node *tncur;  
	struct nested_call_node tnode;  
	spin_lock_irqsave(&ncalls->lock, flags);  
	// 检查原有的嵌套调用链表ncalls, 查看是否有深度超过限制的情况  
	list_for_each_entry(tncur, lsthead, llink) {  
		// 同一上下文中(ctx)有相同的任务(cookie)说明产生了死循环  
		// 同一上下文的递归深度call_nests 超过限制  
		if (tncur->ctx == ctx &&  
				(tncur->cookie == cookie || ++call_nests > max_nests)) {  
			error = -1;  
		}  
		goto out_unlock;  
	}  
	/* 将当前的任务请求添加到调用列表*/  
	tnode.ctx = ctx;  
	tnode.cookie = cookie;  
	list_add(&tnode.llink, lsthead);  
	spin_unlock_irqrestore(&ncalls->lock, flags);  
	/* nproc 可能会导致递归调用(直接或间接)ep_call_nested 
		 * 如果发生递归调用, 那么在此函数返回之前, 
		 * ncalls 又会被加入额外的节点, 
		 * 这样通过前面的检测就可以知道递归调用的深度 
	  */  
	error = (*nproc)(priv, cookie, call_nests);  
	/* 从链表中删除当前任务*/  
	spin_lock_irqsave(&ncalls->lock, flags);  
	list_del(&tnode.llink);  
out_unlock:  
	spin_unlock_irqrestore(&ncalls->lock, flags);  
	return error;  
}  

循环检测(ep_loop_check)

循环检查(ep_loop_check),该函数递归调用ep_loop_check_proc利用ep_call_nested来实现epoll之间相互监视的死循环。因为ep_call_nested中已经对死循环和过深的递归做了检查,实际的ep_loop_check_proc的实现只是递归调用自己。其中的visited_list和visited标记完全是为了优化处理速度,如果没有visited_list和visited标记函数也是能够工作的。该函数中得上下文就是当前的进程,cookie就是正在遍历的epoll结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static LIST_HEAD(visited_list);  
// 检查 file (epoll)和ep 之间是否有循环  
static int ep_loop_check(struct eventpoll *ep, struct file *file)  
{  
	int ret;  
	struct eventpoll *ep_cur, *ep_next;  
  
	ret = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,  
						 ep_loop_check_proc, file, ep, current);  
	/* 清除链表和标志 */  
	list_for_each_entry_safe(ep_cur, ep_next, &visited_list,  
							 visited_list_link) {  
		ep_cur->visited = 0;  
		list_del(&ep_cur->visited_list_link);  
	}  
	return ret;  
}  
  
static int ep_loop_check_proc(void *priv, void *cookie, int call_nests)  
{  
	int error = 0;  
	struct file *file = priv;  
	struct eventpoll *ep = file->private_data;  
	struct eventpoll *ep_tovisit;  
	struct rb_node *rbp;  
	struct epitem *epi;  
  
	mutex_lock_nested(&ep->mtx, call_nests + 1);  
	// 标记当前为已遍历  
	ep->visited = 1;  
	list_add(&ep->visited_list_link, &visited_list);  
	// 遍历所有ep 监视的文件  
	for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {  
		epi = rb_entry(rbp, struct epitem, rbn);  
		if (unlikely(is_file_epoll(epi->ffd.file))) {  
			ep_tovisit = epi->ffd.file->private_data;  
			// 跳过先前已遍历的, 避免循环检查  
			if (ep_tovisit->visited) {  
				continue;  
			}  
			// 所有ep监视的未遍历的epoll  
			error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,  
								   ep_loop_check_proc, epi->ffd.file,  
								   ep_tovisit, current);  
			if (error != 0) {  
				break;  
			}  
		} else {  
			// 文件不在tfile_check_list 中, 添加  
			// 最外层的epoll 需要检查子epoll监视的文件  
			if (list_empty(&epi->ffd.file->f_tfile_llink))  
				list_add(&epi->ffd.file->f_tfile_llink,  
						 &tfile_check_list);  
		}  
	}  
	mutex_unlock(&ep->mtx);  
  
	return error;  
}  

唤醒风暴检测(reverse_path_check)

当文件状态发生改变时,会唤醒监听在其上的epoll文件,而这个epoll文件还可能唤醒其他的epoll文件,这种连续的唤醒就形成了一个唤醒路径,所有的唤醒路径就形成了一个有向图。如果文件对应的epoll唤醒有向图的节点过多,那么文件状态的改变就会唤醒所有的这些epoll(可能会唤醒很多进程,这样的开销是很大的),而实际上一个文件经过少数epoll处理以后就可能从就绪转到未就绪,剩余的epoll虽然认为文件已就绪而实际上经过某些处理后已不可用。epoll的实现中考虑到了此问题,在每次添加新文件到epoll中时,就会首先检查是否会出现这样的唤醒风暴。

该函数的实现逻辑是这样的,递归调用reverse_path_check_proc遍历监听在当前文件上的epoll文件,在reverse_pach_check_proc中统计并检查不同路径深度上epoll的个数,从而避免产生唤醒风暴。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#define PATH_ARR_SIZE 5  
// 在EPOLL_CTL_ADD 时, 检查是否有可能产生唤醒风暴  
// epoll 允许的单个文件的唤醒深度小于5, 例如  
// 一个文件最多允许唤醒1000个深度为1的epoll描述符,  
//允许所有被单个文件直接唤醒的epoll描述符再次唤醒的epoll描述符总数是500  
//  
  
// 深度限制  
static const int path_limits[PATH_ARR_SIZE] = { 1000, 500, 100, 50, 10 };  
// 计算出来的深度  
static int path_count[PATH_ARR_SIZE];  
  
static int path_count_inc(int nests)  
{  
	/* Allow an arbitrary number of depth 1 paths */  
	if (nests == 0) {  
		return 0;  
	}  
  
	if (++path_count[nests] > path_limits[nests]) {  
		return -1;  
	}  
	return 0;  
}  
  
static void path_count_init(void)  
{  
	int i;  
  
	for (i = 0; i < PATH_ARR_SIZE; i++) {  
		path_count[i] = 0;  
	}  
}  
  
// 唤醒风暴检查函数  
static int reverse_path_check(void)  
{  
	int error = 0;  
	struct file *current_file;  
  
	/* let's call this for all tfiles */  
	// 遍历全局tfile_check_list 中的文件, 第一级  
	list_for_each_entry(current_file, &tfile_check_list, f_tfile_llink) {  
		// 初始化  
		path_count_init();  
		// 限制递归的深度, 并检查每个深度上唤醒的epoll 数量  
		error = ep_call_nested(&poll_loop_ncalls, EP_MAX_NESTS,  
							   reverse_path_check_proc, current_file,  
							   current_file, current);  
		if (error) {  
			break;  
		}  
	}  
	return error;  
}  
static int reverse_path_check_proc(void *priv, void *cookie, int call_nests)  
{  
	int error = 0;  
	struct file *file = priv;  
	struct file *child_file;  
	struct epitem *epi;  
  
	list_for_each_entry(epi, &file->f_ep_links, fllink) {  
		// 遍历监视file 的epoll  
		child_file = epi->ep->file;  
		if (is_file_epoll(child_file)) {  
			if (list_empty(&child_file->f_ep_links)) {  
				// 没有其他的epoll监视当前的这个epoll,  
				// 已经是叶子了  
				if (path_count_inc(call_nests)) {  
					error = -1;  
					break;  
				}  
			} else {  
				// 遍历监视这个epoll 文件的epoll,  
				// 递归调用  
				error = ep_call_nested(&poll_loop_ncalls,  
									   EP_MAX_NESTS,  
									   reverse_path_check_proc,  
									   child_file, child_file,  
									   current);  
			}  
			if (error != 0) {  
				break;  
			}  
		} else {  
			// 不是epoll , 不可能吧?  
			printk(KERN_ERR "reverse_path_check_proc: "  
				   "file is not an ep!\n");  
		}  
	}  
	return error;  
}  

epoll 的唤醒过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void ep_poll_safewake(wait_queue_head_t *wq)  
{  
	int this_cpu = get_cpu();  
  
	ep_call_nested(&poll_safewake_ncalls, EP_MAX_NESTS,  
				   ep_poll_wakeup_proc, NULL, wq, (void *) (long) this_cpu);  
  
	put_cpu();  
}  
  
static int ep_poll_wakeup_proc(void *priv, void *cookie, int call_nests)  
{  
	ep_wake_up_nested((wait_queue_head_t *) cookie, POLLIN,  
					  1 + call_nests);  
	return 0;  
}  
  
static inline void ep_wake_up_nested(wait_queue_head_t *wqueue,  
									 unsigned long events, int subclass)  
{  
	// 这回唤醒所有正在等待此epfd 的select/epoll/poll 等  
	// 如果唤醒的是epoll 就可能唤醒其他的epoll, 产生连锁反应  
	// 这个很可能在中断上下文中被调用  
	wake_up_poll(wqueue, events);  
}  

epoll_ctl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// long epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
  
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,  
				struct epoll_event __user *, event)  
{  
	int error;  
	int did_lock_epmutex = 0;  
	struct file *file, *tfile;  
	struct eventpoll *ep;  
	struct epitem *epi;  
	struct epoll_event epds;  
  
	error = -EFAULT;  
	if (ep_op_has_event(op) &&  
			// 复制用户空间数据到内核  
			copy_from_user(&epds, event, sizeof(struct epoll_event))) {  
		goto error_return;  
	}  
  
	// 取得 epfd 对应的文件  
	error = -EBADF;  
	file = fget(epfd);  
	if (!file) {  
		goto error_return;  
	}  
  
	// 取得目标文件  
	tfile = fget(fd);  
	if (!tfile) {  
		goto error_fput;  
	}  
  
	// 目标文件必须提供 poll 操作  
	error = -EPERM;  
	if (!tfile->f_op || !tfile->f_op->poll) {  
		goto error_tgt_fput;  
	}  
  
	// 添加自身或epfd 不是epoll 句柄  
	error = -EINVAL;  
	if (file == tfile || !is_file_epoll(file)) {  
		goto error_tgt_fput;  
	}  
  
	// 取得内部结构eventpoll  
	ep = file->private_data;  
  
	// EPOLL_CTL_MOD 不需要加全局锁 epmutex  
	if (op == EPOLL_CTL_ADD || op == EPOLL_CTL_DEL) {  
		mutex_lock(&epmutex);  
		did_lock_epmutex = 1;  
	}  
	if (op == EPOLL_CTL_ADD) {  
		if (is_file_epoll(tfile)) {  
			error = -ELOOP;  
			// 目标文件也是epoll 检测是否有循环包含的问题  
			if (ep_loop_check(ep, tfile) != 0) {  
				goto error_tgt_fput;  
			}  
		} else  
		{  
			// 将目标文件添加到 epoll 全局的tfile_check_list 中  
			list_add(&tfile->f_tfile_llink, &tfile_check_list);  
		}  
	}  
  
	mutex_lock_nested(&ep->mtx, 0);  
  
	// 以tfile 和fd 为key 在rbtree 中查找文件对应的epitem  
	epi = ep_find(ep, tfile, fd);  
  
	error = -EINVAL;  
	switch (op) {  
	case EPOLL_CTL_ADD:  
		if (!epi) {  
			// 没找到, 添加额外添加ERR HUP 事件  
			epds.events |= POLLERR | POLLHUP;  
			error = ep_insert(ep, &epds, tfile, fd);  
		} else {  
			error = -EEXIST;  
		}  
		// 清空文件检查列表  
		clear_tfile_check_list();  
		break;  
	case EPOLL_CTL_DEL:  
		if (epi) {  
			error = ep_remove(ep, epi);  
		} else {  
			error = -ENOENT;  
		}  
		break;  
	case EPOLL_CTL_MOD:  
		if (epi) {  
			epds.events |= POLLERR | POLLHUP;  
			error = ep_modify(ep, epi, &epds);  
		} else {  
			error = -ENOENT;  
		}  
		break;  
	}  
	mutex_unlock(&ep->mtx);  
  
error_tgt_fput:  
	if (did_lock_epmutex) {  
		mutex_unlock(&epmutex);  
	}  
  
	fput(tfile);  
error_fput:  
	fput(file);  
error_return:  
  
	return error;  
}  

EPOLL_CTL_ADD 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// EPOLL_CTL_ADD  
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,  
					 struct file *tfile, int fd)  
{  
	int error, revents, pwake = 0;  
	unsigned long flags;  
	long user_watches;  
	struct epitem *epi;  
	struct ep_pqueue epq;  
	/* 
	struct ep_pqueue { 
		poll_table pt; 
		struct epitem *epi; 
	}; 
	*/  
  
	// 增加监视文件数  
	user_watches = atomic_long_read(&ep->user->epoll_watches);  
	if (unlikely(user_watches >= max_user_watches)) {  
		return -ENOSPC;  
	}  
  
	// 分配初始化 epi  
	if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL))) {  
		return -ENOMEM;  
	}  
  
	INIT_LIST_HEAD(&epi->rdllink);  
	INIT_LIST_HEAD(&epi->fllink);  
	INIT_LIST_HEAD(&epi->pwqlist);  
	epi->ep = ep;  
	// 初始化红黑树中的key  
	ep_set_ffd(&epi->ffd, tfile, fd);  
	// 直接复制用户结构  
	epi->event = *event;  
	epi->nwait = 0;  
	epi->next = EP_UNACTIVE_PTR;  
  
	// 初始化临时的 epq  
	epq.epi = epi;  
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);  
	// 设置事件掩码  
	epq.pt._key = event->events;  
	//  内部会调用ep_ptable_queue_proc, 在文件对应的wait queue head 上  
	// 注册回调函数, 并返回当前文件的状态  
	revents = tfile->f_op->poll(tfile, &epq.pt);  
  
	// 检查错误  
	error = -ENOMEM;  
	if (epi->nwait < 0) { // f_op->poll 过程出错  
		goto error_unregister;  
	}  
	// 添加当前的epitem 到文件的f_ep_links 链表  
	spin_lock(&tfile->f_lock);  
	list_add_tail(&epi->fllink, &tfile->f_ep_links);  
	spin_unlock(&tfile->f_lock);  
  
	// 插入epi 到rbtree  
	ep_rbtree_insert(ep, epi);  
  
	/* now check if we've created too many backpaths */  
	error = -EINVAL;  
	if (reverse_path_check()) {  
		goto error_remove_epi;  
	}  
  
	spin_lock_irqsave(&ep->lock, flags);  
  
	/* 文件已经就绪插入到就绪链表rdllist */  
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {  
		list_add_tail(&epi->rdllink, &ep->rdllist);  
  
  
		if (waitqueue_active(&ep->wq))  
			// 通知sys_epoll_wait , 调用回调函数唤醒sys_epoll_wait 进程  
		{  
			wake_up_locked(&ep->wq);  
		}  
		// 先不通知调用eventpoll_poll 的进程  
		if (waitqueue_active(&ep->poll_wait)) {  
			pwake++;  
		}  
	}  
  
	spin_unlock_irqrestore(&ep->lock, flags);  
  
	atomic_long_inc(&ep->user->epoll_watches);  
  
	if (pwake)  
		// 安全通知调用eventpoll_poll 的进程  
	{  
		ep_poll_safewake(&ep->poll_wait);  
	}  
  
	return 0;  
  
error_remove_epi:  
	spin_lock(&tfile->f_lock);  
	// 删除文件上的 epi  
	if (ep_is_linked(&epi->fllink)) {  
		list_del_init(&epi->fllink);  
	}  
	spin_unlock(&tfile->f_lock);  
  
	// 从红黑树中删除  
	rb_erase(&epi->rbn, &ep->rbr);  
  
error_unregister:  
	// 从文件的wait_queue 中删除, 释放epitem 关联的所有eppoll_entry  
	ep_unregister_pollwait(ep, epi);  
  
	/* 
	 * We need to do this because an event could have been arrived on some 
	 * allocated wait queue. Note that we don't care about the ep->ovflist 
	 * list, since that is used/cleaned only inside a section bound by "mtx". 
	 * And ep_insert() is called with "mtx" held. 
	 */  
	// TODO:  
	spin_lock_irqsave(&ep->lock, flags);  
	if (ep_is_linked(&epi->rdllink)) {  
		list_del_init(&epi->rdllink);  
	}  
	spin_unlock_irqrestore(&ep->lock, flags);  
  
	// 释放epi  
	kmem_cache_free(epi_cache, epi);  
  
	return error;  
}  

EPOLL_CTL_DEL

EPOLL_CTL_DEL 的实现调用的是 ep_remove 函数,函数只是清除ADD时, 添加的各种结构,EPOLL_CTL_MOD 的实现调用的是ep_modify,在ep_modify中用新的事件掩码调用f_ops->poll,检测事件是否已可用,如果可用就直接唤醒epoll,这两个的实现与EPOLL_CTL_ADD 类似,代码上比较清晰,这里就不具体分析了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int ep_remove(struct eventpoll *ep, struct epitem *epi)  
{  
	unsigned long flags;  
	struct file *file = epi->ffd.file;  
  
	/* 
	 * Removes poll wait queue hooks. We _have_ to do this without holding 
	 * the "ep->lock" otherwise a deadlock might occur. This because of the 
	 * sequence of the lock acquisition. Here we do "ep->lock" then the wait 
	 * queue head lock when unregistering the wait queue. The wakeup callback 
	 * will run by holding the wait queue head lock and will call our callback 
	 * that will try to get "ep->lock". 
	 */  
	ep_unregister_pollwait(ep, epi);  
  
	/* Remove the current item from the list of epoll hooks */  
	spin_lock(&file->f_lock);  
	if (ep_is_linked(&epi->fllink))  
		list_del_init(&epi->fllink);  
	spin_unlock(&file->f_lock);  
  
	rb_erase(&epi->rbn, &ep->rbr);  
  
	spin_lock_irqsave(&ep->lock, flags);  
	if (ep_is_linked(&epi->rdllink))  
		list_del_init(&epi->rdllink);  
	spin_unlock_irqrestore(&ep->lock, flags);  
  
	/* At this point it is safe to free the eventpoll item */  
	kmem_cache_free(epi_cache, epi);  
  
	atomic_long_dec(&ep->user->epoll_watches);  
  
	return 0;  
}  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/* 
 * Modify the interest event mask by dropping an event if the new mask 
 * has a match in the current file status. Must be called with "mtx" held. 
 */  
static int ep_modify(struct eventpoll *ep, struct epitem *epi, struct epoll_event *event)  
{  
	int pwake = 0;  
	unsigned int revents;  
	poll_table pt;  
  
	init_poll_funcptr(&pt, NULL);  
  
	/* 
	 * Set the new event interest mask before calling f_op->poll(); 
	 * otherwise we might miss an event that happens between the 
	 * f_op->poll() call and the new event set registering. 
	 */  
	epi->event.events = event->events;  
	pt._key = event->events;  
	epi->event.data = event->data; /* protected by mtx */  
  
	/* 
	 * Get current event bits. We can safely use the file* here because 
	 * its usage count has been increased by the caller of this function. 
	 */  
	revents = epi->ffd.file->f_op->poll(epi->ffd.file, &pt);  
  
	/* 
	 * If the item is "hot" and it is not registered inside the ready 
	 * list, push it inside. 
	 */  
	if (revents & event->events) {  
		spin_lock_irq(&ep->lock);  
		if (!ep_is_linked(&epi->rdllink)) {  
			list_add_tail(&epi->rdllink, &ep->rdllist);  
  
			/* Notify waiting tasks that events are available */  
			if (waitqueue_active(&ep->wq))  
				wake_up_locked(&ep->wq);  
			if (waitqueue_active(&ep->poll_wait))  
				pwake++;  
		}  
		spin_unlock_irq(&ep->lock);  
	}  
  
	/* We have to call this outside the lock */  
	if (pwake)  
		ep_poll_safewake(&ep->poll_wait);  
  
	return 0;  
}  

epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/* 
epoll_wait实现 
*/  
  
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,  
				int, maxevents, int, timeout)  
{  
	int error;  
	struct file *file;  
	struct eventpoll *ep;  
  
	// 检查输入数据有效性  
	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS) {  
		return -EINVAL;  
	}  
  
	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {  
		error = -EFAULT;  
		goto error_return;  
	}  
  
	/* Get the "struct file *" for the eventpoll file */  
	error = -EBADF;  
	file = fget(epfd);  
	if (!file) {  
		goto error_return;  
	}  
  
	error = -EINVAL;  
	if (!is_file_epoll(file)) {  
		goto error_fput;  
	}  
	// 取得ep 结构  
	ep = file->private_data;  
  
	// 等待事件  
	error = ep_poll(ep, events, maxevents, timeout);  
  
error_fput:  
	fput(file);  
error_return:  
  
	return error;  
}  
  
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,  
				   int maxevents, long timeout)  
{  
	int res = 0, eavail, timed_out = 0;  
	unsigned long flags;  
	long slack = 0;  
	wait_queue_t wait;  
	ktime_t expires, *to = NULL;  
  
	if (timeout > 0) {  
		// 转换为内核时间  
		struct timespec end_time = ep_set_mstimeout(timeout);  
  
		slack = select_estimate_accuracy(&end_time);  
		to = &expires;  
		*to = timespec_to_ktime(end_time);  
	} else if (timeout == 0) {  
		// 已经超时直接检查readylist  
		timed_out = 1;  
		spin_lock_irqsave(&ep->lock, flags);  
		goto check_events;  
	}  
  
fetch_events:  
	spin_lock_irqsave(&ep->lock, flags);  
  
	// 没有可用的事件,ready list 和ovflist 都为空  
	if (!ep_events_available(ep)) {  
  
		// 添加当前进程的唤醒函数  
		init_waitqueue_entry(&wait, current);  
		__add_wait_queue_exclusive(&ep->wq, &wait);  
  
		for (;;) {  
			/* 
			 * We don't want to sleep if the ep_poll_callback() sends us 
			 * a wakeup in between. That's why we set the task state 
			 * to TASK_INTERRUPTIBLE before doing the checks. 
			 */  
			set_current_state(TASK_INTERRUPTIBLE);  
			if (ep_events_available(ep) || timed_out) {  
				break;  
			}  
			if (signal_pending(current)) {  
				res = -EINTR;  
				break;  
			}  
  
			spin_unlock_irqrestore(&ep->lock, flags);  
			// 挂起当前进程,等待唤醒或超时  
			if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) {  
				timed_out = 1;  
			}  
  
			spin_lock_irqsave(&ep->lock, flags);  
		}  
	  
		__remove_wait_queue(&ep->wq, &wait);  
  
		set_current_state(TASK_RUNNING);  
	}  
check_events:  
	// 再次检查是否有可用事件  
	eavail = ep_events_available(ep);  
  
	spin_unlock_irqrestore(&ep->lock, flags);  
  
	/* 
	 * Try to transfer events to user space. In case we get 0 events and 
	 * there's still timeout left over, we go trying again in search of 
	 * more luck. 
	 */  
	if (!res && eavail   
			&& !(res = ep_send_events(ep, events, maxevents)) // 复制事件到用户空间  
			&& !timed_out) // 复制事件失败并且没有超时,重新等待。  
			{  
		goto fetch_events;  
	}  
  
	return res;  
}  
  
  
static inline int ep_events_available(struct eventpoll *ep)  
{  
	return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;  
}  
  
struct ep_send_events_data {  
	int maxevents;  
	struct epoll_event __user *events;  
};  
  
static int ep_send_events(struct eventpoll *ep,  
						  struct epoll_event __user *events, int maxevents)  
{  
	struct ep_send_events_data esed;  
  
	esed.maxevents = maxevents;  
	esed.events = events;  
  
	return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0);  
}  
  
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,  
							   void *priv)  
{  
	struct ep_send_events_data *esed = priv;  
	int eventcnt;  
	unsigned int revents;  
	struct epitem *epi;  
	struct epoll_event __user *uevent;  
  
	// 遍历已就绪链表  
	for (eventcnt = 0, uevent = esed->events;  
			!list_empty(head) && eventcnt < esed->maxevents;) {  
		epi = list_first_entry(head, struct epitem, rdllink);  
  
		list_del_init(&epi->rdllink);  
		// 获取ready 事件掩码  
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &  
				  epi->event.events;  
  
		/* 
		 * If the event mask intersect the caller-requested one, 
		 * deliver the event to userspace. Again, ep_scan_ready_list() 
		 * is holding "mtx", so no operations coming from userspace 
		 * can change the item. 
		 */  
		if (revents) {  
			// 事件就绪, 复制到用户空间  
			if (__put_user(revents, &uevent->events) ||  
					__put_user(epi->event.data, &uevent->data)) {  
				list_add(&epi->rdllink, head);  
				return eventcnt ? eventcnt : -EFAULT;  
			}  
			eventcnt++;  
			uevent++;  
			if (epi->event.events & EPOLLONESHOT) {  
				epi->event.events &= EP_PRIVATE_BITS;  
			} else if (!(epi->event.events & EPOLLET)) {  
				// 不是边缘模式, 再次添加到ready list,  
				// 下次epoll_wait 时直接进入此函数检查ready list是否仍然继续  
				list_add_tail(&epi->rdllink, &ep->rdllist);  
			}  
			// 如果是边缘模式, 只有当文件状态发生改变时,  
			// 才文件会再次触发wait_address 上wait_queue的回调函数,  
		}  
	}  
  
	return eventcnt;  
}  

eventpoll_poll

由于epoll自身也是文件系统,其描述符也可以被poll/select/epoll监视,因此需要实现poll方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
static const struct file_operations eventpoll_fops = {  
	.release = ep_eventpoll_release,  
	.poll    = ep_eventpoll_poll,  
	.llseek  = noop_llseek,  
};  
  
static unsigned int ep_eventpoll_poll(struct file *file, poll_table *wait)  
{  
	int pollflags;  
	struct eventpoll *ep = file->private_data;  
	// 插入到wait_queue  
	poll_wait(file, &ep->poll_wait, wait);  
	// 扫描就绪的文件列表, 调用每个文件上的poll 检测是否真的就绪,  
	// 然后复制到用户空间  
	// 文件列表中有可能有epoll文件, 调用poll的时候有可能会产生递归,  
	// 调用所以用ep_call_nested 包装一下, 防止死循环和过深的调用  
	pollflags = ep_call_nested(&poll_readywalk_ncalls, EP_MAX_NESTS,  
							   ep_poll_readyevents_proc, ep, ep, current);  
	// static struct nested_calls poll_readywalk_ncalls;  
	return pollflags != -1 ? pollflags : 0;  
}  
  
static int ep_poll_readyevents_proc(void *priv, void *cookie, int call_nests)  
{  
	return ep_scan_ready_list(priv, ep_read_events_proc, NULL, call_nests + 1);  
}  
  
static int ep_scan_ready_list(struct eventpoll *ep,  
							  int (*sproc)(struct eventpoll *,  
									  struct list_head *, void *),  
							  void *priv,  
							  int depth)  
{  
	int error, pwake = 0;  
	unsigned long flags;  
	struct epitem *epi, *nepi;  
	LIST_HEAD(txlist);  
  
	/* 
	 * We need to lock this because we could be hit by 
	 * eventpoll_release_file() and epoll_ctl(). 
	 */  
	mutex_lock_nested(&ep->mtx, depth);  
  
	spin_lock_irqsave(&ep->lock, flags);  
	// 移动rdllist 到新的链表txlist  
	list_splice_init(&ep->rdllist, &txlist);  
	// 改变ovflist 的状态, 如果ep->ovflist != EP_UNACTIVE_PTR,  
	// 当文件激活wait_queue时,就会将对应的epitem加入到ep->ovflist  
	// 否则将文件直接加入到ep->rdllist,  
	// 这样做的目的是避免丢失事件  
	// 这里不需要检查ep->ovflist 的状态,因为ep->mtx的存在保证此处的ep->ovflist  
	// 一定是EP_UNACTIVE_PTR  
	ep->ovflist = NULL;  
	spin_unlock_irqrestore(&ep->lock, flags);  
  
	// 调用扫描函数处理txlist  
	error = (*sproc)(ep, &txlist, priv);  
  
	spin_lock_irqsave(&ep->lock, flags);  
  
	// 调用 sproc 时可能有新的事件,遍历这些新的事件将其插入到ready list  
	for (nepi = ep->ovflist; (epi = nepi) != NULL;  
			nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {  
		// #define EP_UNACTIVE_PTR (void *) -1  
		// epi 不在rdllist, 插入  
		if (!ep_is_linked(&epi->rdllink)) {  
			list_add_tail(&epi->rdllink, &ep->rdllist);  
		}  
	}  
	// 还原ep->ovflist的状态  
	ep->ovflist = EP_UNACTIVE_PTR;  
  
	// 将处理后的 txlist 链接到 rdllist  
	list_splice(&txlist, &ep->rdllist);  
  
	if (!list_empty(&ep->rdllist)) {  
		// 唤醒epoll_wait  
		if (waitqueue_active(&ep->wq)) {  
			wake_up_locked(&ep->wq);  
		}  
		// 当前的ep有其他的事件通知机制监控  
		if (waitqueue_active(&ep->poll_wait)) {  
			pwake++;  
		}  
	}  
	spin_unlock_irqrestore(&ep->lock, flags);  
  
	mutex_unlock(&ep->mtx);  
  
	if (pwake) {  
		// 安全唤醒外部的事件通知机制  
		ep_poll_safewake(&ep->poll_wait);  
	}  
  
	return error;  
}  
  
static int ep_read_events_proc(struct eventpoll *ep, struct list_head *head,  
							   void *priv)  
{  
	struct epitem *epi, *tmp;  
	poll_table pt;  
	init_poll_funcptr(&pt, NULL);  
	list_for_each_entry_safe(epi, tmp, head, rdllink) {  
		pt._key = epi->event.events;  
		if (epi->ffd.file->f_op->poll(epi->ffd.file, &pt) &  
				epi->event.events) {  
			return POLLIN | POLLRDNORM;  
		} else {  
			 // 这个事件虽然在就绪列表中,  
			 // 但是实际上并没有就绪, 将他移除  
		 // 这有可能是水平触发模式中没有将文件从就绪列表中移除  
		 // 也可能是事件插入到就绪列表后有其他的线程对文件进行了操作  
			list_del_init(&epi->rdllink);  
		}  
	}  
	return 0;  
}  

epoll全景

以下是epoll使用的全部数据结构之间的关系图,采用的是一种类UML图,希望对理解epoll的内部实现有所帮助。

poll/select/epoll 对比

通过以上的分析可以看出,poll和select的实现基本是一致,只是用户到内核传递的数据格式有所不同,

select和poll即使只有一个描述符就绪,也要遍历整个集合。如果集合中活跃的描述符很少,遍历过程的开销就会变得很大,而如果集合中大部分的描述符都是活跃的,遍历过程的开销又可以忽略。

epoll的实现中每次只遍历活跃的描述符(如果是水平触发,也会遍历先前活跃的描述符),在活跃描述符较少的情况下就会很有优势,在代码的分析过程中可以看到epoll的实现过于复杂并且其实现过程中需要同步处理(锁),如果大部分描述符都是活跃的,epoll的效率可能不如select或poll。(参见epoll 和poll的性能测试 http://jacquesmattheij.com/Poll+vs+Epoll+once+again)

select能够处理的最大fd无法超出FDSETSIZE。

select会复写传入的fd_set 指针,而poll对每个fd返回一个掩码,不更改原来的掩码,从而可以对同一个集合多次调用poll,而无需调整。

select对每个文件描述符最多使用3个bit,而poll采用的pollfd需要使用64个bit,epoll采用的 epoll_event则需要96个bit

如果事件需要循环处理select, poll 每一次的处理都要将全部的数据复制到内核,而epoll的实现中,内核将持久维护加入的描述符,减少了内核和用户复制数据的开销。


https://www.cnblogs.com/wsw-seu/p/8274195.html