kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp

HAProxy 研究笔记 -- 主循环处理流程

http://blog.chinaunix.net/uid-10167808-id-3807412.html

本文简单介绍 HAProxy 主循环的处理逻辑,版本为 1.5-dev17.

0. 主循环 run_poll_loop

HAproxy 的主循环在 haproxy.c 中的 run_poll_loop() 函数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Runs the polling loop */
void run_poll_loop()
{
	int next;

	tv_update_date(0,1);
	while (1) {
		/* check if we caught some signals and process them */
		signal_process_queue();

		/* Check if we can expire some tasks */
		wake_expired_tasks(&next);

		/* Process a few tasks */
		process_runnable_tasks(&next);

		/* stop when there's nothing left to do */
		if (jobs == 0)
			break;

		/* The poller will ensure it returns around  */
		cur_poller.poll(&cur_poller, next);
		fd_process_spec_events();
	}
}

主循环的结构比较清晰,就是循环的调用几个函数,并在适当的时候结束循环并退出:

1
2
3
4
5
6
1. 处理信号队列
2. 超时任务
3. 处理可运行的任务
4. 检测是否可以结束循环
5. 执行 poll 处理 fd 的 IO 事件
6. 处理可能仍有 IO 事件的 fd

1. signal_process_queue - 处理信号队对列

haproxy 实现了自己的信号处理机制。接受到信号之后,将该信号放到信号队列中。在程序 运行到 signal_process_queue() 时处理所有位于信号队列中的信号。

2. wake_expired_tasks - 唤醒超时任务

haproxy 的顶层处理逻辑是 task,task 上存储着要处理的任务的全部信息。task 的管理 是采用队列方式,同时分为 wait queue 和 run queue。顾名思义,wait queue 是需要等 待一定时间的 task 的集合,而 run queue 则代表需要立即执行的 task 的集合。

该函数就是检查 wait queue 中那些超时的任务,并将其放到 run queue 中。haproxy 在 执行的过程中,会因为一些情况导致需要将当前的任务通过调用 task_queue 等接口放到 wait queue 中。

3. process_runnable_tasks - 处理可运行的任务

处理位于 run queue 中的任务。

前面提到,wake_expired_tasks 可能将一些超时的任务放到 run queue 中。此外,haproxy 执行的过程中,还有可能通过调用 task_wakeup 直接讲某个 task 放到 run queue 中,这代表程序希望该任务下次尽可能快的被执行。

对于 TCP 或者 HTTP 业务流量的处理,该函数最终通过调用 process_session 来完成,包括解析已经接收到的数据, 并执行一系列 load balance 的特性,但不负责从 socket 收发数据。

4. jobs == 0 - 无任务可执行,结束循环

haproxy 中用 jobs 记录当前要处理的任务总数,一个 listener 也会被计算在内。因此, 如果 jobs 为 0 的话,通常意味着 haproxy 要退出了,因为连 listener 都要释放了。 jobs 的数值通常在 process_session 时更新。因此,是否可以退出循环,就放在了所有 任务的 process_session 执行之后。

5. cur_poller.poll() - 执行 poll 处理 fd 的 IO 事件

haproxy 启动阶段,会检测当前系统可以启用那种异步处理的机制,比如 select、poll、 epoll、kqueue 等,并注册对应 poller 的 poll 方法。epoll 的相关函数接口在 ev_epoll.c 中。

这里就是执行已经注册的 poller 的 poll 方法,主要功能就是获取所有活动的 fd,并 调用对应的 handler,完成接受新建连接、数据收发等功能。

6. 处理可能仍有 IO 事件的 fd

poller 的 poll 方法执行时,程序会将某些符合条件以便再次执行 IO 处理的的 fd 放到 fd_spec list[] 中,fd_process_spec_events() 函数会再次执行这些 fd 的 io handler。

HAProxy 研究笔记 -- HTTP请求处理-1-接收

http://blog.chinaunix.net/uid-10167808-id-3795082.html

这里继续分析 http req 的处理。当前分析的代码为 1.5-dev17。

1. 初始化 session 数据处理相关的设置

建连的处理基本上就是 _do_poll ->listener_accept ->session_accept ->fronend_accept()

其中 session_accept() 会设置新建 fd 的 io handler

1
2
3
4
5
6
7
8
9
10
11
/* Add the various callbacks. Right now the transport layer is present
 * but not initialized. Also note we need to be careful as the stream
 * int is not initialized yet.
 */
