kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp
sar -n DEV 1

HAProxy 研究笔记 -- epoll 事件的处理

http://blog.chinaunix.net/uid-10167808-id-3825388.html

本文介绍 HAProxy 中 epoll 事件的处理机制,版本为 1.5-dev17。

1
2
3
4
5
6
7
8
1. 背景知识
	1.1. fd 更新列表
	1.2. fdtab 数据结构
	1.3. fd event 的设置
2. _do_poll() 代码分析
	2.1. 检测 fd 更新列表
	2.2. 获取活动的 fd
	2.3. 处理活动的 fd

HAProxy 支持多种异步机制,有 select,poll,epoll,kqueue 等。本文介绍 epoll 的 相关实现,epoll 的代码在源文件 ev_epoll.c 中。epoll 的关键处理逻辑集中在函数 _do_poll() 中,下面会详细的分析该函数。

1. 背景知识

在分析 _do_poll() 实现之前,有一些关联的设计需要简单介绍一下,以便于理解该函数中 的一些代码。

1.1. fd 更新列表

见 fd.c 中的全局变量:

1
2
3
/* FD status is defined by the poller's status and by the speculative I/O list */
int fd_nbupdt = 0;             // number of updates in the list
unsigned int *fd_updt = NULL;  // FD updates list

这两个全局变量用来记录状态需要更新的 fd 的数量及具体的 fd。_do_poll() 中会根据 这些信息修改对应 fd 的 epoll 设置。

1.2. fdtab 数据结构

struct fdtab 数据结构在 include/types/fd.h 中定义,内容如下:

1
2
3
4
5
6
7
8
9
10
/* info about one given fd */
struct fdtab {
	int (*iocb)(int fd);                 /* I/O handler, returns FD_WAIT_* */
	void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
	unsigned int  spec_p;                /* speculative polling: position in spec list+1. 0=not in list. */
	unsigned char spec_e;                /* speculative polling: read and write events status. 4 bits */
	unsigned char ev;                    /* event seen in return of poll() : FD_POLL_* */
	unsigned char new:1;                 /* 1 if this fd has just been created */
	unsigned char updated:1;             /* 1 if this fd is already in the update list */
};

该结构的成员基本上都有注释,除了前两个成员,其余的都是和 fd IO 处理相关的。后面 分析代码的时候再具体的解释。

src/fd.c 中还有一个全局变量:

1
struct fdtab *fdtab = NULL;     /* array of all the file descriptors */

fdtab[] 记录了 HAProxy 所有 fd 的信息,数组的每个成员都是一个 struct fdtab, 而且成员的 index 正是 fd 的值,这样相当于 hash,可以高效的定位到某个 fd 对应的 信息。

1.3. fd event 的设置

include/proto/fd.h 中定义了一些设置 fd event 的函数:

1
2
3
4
5
6
/* event manipulation primitives for use by I/O callbacks */
static inline void fd_want_recv(int fd)
static inline void fd_stop_recv(int fd)
static inline void fd_want_send(int fd)
static inline void fd_stop_send(int fd)
static inline void fd_stop_both(int fd)

这些函数见名知义,就是用来设置 fd 启动或停止接收以及发送的。这些函数底层调用的 是一系列 fd_ev_XXX() 的函数真正的设置 fd。这里简单介绍一下 fd_ev_set() 的代码:

1
2
3
4
5
6
7
8
9
static inline void fd_ev_set(int fd, int dir)
{
	unsigned int i = ((unsigned int)fdtab[fd].spec_e) & (FD_EV_STATUS << dir);
	...
	if (i & (FD_EV_ACTIVE << dir))
		return; /* already in desired state */
	fdtab[fd].spec_e |= (FD_EV_ACTIVE << dir);
	updt_fd(fd); /* need an update entry to change the state */
}

该函数会判断一下 fd 的对应 event 是否已经设置了。没有设置的话,才重新设置。设置 的结果记录在 struct fdtab 结构的 spec_e 成员上,而且只是低 4 位上。然后调用 updt_fd() 将该 fd 放到 update list 中:

1
2
3
4
5
6
7
8
static inline void updt_fd(const int fd)
{
	if (fdtab[fd].updated)
		/* already scheduled for update */
		return;
	fdtab[fd].updated = 1;
	fd_updt[fd_nbupdt++] = fd;
}

