kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp
sar -n DEV 1

linux内核中tcp连接的断开处理

http://simohayha.iteye.com/blog/503856

我们这次主要来分析相关的两个断开函数close和shotdown以及相关的套接口选项SO_LINGER。这里要注意SO_LINGER对shutdown无任何影响。它只对close起作用。

先来坎SO_LINGER所对应的数据结构:

1
2
3
4
5
6
struct linger {
	//linger的开关
	int     l_onoff;    /* Linger active        */
	//所等待的时间。
	int     l_linger;   /* How long to linger for   */
};

这里对这个套接口选项就不详细介绍了,在unix网络编程中有详细的介绍,我们这里只会分析内核的处理代码。

首先来看close函数,我们知道缺醒情况下,close是立即返回,但是如果套接口的发送缓冲区还有未发送的数据,系统将会试着把这些数据发送给对端。而这个缺醒情况我们是可以通过SO_LINGER来改变的。还有一个要注意就是close调用并不一定会引发tcp的断开连接。因为close只是将这个socket的引用计数减一(主要是针对多个进程),而真正要直接引发断开,则需要用shutdown函数。

内核中socket的close的系统调用是sock_close,而在sock_close中,直接调用sock_release来实现功能,因此这里我们直接看sock_release的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void sock_release(struct socket *sock)
{
	if (sock->ops) {
		struct module *owner = sock->ops->owner;

		//调用inet_stream_ops的inet_release函数
		sock->ops->release(sock);
		//将ops致空。
		sock->ops = NULL;
		module_put(owner);
	}

	//这个域貌似是26.31新加的,具体做什么的还不知道。
	if (sock->fasync_list)
		printk(KERN_ERR "sock_release: fasync list not empty!\n");

	//更新全局的socket数目
	percpu_sub(sockets_in_use, 1);
	if (!sock->file) {
		//更新inode的引用计数
		iput(SOCK_INODE(sock));
		return;
	}
	sock->file = NULL;
}

然后来看inet_release的实现,这个函数主要用来通过SO_LINGER套接字来得到超时时间,然后调用tcp_close来关闭sock。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
int inet_release(struct socket *sock)
{
	struct sock *sk = sock->sk;

	if (sk) {
		long timeout;

		/* Applications forget to leave groups before exiting */
		ip_mc_drop_socket(sk);

		timeout = 0;
		//判断是否设置SO_LINGER并且不是处于正在shutdowning,则设置timeout为l_linger(也就是我们设置的值).
		if (sock_flag(sk, SOCK_LINGER) &&
			!(current->flags & PF_EXITING))
			timeout = sk->sk_lingertime;
		sock->sk = NULL;
		//调用tcp_close.
		sk->sk_prot->close(sk, timeout);
	}
	return 0;
}

tcp_close函数比较长我们这里分段来分析它,首先来看第一部分。这里要注意几点:

1 当close掉一个服务端的父socket的时候,内核会先处理半连接队列然后是已经accept了的队列,最后才会处理父sock。

2 处理接收缓冲区的数据的时候,直接遍历receive_queue(前面blog有介绍),然后统计未发送的socket。我们知道close是不管接收buf的,也就是他会把接收buf释放掉,然后发送rst给对端的。

3 当so_linger有设置并且超时时间为0,则发送rst给对端,并且清空发送和接收buf。这个也不会引起最终的四分组终止序列。

4 当接收缓冲区有未读数据,则直接发送rst给对端。这个也不会引起最终的四分组终止序列。

5 当so_linger有设置,并且超时不为0,或者so_linger没有设置,此时都会引起最终的四分组终止序列来终止连接。(通过send_fin来发送fin,并引发四分组终止序列).而在send_fin中会发送掉发送缓冲区中的数据。

来看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
void tcp_close(struct sock *sk, long timeout)
{
	struct sk_buff *skb;
	int data_was_unread = 0;
	int state;

	lock_sock(sk);
	sk->sk_shutdown = SHUTDOWN_MASK;

	//如果处于tcp_listen说明将要关闭的这个socket是一个服务端的主socket。
	if (sk->sk_state == TCP_LISTEN) {
		//设置sock状态.
		tcp_set_state(sk, TCP_CLOSE);

		//这个函数主要用来清理半连接队列(下面会简要分析这个函数)
		/* Special case. */
		inet_csk_listen_stop(sk);
		//处理要关闭的sock
		goto adjudge_to_death;
	}

	//遍历sk_receive_queue也就是输入buf队列。然后统计还没有读取的数据。
	while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
		u32 len = TCP_SKB_CB(skb)->end_seq - TCP_SKB_CB(skb)->seq -
			  tcp_hdr(skb)->fin;
		data_was_unread += len;
		//free这个skb
		__kfree_skb(skb);
	}

	sk_mem_reclaim(sk);


	//第一个if主要是实现了rfc2525的2.17,也就是关闭的时候,如果接收buf中有未读数据,则发送一个rst给对端。(下面有摘抄相关内容)
	if (data_was_unread) {
		/* Unread data was tossed, zap the connection. */
		NET_INC_STATS_USER(sock_net(sk), LINUX_MIB_TCPABORTONCLOSE);
		//设置状态
		tcp_set_state(sk, TCP_CLOSE);
		//发送rst
		tcp_send_active_reset(sk, GFP_KERNEL);
	}
	//第二个if主要是判断so_linger套接字,并且超时时间为0。此时我们就直接丢掉所有的发送缓冲区中的数据
	else if (sock_flag(sk, SOCK_LINGER) && !sk->sk_lingertime) {
		/* Check zero linger _after_ checking for unread data. */
		//调用tcp_disconnect,这个函数主要用来断开和对端的连接,这个函数下面会介绍。
		sk->sk_prot->disconnect(sk, 0);
		NET_INC_STATS_USER(sock_net(sk), LINUX_MIB_TCPABORTONDATA);
	}
	//这个函数主要用来判断是否需要发送fin,也就是判断状态。下面我会详细介绍这个函数。
	else if (tcp_close_state(sk)) {

		//发送fin.
		tcp_send_fin(sk);
	}

	//等待一段时间。这里的timeout,如果有设置so_linger的话就是l_linger.这里主要是等待发送缓冲区的buf发送(如果超时时间不为0).
	sk_stream_wait_close(sk, timeout);
	........................

}
rfc2525的2.17的介绍:
1
2
3
When an application closes a connection in such a way that it can no longer read any received data, 
the TCP SHOULD, per section 4.2.2.13 of RFC 1122, send a RST if there is any unread received data, 
or if any new data is received. A TCP that fails to do so exhibits "Failure to RST on close with data pending".

