kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

HAProxy 研究笔记 -- 主循环处理流程

http://blog.chinaunix.net/uid-10167808-id-3807412.html

本文简单介绍 HAProxy 主循环的处理逻辑,版本为 1.5-dev17.

0. 主循环 run_poll_loop

HAproxy 的主循环在 haproxy.c 中的 run_poll_loop() 函数,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/* Runs the polling loop */
void run_poll_loop()
{
	int next;

	tv_update_date(0,1);
	while (1) {
		/* check if we caught some signals and process them */
		signal_process_queue();

		/* Check if we can expire some tasks */
		wake_expired_tasks(&next);

		/* Process a few tasks */
		process_runnable_tasks(&next);

		/* stop when there's nothing left to do */
		if (jobs == 0)
			break;

		/* The poller will ensure it returns around  */
		cur_poller.poll(&cur_poller, next);
		fd_process_spec_events();
	}
}

主循环的结构比较清晰,就是循环的调用几个函数,并在适当的时候结束循环并退出:

1
2
3
4
5
6
1. 处理信号队列
2. 超时任务
3. 处理可运行的任务
4. 检测是否可以结束循环
5. 执行 poll 处理 fd 的 IO 事件
6. 处理可能仍有 IO 事件的 fd

1. signal_process_queue - 处理信号队对列

haproxy 实现了自己的信号处理机制。接受到信号之后,将该信号放到信号队列中。在程序 运行到 signal_process_queue() 时处理所有位于信号队列中的信号。

2. wake_expired_tasks - 唤醒超时任务

haproxy 的顶层处理逻辑是 task,task 上存储着要处理的任务的全部信息。task 的管理 是采用队列方式,同时分为 wait queue 和 run queue。顾名思义,wait queue 是需要等 待一定时间的 task 的集合,而 run queue 则代表需要立即执行的 task 的集合。

该函数就是检查 wait queue 中那些超时的任务,并将其放到 run queue 中。haproxy 在 执行的过程中,会因为一些情况导致需要将当前的任务通过调用 task_queue 等接口放到 wait queue 中。

3. process_runnable_tasks - 处理可运行的任务

处理位于 run queue 中的任务。

前面提到,wake_expired_tasks 可能将一些超时的任务放到 run queue 中。此外,haproxy 执行的过程中,还有可能通过调用 task_wakeup 直接讲某个 task 放到 run queue 中,这代表程序希望该任务下次尽可能快的被执行。

对于 TCP 或者 HTTP 业务流量的处理,该函数最终通过调用 process_session 来完成,包括解析已经接收到的数据, 并执行一系列 load balance 的特性,但不负责从 socket 收发数据。

4. jobs == 0 - 无任务可执行,结束循环

haproxy 中用 jobs 记录当前要处理的任务总数,一个 listener 也会被计算在内。因此, 如果 jobs 为 0 的话,通常意味着 haproxy 要退出了,因为连 listener 都要释放了。 jobs 的数值通常在 process_session 时更新。因此,是否可以退出循环,就放在了所有 任务的 process_session 执行之后。

5. cur_poller.poll() - 执行 poll 处理 fd 的 IO 事件

haproxy 启动阶段,会检测当前系统可以启用那种异步处理的机制,比如 select、poll、 epoll、kqueue 等,并注册对应 poller 的 poll 方法。epoll 的相关函数接口在 ev_epoll.c 中。

这里就是执行已经注册的 poller 的 poll 方法,主要功能就是获取所有活动的 fd,并 调用对应的 handler,完成接受新建连接、数据收发等功能。

6. 处理可能仍有 IO 事件的 fd

poller 的 poll 方法执行时,程序会将某些符合条件以便再次执行 IO 处理的的 fd 放到 fd_spec list[] 中,fd_process_spec_events() 函数会再次执行这些 fd 的 io handler。

HAProxy 研究笔记 -- HTTP请求处理-1-接收

http://blog.chinaunix.net/uid-10167808-id-3795082.html

这里继续分析 http req 的处理。当前分析的代码为 1.5-dev17。

1. 初始化 session 数据处理相关的设置

建连的处理基本上就是 _do_poll ->listener_accept ->session_accept ->fronend_accept()

其中 session_accept() 会设置新建 fd 的 io handler

