kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

Machine Check Exception

dmesg显示

1
2
3
4
5
6
7
8
...

sbridge: HANDLING MCE MEMORY ERROR
CPU 0: Machine Check Exception: 0 Bank 5: 8c00004000010093
TSC 0 ADDR 67081b300 MISC 2140040486 PROCESSOR 0:206d7 TIME 1441181676 SOCKET 0 APIC 0
EDAC MC0: CE row 2, channel 0, label "CPU_SrcID#0_Channel#3_DIMM#0": 1 Unknown error(s): memory read on FATAL area : cpu=0 Err=0001:0093 (ch=3), addr= 0x67081b300 => socket=0, Channel=3(mask=8), rank=0

...

保存4行log为mlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# mcelog --ascii < /tmp/mlog
WARNING: with --dmi mcelog --ascii must run on the same machine with the
	 same BIOS/memory configuration as where the machine check occurred.
sbridge: HANDLING MCE MEMORY ERROR
CPU 0: Machine Check Exception: 0 Bank 5: 8c00004000010093
HARDWARE ERROR. This is *NOT* a software problem!
Please contact your hardware vendor
Wed Sep  2 16:14:36 2015
CPU 0 BANK 5 MISC 2140040486 ADDR 67081b300
STATUS 8c00004000010093 MCGSTATUS 0
CPUID Vendor Intel Family 6 Model 45
WARNING: SMBIOS data is often unreliable. Take with a grain of salt!
<24> DIMM 1333 Mhz Res13 Width 72 Data Width 64 Size 16 GB
Device Locator: Node0_Channel2_Dimm0
Bank Locator: Node0_Bank0
Manufacturer: Hynix Semiconducto
Serial Number: 40743B5A
Asset Tag: Dimm2_AssetTag
Part Number: HMT42GR7BFR4A-PB
TSC 0 ADDR 67081b300 MISC 2140040486 PROCESSOR 0:206d7 TIME 1441181676 SOCKET 0 APIC 0
EDAC MC0: CE row 2, channel 0, label "CPU_SrcID#0_Channel#3_DIMM#0": 1 Unknown error(s): memory read on FATAL area : cpu=0 Err=0001:0093 (ch=3), addr = 0x67081b300 => socket=0, Channel=3(mask=8), rank=0

根据
Part Number: HMT42GR7BFR4A-PB
Serial Number: 40743B5A

在lshw中找相应硬件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
...

	 *-memory:0
	      description: System Memory
	      physical id: 2d
	      slot: System board or motherboard
	    *-bank:0
	         description: DIMM 1333 MHz (0.8 ns)
	         product: HMT42GR7BFR4A-PB
	         vendor: Hynix Semiconducto
	         physical id: 0
	         serial: 905D21AE
	         slot: Node0_Channel1_Dimm0
	         size: 16GiB
	         width: 64 bits
	         clock: 1333MHz (0.8ns)
	    *-bank:1
	         description: DIMM Synchronous [empty]
	         product: A1_Dimm1_PartNumber
	         vendor: Dimm1_Manufacturer
	         physical id: 1
	         serial: Dimm1_SerNum
	         slot: Node0_Channel1_Dimm1
	         width: 64 bits
	    *-bank:2
	         description: DIMM 1333 MHz (0.8 ns)
	         product: HMT42GR7BFR4A-PB
	         vendor: Hynix Semiconducto
	         physical id: 2
	         serial: 40743B5A
	         slot: Node0_Channel2_Dimm0
	         size: 16GiB
	         width: 64 bits
	         clock: 1333MHz (0.8ns)

		...

NAPI机制分析

http://blog.csdn.net/shanshanpt/article/details/20564845

NAPI 的核心在于:在一个繁忙网络,每次有网络数据包到达时,不需要都引发中断,因为高频率的中断可能会影响系统的整体效率,假象一个场景,我们此时使用标准的 100M 网卡,可能实际达到的接收速率为 80MBits/s,而此时数据包平均长度为 1500Bytes,则每秒产生的中断数目为:

1
80M bits/s / (8 Bits/Byte * 1500 Byte) = 6667 个中断 /s

每秒 6667 个中断,对于系统是个很大的压力,此时其实可以转为使用轮询 (polling) 来处理,而不是中断;但轮询在网络流量较小的时没有效率,因此低流量时,基于中断的方式则比较合适,这就是 NAPI 出现的原因,在低流量时候使用中断接收数据包,而在高流量时候则使用基于轮询的方式接收。

现在内核中 NIC 基本上已经全部支持 NAPI 功能,由前面的叙述可知,NAPI 适合处理高速率数据包的处理,而带来的好处则是:

1、中断缓和 (Interrupt mitigation),由上面的例子可以看到,在高流量下,网卡产生的中断可能达到每秒几千次,而如果每次中断都需要系统来处理,是一个很大的压力,而 NAPI 使用轮询时是禁止了网卡的接收中断的,这样会减小系统处理中断的压力;

2、数据包节流 (Packet throttling),NAPI 之前的 Linux NIC 驱动总在接收到数据包之后产生一个 IRQ,接着在中断服务例程里将这个 skb 加入本地的 softnet,然后触发本地 NET_RX_SOFTIRQ 软中断后续处理。如果包速过高,因为 IRQ 的优先级高于 SoftIRQ,导致系统的大部分资源都在响应中断,但 softnet 的队列大小有限,接收到的超额数据包也只能丢掉,所以这时这个模型是在用宝贵的系统资源做无用功。而 NAPI 则在这样的情况下,直接把包丢掉,不会继续将需要丢掉的数据包扔给内核去处理,这样,网卡将需要丢掉的数据包尽可能的早丢弃掉,内核将不可见需要丢掉的数据包,这样也减少了内核的压力。

对NAPI 的使用,一般包括以下的几个步骤:

1、在中断处理函数中,先禁止接收中断,且告诉网络子系统,将以轮询方式快速收包,其中禁止接收中断完全由硬件功能决定,而告诉内核将以轮询方式处理包则是使用函数 netif_rx_schedule(),也可以使用下面的方式,其中的 netif_rx_schedule_prep 是为了判定现在是否已经进入了轮询模式:

将网卡预定为轮询模式

1
void netif_rx_schedule(struct net_device *dev);

或者

1
2
if (netif_rx_schedule_prep(dev))
	__netif_rx_schedule(dev);

2、在驱动中创建轮询函数,它的工作是从网卡获取数据包并将其送入到网络子系统,其原型是:

NAPI 的轮询方法

1
int (*poll)(struct net_device *dev, int *budget);

这里的轮询函数用于在将网卡切换为轮询模式之后,用 poll() 方法处理接收队列中的数据包,如队列为空,则重新切换为中断模式。切换回中断模式需要先关闭轮询模式,使用的是函数 netif_rx_complete (),接着开启网卡接收中断 .。

退出轮询模式

1
void netif_rx_complete(struct net_device *dev);

3、在驱动中创建轮询函数,需要和实际的网络设备 struct net_device 关联起来,这一般在网卡的初始化时候完成,示例代码如下:

设置网卡支持轮询模式

1
2
dev->poll = my_poll;
dev->weight = 64;

里面另外一个字段为权重 (weight),该值并没有一个非常严格的要求,实际上是个经验数据,一般 10Mb 的网卡,我们设置为 16,而更快的网卡,我们则设置为 64。

NAPI的一些相关Interface

下面是 NAPI 功能的一些接口,在前面都基本有涉及,我们简单看看:

1
netif_rx_schedule(dev)

在网卡的中断处理函数中调用,用于将网卡的接收模式切换为轮询

1
netif_rx_schedule_prep(dev)

在网卡是 Up 且运行状态时,将该网卡设置为准备将其加入到轮询列表的状态,可以将该函数看做是 netif_rx_schedule(dev) 的前半部分

1
__netif_rx_schedule(dev)

将设备加入轮询列表,前提是需要 netif_schedule_prep(dev) 函数已经返回了 1

1
__netif_rx_schedule_prep(dev)

与 netif_rx_schedule_prep(dev) 相似,但是没有判断网卡设备是否 Up 及运行,不建议使用

1
netif_rx_complete(dev)

用于将网卡接口从轮询列表中移除,一般在轮询函数完成之后调用该函数。

1
__netif_rx_complete(dev)

Newer newer NAPI

其实之前的 NAPI(New API) 这样的命名已经有点让人忍俊不禁了,可见 Linux 的内核极客们对名字的掌控,比对代码的掌控差太多,于是乎,连续的两次对 NAPI 的重构,被戏称为 Newer newer NAPI 了。

与 netif_rx_complete(dev) 类似,但是需要确保本地中断被禁止

Newer newer NAPI

在最初实现的 NAPI 中,有 2 个字段在结构体 net_device 中,分别为轮询函数 poll() 和权重 weight,而所谓的 Newer newer NAPI,是在 2.6.24 版内核之后,对原有的 NAPI 实现的几次重构,其核心是将 NAPI 相关功能和 net_device 分离,这样减少了耦合,代码更加的灵活,因为 NAPI 的相关信息已经从特定的网络设备剥离了,不再是以前的一对一的关系了。例如有些网络适配器,可能提供了多个 port,但所有的 port 却是共用同一个接受数据包的中断,这时候,分离的 NAPI 信息只用存一份,同时被所有的 port 来共享,这样,代码框架上更好地适应了真实的硬件能力。Newer newer NAPI 的中心结构体是napi_struct:

NAPI 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* 
 * Structure for NAPI scheduling similar to tasklet but with weighting 
*/ 
struct napi_struct { 
	/* The poll_list must only be managed by the entity which 
	 * changes the state of the NAPI_STATE_SCHED bit.  This means 
	 * whoever atomically sets that bit can add this napi_struct 
	 * to the per-cpu poll_list, and whoever clears that bit 
	 * can remove from the list right before clearing the bit. 
	 */ 
	struct list_head      poll_list; 

	unsigned long          state; 
	int              weight; 
	int              (*poll)(struct napi_struct *, int); 
 #ifdef CONFIG_NETPOLL 
	spinlock_t          poll_lock; 
	int              poll_owner; 
 #endif 

	unsigned int          gro_count; 

	struct net_device      *dev; 
	struct list_head      dev_list; 
	struct sk_buff          *gro_list; 
	struct sk_buff          *skb; 
};

熟悉老的 NAPI 接口实现的话,里面的字段 poll_list、state、weight、poll、dev、没什么好说的,gro_count 和 gro_list 会在后面讲述 GRO 时候会讲述。需要注意的是,与之前的 NAPI 实现的最大的区别是该结构体不再是 net_device 的一部分,事实上,现在希望网卡驱动自己单独分配与管理 napi 实例,通常将其放在了网卡驱动的私有信息,这样最主要的好处在于,如果驱动愿意,可以创建多个 napi_struct,因为现在越来越多的硬件已经开始支持多接收队列 (multiple receive queues),这样,多个 napi_struct 的实现使得多队列的使用也更加的有效。

与最初的 NAPI 相比较,轮询函数的注册有些变化,现在使用的新接口是:

1
2
void netif_napi_add(struct net_device *dev, struct napi_struct *napi, 
					int (*poll)(struct napi_struct *, int), int weight)

熟悉老的 NAPI 接口的话,这个函数也没什么好说的。

值得注意的是,前面的轮询 poll() 方法原型也开始需要一些小小的改变:

1
int (*poll)(struct napi_struct *napi, int budget);

大部分 NAPI 相关的函数也需要改变之前的原型,下面是打开轮询功能的 API:

1
2
3
4
5
6
7
void netif_rx_schedule(struct net_device *dev, 
						struct napi_struct *napi); 