conn_prepare(s->si[0].conn, &sess_conn_cb, l->proto, l->xprt, s);

fdtab[cfd].owner = s->si[0].conn; /*fd 对应的 owner 为 connection 结构*/
fdtab[cfd].iocb = conn_fd_handler;
conn_data_want_recv(s->si[0].conn);
if (conn_xprt_init(s->si[0].conn) < 0)
	goto out_free_task;

IPv4 http 对应的 listener 的 xprt 和proto 分别被初始化为

1
2
l->xprt = &raw_sock;
l->proto = &proto_tcpv4;

conn_prepare() 就是将相关数据收发以及连接处理的函数都赋值到 connection 结构体上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Assigns a connection with the appropriate data, ctrl, transport layers, and owner. */
static inline void conn_assign(struct connection *conn, const struct data_cb *data,
	                           const struct protocol *ctrl, const struct xprt_ops *xprt,
	                           void *owner)
{
	conn->data = data;
	conn->ctrl = ctrl;
	conn->xprt = xprt;
	conn->owner = owner;
}

/* prepares a connection with the appropriate data, ctrl, transport layers, and
 * owner. The transport state and context are set to 0.
 */
static inline void conn_prepare(struct connection *conn, const struct data_cb *data,
	                            const struct protocol *ctrl, const struct xprt_ops *xprt,
	                            void *owner)
{
	conn_assign(conn, data, ctrl, xprt, owner);
	conn->xprt_st = 0;
	conn->xprt_ctx = NULL;
}

经过初始化, session client 端的 connection 结构体初始化完成:

1
2
3
4
conn->data 指向 sess_conn_cb。 后面调用 session_complete() 会被再次赋值
conn->ctrl 指向 l->proto, IPv4 下为 proto_tcpv4
conn->xprt 执向 l->xprt, 不启用 SSL 时为 raw_sock,启用 SSL 时为 ssl_sock
conn->owner 指向 session

接着调用 session_complete 完成建立一个 session 所需要的最后的初始化工作,其中 包含调用 frontend_accept,并将当前 session 对应的 task 放入runqueue 中以待下 次执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
      si_takeover_conn(&s->si[0], l->proto, l->xprt);
      ...
      t->process = l->handler;
      ...
if (p->accept && (ret = p->accept(s)) <= 0) {
	/* Either we had an unrecoverable error (<0) or work is
	 * finished (=0, eg: monitoring), in both situations,
	 * we can release everything and close.
	 */
	goto out_free_rep_buf;
}
...
task_wakeup(t, TASK_WOKEN_INIT);

其中 si_takeover_conn 完成为 si 分配连接的处理函数,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void si_takeover_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt)
{
	si->ops = &si_conn_ops;
	conn_assign(si->conn, &si_conn_cb, ctrl, xprt, si);
}

si_conn_cb 的定义如下:

struct data_cb si_conn_cb = {
	.recv    = si_conn_recv_cb,
	.send    = si_conn_send_cb,
	.wake    = si_conn_wake_cb,
};

因此,si->conn->data 指向了 si_conn_cb。这个结构用在随后的 recv/send 中。

此外,session 所对应的任务 task 在 session_complete 的最后通过调用 task_wakeup() 是在随后的循环中被执行。task 的处理函数初始化为 l->handler 即 process_session().

至此,一个新建 session 的 client fd 的 io 处理函数 conn_fd_handler() 及 session 的处理函数 process_session() 都已经正确初始化好了。

以后基本上就是这两个函数分别负责数据的读取,以及业务的处理。

2. 接收 client 发送的请求数据

epoll 中考虑的新建连接通常会尽可能快的传输数据,因此对于新建的 fd,通常会尽快的 执行 io handler,即调用 conn_fd_handler

是在 ev_epoll.c 中的 _do_poll() 中进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
gettimeofday(&before_poll, NULL);
status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
tv_update_date(wait_time, status);
measure_idle();

/* process polled events */

for (count = 0; count < status; count++) {
	unsigned int n;
	unsigned int e = epoll_events[count].events;
	fd = epoll_events[count].data.fd;
	...
	/* Save number of updates to detect creation of new FDs. */
	old_updt = fd_nbupdt;
	fdtab[fd].iocb(fd);
	...
	for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
		fd = fd_updt[new_updt - 1];
		...
		if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
			fdtab[fd].iocb(fd);
		...
	}