1
2
3
4
5
6
7
8
9
10
11
/* Add the various callbacks. Right now the transport layer is present
 * but not initialized. Also note we need to be careful as the stream
 * int is not initialized yet.
 */
conn_prepare(s->si[0].conn, &sess_conn_cb, l->proto, l->xprt, s);

fdtab[cfd].owner = s->si[0].conn; /*fd 对应的 owner 为 connection 结构*/
fdtab[cfd].iocb = conn_fd_handler;
conn_data_want_recv(s->si[0].conn);
if (conn_xprt_init(s->si[0].conn) < 0)
	goto out_free_task;

IPv4 http 对应的 listener 的 xprt 和proto 分别被初始化为

1
2
l->xprt = &raw_sock;
l->proto = &proto_tcpv4;

conn_prepare() 就是将相关数据收发以及连接处理的函数都赋值到 connection 结构体上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Assigns a connection with the appropriate data, ctrl, transport layers, and owner. */
static inline void conn_assign(struct connection *conn, const struct data_cb *data,
	                           const struct protocol *ctrl, const struct xprt_ops *xprt,
	                           void *owner)
{
	conn->data = data;
	conn->ctrl = ctrl;
	conn->xprt = xprt;
	conn->owner = owner;
}

/* prepares a connection with the appropriate data, ctrl, transport layers, and
 * owner. The transport state and context are set to 0.
 */
static inline void conn_prepare(struct connection *conn, const struct data_cb *data,
	                            const struct protocol *ctrl, const struct xprt_ops *xprt,
	                            void *owner)
{
	conn_assign(conn, data, ctrl, xprt, owner);
	conn->xprt_st = 0;
	conn->xprt_ctx = NULL;
}

经过初始化, session client 端的 connection 结构体初始化完成:

1
2
3
4
conn->data 指向 sess_conn_cb。 后面调用 session_complete() 会被再次赋值
conn->ctrl 指向 l->proto, IPv4 下为 proto_tcpv4
conn->xprt 执向 l->xprt, 不启用 SSL 时为 raw_sock,启用 SSL 时为 ssl_sock
conn->owner 指向 session

接着调用 session_complete 完成建立一个 session 所需要的最后的初始化工作,其中 包含调用 frontend_accept,并将当前 session 对应的 task 放入runqueue 中以待下 次执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
      si_takeover_conn(&s->si[0], l->proto, l->xprt);
      ...
      t->process = l->handler;
      ...
if (p->accept && (ret = p->accept(s)) <= 0) {
	/* Either we had an unrecoverable error (<0) or work is
	 * finished (=0, eg: monitoring), in both situations,
	 * we can release everything and close.
	 */
	goto out_free_rep_buf;
}
...
task_wakeup(t, TASK_WOKEN_INIT);

其中 si_takeover_conn 完成为 si 分配连接的处理函数,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline void si_takeover_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt)
{
	si->ops = &si_conn_ops;
	conn_assign(si->conn, &si_conn_cb, ctrl, xprt, si);
}

si_conn_cb 的定义如下:

struct data_cb si_conn_cb = {
	.recv    = si_conn_recv_cb,
	.send    = si_conn_send_cb,
	.wake    = si_conn_wake_cb,
};

因此,si->conn->data 指向了 si_conn_cb。这个结构用在随后的 recv/send 中。

此外,session 所对应的任务 task 在 session_complete 的最后通过调用 task_wakeup() 是在随后的循环中被执行。task 的处理函数初始化为 l->handler 即 process_session().

至此,一个新建 session 的 client fd 的 io 处理函数 conn_fd_handler() 及 session 的处理函数 process_session() 都已经正确初始化好了。

以后基本上就是这两个函数分别负责数据的读取,以及业务的处理。

2. 接收 client 发送的请求数据

epoll 中考虑的新建连接通常会尽可能快的传输数据,因此对于新建的 fd,通常会尽快的 执行 io handler,即调用 conn_fd_handler

是在 ev_epoll.c 中的 _do_poll() 中进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
gettimeofday(&before_poll, NULL);
status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
tv_update_date(wait_time, status);
measure_idle();

/* process polled events */