从上面代码可以看出, struct fdtab 中的 updated 成员用来标记当前 fd 是否已经被放 到 update list 中了。没有的话,则更新设置 updated 成员,并且记录到 fd_updt[] 中, 并且增加需要跟新的 fd 的计数 fd_nbupdt。

至此,用于分析 _do_poll() 的一些背景知识介绍完毕。

2. _do_poll() 代码分析

这里将会重点的分析 _do_poll() 的实现。该函数可以粗略分为三部分:

1
2
3
检查 fd 更新列表,获取各个 fd event 的变化情况,并作 epoll 的设置
计算 epoll_wait 的 delay 时间,并调用 epoll_wait,获取活动的 fd
逐一处理所有有 IO 事件的 fd

以下将按顺序介绍这三部分的代码。

2.1. 检测 fd 更新列表

代码如下,后面会按行分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
 43 /*
 44  * speculative epoll() poller
 45  */
 46 REGPRM2 static void _do_poll(struct poller *p, int exp)
 47 {
 ..     ..
 53 
 54     /* first, scan the update list to find changes */
 55     for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
 56         fd = fd_updt[updt_idx];
 57         en = fdtab[fd].spec_e & 15;  /* new events */
 58         eo = fdtab[fd].spec_e >> 4;  /* previous events */
 59 
 60         if (fdtab[fd].owner && (eo ^ en)) {
 61             if ((eo ^ en) & FD_EV_POLLED_RW) {
 62                 /* poll status changed */
 63                 if ((en & FD_EV_POLLED_RW) == 0) {
 64                     /* fd removed from poll list */
 65                     opcode = EPOLL_CTL_DEL;
 66                 }
 67                 else if ((eo & FD_EV_POLLED_RW) == 0) {
 68                     /* new fd in the poll list */
 69                     opcode = EPOLL_CTL_ADD;
 70                 }
 71                 else {
 72                     /* fd status changed */
 73                     opcode = EPOLL_CTL_MOD;     
 74                 }
 75 
 76                 /* construct the epoll events based on new state */
 77                 ev.events = 0;
 78                 if (en & FD_EV_POLLED_R)
 79                     ev.events |= EPOLLIN;
 80 
 81                 if (en & FD_EV_POLLED_W)
 82                     ev.events |= EPOLLOUT;
 83 
 84                 ev.data.fd = fd;
 85                 epoll_ctl(epoll_fd, opcode, fd, &ev);
 86             }
 87 
 88             fdtab[fd].spec_e = (en << 4) + en;  /* save new events */
 89 
 90             if (!(en & FD_EV_ACTIVE_RW)) {
 91                 /* This fd doesn't use any active entry anymore, we can
 92                  * kill its entry.
 93                  */
 94                 release_spec_entry(fd);
 95             }
 96             else if ((en & ~eo) & FD_EV_ACTIVE_RW) {
 97                 /* we need a new spec entry now */
 98                 alloc_spec_entry(fd);
 99             }
100                                                             
101         }
102         fdtab[fd].updated = 0;
103         fdtab[fd].new = 0;
104     }
105     fd_nbupdt = 0;

haproxy 就是一个大的循环。每一轮循环,都顺序执行几个不同的功能。其中调用当前 poller 的 poll 方法便是其中一个环节。

55 - 56 行: 获取 fd 更新列表中的每一个 fd。 fd_updt[] 就是前面背景知识中介绍 的。haproxy 运行的不同阶段,都有可能通过调用背景知识中介绍的一些 fd event 设置函数 来更改 fd 的状态,最终会更新 fd_updt[] 和 fd_nbupdt。这里集中处理一下所有需要更新 的 fd。

57 - 58 行: 获取当前 fd 的最新事件,以及保存的上一次的事件。前面提到了,fd 的事 设置仅用 4 个 bit 就可以了。sturct fdtab 的 spec_e 成员是 unsigned char, 8 bit, 低 4 bit 保存 fd 当前最新的事件,高 4 bit 保存上一次的事件。这个做法就是为了判断 fd 的哪些事件上前面的处理中发生了变化,以便于更新。至于 fd 前一次的事件是什么时 后保存的,看后面的分析就知道了。

60 行: 主要判断 fd 记录的事件是否发生了变化。如果没有变化,就直接到 102-103 行 的处理了。这里有个小疑问,还没来及深入分析,就是哪些情况会使 fd 处于更新列表中, 但是 fd 上的事件有没有任何变化。

