kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

Linux内存管理--基本概念

http://blog.csdn.net/myarrow/article/details/8624687

1
2
3
4
5
PTE_PFN_MASK=0x3ffffffff000 PAGE_OFFSET=ffff880000000000
PGDIR_SHIFT=39 PTRS_PER_PGD=512
PUD_SHIFT=30 PTRS_PER_PUD=512
PMD_SHIFT=21 PTRS_PER_PMD=512
PAGE_SHIFT=12 PTRS_PER_PTE=512

对于内存管理,Linux采用了与具体体系架构不相关的设计模型,实现了良好的可伸缩性。它主要由内存节点node、内存区域zone和物理页框page三级架构组成。

• 内存节点node

内存节点node是计算机系统中对物理内存的一种描述方法,一个总线主设备访问位于同一个节点中的任意内存单元所花的代价相同,而访问任意两个不同节点中的内存单元所花的代价不同。在一致存储结构(Uniform Memory Architecture,简称UMA)计算机系统中只有一个节点,而在非一致性存储结构(NUMA)计算机系统中有多个节点。Linux内核中使用数据结构pg_data_t来表示内存节点node。如常用的ARM架构为UMA架构。

• 内存区域zone

内存区域位于同一个内存节点之内,由于各种原因它们的用途和使用方法并不一样。如基于IA32体系结构的个人计算机系统中,由于历史原因使得ISA设备只能使用最低16MB来进行DMA传输。又如,由于Linux内核采用

• 物理页框page

  1. Linux虚拟内存三级页表

Linux虚拟内存三级管理由以下三级组成:
• PGD: Page Global Directory (页目录)
• PMD: Page Middle Directory (页目录)
• PTE: Page Table Entry (页表项)

每一级有以下三个关键描述宏:
• SHIFT
• SIZE
• MASK

如页的对应描述为:

1
2
3
4
/* PAGE_SHIFT determines the page size  asm/page.h */  
#define PAGE_SHIFT      12  
#define PAGE_SIZE       (_AC(1,UL) << PAGE_SHIFT)  
#define PAGE_MASK       (~(PAGE_SIZE-1))  

数据结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/* asm/page.h */  
typedef unsigned long pteval_t;  
  
typedef pteval_t pte_t;  
typedef unsigned long pmd_t;  
typedef unsigned long pgd_t[2];  
typedef unsigned long pgprot_t;  
  
#define pte_val(x)      (x)  
#define pmd_val(x)      (x)  
#define pgd_val(x)  ((x)[0])  
#define pgprot_val(x)   (x)  
  
#define __pte(x)        (x)  
#define __pmd(x)        (x)  
#define __pgprot(x)     (x)  

2.1 Page Directory (PGD and PMD)

每个进程有它自己的PGD( Page Global Directory),它是一个物理页,并包含一个pgd_t数组。其定义见<asm/page.h>。 进程的pgd_t数据见 task_struct -> mm_struct -> pgd_t * pgd;

ARM架构的PGD和PMD的定义如下<arch/arm/include/asm/pgtable.h>:

1
2
3
4
5
6
7
8
9
10
#define PTRS_PER_PTE  512    // PTE中可包含的指针<u32>数 (21-12=9bit)  
#define PTRS_PER_PMD  1  
#define PTRS_PER_PGD  2048   // PGD中可包含的指针<u32>数 (32-21=11bit)</p><p>#define PTE_HWTABLE_PTRS (PTRS_PER_PTE)  
#define PTE_HWTABLE_OFF  (PTE_HWTABLE_PTRS * sizeof(pte_t))  
#define PTE_HWTABLE_SIZE (PTRS_PER_PTE * sizeof(u32))</p><p>/*  
 * PMD_SHIFT determines the size of the area a second-level page table can map  
 * PGDIR_SHIFT determines what a third-level page table entry can map  
 */  
#define PMD_SHIFT  21  
#define PGDIR_SHIFT  21

虚拟地址SHIFT宏图:

虚拟地址MASK和SIZE宏图:

2.2 Page Table Entry

PTEs, PMDs和PGDs分别由pte_t, pmd_t 和pgd_t来描述。为了存储保护位,pgprot_t被定义,它拥有相关的flags并经常被存储在page table entry低位(lower bits),其具体的存储方式依赖于CPU架构。

每个pte_t指向一个物理页的地址,并且所有的地址都是页对齐的。因此在32位地址中有PAGE_SHIFT(12)位是空闲的,它可以为PTE的状态位。

PTE的保护和状态位如下图所示:

2.3 如何通过3级页表访问物理内存

为了通过PGD、PMD和PTE访问物理内存,其相关宏在asm/pgtable.h中定义。

• pgd_offset

根据当前虚拟地址和当前进程的mm_struct获取pgd项的宏定义如下:

1
2
3
4
5
6
7
/* to find an entry in a page-table-directory */  
#define pgd_index(addr)     ((addr) >> PGDIR_SHIFT)  //获得在pgd表中的索引  
  
#define pgd_offset(mm, addr)    ((mm)->pgd + pgd_index(addr)) //获得pmd表的起始地址  
  
/* to find an entry in a kernel page-table-directory */  
#define pgd_offset_k(addr)  pgd_offset(&init_mm, addr)  

• pmd_offset

根据通过pgd_offset获取的pgd 项和虚拟地址,获取相关的pmd项(即pte表的起始地址)

1
2
/* Find an entry in the second-level page table.. */  
#define pmd_offset(dir, addr)   ((pmd_t *)(dir))   //即为pgd项的值  

• pte_offset

根据通过pmd_offset获取的pmd项和虚拟地址,获取相关的pte项(即物理页的起始地址)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#ifndef CONFIG_HIGHPTE  
#define __pte_map(pmd)      pmd_page_vaddr(*(pmd))  
#define __pte_unmap(pte)    do { } while (0)  
#else  
#define __pte_map(pmd)      (pte_t *)kmap_atomic(pmd_page(*(pmd)))  
#define __pte_unmap(pte)    kunmap_atomic(pte)  
#endif  
  
#define pte_index(addr)     (((addr) >> PAGE_SHIFT) & (PTRS_PER_PTE - 1))  
  
#define pte_offset_kernel(pmd,addr) (pmd_page_vaddr(*(pmd)) + pte_index(addr))  
  
#define pte_offset_map(pmd,addr)    (__pte_map(pmd) + pte_index(addr))  
#define pte_unmap(pte)          __pte_unmap(pte)  
  
#define pte_pfn(pte)        (pte_val(pte) >> PAGE_SHIFT)  
#define pfn_pte(pfn,prot)   __pte(__pfn_to_phys(pfn) | pgprot_val(prot))  
  
#define pte_page(pte)       pfn_to_page(pte_pfn(pte))  
#define mk_pte(page,prot)   pfn_pte(page_to_pfn(page), prot)  
  
#define set_pte_ext(ptep,pte,ext) cpu_set_pte_ext(ptep,pte,ext)  
#define pte_clear(mm,addr,ptep) set_pte_ext(ptep, __pte(0), 0)  

其示意图如下图所示:

2.4 根据虚拟地址获取物理页的示例代码

根据虚拟地址获取物理页的示例代码详见<mm/memory.c中的函数follow_page>。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/** 
 * follow_page - look up a page descriptor from a user-virtual address 
 * @vma: vm_area_struct mapping @address 
 * @address: virtual address to look up 
 * @flags: flags modifying lookup behaviour 
 * 
 * @flags can have FOLL_ flags set, defined in <linux/mm.h> 
 * 
 * Returns the mapped (struct page *), %NULL if no mapping exists, or 
 * an error pointer if there is a mapping to something not represented 
 * by a page descriptor (see also vm_normal_page()). 
 */  
