kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

HAProxy 研究笔记 -- HTTP请求处理-2-解析

http://blog.chinaunix.net/uid-10167808-id-3819702.html

本文继续分析 1.5-dev17 中接收到 client 数据之后的处理。

haproxy-1.5-dev17 中接收 client 发送的请求数据流程见文档: HTTP请求处理-1-接收

1. haproxy 主循环的处理流程

主循环处理流程见文档 主循环简介

请求数据的解析工作在主循环 process_runnable_tasks() 中执行。

2. 执行 run queue 中的任务

HTTP请求处理-1-接收 中分析到 session 建立之后,一来会将 session 的 task 放入 runqueue,该 task 会 在下一轮遍历可以运行的 task 中出现,并得到执行。二是立即调用 conn_fd_handler 去 接收 client 发送的数据。

数据接收流程结束后(注意,这并不代表接收到了完整的 client 请求,因为也可能暂时 读取不到 client 的数据退出接收),haproxy 调度执行下一轮循环,调用 process_runnable_tasks() 处理所有在 runqueue 中的 task:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void process_runnable_tasks(int *next)
{
	...
	eb = eb32_lookup_ge(&rqueue, rqueue_ticks - TIMER_LOOK_BACK);
	while (max_processed--) {
		...
		t = eb32_entry(eb, struct task, rq);
		eb = eb32_next(eb);
		__task_unlink_rq(t);

		t->state |= TASK_RUNNING;
		/* This is an optimisation to help the processor's branch
		 * predictor take this most common call.
		 */
		t->calls++;
		if (likely(t->process == process_session))
			t = process_session(t);
		else
			t = t->process(t);
		...
	}
}

大多数情况下,task 的 proecss 都指向 process_session() 函数。该函数就是负责解析 已接收到的数据,选择 backend server,以及 session 状态的变化等等。

3. session 的处理:process_session()

下面介绍 process_session() 函数的实现。该函数代码比较庞大,超过一千行,这里仅 介绍与 HTTP 请求处理的逻辑,采用代码块的逻辑介绍。

处理 HTTP 请求的逻辑代码集中在 label resync_request 处。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct task *process_session(struct task *t)
{
	...
 resync_request:
	/* Analyse request */
	if (((s->req->flags & ~rqf_last) & CF_MASK_ANALYSER) ||
		((s->req->flags ^ rqf_last) & CF_MASK_STATIC) ||
		s->si[0].state != rq_prod_last ||
		s->si[1].state != rq_cons_last) {
		unsigned int flags = s->req->flags;

		if (s->req->prod->state >= SI_ST_EST) {
			ana_list = ana_back = s->req->analysers;
			while (ana_list && max_loops--) {
				/* 这段代码中逐一的列举出了所有的 analysers 对应的处理函数
				 * 这里不一一列出,等待下文具体分析
				 */
				...
			}
		}
		rq_prod_last = s->si[0].state;
		rq_cons_last = s->si[1].state;
		s->req->flags &= ~CF_WAKE_ONCE;
		rqf_last = s->req->flags;

		if ((s->req->flags ^ flags) & CF_MASK_STATIC)
			goto resync_request;
	}

首先要判断 s->req->prod->state 的状态是否已经完成建连,根据之前的初始化动作, se->req->prod 指向 s->si[0],即标识与 client 端连接的相关信息。正确建连成功之 后,会更改 si 的状态的,具体代码在 session_complete() 中:

1
2
3
4
s->si[0].state     = s->si[0].prev_state = SI_ST_EST;
...
s->req->prod = &s->si[0];
s->req->cons = &s->si[1];

只有 frontend 连接建立成功,才具备处理 client 发送请求数据的基础。上一篇文章中 已经接收到了 client 发送的数据。这里就是需要根据 s->req->analysers 的值,确定 while 循环中哪些函数处理当前的数据。

补充介绍一下 s->req->analysers 的赋值。 同样是在 session_complete 中初始化的

1
2
/* activate default analysers enabled for this listener */
s->req->analysers = l->analysers;

可见,其直接使用 session 所在的 listener 的 analyser。 listener 中该数值的初始化 是在 check_config_validity() 中完成的:

1
		listener->analysers |= curproxy->fe_req_ana;

而归根结蒂还是来源于 listener 所在的 proxy 上的 fe_req_ana, proxy 上的 fe_req_ana 的初始化同样是在 check_config_validity(),且是在给 listener->analysers 赋值之前

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
	if (curproxy->cap & PR_CAP_FE) {
		if (!curproxy->accept)
			curproxy->accept = frontend_accept;

		if (curproxy->tcp_req.inspect_delay ||
			!LIST_ISEMPTY(&curproxy->tcp_req.inspect_rules))
			curproxy->fe_req_ana |= AN_REQ_INSPECT_FE;

		if (curproxy->mode == PR_MODE_HTTP) {
			curproxy->fe_req_ana |= AN_REQ_WAIT_HTTP | AN_REQ_HTTP_PROCESS_FE;
			curproxy->fe_rsp_ana |= AN_RES_WAIT_HTTP | AN_RES_HTTP_PROCESS_FE;
		}

		/* both TCP and HTTP must check switching rules */
		curproxy->fe_req_ana |= AN_REQ_SWITCHING_RULES;
	}

从上面代码可以看出,一个 HTTP 模式的 proxy,至少有三个标记位会被置位: AN_REQ_WAIT_HTTP, AN_REQ_HTTP_PROCESS_FE, AN_REQ_SWITCHING_RULES。也就是说, s->req->analysers 由以上三个标记置位。那么随后处理 HTTP REQ 的循环中,就要经过 这三个标记位对应的 analyser 的处理。

接着回到 resync_request 标签下的那个 while 循环,就是逐个判断 analysers 的设置, 并调用对应的函数处理。需要启用那些 analysers,是和 haproxy 的配置相对应的。本文 使用最简单的配置,下面仅列出配置所用到的几个处理函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
		while (ana_list && max_loops--) {
			/* Warning! ensure that analysers are always placed in ascending order! */

			if (ana_list & AN_REQ_INSPECT_FE) {
				if (!tcp_inspect_request(s, s->req, AN_REQ_INSPECT_FE))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_INSPECT_FE);
			}
		
			if (ana_list & AN_REQ_WAIT_HTTP) {
				if (!http_wait_for_request(s, s->req, AN_REQ_WAIT_HTTP))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_WAIT_HTTP);
			}

			if (ana_list & AN_REQ_HTTP_PROCESS_FE) {
				if (!http_process_req_common(s, s->req, AN_REQ_HTTP_PROCESS_FE, s->fe))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_HTTP_PROCESS_FE);
			}

			if (ana_list & AN_REQ_SWITCHING_RULES) {
				if (!process_switching_rules(s, s->req, AN_REQ_SWITCHING_RULES))
					break;
				UPDATE_ANALYSERS(s->req->analysers, ana_list, ana_back, AN_REQ_SWITCHING_RULES);
			}
			...
		}