for (count = 0; count < status; count++) {
	unsigned int n;
	unsigned int e = epoll_events[count].events;
	fd = epoll_events[count].data.fd;
	...
	/* Save number of updates to detect creation of new FDs. */
	old_updt = fd_nbupdt;
	fdtab[fd].iocb(fd);
	...
	for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
		fd = fd_updt[new_updt - 1];
		...
		if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
			fdtab[fd].iocb(fd);
		...
	}

上面代码中第一处执行 iocb() 的是由 epoll_wait() 返回的 fd 触发的。而第二次的 iocb() 则就是在前面 iocb 的执行过程中新建的 fd,为了提高效率,则直接调用该 fd 的 iocb(),也 就是 conn_fd_handler() 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int conn_fd_handler(int fd) 
{
	struct connection *conn = fdtab[fd].owner;
	...
	if ((fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) &&
		conn->xprt &&
		!(conn->flags & (CO_FL_WAIT_RD|CO_FL_WAIT_ROOM|CO_FL_ERROR|CO_FL_HANDSHAKE))) {
		/* force detection of a flag change : it's impossible to have both
		 * CONNECTED and WAIT_CONN so we're certain to trigger a change.
		 */
		flags = CO_FL_WAIT_L4_CONN | CO_FL_CONNECTED;
		conn->data->recv(conn);
	}
	...
}

根据的 session_complete 的初始化,上面代码 conn->data->recv 指向 si_conn_recv_cb()。 该函数就是 haproxy 中负责接收数据的入口函数。相同的,si_conn_send_cb() 就是 haproxy 中负责发送数据的入口函数。