struct page *follow_page(struct vm_area_struct *vma, unsigned long address,  
			unsigned int flags)  
{  
	pgd_t *pgd;  
	pud_t *pud;  
	pmd_t *pmd;  
	pte_t *ptep, pte;  
	spinlock_t *ptl;  
	struct page *page;  
	struct mm_struct *mm = vma->vm_mm;  
  
	page = follow_huge_addr(mm, address, flags & FOLL_WRITE);  
	if (!IS_ERR(page)) {  
		BUG_ON(flags & FOLL_GET);  
		goto out;  
	}  
  
	page = NULL;  
	pgd = pgd_offset(mm, address);  
	if (pgd_none(*pgd) || unlikely(pgd_bad(*pgd)))  
		goto no_page_table;  
  
	pud = pud_offset(pgd, address);  
	if (pud_none(*pud))  
		goto no_page_table;  
	if (pud_huge(*pud) && vma->vm_flags & VM_HUGETLB) {  
		BUG_ON(flags & FOLL_GET);  
		page = follow_huge_pud(mm, address, pud, flags & FOLL_WRITE);  
		goto out;  
	}  
	if (unlikely(pud_bad(*pud)))  
		goto no_page_table;  
  
	pmd = pmd_offset(pud, address);  
	if (pmd_none(*pmd))  
		goto no_page_table;  
	if (pmd_huge(*pmd) && vma->vm_flags & VM_HUGETLB) {  
		BUG_ON(flags & FOLL_GET);  
		page = follow_huge_pmd(mm, address, pmd, flags & FOLL_WRITE);  
		goto out;  
	}  
	if (pmd_trans_huge(*pmd)) {  
		if (flags & FOLL_SPLIT) {  
			split_huge_page_pmd(mm, pmd);  
			goto split_fallthrough;  
		}  
		spin_lock(&mm->page_table_lock);  
		if (likely(pmd_trans_huge(*pmd))) {  
			if (unlikely(pmd_trans_splitting(*pmd))) {  
				spin_unlock(&mm->page_table_lock);  
				wait_split_huge_page(vma->anon_vma, pmd);  
			} else {  
				page = follow_trans_huge_pmd(mm, address,  
								pmd, flags);  
				spin_unlock(&mm->page_table_lock);  
				goto out;  
			}  
		} else  
			spin_unlock(&mm->page_table_lock);  
		/* fall through */  
	}  
split_fallthrough:  
	if (unlikely(pmd_bad(*pmd)))  
		goto no_page_table;  
  
	ptep = pte_offset_map_lock(mm, pmd, address, &ptl);  
  
	pte = *ptep;  
	if (!pte_present(pte))  
		goto no_page;  
	if ((flags & FOLL_WRITE) && !pte_write(pte))  
		goto unlock;  
  
	page = vm_normal_page(vma, address, pte);  
	if (unlikely(!page)) {  
		if ((flags & FOLL_DUMP) ||  
			!is_zero_pfn(pte_pfn(pte)))  
			goto bad_page;  
		page = pte_page(pte);  
	}  
  
	if (flags & FOLL_GET)  
		get_page(page);  
	if (flags & FOLL_TOUCH) {  
		if ((flags & FOLL_WRITE) &&  
			!pte_dirty(pte) && !PageDirty(page))  
			set_page_dirty(page);  
		/* 
		 * pte_mkyoung() would be more correct here, but atomic care 
		 * is needed to avoid losing the dirty bit: it is easier to use 
		 * mark_page_accessed(). 
		 */  
		mark_page_accessed(page);  
	}  
	if ((flags & FOLL_MLOCK) && (vma->vm_flags & VM_LOCKED)) {  
		/* 
		 * The preliminary mapping check is mainly to avoid the 
		 * pointless overhead of lock_page on the ZERO_PAGE 
		 * which might bounce very badly if there is contention. 
		 * 
		 * If the page is already locked, we don't need to 
		 * handle it now - vmscan will handle it later if and 
		 * when it attempts to reclaim the page. 
		 */  
		if (page->mapping && trylock_page(page)) {  
			lru_add_drain();  /* push cached pages to LRU */  
			/* 
			 * Because we lock page here and migration is 
			 * blocked by the pte's page reference, we need 
			 * only check for file-cache page truncation. 
			 */  
			if (page->mapping)  
				mlock_vma_page(page);  
			unlock_page(page);  
		}  
	}  
unlock:  
	pte_unmap_unlock(ptep, ptl);  
out:  
	return page;  
  
bad_page:  
	pte_unmap_unlock(ptep, ptl);  
	return ERR_PTR(-EFAULT);  
  
no_page:  
	pte_unmap_unlock(ptep, ptl);  
	if (!pte_none(pte))  
		return page;  
  
no_page_table:  
	/* 
	 * When core dumping an enormous anonymous area that nobody 
	 * has touched so far, we don't want to allocate unnecessary pages or 
	 * page tables.  Return error instead of NULL to skip handle_mm_fault, 
	 * then get_dump_page() will return NULL to leave a hole in the dump. 
	 * But we can only make this optimization where a hole would surely 
	 * be zero-filled if handle_mm_fault() actually did handle it. 
	 */  
	if ((flags & FOLL_DUMP) &&  
		(!vma->vm_ops || !vma->vm_ops->fault))  
		return ERR_PTR(-EFAULT);  
	return page;  
}

Machine Check Exception

dmesg显示

1
2
3
4
5
6
7
8
...

sbridge: HANDLING MCE MEMORY ERROR
CPU 0: Machine Check Exception: 0 Bank 5: 8c00004000010093
TSC 0 ADDR 67081b300 MISC 2140040486 PROCESSOR 0:206d7 TIME 1441181676 SOCKET 0 APIC 0
EDAC MC0: CE row 2, channel 0, label "CPU_SrcID#0_Channel#3_DIMM#0": 1 Unknown error(s): memory read on FATAL area : cpu=0 Err=0001:0093 (ch=3), addr= 0x67081b300 => socket=0, Channel=3(mask=8), rank=0

...

保存4行log为mlog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# mcelog --ascii < /tmp/mlog
WARNING: with --dmi mcelog --ascii must run on the same machine with the
	 same BIOS/memory configuration as where the machine check occurred.
sbridge: HANDLING MCE MEMORY ERROR
CPU 0: Machine Check Exception: 0 Bank 5: 8c00004000010093
HARDWARE ERROR. This is *NOT* a software problem!
Please contact your hardware vendor
Wed Sep  2 16:14:36 2015
CPU 0 BANK 5 MISC 2140040486 ADDR 67081b300
STATUS 8c00004000010093 MCGSTATUS 0
CPUID Vendor Intel Family 6 Model 45
WARNING: SMBIOS data is often unreliable. Take with a grain of salt!
<24> DIMM 1333 Mhz Res13 Width 72 Data Width 64 Size 16 GB
Device Locator: Node0_Channel2_Dimm0
Bank Locator: Node0_Bank0
Manufacturer: Hynix Semiconducto
Serial Number: 40743B5A
Asset Tag: Dimm2_AssetTag
Part Number: HMT42GR7BFR4A-PB
TSC 0 ADDR 67081b300 MISC 2140040486 PROCESSOR 0:206d7 TIME 1441181676 SOCKET 0 APIC 0
EDAC MC0: CE row 2, channel 0, label "CPU_SrcID#0_Channel#3_DIMM#0": 1 Unknown error(s): memory read on FATAL area : cpu=0 Err=0001:0093 (ch=3), addr = 0x67081b300 => socket=0, Channel=3(mask=8), rank=0

根据
Part Number: HMT42GR7BFR4A-PB
Serial Number: 40743B5A

在lshw中找相应硬件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
...

	 *-memory:0
	      description: System Memory
	      physical id: 2d
	      slot: System board or motherboard
	    *-bank:0
	         description: DIMM 1333 MHz (0.8 ns)
	         product: HMT42GR7BFR4A-PB
	         vendor: Hynix Semiconducto
	         physical id: 0
	         serial: 905D21AE
	         slot: Node0_Channel1_Dimm0
	         size: 16GiB
	         width: 64 bits
	         clock: 1333MHz (0.8ns)
	    *-bank:1
	         description: DIMM Synchronous [empty]
	         product: A1_Dimm1_PartNumber
	         vendor: Dimm1_Manufacturer
	         physical id: 1
	         serial: Dimm1_SerNum
	         slot: Node0_Channel1_Dimm1
	         width: 64 bits
	    *-bank:2
	         description: DIMM 1333 MHz (0.8 ns)
	         product: HMT42GR7BFR4A-PB
	         vendor: Hynix Semiconducto
	         physical id: 2
	         serial: 40743B5A
	         slot: Node0_Channel2_Dimm0
	         size: 16GiB
	         width: 64 bits
	         clock: 1333MHz (0.8ns)

		...

NAPI机制分析

http://blog.csdn.net/shanshanpt/article/details/20564845

NAPI 的核心在于:在一个繁忙网络,每次有网络数据包到达时,不需要都引发中断,因为高频率的中断可能会影响系统的整体效率,假象一个场景,我们此时使用标准的 100M 网卡,可能实际达到的接收速率为 80MBits/s,而此时数据包平均长度为 1500Bytes,则每秒产生的中断数目为:

1
80M bits/s / (8 Bits/Byte * 1500 Byte) = 6667 个中断 /s

每秒 6667 个中断,对于系统是个很大的压力,此时其实可以转为使用轮询 (polling) 来处理,而不是中断;但轮询在网络流量较小的时没有效率,因此低流量时,基于中断的方式则比较合适,这就是 NAPI 出现的原因,在低流量时候使用中断接收数据包,而在高流量时候则使用基于轮询的方式接收。

现在内核中 NIC 基本上已经全部支持 NAPI 功能,由前面的叙述可知,NAPI 适合处理高速率数据包的处理,而带来的好处则是:

1、中断缓和 (Interrupt mitigation),由上面的例子可以看到,在高流量下,网卡产生的中断可能达到每秒几千次,而如果每次中断都需要系统来处理,是一个很大的压力,而 NAPI 使用轮询时是禁止了网卡的接收中断的,这样会减小系统处理中断的压力;

2、数据包节流 (Packet throttling),NAPI 之前的 Linux NIC 驱动总在接收到数据包之后产生一个 IRQ,接着在中断服务例程里将这个 skb 加入本地的 softnet,然后触发本地 NET_RX_SOFTIRQ 软中断后续处理。如果包速过高,因为 IRQ 的优先级高于 SoftIRQ,导致系统的大部分资源都在响应中断,但 softnet 的队列大小有限,接收到的超额数据包也只能丢掉,所以这时这个模型是在用宝贵的系统资源做无用功。而 NAPI 则在这样的情况下,直接把包丢掉,不会继续将需要丢掉的数据包扔给内核去处理,这样,网卡将需要丢掉的数据包尽可能的早丢弃掉,内核将不可见需要丢掉的数据包,这样也减少了内核的压力。