analysers 的处理也是有顺序的。其中处理请求的第一个函数是 tcp_inspect_request()。 该函数主要是在于如果配置了这里先介绍 http_wait_for_request() 函数的实现。 顾名思义,该函数主要是配置中启用 inspect_rules 时,会调用到该函数。否则的话, 处理 HTTP Req 的第一个函数就是 http_wait_for_request().

顾名思义,http_wait_for_request() 该函数分析所解析的 HTTP Requset 不一定是一个 完整的请求。上篇文章分析读取 client 请求数据的实现中,已经提到,只要不能从 socket 读到更多的数据,就会结束数据的接收。一个请求完全完全有可能因为一些异常原因,或者 请求长度本身就比较大而被拆分到不同的 IP 报文中,一次 read 系统调用可能只读取到其 中的一部分内容。因此,该函数会同时分析已经接收到的数据,并确认是否已经接收到了 完整的 HTTP 请求。只有接收到了完整的 HTTP 请求,该函数处理完,才会交给下一个 analyser 处理,否则只能结束请求的处理,等待接收跟多的数据,解析出一个完成的 HTTP 请求才行。

4. 解析接收到的 http 请求数据: http_wait_for_request()

以下是 http_wait_for_request() 的简要分析:

1.调用 http_msg_analyzer,解析 s->req->buf 中新读取到的数据。该函数会按照 HTTP 协议, 解析 HTTP request 和 response 的头部数据,并记录到数据结构 struct http_msg 中。

2.如果开启了 debug,并且已经完整的解析了 header,则 header 内容打印出来