si_conn_recv_cb() 函数简单介绍如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
if (conn->xprt->rcv_pipe &&
	chn->to_forward >= MIN_SPLICE_FORWARD && chn->flags & CF_KERN_SPLICING) {
	...
	ret = conn->xprt->rcv_pipe(conn, chn->pipe, chn->to_forward);
	...
}
...
while (!chn->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_RD | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {

	...
	ret = conn->xprt->rcv_buf(conn, chn->buf, max);
	...
}

该函数主要根据数据的接收情况,选择调用 xprt 的 rcv_pipe 还是 rcv_buf. 前面已经 分析过, conn->xprt 指向了 listner 的 xprt,不启用 SSL 就是 raw_sock 数据结构

因此,数据的接收最终是通过调用 raw_sock 的 raw_sock_to_pipe 或/和 raw_sock_to_buf 完成的。

HAProxy 研究笔记 -- rules 实现

http://blog.chinaunix.net/uid-10167808-id-3775567.html

本文研究 haproxy-1.5-dev17 中 rules 的相关实现。

1
2
3
4
1. ACL
2. rule 的组成
3. rule 的执行
4. rule 的种类

1. ACL

如果要实现功能丰富的 rules,需要有配套的 ACL 机制。

ACL 的格式如下:

1
acl [flags] [operator] ... 

haproxy 中 ACL 数据结构的定义:

1
2
3
4
5
6
7
8
/* The acl will be linked to from the proxy where it is declared */
struct acl {
	struct list list;           /* chaining */
	char *name;           /* acl name */
	struct list expr;     /* list of acl_exprs */
	int cache_idx;              /* ACL index in cache */
	unsigned int requires;      /* or'ed bit mask of all acl_expr's ACL_USE_* */
};

其中:

1
2
3
4
5
6
7
8
9
10
11
12
13
name: ACL 的名字
expr: ACL 定义的表达式。就是定义的 ACL 名字后面的表达式。这是一个链表结构。因此,可以定义多条表达式不同但是名字相同的 ACL。这样,多个表达式都属于同一个 ACL。
requires: 所有表达式中关键字对应的作用域(该关键字可以用在什么场合)的集合

函数 parse_acl() 负责解析定义好的 ACL:

查找 ACL 的名字,如果不存在的话,则 alloc 一个新的 acl 结构
通过调用 parse_acl_expr() 对表达式进行解析,并返回 struct acl_expr 结构
	ACL 中的表达式应该只有一个 kw
	查找该关键字,必须是已经注册好的。并返回该关键字注册时的数据结构 struct acl_expr
	alloc 一个 struct acl_expr 结构体,记录下返回的 kw 的数据结构,并作成员的初始化
	调用对应 kw 的 parse 方法,将解析的结果保存在 struct acl_pattern 结构体中,并将该结构体加入到 expr->patterns 的链表中
将解析到的表达式插入到 acl 中的 expr 链表中

总结: 一个 ACL 包含一到多个表达式。每个表达式包含一个 kw及一到多个 pattern。

2. rule 的组成

这里简要描述 rule 与 acl 之间的逻辑关系:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
rule 应该是 action + condition 组成
	有些动作自身可能也需要记录一些信息。不同的 rule 对应动作的信息可能不同,比如 reqirep 等
	block rules 的动作比较单一, condition 满足之后处理结果均相同
condition,完成 rule 检测的判断条件 对应数据结构: struct acl_cond

		struct acl_cond {
			struct list list;           /* Some specific tests may use multiple conditions */
			struct list suites;         /* list of acl_term_suites */
			int pol;                    /* polarity: ACL_COND_IF / ACL_COND_UNLESS */
			unsigned int requires;      /* or'ed bit mask of all acl's ACL_USE_* */
			const char *file;           /* config file where the condition is declared */
			int line;                   /* line in the config file where the condition is declared */
		};
		

condition 包含多个 ACL 组。组的分割逻辑是逻辑或(|| 或者 or),即 struct list suites 的成员,组的数据结构 struct acl_term_suite

	struct acl_term_suite {
		struct list list;           /* chaining of term suites */
		struct list terms;          /* list of acl_terms */
	};

	该数据结构可以包含多个 ACL,以及每个 ACL 可能的一个取反标识 '!'
	所有表达式中相邻的 ACL 且其逻辑关系为逻辑与(&&) 的构成一个 ACL 组
		比如 if acl1 !acl2 or acl3 acl4,则构成两个 acl_term_suite,分别是 acl1 !acl2 和 acl3 acl4
		每个 ACL 及其可能的取反标记对应的数据结构: struct acl_term

			struct acl_term {
				struct list list;           /* chaining */
				struct acl *acl;            /* acl pointed to by this term */
				int neg;                    /* 1 if the ACL result must be negated */
			};

	一个 ACL 包含多个 expr

3. rule 的执行

概括起来很简单,执行判断条件。符合条件,然后执行对应动作。

下面是 rspadd 的示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/* add response headers from the rule sets in the same order */
list_for_each_entry(wl, &rule_set->rsp_add, list) {
	if (txn->status < 200)
		break;
	if (wl->cond) {
		int ret = acl_exec_cond(wl->cond, px, t, txn, SMP_OPT_DIR_RES|SMP_OPT_FINAL);
		ret = acl_pass(ret);
		if (((struct acl_cond *)wl->cond)->pol == ACL_COND_UNLESS)
			ret = !ret;
		if (!ret)
			continue;
	}
	if (unlikely(http_header_add_tail(&txn->rsp, &txn->hdr_idx, wl->s) < 0))
		goto return_bad_resp;
}

对于同一个种类的 rules,执行逻辑如下:

1
2
3
4
5
6
主要遍历 rule,调用 acl_exec_cond 执行该 rule 的检测条件。该检测结果只给出匹配与否。
	逐个遍历 cond 上的 ACL 组,即cond->suites。任一 suite 匹配成功,则认为匹配成功
	同一个 ACL 组上,遍历所有 suite->terms (ACL + 取反逻辑)。任意一个 ACL 匹配失败,则跳到下一个 ACL 组继续匹配。同一组全部 ACL 匹配成功,则认为该 ACL 组匹配
		同一个 ACL 上的匹配,则是逐一遍历 ACL 的 expr。只要任意一个 expr 匹配成功,则认为该 ACL 匹配成功
结合 condition 中的条件 if/unless,确定最终匹配结果
如果匹配则执行对应的 action,否则检测下一条规则。

4. rule 的种类

从 proxy 结构体可以看出 rule 的种类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct proxy {
	...
	struct list acl;                        /* ACL declared on this proxy */
	struct list http_req_rules;       /* HTTP request rules: allow/deny/http-auth */
	struct list block_cond;                 /* early blocking conditions (chained) */
	struct list redirect_rules;             /* content redirecting rules (chained) */
	struct list switching_rules;            /* content switching rules (chained) */
	struct list persist_rules;        /* 'force-persist' and 'ignore-persist' rules (chained) */
	struct list sticking_rules;             /* content sticking rules (chained) */
	struct list storersp_rules;             /* content store response rules (chained) */
	struct list server_rules;               /* server switching rules (chained) */
	struct {                                /* TCP request processing */
		unsigned int inspect_delay;     /* inspection delay */
		struct list inspect_rules;      /* inspection rules */
		struct list l4_rules;           /* layer4 rules */
	} tcp_req;
	struct {                                /* TCP request processing */
		unsigned int inspect_delay;     /* inspection delay */
		struct list inspect_rules;      /* inspection rules */
	} tcp_rep;
	...
}

其中, 函数 http_process_req_common 中处理的规则如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
http_process_req_common
{
	... 
	1. process block rules
	...
	2. process http req rules
	...
	3. execute regular exp if any
	...
	4. req add
	...
	5. process redirect rules
	...
}

这里没有详细的介绍各种具体用途的 rules。随后具体分析代码的时候总结一下再加上。

HAProxy 研究笔记 -- TCP 连接处理流程

http://blog.chinaunix.net/uid-10167808-id-3771148.html

本文基于 HAProxy 1.5-dev7 版本。

1
2
3
4
5
6
7
8
9
10
11
12
目录
1. 关键数据结构 session
2. 相关初始化
	2.1. 初始化处理 TCP 连接的方法
	2.2. 初始化 listener
	2.3. 绑定所有已注册协议上的 listeners
	2.4. 启用所有已注册协议上的 listeners
3. TCP 连接的处理流程
	3.1. 接受新建连接
	3.2. TCP 连接上的接收事件
	3.3. TCP 连接上的发送事件
	3.4. http 请求的处理

1. 关键数据结构 session

haproxy 负责处理请求的核心数据结构是 struct session,本文不对该数据结构进行分析。

从业务的处理的角度,简单介绍一下对 session 的理解:

1
2
3
haproxy 每接收到 client 的一个连接,便会创建一个 session 结构,该结构一直伴随着连接的处理,直至连接被关闭,session 才会被释放
haproxy 其他的数据结构,大多会通过引用的方式和 session 进行关联
一个业务 session 上会存在两个 TCP 连接,一个是 client 到 haproxy,一个是 haproxy 到后端 server。

此外,一个 session,通常还要对应一个 task,haproxy 最终用来做调度的是通过 task。

2. 相关初始化

在 haproxy 正式处理请求之前,会有一系列初始化动作。这里介绍和请求处理相关的一些初始化。

2.1. 初始化处理 TCP 连接的方法

初始化处理 TCP 协议的相关数据结构,主要是和 socket 相关的方法的声明。详细见下面 proto_tcpv4 (proto_tcp.c)的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static struct protocol proto_tcpv4 = {
	.name = "tcpv4",
	.sock_domain = AF_INET,
	.sock_type = SOCK_STREAM,
	.sock_prot = IPPROTO_TCP,
	.sock_family = AF_INET,
	.sock_addrlen = sizeof(struct sockaddr_in),
	.l3_addrlen = 32/8,
	.accept = &stream_sock_accept,
	.read = &stream_sock_read,
	.write = &stream_sock_write,
	.bind = tcp_bind_listener,
	.bind_all = tcp_bind_listeners,
	.unbind_all = unbind_all_listeners,
	.enable_all = enable_all_listeners,
	.listeners = LIST_HEAD_INIT(proto_tcpv4.listeners),
	.nb_listeners = 0,
};
2.2. 初始化 listener

listener,顾名思义,就是用于负责处理监听相关的逻辑。

在 haproxy 解析 bind 配置的时候赋值给 listener 的 proto 成员。函数调用流程如下:

1
2
3
4
5
cfgparse.c
	-> cfg_parse_listen
		-> str2listener
			-> tcpv4_add_listener
				-> listener->proto = &proto_tcpv4;

由于这里初始化的是 listener 处理 socket 的一些方法。可以推断, haproxy 接收 client 新建连接的入口函数应该是 protocol 结构体中的 accpet 方法。对于tcpv4 来说,就是 stream_sock_accept() 函数。该函数到 1.5-dev19 中改名为 listener_accept()。这是后话,暂且不表。

listener 的其他初始化

1
2
3
4
5
cfgparse.c
	-> check_config_validity
		-> listener->accept = session_accept;
listener->frontend = curproxy; (解析 frontend 时,会执行赋值: curproxy->accept = frontend_accept)
listener->handler = process_session;

整个 haproxy 配置文件解析完毕,listener 也已初始化完毕。可以简单梳理一下几个 accept 方法的设计逻辑:

1
2
3
stream_sock_accept(): 负责接收新建 TCP 连接,并触发 listener 自己的 accept 方法 session_accept()
session_accept(): 负责创建 session,并作 session 成员的初步初始化,并调用 frontend 的 accept 方法 front_accetp()
frontend_accept(): 该函数主要负责 session 前端的 TCP 连接的初始化,包括 socket 设置,log 设置,以及 session 部分成员的初始化

下文分析 TCP 新建连接处理过程,基本上就是这三个函数的分析。

2.3. 绑定所有已注册协议上的 listeners
1
2
3
4
5
6
haproxy.c 
	-> protocol_bind_all 
		-> all registered protocol bind_all
			-> tcp_bind_listeners (TCP)
				-> tcp_bind_listener 
					-> [ fdtab[fd].cb[DIR_RD].f = listener->proto->accept ]

该函数指针指向 proto_tcpv4 结构体的 accept 成员,即函数 stream_sock_accept

2.4. 启用所有已注册协议上的 listeners

把所有 listeners 的 fd 加到 polling lists 中 haproxy.c -> protocol_enable_all -> all registered protocol enable_all -> enable_all_listeners (TCP) -> enable_listener 函数会将处于 LI_LISTEN 的 listener 的状态修改为 LI_READY,并调用 cur poller 的 set 方法, 比如使用 sepoll,就会调用 __fd_set

3. TCP 连接的处理流程

3.1. 接受新建连接

前面几个方面的分析,主要是为了搞清楚当请求到来时,处理过程中实际的函数调用关系。以下分析 TCP 建连过程。

1
2
3
4
5
6
7
8
haproxy.c 
	-> run_poll_loop 
		-> cur_poller.poll 
			-> __do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法) 
				-> fdtab[fd].cb[DIR_RD].f(fd) (TCP 协议的该函数指针指向 stream_sock_accept )
					-> stream_sock_accept
						-> 按照 global.tune.maxaccept 的设置尽量可能多执行系统调用 accept,然后再调用 l->accept(),即 listener 的 accept 方法 session_accept
							-> session_accept