对NAPI 的使用,一般包括以下的几个步骤:

1、在中断处理函数中,先禁止接收中断,且告诉网络子系统,将以轮询方式快速收包,其中禁止接收中断完全由硬件功能决定,而告诉内核将以轮询方式处理包则是使用函数 netif_rx_schedule(),也可以使用下面的方式,其中的 netif_rx_schedule_prep 是为了判定现在是否已经进入了轮询模式:

将网卡预定为轮询模式

1
void netif_rx_schedule(struct net_device *dev);

或者

1
2
if (netif_rx_schedule_prep(dev))
	__netif_rx_schedule(dev);

2、在驱动中创建轮询函数,它的工作是从网卡获取数据包并将其送入到网络子系统,其原型是:

NAPI 的轮询方法

1
int (*poll)(struct net_device *dev, int *budget);

这里的轮询函数用于在将网卡切换为轮询模式之后,用 poll() 方法处理接收队列中的数据包,如队列为空,则重新切换为中断模式。切换回中断模式需要先关闭轮询模式,使用的是函数 netif_rx_complete (),接着开启网卡接收中断 .。

退出轮询模式

1
void netif_rx_complete(struct net_device *dev);

3、在驱动中创建轮询函数,需要和实际的网络设备 struct net_device 关联起来,这一般在网卡的初始化时候完成,示例代码如下:

设置网卡支持轮询模式

1
2
dev->poll = my_poll;
dev->weight = 64;

里面另外一个字段为权重 (weight),该值并没有一个非常严格的要求,实际上是个经验数据,一般 10Mb 的网卡,我们设置为 16,而更快的网卡,我们则设置为 64。

NAPI的一些相关Interface

下面是 NAPI 功能的一些接口,在前面都基本有涉及,我们简单看看:

1
netif_rx_schedule(dev)

在网卡的中断处理函数中调用,用于将网卡的接收模式切换为轮询

1
netif_rx_schedule_prep(dev)

在网卡是 Up 且运行状态时,将该网卡设置为准备将其加入到轮询列表的状态,可以将该函数看做是 netif_rx_schedule(dev) 的前半部分

1
__netif_rx_schedule(dev)

将设备加入轮询列表,前提是需要 netif_schedule_prep(dev) 函数已经返回了 1

1
__netif_rx_schedule_prep(dev)

与 netif_rx_schedule_prep(dev) 相似,但是没有判断网卡设备是否 Up 及运行,不建议使用

1
netif_rx_complete(dev)

用于将网卡接口从轮询列表中移除,一般在轮询函数完成之后调用该函数。

1
__netif_rx_complete(dev)

Newer newer NAPI

其实之前的 NAPI(New API) 这样的命名已经有点让人忍俊不禁了,可见 Linux 的内核极客们对名字的掌控,比对代码的掌控差太多,于是乎,连续的两次对 NAPI 的重构,被戏称为 Newer newer NAPI 了。

与 netif_rx_complete(dev) 类似,但是需要确保本地中断被禁止

Newer newer NAPI

在最初实现的 NAPI 中,有 2 个字段在结构体 net_device 中,分别为轮询函数 poll() 和权重 weight,而所谓的 Newer newer NAPI,是在 2.6.24 版内核之后,对原有的 NAPI 实现的几次重构,其核心是将 NAPI 相关功能和 net_device 分离,这样减少了耦合,代码更加的灵活,因为 NAPI 的相关信息已经从特定的网络设备剥离了,不再是以前的一对一的关系了。例如有些网络适配器,可能提供了多个 port,但所有的 port 却是共用同一个接受数据包的中断,这时候,分离的 NAPI 信息只用存一份,同时被所有的 port 来共享,这样,代码框架上更好地适应了真实的硬件能力。Newer newer NAPI 的中心结构体是napi_struct:

NAPI 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/* 
 * Structure for NAPI scheduling similar to tasklet but with weighting 
*/ 
struct napi_struct { 
	/* The poll_list must only be managed by the entity which 
	 * changes the state of the NAPI_STATE_SCHED bit.  This means 
	 * whoever atomically sets that bit can add this napi_struct 
	 * to the per-cpu poll_list, and whoever clears that bit 
	 * can remove from the list right before clearing the bit. 
	 */ 
	struct list_head      poll_list; 

	unsigned long          state; 
	int              weight; 
	int              (*poll)(struct napi_struct *, int); 
 #ifdef CONFIG_NETPOLL 
	spinlock_t          poll_lock; 
	int              poll_owner; 
 #endif 

	unsigned int          gro_count; 

	struct net_device      *dev; 
	struct list_head      dev_list; 
	struct sk_buff          *gro_list; 
	struct sk_buff          *skb; 
};

熟悉老的 NAPI 接口实现的话,里面的字段 poll_list、state、weight、poll、dev、没什么好说的,gro_count 和 gro_list 会在后面讲述 GRO 时候会讲述。需要注意的是,与之前的 NAPI 实现的最大的区别是该结构体不再是 net_device 的一部分,事实上,现在希望网卡驱动自己单独分配与管理 napi 实例,通常将其放在了网卡驱动的私有信息,这样最主要的好处在于,如果驱动愿意,可以创建多个 napi_struct,因为现在越来越多的硬件已经开始支持多接收队列 (multiple receive queues),这样,多个 napi_struct 的实现使得多队列的使用也更加的有效。

与最初的 NAPI 相比较,轮询函数的注册有些变化,现在使用的新接口是:

1
2
void netif_napi_add(struct net_device *dev, struct napi_struct *napi, 
					int (*poll)(struct napi_struct *, int), int weight)

熟悉老的 NAPI 接口的话,这个函数也没什么好说的。

值得注意的是,前面的轮询 poll() 方法原型也开始需要一些小小的改变:

1
int (*poll)(struct napi_struct *napi, int budget);

大部分 NAPI 相关的函数也需要改变之前的原型,下面是打开轮询功能的 API:

1
2
3
4
5
6
7
void netif_rx_schedule(struct net_device *dev, 
						struct napi_struct *napi); 
/* ...or... */ 
int netif_rx_schedule_prep(struct net_device *dev, 
						struct napi_struct *napi); 
void __netif_rx_schedule(struct net_device *dev, 
						struct napi_struct *napi);

轮询功能的关闭则需要使用:

1
2
void netif_rx_complete(struct net_device *dev, 
						struct napi_struct *napi);

因为可能存在多个 napi_struct 的实例,要求每个实例能够独立的使能或者禁止,因此,需要驱动作者保证在网卡接口关闭时,禁止所有的 napi_struct 的实例。

函数 netif_poll_enable() 和 netif_poll_disable() 不再需要,因为轮询管理不再和 net_device 直接管理,取而代之的是下面的两个函数:

1
2
void napi_enable(struct napi *napi); 
void napi_disable(struct napi *napi);

linux下ip协议(V4)的实现

这次主要介绍的是ip层的切片与组包的实现。

首先来看一下分片好的帧的一些概念:

1 第一个帧的offset位非0并且MF位为1

2 所有的在第一个帧和最后一个帧之间的帧都拥有长度大于0的域

3 最后一个帧MF位为0 并且offset位非0。(这样就能判断是否是最后一个帧了).

这里要注意在linux中,ip头的frag_off域包含了 rfcip头的定义中的nf,df,以及offset域,因此我们每次需要按位与来取得相应的域的值,看下面

ip_local_deliver的代码片段就清楚了:

1
2
3
4
5
	// 取出mf位和offset域,从而决定是否要组包。
	if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) {
		if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
			return 0;
	}

而fragmentation/defragmentation 子系统的初始化是通过ipfrag_init来实现了,而它是被inet_init来调用的。它主要做的是注册sys文件系统节点,并开启一个定时器,以及初始化一些相关的变量.这个函数的初始化以及相关的数据结构的详细介绍,我们会在后面的组包小节中介绍。现在我们先来看切片的处理。

相对于组包,切片逻辑什么的都比较简单。切片的主要函数是ip_fragment.它的输入包包括下面几种:

1 要被转发的包(没有切片的)。

2 要被转发的包(已经被路由器或者源主机切片了的).

3 被本地函数所创建的buffer,简而言之也就是本地所要传输的数据包(还未加包头),但是需要被切片的。

而ip_fragment所必须处理下面几种情况:

1 一大块数据需要被分割为更小的部分。

2 一堆数据片段(我的上篇blog有介绍,也就是ip_append_data已经切好的数据包,或者tcp已经切好的数据包)不需要再被切片。

上面的两种情况其实就是看高层(4层)协议有没有做切片工作(按照PMTU)了。如果已经被切片(其实也算不上切片(4层不能处理ip头),只能说i4层为了ip层更好的处理数据包,从而帮ip层做了一部分工作),则ip层所做的很简单,就是给每个包加上ip头就可以了。

切片分为两种类型,一种是fast (或者说 efficient)切片,这种也就是4层已经切好片,这里只需要加上ip头就可以了,一种是slow切片,也就是需要现在切片。

下来来看切片的主要任务:

1 将数据包切片为MTU大小(通过ptmu).

2 初始化每一个fragment的ip 头。还要判断一些option的copy位,因为并不是每一种option都要放在所有已切片的fragment 的ip头中的。

3 计算ip层的校验值。

4 通过netfilter过滤。

5 update 一些kernel 域以及snmp 统计值。

接下来来看ip_fragment的具体实现:

1
int ip_fragment(struct sk_buff *skb, int (*output)(struct sk_buff*))

第一个参数skb表示将要被切片的ip包,第二个参数是一个传输切片的输出函数(切片完毕后就交给这个函数处理)。比如ip_finish_output2类似的。

这个函数我们来分段看,首先来看它进行切片前的一些准备工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
	// 先是取出了一些下面将要使用的变量。
	struct iphdr *iph;
	int raw = 0;
	int ptr;
	struct net_device *dev;
	struct sk_buff *skb2;
	unsigned int mtu, hlen, left, len, ll_rs, pad;
	int offset;
	__be16 not_last_frag;
	// 路由表
	struct rtable *rt = skb->rtable;
	int err = 0;
	// 网络设备
	dev = rt->u.dst.dev;

	// ip头
	iph = ip_hdr(skb);
	// 判断DF位,我们知道如果df位被设置了话就表示不要被切片,这时ip_fragment将会发送一个icmp豹纹返回到源主机。这里主要是为forward数据所判断。
	if (unlikely((iph->frag_off & htons(IP_DF)) && !skb->local_df)) {
		IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGFAILS);
		icmp_send(skb, ICMP_DEST_UNREACH, ICMP_FRAG_NEEDED,
			  htonl(ip_skb_dst_mtu(skb)));
		kfree_skb(skb);
		return -EMSGSIZE;
	}
	// 得到ip头的长度
	hlen = iph->ihl * 4;
	// 得到mtu的大小。这里要注意,他的大小减去了hlen,也就是ip头的大小。
	mtu = dst_mtu(&rt->u.dst) - hlen;    /* Size of data space */
	IPCB(skb)->flags |= IPSKB_FRAG_COMPLETE;

不管是slow还是fast 被切片的任何一个帧如果传输失败,ip_fragment都会立即返回一个错误给4层,并且紧跟着的帧也不会再被传输,然后将处理方法交给4层去做。

接下来我们来看fast 切片。 一般用fast切片的都是经由4层的ip_append_data和ip_push_pending函数(udp)将数据包已经切片好的,或者是tcp层已经切片好的数据包,才会用fast切片.

这里要主要几个问题:
1 每一个切片的大小都不能超过PMTU。
2 只有最后一个切片才会有3层的整个数据包的大小。
3 每一个切片都必须有足够的大小来允许2层加上自己的头。

我们先看一下skb_pagelen这个函数(下面的处理会用到),这个函数用来得到当前skb的len,首先我们要知道(我前面的blog有介绍)在sk_write_queue的sk_buff队列中,每一个sk_buff的len = x(也就是么一个第一个切片的包的l4 payload的长度) + S1 (这里表示所有的frags域的数据的总大小,也就是data_len的长度)。可以先看下面的图:

很容易一目了然。

1
2
3
4
5
6
7
8
9
static inline int skb_pagelen(const struct sk_buff *skb)
{
	int i, len = 0;
	// 我们知道如果设备支持S/G IO的话,nr_frags会包含一些L4 payload,因此我们需要先遍历nr_frags.然后加入它的长度。
	for (i = (int)skb_shinfo(skb)->nr_frags - 1; i >= 0; i--)
		len += skb_shinfo(skb)->frags[i].size;
	// 最后加上skb_headlen,而skb_headlen = skb->len - skb->data_len;因此这里就会返回这个数据包的len。
	return len + skb_headlen(skb);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
	// 通过上一篇blog我们知道,如果4层将数据包分片了,那么就会把这些数据包放到skb的frag_list链表中,因此我们这里首先先判断frag_list链表是否为空,为空的话我们将会进行slow 切片。
	if (skb_shinfo(skb)->frag_list) {
		struct sk_buff *frag;
		// 取得第一个数据报的len.我们知道当sk_write_queue队列被flush后,除了第一个切好包的另外的包都会加入到frag_list中,而这里我们我们需要得到的第一个包(也就是本身这个sk_buff)的长度。
		int first_len = skb_pagelen(skb);
		int truesizes = 0;
		// 接下来的判断都是为了确定我们能进行fast切片。切片不能被共享,这是因为在fast path 中,我们需要加给每个切片不同的ip头(而并不会复制每个切片)。因此在fast path中是不可接受的。而在slow path中,就算有共享也无所谓,因为他会复制每一个切片,使用一个新的buff。

		// 判断第一个包长度是否符合一些限制(包括mtu,mf位等一些限制).如果第一个数据报的len没有包含mtu的大小这里之所以要把第一个切好片的数据包单独拿出来检测,是因为一些域是第一个包所独有的(比如IP_MF要为1)。这里由于这个mtu是不包括hlen的mtu,因此我们需要减去一个hlen。
		if (first_len - hlen > mtu ||
			((first_len - hlen) & 7) ||
			(iph->frag_off & htons(IP_MF|IP_OFFSET)) ||
			skb_cloned(skb))
			goto slow_path;
		// 遍历剩余的frag。
		for (frag = skb_shinfo(skb)->frag_list; frag; frag = frag->next) {
			/* Correct geometry. */
			// 判断每个帧的mtu,以及相关的东西,如果不符合条件则要进行slow path,基本和上面的第一个skb的判断类似。
			if (frag->len > mtu ||
				((frag->len & 7) && frag->next) ||
				skb_headroom(frag) < hlen)
				goto slow_path;
			// 判断是否共享。
			/* Partially cloned skb? */
			if (skb_shared(frag))
				goto slow_path;

			BUG_ON(frag->sk);
			// 进行socket的一些操作。
			if (skb->sk) {
				sock_hold(skb->sk);
				frag->sk = skb->sk;
				frag->destructor = sock_wfree;
				truesizes += frag->truesize;
			}
		}

		// 通过上面的检测,都通过了,因此我们可以进行fast path切片了。

		// 先是设置一些将要处理的变量的值。
		err = 0;
		offset = 0;
		// 取得frag_list列表
		frag = skb_shinfo(skb)->frag_list;
		skb_shinfo(skb)->frag_list = NULL;

		// 得到数据(不包括头)的大小。
		skb->data_len = first_len - skb_headlen(skb);
		skb->truesize -= truesizes;
		// 得到
		skb->len = first_len;
		iph->tot_len = htons(first_len);
		// 设置mf位
		iph->frag_off = htons(IP_MF);
		// 执行校验
		ip_send_check(iph);

		for (;;) {
			// 开始进行发送。
			if (frag) {
				// 设置校验位
				frag->ip_summed = CHECKSUM_NONE;
				// 设置相应的头部。
				skb_reset_transport_header(frag);
				__skb_push(frag, hlen);
				skb_reset_network_header(frag);
				// 复制ip头。
				memcpy(skb_network_header(frag), iph, hlen);
				// 修改每个切片的ip头的一些属性。
				iph = ip_hdr(frag);
				iph->tot_len = htons(frag->len);
				// 将当前skb的一些属性付给将要传递的切片好的帧。
				ip_copy_metadata(frag, skb);
				if (offset == 0)
				// 处理ip_option
					ip_options_fragment(frag);
				offset += skb->len - hlen;
				// 设置位移。
				iph->frag_off = htons(offset>>3);
				if (frag->next != NULL)
					iph->frag_off |= htons(IP_MF);
				/* Ready, complete checksum */
				ip_send_check(iph);
			}
			// 调用输出函数。
			err = output(skb);

			if (!err)
				IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGCREATES);
			if (err || !frag)
				break;
			// 处理链表中下一个buf。
			skb = frag;
			frag = skb->next;
			skb->next = NULL;
		}

		if (err == 0) {
			IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGOKS);
			return 0;
		}
		// 释放内存。
		while (frag) {
			skb = frag->next;
			kfree_skb(frag);
			frag = skb;
		}
		IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGFAILS);
		return err;
	}