3.尚未读取到完整的 request 的处理,分作以下几种情形处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (unlikely(msg->msg_state < HTTP_MSG_BODY)) {
	/*
	 * First, let's catch bad requests.
	 */

解析到 header 内容中有不符合 HTTP 协议的情形 HTTP_MSG_ERROR,应答 400 bad request 处理
req->buf 满了,甚至加入 maxrewrite 的空间仍然不够用,应答 400 bad request
读取错误 CF_READ_ERROR 发生,比如 client 发送 RST 断开连接, 应答 400 bad request
读取超时,client 超时未发送完整的请求,应答 408 Request Timeout
client 主动关闭,发送 FIN 包,实际上是所谓的 half-close,同样应答 400 bad request
如果以上情况都不满足,则意味着还可以继续尝试读取新数据,设置一下超时

	/* just set the request timeout once at the beginning of the request */
	if (!tick_isset(req->analyse_exp)) {
		if ((msg->msg_state == HTTP_MSG_RQBEFORE) &&
			(txn->flags & TX_WAIT_NEXT_RQ) &&
			tick_isset(s->be->timeout.httpka))
			req->analyse_exp = tick_add(now_ms, s->be->timeout.httpka);
		else
			req->analyse_exp = tick_add_ifset(now_ms, s->be->timeout.httpreq);
	}

根据以上代码,在等待 http request 期间,有两种 timeout 可以设置: 当是http 连接 Keep-Alive 时,并且处理完了头一个请求之后,等待第二个请求期间,设置 httpka 的超 时,超过设定时间不发送新的请求,将会超时;否则,将设置 http 的 request timeout。

因此,在不启用 http ka timeout 时,http request 同时承担起 http ka timeout 的 功能。在有 http ka timeout 时,这两者各自作用的时间段没有重叠。

满足该环节的请求都终止处理,不再继续了。

4.2. 处理完整的 http request

这里处理的都是已经解析到完整 http request header 的情况,并且所有 header 都被 索引化了,便于快速查找。根据已经得到的 header 的信息,设置 session 和 txn 的 相关成员,相当于汇总一下 header 的摘要信息,便于随后处理之用。流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
更新 session 和 proxy 的统计计数
删除 http ka timeout 的超时处理。可能在上一个请求处理完之后,设置了 http ka 的 timeout,因为这里已经得到完整的请求,因此需要停止该 timeout 的处理逻辑
确认 METHOD,并设置 session 的标记位 s->flags |= SN_REDIRECTABLE,只有 GET 和 HEAD 请求可以被重定向
检测 URI 是否是配置的要做 monitor 的 URI,是的话,则执行对应 ACL,并设置应答
检测如果开启 log 功能的话,要给 txn->uri 分配内存,用于记录 URI
检测 HTTP version
	将 0.9 版本的升级为 1.0
	1.1 及其以上的版本都当做 1.1 处理
初始化用于标识 Connection header 的标记位
如果启用了 capture header 配置,调用 capture_headers() 记录下对应的 header
处理 Transfer-Encoding/Content-Length 等 header
最后一步,清理 req->analysers 的标记位 AN_REQ_WAIT_HTTP,因为本函数已经成功处理完毕,可以进行下一个 analyser 的处理了。

至此,http_wait_for_request() 的处理已经结束。

5. 其他对 HTTP 请求的处理逻辑

按照我们前面分析的,随后应该还有两个 analyser 要处理,简单介绍一下:

1
2
3
4
5
6
AN_REQ_HTTP_PROCESS_FE 对应的 http_process_req_common()
	对 frontend 中 req 配置的常见处理,比如 block ACLs, filter, reqadd 等
	设置 Connection mode, 主要是 haproxy 到 server 采用什么连接方式,tunnel 或者 按照 transcation 处理的短连接
AN_REQ_SWITCHING_RULES 对应的 process_switching_rules()
	如果配置了选择 backend 的 rules,比如用 use_backend,则查询规则为 session 分配一个 backend
	处理 persist_rules,一旦设置了 force-persist, 则不管 server 是否 down,都要保证 session 分配给 persistence 中记录的 server。

以上两个函数,不再具体分析。待以后需要时再完善。

至此,client 端 http 请求已经完成解析和相关设置,并且给 session 指定了将来选择 server 所属的 backend。

下一篇文章就分析选择 server 的流程。

HAProxy 研究笔记 -- 主循环处理流程

http://blog.chinaunix.net/uid-10167808-id-3807412.html

本文简单介绍 HAProxy 主循环的处理逻辑,版本为 1.5-dev17.

0. 主循环 run_poll_loop

HAproxy 的主循环在 haproxy.c 中的 run_poll_loop() 函数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Runs the polling loop */
void run_poll_loop()
{
	int next;

	tv_update_date(0,1);
	while (1) {
		/* check if we caught some signals and process them */
		signal_process_queue();

		/* Check if we can expire some tasks */
		wake_expired_tasks(&next);

		/* Process a few tasks */
		process_runnable_tasks(&next);

		/* stop when there's nothing left to do */
		if (jobs == 0)
			break;

		/* The poller will ensure it returns around  */
		cur_poller.poll(&cur_poller, next);
		fd_process_spec_events();
	}
}

主循环的结构比较清晰,就是循环的调用几个函数,并在适当的时候结束循环并退出:

1
2
3
4
5
6
1. 处理信号队列
2. 超时任务
3. 处理可运行的任务
4. 检测是否可以结束循环
5. 执行 poll 处理 fd 的 IO 事件
6. 处理可能仍有 IO 事件的 fd

1. signal_process_queue - 处理信号队对列

haproxy 实现了自己的信号处理机制。接受到信号之后,将该信号放到信号队列中。在程序 运行到 signal_process_queue() 时处理所有位于信号队列中的信号。

2. wake_expired_tasks - 唤醒超时任务

haproxy 的顶层处理逻辑是 task,task 上存储着要处理的任务的全部信息。task 的管理 是采用队列方式,同时分为 wait queue 和 run queue。顾名思义,wait queue 是需要等 待一定时间的 task 的集合,而 run queue 则代表需要立即执行的 task 的集合。

该函数就是检查 wait queue 中那些超时的任务,并将其放到 run queue 中。haproxy 在 执行的过程中,会因为一些情况导致需要将当前的任务通过调用 task_queue 等接口放到 wait queue 中。

3. process_runnable_tasks - 处理可运行的任务

处理位于 run queue 中的任务。

前面提到,wake_expired_tasks 可能将一些超时的任务放到 run queue 中。此外,haproxy 执行的过程中,还有可能通过调用 task_wakeup 直接讲某个 task 放到 run queue 中,这代表程序希望该任务下次尽可能快的被执行。

对于 TCP 或者 HTTP 业务流量的处理,该函数最终通过调用 process_session 来完成,包括解析已经接收到的数据, 并执行一系列 load balance 的特性,但不负责从 socket 收发数据。

4. jobs == 0 - 无任务可执行,结束循环

haproxy 中用 jobs 记录当前要处理的任务总数,一个 listener 也会被计算在内。因此, 如果 jobs 为 0 的话,通常意味着 haproxy 要退出了,因为连 listener 都要释放了。 jobs 的数值通常在 process_session 时更新。因此,是否可以退出循环,就放在了所有 任务的 process_session 执行之后。

5. cur_poller.poll() - 执行 poll 处理 fd 的 IO 事件

haproxy 启动阶段,会检测当前系统可以启用那种异步处理的机制,比如 select、poll、 epoll、kqueue 等,并注册对应 poller 的 poll 方法。epoll 的相关函数接口在 ev_epoll.c 中。

这里就是执行已经注册的 poller 的 poll 方法,主要功能就是获取所有活动的 fd,并 调用对应的 handler,完成接受新建连接、数据收发等功能。

6. 处理可能仍有 IO 事件的 fd

poller 的 poll 方法执行时,程序会将某些符合条件以便再次执行 IO 处理的的 fd 放到 fd_spec list[] 中,fd_process_spec_events() 函数会再次执行这些 fd 的 io handler。

HAProxy 研究笔记 -- HTTP请求处理-1-接收

http://blog.chinaunix.net/uid-10167808-id-3795082.html

这里继续分析 http req 的处理。当前分析的代码为 1.5-dev17。

1. 初始化 session 数据处理相关的设置

建连的处理基本上就是 _do_poll ->listener_accept ->session_accept ->fronend_accept()

其中 session_accept() 会设置新建 fd 的 io handler

1
2
3
4
5
6
7
8
9
10
11
/* Add the various callbacks. Right now the transport layer is present
 * but not initialized. Also note we need to be careful as the stream
 * int is not initialized yet.
 */
conn_prepare(s->si[0].conn, &sess_conn_cb, l->proto, l->xprt, s);

fdtab[cfd].owner = s->si[0].conn; /*fd 对应的 owner 为 connection 结构*/
fdtab[cfd].iocb = conn_fd_handler;
conn_data_want_recv(s->si[0].conn);
if (conn_xprt_init(s->si[0].conn) < 0)
	goto out_free_task;

IPv4 http 对应的 listener 的 xprt 和proto 分别被初始化为

1
2
l->xprt = &raw_sock;
l->proto = &proto_tcpv4;

conn_prepare() 就是将相关数据收发以及连接处理的函数都赋值到 connection 结构体上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Assigns a connection with the appropriate data, ctrl, transport layers, and owner. */
static inline void conn_assign(struct connection *conn, const struct data_cb *data,
	                           const struct protocol *ctrl, const struct xprt_ops *xprt,
	                           void *owner)
{
	conn->data = data;
	conn->ctrl = ctrl;
	conn->xprt = xprt;
	conn->owner = owner;
}

/* prepares a connection with the appropriate data, ctrl, transport layers, and
 * owner. The transport state and context are set to 0.
 */
static inline void conn_prepare(struct connection *conn, const struct data_cb *data,
	                            const struct protocol *ctrl, const struct xprt_ops *xprt,
	                            void *owner)
{
	conn_assign(conn, data, ctrl, xprt, owner);
	conn->xprt_st = 0;
	conn->xprt_ctx = NULL;
}

经过初始化, session client 端的 connection 结构体初始化完成:

1
2
3
4
conn->data 指向 sess_conn_cb。 后面调用 session_complete() 会被再次赋值
conn->ctrl 指向 l->proto, IPv4 下为 proto_tcpv4
conn->xprt 执向 l->xprt, 不启用 SSL 时为 raw_sock,启用 SSL 时为 ssl_sock
conn->owner 指向 session

接着调用 session_complete 完成建立一个 session 所需要的最后的初始化工作,其中 包含调用 frontend_accept,并将当前 session 对应的 task 放入runqueue 中以待下 次执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
      si_takeover_conn(&s->si[0], l->proto, l->xprt);
      ...
      t->process = l->handler;
      ...
if (p->accept && (ret = p->accept(s)) <= 0) {
	/* Either we had an unrecoverable error (<0) or work is
	 * finished (=0, eg: monitoring), in both situations,
	 * we can release everything and close.
	 */
	goto out_free_rep_buf;
}
...
task_wakeup(t, TASK_WOKEN_INIT);

其中 si_takeover_conn 完成为 si 分配连接的处理函数,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void si_takeover_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt)
{
	si->ops = &si_conn_ops;
	conn_assign(si->conn, &si_conn_cb, ctrl, xprt, si);
}