session_accept 主要完成以下功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
调用 pool_alloc2 分配一个 session 结构
调用 task_new 分配一个新任务
将新分配的 session 加入全局 sessions 链表中
session 和 task 的初始化,若干重要成员的初始化如下
	t->process = l->handler: 即 t->process 指向 process_session
	t->context = s: 任务的上下文指向 session
	s->listener = l: session 的 listener 成员指向当前的 listener
	s->si[] 的初始化,记录 accept 系统调用返回的 cfd 等
	初始化 s->txn
	为 s->req 和 s->rep 分别分配内存,并作对应的初始化
		s->req = pool_alloc2(pool2_buffer)
		s->rep = pool_alloc2(pool2_buffer)
		从代码上来看,应该是各自独立分配 tune.bufsize + sizeof struct buffer 大小的内存
	新建连接 cfd 的一些初始化
		cfd 设置为非阻塞
		将 cfd 加入 fdtab[] 中,并注册新建连接 cfg 的 read 和 write 的方法
		fdtab[cfd].cb[DIR_RD].f = l->proto->read,设置 cfd 的 read 的函数 l->proto->read,对应 TCP 为 stream_sock_read,读缓存指向 s->req,
		fdtab[cfd].cb[DIR_WR].f = l->proto->write,设置 cfd 的 write 函数 l->proto->write,对应 TCP 为 stream_sock_write,写缓冲指向 s->rep