63 - 74 行:检测 fd 的 epoll operation 是否需要更改,比如ADD/DEL/MOD 等操作。

77 - 85 行:检测 fd 的 epoll events 的设置,并调用 epoll_ctl 设置 op 和 event

88 行:这里就是记录下 fd events 设置的最新状态。高低 4 位记录的结果相同。而在 程序运行过程中,仅修改低 4 位,这样和高 4 位一比较,就知道发生了哪些变化。

90 - 99 行:这里主要根据 fd 的新旧状态,更新 speculative I/O list。这个地方在 haproxy 的大循环中有独立的处理流程,这里不作分析。

102 - 103 行:清除 fd 的 new 和 updated 状态。new 状态通常是在新建一个 fd 时调 用 fd_insert 设置的,这里已经完成了 fd 状态的更新,因此两个成员均清零。

105 行: 整个 update list 都处理完了,fd_nbupdt 清零。haproxy 的其他处理流程会 继续更新 update list。下一次调用 _do_poll() 的时候继续处理。当然,这么说也说是 不全面的,因为接下来的处理流程也会有可能处理 fd 的 update list。但主要的处理还 是这里分析的代码块。

至此,fd 更新列表中的所有 fd 都处理完毕,该设置的也都设置了。下面就需要调用 epoll_wait 获得所有活动的 fd 了。 2.2. 获取活动的 fd

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
107     /* compute the epoll_wait() timeout */
108 
109     if (fd_nbspec || run_queue || signal_queue_len) {
...         ...
115         wait_time = 0;
116     }
117     else {
118         if (!exp)
119             wait_time = MAX_DELAY_MS;
120         else if (tick_is_expired(exp, now_ms))
121             wait_time = 0;
122         else {
123             wait_time = TICKS_TO_MS(tick_remain(now_ms, exp)) + 1;
124             if (wait_time > MAX_DELAY_MS)
125                 wait_time = MAX_DELAY_MS;
126         }
127     }
128 
129     /* now let's wait for polled events */
130 
131     fd = MIN(maxfd, global.tune.maxpollevents);
132     gettimeofday(&before_poll, NULL);
133     status = epoll_wait(epoll_fd, epoll_events, fd, wait_time);
134     tv_update_date(wait_time, status);
135     measure_idle();

107 - 127 行:主要是用来计算调用 epoll_wait 时的 timeout 参数。如果 fd_nbspec 不为 0,或 run_queue 中有任务需要运行,或者信号处理 queue 中有需要处理的,都设置 timeout 为 0,目的是希望 epoll_wait 尽快返回,程序好及时处理其他的任务。

131 - 135 行: 计算当前最多可以处理的 event 数目。这个数目也是可配置的。然后调用 epoll_wait, 所有活动 fd 的信息都保存在 epoll_events[] 数组中。

这部分代码逻辑比较简单,接下来就是处理所有活动的 fd 了。 2.3. 处理活动的 fd