/* ...or... */ 
int netif_rx_schedule_prep(struct net_device *dev, 
						struct napi_struct *napi); 
void __netif_rx_schedule(struct net_device *dev, 
						struct napi_struct *napi);

轮询功能的关闭则需要使用:

1
2
void netif_rx_complete(struct net_device *dev, 
						struct napi_struct *napi);

因为可能存在多个 napi_struct 的实例,要求每个实例能够独立的使能或者禁止,因此,需要驱动作者保证在网卡接口关闭时,禁止所有的 napi_struct 的实例。

函数 netif_poll_enable() 和 netif_poll_disable() 不再需要,因为轮询管理不再和 net_device 直接管理,取而代之的是下面的两个函数:

1
2
void napi_enable(struct napi *napi); 
void napi_disable(struct napi *napi);

linux下ip协议(V4)的实现

这次主要介绍的是ip层的切片与组包的实现。

首先来看一下分片好的帧的一些概念:

1 第一个帧的offset位非0并且MF位为1

2 所有的在第一个帧和最后一个帧之间的帧都拥有长度大于0的域

3 最后一个帧MF位为0 并且offset位非0。(这样就能判断是否是最后一个帧了).

这里要注意在linux中,ip头的frag_off域包含了 rfcip头的定义中的nf,df,以及offset域,因此我们每次需要按位与来取得相应的域的值,看下面

ip_local_deliver的代码片段就清楚了:

1
2
3
4
5
	// 取出mf位和offset域,从而决定是否要组包。
	if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) {
		if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
			return 0;
	}

而fragmentation/defragmentation 子系统的初始化是通过ipfrag_init来实现了,而它是被inet_init来调用的。它主要做的是注册sys文件系统节点,并开启一个定时器,以及初始化一些相关的变量.这个函数的初始化以及相关的数据结构的详细介绍,我们会在后面的组包小节中介绍。现在我们先来看切片的处理。

相对于组包,切片逻辑什么的都比较简单。切片的主要函数是ip_fragment.它的输入包包括下面几种:

1 要被转发的包(没有切片的)。

2 要被转发的包(已经被路由器或者源主机切片了的).

3 被本地函数所创建的buffer,简而言之也就是本地所要传输的数据包(还未加包头),但是需要被切片的。

而ip_fragment所必须处理下面几种情况:

1 一大块数据需要被分割为更小的部分。

2 一堆数据片段(我的上篇blog有介绍,也就是ip_append_data已经切好的数据包,或者tcp已经切好的数据包)不需要再被切片。

上面的两种情况其实就是看高层(4层)协议有没有做切片工作(按照PMTU)了。如果已经被切片(其实也算不上切片(4层不能处理ip头),只能说i4层为了ip层更好的处理数据包,从而帮ip层做了一部分工作),则ip层所做的很简单,就是给每个包加上ip头就可以了。

切片分为两种类型,一种是fast (或者说 efficient)切片,这种也就是4层已经切好片,这里只需要加上ip头就可以了,一种是slow切片,也就是需要现在切片。

下来来看切片的主要任务:

1 将数据包切片为MTU大小(通过ptmu).

2 初始化每一个fragment的ip 头。还要判断一些option的copy位,因为并不是每一种option都要放在所有已切片的fragment 的ip头中的。

3 计算ip层的校验值。

4 通过netfilter过滤。

5 update 一些kernel 域以及snmp 统计值。

接下来来看ip_fragment的具体实现:

1
int ip_fragment(struct sk_buff *skb, int (*output)(struct sk_buff*))

第一个参数skb表示将要被切片的ip包,第二个参数是一个传输切片的输出函数(切片完毕后就交给这个函数处理)。比如ip_finish_output2类似的。

这个函数我们来分段看,首先来看它进行切片前的一些准备工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
	// 先是取出了一些下面将要使用的变量。
	struct iphdr *iph;
	int raw = 0;
	int ptr;
	struct net_device *dev;
	struct sk_buff *skb2;
	unsigned int mtu, hlen, left, len, ll_rs, pad;
	int offset;
	__be16 not_last_frag;
	// 路由表
	struct rtable *rt = skb->rtable;
	int err = 0;
	// 网络设备
	dev = rt->u.dst.dev;

	// ip头
	iph = ip_hdr(skb);
	// 判断DF位,我们知道如果df位被设置了话就表示不要被切片,这时ip_fragment将会发送一个icmp豹纹返回到源主机。这里主要是为forward数据所判断。
	if (unlikely((iph->frag_off & htons(IP_DF)) && !skb->local_df)) {
		IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGFAILS);
		icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
			  htonl(ip_skb_dst_mtu(skb)));
		kfree_skb(skb);
		return -EMSGSIZE;
	}
	// 得到ip头的长度
	hlen = iph->ihl * 4;
	// 得到mtu的大小。这里要注意,他的大小减去了hlen,也就是ip头的大小。
	mtu = dst_mtu(&rt->u.dst) - hlen;    /* Size of data space */
	IPCB(skb)->flags |= IPSKB_FRAG_COMPLETE;

不管是slow还是fast 被切片的任何一个帧如果传输失败,ip_fragment都会立即返回一个错误给4层,并且紧跟着的帧也不会再被传输,然后将处理方法交给4层去做。

接下来我们来看fast 切片。 一般用fast切片的都是经由4层的ip_append_data和ip_push_pending函数(udp)将数据包已经切片好的,或者是tcp层已经切片好的数据包,才会用fast切片.

这里要主要几个问题:
1 每一个切片的大小都不能超过PMTU。
2 只有最后一个切片才会有3层的整个数据包的大小。
3 每一个切片都必须有足够的大小来允许2层加上自己的头。

我们先看一下skb_pagelen这个函数(下面的处理会用到),这个函数用来得到当前skb的len,首先我们要知道(我前面的blog有介绍)在sk_write_queue的sk_buff队列中,每一个sk_buff的len = x(也就是么一个第一个切片的包的l4 payload的长度) + S1 (这里表示所有的frags域的数据的总大小,也就是data_len的长度)。可以先看下面的图:

很容易一目了然。

1
2
3
4
5
6
7
8
9
static inline int skb_pagelen(const struct sk_buff *skb)
{
	int i, len = 0;
	// 我们知道如果设备支持S/G IO的话,nr_frags会包含一些L4 payload,因此我们需要先遍历nr_frags.然后加入它的长度。
	for (i = (int)skb_shinfo(skb)->nr_frags - 1; i >= 0; i--)
		len += skb_shinfo(skb)->frags[i].size;
	// 最后加上skb_headlen,而skb_headlen = skb->len - skb->data_len;因此这里就会返回这个数据包的len。
	return len + skb_headlen(skb);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
	// 通过上一篇blog我们知道,如果4层将数据包分片了,那么就会把这些数据包放到skb的frag_list链表中,因此我们这里首先先判断frag_list链表是否为空,为空的话我们将会进行slow 切片。
	if (skb_shinfo(skb)->frag_list) {
		struct sk_buff *frag;
		// 取得第一个数据报的len.我们知道当sk_write_queue队列被flush后,除了第一个切好包的另外的包都会加入到frag_list中,而这里我们我们需要得到的第一个包(也就是本身这个sk_buff)的长度。
		int first_len = skb_pagelen(skb);
		int truesizes = 0;
		// 接下来的判断都是为了确定我们能进行fast切片。切片不能被共享,这是因为在fast path 中,我们需要加给每个切片不同的ip头(而并不会复制每个切片)。因此在fast path中是不可接受的。而在slow path中,就算有共享也无所谓,因为他会复制每一个切片,使用一个新的buff。

		// 判断第一个包长度是否符合一些限制(包括mtu,mf位等一些限制).如果第一个数据报的len没有包含mtu的大小这里之所以要把第一个切好片的数据包单独拿出来检测,是因为一些域是第一个包所独有的(比如IP_MF要为1)。这里由于这个mtu是不包括hlen的mtu,因此我们需要减去一个hlen。
		if (first_len - hlen > mtu ||
			((first_len - hlen) & 7) ||
			(iph->frag_off & htons(IP_MF|IP_OFFSET)) ||
			skb_cloned(skb))
			goto slow_path;
		// 遍历剩余的frag。
		for (frag = skb_shinfo(skb)->frag_list; frag; frag = frag->next) {
			/* Correct geometry. */
			// 判断每个帧的mtu,以及相关的东西,如果不符合条件则要进行slow path,基本和上面的第一个skb的判断类似。
			if (frag->len > mtu ||
				((frag->len & 7) && frag->next) ||
				skb_headroom(frag) < hlen)
				goto slow_path;
			// 判断是否共享。
			/* Partially cloned skb? */
			if (skb_shared(frag))
				goto slow_path;

			BUG_ON(frag->sk);
			// 进行socket的一些操作。
			if (skb->sk) {
				sock_hold(skb->sk);
				frag->sk = skb->sk;
				frag->destructor = sock_wfree;
				truesizes += frag->truesize;
			}
		}

		// 通过上面的检测,都通过了,因此我们可以进行fast path切片了。

		// 先是设置一些将要处理的变量的值。
		err = 0;
		offset = 0;
		// 取得frag_list列表
		frag = skb_shinfo(skb)->frag_list;
		skb_shinfo(skb)->frag_list = NULL;

		// 得到数据(不包括头)的大小。
		skb->data_len = first_len - skb_headlen(skb);
		skb->truesize -= truesizes;
		// 得到
		skb->len = first_len;
		iph->tot_len = htons(first_len);
		// 设置mf位
		iph->frag_off = htons(IP_MF);
		// 执行校验
		ip_send_check(iph);

		for (;;) {
			// 开始进行发送。
			if (frag) {
				// 设置校验位
				frag->ip_summed = CHECKSUM_NONE;
				// 设置相应的头部。
				skb_reset_transport_header(frag);
				__skb_push(frag, hlen);
				skb_reset_network_header(frag);
				// 复制ip头。
				memcpy(skb_network_header(frag), iph, hlen);
				// 修改每个切片的ip头的一些属性。
				iph = ip_hdr(frag);
				iph->tot_len = htons(frag->len);
				// 将当前skb的一些属性付给将要传递的切片好的帧。
				ip_copy_metadata(frag, skb);
				if (offset == 0)
				// 处理ip_option
					ip_options_fragment(frag);
				offset += skb->len - hlen;
				// 设置位移。
				iph->frag_off = htons(offset>>3);
				if (frag->next != NULL)
					iph->frag_off |= htons(IP_MF);
				/* Ready, complete checksum */
				ip_send_check(iph);
			}
			// 调用输出函数。
			err = output(skb);

			if (!err)
				IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGCREATES);
			if (err || !frag)
				break;
			// 处理链表中下一个buf。
			skb = frag;
			frag = skb->next;
			skb->next = NULL;
		}

		if (err == 0) {
			IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGOKS);
			return 0;
		}
		// 释放内存。
		while (frag) {
			skb = frag->next;
			kfree_skb(frag);
			frag = skb;
		}
		IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGFAILS);
		return err;
	}