p->accept 执行 proxy 的 accept 方法即 frontend_accept
	设置 session 结构体的 log 成员
	根据配置的情况,分别设置新建连接套接字的选项,包括 TCP_NODELAY/KEEPALIVE/LINGER/SNDBUF/RCVBUF 等等
	如果 mode 是 http 的话,将 session 的 txn 成员做相关的设置和初始化
3.2. TCP 连接上的接收事件
1
2
3
4
5
6
haproxy.c 
	-> run_poll_loop 
		-> cur_poller.poll 
			-> __do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法) 
				-> fdtab[fd].cb[DIR_RD].f(fd) (该函数在建连阶段被初始化为四层协议的 read 方法,对于 TCP 协议,为 stream_sock_read )
					-> stream_sock_read

stream_sock_read 主要完成以下功能

找到当前连接的读缓冲,即当前 session 的 req buffer:

1
struct buffer *b = si->ib
1
2
3
4
5
6
7
8
9
根据配置,调用 splice 或者 recv 读取套接字上的数据,并填充到读缓冲中,即填充到从 b->r(初始位置应该就是 b->data)开始的内存中
如果读取到 0 字节,则意味着接收到对端的关闭请求,调用 stream_sock_shutr 进行处理
	读缓冲标记 si->ib->flags 的 BF_SHUTR 置位,清除当前 fd 的 epoll 读事件,不再从该 fd 读取
	如果写缓冲 si->ob->flags 的 BF_SHUTW 已经置位,说明应该是由本地首先发起的关闭连接动作
		将 fd 从 fdset[] 中清除,从 epoll 中移除 fd,执行系统调用 close(fd), fd.state 置位 FD_STCLOSE
		stream interface 的状态修改 si->state = SI_ST_DIS