逐一处理活动的 fd。这段代码也可以划分为若干个小代码,分别介绍如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
139     for (count = 0; count < status; count++) {
140         unsigned char n;
141         unsigned char e = epoll_events[count].events;
142         fd = epoll_events[count].data.fd;
143 
144         if (!fdtab[fd].owner)
145             continue;
146 
147         /* it looks complicated but gcc can optimize it away when constants
148          * have same values... In fact it depends on gcc :-(
149          */
150         fdtab[fd].ev &= FD_POLL_STICKY;
151         if (EPOLLIN == FD_POLL_IN && EPOLLOUT == FD_POLL_OUT &&
152             EPOLLPRI == FD_POLL_PRI && EPOLLERR == FD_POLL_ERR &&
153             EPOLLHUP == FD_POLL_HUP) {
154             n = e & (EPOLLIN|EPOLLOUT|EPOLLPRI|EPOLLERR|EPOLLHUP);
155         }
156         else {
157             n = ((e & EPOLLIN ) ? FD_POLL_IN  : 0) |
158                 ((e & EPOLLPRI) ? FD_POLL_PRI : 0) |
159                 ((e & EPOLLOUT) ? FD_POLL_OUT : 0) |
160                 ((e & EPOLLERR) ? FD_POLL_ERR : 0) |
161                 ((e & EPOLLHUP) ? FD_POLL_HUP : 0);
162         }
163 
164         if (!n)
165             continue;
166 
167         fdtab[fd].ev |= n;    
168

139 - 142 行: 从 epoll_events[] 中取出一个活动 fd 及其对应的 event。

150 行: fdtab[fd].ev 仅保留 FD_POLL_STICKY 设置,即 FD_POLL_ERR | FD_POLL_HUP, 代表仅保留 fd 原先 events 设置中的错误以及 hang up 的标记位,不管 epoll_wait 中 是否设置了该 fd 的这两个 events。

151 - 162 行: 这段代码的功能主要就是根据 epoll_wait 返回的 fd 的 events 设置情 况,正确的设置 fdtab[fd].ev。之所以代码还要加上条件判断,是因为 haproxy 自己也 用了一套标记 fd 的 events 的宏定义 FD_POLL_XXX,而 epoll_wait 返回的则是系统中 的 EPOLLXXX。因此,这里就涉及到系统标准的 events 转换到 haproxy 自定义 events 的过程。其中,151-154 行代表 haproxy 自定义的关于 fd 的 events 和系统标准的 完全一致,157-161 行代表 haproxy 自定义的和系统标准的不一致,因此需要一个一个 标记位判断,然后转换成 haproxy 自定义的。

167 行: 将转换后的 events 记录到 fdtab[fd].ev。因此,haproxy 中对于 fd events 的记录,始终是采用 haproxy 自定义的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
169         if (fdtab[fd].iocb) {
170             int new_updt, old_updt;
171 
172             /* Mark the events as speculative before processing
173              * them so that if nothing can be done we don't need
174              * to poll again.
175              */
176             if (fdtab[fd].ev & FD_POLL_IN)
177                 fd_ev_set(fd, DIR_RD);
178 
179             if (fdtab[fd].ev & FD_POLL_OUT)
180                 fd_ev_set(fd, DIR_WR);
181 
182             if (fdtab[fd].spec_p) {
183                 /* This fd was already scheduled for being called as a speculative I/O */
184                 continue;
185             }
186 
187             /* Save number of updates to detect creation of new FDs. */
188             old_updt = fd_nbupdt;
189             fdtab[fd].iocb(fd);

169 行: 正常情况下, fdtab[fd] 的 iocb 方法指向 conn_fd_handler,该函数负责处 理 fd 上的 IO 事件。

176 - 180 行: 根据前面设置的 fd 的 events,通过调用 fd_ev_set() 更新 fdtab 结构 的 spec_e 成员。也就是说,在调用 fd_ev_clr() 清理对应 event 之前,就不需要再次设 置 fd 的 event。因为 haproxy 认为仍然需要处理 fd 的 IO。fdtab 的 ev 成员是从 epoll_wait 返回的 events 转换后的结果,而 spec_e 成员则是 haproxy 加入了一些对 fd IO 事件可能性判断的结果。

188 - 189 行: 保存一下当前的 fd update list 的数目,接着调用 fd 的 iocb 方法, 也就是 conn_fd_handler()。之所以要保存当前的 fd update list 数目,是因为 conn_fd_handler() 执行时,如果接受了新的连接,则会有新的 fd 生成,这时也会更新 fd_nbupdt。记录下旧值,就是为了方便知道在 conn_fd_handler 执行之后,有哪些 fd 是新生成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
...             ...
200             for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
201                 fd = fd_updt[new_updt - 1];
202                 if (!fdtab[fd].new)
203                     continue;
204 
205                 fdtab[fd].new = 0;
206                 fdtab[fd].ev &= FD_POLL_STICKY;
207 
208                 if ((fdtab[fd].spec_e & FD_EV_STATUS_R) == FD_EV_ACTIVE_R)
209                     fdtab[fd].ev |= FD_POLL_IN;
210 
211                 if ((fdtab[fd].spec_e & FD_EV_STATUS_W) == FD_EV_ACTIVE_W)
212                     fdtab[fd].ev |= FD_POLL_OUT;
213 
214                 if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
215                     fdtab[fd].iocb(fd);
216 
217                 /* we can remove this update entry if it's the last one and is
218                  * unused, otherwise we don't touch anything.
219                  */
220                 if (new_updt == fd_nbupdt && fdtab[fd].spec_e == 0) {
221                     fdtab[fd].updated = 0;
222                     fd_nbupdt--;
223                 }
224             }
225         }
226     }
227 
228     /* the caller will take care of speculative events */
229 }  