再接下来我们来看slow fragmentation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
	// 切片开始的位移
	left = skb->len - hlen;      /* Space per frame */
	// 而ptr就是切片开始的指针。
	ptr = raw + hlen;       /* Where to start from */

	/* for bridged IP traffic encapsulated inside f.e. a vlan header,
	 * we need to make room for the encapsulating header
	 */
	// 处理桥接的相关操作。
	pad = nf_bridge_pad(skb);
	ll_rs = LL_RESERVED_SPACE_EXTRA(rt->u.dst.dev, pad);
	mtu -= pad;

	// 其实也就是取出取出ip offset域。
	offset = (ntohs(iph->frag_off) & IP_OFFSET) << 3;
	// not_last_frag,顾名思义,其实也就是表明这个帧是否是最后一个切片。
	not_last_frag = iph->frag_off & htons(IP_MF);


	// 开始为循环处理,每一个切片创建一个skb buffer。
	while (left > 0) {
		len = left;
		// 如果len大于mtu,我们设置当前的将要切片的数据大小为mtu。
		if (len > mtu)
			len = mtu;
		// 长度也必须位对齐。
		if (len < left)  {
			len &= ~7;
		}
		// malloc一个新的buff。它的大小包括ip payload,ip head,以及L2 head.
		if ((skb2 = alloc_skb(len+hlen+ll_rs, GFP_ATOMIC)) == NULL) {
			NETDEBUG(KERN_INFO "IP: frag: no memory for new fragment!\n");
			err = -ENOMEM;
			goto fail;
		}
		// 调用ip_copy_metadata复制一些相同的值的域。
		ip_copy_metadata(skb2, skb);
		// 进行skb的相关操作。为了加上ip头。
		skb_reserve(skb2, ll_rs);
		skb_put(skb2, len + hlen);
		skb_reset_network_header(skb2);
		skb2->transport_header = skb2->network_header + hlen;
		// 将每一个分片的ip包都关联到源包的socket上。
		if (skb->sk)
			skb_set_owner_w(skb2, skb->sk);
		// 开始填充新的ip包的数据。

		// 先拷贝包头。
		skb_copy_from_linear_data(skb, skb_network_header(skb2), hlen);
		// 拷贝数据部分,这个函数实现的比较复杂。
		if (skb_copy_bits(skb, ptr, skb_transport_header(skb2), len))
			BUG();
		left -= len;
		// 填充相应的ip头。
		iph = ip_hdr(skb2);
		iph->frag_off = htons((offset >> 3));

		// 第一个包,因此进行ip_option处理。
		if (offset == 0)
			ip_options_fragment(skb);
		// 不是最后一个包,因此设置mf位。
		if (left > 0 || not_last_frag)
			iph->frag_off |= htons(IP_MF);
		// 移动指针以及更改位移大小。
		ptr += len;
		offset += len;
		// update包头的大小。
		iph->tot_len = htons(len + hlen);
		// 重新计算校验。
		ip_send_check(iph);
		//最终输出。
		err = output(skb2);
		if (err)
			goto fail;

		IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGCREATES);
	}
	kfree_skb(skb);
	IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGOKS);
	return err;

接下来来看ip组包的实现。首先要知道每一个切片(属于同一个源包的)的ip包 id都是相同的。

首先来看相应的数据结构。在内核中,每一个ip包(切片好的)都是一个struct ipq链表。而不同的数据包(这里指不是属于同一个源包的数据包)都保

存在一个hash表中。也就是ip4_frags这个变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static struct inet_frags ip4_frags;

#define INETFRAGS_HASHSZ        64

struct inet_frags {
	struct hlist_head   hash[INETFRAGS_HASHSZ];
	rwlock_t        lock;
	// 随机值,它被用在计算hash值上面,下面会介绍到,过一段时间,内核就会更新这个值。
	u32         rnd;
	int         qsize;
	int         secret_interval;
	struct timer_list   secret_timer;
	// hash函数
	unsigned int        (*hashfn)(struct inet_frag_queue *);
	void            (*constructor)(struct inet_frag_queue *q,
						void *arg);
	void            (*destructor)(struct inet_frag_queue *);
	void            (*skb_free)(struct sk_buff *);
	int         (*match)(struct inet_frag_queue *q,
						void *arg);
	void            (*frag_expire)(unsigned long data);
};

struct ipq {
	struct inet_frag_queue q;
	u32     user;
	// 都是ip头相关的一些域。
	__be32      saddr;
	__be32      daddr;
	__be16      id;
	u8      protocol;
	int             iif;
	unsigned int    rid;
	struct inet_peer *peer;
};

struct inet_frag_queue {
	struct hlist_node   list;
	struct netns_frags  *net;
	// 基于LRU算法,主要用在GC上。
	struct list_head    lru_list;   /* lru list member */
	spinlock_t      lock;
	atomic_t        refcnt;
	// 属于同一个源的数据包的定时器,当定时器到期,切片还没到达,此时就会drop掉所有的数据切片。
	struct timer_list   timer;      /* when will this queue expire? */
	// 保存有所有的切片链表(从属于同一个ip包)
	struct sk_buff      *fragments; /* list of received fragments */
	ktime_t         stamp;
	int         len;        /* total length of orig datagram */
	// 表示从源ip包已经接收的字节数。
	int         meat;
	// 这个域主要可以设置为下面的3种值。
	__u8            last_in;    /* first/last segment arrived? */

// 完成,第一个帧以及最后一个帧。
#define INET_FRAG_COMPLETE  4
#define INET_FRAG_FIRST_IN  2
#define INET_FRAG_LAST_IN   1
};

看下面的图就一目了然了:

首先来看组包要解决的一些问题:

1 fragment必须存储在内存中,知道他们全部都被网络子系统处理。才会释放,因此内存会是个巨大的浪费。

2 这里虽然使用了hash表,可是假设恶意攻击者得到散列算法并且伪造数据包来尝试着降低一些hash表中的元素的比重,从而使执行变得缓慢。这里linux使用一个定时器通过制造的随机数来使hash值的生成不可预测。

这个定时器的初始化是通过ipfrag_init(它会初始化上面提到的ip4_frags全局变量)调用inet_frags_init进行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void inet_frags_init(struct inet_frags *f)
{
	int i;

	for (i = 0; i < INETFRAGS_HASHSZ; i++)
		INIT_HLIST_HEAD(&f->hash[i]);

	rwlock_init(&f->lock);

	f->rnd = (u32) ((num_physpages ^ (num_physpages>>7)) ^
				   (jiffies ^ (jiffies >> 6)));
	// 安装定时器,当定时器到期就会调用inet_frag_secret_rebuild方法。
	setup_timer(&f->secret_timer, inet_frag_secret_rebuild,
			(unsigned long)f);
	f->secret_timer.expires = jiffies + f->secret_interval;
	add_timer(&f->secret_timer);
}

static void inet_frag_secret_rebuild(unsigned long dummy)
{
................................................

	write_lock(&f->lock);
	// 得到随机值
	get_random_bytes(&f->rnd, sizeof(u32));

	// 然后通过这个随机值重新计算整个hash表的hash值。
	for (i = 0; i < INETFRAGS_HASHSZ; i++) {
		struct inet_frag_queue *q;
		struct hlist_node *p, *n;

		hlist_for_each_entry_safe(q, p, n, &f->hash[i], list) {
			unsigned int hval = f->hashfn(q);

			if (hval != i) {
				hlist_del(&q->list);

				/* Relink to new hash chain. */
				hlist_add_head(&q->list, &f->hash[hval]);
			}
		}
	}
..............................................
}

3 ip协议是不可靠的,因此切片有可能被丢失。内核处理这个,是使用了一个定时器(每个数据包(也就是这个切片从属于的那个数据包)).当定时器到期,而切片没有到达,就会丢弃这个包。

4 由于ip协议是无连接的,因此当高层决定重传数据包的时候,组包时有可能会出现多个重复分片的情况。这是因为ip包是由4个域来判断的,源和目的地址,包id以及4层的协议类型。而最主要的是包id。可是包id只有16位,因此一个gigabit网卡几乎在半秒时间就能用完这个id一次。而第二次重传的数据包有可能走的和第一个第一次时不同的路径,因此内核必须每个切片都要检测和前面接受的切片的重叠情况的发生。

先来看ip_defrag用到的几个函数:

inet_frag_create: 创建一个新的ipq实例

ip_evitor: remove掉所有的未完成的数据包。它每次都会update一个LRU链表。每次都会把一个新的ipq数据结构加到ipq_lru_list的结尾。

ip_find: 发现切片所从属的数据包的切片链表。

ip_frag_queue: 排队一个给定的切片刀一个切片列表。这个经常和上一个方法一起使用。

ip_frag_reasm: 当所有的切片都到达后,build一个ip数据包。

ip_frag_destroy: remove掉传进来的ipq数据结构。包括和他有联系的所有的ip切片。

ipq_put: 将引用计数减一,如果为0,则直接调用ip_frag_destroy.

1
2
3
4
5
static inline void inet_frag_put(struct inet_frag_queue *q, struct inet_frags *f)
{
	if (atomic_dec_and_test(&q->refcnt))
		inet_frag_destroy(q, f, NULL);
}

ipq_kill: 主要用在gc上,标记一个ipq数据结构可以被remove,由于一些帧没有按时到达。

接下来来看ip_defrag的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int ip_defrag(struct sk_buff *skb, u32 user)
{
	struct ipq *qp;
	struct net *net;

	net = skb->dev ? dev_net(skb->dev) : dev_net(skb->dst->dev);
	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMREQDS);

	// 如果内存不够,则依据lru算法进行清理。
	if (atomic_read(&net->ipv4.frags.mem) > net->ipv4.frags.high_thresh)
		ip_evictor(net);

	// 查找相应的iqp,如果不存在则会新创建一个(这些都在ip_find里面实现)
	if ((qp = ip_find(net, ip_hdr(skb), user)) != NULL) {
		int ret;

		spin_lock(&qp->q.lock);
		// 排队进队列。
		ret = ip_frag_queue(qp, skb);

		spin_unlock(&qp->q.lock);
		ipq_put(qp);
		return ret;
	}

	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMFAILS);
	kfree_skb(skb);
	return -ENOMEM;
}

我们可以看到这里最重要的一个函数其实是ip_frag_queue,它主要任务是:

1 发现输入帧在源包的位置。
2 基于blog刚开始所描述的,判断是否是最后一个切片。
3 插入切片到切片列表(从属于相同的ip包)
4 update 垃圾回收所用到的ipq的一些相关域。
5 校验l4层的校验值(在硬件计算).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// 其中qp是源ip包的所有切片链表,而skb是将要加进来切片。
static int ip_frag_queue(struct ipq *qp, struct sk_buff *skb)
{
	.............................
	//  INET_FRAG_COMPLETE表示所有的切片包都已经抵达,这个时侯就不需要再组包了,因此这里就是校验函数有没有被错误的调用。
	if (qp->q.last_in & INET_FRAG_COMPLETE)
		goto err;
	.................................................
	// 将offset 8字节对齐、
	offset = ntohs(ip_hdr(skb)->frag_off);
	flags = offset & ~IP_OFFSET;
	offset &= IP_OFFSET;
	offset <<= 3;     /* offset is in 8-byte chunks */
	ihl = ip_hdrlen(skb);

	// 计算这个新的切片包的结束位置。
	end = offset + skb->len - ihl;
	err = -EINVAL;

	// MF没有设置,表明这个帧是最后一个帧。进入相关处理。
	if ((flags & IP_MF) == 0) {
		/* If we already have some bits beyond end
		 * or have different end, the segment is corrrupted.
		 */
	// 设置相应的len位置,以及last_in域。
		if (end < qp->q.len ||
			((qp->q.last_in & INET_FRAG_LAST_IN) && end != qp->q.len))
			goto err;
		qp->q.last_in |= INET_FRAG_LAST_IN;
		qp->q.len = end;
	} else {
		// 除了最后一个切片,每个切片都必须是8字节的倍数。
		if (end&7) {
			// 不是8字节的倍数,kernel截断这个切片。此时就需要l4层的校验重新计算,因此设置ip_summed为 CHECKSUM_NONE
			end &= ~7;
			if (skb->ip_summed != CHECKSUM_UNNECESSARY)
				skb->ip_summed = CHECKSUM_NONE;
		}
		if (end > qp->q.len) {
			// 数据包太大,并且是最后一个包,则表明这个数据包出错,因此drop它。
			/* Some bits beyond end -> corruption. */
			if (qp->q.last_in & INET_FRAG_LAST_IN)
				goto err;
			qp->q.len = end;
		}
	}
	// ip头不能被切片,因此end肯定会大于offset。
	if (end == offset)
		goto err;

	err = -ENOMEM;
	// remove掉ip头。
	if (pskb_pull(skb, ihl) == NULL)
		goto err;
	// trim掉一些padding,然后重新计算checksum。
	err = pskb_trim_rcsum(skb, end - offset);
	if (err)
		goto err;

	// 接下来遍历并将切片(为了找出当前将要插入的切片的位置),是以offset为基准。这里要合租要FRAG_CB宏是用来提取sk_buff->cb域。
	prev = NULL;
	for (next = qp->q.fragments; next != NULL; next = next->next) {
		if (FRAG_CB(next)->offset >= offset)
			break;  /* bingo! */
		prev = next;
	}
	// 当prev!=NULL时,说明这个切片要插入到列表当中。
	if (prev) {
		// 计算有没有重叠。
		int i = (FRAG_CB(prev)->offset + prev->len) - offset;
		// 大于0.证明有重叠,因此进行相关处理
		if (i > 0) {
			// 将重叠部分用新的切片覆盖。
			offset += i;
			err = -EINVAL;
			if (end <= offset)
				goto err;
			err = -ENOMEM;
			//移动i个位置。
			if (!pskb_pull(skb, i))
				goto err;
			// 需要重新计算L4的校验。
			if (skb->ip_summed != CHECKSUM_UNNECESSARY)
				skb->ip_summed = CHECKSUM_NONE;
		}
	}

	err = -ENOMEM;

	while (next && FRAG_CB(next)->offset < end) {
		// 和上面的判断很类似,也是先计算重叠数。这里要注意重叠分为两种情况:1;一个或多个切片被新的切片完全覆盖。2;被部分覆盖,因此这里我们需要分两种情况进行处理。
		int i = end - FRAG_CB(next)->offset; /* overlap is 'i' bytes */

		if (i < next->len) {
			// 被部分覆盖的情况。将新的切片offset移动i字节,然后remove掉老的切片中的i个字节。
			/* Eat head of the next overlapped fragment
			 * and leave the loop. The next ones cannot overlap.
			 */
			if (!pskb_pull(next, i))
				goto err;
			FRAG_CB(next)->offset += i;
			// 将接收到的源数据报的大小减去i,也就是remove掉不完全覆盖的那一部分。
			qp->q.meat -= i;
			// 重新计算l4层的校验。
			if (next->ip_summed != CHECKSUM_UNNECESSARY)
				next->ip_summed = CHECKSUM_NONE;
			break;
		} else {
			// 老的切片完全被新的切片覆盖,此时只需要remove掉老的切片就可以了。
			struct sk_buff *free_it = next;
			next = next->next;

			if (prev)
				prev->next = next;
			else
				qp->q.fragments = next;
			// 将qp的接受字节数更新。
			qp->q.meat -= free_it->len;
			frag_kfree_skb(qp->q.net, free_it, NULL);
		}
	}

	FRAG_CB(skb)->offset = offset;

....................................................
	atomic_add(skb->truesize, &qp->q.net->mem);
	// offset为0说明是第一个切片,因此设置相应的位。
	if (offset == 0)
		qp->q.last_in |= INET_FRAG_FIRST_IN;

	if (qp->q.last_in == (INET_FRAG_FIRST_IN | INET_FRAG_LAST_IN) &&
		qp->q.meat == qp->q.len)
		// 所有条件的满足了,就开始buildip包。
		return ip_frag_reasm(qp, prev, dev);
	write_lock(&ip4_frags.lock);
	// 从将此切片加入到lry链表中。
	list_move_tail(&qp->q.lru_list, &qp->q.net->lru_list);
	write_unlock(&ip4_frags.lock);
	return -EINPROGRESS;

err:
	kfree_skb(skb);
	return err;
}

如果网络设备提供L4层的硬件校验的话,输入ip帧还会进行L4的校验计算。当帧通过ip_frag_reasm组合好,它会进行校验的重新计算。我们这里通过设置skb->ip_summed到CHECKSUM_NONE,来表示需要娇艳的标志。

最后来看下GC。

内核为ip切片数据包实现了两种类型的垃圾回收。

1 系统内存使用限制。

2 组包的定时器

这里有一个全局的ip_frag_mem变量,来表示当前被切片所占用的内存数。每次一个新的切片被加入,这个值都会更新。而所能使用的最大内存可以在运行时改变,是通过/proc的sysctl_ipfrag_high_thresh来改变的,因此我们能看到当ip_defrag时,一开始会先判断内存的限制:

1
2
if (atomic_read(&net->ipv4.frags.mem) > net->ipv4.frags.high_thresh)
		ip_evictor(net);

当一个切片数据包到达后,内核会启动一个组包定时器,他是为了避免一个数据包占据ipq_hash太长时间,因此当定时器到期后,它就会清理掉在hash表中的相应的qp结构(也就是所有的未完成切片包).这个处理函数就是ip_expire,它的初始化是在ipfrag_init进行的。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static void ip_expire(unsigned long arg)
{
	struct ipq *qp;
	struct net *net;
	// 取出相应的qp,以及net域。
	qp = container_of((struct inet_frag_queue *) arg, struct ipq, q);
	net = container_of(qp->q.net, struct net, ipv4.frags);

	spin_lock(&qp->q.lock);
	// 如果数据包已经传输完毕,则不进行任何处理,直接退出。
	if (qp->q.last_in & INET_FRAG_COMPLETE)
		goto out;
	// 调用ipq_kill,这个函数主要是减少qp的引用计数,并从相关链表(比如LRU_LIST)中移除它。
	ipq_kill(qp);

	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMTIMEOUT);
	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMFAILS);

	// 如果是第一个切片,则发送一个ICMP给源主机。
	if ((qp->q.last_in & INET_FRAG_FIRST_IN) && qp->q.fragments != NULL) {
		struct sk_buff *head = qp->q.fragments;

		/* Send an ICMP "Fragment Reassembly Timeout" message. */
		if ((head->dev = dev_get_by_index(net, qp->iif)) != NULL) {
			icmp_send(head, ICMP_TIME_EXCEEDED, ICMP_EXC_FRAGTIME, 0);
			dev_put(head->dev);
		}
	}
out:
	spin_unlock(&qp->q.lock);
	ipq_put(qp);
}

dev_queue_xmi函数详解

blog.chinaunix.net/uid-20788636-id-3181312.html

前面在分析IPv6的数据流程时,当所有的信息都准备好了之后,例如,出口设备,下一跳的地址,以及链路层地址。就会调用dev.c文件中的dev_queue_xmin函数,该函数是设备驱动程序执行传输的接口。也就是所有的数据包在填充完成后,最终发送数据时,都会调用该函数。

dev_queue_xmit函数只接收一个skb_buff结构作为输入的值。此数据结构包含了此函数所需要的一切信息。Skb->dev是出口设备,skb->data为有效的载荷的开头,其长度为skb->len.下面是2.6.37版本内核中的dev_queue_xmit函数,该版本的内核与之前的版本有了不少的区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
int dev_queue_xmit(struct sk_buff *skb)
{
	struct net_device *dev = skb->dev;
	struct netdev_queue *txq;
	struct Qdisc *q;
	int rc = -ENOMEM;

	/* Disable soft irqs for various locks below. Also
	 * stops preemption for RCU.
	 */
	//关闭软中断 - __rcu_read_lock_bh()--->local_bh_disable();
	rcu_read_lock_bh();
	// 选择一个发送队列,如果设备提供了select_queue回调函数就使用它,否则由内核选择一个队列,这里只是Linux内核多队列的实现,但是要真正的使用都队列,需要网卡支持多队列才可以,一般的网卡都只有一个队列。在调用alloc_etherdev分配net_device是,设置队列的个数
	txq = dev_pick_tx(dev, skb);
	//从netdev_queue结构上获取设备的qdisc
	q = rcu_dereference_bh(txq->qdisc);

#ifdef CONFIG_NET_CLS_ACT
	skb->tc_verd = SET_TC_AT(skb->tc_verd, AT_EGRESS);
#endif
	//如果硬件设备有队列可以使用,该函数由dev_queue_xmit函数直接调用或由dev_queue_xmit通过qdisc_run函数调用
	trace_net_dev_queue(skb);
	if (q->enqueue) {
		rc = __dev_xmit_skb(skb, q, dev, txq); //使用流控对象发送数据包(包含入队和出队)
		//更详细的内容参考说明3
		goto out;
	}

	//下面的处理是在没有发送队列的情况下
	/* The device has no queue. Common case for software devices:
	 loopback, all the sorts of tunnels...

	 Really, it is unlikely that netif_tx_lock protection is necessary
	 here. (f.e. loopback and IP tunnels are clean ignoring statistics
	 counters.)
	 However, it is possible, that they rely on protection
	 made by us here.

	 Check this and shot the lock. It is not prone from deadlocks.
	 Either shot noqueue qdisc, it is even simpler 8)
	 */
	//首先,确定设备是开启的,并且还要确定队列是运行的,启动和停止队列有驱动程序决定
	//设备没有输出队列典型的是回环设备。这里需要做的就是直接调用dev_start_queue_xmit、、函数,经过驱动发送出去,如果发送失败,就直接丢弃,没有队列可以保存。
	if (dev->flags & IFF_UP) {
		int cpu = smp_processor_id(); /* ok because BHs are off */

		if (txq->xmit_lock_owner != cpu) {

			if (__this_cpu_read(xmit_recursion) > RECURSION_LIMIT)
				goto recursion_alert;

			HARD_TX_LOCK(dev, txq, cpu);

			if (!netif_tx_queue_stopped(txq)) {
				__this_cpu_inc(xmit_recursion);
				rc = dev_hard_start_xmit(skb, dev, txq);//见说明4
				__this_cpu_dec(xmit_recursion);
				if (dev_xmit_complete(rc)) {
					HARD_TX_UNLOCK(dev, txq);
					goto out;
				}
			}
			HARD_TX_UNLOCK(dev, txq);
			if (net_ratelimit())
				printk(KERN_CRIT "Virtual device %s asks to "
				 "queue packet!\n", dev->name);
		} else {
			/* Recursion is It is possible,
			 * unfortunately
			 */
recursion_alert:
			if (net_ratelimit())
				printk(KERN_CRIT "Dead loop on virtual device "
				 "%s, fix it urgently!\n", dev->name);
		}
	}

	rc = -ENETDOWN;
	rcu_read_unlock_bh();

	kfree_skb(skb);
	return rc;
out:
	rcu_read_unlock_bh();
	return rc;
}
1. 下面是dev_pick_tx函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static struct netdev_queue *dev_pick_tx(struct net_device *dev,
					struct sk_buff *skb)
{
	int queue_index;
	const struct net_device_ops *ops = dev->netdev_ops;

	if (ops->ndo_select_queue) {
		//选择一个索引,这个策略可以设置,比如优先选择视频和音频队列,而哪个队列邦定哪个策略也是设定的。
		queue_index = ops->ndo_select_queue(dev, skb);
		queue_index = dev_cap_txqueue(dev, queue_index);
	} else {
		struct sock *sk = skb->sk;
		queue_index = sk_tx_queue_get(sk);
		if (queue_index < 0 || queue_index >= dev->real_num_tx_queues) {

			queue_index = 0;
			if (dev->real_num_tx_queues > 1)
				queue_index = skb_tx_hash(dev, skb);

			if (sk) {
				struct dst_entry *dst = rcu_dereference_check(sk->sk_dst_cache, 1);

				if (dst && skb_dst(skb) == dst)
					sk_tx_queue_set(sk, queue_index);
			}
		}
	}