再接下来我们来看slow fragmentation:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
	// 切片开始的位移
	left = skb->len - hlen;      /* Space per frame */
	// 而ptr就是切片开始的指针。
	ptr = raw + hlen;       /* Where to start from */

	/* for bridged IP traffic encapsulated inside f.e. a vlan header,
	 * we need to make room for the encapsulating header
	 */
	// 处理桥接的相关操作。
	pad = nf_bridge_pad(skb);
	ll_rs = LL_RESERVED_SPACE_EXTRA(rt->u.dst.dev, pad);
	mtu -= pad;

	// 其实也就是取出取出ip offset域。
	offset = (ntohs(iph->frag_off) & IP_OFFSET) << 3;
	// not_last_frag,顾名思义,其实也就是表明这个帧是否是最后一个切片。
	not_last_frag = iph->frag_off & htons(IP_MF);


	// 开始为循环处理,每一个切片创建一个skb buffer。
	while (left > 0) {
		len = left;
		// 如果len大于mtu,我们设置当前的将要切片的数据大小为mtu。
		if (len > mtu)
			len = mtu;
		// 长度也必须位对齐。
		if (len < left)  {
			len &= ~7;
		}
		// malloc一个新的buff。它的大小包括ip payload,ip head,以及L2 head.
		if ((skb2 = alloc_skb(len+hlen+ll_rs, GFP_ATOMIC)) == NULL) {
			NETDEBUG(KERN_INFO "IP: frag: no memory for new fragment!\n");
			err = -ENOMEM;
			goto fail;
		}
		// 调用ip_copy_metadata复制一些相同的值的域。
		ip_copy_metadata(skb2, skb);
		// 进行skb的相关操作。为了加上ip头。
		skb_reserve(skb2, ll_rs);
		skb_put(skb2, len + hlen);
		skb_reset_network_header(skb2);
		skb2->transport_header = skb2->network_header + hlen;
		// 将每一个分片的ip包都关联到源包的socket上。
		if (skb->sk)
			skb_set_owner_w(skb2, skb->sk);
		// 开始填充新的ip包的数据。

		// 先拷贝包头。
		skb_copy_from_linear_data(skb, skb_network_header(skb2), hlen);
		// 拷贝数据部分,这个函数实现的比较复杂。
		if (skb_copy_bits(skb, ptr, skb_transport_header(skb2), len))
			BUG();
		left -= len;
		// 填充相应的ip头。
		iph = ip_hdr(skb2);
		iph->frag_off = htons((offset >> 3));

		// 第一个包,因此进行ip_option处理。
		if (offset == 0)
			ip_options_fragment(skb);
		// 不是最后一个包,因此设置mf位。
		if (left > 0 || not_last_frag)
			iph->frag_off |= htons(IP_MF);
		// 移动指针以及更改位移大小。
		ptr += len;
		offset += len;
		// update包头的大小。
		iph->tot_len = htons(len + hlen);
		// 重新计算校验。
		ip_send_check(iph);
		//最终输出。
		err = output(skb2);
		if (err)
			goto fail;

		IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGCREATES);
	}
	kfree_skb(skb);
	IP_INC_STATS(dev_net(dev), IPSTATS_MIB_FRAGOKS);
	return err;

接下来来看ip组包的实现。首先要知道每一个切片(属于同一个源包的)的ip包 id都是相同的。

首先来看相应的数据结构。在内核中,每一个ip包(切片好的)都是一个struct ipq链表。而不同的数据包(这里指不是属于同一个源包的数据包)都保

存在一个hash表中。也就是ip4_frags这个变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static struct inet_frags ip4_frags;

#define INETFRAGS_HASHSZ        64

struct inet_frags {
	struct hlist_head   hash[INETFRAGS_HASHSZ];
	rwlock_t        lock;
	// 随机值,它被用在计算hash值上面,下面会介绍到,过一段时间,内核就会更新这个值。
	u32         rnd;
	int         qsize;
	int         secret_interval;
	struct timer_list   secret_timer;
	// hash函数
	unsigned int        (*hashfn)(struct inet_frag_queue *);
	void            (*constructor)(struct inet_frag_queue *q,
						void *arg);
	void            (*destructor)(struct inet_frag_queue *);
	void            (*skb_free)(struct sk_buff *);
	int         (*match)(struct inet_frag_queue *q,
						void *arg);
	void            (*frag_expire)(unsigned long data);
};

struct ipq {
	struct inet_frag_queue q;
	u32     user;
	// 都是ip头相关的一些域。
	__be32      saddr;
	__be32      daddr;
	__be16      id;
	u8      protocol;
	int             iif;
	unsigned int    rid;
	struct inet_peer *peer;
};

struct inet_frag_queue {
	struct hlist_node   list;
	struct netns_frags  *net;
	// 基于LRU算法,主要用在GC上。
	struct list_head    lru_list;   /* lru list member */
	spinlock_t      lock;
	atomic_t        refcnt;
	// 属于同一个源的数据包的定时器,当定时器到期,切片还没到达,此时就会drop掉所有的数据切片。
	struct timer_list   timer;      /* when will this queue expire? */
	// 保存有所有的切片链表(从属于同一个ip包)
	struct sk_buff      *fragments; /* list of received fragments */
	ktime_t         stamp;
	int         len;        /* total length of orig datagram */
	// 表示从源ip包已经接收的字节数。
	int         meat;
	// 这个域主要可以设置为下面的3种值。
	__u8            last_in;    /* first/last segment arrived? */

// 完成,第一个帧以及最后一个帧。
#define INET_FRAG_COMPLETE  4
#define INET_FRAG_FIRST_IN  2
#define INET_FRAG_LAST_IN   1
};

看下面的图就一目了然了:

首先来看组包要解决的一些问题:

1 fragment必须存储在内存中,知道他们全部都被网络子系统处理。才会释放,因此内存会是个巨大的浪费。

2 这里虽然使用了hash表,可是假设恶意攻击者得到散列算法并且伪造数据包来尝试着降低一些hash表中的元素的比重,从而使执行变得缓慢。这里linux使用一个定时器通过制造的随机数来使hash值的生成不可预测。

这个定时器的初始化是通过ipfrag_init(它会初始化上面提到的ip4_frags全局变量)调用inet_frags_init进行的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void inet_frags_init(struct inet_frags *f)
{
	int i;

	for (i = 0; i < INETFRAGS_HASHSZ; i++)
		INIT_HLIST_HEAD(&f->hash[i]);

	rwlock_init(&f->lock);

	f->rnd = (u32) ((num_physpages ^ (num_physpages>>7)) ^
				   (jiffies ^ (jiffies >> 6)));
	// 安装定时器,当定时器到期就会调用inet_frag_secret_rebuild方法。
	setup_timer(&f->secret_timer, inet_frag_secret_rebuild,
			(unsigned long)f);
	f->secret_timer.expires = jiffies + f->secret_interval;
	add_timer(&f->secret_timer);
}

static void inet_frag_secret_rebuild(unsigned long dummy)
{
................................................

	write_lock(&f->lock);
	// 得到随机值
	get_random_bytes(&f->rnd, sizeof(u32));

	// 然后通过这个随机值重新计算整个hash表的hash值。
	for (i = 0; i < INETFRAGS_HASHSZ; i++) {
		struct inet_frag_queue *q;
		struct hlist_node *p, *n;

		hlist_for_each_entry_safe(q, p, n, &f->hash[i], list) {
			unsigned int hval = f->hashfn(q);

			if (hval != i) {
				hlist_del(&q->list);

				/* Relink to new hash chain. */
				hlist_add_head(&q->list, &f->hash[hval]);
			}
		}
	}
..............................................
}

3 ip协议是不可靠的,因此切片有可能被丢失。内核处理这个,是使用了一个定时器(每个数据包(也就是这个切片从属于的那个数据包)).当定时器到期,而切片没有到达,就会丢弃这个包。

4 由于ip协议是无连接的,因此当高层决定重传数据包的时候,组包时有可能会出现多个重复分片的情况。这是因为ip包是由4个域来判断的,源和目的地址,包id以及4层的协议类型。而最主要的是包id。可是包id只有16位,因此一个gigabit网卡几乎在半秒时间就能用完这个id一次。而第二次重传的数据包有可能走的和第一个第一次时不同的路径,因此内核必须每个切片都要检测和前面接受的切片的重叠情况的发生。

先来看ip_defrag用到的几个函数:

inet_frag_create: 创建一个新的ipq实例

ip_evitor: remove掉所有的未完成的数据包。它每次都会update一个LRU链表。每次都会把一个新的ipq数据结构加到ipq_lru_list的结尾。

ip_find: 发现切片所从属的数据包的切片链表。

ip_frag_queue: 排队一个给定的切片刀一个切片列表。这个经常和上一个方法一起使用。

ip_frag_reasm: 当所有的切片都到达后,build一个ip数据包。

ip_frag_destroy: remove掉传进来的ipq数据结构。包括和他有联系的所有的ip切片。

ipq_put: 将引用计数减一,如果为0,则直接调用ip_frag_destroy.

1
2
3
4
5
static inline void inet_frag_put(struct inet_frag_queue *q, struct inet_frags *f)
{
	if (atomic_dec_and_test(&q->refcnt))
		inet_frag_destroy(q, f, NULL);
}

ipq_kill: 主要用在gc上,标记一个ipq数据结构可以被remove,由于一些帧没有按时到达。

接下来来看ip_defrag的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int ip_defrag(struct sk_buff *skb, u32 user)
{
	struct ipq *qp;
	struct net *net;

	net = skb->dev ? dev_net(skb->dev) : dev_net(skb->dst->dev);
	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMREQDS);

	// 如果内存不够,则依据lru算法进行清理。
	if (atomic_read(&net->ipv4.frags.mem) > net->ipv4.frags.high_thresh)
		ip_evictor(net);

	// 查找相应的iqp,如果不存在则会新创建一个(这些都在ip_find里面实现)
	if ((qp = ip_find(net, ip_hdr(skb), user)) != NULL) {
		int ret;

		spin_lock(&qp->q.lock);
		// 排队进队列。
		ret = ip_frag_queue(qp, skb);

		spin_unlock(&qp->q.lock);
		ipq_put(qp);
		return ret;
	}

	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMFAILS);
	kfree_skb(skb);
	return -ENOMEM;
}

我们可以看到这里最重要的一个函数其实是ip_frag_queue,它主要任务是:

1 发现输入帧在源包的位置。
2 基于blog刚开始所描述的,判断是否是最后一个切片。
3 插入切片到切片列表(从属于相同的ip包)
4 update 垃圾回收所用到的ipq的一些相关域。
5 校验l4层的校验值(在硬件计算).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// 其中qp是源ip包的所有切片链表,而skb是将要加进来切片。
static int ip_frag_queue(struct ipq *qp, struct sk_buff *skb)
{
	.............................
	//  INET_FRAG_COMPLETE表示所有的切片包都已经抵达,这个时侯就不需要再组包了,因此这里就是校验函数有没有被错误的调用。
	if (qp->q.last_in & INET_FRAG_COMPLETE)
		goto err;
	.................................................
	// 将offset 8字节对齐、
	offset = ntohs(ip_hdr(skb)->frag_off);
	flags = offset & ~IP_OFFSET;
	offset &= IP_OFFSET;
	offset <<= 3;     /* offset is in 8-byte chunks */
	ihl = ip_hdrlen(skb);

	// 计算这个新的切片包的结束位置。
	end = offset + skb->len - ihl;
	err = -EINVAL;

	// MF没有设置,表明这个帧是最后一个帧。进入相关处理。
	if ((flags & IP_MF) == 0) {
		/* If we already have some bits beyond end
		 * or have different end, the segment is corrrupted.
		 */
	// 设置相应的len位置,以及last_in域。
		if (end < qp->q.len ||
			((qp->q.last_in & INET_FRAG_LAST_IN) && end != qp->q.len))
			goto err;
		qp->q.last_in |= INET_FRAG_LAST_IN;
		qp->q.len = end;
	} else {
		// 除了最后一个切片,每个切片都必须是8字节的倍数。
		if (end&7) {
			// 不是8字节的倍数,kernel截断这个切片。此时就需要l4层的校验重新计算,因此设置ip_summed为 CHECKSUM_NONE
			end &= ~7;
			if (skb->ip_summed != CHECKSUM_UNNECESSARY)
				skb->ip_summed = CHECKSUM_NONE;
		}
		if (end > qp->q.len) {
			// 数据包太大,并且是最后一个包,则表明这个数据包出错,因此drop它。
			/* Some bits beyond end -> corruption. */
			if (qp->q.last_in & INET_FRAG_LAST_IN)
				goto err;
			qp->q.len = end;
		}
	}
	// ip头不能被切片,因此end肯定会大于offset。
	if (end == offset)
		goto err;

	err = -ENOMEM;
	// remove掉ip头。
	if (pskb_pull(skb, ihl) == NULL)
		goto err;
	// trim掉一些padding,然后重新计算checksum。
	err = pskb_trim_rcsum(skb, end - offset);
	if (err)
		goto err;

	// 接下来遍历并将切片(为了找出当前将要插入的切片的位置),是以offset为基准。这里要合租要FRAG_CB宏是用来提取sk_buff->cb域。
	prev = NULL;
	for (next = qp->q.fragments; next != NULL; next = next->next) {
		if (FRAG_CB(next)->offset >= offset)
			break;  /* bingo! */
		prev = next;
	}
	// 当prev!=NULL时,说明这个切片要插入到列表当中。
	if (prev) {
		// 计算有没有重叠。
		int i = (FRAG_CB(prev)->offset + prev->len) - offset;
		// 大于0.证明有重叠,因此进行相关处理
		if (i > 0) {
			// 将重叠部分用新的切片覆盖。
			offset += i;
			err = -EINVAL;
			if (end <= offset)
				goto err;
			err = -ENOMEM;
			//移动i个位置。
			if (!pskb_pull(skb, i))
				goto err;
			// 需要重新计算L4的校验。
			if (skb->ip_summed != CHECKSUM_UNNECESSARY)
				skb->ip_summed = CHECKSUM_NONE;
		}
	}

	err = -ENOMEM;

	while (next && FRAG_CB(next)->offset < end) {
		// 和上面的判断很类似,也是先计算重叠数。这里要注意重叠分为两种情况:1;一个或多个切片被新的切片完全覆盖。2;被部分覆盖,因此这里我们需要分两种情况进行处理。
		int i = end - FRAG_CB(next)->offset; /* overlap is 'i' bytes */

		if (i < next->len) {
			// 被部分覆盖的情况。将新的切片offset移动i字节,然后remove掉老的切片中的i个字节。
			/* Eat head of the next overlapped fragment
			 * and leave the loop. The next ones cannot overlap.
			 */
			if (!pskb_pull(next, i))
				goto err;
			FRAG_CB(next)->offset += i;
			// 将接收到的源数据报的大小减去i,也就是remove掉不完全覆盖的那一部分。
			qp->q.meat -= i;
			// 重新计算l4层的校验。
			if (next->ip_summed != CHECKSUM_UNNECESSARY)
				next->ip_summed = CHECKSUM_NONE;
			break;
		} else {
			// 老的切片完全被新的切片覆盖,此时只需要remove掉老的切片就可以了。
			struct sk_buff *free_it = next;
			next = next->next;

			if (prev)
				prev->next = next;
			else
				qp->q.fragments = next;
			// 将qp的接受字节数更新。
			qp->q.meat -= free_it->len;
			frag_kfree_skb(qp->q.net, free_it, NULL);
		}
	}

	FRAG_CB(skb)->offset = offset;

....................................................
	atomic_add(skb->truesize, &qp->q.net->mem);
	// offset为0说明是第一个切片,因此设置相应的位。
	if (offset == 0)
		qp->q.last_in |= INET_FRAG_FIRST_IN;

	if (qp->q.last_in == (INET_FRAG_FIRST_IN | INET_FRAG_LAST_IN) &&
		qp->q.meat == qp->q.len)
		// 所有条件的满足了,就开始buildip包。
		return ip_frag_reasm(qp, prev, dev);
	write_lock(&ip4_frags.lock);
	// 从将此切片加入到lry链表中。
	list_move_tail(&qp->q.lru_list, &qp->q.net->lru_list);
	write_unlock(&ip4_frags.lock);
	return -EINPROGRESS;

err:
	kfree_skb(skb);
	return err;
}

如果网络设备提供L4层的硬件校验的话,输入ip帧还会进行L4的校验计算。当帧通过ip_frag_reasm组合好,它会进行校验的重新计算。我们这里通过设置skb->ip_summed到CHECKSUM_NONE,来表示需要娇艳的标志。

最后来看下GC。

内核为ip切片数据包实现了两种类型的垃圾回收。

1 系统内存使用限制。

2 组包的定时器

这里有一个全局的ip_frag_mem变量,来表示当前被切片所占用的内存数。每次一个新的切片被加入,这个值都会更新。而所能使用的最大内存可以在运行时改变,是通过/proc的sysctl_ipfrag_high_thresh来改变的,因此我们能看到当ip_defrag时,一开始会先判断内存的限制:

1
2
if (atomic_read(&net->ipv4.frags.mem) > net->ipv4.frags.high_thresh)
		ip_evictor(net);

当一个切片数据包到达后,内核会启动一个组包定时器,他是为了避免一个数据包占据ipq_hash太长时间,因此当定时器到期后,它就会清理掉在hash表中的相应的qp结构(也就是所有的未完成切片包).这个处理函数就是ip_expire,它的初始化是在ipfrag_init进行的。:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static void ip_expire(unsigned long arg)
{
	struct ipq *qp;
	struct net *net;
	// 取出相应的qp,以及net域。
	qp = container_of((struct inet_frag_queue *) arg, struct ipq, q);
	net = container_of(qp->q.net, struct net, ipv4.frags);

	spin_lock(&qp->q.lock);
	// 如果数据包已经传输完毕,则不进行任何处理,直接退出。
	if (qp->q.last_in & INET_FRAG_COMPLETE)
		goto out;
	// 调用ipq_kill,这个函数主要是减少qp的引用计数,并从相关链表(比如LRU_LIST)中移除它。
	ipq_kill(qp);

	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMTIMEOUT);
	IP_INC_STATS_BH(net, IPSTATS_MIB_REASMFAILS);

	// 如果是第一个切片,则发送一个ICMP给源主机。
	if ((qp->q.last_in & INET_FRAG_FIRST_IN) && qp->q.fragments != NULL) {
		struct sk_buff *head = qp->q.fragments;

		/* Send an ICMP "Fragment Reassembly Timeout" message. */
		if ((head->dev = dev_get_by_index(net, qp->iif)) != NULL) {
			icmp_send(head, ICMP_TIME_EXCEEDED, ICMP_EXC_FRAGTIME, 0);
			dev_put(head->dev);
		}
	}
out:
	spin_unlock(&qp->q.lock);
	ipq_put(qp);
}

dev_queue_xmi函数详解

blog.chinaunix.net/uid-20788636-id-3181312.html

前面在分析IPv6的数据流程时,当所有的信息都准备好了之后,例如,出口设备,下一跳的地址,以及链路层地址。就会调用dev.c文件中的dev_queue_xmin函数,该函数是设备驱动程序执行传输的接口。也就是所有的数据包在填充完成后,最终发送数据时,都会调用该函数。