si_conn_cb 的定义如下:

struct data_cb si_conn_cb = {
	.recv    = si_conn_recv_cb,
	.send    = si_conn_send_cb,
	.wake    = si_conn_wake_cb,
};

因此,si->conn->data 指向了 si_conn_cb。这个结构用在随后的 recv/send 中。

此外,session 所对应的任务 task 在 session_complete 的最后通过调用 task_wakeup() 是在随后的循环中被执行。task 的处理函数初始化为 l->handler 即 process_session().

至此,一个新建 session 的 client fd 的 io 处理函数 conn_fd_handler() 及 session 的处理函数 process_session() 都已经正确初始化好了。

以后基本上就是这两个函数分别负责数据的读取,以及业务的处理。

2. 接收 client 发送的请求数据

epoll 中考虑的新建连接通常会尽可能快的传输数据,因此对于新建的 fd,通常会尽快的 执行 io handler,即调用 conn_fd_handler

是在 ev_epoll.c 中的 _do_poll() 中进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
gettimeofday(&before_poll, NULL);
status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
tv_update_date(wait_time, status);
measure_idle();

/* process polled events */

for (count = 0; count < status; count++) {
	unsigned int n;
	unsigned int e = epoll_events[count].events;
	fd = epoll_events[count].data.fd;
	...
	/* Save number of updates to detect creation of new FDs. */
	old_updt = fd_nbupdt;
	fdtab[fd].iocb(fd);
	...
	for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
		fd = fd_updt[new_updt - 1];
		...
		if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
			fdtab[fd].iocb(fd);
		...
	}