	skb_set_queue_mapping(skb, queue_index);
	return netdev_get_tx_queue(dev, queue_index);
}
2. 下面是其中的一种网卡类型调用函数alloc_etherdev时,
1
dev = alloc_etherdev(sizeof(struct ether1_priv));

其实该函数是一个宏定义:其中第二参数表示的就是队列的数量,这里在Linux2.6.37内核中找到的一种硬件网卡的实现,可用的队列是1个。

1
#define alloc_etherdev(sizeof_priv) alloc_etherdev_mq(sizeof_priv, 1)

下面是alloc_etherdev_mq函数的定义实现。

1
2
3
4
struct net_device *alloc_etherdev_mq(int sizeof_priv, unsigned int queue_count)
{
	return alloc_netdev_mq(sizeof_priv, "eth%d", ether_setup, queue_count);
}
3.

几乎所有的设备都会使用队列调度出口的流量,而内核可以使用对了规则的算法安排那个帧进行发送,使其以最优效率的次序进行传输。这里检查这个队列中是否有enqueue函数,如果有则说明设备会使用这个队列,否则需另外处理。关于enqueue函数的设置,我找到dev_open->dev_activate中调用了qdisc_create_dflt来设置,需要注意的是,这里并不是将传进来的skb直接发送,而是先入队,然后调度队列,具体发送哪个数据包由enqueue和dequeue函数决定,这体现了设备的排队规则

Enqueue 把一个元素添加的队列

Dequeue 从队列中提取一个元素

Requeue 把一个原先已经提取的元素放回到队列,可以由于传输失败。

if (q->enqueue)为真的话,表明这个设备有队列,可以进行相关的流控。调用__dev_xmit_skb函数进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
				 struct net_device *dev,
				 struct netdev_queue *txq)
{
	spinlock_t *root_lock = qdisc_lock(q);
	bool contended = qdisc_is_running(q);
	int rc;

	/*
	 * Heuristic to force contended enqueues to serialize on a
	 * separate lock before trying to get qdisc main lock.
	 * This permits __QDISC_STATE_RUNNING owner to get the lock more often
	 * and dequeue packets faster.
	 */
	if (unlikely(contended))
		spin_lock(&q->busylock);

	spin_lock(root_lock);
	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
		kfree_skb(skb);
		rc = NET_XMIT_DROP;
	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
		 qdisc_run_begin(q)) {
		/*
		 * This is a work-conserving queue; there are no old skbs
		 * waiting to be sent out; and the qdisc is not running -
		 * xmit the skb directly.
		 */
		if (!(dev->priv_flags & IFF_XMIT_DST_RELEASE))
			skb_dst_force(skb);
		__qdisc_update_bstats(q, skb->len);
		if (sch_direct_xmit(skb, q, dev, txq, root_lock)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);
		} else
			qdisc_run_end(q);

		rc = NET_XMIT_SUCCESS;
	} else {
		skb_dst_force(skb);
		rc = qdisc_enqueue_root(skb, q);
		if (qdisc_run_begin(q)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);
		}
	}
	spin_unlock(root_lock);
	if (unlikely(contended))
		spin_unlock(&q->busylock);
	return rc;
}

_dev_xmit_skb函数主要做两件事情:
(1) 如果流控对象为空的,试图直接发送数据包。
(2) 如果流控对象不空,将数据包加入流控对象,并运行流控对象。

当设备进入调度队列准备传输时,qdisc_run函数就会选出下一个要传输的帧,而该函数会间接的调用相关联的队列规则dequeue函数,从对了中取出数据进行传输。

有两个时机将会调用qdisc_run():
1.__dev_xmit_skb()
2.软中断服务线程NET_TX_SOFTIRQ

其实,真正的工作有qdisc_restart函数实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void __qdisc_run(struct Qdisc *q)
{
	unsigned long start_time = jiffies;

	while (qdisc_restart(q)) { //返回值大于0,说明流控对象非空。
		/*
		 * Postpone processing if
		 * 1. another process needs the CPU;
		 * 2. we've been doing it for too long.
		 */
		if (need_resched() || jiffies != start_time) { //已经不允许继续运行本流控对象。
			__netif_schedule(q); //将本队列加入软中断的output_queue链表中。
			break;
		}
	}

	qdisc_run_end(q);
}

如果发现本队列运行的时间太长了,将会停止队列的运行,并将队列加入output_queue链表头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static inline int qdisc_restart(struct Qdisc *q)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;

	/* Dequeue packet */
	skb = dequeue_skb(q);//一开始就调用dequeue函数。
	if (unlikely(!skb))
		return 0;
	WARN_ON_ONCE(skb_dst_is_noref(skb));
	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

	return sch_direct_xmit(skb, q, dev, txq, root_lock);//用于发送数据包
}
* Returns to the caller:
 *                0 - queue is empty or throttled.
 *                >0 - queue is not empty.
 */
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
		 struct net_device *dev, struct netdev_queue *txq,
		 spinlock_t *root_lock)
{
	int ret = NETDEV_TX_BUSY;

	/* And release qdisc */
	spin_unlock(root_lock);

	HARD_TX_LOCK(dev, txq, smp_processor_id());
	if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq)) //设备没有被停止,且发送队列没有被冻结
		ret = dev_hard_start_xmit(skb, dev, txq); //发送数据包

	HARD_TX_UNLOCK(dev, txq);

	spin_lock(root_lock);

	if (dev_xmit_complete(ret)) {
		/* Driver sent out skb successfully or skb was consumed */
		//发送成功,返回新的队列的长度
		ret = qdisc_qlen(q);
	} else if (ret == NETDEV_TX_LOCKED) {
		/* Driver try lock failed */
		ret = handle_dev_cpu_collision(skb, txq, q);
	} else {
		/* Driver returned NETDEV_TX_BUSY - requeue skb */
		if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))
			printk(KERN_WARNING "BUG %s code %d qlen %d\n",
			 dev->name, ret, q->q.qlen);
		 //设备繁忙,重新调度发送(利用softirq)
		ret = dev_requeue_skb(skb, q);
	}

	if (ret && (netif_tx_queue_stopped(txq) ||
		 netif_tx_queue_frozen(txq)))
		ret = 0;

	return ret;
}
4. 我们看一下下面的发送函数。

从此函数可以看出,当驱动使用发送队列的时候会循环从队列中取出包发送, 而不使用队列的时候只发送一次,如果没发送成功就直接丢弃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
struct netdev_queue *txq)
{
	const struct net_device_ops *ops = dev->netdev_ops;//驱动程序的函数集
	int rc = NETDEV_TX_OK;

	if (likely(!skb->next)) {
		if (!list_empty(&ptype_all))
			dev_queue_xmit_nit(skb, dev);//如果dev_add_pack加入的是ETH_P_ALL,那么就会复制一份给你的回调函数。

		/*
		 * If device doesnt need skb->dst, release it right now while
		 * its hot in this cpu cache
		 */
		if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
			skb_dst_drop(skb);

		skb_orphan_try(skb);

		if (vlan_tx_tag_present(skb) &&
		 !(dev->features & NETIF_F_HW_VLAN_TX)) {
			skb = __vlan_put_tag(skb, vlan_tx_tag_get(skb));
			if (unlikely(!skb))
				goto out;

			skb->vlan_tci = 0;
		}

		if (netif_needs_gso(dev, skb)) {
			if (unlikely(dev_gso_segment(skb)))
				goto out_kfree_skb;
			if (skb->next)
				goto gso;
		} else {
			if (skb_needs_linearize(skb, dev) &&
			 __skb_linearize(skb))
				goto out_kfree_skb;

			/* If packet is not checksummed and device does not
			 * support checksumming for this protocol, complete
			 * checksumming here.
			 */
			if (skb->ip_summed == CHECKSUM_PARTIAL) {
				skb_set_transport_header(skb, skb->csum_start -
					 skb_headroom(skb));
				if (!dev_can_checksum(dev, skb) &&
				 skb_checksum_help(skb))
					goto out_kfree_skb;
			}
		}

		rc = ops->ndo_start_xmit(skb, dev);//调用网卡的驱动程序发送数据。不同的网络设备有不同的发送函数
		trace_net_dev_xmit(skb, rc);
		if (rc == NETDEV_TX_OK)
			txq_trans_update(txq);
		return rc;
	}

gso:
	do {
		struct sk_buff *nskb = skb->next;

		skb->next = nskb->next;
		nskb->next = NULL;

		/*
		 * If device doesnt need nskb->dst, release it right now while
		 * its hot in this cpu cache
		 */
		if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
			skb_dst_drop(nskb);

		rc = ops->ndo_start_xmit(nskb, dev); //调用网卡的驱动程序发送数据。不同的网络设备有不同的发送函数
		trace_net_dev_xmit(nskb, rc);
		if (unlikely(rc != NETDEV_TX_OK)) {
			if (rc & ~NETDEV_TX_MASK)
				goto out_kfree_gso_skb;
			nskb->next = skb->next;
			skb->next = nskb;
			return rc;
		}
		txq_trans_update(txq);
		if (unlikely(netif_tx_queue_stopped(txq) && skb->next))
			return NETDEV_TX_BUSY;
	} while (skb->next);

out_kfree_gso_skb:
	if (likely(skb->next == NULL))
		skb->destructor = DEV_GSO_CB(skb)->destructor;
out_kfree_skb:
	kfree_skb(skb);
out:
	return rc;
}
5.下面看一下dev_queue_xmit_nit函数。

对于通过socket(AF_PACKET,SOCK_RAW,htons(ETH_P_ALL))创建的原始套接口,不但可以接受从外部输入的数据包,而且对于由于本地输出的数据包,如果满足条件,也可以能接受。

该函数就是用来接收由于本地输出的数据包,在链路层的输出过程中,会调用此函数,将满足条件的数据包输入到RAW套接口,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev)
{
	struct packet_type *ptype;

#ifdef CONFIG_NET_CLS_ACT
	if (!(skb->tstamp.tv64 && (G_TC_FROM(skb->tc_verd) & AT_INGRESS)))
		net_timestamp_set(skb);-----------------(1)
#else
	net_timestamp_set(skb);
#endif

	rcu_read_lock();
	list_for_each_entry_rcu(ptype, &ptype_all, list) {-----------------(2)
		/* Never send packets back to the socket
		 * they originated from - MvS (miquels@drinkel.ow.org)
		 */
		if ((ptype->dev == dev || !ptype->dev) &&
		 (ptype->af_packet_priv == NULL ||
		 (struct sock *)ptype->af_packet_priv != skb->sk)) {-----------------(3)
			struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC); -----------------(4)
			if (!skb2)
				break;

			/* skb->nh should be correctly
			 set by sender, so that the second statement is
			 just protection against buggy protocols.
			 */
			skb_reset_mac_header(skb2);

			if (skb_network_header(skb2) < skb2->data ||
			 skb2->network_header > skb2->tail) {
				if (net_ratelimit())
					printk(KERN_CRIT "protocol %04x is "
					 "buggy, dev %s\n",
					 ntohs(skb2->protocol),
					 dev->name);
				skb_reset_network_header(skb2); -----------------(5)
			}

			skb2->transport_header = skb2->network_header;
			skb2->pkt_type = PACKET_OUTGOING;
			ptype->func(skb2, skb->dev, ptype, skb->dev); -----------------(6)
		}
	}
	rcu_read_unlock();
}