dev_queue_xmit函数只接收一个skb_buff结构作为输入的值。此数据结构包含了此函数所需要的一切信息。Skb->dev是出口设备,skb->data为有效的载荷的开头,其长度为skb->len.下面是2.6.37版本内核中的dev_queue_xmit函数,该版本的内核与之前的版本有了不少的区别。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
int dev_queue_xmit(struct sk_buff *skb)
{
	struct net_device *dev = skb->dev;
	struct netdev_queue *txq;
	struct Qdisc *q;
	int rc = -ENOMEM;

	/* Disable soft irqs for various locks below. Also
	 * stops preemption for RCU.
	 */
	//关闭软中断 - __rcu_read_lock_bh()--->local_bh_disable();
	rcu_read_lock_bh();
	// 选择一个发送队列,如果设备提供了select_queue回调函数就使用它,否则由内核选择一个队列,这里只是Linux内核多队列的实现,但是要真正的使用都队列,需要网卡支持多队列才可以,一般的网卡都只有一个队列。在调用alloc_etherdev分配net_device是,设置队列的个数
	txq = dev_pick_tx(dev, skb);
	//从netdev_queue结构上获取设备的qdisc
	q = rcu_dereference_bh(txq->qdisc);

#ifdef CONFIG_NET_CLS_ACT
	skb->tc_verd = SET_TC_AT(skb->tc_verd, AT_EGRESS);
#endif
	//如果硬件设备有队列可以使用,该函数由dev_queue_xmit函数直接调用或由dev_queue_xmit通过qdisc_run函数调用
	trace_net_dev_queue(skb);
	if (q->enqueue) {
		rc = __dev_xmit_skb(skb, q, dev, txq); //使用流控对象发送数据包(包含入队和出队)
		//更详细的内容参考说明3
		goto out;
	}

	//下面的处理是在没有发送队列的情况下
	/* The device has no queue. Common case for software devices:
	 loopback, all the sorts of tunnels...

	 Really, it is unlikely that netif_tx_lock protection is necessary
	 here. (f.e. loopback and IP tunnels are clean ignoring statistics
	 counters.)
	 However, it is possible, that they rely on protection
	 made by us here.

	 Check this and shot the lock. It is not prone from deadlocks.
	 Either shot noqueue qdisc, it is even simpler 8)
	 */
	//首先,确定设备是开启的,并且还要确定队列是运行的,启动和停止队列有驱动程序决定
	//设备没有输出队列典型的是回环设备。这里需要做的就是直接调用dev_start_queue_xmit、、函数,经过驱动发送出去,如果发送失败,就直接丢弃,没有队列可以保存。
	if (dev->flags & IFF_UP) {
		int cpu = smp_processor_id(); /* ok because BHs are off */

		if (txq->xmit_lock_owner != cpu) {

			if (__this_cpu_read(xmit_recursion) > RECURSION_LIMIT)
				goto recursion_alert;

			HARD_TX_LOCK(dev, txq, cpu);

			if (!netif_tx_queue_stopped(txq)) {
				__this_cpu_inc(xmit_recursion);
				rc = dev_hard_start_xmit(skb, dev, txq);//见说明4
				__this_cpu_dec(xmit_recursion);
				if (dev_xmit_complete(rc)) {
					HARD_TX_UNLOCK(dev, txq);
					goto out;
				}
			}
			HARD_TX_UNLOCK(dev, txq);
			if (net_ratelimit())
				printk(KERN_CRIT "Virtual device %s asks to "
				 "queue packet!\n", dev->name);
		} else {
			/* Recursion is It is possible,
			 * unfortunately
			 */
recursion_alert:
			if (net_ratelimit())
				printk(KERN_CRIT "Dead loop on virtual device "
				 "%s, fix it urgently!\n", dev->name);
		}
	}

	rc = -ENETDOWN;
	rcu_read_unlock_bh();

	kfree_skb(skb);
	return rc;
out:
	rcu_read_unlock_bh();
	return rc;
}
1. 下面是dev_pick_tx函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static struct netdev_queue *dev_pick_tx(struct net_device *dev,
					struct sk_buff *skb)
{
	int queue_index;
	const struct net_device_ops *ops = dev->netdev_ops;

	if (ops->ndo_select_queue) {
		//选择一个索引,这个策略可以设置,比如优先选择视频和音频队列,而哪个队列邦定哪个策略也是设定的。
		queue_index = ops->ndo_select_queue(dev, skb);
		queue_index = dev_cap_txqueue(dev, queue_index);
	} else {
		struct sock *sk = skb->sk;
		queue_index = sk_tx_queue_get(sk);
		if (queue_index < 0 || queue_index >= dev->real_num_tx_queues) {

			queue_index = 0;
			if (dev->real_num_tx_queues > 1)
				queue_index = skb_tx_hash(dev, skb);

			if (sk) {
				struct dst_entry *dst = rcu_dereference_check(sk->sk_dst_cache, 1);

				if (dst && skb_dst(skb) == dst)
					sk_tx_queue_set(sk, queue_index);
			}
		}
	}

	skb_set_queue_mapping(skb, queue_index);
	return netdev_get_tx_queue(dev, queue_index);
}
2. 下面是其中的一种网卡类型调用函数alloc_etherdev时,
1
dev = alloc_etherdev(sizeof(struct ether1_priv));

其实该函数是一个宏定义:其中第二参数表示的就是队列的数量,这里在Linux2.6.37内核中找到的一种硬件网卡的实现,可用的队列是1个。

1
#define alloc_etherdev(sizeof_priv) alloc_etherdev_mq(sizeof_priv, 1)

下面是alloc_etherdev_mq函数的定义实现。

1
2
3
4
struct net_device *alloc_etherdev_mq(int sizeof_priv, unsigned int queue_count)
{
	return alloc_netdev_mq(sizeof_priv, "eth%d", ether_setup, queue_count);
}
3.

几乎所有的设备都会使用队列调度出口的流量,而内核可以使用对了规则的算法安排那个帧进行发送,使其以最优效率的次序进行传输。这里检查这个队列中是否有enqueue函数,如果有则说明设备会使用这个队列,否则需另外处理。关于enqueue函数的设置,我找到dev_open->dev_activate中调用了qdisc_create_dflt来设置,需要注意的是,这里并不是将传进来的skb直接发送,而是先入队,然后调度队列,具体发送哪个数据包由enqueue和dequeue函数决定,这体现了设备的排队规则

Enqueue 把一个元素添加的队列

Dequeue 从队列中提取一个元素

Requeue 把一个原先已经提取的元素放回到队列,可以由于传输失败。

if (q->enqueue)为真的话,表明这个设备有队列,可以进行相关的流控。调用__dev_xmit_skb函数进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
				 struct net_device *dev,
				 struct netdev_queue *txq)
{
	spinlock_t *root_lock = qdisc_lock(q);
	bool contended = qdisc_is_running(q);
	int rc;

	/*
	 * Heuristic to force contended enqueues to serialize on a
	 * separate lock before trying to get qdisc main lock.
	 * This permits __QDISC_STATE_RUNNING owner to get the lock more often
	 * and dequeue packets faster.
	 */
	if (unlikely(contended))
		spin_lock(&q->busylock);

	spin_lock(root_lock);
	if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
		kfree_skb(skb);
		rc = NET_XMIT_DROP;
	} else if ((q->flags & TCQ_F_CAN_BYPASS) && !qdisc_qlen(q) &&
		 qdisc_run_begin(q)) {
		/*
		 * This is a work-conserving queue; there are no old skbs
		 * waiting to be sent out; and the qdisc is not running -
		 * xmit the skb directly.
		 */
		if (!(dev->priv_flags & IFF_XMIT_DST_RELEASE))
			skb_dst_force(skb);
		__qdisc_update_bstats(q, skb->len);
		if (sch_direct_xmit(skb, q, dev, txq, root_lock)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);
		} else
			qdisc_run_end(q);

		rc = NET_XMIT_SUCCESS;
	} else {
		skb_dst_force(skb);
		rc = qdisc_enqueue_root(skb, q);
		if (qdisc_run_begin(q)) {
			if (unlikely(contended)) {
				spin_unlock(&q->busylock);
				contended = false;
			}
			__qdisc_run(q);
		}
	}
	spin_unlock(root_lock);
	if (unlikely(contended))
		spin_unlock(&q->busylock);
	return rc;
}

_dev_xmit_skb函数主要做两件事情:
(1) 如果流控对象为空的,试图直接发送数据包。
(2) 如果流控对象不空,将数据包加入流控对象,并运行流控对象。

当设备进入调度队列准备传输时,qdisc_run函数就会选出下一个要传输的帧,而该函数会间接的调用相关联的队列规则dequeue函数,从对了中取出数据进行传输。

有两个时机将会调用qdisc_run():
1.__dev_xmit_skb()
2.软中断服务线程NET_TX_SOFTIRQ

其实,真正的工作有qdisc_restart函数实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void __qdisc_run(struct Qdisc *q)
{
	unsigned long start_time = jiffies;

	while (qdisc_restart(q)) { //返回值大于0,说明流控对象非空。
		/*
		 * Postpone processing if
		 * 1. another process needs the CPU;
		 * 2. we've been doing it for too long.
		 */
		if (need_resched() || jiffies != start_time) { //已经不允许继续运行本流控对象。
			__netif_schedule(q); //将本队列加入软中断的output_queue链表中。
			break;
		}
	}

	qdisc_run_end(q);
}

如果发现本队列运行的时间太长了,将会停止队列的运行,并将队列加入output_queue链表头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static inline int qdisc_restart(struct Qdisc *q)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	spinlock_t *root_lock;
	struct sk_buff *skb;

	/* Dequeue packet */
	skb = dequeue_skb(q);//一开始就调用dequeue函数。
	if (unlikely(!skb))
		return 0;
	WARN_ON_ONCE(skb_dst_is_noref(skb));
	root_lock = qdisc_lock(q);
	dev = qdisc_dev(q);
	txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb));

	return sch_direct_xmit(skb, q, dev, txq, root_lock);//用于发送数据包
}
* Returns to the caller:
 *                0 - queue is empty or throttled.
 *                >0 - queue is not empty.
 */