ok,现在来看上面遇到的3个函数,一个是inet_csk_listen_stop,一个是tcp_close_state,一个是tcp_disconnect.我们一个个来看他们。

首先是inet_csk_listen_stop函数。我们知道这个函数主要用来清理所有的半连接队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void inet_csk_listen_stop(struct sock *sk)
{
	struct inet_connection_sock *icsk = inet_csk(sk);
	struct request_sock *acc_req;
	struct request_sock *req;

	//首先删除keepalive定时器。
	inet_csk_delete_keepalive_timer(sk);

	/* make all the listen_opt local to us */
	//得到accept 队列。
	acc_req = reqsk_queue_yank_acceptq(&icsk->icsk_accept_queue);

	//然后销毁掉所有的半连接队列,也就是listen_sock队列
	reqsk_queue_destroy(&icsk->icsk_accept_queue);


	//遍历accept队列断开与对端的连接。
	while ((req = acc_req) != NULL) {
	...............................................

		//调用tcp_disconnect来断开与对端的连接。这里注意是非阻塞的。
		sk->sk_prot->disconnect(child, O_NONBLOCK);

		sock_orphan(child);

		percpu_counter_inc(sk->sk_prot->orphan_count);

		//销毁这个sock。
		inet_csk_destroy_sock(child);

		........................................
	}
	WARN_ON(sk->sk_ack_backlog);
}

接下来来看tcp_disconnect函数。这个函数主要用来断开和对端的连接.它会释放读写队列,发送rst,清除定时器等等一系列操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
int tcp_disconnect(struct sock *sk, int flags)
{
	struct inet_sock *inet = inet_sk(sk);
	struct inet_connection_sock *icsk = inet_csk(sk);
	struct tcp_sock *tp = tcp_sk(sk);
	int err = 0;
	int old_state = sk->sk_state;

	if (old_state != TCP_CLOSE)
		tcp_set_state(sk, TCP_CLOSE);
	...................

	//清除定时器,重传,delack等。
	tcp_clear_xmit_timers(sk);
	//直接free掉接收buf。
	__skb_queue_purge(&sk->sk_receive_queue);
	//free掉写buf。
	tcp_write_queue_purge(sk);
	__skb_queue_purge(&tp->out_of_order_queue);
#ifdef CONFIG_NET_DMA
	__skb_queue_purge(&sk->sk_async_wait_queue);
#endif

	inet->dport = 0;

	if (!(sk->sk_userlocks & SOCK_BINDADDR_LOCK))
		inet_reset_saddr(sk);
		..........................................
	//设置状态。
	tcp_set_ca_state(sk, TCP_CA_Open);
	//清理掉重传的一些标记
	tcp_clear_retrans(tp);
	inet_csk_delack_init(sk);
	tcp_init_send_head(sk);
	memset(&tp->rx_opt, 0, sizeof(tp->rx_opt));
	__sk_dst_reset(sk);

	WARN_ON(inet->num && !icsk->icsk_bind_hash);

	sk->sk_error_report(sk);
	return err;
}

紧接着是tcp_close_state函数这个函数就是用来判断是否应该发送fin:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//这个数组表示了当close后,tcp的状态变化,可以看到注释很清楚,包含了3部分。这里也就是通过current也就是tcp的状态取得new state也就是close的状态,然后再和TCP_ACTION_FIN按位于,得到action
static const unsigned char new_state[16] = {
  /* current state:        new state:      action:  */
  /* (Invalid)      */ TCP_CLOSE,
  /* TCP_ESTABLISHED    */ TCP_FIN_WAIT1 | TCP_ACTION_FIN,
  /* TCP_SYN_SENT   */ TCP_CLOSE,
  /* TCP_SYN_RECV   */ TCP_FIN_WAIT1 | TCP_ACTION_FIN,
  /* TCP_FIN_WAIT1  */ TCP_FIN_WAIT1,
  /* TCP_FIN_WAIT2  */ TCP_FIN_WAIT2,
  /* TCP_TIME_WAIT  */ TCP_CLOSE,
  /* TCP_CLOSE      */ TCP_CLOSE,
  /* TCP_CLOSE_WAIT */ TCP_LAST_ACK  | TCP_ACTION_FIN,
  /* TCP_LAST_ACK   */ TCP_LAST_ACK,
  /* TCP_LISTEN     */ TCP_CLOSE,
  /* TCP_CLOSING    */ TCP_CLOSING,
};

static int tcp_close_state(struct sock *sk)
{
	//取得new state
	int next = (int)new_state[sk->sk_state];
	int ns = next & TCP_STATE_MASK;

	tcp_set_state(sk, ns);

	//得到action
	return next & TCP_ACTION_FIN;
}