说明:
(1) 记录该数据包输入的时间戳
(2) 遍历ptype_all链表,查找所有符合输入条件的原始套接口,并循环将数据包输入到满足条件的套接口
(3) 数据包的输出设备与套接口的输入设备相符或者套接口不指定输入设备,并且该数据包不是有当前用于比较的套接口输出,此时该套接口满足条件,数据包可以输入
(4) 由于该数据包是额外输入到这个原始套接口的,因此需要克隆一个数据包
(5) 校验数据包是否有效
(6) 将数据包输入原始套接口

6. 对于lookback设备来说处理有些不同。它的hard_start_xmit函数是loopback_xmit

在net/lookback.c文件中,定义的struct net_device_ops loopback_ops结构体

1
2
3
4
5
static const struct net_device_ops loopback_ops = {
	.ndo_init = loopback_dev_init,
	.ndo_start_xmit= loopback_xmit,
	.ndo_get_stats64 = loopback_get_stats64,
};

从这里可以看到起发送函数为loopback_xmit函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static netdev_tx_t loopback_xmit(struct sk_buff *skb,
				 struct net_device *dev)
{
	struct pcpu_lstats *lb_stats;
	int len;

	skb_orphan(skb);

	skb->protocol = eth_type_trans(skb, dev);

	/* it's OK to use per_cpu_ptr() because BHs are off */
	lb_stats = this_cpu_ptr(dev->lstats);

	len = skb->len;
	if (likely(netif_rx(skb) == NET_RX_SUCCESS)) {//直接调用了netif_rx进行了接收处理
		u64_stats_update_begin(&lb_stats->syncp);
		lb_stats->bytes += len;
		lb_stats->packets++;
		u64_stats_update_end(&lb_stats->syncp);
	}

	return NETDEV_TX_OK;
}
7. 已经有了dev_queue_xmit函数,为什么还需要软中断来发送呢?

dev_queue_xmit是对skb做些最后的处理并且第一次尝试发送,软中断是将前者发送失败或者没发完的包发送出去。

主要参考文献:

Linux发送函数dev_queue_xmit分析 http://shaojiashuai123456.iteye.com/blog/842236

TC流量控制实现分析(初步) http://blog.csdn.net/wwwlkk/article/details/5929308

Linux内核源码剖析 TCP/IP实现

路由表 rtable

http://blog.csdn.net/qy532846454/article/details/6423496

http://blog.csdn.net/qy532846454/article/details/6726171

http://blog.csdn.net/qy532846454/article/details/7568994


路由表

在内核中存在路由表fib_table_hash和路由缓存表rt_hash_table。路由缓存表主要是为了加速路由的查找,每次路由查询都会先查找路由缓存,再查找路由表。这和cache是一个道理,缓存存储最近使用过的路由项,容量小,查找快速;路由表存储所有路由项,容量大,查找慢。

首先,应该先了解路由表的意义,下面是route命令查看到的路由表:

1
2
3
4
Destination    Netmask        Gateway         Flags  Interface  Metric
169.254.0.0    255.255.0.0      *               U      eth0       1
192.168.123.0  255.255.255.0    *               U      eth0       1
default        0.0.0.0       192.168.123.254    UG     eth0       1

一条路由其实就是告知主机要到达一个目的地址,下一跳应该走哪里。比如发往192.168.22.3报文通过查路由表,会得到下一跳为192.168.123.254,再将其发送出去。在路由表项中,还有一个很重要的属性-scope,它代表了到目的网络的距离。

路由scope可取值:RT_SCOPE_UNIVERSE, RT_SCOPE_LINK, RT_SCOPE_HOST

在报文的转发过程中,显然是每次转发都要使到达目的网络的距离要越来越小或不变,否则根本到达不了目的网络。上面提到的scope很好的实现这个功能,在查找路由表中,表项的scope一定是更小或相等的scope(比如RT_SCOPE_LINK,则表项scope只能为RT_SCOPE_LINK或RT_SCOPE_HOST)。

路由缓存

路由缓存用于加速路由的查找,当收到报文或发送报文时,首先会查询路由缓存,在内核中被组织成hash表,就是rt_hash_table。

1
static struct rt_hash_bucket          *rt_hash_table __read_mostly;      [net/ipv4/route.c]

通过ip_route_input()进行查询,首先是缓存操作时,通过[src_ip, dst_ip, iif,rt_genid]计算出hash值

1
hash = rt_hash(daddr, saddr, iif, rt_genid(net));

此时rt_hash_table[hash].chain就是要操作的缓存表项的链表,比如遍历该链表

1
for (rth = rt_hash_table[hash].chain; rth; rth = rth->u.dst.rt_next)

因此,在缓存中查找一个表项,首先计算出hash值,取出这组表项,然后遍历链表,找出指定的表项,这里需要完全匹配[src_ip, dst_ip, iif, tos, mark, net],实际上struct rtable中有专门的属性用于缓存的查找键值 – struct flowi。

1
2
/* Cache lookup keys */
struct flowi                fl;

当找到表项后会更新表项的最后访问时间,并取出dst

1
2
dst_use(&rth->u.dst, jiffies);
skb_dst_set(skb, &rth->u.dst);

路由缓存的创建

inet_init() -> ip_init() -> ip_rt_init()

1
2
3
4
5
6
7
8
9
10
rt_hash_table = (struct rt_hash_bucket *)
	alloc_large_system_hash("IP route cache",
								sizeof(struct rt_hash_bucket),
								rhash_entries,
								(totalram_pages >= 128 * 1024) ?
								15 : 17,
								0,
								&rt_hash_log,
								&rt_hash_mask,
								rhash_entries ? 0 : 512 * 1024);

其中rt_hash_mask表示表的大小,rt_hash_log = log(rt_hash_mask),创建后的结构如图所示:

路由缓存插入条目

函数rt_intern_hash()

要插入的条目是rt,相应散列值是hash,首先通过hash值找到对应的bucket

1
rthp = &rt_hash_table[hash].chain;

然后对bucket进行一遍查询,这次查询的目的有两个:如果是超时的条目,则直接删除;如果是与rt相同键值的条目,则删除并将rt插入头部返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
while ((rth = *rthp) != NULL) {
	if (rt_is_expired(rth)) {     // 超时的条目
		*rthp = rth->u.dst.rt_next;
		rt_free(rth);
		continue;
	}
	if (compare_keys(&rth->fl, &rt->fl) && compare_netns(rth, rt)) { //重复的条目
		*rthp = rth->u.dst.rt_next;
		rcu_assign_pointer(rth->u.dst.rt_next, rt_hash_table[hash].chain);
		rcu_assign_pointer(rt_hash_table[hash].chain, rth);
		……
	}
	……
	rthp = &rth->u.dst.rt_next;
}

在扫描一遍后,如rt还未存在,则将其插入头部

1
2
rt->u.dst.rt_next = rt_hash_table[hash].chain;
rcu_assign_pointer(rt_hash_table[hash].chain, rt);

如果新插入rt满足一定条件,还要与ARP邻居表进行绑定

Hint:缓存的每个bucket是没有头结点的,单向链表,它所使用的插入和删除操作是值得学习的,简单实用。

路由缓存删除条目

rt_del()

要删除的条目是rt,相应散列值是hash,首先通过hash值找到对应的bucket,然后遍历,如果条目超时,或找到rt,则删除它。

1
2
3
4
5
6
7
8
9
10
11
12
rthp = &rt_hash_table[hash].chain;
spin_lock_bh(rt_hash_lock_addr(hash));
ip_rt_put(rt);
while ((aux = *rthp) != NULL) {
	if (aux == rt || rt_is_expired(aux)) {
		*rthp = aux->u.dst.rt_next;
		rt_free(aux);
		continue;
	}
	rthp = &aux->u.dst.rt_next;
}
spin_unlock_bh(rt_hash_lock_addr(hash));

路由表的创建

inet_init() -> ip_init() -> ip_fib_init() -> fib_net_init() -> ip_fib_net_init()[net/ipv4/fib_frontend.c]

首先为路由表分配空间,这里的每个表项hlist_head实际都会链接一个单独的路由表,FIB_TABLE_HASHSZ表示了分配多少个路由表,一般情况下至少有两个 – LOCAL和MAIN。注意这里仅仅是表头的空间分配,还没有真正分配路由表空间。

1
2
net->ipv4.fib_table_hash = kzalloc(
		sizeof(struct hlist_head)*FIB_TABLE_HASHSZ, GFP_KERNEL);

ip_fib_net_init() -> fib4_rules_init(),这里真正分配了路由表空间

1
2
local_table = fib_hash_table(RT_TABLE_LOCAL);
main_table  = fib_hash_table(RT_TABLE_MAIN);

然后将local和main表链入之前的fib_table_hash中

1
2
3
4
5
hlist_add_head_rcu(&local_table->tb_hlist,
		&net->ipv4.fib_table_hash[TABLE_LOCAL_INDEX]);

hlist_add_head_rcu(&main_table->tb_hlist,
		&net->ipv4.fib_table_hash[TABLE_MAIN_INDEX]);

最终生成结构如图,LOCAL表位于fib_table_hash[0],MAIN表位于fib_table_hash[1];两张表通过结构tb_hlist链入链表,而tb_id则标识了功能,255是LOCAL表,254是MAIN表。

关于这里的struct fn_hash,它表示了不同子网掩码长度的hash表[即fn_zone],对于ipv4,从0~32共33个。而fn_hash的实现则是fib_table的最后一个参数unsigned char tb_data[0]。

注意到这里fn_zone还只是空指针,我们还只完成了路由表初始化的一部分。在启动阶段还会调用inet_rtm_newroute() -> fib_table_insert() -> fn_new_zone() [fib_hash.c]来创建fn_zone结构,前面已经讲过,fn_zone一共有33个,其中掩码长度为0[/0]表示为默认路由,fn_zone可以理解为相同掩码的地址集合。

首先为fn_zone分配空间

1
struct fn_zone *fz = kzalloc(sizeof(struct fn_zone), GFP_KERNEL);

传入参数z代表掩码长度, z = 0的掩码用于默认路由,一般只有一个,所以fz_divisor只需设为1;其它设为16;这里要提到fz_divisor的作用,fz->fz_hash并不是个单链表,而是一个哈希表,而哈希表的大小就是fz_divisor。

1
2
3
4
5
if (z) {
	fz->fz_divisor = 16;
} else {
	fz->fz_divisor = 1;
}

fz_hashmask实际是用于求余数的,当算出hash值,再hash & fz_hashmask就得出了在哈希表的位置;而fz_hash就是下一层的哈希表了,前面已经提过路由表被多组分层了,这里fz_hash就是根据fz_divisor大小来创建的;fz_order就是子网掩码长度;fz_mask就是子网掩码。

1
2
3
4
fz->fz_hashmask = (fz->fz_divisor - 1);
fz->fz_hash = fz_hash_alloc(fz->fz_divisor);
fz->fz_order = z;
fz->fz_mask = inet_make_mask(z);

从子网长度大于新添加fz的fn_zone中挑选一个不为空的fn_zones[i],将新创建的fz设成fn_zones[i].next;然后将fz根据掩码长度添加到fn_zones[]中相应位置;fn_zone_list始终指向掩码长度最长的fn_zone。

1
2
3
4
5
6
7
8
9
10
11
for (i=z+1; i<=32; i++)
	if (table->fn_zones[i])
		break;