上面这段代码就是执行完毕当前活动 fd 的 iocb 之后,发现有若干个新的 fd 生成,通常 发生在接收新建连接的情况。这种情况,haproxy 认为有必要立即执行这些新的 fd 的 iocb 方法。因为通常一旦客户端新建连接的话,都会尽快发送数据的。这么做就不必等到 下次 epoll_wait 返回之后才处理新的 fd,提高了效率。

至此,haproxy epoll 的事件处理机制粗略分析完毕。这里还有一个 speculative events 的逻辑,本文分析中全都跳过了,随后再完善。

HAProxy 研究笔记 -- HTTP请求处理-2-解析

http://blog.chinaunix.net/uid-10167808-id-3819702.html

本文继续分析 1.5-dev17 中接收到 client 数据之后的处理。

haproxy-1.5-dev17 中接收 client 发送的请求数据流程见文档: HTTP请求处理-1-接收

1. haproxy 主循环的处理流程

主循环处理流程见文档 主循环简介

请求数据的解析工作在主循环 process_runnable_tasks() 中执行。

2. 执行 run queue 中的任务

HTTP请求处理-1-接收 中分析到 session 建立之后,一来会将 session 的 task 放入 runqueue,该 task 会 在下一轮遍历可以运行的 task 中出现,并得到执行。二是立即调用 conn_fd_handler 去 接收 client 发送的数据。

数据接收流程结束后(注意,这并不代表接收到了完整的 client 请求,因为也可能暂时 读取不到 client 的数据退出接收),haproxy 调度执行下一轮循环,调用 process_runnable_tasks() 处理所有在 runqueue 中的 task:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void process_runnable_tasks(int *next)
{
	...
	eb = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
	while (max_processed--) {
		...
		t = eb32_entry(eb, struct task, rq);
		eb = eb32_next(eb);
		__task_unlink_rq(t);

		t->state |= TASK_RUNNING;
		/* This is an optimisation to help the processor's branch
		 * predictor take this most common call.
		 */
		t->calls++;
		if (likely(t->process == process_session))
			t = process_session(t);
		else
			t = t->process(t);
		...
	}
}

大多数情况下,task 的 proecss 都指向 process_session() 函数。该函数就是负责解析 已接收到的数据,选择 backend server,以及 session 状态的变化等等。

3. session 的处理:process_session()

下面介绍 process_session() 函数的实现。该函数代码比较庞大,超过一千行,这里仅 介绍与 HTTP 请求处理的逻辑,采用代码块的逻辑介绍。