接下来来看tcp_close的剩余部分的代码,剩下的部分就是处理一些状态以及通知这里只有一个要注意的就是TCP_LINGER2这个套接字,这个套接字能够设置等待fin的超时时间,也就是tcp_sock的域linger2.我们知道系统还有一个sysctl_tcp_fin_timeout,也就是提供了一个sys文件系统的接口来修改这个值,不过我们如果设置linger2为一个大于0的值的话,内核就会取linger2这个值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
adjudge_to_death:

	//得到sock的状态。
	state = sk->sk_state;
	sock_hold(sk);
	sock_orphan(sk);

	//唤醒阻塞在这个sock的队列(前面有详细介绍这个函数)
	release_sock(sk);

	local_bh_disable();
	bh_lock_sock(sk);
	WARN_ON(sock_owned_by_user(sk));

	//全局的cpu变量引用计数减一。
	percpu_counter_inc(sk->sk_prot->orphan_count);

	/* Have we already been destroyed by a softirq or backlog? */
	if (state != TCP_CLOSE && sk->sk_state == TCP_CLOSE)
		goto out;

	//如果状态为TCP_FIN_WAIT2,说明接收了ack,在等待对端的fin。
	if (sk->sk_state == TCP_FIN_WAIT2) {
		struct tcp_sock *tp = tcp_sk(sk);
		//超时时间小于0,则说明马上超时,设置状态为tcp_close,然后发送rst给对端。
		if (tp->linger2 < 0) {
			tcp_set_state(sk, TCP_CLOSE);
			tcp_send_active_reset(sk, GFP_ATOMIC);
			NET_INC_STATS_BH(sock_net(sk),
					LINUX_MIB_TCPABORTONLINGER);
		} else {
			//得到等待fin的超时时间。这里主要也就是在linger2和sysctl_tcp_fin_timeout中来取得。
			const int tmo = tcp_fin_time(sk);
			//如果超时时间太长,则启动keepalive定时器发送探测报。
			if (tmo > TCP_TIMEWAIT_LEN) {
				inet_csk_reset_keepalive_timer(sk,
						tmo - TCP_TIMEWAIT_LEN);
			} else {
				//否则进入time_wait状态。
				tcp_time_wait(sk, TCP_FIN_WAIT2, tmo);
				goto out;
			}
		}
	}
	......................................

	//如果sk的状态为tcp_close则destroy掉这个sk
	if (sk->sk_state == TCP_CLOSE)
		inet_csk_destroy_sock(sk);
	/* Otherwise, socket is reprieved until protocol close. */

out:
	bh_unlock_sock(sk);
	local_bh_enable();
	sock_put(sk);
}

然后来看send_fin的实现,这个函数用来发送一个fin,并且尽量发送完发送缓冲区中的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
void tcp_send_fin(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);
	//取得写bufer的尾部。
	struct sk_buff *skb = tcp_write_queue_tail(sk);
	int mss_now;

	/* Optimization, tack on the FIN if we have a queue of
	 * unsent frames.  But be careful about outgoing SACKS
	 * and IP options.
	 */
	mss_now = tcp_current_mss(sk);
	//如果发送队列不为空,此时我们只需要设置sk buffer的标记位(也就是tcp报文的控制位为fin),可以看到我们是加到写buffer的尾部,这里是为了能尽量将写buffer中的数据全部传出)
	if (tcp_send_head(sk) != NULL) {
		TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_FIN;
		TCP_SKB_CB(skb)->end_seq++;
		tp->write_seq++;
	} else {
	..................................
		//到这里标明发送缓冲区位空,因此我们需要新建一个sk buffer,然后设置标记位,并加入到写buffer。
		skb_reserve(skb, MAX_TCP_HEADER);
		/* FIN eats a sequence byte, write_seq advanced by tcp_queue_skb(). */
		tcp_init_nondata_skb(skb, tp->write_seq,
					 TCPCB_FLAG_ACK | TCPCB_FLAG_FIN);
		tcp_queue_skb(sk, skb);
	}
	//发送写缓冲区中的数据。
	__tcp_push_pending_frames(sk, mss_now, TCP_NAGLE_OFF);
}
void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss,
				   int nonagle)
{
	struct sk_buff *skb = tcp_send_head(sk);

	if (!skb)
		return;

	/* If we are closed, the bytes will have to remain here.
	 * In time closedown will finish, we empty the write queue and
	 * all will be happy.
	 */
	if (unlikely(sk->sk_state == TCP_CLOSE))
		return;
	//发送数据,这里关闭了nagle。也就是立即将数据全部发送出去(我前面的blog有详细解释这个函数).
	if (tcp_write_xmit(sk, cur_mss, nonagle, 0, GFP_ATOMIC))
		tcp_check_probe_timer(sk);
}

接下来来看shutdown的实现。在2.26.31中,系统调用的实现有些变化。

这里我们要知道shutdown会将写缓冲区的数据发出,然后唤醒阻塞的进程,来读取读缓冲区中的数据。

这个系统调用所对应的内核函数就是os_shutdown_socket。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#define SHUT_RD 0
#define SHUT_WR 1
#define SHUT_RDWR 2

int os_shutdown_socket(int fd, int r, int w)
{
	int what, err;

	if (r && w)
		what = SHUT_RDWR;
	else if (r)
		what = SHUT_RD;
	else if (w)
		what = SHUT_WR;
	else
		return -EINVAL;

	//调用socket的shutdown也就是kernel_sock_shutdown
	err = shutdown(fd, what);
	if (err < 0)
		return -errno;
	return 0;
}


int kernel_sock_shutdown(struct socket *sock, enum sock_shutdown_cmd how)
{
	//他最终会调用inet_shutdown
	return sock->ops->shutdown(sock, how);
}