if (i>32) {
	fz->fz_next = table->fn_zone_list;
	table->fn_zone_list = fz;
} else {
	fz->fz_next = table->fn_zones[i]->fz_next;
	table->fn_zones[i]->fz_next = fz;
}
table->fn_zones[z] = fz;

这里的fn_hash是数组与链表的结合体,看下fn_hash定义

1
2
3
4
struct fn_hash {
	struct fn_zone *fn_zones[33];
	struct fn_zone *fn_zone_list;
};

fn_hash包含33数组元素,每个元素存放一定掩码长度的fn_zone,其中fn_zone[i]存储掩码长度为i。而fn_zone通过内部属性fz_next又彼此串连起来,形成单向链表,其中fn_zone_list可以看作链表头,而这里链表的组织顺序是倒序的,即从掩码长到短。

到这里,fz_hash所分配的哈希表还没有插入内容,这部分为fib_insert_node()完成。

inet_rtm_newroute() -> fib_table_insert() -> fib_insert_node() [net/ipv4/fib_hash.c]

这里f是fib_node,可以理解为具有相同网络地址的路由项集合。根据fn_key(网络地址)和fz(掩码长度)来计算hash值,决定将f插入fz_hash的哪个项。

1
2
struct hlist_head *head = &fz->fz_hash[fn_hash(f->fn_key, fz)];
hlist_add_head(&f->fn_hash, head);

如何fib_node还不存在,则会创建它,这里的kmem_cache_zalloc()其实就是内存分配

1
2
3
4
5
6
7
new_f = kmem_cache_zalloc(fn_hash_kmem, GFP_KERNEL);
if (new_f == NULL)
	goto out;
INIT_HLIST_NODE(&new_f->fn_hash);
INIT_LIST_HEAD(&new_f->fn_alias);
new_f->fn_key = key;
f = new_f;

路由表最后一层是fib_info,具体的路由信息都存储在此,它由fib_create_info()创建。

首先为fib_info分配空间,由于fib_info的最后一个属性是struct fib_nh fib_nh[0],因此大小是fib_info + nhs * fib_nh,这里的fib_nh代表了下一跳(next hop)的信息,nhs代表了下一跳的数目,一般情况下nhs=1,除非配置了支持多路径。

1
fi = kzalloc(sizeof(*fi)+nhs*sizeof(struct fib_nh), GFP_KERNEL);

设置fi的相关属性

1
2
3
4
5
6
fi->fib_net = hold_net(net);
fi->fib_protocol = cfg->fc_protocol;
fi->fib_flags = cfg->fc_flags;
fi->fib_priority = cfg->fc_priority;
fi->fib_prefsrc = cfg->fc_prefsrc;
fi->fib_nhs = nhs;

使fi后面所有的nh->nh_parent指向fi,设置后如图所示

1
2
3
change_nexthops(fi) {
	nexthop_nh->nh_parent = fi;
} endfor_nexthops(fi)

设置fib_nh的属性,这里仅展示了单一路径的情况:

1
2
3
4
struct fib_nh *nh = fi->fib_nh;
nh->nh_oif = cfg->fc_oif;
nh->nh_gw = cfg->fc_gw;
nh->nh_flags = cfg->fc_flags;

然后,再根据cfg->fc_scope值来设置nh的其余属性。如果scope是RT_SCOPE_HOST,则设置下一跳scope为RT_SCOPE_NOWHERE

1
2
3
4
5
if (cfg->fc_scope == RT_SCOPE_HOST) {
	struct fib_nh *nh = fi->fib_nh;
	nh->nh_scope = RT_SCOPE_NOWHERE;
	nh->nh_dev = dev_get_by_index(net, fi->fib_nh->nh_oif);
}

如果scope是RT_SCOPE_LINK或RT_SCOPE_UNIVERSE,则设置下跳

1
2
3
4
change_nexthops(fi) {
	if ((err = fib_check_nh(cfg, fi, nexthop_nh)) != 0)
		goto failure;
} endfor_nexthops(fi)

最后,将fi链入链表中,这里要注意的是所有的fib_info(只要创建了的)都会加入fib_info_hash中,如果路由项使用了优先地址属性,还会加入fib_info_laddrhash中。

1
2
3
4
5
6
7
8
hlist_add_head(&fi->fib_hash,
		&fib_info_hash[fib_info_hashfn(fi)]);

if (fi->fib_prefsrc) {
	struct hlist_head *head;
	head = &fib_info_laddrhash[fib_laddr_hashfn(fi->fib_prefsrc)];
	hlist_add_head(&fi->fib_lhash, head);
}

无论fib_info在路由表中位于哪个掩码、哪个网段结构下,都与fib_info_hash和fib_info_laddrhash无关,这两个哈希表与路由表独立,主要是用于加速路由信息fib_info的查找。哈希表的大小为fib_hash_size,当超过这个限制时,fib_hash_size * 2(如果哈希函数够好,每个bucket都有一个fib_info)。fib_info在哈希表的图示如下:

由于路由表信息也可能要以设备dev为键值搜索,因此还存在fib_info_devhash哈希表,用于存储nh的设置dev->ifindex。

1
2
3
4
5
change_nexthops(fi) {
	hash = fib_devindex_hashfn(nexthop_nh->nh_dev->ifindex);
	head = &fib_info_devhash[hash];
	hlist_add_head(&nexthop_nh->nh_hash, head);
} endfor_nexthops(fi)

上面讲过了路由表各个部分的创建,现在来看下它们是如何一起工作的,在fib_table_insert()[net/ipv4/fib_hash.c]完成整个的路由表创建过程。下面来看下fib_table_insert()函数:

从fn_zones中取出掩码长度为fc_dst_len的项,如果该项不存在,则创建它[fn_zone的创建前面已经讲过]。

1
2
3
fz = table->fn_zones[cfg->fc_dst_len];
if (!fz && !(fz = fn_new_zone(table, cfg->fc_dst_len)))
	return -ENOBUFS;

然后创建fib_info结构,[前面已经讲过]

1
fi = fib_create_info(cfg);

然后在掩码长度相同项里查找指定网络地址key(如145.222.33.0/24),查找的结果如图所示

1
f = fib_find_node(fz, key);

如果不存在该网络地址项,则创建相应的fib_node,并加入到链表fz_hash中

1
2
3
4
5
6
7
8
9
10
11
12
if (!f) {
	new_f = kmem_cache_zalloc(fn_hash_kmem, GFP_KERNEL);
	if (new_f == NULL)
		goto out;
 
	INIT_HLIST_NODE(&new_f->fn_hash);
	INIT_LIST_HEAD(&new_f->fn_alias);
	new_f->fn_key = key;
	f = new_f;
}
……
fib_insert_node(fz, new_f);

如果存在该网络地址项,则在fib_node的属性fn_alias中以tos和fi->fib_priority作为键值查找。一个fib_node可以有多个fib_alias相对应,这些fib_alias以链表形式存在,并按tos并从大到小的顺序排列。因此,fib_find_alias查找到的是第一个fib_alias->tos不大于tos的fib_alias项。

1
fa = fib_find_alias(&f->fn_alias, tos, fi->fib_priority);

如果查找到的fa与与要插入的路由项完全相同,则按照设置的标置位进行操作,NLM_F_REPLACE则替换掉旧的,NLM_F_APPEND添加在后面。

设置要插入的fib_alias的属性,包括最重要的fib_alias->fa_info设置为fi

1
2
3
4
5
new_fa->fa_info = fi;
new_fa->fa_tos = tos;
new_fa->fa_type = cfg->fc_type;
new_fa->fa_scope = cfg->fc_scope;
new_fa->fa_state = 0;

如果没有要插入路由的网络地址项fib_node,则之前已经创建了新的,现在将它插入到路由表中fib_insert_node();然后将new_fa链入到fib_node->fn_alias中

1
2
3
4
5
if (new_f)
	fib_insert_node(fz, new_f);

list_add_tail(&new_fa->fa_list,
			(fa ? &fa->fa_list : &f->fn_alias));

最后,由于新插入的路由表项,会发出通告,告知所以加入RTNLGRP_IPV4_ROUTE组的成员,这个功能可以在linux中使用”ip route monitor”来测试。最终的路由表如图所示:

1
rtmsg_fib(RTM_NEWROUTE, key, new_fa, cfg->fc_dst_len, tb->tb_id, &cfg->fc_nlinfo, 0);

至此,就完成了路由表项的插入,加上之前的路由表的初始化,整个路由表的创建过程就讲解完了,小小总结一下:

路由表的查找效率是第一位的,因此内核在实现时使用了多级索引来进行加速

第一级:fn_zone 按不同掩码长度分类(如/5和/24)

第二级:fib_node 按不同网络地址分类(如124.44.33.0/24)

第三级:fib_info 下一跳路由信息


路由可以分为两部分:路由缓存(rt_hash_table)和路由表()

路由缓存顾名思义就是加速路由查找的,路由缓存的插入是由内核控制的,而非人为的插入,与之相对比的是路由表是人为插入的,而非内核插入的。在内核中,路由缓存组织成rt_hash_table的结构。

下面是一段IP层协议的代码段[net/ipv4/route.c],传入IP层的协议在查找路由时先在路由缓存中查找,如果已存在,则skb_dst_set(skb, &rth->u.dst)并返回;否则在路由表中查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
hash = rt_hash(daddr, saddr, iif, rt_genid(net));  
  
rcu_read_lock();  
for (rth = rcu_dereference(rt_hash_table[hash].chain); rth;  
	 rth = rcu_dereference(rth->u.dst.rt_next)) {  
	if (((rth->fl.fl4_dst ^ daddr) |  
		 (rth->fl.fl4_src ^ saddr) |  
		 (rth->fl.iif ^ iif) |  
		 rth->fl.oif |  
		 (rth->fl.fl4_tos ^ tos)) == 0 &&  
		rth->fl.mark == skb->mark &&  
		net_eq(dev_net(rth->u.dst.dev), net) &&  
		!rt_is_expired(rth)) {  
		dst_use(&rth->u.dst, jiffies);  
		RT_CACHE_STAT_INC(in_hit);  
		rcu_read_unlock();  
		skb_dst_set(skb, &rth->u.dst);  
		return 0;  
	}  
	RT_CACHE_STAT_INC(in_hlist_search);  
}  
rcu_read_unlock();  

在ip_route_input()中查询完陆由缓存后会处理组播地址,如果是组播地址,则下面判断会成功:ipv4_is_multicast(daddr)。

然后执行ip_route_input_mc(),它的主要作用就是生成路由缓存项rth,并插入缓存。rth的生成与初始化只给出了input函数的,其它略去了,可以看出组播报文会通过ip_local_deliver()继续向上传递。

1
2
3
rth->u.dst.input= ip_local_deliver;  
hash = rt_hash(daddr, saddr, dev->ifindex, rt_genid(dev_net(dev)));  
return rt_intern_hash(hash, rth, NULL, skb, dev->ifindex);  

路由表又可以分为两个:RT_TABLE_LOCAL和RT_TABLE_MAIN
RT_TABLE_LOCAL存储目的地址是本机的路由表项,这些目的地址就是为各个网卡配置的IP地址;
RT_TABLE_MAIN存储到其它主机的路由表项;

显然,RT_TABLE_MAIN路由表只有当主机作为路由器时才有作用,一般主机该表是空的,因为主机不具有转发数据包的功能。RT_TABLE_LOCAL对主机就足够了,为各个网卡配置的IP地址都会加入RT_TABLE_LOCAL中,如为eth1配置了1.2.3.4的地址,则RT_TABLE_LOCAL中会存在1.2.3.4的路由项。只有本地的网卡地址会被加入,比如lo、eth1。IP模块在初始化时ip_init() -> ip_rt_init() - > ip_fib_init()会注册notifier机制,当为网卡地址配置时会执行fib_netdev_notifier和fib_inetaddr_notifier,使更改反映到RT_TABLE_LOCAL中。