上面代码中第一处执行 iocb() 的是由 epoll_wait() 返回的 fd 触发的。而第二次的 iocb() 则就是在前面 iocb 的执行过程中新建的 fd,为了提高效率,则直接调用该 fd 的 iocb(),也 就是 conn_fd_handler() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int conn_fd_handler(int fd) 
{
	struct connection *conn = fdtab[fd].owner;
	...
	if ((fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) &&
		conn->xprt &&
		!(conn->flags & (CO_FL_WAIT_RD|CO_FL_WAIT_ROOM|CO_FL_ERROR|CO_FL_HANDSHAKE))) {
		/* force detection of a flag change : it's impossible to have both
		 * CONNECTED and WAIT_CONN so we're certain to trigger a change.
		 */
		flags = CO_FL_WAIT_L4_CONN | CO_FL_CONNECTED;
		conn->data->recv(conn);
	}
	...
}

根据的 session_complete 的初始化,上面代码 conn->data->recv 指向 si_conn_recv_cb()。 该函数就是 haproxy 中负责接收数据的入口函数。相同的,si_conn_send_cb() 就是 haproxy 中负责发送数据的入口函数。

si_conn_recv_cb() 函数简单介绍如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (conn->xprt->rcv_pipe &&
	chn->to_forward >= MIN_SPLICE_FORWARD && chn->flags & CF_KERN_SPLICING) {
	...
	ret = conn->xprt->rcv_pipe(conn, chn->pipe, chn->to_forward);
	...
}
...
while (!chn->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_RD | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {

	...
	ret = conn->xprt->rcv_buf(conn, chn->buf, max);
	...
}

该函数主要根据数据的接收情况,选择调用 xprt 的 rcv_pipe 还是 rcv_buf. 前面已经 分析过, conn->xprt 指向了 listner 的 xprt,不启用 SSL 就是 raw_sock 数据结构

因此,数据的接收最终是通过调用 raw_sock 的 raw_sock_to_pipe 或/和 raw_sock_to_buf 完成的。

HAProxy 研究笔记 -- rules 实现

http://blog.chinaunix.net/uid-10167808-id-3775567.html

本文研究 haproxy-1.5-dev17 中 rules 的相关实现。

1
2
3
4
1. ACL
2. rule 的组成
3. rule 的执行
4. rule 的种类

1. ACL

如果要实现功能丰富的 rules,需要有配套的 ACL 机制。

ACL 的格式如下:

1
acl [flags] [operator] ... 

haproxy 中 ACL 数据结构的定义:

1
2
3
4
5
6
7
8
/* The acl will be linked to from the proxy where it is declared */
struct acl {
	struct list list;           /* chaining */
	char *name;           /* acl name */
	struct list expr;     /* list of acl_exprs */
	int cache_idx;              /* ACL index in cache */
	unsigned int requires;      /* or'ed bit mask of all acl_expr's ACL_USE_* */
};

其中:

1
2
3
4
5
6
7
8
9
10
11
12
13
name: ACL 的名字
expr: ACL 定义的表达式。就是定义的 ACL 名字后面的表达式。这是一个链表结构。因此,可以定义多条表达式不同但是名字相同的 ACL。这样,多个表达式都属于同一个 ACL。
requires: 所有表达式中关键字对应的作用域(该关键字可以用在什么场合)的集合

函数 parse_acl() 负责解析定义好的 ACL:

查找 ACL 的名字,如果不存在的话,则 alloc 一个新的 acl 结构
通过调用 parse_acl_expr() 对表达式进行解析,并返回 struct acl_expr 结构
	ACL 中的表达式应该只有一个 kw
	查找该关键字,必须是已经注册好的。并返回该关键字注册时的数据结构 struct acl_expr
	alloc 一个 struct acl_expr 结构体,记录下返回的 kw 的数据结构,并作成员的初始化
	调用对应 kw 的 parse 方法,将解析的结果保存在 struct acl_pattern 结构体中,并将该结构体加入到 expr->patterns 的链表中
将解析到的表达式插入到 acl 中的 expr 链表中

总结: 一个 ACL 包含一到多个表达式。每个表达式包含一个 kw及一到多个 pattern。

2. rule 的组成

这里简要描述 rule 与 acl 之间的逻辑关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
rule 应该是 action + condition 组成
	有些动作自身可能也需要记录一些信息。不同的 rule 对应动作的信息可能不同,比如 reqirep 等
	block rules 的动作比较单一, condition 满足之后处理结果均相同
condition,完成 rule 检测的判断条件 对应数据结构: struct acl_cond

		struct acl_cond {
			struct list list;           /* Some specific tests may use multiple conditions */
			struct list suites;         /* list of acl_term_suites */
			int pol;                    /* polarity: ACL_COND_IF / ACL_COND_UNLESS */
			unsigned int requires;      /* or'ed bit mask of all acl's ACL_USE_* */
			const char *file;           /* config file where the condition is declared */
			int line;                   /* line in the config file where the condition is declared */
		};
		

condition 包含多个 ACL 组。组的分割逻辑是逻辑或(|| 或者 or),即 struct list suites 的成员,组的数据结构 struct acl_term_suite

	struct acl_term_suite {
		struct list list;           /* chaining of term suites */
		struct list terms;          /* list of acl_terms */
	};

	该数据结构可以包含多个 ACL,以及每个 ACL 可能的一个取反标识 '!'
	所有表达式中相邻的 ACL 且其逻辑关系为逻辑与(&&) 的构成一个 ACL 组
		比如 if acl1 !acl2 or acl3 acl4,则构成两个 acl_term_suite,分别是 acl1 !acl2 和 acl3 acl4
		每个 ACL 及其可能的取反标记对应的数据结构: struct acl_term

			struct acl_term {
				struct list list;           /* chaining */
				struct acl *acl;            /* acl pointed to by this term */
				int neg;                    /* 1 if the ACL result must be negated */
			};

	一个 ACL 包含多个 expr

3. rule 的执行

概括起来很简单,执行判断条件。符合条件,然后执行对应动作。

下面是 rspadd 的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* add response headers from the rule sets in the same order */
list_for_each_entry(wl, &rule_set->rsp_add, list) {
	if (txn->status < 200)
		break;
	if (wl->cond) {
		int ret = acl_exec_cond(wl->cond, px, t, txn, SMP_OPT_DIR_RES|SMP_OPT_FINAL);
		ret = acl_pass(ret);
		if (((struct acl_cond *)wl->cond)->pol == ACL_COND_UNLESS)
			ret = !ret;
		if (!ret)
			continue;
	}
	if (unlikely(http_header_add_tail(&txn->rsp, &txn->hdr_idx, wl->s) < 0))
		goto return_bad_resp;
}

对于同一个种类的 rules,执行逻辑如下:

1
2
3
4
5
6
主要遍历 rule,调用 acl_exec_cond 执行该 rule 的检测条件。该检测结果只给出匹配与否。
	逐个遍历 cond 上的 ACL 组,即cond->suites。任一 suite 匹配成功,则认为匹配成功
	同一个 ACL 组上,遍历所有 suite->terms (ACL + 取反逻辑)。任意一个 ACL 匹配失败,则跳到下一个 ACL 组继续匹配。同一组全部 ACL 匹配成功,则认为该 ACL 组匹配
		同一个 ACL 上的匹配,则是逐一遍历 ACL 的 expr。只要任意一个 expr 匹配成功,则认为该 ACL 匹配成功
结合 condition 中的条件 if/unless,确定最终匹配结果
如果匹配则执行对应的 action,否则检测下一条规则。

4. rule 的种类

从 proxy 结构体可以看出 rule 的种类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct proxy {
	...
	struct list acl;                        /* ACL declared on this proxy */
	struct list http_req_rules;       /* HTTP request rules: allow/deny/http-auth */
	struct list block_cond;                 /* early blocking conditions (chained) */
	struct list redirect_rules;             /* content redirecting rules (chained) */
	struct list switching_rules;            /* content switching rules (chained) */
	struct list persist_rules;        /* 'force-persist' and 'ignore-persist' rules (chained) */
	struct list sticking_rules;             /* content sticking rules (chained) */
	struct list storersp_rules;             /* content store response rules (chained) */
	struct list server_rules;               /* server switching rules (chained) */
	struct {                                /* TCP request processing */
		unsigned int inspect_delay;     /* inspection delay */
		struct list inspect_rules;      /* inspection rules */
		struct list l4_rules;           /* layer4 rules */
	} tcp_req;
	struct {                                /* TCP request processing */
		unsigned int inspect_delay;     /* inspection delay */
		struct list inspect_rules;      /* inspection rules */
	} tcp_rep;
	...
}

其中, 函数 http_process_req_common 中处理的规则如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
http_process_req_common
{
	... 
	1. process block rules
	...
	2. process http req rules
	...
	3. execute regular exp if any
	...
	4. req add
	...
	5. process redirect rules
	...
}

这里没有详细的介绍各种具体用途的 rules。随后具体分析代码的时候总结一下再加上。

HAProxy 研究笔记 -- TCP 连接处理流程

http://blog.chinaunix.net/uid-10167808-id-3771148.html

本文基于 HAProxy 1.5-dev7 版本。

1
2
3
4
5
6
7
8
9
10
11
12
目录
1. 关键数据结构 session
2. 相关初始化
	2.1. 初始化处理 TCP 连接的方法
	2.2. 初始化 listener
	2.3. 绑定所有已注册协议上的 listeners
	2.4. 启用所有已注册协议上的 listeners
3. TCP 连接的处理流程
	3.1. 接受新建连接
	3.2. TCP 连接上的接收事件
	3.3. TCP 连接上的发送事件
	3.4. http 请求的处理

1. 关键数据结构 session

haproxy 负责处理请求的核心数据结构是 struct session,本文不对该数据结构进行分析。

从业务的处理的角度,简单介绍一下对 session 的理解:

1
2
3
haproxy 每接收到 client 的一个连接,便会创建一个 session 结构,该结构一直伴随着连接的处理,直至连接被关闭,session 才会被释放
haproxy 其他的数据结构,大多会通过引用的方式和 session 进行关联
一个业务 session 上会存在两个 TCP 连接,一个是 client 到 haproxy,一个是 haproxy 到后端 server。

此外,一个 session,通常还要对应一个 task,haproxy 最终用来做调度的是通过 task。

2. 相关初始化

在 haproxy 正式处理请求之前,会有一系列初始化动作。这里介绍和请求处理相关的一些初始化。

2.1. 初始化处理 TCP 连接的方法

初始化处理 TCP 协议的相关数据结构,主要是和 socket 相关的方法的声明。详细见下面 proto_tcpv4 (proto_tcp.c)的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static struct protocol proto_tcpv4 = {
	.name = "tcpv4",
	.sock_domain = AF_INET,
	.sock_type = SOCK_STREAM,
	.sock_prot = IPPROTO_TCP,
	.sock_family = AF_INET,
	.sock_addrlen = sizeof(struct sockaddr_in),
	.l3_addrlen = 32/8,
	.accept = &stream_sock_accept,
	.read = &stream_sock_read,
	.write = &stream_sock_write,
	.bind = tcp_bind_listener,
	.bind_all = tcp_bind_listeners,
	.unbind_all = unbind_all_listeners,
	.enable_all = enable_all_listeners,
	.listeners = LIST_HEAD_INIT(proto_tcpv4.listeners),
	.nb_listeners = 0,
};
2.2. 初始化 listener

listener,顾名思义,就是用于负责处理监听相关的逻辑。

在 haproxy 解析 bind 配置的时候赋值给 listener 的 proto 成员。函数调用流程如下:

1
2
3
4
5
cfgparse.c
	-> cfg_parse_listen
		-> str2listener
			-> tcpv4_add_listener
				-> listener->proto = &proto_tcpv4;

由于这里初始化的是 listener 处理 socket 的一些方法。可以推断, haproxy 接收 client 新建连接的入口函数应该是 protocol 结构体中的 accpet 方法。对于tcpv4 来说,就是 stream_sock_accept() 函数。该函数到 1.5-dev19 中改名为 listener_accept()。这是后话,暂且不表。

listener 的其他初始化

1
2
3
4
5
cfgparse.c
	-> check_config_validity
		-> listener->accept = session_accept;
listener->frontend = curproxy; (解析 frontend 时,会执行赋值: curproxy->accept = frontend_accept)
listener->handler = process_session;

整个 haproxy 配置文件解析完毕,listener 也已初始化完毕。可以简单梳理一下几个 accept 方法的设计逻辑:

1
2
3
stream_sock_accept(): 负责接收新建 TCP 连接,并触发 listener 自己的 accept 方法 session_accept()
session_accept(): 负责创建 session,并作 session 成员的初步初始化,并调用 frontend 的 accept 方法 front_accetp()
frontend_accept(): 该函数主要负责 session 前端的 TCP 连接的初始化,包括 socket 设置,log 设置,以及 session 部分成员的初始化

下文分析 TCP 新建连接处理过程,基本上就是这三个函数的分析。

2.3. 绑定所有已注册协议上的 listeners
1
2
3
4
5
6
haproxy.c 
	-> protocol_bind_all 
		-> all registered protocol bind_all
			-> tcp_bind_listeners (TCP)
				-> tcp_bind_listener 
					-> [ fdtab[fd].cb[DIR_RD].f = listener->proto->accept ]

该函数指针指向 proto_tcpv4 结构体的 accept 成员,即函数 stream_sock_accept

2.4. 启用所有已注册协议上的 listeners

把所有 listeners 的 fd 加到 polling lists 中 haproxy.c -> protocol_enable_all -> all registered protocol enable_all -> enable_all_listeners (TCP) -> enable_listener 函数会将处于 LI_LISTEN 的 listener 的状态修改为 LI_READY,并调用 cur poller 的 set 方法, 比如使用 sepoll,就会调用 __fd_set

3. TCP 连接的处理流程

3.1. 接受新建连接

前面几个方面的分析,主要是为了搞清楚当请求到来时,处理过程中实际的函数调用关系。以下分析 TCP 建连过程。

1
2
3
4
5
6
7
8
haproxy.c 
	-> run_poll_loop 
		-> cur_poller.poll 
			-> __do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法) 
				-> fdtab[fd].cb[DIR_RD].f(fd) (TCP 协议的该函数指针指向 stream_sock_accept )
					-> stream_sock_accept
						-> 按照 global.tune.maxaccept 的设置尽量可能多执行系统调用 accept,然后再调用 l->accept(),即 listener 的 accept 方法 session_accept
							-> session_accept