来看inet_shutdown的实现.这个函数的主要工作就是通过判断sock的状态不同来调用相关的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int inet_shutdown(struct socket *sock, int how)
{
	struct sock *sk = sock->sk;
	int err = 0;

	/* This should really check to make sure
	 * the socket is a TCP socket. (WHY AC...)
	 */
	//这里要注意每个how都是加1的,这说明在内核里读写是为1,2,3
	how++; /* maps 0->1 has the advantage of making bit 1 rcvs and
			   1->2 bit 2 snds.
			   2->3 */
	//判断how的合法性。
	if ((how & ~SHUTDOWN_MASK) || !how) /* MAXINT->0 */
		return -EINVAL;
	//锁住sock
	lock_sock(sk);

	//SS_CONNECTING说明这个sock的连接正在处理中。state域表示socket当前的内部状态
	if (sock->state == SS_CONNECTING) {
		//如果状态为这几个状态,说明是处于半连接处理阶段,此时设置状态为SS_DISCONNECTING
		if ((1 << sk->sk_state) &
			(TCPF_SYN_SENT | TCPF_SYN_RECV | TCPF_CLOSE))
			sock->state = SS_DISCONNECTING;
		else
			//否则设置为连接完毕
			sock->state = SS_CONNECTED;
	}

	//除过TCP_LISTEN以及TCP_SYN_SENT状态外的其他状态最终都会进入sk->sk_prot->shutdown也就是tcp_shutdown函数。

	switch (sk->sk_state) {
	//如果状态为tco_close则设置错误号,然后进入default处理
	case TCP_CLOSE:
		err = -ENOTCONN;
		/* Hack to wake up other listeners, who can poll for
		   POLLHUP, even on eg. unconnected UDP sockets -- RR */
	default:
		sk->sk_shutdown |= how;
		if (sk->sk_prot->shutdown)
			sk->sk_prot->shutdown(sk, how);
		break;

	/* Remaining two branches are temporary solution for missing
	 * close() in multithreaded environment. It is _not_ a good idea,
	 * but we have no choice until close() is repaired at VFS level.
	 */
	case TCP_LISTEN:
		//如果不为SHUT_RD则跳出switch,否则进入tcp_syn_sent的处理。
		if (!(how & RCV_SHUTDOWN))
			break;
		/* Fall through */
	case TCP_SYN_SENT:
		//断开连接,然后设置state
		err = sk->sk_prot->disconnect(sk, O_NONBLOCK);
		sock->state = err ? SS_DISCONNECTING : SS_UNCONNECTED;
		break;
	}

	/* Wake up anyone sleeping in poll. */
	//唤醒阻塞在这个socket上的进程,这里是为了将读缓冲区的数据尽量读完。
	sk->sk_state_change(sk);
	release_sock(sk);
	return err;
}

来看tcp_shutdown函数。

这里要注意,当只关闭读的话,并不会引起发送fin,也就是只会设置个标记,然后在读取数据的时候返回错误。而关闭写端,则就会引起发送fin。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void tcp_shutdown(struct sock *sk, int how)
{
	/*  We need to grab some memory, and put together a FIN,
	 *  and then put it into the queue to be sent.
	 *      Tim MacKenzie(tym@dibbler.cs.monash.edu.au) 4 Dec '92.
	 */
	//如果为SHUT_RD则直接返回。
	if (!(how & SEND_SHUTDOWN))
		return;

	/* If we've already sent a FIN, or it's a closed state, skip this. */
	//这里英文注释很详细我就不多解释了。
	if ((1 << sk->sk_state) &
		(TCPF_ESTABLISHED | TCPF_SYN_SENT |
		 TCPF_SYN_RECV | TCPF_CLOSE_WAIT)) {
		/* Clear out any half completed packets.  FIN if needed. */
		//和tcp_close那边处理一样
		if (tcp_close_state(sk))
			tcp_send_fin(sk);
	}
}

最后来看sock_def_readable它就是sk->sk_state_change。也就是用来唤醒阻塞的进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void sock_def_readable(struct sock *sk, int len)
{
	read_lock(&sk->sk_callback_lock);
	//判断是否有进程在等待这个sk
	if (sk_has_sleeper(sk))
	//有的话,唤醒进程,这里可以看到递交给上层的是POLLIN,也就是读事件。
	wake_up_interruptible_sync_poll(sk->sk_sleep, POLLIN |
						POLLRDNORM | POLLRDBAND);

	//这里异步唤醒,可以看到这里也是POLL_IN.
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
	read_unlock(&sk->sk_callback_lock);
}

可以看到shutdown函数只会处理SEND_SHUTDOWN。并且当调用shutdown之后,读缓冲区,还可以继续读取。

Socket层实现系列 — I/O事件及其处理函数

http://blog.csdn.net/zhangskd/article/details/45787989

主要内容:Socket I/O事件的定义、I/O处理函数的实现。

内核版本:3.15.2

I/O事件定义

sock中定义了几个I/O事件,当协议栈遇到这些事件时,会调用它们的处理函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct sock {
	...
	struct socket_wq __rcu *sk_wq; /* socket的等待队列和异步通知队列 */
	...
	/* callback to indicate change in the state of the sock.
	 * sock状态改变时调用,比如从TCP_SYN_SENT或TCP_SYN_RECV变为TCP_ESTABLISHED,
	 * 导致connect()的唤醒。比如从TCP_ESTABLISHED变为TCP_CLOSE_WAIT。
	 */
	void (*sk_state_change) (struct sock *sk);

	/* callback to indicate there is data to be processed.
	 * sock上有数据可读时调用,比如服务器端收到第三次握手的ACK时会调用,导致accept()的唤醒。
	 */
	void (*sk_data_ready) (struct sock *sk);

	/* callback to indicate there is buffer sending space available.
	 * sock上有发送空间可写时调用,比如发送缓存变得足够大了。
	 */
	void (*sk_write_space) (struct sock *sk);

	/* callback to indicate errors (e.g. %MSG_ERRQUEUE)
	 * sock上有错误发生时调用,比如收到RST包。
	 */
	void (*sk_error_report) (struct sock *sk);
	...
};