处理 HTTP 请求的逻辑代码集中在 label resync_request 处。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct task *process_session(struct task *t)
{
	...
 resync_request:
	/* Analyse request */
	if (((s->req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
		((s->req->flags ^ rqf_last) & CF_MASK_STATIC) ||
		s->si[0].state != rq_prod_last ||
		s->si[1].state != rq_cons_last) {
		unsigned int flags = s->req->flags;

		if (s->req->prod->state >= SI_ST_EST) {
			ana_list = ana_back = s->req->analysers;
			while (ana_list && max_loops--) {
				/* 这段代码中逐一的列举出了所有的 analysers 对应的处理函数
				 * 这里不一一列出,等待下文具体分析
				 */
				...
			}
		}
		rq_prod_last = s->si[0].state;
		rq_cons_last = s->si[1].state;
		s->req->flags &= ~CF_WAKE_ONCE;
		rqf_last = s->req->flags;

		if ((s->req->flags ^ flags) & CF_MASK_STATIC)
			goto resync_request;
	}

首先要判断 s->req->prod->state 的状态是否已经完成建连,根据之前的初始化动作, se->req->prod 指向 s->si[0],即标识与 client 端连接的相关信息。正确建连成功之 后,会更改 si 的状态的,具体代码在 session_complete() 中:

1
2
3
4
s->si[0].state     = s->si[0].prev_state = SI_ST_EST;
...
s->req->prod = &s->si[0];
s->req->cons = &s->si[1];

只有 frontend 连接建立成功,才具备处理 client 发送请求数据的基础。上一篇文章中 已经接收到了 client 发送的数据。这里就是需要根据 s->req->analysers 的值,确定 while 循环中哪些函数处理当前的数据。

补充介绍一下 s->req->analysers 的赋值。 同样是在 session_complete 中初始化的

1
2
/* activate default analysers enabled for this listener */
s->req->analysers = l->analysers;

可见,其直接使用 session 所在的 listener 的 analyser。 listener 中该数值的初始化 是在 check_config_validity() 中完成的:

1
		listener->analysers |= curproxy->fe_req_ana;

而归根结蒂还是来源于 listener 所在的 proxy 上的 fe_req_ana, proxy 上的 fe_req_ana 的初始化同样是在 check_config_validity(),且是在给 listener->analysers 赋值之前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	if (curproxy->cap & PR_CAP_FE) {
		if (!curproxy->accept)
			curproxy->accept = frontend_accept;

		if (curproxy->tcp_req.inspect_delay ||
			!LIST_ISEMPTY(&curproxy->tcp_req.inspect_rules))
			curproxy->fe_req_ana |= AN_REQ_INSPECT_FE;

		if (curproxy->mode == PR_MODE_HTTP) {
			curproxy->fe_req_ana |= AN_REQ_WAIT_HTTP | AN_REQ_HTTP_PROCESS_FE;
			curproxy->fe_rsp_ana |= AN_RES_WAIT_HTTP | AN_RES_HTTP_PROCESS_FE;
		}

		/* both TCP and HTTP must check switching rules */
		curproxy->fe_req_ana |= AN_REQ_SWITCHING_RULES;
	}

从上面代码可以看出,一个 HTTP 模式的 proxy,至少有三个标记位会被置位: AN_REQ_WAIT_HTTP, AN_REQ_HTTP_PROCESS_FE, AN_REQ_SWITCHING_RULES。也就是说, s->req->analysers 由以上三个标记置位。那么随后处理 HTTP REQ 的循环中,就要经过 这三个标记位对应的 analyser 的处理。

接着回到 resync_request 标签下的那个 while 循环,就是逐个判断 analysers 的设置, 并调用对应的函数处理。需要启用那些 analysers,是和 haproxy 的配置相对应的。本文 使用最简单的配置,下面仅列出配置所用到的几个处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
		while (ana_list && max_loops--) {
			/* Warning! ensure that analysers are always placed in ascending order! */

			if (ana_list & AN_REQ_INSPECT_FE) {
				if (!tcp_inspect_request(s, s->req, AN_REQ_INSPECT_FE))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_INSPECT_FE);
			}
		
			if (ana_list & AN_REQ_WAIT_HTTP) {
				if (!http_wait_for_request(s, s->req, AN_REQ_WAIT_HTTP))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_WAIT_HTTP);
			}

			if (ana_list & AN_REQ_HTTP_PROCESS_FE) {
				if (!http_process_req_common(s, s->req, AN_REQ_HTTP_PROCESS_FE, s->fe))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_HTTP_PROCESS_FE);
			}

			if (ana_list & AN_REQ_SWITCHING_RULES) {
				if (!process_switching_rules(s, s->req, AN_REQ_SWITCHING_RULES))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_SWITCHING_RULES);
			}
			...
		}

analysers 的处理也是有顺序的。其中处理请求的第一个函数是 tcp_inspect_request()。 该函数主要是在于如果配置了这里先介绍 http_wait_for_request() 函数的实现。 顾名思义,该函数主要是配置中启用 inspect_rules 时,会调用到该函数。否则的话, 处理 HTTP Req 的第一个函数就是 http_wait_for_request().

顾名思义,http_wait_for_request() 该函数分析所解析的 HTTP Requset 不一定是一个 完整的请求。上篇文章分析读取 client 请求数据的实现中,已经提到,只要不能从 socket 读到更多的数据,就会结束数据的接收。一个请求完全完全有可能因为一些异常原因,或者 请求长度本身就比较大而被拆分到不同的 IP 报文中,一次 read 系统调用可能只读取到其 中的一部分内容。因此,该函数会同时分析已经接收到的数据,并确认是否已经接收到了 完整的 HTTP 请求。只有接收到了完整的 HTTP 请求,该函数处理完,才会交给下一个 analyser 处理,否则只能结束请求的处理,等待接收跟多的数据,解析出一个完成的 HTTP 请求才行。