session_accept 主要完成以下功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
调用 pool_alloc2 分配一个 session 结构
调用 task_new 分配一个新任务
将新分配的 session 加入全局 sessions 链表中
session 和 task 的初始化,若干重要成员的初始化如下
	t->process = l->handler: 即 t->process 指向 process_session
	t->context = s: 任务的上下文指向 session
	s->listener = l: session 的 listener 成员指向当前的 listener
	s->si[] 的初始化,记录 accept 系统调用返回的 cfd 等
	初始化 s->txn
	为 s->req 和 s->rep 分别分配内存,并作对应的初始化
		s->req = pool_alloc2(pool2_buffer)
		s->rep = pool_alloc2(pool2_buffer)
		从代码上来看,应该是各自独立分配 tune.bufsize + sizeof struct buffer 大小的内存
	新建连接 cfd 的一些初始化
		cfd 设置为非阻塞
		将 cfd 加入 fdtab[] 中,并注册新建连接 cfg 的 read 和 write 的方法
		fdtab[cfd].cb[DIR_RD].f = l->proto->read,设置 cfd 的 read 的函数 l->proto->read,对应 TCP 为 stream_sock_read,读缓存指向 s->req,
		fdtab[cfd].cb[DIR_WR].f = l->proto->write,设置 cfd 的 write 函数 l->proto->write,对应 TCP 为 stream_sock_write,写缓冲指向 s->rep