Socket I/O事件的默认处理函数在sock初始化时赋值。

对于SOCK_STREAM类型的Socket,sock有发送缓存可写事件会被更新为sk_stream_write_space。

1
2
3
4
5
6
7
8
9
void sock_init_data(struct socket *sock, struct sock *sk)
{
	...
	sk->sk_state_change = sock_def_wakeup; /* sock状态改变事件 */
	sk->sk_data_ready = sock_def_readable; /* sock有数据可读事件 */
	sk->sk_write_space = sock_def_write_space; /* sock有发送缓存可写事件 */
	sk->sk_error_report = sock_def_error_report; /* sock有IO错误事件 */
	...
}

判断socket的等待队列上是否有进程。

1
2
3
4
5
static inline bool wq_has_sleeper(struct socket_wq *wq)
{
	smp_mb();
	return wq && waitqueue_active(&wq->wait);
}

状态改变事件

sk->sk_state_change的实例为sock_def_wakeup(),当sock的状态发生改变时,会调用此函数来进行处理。

1
2
3
4
5
6
7
8
9
10
static void sock_def_wakeup(struct sock *sk)
{
	struct socket_wq *wq; /* socket的等待队列和异步通知队列 */

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq)) /* 有进程阻塞在此socket上 */
		wake_up_interruptible_all(&wq->wait); /* 唤醒此socket上的所有睡眠进程 */
	rcu_read_unlock();
}
1
2
3
4
5
6
7
8
9
#define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key)
{
	unsigned long flags;
	spin_lock_irqsave(&q->lock, flags);
	__wake_up_common(q, mode, nr_exclusive, 0, key);
	spin_unlock_irqrestore(&q->lock, flags);
}

初始化等待任务时,如果flags设置了WQ_FLAG_EXCLUSIVE,那么传入的nr_exclusive为1,

表示只允许唤醒一个等待任务,这是为了避免惊群现象。否则会把t等待队列上的所有睡眠进程都唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive,
							 int wake_flags, void *key)
{
	wait_queue_t *curr, *next;

	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
		unsigned flags = curr->flags;

		if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE)
			!--nr_exclusive)
			break;
	}
}

最终调用的是等待任务中的处理函数,默认为autoremove_wake_function()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)

#define DEFINE_WAIT_FUNC(name, function)    \
	wait_queue_t name = {    \
		.private = current,    \
		.func = function,    \
		.task_list = LIST_HEAD_INIT((name).task_list),    \
	}

int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int ret = default_wake_function(wait, mode, sync, key); /* 默认的唤醒函数 */

	if (ret)
		list_del_init(&wait->task_list); /* 从等待队列中删除 */

	return ret;
}

int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key)
{
	return try_to_wake_up(curr->private, mode, wake_flags);
}

try_to_wake_up()通过把进程的状态设置为TASK_RUNNING,并把进程插入CPU运行队列,来唤醒睡眠的进程。

有数据可读事件

sk->sk_data_ready的实例为sock_def_readable(),当sock有输入数据可读时,会调用此函数来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void sock_def_readable(struct sock *sk)
{
	struct socket_wq *wq; /* socket的等待队列和异步通知队列 */

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);

	if (wq_has_sleeper(wq)) /* 有进程在此socket的等待队列 */
		wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
			POLLRDNORM | POLLRDBAND); /* 唤醒等待进程 */

	/* 异步通知队列的处理。
	 * 检查应用程序是否通过recv()类调用来等待接收数据,如果没有就发送SIGIO信号,
	 * 告知它有数据可读。
	 * how为函数的处理方式,band为用来告知的IO类型。
	 */
	sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define wake_up_interruptible_sync_poll(x, m) \
	__wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))

void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key)
{
	unsigned long flags;
	int wake_flags = 1; /* XXX WF_SYNC */

	if (unlikely(!q))
		return;
	if (unlikely(nr_exclusive != 1))
		wake_flags = 0;

	spin_lock_irqsave(&q->lock, flags);
	__wake_up_common(q, mode, nr_exclusive, wake_flags, key);
	spin_unlock_irqrestore(&q->lock, flags);
}

最终也是调用__wake_up_common()。初始化等待任务时,flags |= WQ_FLAG_EXCLUSIVE。

传入的nr_exclusive为1,表示只允许唤醒一个等待任务。所以这里只会唤醒一个等待的进程。

有缓存可写事件

sk->sk_write_space的实例为sock_def_write_space()。

如果socket是SOCK_STREAM类型的,那么函数指针的值会更新为sk_stream_write_space()。

sk_stream_write_space()在TCP中的调用路径为:

1
2
3
4
tcp_rcv_established / tcp_rcv_state_process
	tcp_data_snd_check
		tcp_check_space
			tcp_new_space
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* When incoming ACK allowed to free some skb from write_queue,
 * we remember this event in flag SOCK_QUEUE_SHRUNK and wake up socket
 * on the exit from tcp input handler.
 */