4. 解析接收到的 http 请求数据: http_wait_for_request()

以下是 http_wait_for_request() 的简要分析:

1.调用 http_msg_analyzer,解析 s->req->buf 中新读取到的数据。该函数会按照 HTTP 协议, 解析 HTTP request 和 response 的头部数据,并记录到数据结构 struct http_msg 中。

2.如果开启了 debug,并且已经完整的解析了 header,则 header 内容打印出来

3.尚未读取到完整的 request 的处理,分作以下几种情形处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (unlikely(msg->msg_state < HTTP_MSG_BODY)) {
	/*
	 * First, let's catch bad requests.
	 */

解析到 header 内容中有不符合 HTTP 协议的情形 HTTP_MSG_ERROR,应答 400 bad request 处理
req->buf 满了,甚至加入 maxrewrite 的空间仍然不够用,应答 400 bad request
读取错误 CF_READ_ERROR 发生,比如 client 发送 RST 断开连接, 应答 400 bad request
读取超时,client 超时未发送完整的请求,应答 408 Request Timeout
client 主动关闭,发送 FIN 包,实际上是所谓的 half-close,同样应答 400 bad request
如果以上情况都不满足,则意味着还可以继续尝试读取新数据,设置一下超时

	/* just set the request timeout once at the beginning of the request */
	if (!tick_isset(req->analyse_exp)) {
		if ((msg->msg_state == HTTP_MSG_RQBEFORE) &&
			(txn->flags & TX_WAIT_NEXT_RQ) &&
			tick_isset(s->be->timeout.httpka))
			req->analyse_exp = tick_add(now_ms, s->be->timeout.httpka);
		else
			req->analyse_exp = tick_add_ifset(now_ms, s->be->timeout.httpreq);
	}

根据以上代码,在等待 http request 期间,有两种 timeout 可以设置: 当是http 连接 Keep-Alive 时,并且处理完了头一个请求之后,等待第二个请求期间,设置 httpka 的超 时,超过设定时间不发送新的请求,将会超时;否则,将设置 http 的 request timeout。

因此,在不启用 http ka timeout 时,http request 同时承担起 http ka timeout 的 功能。在有 http ka timeout 时,这两者各自作用的时间段没有重叠。

满足该环节的请求都终止处理,不再继续了。

4.2. 处理完整的 http request

这里处理的都是已经解析到完整 http request header 的情况,并且所有 header 都被 索引化了,便于快速查找。根据已经得到的 header 的信息,设置 session 和 txn 的 相关成员,相当于汇总一下 header 的摘要信息,便于随后处理之用。流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
更新 session 和 proxy 的统计计数
删除 http ka timeout 的超时处理。可能在上一个请求处理完之后,设置了 http ka 的 timeout,因为这里已经得到完整的请求,因此需要停止该 timeout 的处理逻辑
确认 METHOD,并设置 session 的标记位 s->flags |= SN_REDIRECTABLE,只有 GET 和 HEAD 请求可以被重定向
检测 URI 是否是配置的要做 monitor 的 URI,是的话,则执行对应 ACL,并设置应答
检测如果开启 log 功能的话,要给 txn->uri 分配内存,用于记录 URI
检测 HTTP version
	将 0.9 版本的升级为 1.0
	1.1 及其以上的版本都当做 1.1 处理
初始化用于标识 Connection header 的标记位
如果启用了 capture header 配置,调用 capture_headers() 记录下对应的 header
处理 Transfer-Encoding/Content-Length 等 header
最后一步,清理 req->analysers 的标记位 AN_REQ_WAIT_HTTP,因为本函数已经成功处理完毕,可以进行下一个 analyser 的处理了。

至此,http_wait_for_request() 的处理已经结束。

5. 其他对 HTTP 请求的处理逻辑

按照我们前面分析的,随后应该还有两个 analyser 要处理,简单介绍一下:

1
2
3
4
5
6
AN_REQ_HTTP_PROCESS_FE 对应的 http_process_req_common()
	对 frontend 中 req 配置的常见处理,比如 block ACLs, filter, reqadd 等
	设置 Connection mode, 主要是 haproxy 到 server 采用什么连接方式,tunnel 或者 按照 transcation 处理的短连接
AN_REQ_SWITCHING_RULES 对应的 process_switching_rules()
	如果配置了选择 backend 的 rules,比如用 use_backend,则查询规则为 session 分配一个 backend
	处理 persist_rules,一旦设置了 force-persist, 则不管 server 是否 down,都要保证 session 分配给 persistence 中记录的 server。

以上两个函数,不再具体分析。待以后需要时再完善。

至此,client 端 http 请求已经完成解析和相关设置,并且给 session 指定了将来选择 server 所属的 backend。

下一篇文章就分析选择 server 的流程。

HAProxy 研究笔记 -- 主循环处理流程

http://blog.chinaunix.net/uid-10167808-id-3807412.html

本文简单介绍 HAProxy 主循环的处理逻辑,版本为 1.5-dev17.

0. 主循环 run_poll_loop

HAproxy 的主循环在 haproxy.c 中的 run_poll_loop() 函数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Runs the polling loop */
void run_poll_loop()
{
	int next;

	tv_update_date(0,1);
	while (1) {
		/* check if we caught some signals and process them */
		signal_process_queue();

		/* Check if we can expire some tasks */
		wake_expired_tasks(&next);

		/* Process a few tasks */
		process_runnable_tasks(&next);

		/* stop when there's nothing left to do */
		if (jobs == 0)
			break;

		/* The poller will ensure it returns around  */
		cur_poller.poll(&cur_poller, next);
		fd_process_spec_events();
	}
}

主循环的结构比较清晰,就是循环的调用几个函数,并在适当的时候结束循环并退出:

1
2
3
4
5
6
1. 处理信号队列
2. 超时任务
3. 处理可运行的任务
4. 检测是否可以结束循环
5. 执行 poll 处理 fd 的 IO 事件
6. 处理可能仍有 IO 事件的 fd

1. signal_process_queue - 处理信号队对列

haproxy 实现了自己的信号处理机制。接受到信号之后,将该信号放到信号队列中。在程序 运行到 signal_process_queue() 时处理所有位于信号队列中的信号。

2. wake_expired_tasks - 唤醒超时任务

haproxy 的顶层处理逻辑是 task,task 上存储着要处理的任务的全部信息。task 的管理 是采用队列方式,同时分为 wait queue 和 run queue。顾名思义,wait queue 是需要等 待一定时间的 task 的集合,而 run queue 则代表需要立即执行的 task 的集合。

该函数就是检查 wait queue 中那些超时的任务,并将其放到 run queue 中。haproxy 在 执行的过程中,会因为一些情况导致需要将当前的任务通过调用 task_queue 等接口放到 wait queue 中。

3. process_runnable_tasks - 处理可运行的任务

处理位于 run queue 中的任务。

前面提到,wake_expired_tasks 可能将一些超时的任务放到 run queue 中。此外,haproxy 执行的过程中,还有可能通过调用 task_wakeup 直接讲某个 task 放到 run queue 中,这代表程序希望该任务下次尽可能快的被执行。

对于 TCP 或者 HTTP 业务流量的处理,该函数最终通过调用 process_session 来完成,包括解析已经接收到的数据, 并执行一系列 load balance 的特性,但不负责从 socket 收发数据。

4. jobs == 0 - 无任务可执行,结束循环

haproxy 中用 jobs 记录当前要处理的任务总数,一个 listener 也会被计算在内。因此, 如果 jobs 为 0 的话,通常意味着 haproxy 要退出了,因为连 listener 都要释放了。 jobs 的数值通常在 process_session 时更新。因此,是否可以退出循环,就放在了所有 任务的 process_session 执行之后。

5. cur_poller.poll() - 执行 poll 处理 fd 的 IO 事件

haproxy 启动阶段,会检测当前系统可以启用那种异步处理的机制,比如 select、poll、 epoll、kqueue 等,并注册对应 poller 的 poll 方法。epoll 的相关函数接口在 ev_epoll.c 中。

这里就是执行已经注册的 poller 的 poll 方法,主要功能就是获取所有活动的 fd,并 调用对应的 handler,完成接受新建连接、数据收发等功能。

6. 处理可能仍有 IO 事件的 fd

poller 的 poll 方法执行时,程序会将某些符合条件以便再次执行 IO 处理的的 fd 放到 fd_spec list[] 中,fd_process_spec_events() 函数会再次执行这些 fd 的 io handler。