1
2
register_netdevice_notifier(&fib_netdev_notifier);  
register_inetaddr_notifier(&fib_inetaddr_notifier);  

而当在路由缓存中没有查找到缓存项时,会进行路由表查询,还是以IP层协议中的代码段为例[net/ipv4/route.c],fib_lookup()会在MAIN和LOCAL两张表中进行查找。

1
2
3
4
5
if ((err = fib_lookup(net, &fl, &res)) != 0) {  
	if (!IN_DEV_FORWARD(in_dev))  
		goto e_hostunreach;  
	goto no_route;  
}  

如果主机配置成了支持转发,则无论在路由表中找到与否,都会生成这次查询的一个缓存,包括源IP、目的IP、接收的网卡,插入路由缓存中:

1
2
hash = rt_hash(daddr, saddr, fl.iif, rt_genid(net));  
err = rt_intern_hash(hash, rth, NULL, skb, fl.iif);  

不同的是,如果在路由表中查询失败,即数据包不是发往本机,也不能被本机转发,则会设置插入路由缓存的缓存项u.dst.input=ip_error,而u.dst.input即为IP层处理完后向上传递的函数,而ip_error()会丢弃数据包,被发送相应的ICMP错误报文。不在路由表中的路由项也要插入路由缓存,这可以看作路由学习功能,下次就可以直接在路由缓存中找到。

1
2
3
rth->u.dst.input= ip_error;  
rth->u.dst.error= -err;  
rth->rt_flags    &= ~RTCF_LOCAL;  

但如果主机不支持转发,即没有路由功能,则只有在找到时才会添加路由缓存项,都不会生成路由缓存项。这是因为在LOCAL表中没有找到,表明数据包不是发往本机的,此时缓存这样的路由项对于主机的数据包传输没有一点意义。它只需要知道哪些数据包是发给它的,其余的一律不管!

路由查询整合起来,就是由ip_route_input()引入,然后依次进行路由缓存和路由表查询,并对路由缓存进行更新。路由缓存在每个数据包到来时都可能发生更新,但路由表则不一样,只能通过RTM机制更新,LOCAL表是在网卡配置时更新的,MAIN表则是由人工插入的(inet_rtm_newroute)。

ip_route_input()
- 路由缓存查询
- 路由表查询:ip_route_input_slow() -> fib_lookup()


这次将以更实际的例子来分析过程中路由表的使用情况,注意下文都是对路由缓存表的描述,因为路由表在配置完网卡地址后就不会再改变了(除非人为的去改动),测试环境如下图:

两台主机Host1与Host2,分别配置了IP地址192.168.1.1与192.168.1.2,两台主机间用网线直连。在两台主机上分别执行如下操作:
1. 在Host1上ping主机Host2
2. 在Host2上ping主机Host1

很简单常的两台主机互ping的例子,下面来分析这过程中路由表的变化,准备说是路由缓存的变化。首先,路由缓存会存在几个条目?答案不是2条而是3条,这点很关键,具体可以通过/proc/net/rt_cache来查看路由缓存表,下图是执行上述操作后得到的结果:

brcm0.1是Host主机上的网卡设备,等同于常用的eth0,lo是环路设备。对结果稍加分析,可以发现,条目1和条目2是完全一样的,除了计数的Use稍有差别,存在这种情况的原因是缓存表是以Hash表的形式存储的,尽管两者内容相同,在实际插入时使用的键值是不同的,下面以Host2主机的路由缓存表为视角,针对互ping的过程进行逐一分析。

假设brcm0.1设备的index = 2

步骤0:初始时陆由缓存为空

步骤1:主机Host1 ping 主机Host2

Host2收到来自Host1的echo报文(dst = 192.168.1.2, src = 192.168.1.1)
在报文进入IP层后会查询路由表,以确定报文的接收方式,相应调用流程:
ip_route_input() -> ip_route_input_slow()
在ip_route_input()中查询路由缓存,使用的键值是[192.168.1.2, 192.168.1.1, 2, id],由于缓存表为空,查询失败,继续走ip_route_input_slow()来创建并插入新的缓存项。

1
hash = rt_hash(daddr, saddr, iif, rt_genid(net));  

在ip_route_input_slow()中查询路由表,因为发往本机,在会LOCAL表中匹配192.168.1.2条目,查询结果res.type==RTN_LOCAL。

1
2
3
4
5
if ((err = fib_lookup(net, &fl, &res)) != 0) {  
 if (!IN_DEV_FORWARD(in_dev))  
  goto e_hostunreach;  
 goto no_route;  
}  

然后根据res.type跳转到local_input代码段,创建新的路由缓存项,并插入陆由缓存。

1
2
3
4
5
6
7
8
9
10
rth = dst_alloc(&ipv4_dst_ops);  
……  
rth->u.dst.dev = net->loopback_dev;  
rth->rt_dst = daddr;  
rth->rt_src = saddr;  
rth->rt_gateway = daddr;  
rth->rt_spec_dst = spec_dst; (spec_dst=daddr)  
……  
hash = rt_hash(daddr, saddr, fl.iif, rt_genid(net));  
err = rt_intern_hash(hash, rth, NULL, skb, fl.iif);  

因此插入的第一条缓存信息如下:

1
2
	Key = [dst = 192.168.1.2  src = 192.168.1.1 idx = 2 id = id]
	Value = [Iface = lo dst = 192.168.1.2 src = 192.168.1.1 idx = 2 id = id ……]

步骤2: 主机Host2 发送echo reply报文给主机 Host1 (dst = 192.168.1.1 src = 192.168.1.2)
步骤2是紧接着步骤1的,Host2在收到echo报文后会立即回复echo reply报文,相应调用流程:
icmp_reply() -> ip_route_output_key() -> ip_route_output_flow() -> ip_route_output_key() -> ip_route_output_slow() -> ip_mkroute_output() -> mkroute_output()
在icmp_reply()中生成稍后路由查找中的关键数据flowi,可以看作查找的键值,由于是回复已收到的报文,因此目的与源IP地址者是已知的,下面结构中daddr=192.168.1.1,saddr=192.168.1.2。

1
2
3
4
5
struct flowi fl = { .nl_u = { .ip4_u =  
  { .daddr = daddr,  
  .saddr = rt->rt_spec_dst,  
  .tos = RT_TOS(ip_hdr(skb)->tos) } },  
  .proto = IPPROTO_ICMP };  

在__ip_route_output_key()时会查询路由缓存表,查询的键值是[192.168.1.1, 192.168.1.2, 0, id],由于此时路由缓存中只有一条刚刚插入的从192.168.1.1->192.168.1.2的缓存项,因而查询失败,继续走ip_route_output_slow()来创建并插入新的缓存项。

1
hash = rt_hash(flp->fl4_dst, flp->fl4_src, flp->oif, rt_genid(net));  

在ip_route_input_slow()中查询路由表,因为在同一网段,在会MAIN表中匹配192.168.1.0/24条目,查询结果res.type==RTN_UNICAST。

1
2
3
if (fib_lookup(net, &fl, &res)) {  
…..  
}  

然后调用__mkroute_output()来生成新的路由缓存,信息如下:

1
2
3
4
5
6
rth->u.dst.dev = dev_out;  
rth->rt_dst = fl->fl4_dst;  
rth->rt_src = fl->fl4_src;  
rth->rt_gateway = fl->fl4_dst;  
rth->rt_spec_dst= fl->fl4_src;  
rth->fl.oif = oldflp->oif; (oldflp->oif为0)  

插入路由缓存表时使用的键值是:

1
hash = rt_hash(oldflp->fl4_dst, oldflp->fl4_src, oldflp->oif, rt_genid(dev_net(dev_out)));  

这条语句很关键,缓存的存储形式是hash表,除了生成缓存信息外,还要有相应的键值,这句的hash就是产生的键值,可以看到,它是由(dst, src, oif, id)四元组生成的,dst和src很好理解,id对于net来说是定值,oif则是关键,注意这里用的是oldflp->oif(它的值为0),尽管路由缓存对应的出接口设备是dev_out。所以,第二条缓存信息的如下:

1
2
	Key = [dst = 192.168.1.1  src = 192.168.1.2 idx = 0 id = id]
	Value = [Iface = brcm0.1  dst = 192.168.1.1 src = 192.168.1.2 idx = 2 id = id ……]

步骤3:

主机Host2 ping 主机Host1
Host2向Host1发送echo报文(dst = 192.168.1.1, src = 192.168.1.2)
Host2主动发送echo报文,使用SOCK_RAW与IPPROTO_ICMP组合的套接字,相应调用流程:
raw_sendmsg() -> ip_route_output_flow() -> ip_route_output_key() -> ip_route_output_slow() -> ip_mkroute_output() -> mkroute_output()
在raw_sendmsg()中生成稍后路由查找中的关键数据flowi,可以看作查找的键值,由于是主动发送的报文,源IP地址者还是未知的,因为主机可能是多接口的,在查询完路由表后才能得到要走的设备接口和相应的源IP地址。下面结构中daddr=192.168.1.1,saddr=0。

1
2
3
4
5
6
7
8
9
struct flowi fl = { .oif = ipc.oif,  
  .mark = sk->sk_mark,  
  .nl_u = { .ip4_u =  
	{ .daddr = daddr,  
   .saddr = saddr,  
   .tos = tos } },  
  .proto = inet->hdrincl ? IPPROTO_RAW :  
		sk->sk_protocol,  
 };  

在__ip_route_output_key()时会查询路由缓存表,查询的键值是[192.168.1.1, 0, 0, id],尽管此时路由缓存中刚刚插入了192.168.1.2->192.168.1.1的条目,但由于两者的键值不同,因而查询依旧失败,继续走ip_route_output_slow()来创建并插入新的缓存项。

1
hash = rt_hash(flp->fl4_dst, flp->fl4_src, flp->oif, rt_genid(net));  

与Host2回复Host1的echo报文相比,除了进入函数不同(前者为icmp_reply,后者为raw_sendmsg),后续调用流程是完全相同的,导致最终路由缓存不同(准确说是键值)是因为初始时flowi不同。
此处,raw_sendmsg()中,flowi的初始值:dst = 192.168.1.1, src = 0, oif = 0
对比icmp_reply()中,flowi的初始值:dst = 192.168.1.1, src = 192.168.1.2, oif = 0
在上述调用流程中,在__ip_route_output_key()中查找路由缓存,尽管此时路由缓存有从192.168.1.2到192.168.1.1的缓存项,但它的键值与此次查找的键值[192.168.1.1, 192.168.1.2, 0],从下表可以明显看出:

由于查找失败,生成新的路由缓存项并插入路由缓存表,注意在ip_route_output_slow()中查找完路由表后,设置了缓存的src。

1
2
if (!fl.fl4_src)  
	fl.fl4_src = FIB_RES_PREFSRC(res);  

因此插入的第三条缓存信息如下,它与第二条缓存完成相同,区别在于键值不同:

1
2
	Key = [dst = 192.168.1.1  src = 0 idx = 0 id = id]
	Value = [Iface = brcm0.1  dst = 192.168.1.1 src = 192.168.1.2 idx = 2 id = id ……]

最终,路由缓存表如下:

第三条缓存条目键值使用src=0, idx=0的原因是当主机要发送报文给192.168.1.1的主机时,直到IP层路由查询前,它都无法知道该使用的接口地址(如果没有绑定的话),而路由缓存的查找发生在路由查询之前,所以src=0,idx=0才能保证后续报文使用该条目。