static void tcp_new_space(struct sock *sk)
{
	struct tcp_sock *tp = tcp_sk(sk);

	if (tcp_should_expand_sndbuf(sk)) {
		tcp_sndbuf_expand(sk);
		tp->snd_cwnd_stamp = tcp_time_stamp;
	}

	/* 检查是否需要触发有缓存可写事件 */
	sk->sk_write_space(sk);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void sk_stream_write_space(struct sock *sk)
{
	struct socket *sock = sk->sk_socket;
	struct socket_wq *wq; /* 等待队列和异步通知队列 */

	/* 如果剩余的发送缓存不低于发送缓存上限的1/3,且尚未发送的数据不高于一定值时 */
	if (sk_stream_is_writeable(sk) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags); /* 清除发送缓存不够的标志 */

		rcu_read_lock();
		wq = rcu_dereference(sk->sk_wq); /* socket的等待队列和异步通知队列 */
		if (wq_has_sleeper(wq)) /* 如果等待队列不为空,则唤醒一个睡眠进程 */
			wake_up_interruptible_poll(&wq->wait, POLLOUT | POLLWRNORM | POLLWRBAND);

		/* 异步通知队列不为空,且允许发送数据时。
		 * 检测sock的发送队列是否曾经到达上限,如果有的话发送SIGIO信号,告知异步通知队列上
		 * 的进程有发送缓存可写。
		 */
		if (wq && wq->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
			sock_wake_async(sock, SOCK_WAKE_SPACE, POLL_OUT);

		rcu_read_unlock();
	}
}

#define wake_up_interruptible_poll(x, m) \
	__wake_up(x, TASK_INTERRUPTIBLE, 1, (void *) (m))

最终也是调用__wake_up_common()。初始化等待任务时,flags |= WQ_FLAG_EXCLUSIVE。

传入的nr_exclusive为1,表示只允许唤醒一个等待进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct sock {
	...
	/* 发送队列中,skb数据区的总大小 */
	atomic_t sk_wmem_alloc;
	...
	int sk_sndbuf; /* 发送缓冲区大小的上限 */
	struct sk_buff_head sk_write_queue; /* 发送队列 */
	...
	/* 发送队列的总大小,包含发送队列中skb数据区的总大小,
	 * 以及sk_buff、sk_shared_info结构体、协议头的额外开销。
	 */
	int sk_wmem_queued;
	...
};

如果剩余的发送缓存大于发送缓存上限的1/3,且尚未发送的数据少于一定值时,才会触发有发送

缓存可写的事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline bool sk_stream_is_writeable(const struct sock *sk)
{
	return sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) &&
}

static inline int sk_stream_wspace(const struct sock *sk)
{
	return sk->sk_sndbuf - sk->sk_wmem_queued;
}

static inline int sk_stream_min_wspace(const struct sock *sk)
{
	return sk->sk_wmem_queued >> 1;
}

检查尚未发送的数据是否已经够多了,如果超过了用户设置的值,就不用触发有发送缓存可写事件,

以免使用过多的内存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static inline bool sk_stream_memory_free(const struct sock *sk)
{
	if (sk->sk_wmem_queued >= sk->sk_sndbuf)
		return false;

	return sk->sk_prot->stream_memory_free ? sk->sk_prot->stream_memory_free(sk) : true;
}

struct proto tcp_prot = {
	...
	.stream_memory_free = tcp_stream_memory_free,
	...
};

static inline bool tcp_stream_memory_free(const struct sock *sk)
{
	const struct tcp_sock *tp = tcp_sk(sk);
	u32 notsent_bytes = tp->write_seq - tp->snd_nxt; /* 尚未发送的数据大小 */

	/* 当尚未发送的数据,少于配置的值时,才触发有发送缓存可写的事件。
	 * 这是为了避免发送缓存占用过多的内存。
	 */
	return notsent_bytes < tcp_notsent_lowat(tp);
}

如果有使用TCP_NOTSENT_LOWAT选项,则使用用户设置的值。

否则使用sysctl_tcp_notsent_lowat,默认为无穷大。

1
2
3
4
static inline u32 tcp_notsent_lowat(const struct tcp_sock *tp)
{
	return tp->notsent_lowat ?: sysctl_tcp_notsent_lowat;
}

有I/O错误事件

sk->sk_error_report的实例为sock_def_error_report()。

在以下函数中会调用I/O错误事件处理函数:

1
2
3
4
tcp_disconnect
tcp_reset
tcp_v4_err
tcp_write_err
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void sock_def_error_report(struct sock *sk)
{
	struct socket_wq *wq; /* 等待队列和异步通知队列 */

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq)) /* 有进程阻塞在此socket上 */
		wake_up_interruptible_poll(&wq->wait, POLLERR);

	/* 如果使用了异步通知,则发送SIGIO信号通知进程有错误 */
	sk_wake_async(sk, SOCK_WAKE_IO, POLL_ERR);
}

#define wake_up_interruptible_poll(x, m) \
	__wake_up(x, TASK_INTERRUPTIBLE, 1, (void *) (m))

最终也是调用__wake_up_common(),由于nr_exclusive为1,只会唤醒socket上的一个等待进程。

Socket层实现系列 — 睡眠驱动的同步等待

http://blog.csdn.net/zhangskd/article/details/45770323

主要内容:Socket的同步等待机制,connect和accept等待的实现。

内核版本:3.15.2

概述

socket上定义了几个IO事件:状态改变事件、有数据可读事件、有发送缓存可写事件、有IO错误事件。对于这些事件,socket中分别定义了相应的事件处理函数,也称回调函数。

Socket I/O事件的处理过程中,要使用到sock上的两个队列:等待队列和异步通知队列,这两个队列中都保存着等待该Socket I/O事件的进程。

Q:为什么要使用两个队列,等待队列和异步通知队列有什么区别呢?
A:等待队列上的进程会睡眠,直到Socket I/O事件的发生,然后在事件处理函数中被唤醒。异步通知队列上的进程则不需要睡眠,Socket I/O事件发时,事件处理函数会给它们发送到信号,这些进程事先注册的信号处理函数就能够被执行。

等待队列

Socket层使用等待队列来进行阻塞等待,在等待期间,阻塞在此socket上的进程会睡眠。