p->accept 执行 proxy 的 accept 方法即 frontend_accept
	设置 session 结构体的 log 成员
	根据配置的情况,分别设置新建连接套接字的选项,包括 TCP_NODELAY/KEEPALIVE/LINGER/SNDBUF/RCVBUF 等等
	如果 mode 是 http 的话,将 session 的 txn 成员做相关的设置和初始化
3.2. TCP 连接上的接收事件
1
2
3
4
5
6
haproxy.c 
	-> run_poll_loop 
		-> cur_poller.poll 
			-> __do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法) 
				-> fdtab[fd].cb[DIR_RD].f(fd) (该函数在建连阶段被初始化为四层协议的 read 方法,对于 TCP 协议,为 stream_sock_read )
					-> stream_sock_read

stream_sock_read 主要完成以下功能

找到当前连接的读缓冲,即当前 session 的 req buffer:

1
struct buffer *b = si->ib
1
2
3
4
5
6
7
8
9
根据配置,调用 splice 或者 recv 读取套接字上的数据,并填充到读缓冲中,即填充到从 b->r(初始位置应该就是 b->data)开始的内存中
如果读取到 0 字节,则意味着接收到对端的关闭请求,调用 stream_sock_shutr 进行处理
	读缓冲标记 si->ib->flags 的 BF_SHUTR 置位,清除当前 fd 的 epoll 读事件,不再从该 fd 读取
	如果写缓冲 si->ob->flags 的 BF_SHUTW 已经置位,说明应该是由本地首先发起的关闭连接动作
		将 fd 从 fdset[] 中清除,从 epoll 中移除 fd,执行系统调用 close(fd), fd.state 置位 FD_STCLOSE
		stream interface 的状态修改 si->state = SI_ST_DIS