int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q,
		 struct net_device *dev, struct netdev_queue *txq,
		 spinlock_t *root_lock)
{
	int ret = NETDEV_TX_BUSY;

	/* And release qdisc */
	spin_unlock(root_lock);

	HARD_TX_LOCK(dev, txq, smp_processor_id());
	if (!netif_tx_queue_stopped(txq) && !netif_tx_queue_frozen(txq)) //设备没有被停止,且发送队列没有被冻结
		ret = dev_hard_start_xmit(skb, dev, txq); //发送数据包

	HARD_TX_UNLOCK(dev, txq);

	spin_lock(root_lock);

	if (dev_xmit_complete(ret)) {
		/* Driver sent out skb successfully or skb was consumed */
		//发送成功,返回新的队列的长度
		ret = qdisc_qlen(q);
	} else if (ret == NETDEV_TX_LOCKED) {
		/* Driver try lock failed */
		ret = handle_dev_cpu_collision(skb, txq, q);
	} else {
		/* Driver returned NETDEV_TX_BUSY - requeue skb */
		if (unlikely (ret != NETDEV_TX_BUSY && net_ratelimit()))
			printk(KERN_WARNING "BUG %s code %d qlen %d\n",
			 dev->name, ret, q->q.qlen);
		 //设备繁忙,重新调度发送(利用softirq)
		ret = dev_requeue_skb(skb, q);
	}

	if (ret && (netif_tx_queue_stopped(txq) ||
		 netif_tx_queue_frozen(txq)))
		ret = 0;

	return ret;
}
4. 我们看一下下面的发送函数。

从此函数可以看出,当驱动使用发送队列的时候会循环从队列中取出包发送, 而不使用队列的时候只发送一次,如果没发送成功就直接丢弃

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
struct netdev_queue *txq)
{
	const struct net_device_ops *ops = dev->netdev_ops;//驱动程序的函数集
	int rc = NETDEV_TX_OK;

	if (likely(!skb->next)) {
		if (!list_empty(&ptype_all))
			dev_queue_xmit_nit(skb, dev);//如果dev_add_pack加入的是ETH_P_ALL,那么就会复制一份给你的回调函数。

		/*
		 * If device doesnt need skb->dst, release it right now while
		 * its hot in this cpu cache
		 */
		if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
			skb_dst_drop(skb);

		skb_orphan_try(skb);

		if (vlan_tx_tag_present(skb) &&
		 !(dev->features & NETIF_F_HW_VLAN_TX)) {
			skb = __vlan_put_tag(skb, vlan_tx_tag_get(skb));
			if (unlikely(!skb))
				goto out;

			skb->vlan_tci = 0;
		}

		if (netif_needs_gso(dev, skb)) {
			if (unlikely(dev_gso_segment(skb)))
				goto out_kfree_skb;
			if (skb->next)
				goto gso;
		} else {
			if (skb_needs_linearize(skb, dev) &&
			 __skb_linearize(skb))
				goto out_kfree_skb;

			/* If packet is not checksummed and device does not
			 * support checksumming for this protocol, complete
			 * checksumming here.
			 */
			if (skb->ip_summed == CHECKSUM_PARTIAL) {
				skb_set_transport_header(skb, skb->csum_start -
					 skb_headroom(skb));
				if (!dev_can_checksum(dev, skb) &&
				 skb_checksum_help(skb))
					goto out_kfree_skb;
			}
		}

		rc = ops->ndo_start_xmit(skb, dev);//调用网卡的驱动程序发送数据。不同的网络设备有不同的发送函数
		trace_net_dev_xmit(skb, rc);
		if (rc == NETDEV_TX_OK)
			txq_trans_update(txq);
		return rc;
	}

gso:
	do {
		struct sk_buff *nskb = skb->next;

		skb->next = nskb->next;
		nskb->next = NULL;

		/*
		 * If device doesnt need nskb->dst, release it right now while
		 * its hot in this cpu cache
		 */
		if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
			skb_dst_drop(nskb);

		rc = ops->ndo_start_xmit(nskb, dev); //调用网卡的驱动程序发送数据。不同的网络设备有不同的发送函数
		trace_net_dev_xmit(nskb, rc);
		if (unlikely(rc != NETDEV_TX_OK)) {
			if (rc & ~NETDEV_TX_MASK)
				goto out_kfree_gso_skb;
			nskb->next = skb->next;
			skb->next = nskb;
			return rc;
		}
		txq_trans_update(txq);
		if (unlikely(netif_tx_queue_stopped(txq) && skb->next))
			return NETDEV_TX_BUSY;
	} while (skb->next);

out_kfree_gso_skb:
	if (likely(skb->next == NULL))
		skb->destructor = DEV_GSO_CB(skb)->destructor;
out_kfree_skb:
	kfree_skb(skb);
out:
	return rc;
}
5.下面看一下dev_queue_xmit_nit函数。

对于通过socket(AF_PACKET,SOCK_RAW,htons(ETH_P_ALL))创建的原始套接口,不但可以接受从外部输入的数据包,而且对于由于本地输出的数据包,如果满足条件,也可以能接受。

该函数就是用来接收由于本地输出的数据包,在链路层的输出过程中,会调用此函数,将满足条件的数据包输入到RAW套接口,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static void dev_queue_xmit_nit(struct sk_buff *skb, struct net_device *dev)
{
	struct packet_type *ptype;

#ifdef CONFIG_NET_CLS_ACT
	if (!(skb->tstamp.tv64 && (G_TC_FROM(skb->tc_verd) & AT_INGRESS)))
		net_timestamp_set(skb);-----------------(1)
#else
	net_timestamp_set(skb);
#endif

	rcu_read_lock();
	list_for_each_entry_rcu(ptype, &ptype_all, list) {-----------------(2)
		/* Never send packets back to the socket
		 * they originated from - MvS (miquels@drinkel.ow.org)
		 */
		if ((ptype->dev == dev || !ptype->dev) &&
		 (ptype->af_packet_priv == NULL ||
		 (struct sock *)ptype->af_packet_priv != skb->sk)) {-----------------(3)
			struct sk_buff *skb2 = skb_clone(skb, GFP_ATOMIC); -----------------(4)
			if (!skb2)
				break;

			/* skb->nh should be correctly
			 set by sender, so that the second statement is
			 just protection against buggy protocols.
			 */
			skb_reset_mac_header(skb2);

			if (skb_network_header(skb2) < skb2->data ||
			 skb2->network_header > skb2->tail) {
				if (net_ratelimit())
					printk(KERN_CRIT "protocol %04x is "
					 "buggy, dev %s\n",
					 ntohs(skb2->protocol),
					 dev->name);
				skb_reset_network_header(skb2); -----------------(5)
			}

			skb2->transport_header = skb2->network_header;
			skb2->pkt_type = PACKET_OUTGOING;
			ptype->func(skb2, skb->dev, ptype, skb->dev); -----------------(6)
		}
	}
	rcu_read_unlock();
}

说明:
(1) 记录该数据包输入的时间戳
(2) 遍历ptype_all链表,查找所有符合输入条件的原始套接口,并循环将数据包输入到满足条件的套接口
(3) 数据包的输出设备与套接口的输入设备相符或者套接口不指定输入设备,并且该数据包不是有当前用于比较的套接口输出,此时该套接口满足条件,数据包可以输入
(4) 由于该数据包是额外输入到这个原始套接口的,因此需要克隆一个数据包
(5) 校验数据包是否有效
(6) 将数据包输入原始套接口

6. 对于lookback设备来说处理有些不同。它的hard_start_xmit函数是loopback_xmit

在net/lookback.c文件中,定义的struct net_device_ops loopback_ops结构体

1
2
3
4
5
static const struct net_device_ops loopback_ops = {
	.ndo_init = loopback_dev_init,
	.ndo_start_xmit= loopback_xmit,
	.ndo_get_stats64 = loopback_get_stats64,
};

从这里可以看到起发送函数为loopback_xmit函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static netdev_tx_t loopback_xmit(struct sk_buff *skb,
				 struct net_device *dev)
{
	struct pcpu_lstats *lb_stats;
	int len;

	skb_orphan(skb);

	skb->protocol = eth_type_trans(skb, dev);

	/* it's OK to use per_cpu_ptr() because BHs are off */
	lb_stats = this_cpu_ptr(dev->lstats);

	len = skb->len;
	if (likely(netif_rx(skb) == NET_RX_SUCCESS)) {//直接调用了netif_rx进行了接收处理
		u64_stats_update_begin(&lb_stats->syncp);
		lb_stats->bytes += len;
		lb_stats->packets++;
		u64_stats_update_end(&lb_stats->syncp);
	}

	return NETDEV_TX_OK;
}
7. 已经有了dev_queue_xmit函数,为什么还需要软中断来发送呢?

dev_queue_xmit是对skb做些最后的处理并且第一次尝试发送,软中断是将前者发送失败或者没发完的包发送出去。

主要参考文献:

Linux发送函数dev_queue_xmit分析 http://shaojiashuai123456.iteye.com/blog/842236

TC流量控制实现分析(初步) http://blog.csdn.net/wwwlkk/article/details/5929308

Linux内核源码剖析 TCP/IP实现