唤醒任务 task_wakeup,把当前任务加入到 run queue 中。随后检测 runnable tasks 时,就会处理该任务

##### 3.3. TCP 连接上的发送事件
haproxy.c 
    -> run_poll_loop 
        -> cur_poller.poll 
            -> __do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法) 
                -> fdtab[fd].cb[DIR_WR].f(fd) (该函数在建连阶段被初始化为四层协议的 write 方法,对于 TCP 协议,为 stream_sock_write )
                    -> stream_sock_write
1
2
3
4

stream_sock_write主要完成以下功能

  找到当前连接的写缓冲,即当前 session 的 rep buffer:
struct buffer *b = si->ob
1
2
3
4
5

```
  将待发送的数据调用 send 系统调用发送出去  
  或者数据已经发送完毕,需要发送关闭连接的动作 stream_sock_shutw-> 系统调用 shutdown  
  唤醒任务 task_wakeup,把当前任务加入到 run queue 中。随后检测 runnable tasks 时,就会处理该任务  
3.4. http 请求的处理
1
2
3
4
haproxy.c 
	-> run_poll_loop 
		-> process_runnable_tasks,查找当前待处理的任务所有 tasks, 然后调用 task->process(大多时候就是 process_session) 进行处理
			-> process_session

process_session 主要完成以下功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
处理连接需要关闭的情形,分支 resync_stream_interface
处理请求,分支 resync_request (read event)
	根据 s->req->analysers 的标记位,调用不同的 analyser 进行处理请求
	ana_list & AN_REQ_WAIT_HTTP: http_wait_for_request
	ana_list & AN_REQ_HTTP_PROCESS_FE: http_process_req_common
	ana_list & AN_REQ_SWITCHING_RULES:process_switching_rules
处理应答,分支 resync_response (write event)
	根据 s->rep->analysers 的标记位,调用不同的 analyser 进行处理请求
	ana_list & AN_RES_WAIT_HTTP: http_wait_for_response
	ana_list & AN_RES_HTTP_PROCESS_BE:http_process_res_common
处理 forward buffer 的相关动作
关闭 req 和 rep 的 buffer,调用 pool2_free 释放 session 及其申请的相关内存,包括读写缓冲 (read 0 bytes)
	pool_free2(pool2_buffer, s->req);
	pool_free2(pool2_buffer, s->rep);
	pool_free2(pool2_session, s);
task 从运行任务队列中清除,调用 pool2_free 释放 task 申请的内存: task_delete(); task_free();

本文简单分析了 TCP 连接的处理过程,不侧重细节分析,而且缺少后端 server 的选择以及建连等,重在希望展示出一个 haproxy 处理 TCP 连接的框架。