1
2
3
4
5
6
7
8
9
10
11
12
struct sock {
	...
	struct socket_wq __rcu *sk_wq; /* socket的等待队列和异步通知队列 */
	...
}

struct socket_wq {
	/* Note: wait MUST be first field of socket_wq */
	wait_queue_head_t wait; /* 等待队列头 */
	struct fasync_struct *fasync_list; /* 异步通知队列 */
	struct rcu_head *rcu;
};
(1) socket的等待队列头
1
2
3
4
5
struct __wait_queue_head {
	spinlock_t lock;
	struct list_head task_list;
};
typedef struct __wait_queue_head wait_queue_head_t;
(2) 进程的等待任务
1
2
3
4
5
6
7
8
9
10
struct __wait_queue {
	unsigned int flags;
#define WQ_FLAG_EXCLUSIVE 0x01
	void *private; /* 指向当前的进程控制块 */
	wait_queue_func_t func; /* 唤醒函数 */
	struct list_head task_list; /* 用于链接入等待队列 */
};
typedef struct __wait_queue wait_queue_t;
typedef int (*wait_queue_func_t) (wait_queue_t *wait, unsigned mode, int flags, void *key);
int default_wake_function(wait_queue_t *wait, unsigned mode, int flags, void *key);
(3) 初始化等待任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)

#define DEFINE_WAIT_FUNC(name, function)    \
	wait_queue_t name = {    \
		.private = current,    \
		.func = function,    \
		.task_list = LIST_HEAD_INIT((name).task_list),    \
	}

int autoremove_wake_function(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	int ret = default_wake_function(wait, mode, sync, key); /* 默认的唤醒函数 */

	if (ret)
		list_del_init(&wait->task_list); /* 从等待队列中删除 */

	return ret;
}

int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags, void *key)
{
	return try_to_wake_up(curr->private, mode, wake_flags);
}

try_to_wake_up()通过把进程的状态设置为TASK_RUNNING,并把进程插入CPU运行队列,来唤醒睡眠的进程。

(4) 把等待任务插入到等待队列中

获取sock的等待队列。

1
2
3
4
5
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
	BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
	return &rcu_dereference_raw(sk->sk_wq)->wait;
}

把等待任务加入到等待队列中,同时设置当前进程的状态,TASK_INTERRUPTIBLE或TASK_UNINTERRUPTIBLE。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
	unsigned long flags;
	wait->flags &= ~WQ_FLAG_EXCLUSIVE; /* 可以同时唤醒多个等待进程 */

	spin_lock_irqsave(&q->lock, flags);

	if (list_empty(&wait->task_list))
		__add_wait_queue(q, wait); /* 把等待任务加入到等待队列的头部,会最先被唤醒 */

	set_current_state(state); /* 设置进程的状态 */

	spin_unlock_irqrestore(&q->lock, flags);
}

prepare_to_wait()和prepare_to_wait_exclusive()都是用来把等待任务加入到等待队列中,不同之处在于使用prepare_to_wait_exclusive()时,会在等待任务中添加WQ_FLAG_EXCLUSIVE标志,表示一次只能唤醒一个等待任务,目的是为了避免惊群现象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void prepare_to_wait_exclusive(wait_queue_head_t *q, wait_queue_t *wait, int state)
{
	unsigned long flags;

	/* 这个标志表示一次只唤醒一个等待任务,避免惊群现象 */
	wait->flags |= WQ_FLAG_EXCLUSIVE;

	spin_lock_irqsave(&q->lock, flags);

	if (list_empty(&wait->task_list))
		__add_wait_queue_tail(q, wait); /* 把此等待任务加入到等待队列尾部 */

	set_current_state(state); /* 设置当前进程的状态 */

	spin_unlock_irqrestore(&q->lock, flags);
}

static inline void __add_wait_queue_tail(wait_queue_head_t *head, wait_queue_t *new)
{
	list_add_tail(&new->task_list, &head->task_list);
}

#define set_current_state(state_value)    \
	set_mb(current->state, (state_value))
(5) 删除等待任务

从等待队列中删除等待任务,同时把等待进程的状态置为可运行状态,即TASK_RUNNING。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * finish_wait - clean up after waiting in a queue
 * @q: waitqueue waited on,等待队列头
 * @wait: wait descriptor,等待任务
 *
 * Sets current thread back to running state and removes the wait
 * descriptor from the given waitqueue if still queued.
 */
void finish_wait(wait_queue_head_t *q, wait_queue_t *wait)
{
	unsigned long flags;
	__set_current_state(TASK_RUNNING);

	if (! list_empty_careful(&wait->task_list)) {
		spin_lock_irqsave(&q->lock, flags);

		list_del_init(&wait->task_list); /* 从等待队列中删除 */

		spin_unlock_irqrestore(&q->lock, flags);
	}
}

connect等待

(1) 睡眠