上面代码中第一处执行 iocb() 的是由 epoll_wait() 返回的 fd 触发的。而第二次的 iocb() 则就是在前面 iocb 的执行过程中新建的 fd,为了提高效率,则直接调用该 fd 的 iocb(),也 就是 conn_fd_handler() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int conn_fd_handler(int fd) 
{
	struct connection *conn = fdtab[fd].owner;
	...
	if ((fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) &&
		conn->xprt &&
		!(conn->flags & (CO_FL_WAIT_RD|CO_FL_WAIT_ROOM|CO_FL_ERROR|CO_FL_HANDSHAKE))) {
		/* force detection of a flag change : it's impossible to have both
		 * CONNECTED and WAIT_CONN so we're certain to trigger a change.
		 */
		flags = CO_FL_WAIT_L4_CONN | CO_FL_CONNECTED;
		conn->data->recv(conn);
	}
	...
}

根据的 session_complete 的初始化,上面代码 conn->data->recv 指向 si_conn_recv_cb()。 该函数就是 haproxy 中负责接收数据的入口函数。相同的,si_conn_send_cb() 就是 haproxy 中负责发送数据的入口函数。

si_conn_recv_cb() 函数简单介绍如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (conn->xprt->rcv_pipe &&
	chn->to_forward >= MIN_SPLICE_FORWARD && chn->flags & CF_KERN_SPLICING) {
	...
	ret = conn->xprt->rcv_pipe(conn, chn->pipe, chn->to_forward);
	...
}
...
while (!chn->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_RD | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {

	...
	ret = conn->xprt->rcv_buf(conn, chn->buf, max);
	...
}

该函数主要根据数据的接收情况,选择调用 xprt 的 rcv_pipe 还是 rcv_buf. 前面已经 分析过, conn->xprt 指向了 listner 的 xprt,不启用 SSL 就是 raw_sock 数据结构

因此,数据的接收最终是通过调用 raw_sock 的 raw_sock_to_pipe 或/和 raw_sock_to_buf 完成的。

HAProxy 研究笔记 -- rules 实现

http://blog.chinaunix.net/uid-10167808-id-3775567.html

本文研究 haproxy-1.5-dev17 中 rules 的相关实现。

1
2
3
4
1. ACL
2. rule 的组成
3. rule 的执行
4. rule 的种类

1. ACL

如果要实现功能丰富的 rules,需要有配套的 ACL 机制。

ACL 的格式如下:

1
acl [flags] [operator] ... 

haproxy 中 ACL 数据结构的定义:

1
2
3
4
5
6
7
8
/* The acl will be linked to from the proxy where it is declared */
struct acl {
	struct list list;           /* chaining */
	char *name;           /* acl name */
	struct list expr;     /* list of acl_exprs */
	int cache_idx;              /* ACL index in cache */
	unsigned int requires;      /* or'ed bit mask of all acl_expr's ACL_USE_* */
};

其中:

1
2
3
4
5
6
7
8
9
10
11
12
13
name: ACL 的名字
expr: ACL 定义的表达式。就是定义的 ACL 名字后面的表达式。这是一个链表结构。因此,可以定义多条表达式不同但是名字相同的 ACL。这样,多个表达式都属于同一个 ACL。
requires: 所有表达式中关键字对应的作用域(该关键字可以用在什么场合)的集合

函数 parse_acl() 负责解析定义好的 ACL:

查找 ACL 的名字,如果不存在的话,则 alloc 一个新的 acl 结构
通过调用 parse_acl_expr() 对表达式进行解析,并返回 struct acl_expr 结构
	ACL 中的表达式应该只有一个 kw
	查找该关键字,必须是已经注册好的。并返回该关键字注册时的数据结构 struct acl_expr
	alloc 一个 struct acl_expr 结构体,记录下返回的 kw 的数据结构,并作成员的初始化
	调用对应 kw 的 parse 方法,将解析的结果保存在 struct acl_pattern 结构体中,并将该结构体加入到 expr->patterns 的链表中
将解析到的表达式插入到 acl 中的 expr 链表中

总结: 一个 ACL 包含一到多个表达式。每个表达式包含一个 kw及一到多个 pattern。

2. rule 的组成

这里简要描述 rule 与 acl 之间的逻辑关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
rule 应该是 action + condition 组成
	有些动作自身可能也需要记录一些信息。不同的 rule 对应动作的信息可能不同,比如 reqirep 等
	block rules 的动作比较单一, condition 满足之后处理结果均相同
condition,完成 rule 检测的判断条件 对应数据结构: struct acl_cond

		struct acl_cond {
			struct list list;           /* Some specific tests may use multiple conditions */
			struct list suites;         /* list of acl_term_suites */
			int pol;                    /* polarity: ACL_COND_IF / ACL_COND_UNLESS */
			unsigned int requires;      /* or'ed bit mask of all acl's ACL_USE_* */
			const char *file;           /* config file where the condition is declared */
			int line;                   /* line in the config file where the condition is declared */
		};
		

condition 包含多个 ACL 组。组的分割逻辑是逻辑或(|| 或者 or),即 struct list suites 的成员,组的数据结构 struct acl_term_suite

	struct acl_term_suite {
		struct list list;           /* chaining of term suites */
		struct list terms;          /* list of acl_terms */
	};

	该数据结构可以包含多个 ACL,以及每个 ACL 可能的一个取反标识 '!'
	所有表达式中相邻的 ACL 且其逻辑关系为逻辑与(&&) 的构成一个 ACL 组
		比如 if acl1 !acl2 or acl3 acl4,则构成两个 acl_term_suite,分别是 acl1 !acl2 和 acl3 acl4
		每个 ACL 及其可能的取反标记对应的数据结构: struct acl_term

			struct acl_term {
				struct list list;           /* chaining */
				struct acl *acl;            /* acl pointed to by this term */
				int neg;                    /* 1 if the ACL result must be negated */
			};

	一个 ACL 包含多个 expr

3. rule 的执行

概括起来很简单,执行判断条件。符合条件,然后执行对应动作。

下面是 rspadd 的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* add response headers from the rule sets in the same order */
list_for_each_entry(wl, &rule_set->rsp_add, list) {
	if (txn->status < 200)
		break;
	if (wl->cond) {
		int ret = acl_exec_cond(wl->cond, px, t, txn, SMP_OPT_DIR_RES|SMP_OPT_FINAL);
		ret = acl_pass(ret);
		if (((struct acl_cond *)wl->cond)->pol == ACL_COND_UNLESS)
			ret = !ret;
		if (!ret)
			continue;
	}
	if (unlikely(http_header_add_tail(&txn->rsp, &txn->hdr_idx, wl->s) < 0))
		goto return_bad_resp;
}

对于同一个种类的 rules,执行逻辑如下:

1
2
3
4
5
6
主要遍历 rule,调用 acl_exec_cond 执行该 rule 的检测条件。该检测结果只给出匹配与否。
	逐个遍历 cond 上的 ACL 组,即cond->suites。任一 suite 匹配成功,则认为匹配成功
	同一个 ACL 组上,遍历所有 suite->terms (ACL + 取反逻辑)。任意一个 ACL 匹配失败,则跳到下一个 ACL 组继续匹配。同一组全部 ACL 匹配成功,则认为该 ACL 组匹配
		同一个 ACL 上的匹配,则是逐一遍历 ACL 的 expr。只要任意一个 expr 匹配成功,则认为该 ACL 匹配成功
结合 condition 中的条件 if/unless,确定最终匹配结果
如果匹配则执行对应的 action,否则检测下一条规则。

4. rule 的种类

从 proxy 结构体可以看出 rule 的种类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct proxy {
	...
	struct list acl;                        /* ACL declared on this proxy */
	struct list http_req_rules;       /* HTTP request rules: allow/deny/http-auth */
	struct list block_cond;                 /* early blocking conditions (chained) */
	struct list redirect_rules;             /* content redirecting rules (chained) */
	struct list switching_rules;            /* content switching rules (chained) */
	struct list persist_rules;        /* 'force-persist' and 'ignore-persist' rules (chained) */
	struct list sticking_rules;             /* content sticking rules (chained) */
	struct list storersp_rules;             /* content store response rules (chained) */
	struct list server_rules;               /* server switching rules (chained) */
	struct {                                /* TCP request processing */
		unsigned int inspect_delay;     /* inspection delay */
		struct list inspect_rules;      /* inspection rules */
		struct list l4_rules;           /* layer4 rules */
	} tcp_req;
	struct {                                /* TCP request processing */
		unsigned int inspect_delay;     /* inspection delay */
		struct list inspect_rules;      /* inspection rules */
	} tcp_rep;
	...
}

其中, 函数 http_process_req_common 中处理的规则如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
http_process_req_common
{
	... 
	1. process block rules
	...
	2. process http req rules
	...
	3. execute regular exp if any
	...
	4. req add
	...
	5. process redirect rules
	...
}

这里没有详细的介绍各种具体用途的 rules。随后具体分析代码的时候总结一下再加上。