唤醒任务 task_wakeup,把当前任务加入到 run queue 中。随后检测 runnable tasks 时,就会处理该任务

##### 3.3. TCP 连接上的发送事件
haproxy.c 
    -> run_poll_loop 
        -> cur_poller.poll 
            -> __do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法) 
                -> fdtab[fd].cb[DIR_WR].f(fd) (该函数在建连阶段被初始化为四层协议的 write 方法,对于 TCP 协议,为 stream_sock_write )
                    -> stream_sock_write
1
2
3
4

stream_sock_write主要完成以下功能

  找到当前连接的写缓冲,即当前 session 的 rep buffer:
struct buffer *b = si->ob
1
2
3
4
5

```
  将待发送的数据调用 send 系统调用发送出去  
  或者数据已经发送完毕,需要发送关闭连接的动作 stream_sock_shutw-> 系统调用 shutdown  
  唤醒任务 task_wakeup,把当前任务加入到 run queue 中。随后检测 runnable tasks 时,就会处理该任务  
3.4. http 请求的处理
1
2
3
4
haproxy.c 
	-> run_poll_loop 
		-> process_runnable_tasks,查找当前待处理的任务所有 tasks, 然后调用 task->process(大多时候就是 process_session) 进行处理
			-> process_session

process_session 主要完成以下功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
处理连接需要关闭的情形,分支 resync_stream_interface
处理请求,分支 resync_request (read event)
	根据 s->req->analysers 的标记位,调用不同的 analyser 进行处理请求
	ana_list & AN_REQ_WAIT_HTTP: http_wait_for_request
	ana_list & AN_REQ_HTTP_PROCESS_FE: http_process_req_common
	ana_list & AN_REQ_SWITCHING_RULES:process_switching_rules
处理应答,分支 resync_response (write event)
	根据 s->rep->analysers 的标记位,调用不同的 analyser 进行处理请求
	ana_list & AN_RES_WAIT_HTTP: http_wait_for_response
	ana_list & AN_RES_HTTP_PROCESS_BE:http_process_res_common
处理 forward buffer 的相关动作
关闭 req 和 rep 的 buffer,调用 pool2_free 释放 session 及其申请的相关内存,包括读写缓冲 (read 0 bytes)
	pool_free2(pool2_buffer, s->req);
	pool_free2(pool2_buffer, s->rep);
	pool_free2(pool2_session, s);
task 从运行任务队列中清除,调用 pool2_free 释放 task 申请的内存: task_delete(); task_free();

本文简单分析了 TCP 连接的处理过程,不侧重细节分析,而且缺少后端 server 的选择以及建连等,重在希望展示出一个 haproxy 处理 TCP 连接的框架。