connect()的超时时间为sk->sk_sndtimeo,在sock_init_data()中初始化为MAX_SCHEDULE_TIMEOUT,表示无限等待,可以通过SO_SNDTIMEO选项来修改。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static long inet_wait_for_connect(struct sock *sk, long timeo, int writebias)
{
	DEFINE_WAIT(wait);  /* 初始化等待任务 */

	/* 把等待任务加入到socket的等待队列头部,把进程的状态设为TASK_INTERRUPTIBLE */
	prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
	sk->sk_write_pending += writebias;

	/* Basic assumption: if someone sets sk->sk_err, he _must_ change state of the socket
	 * from TCP_SYN_*. Connect() does not allow to get error notifications without closing
	 * the socket.
	 */

	/* 完成三次握手后,状态就会变为TCP_ESTABLISHED,从而退出循环 */
	while ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV)) {
		release_sock(sk); /* 等下要睡觉了,先释放锁 */

		/* 进入睡眠,直到超时或收到信号,或者被I/O事件处理函数唤醒。
		 * 1. 如果是收到信号退出的,timeo为剩余的jiffies。
		 * 2. 如果使用了SO_SNDTIMEO选项,超时退出后,timeo为0。
		 * 3. 如果没有使用SO_SNDTIMEO选项,timeo为无穷大,即MAX_SCHEDULE_TIMEOUT,
		 *      那么返回值也是这个,而超时时间不定。为了无限阻塞,需要上面的while循环。
		 */
		timeo = schedule_timeout(timeo);

		lock_sock(sk); /* 被唤醒后重新上锁 */

		/* 如果进程有待处理的信号,或者睡眠超时了,退出循环,之后会返回错误码 */
		if (signal_pending(current) || !timeo)
			break;

		/* 继续睡眠吧 */
		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
	}

	/* 等待结束时,把等待进程从等待队列中删除,把当前进程的状态设为TASK_RUNNING */
	finish_wait(sk_sleep(sk), &wait);
	sk->sk_write_pending -= writebias;
	return timeo;
}
(2) 唤醒

三次握手中,当客户端收到SYNACK、发出ACK后,连接就成功建立了。此时连接的状态从TCP_SYN_SENT或TCP_SYN_RECV变为TCP_ESTABLISHED,sock的状态发生变化,会调用sock_def_wakeup()来处理连接状态变化事件,唤醒进程,connect()就能成功返回了。

sock_def_wakeup()的函数调用路径如下:

1
2
3
4
5
6
7
8
9
tcp_v4_rcv
	tcp_v4_do_rcv
		tcp_rcv_state_process
			tcp_rcv_synsent_state_process
				tcp_finish_connect
					sock_def_wakeup
						wake_up_interruptible_all
							__wake_up
								__wake_up_common
1
2
3
4
5
6
7
8
9
10
void tcp_finish_connect(struct sock *sk, struct sk_buff *skb)
{
	...
	tcp_set_state(sk, TCP_ESTABLISHED); /* 在这里设置为连接已建立的状态 */
	...
	if (! sock_flag(sk, SOCK_DEAD)) {
		sk->sk_state_change(sk); /* 指向sock_def_wakeup,会唤醒调用connect()的进程,完成连接的建立 */
		sk_wake_async(sk, SOCK_WAKE_IO, POLL_OUT); /* 如果使用了异步通知,则发送SIGIO通知进程可写 */
	}
}

accept等待

(1) 睡眠

accept()超时时间为sk->sk_rcvtimeo,在sock_init_data()中初始化为MAX_SCHEDULE_TIMEOUT,表示无限等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/* Wait for an incoming connection, avoid race conditions.
 * This must be called with the socket locked.
 */
static int inet_csk_wait_for_connect(struct sock *sk, long timeo)
{
	struct inet_connection_sock *icsk = inet_csk(sk);
	DEFINE_WAIT(wait); /* 初始化等待任务 */
	int err;

	for (; ;) {
		/* 把等待任务加入到socket的等待队列中,把进程状态设置为TASK_INTERRUPTIBLE */
		prepare_to_wait_exclusive(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);

		release_sock(sk); /* 等下可能要睡觉了,先释放 */

		if (reqsk_queue_empty(&icsk->icsk_accept_queue)) /* 如果全连接队列为空 */
			timeo = schedule_timeout(timeo); /* 进入睡眠直到超时或收到信号,或被IO事件处理函数唤醒 */

		lock_sock(sk); /* 醒来后重新上锁 */
		err = 0;
		/* 全连接队列不为空时,说明有新的连接建立了,成功返回 */
		if (! reqsk_queue_empty(&icsk->icsk_accept_queue))
			break;

		err = -EINVAL;
		if (sk->sk_state != TCP_LISTEN) /* 如果sock不处于监听状态了,退出,返回错误码 */
			break;

		err = sock_intr_errno(timeo);

		/* 如果进程有待处理的信号,退出,返回错误码。
		 * 因为timeo默认为MAX_SCHEDULE_TIMEOUT,所以err默认为-ERESTARTSYS。
		 * 接下来会重新调用此函数,所以accept()依然阻塞。
		 */
		if (signal_pending(current))
			break;

		err = -EAGAIN;
		if (! timeo) /* 如果等待超时,即超过用户设置的sk->sk_rcvtimeo,退出 */
			break;
	}

	/* 从等待队列中删除等待任务,把等待进程的状态设为TASK_RUNNING */
	finish_wait(sk_sleep(sk), &wait);
	return err;
}
(2) 唤醒

三次握手中,当服务器端接收到ACK完成连接建立的时候,会把新的连接链入全连接队列中,然后唤醒监听socket上的等待进程,accept()就能成功返回了。

三次握手时,当收到客户端的ACK后,经过如下调用:

1
2
3
4
5
6
7
tcp_v4_rcv
	tcp_v4_do_rcv
		tcp_child_process
			sock_def_readable
				wake_up_interruptible_sync_poll
					__wake_up_sync_key
						__wake_up_common

最终调用我们给等待任务注册的唤醒函数。

我们来看下accept()是如何避免惊群现象的。

1
2
3
4
5
6
7
8
9
10
11
12
13
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive,
							 int wake_flags, void *key)
{
	wait_queue_t *curr, *next;

	list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
		unsigned flags = curr->flags;

		if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE)
			!--nr_exclusive)
			break;
	}
}

初始化等待任务时,flags |= WQ_FLAG_EXCLUSIVE。传入的nr_exclusive为1,表示只允许唤醒一个等待任务。

所以这里只会唤醒一个等待的进程,不会导致惊群现象。