kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp
sar -n DEV 1

SSL协议握手过程报文解析

https://blog.csdn.net/tterminator/article/details/50675540

SSL建立握手连接目的:

1.身份的验证,client与server确认对方是它相连接的,而不是第三方冒充的,通过证书实现

2.client与server交换session key,用于连接后数据的传输加密和hash校验

简单的SSL握手连接过程(仅Server端交换证书给client):

1.client发送ClientHello,指定版本,随机数(RN),所有支持的密码套件(CipherSuites)

2.server回应ServerHello,指定版本,RN,选择CipherSuites,会话ID(Session ID)

3.server发送Certificate

4.Server发送ServerHelloDone

5.Client发送ClientKeyExchange,用于与server交换session key

6.Client发送ChangeCipherSpec,指示Server从现在开始发送的消息都是加密过的

7.Client发送Finishd,包含了前面所有握手消息的hash,可以让server验证握手过程是否被第三方篡改

8.Server发送ChangeCipherSpec,指示Client从现在开始发送的消息都是加密过的

9.Server发送Finishd,包含了前面所有握手消息的hash,可以让client验证握手过程是否被第三方篡改,并且证明自己是Certificate密钥的拥有者,即证明自己的身份

下面从抓包数据来具体分析这一过程并说明各部分数据的作用以及如实现前面列出的握手的目标,当然了,最重要的还是说明为何这一过程是安全可靠的,第三方无法截获,篡改或者假冒

1.client发送ClientHello

每一条消息都会包含有ContentType,Version,HandshakeType等信息。

ContentType指示SSL通信处于哪个阶段,是握手(Handshake),开始加密传输(ChangeCipherSpec)还是正常通信(Application)等,见下表

1
2
3
4
5
6
Hex  Dec Type

0x14  20  ChangeCipherSpec
0x15  21  Alert
0x16  22  Handshake
0x17  23  Application

Version是TLS的版本,见下表

1
2
3
4
5
6
Major Version    Minor Version   Version Type

3 0   SSLv3
3 1   TLS 1.0
3 2   TLS 1.1
3 3   TLS 1.2

Handshake Type是在handshanke阶段中的具体哪一步,见下表

1
2
3
4
5
6
7
8
9
10
11
12
Code Description

0 HelloRequest
1 ClientHello
2 ServerHello
11    Certificate
12    ServerKeyExchange
13    CertificateRequest
14    ServerHelloDone
15    CertificateVerify
16    ClientKeyExchange
20    Finished

ClientHello附带的数据随机数据RN,会在生成session key时使用,Cipher suite列出了client支持的所有加密算法组合,可以看出每一组包含3种算法,一个是非对称算法,如RSA,一个是对称算法如DES,3DES,RC4等,一个是Hash算法,如MD5,SHA等,server会从这些算法组合中选取一组,作为本次SSL连接中使用。

2.server回应ServerHello

ession id,如果SSL连接断开,再次连接时,可以使用该属性重新建立连接,在双方都有缓存的情况下可以省略握手的步骤。

server端也会生成随机的RN,用于生成session key使用

server会从client发送的Cipher suite列表中跳出一个,这里挑选的是RSA+RC4+MD5

这次server共发送的3个handshake 消息:Serverhello,Certificate和ServerHelloDone,共用一个ContentType:Handshake

3.server发送Certificate

server的证书信息,只包含public key,server将该public key对应的private key保存好,用于证明server是该证书的实际拥有者,那么如何验证呢?原理很简单:client随机生成一串数,用server这里的public key加密(显然是RSA算法),发给server,server用private key解密后返回给client,client与原文比较,如果一致,则说明server拥有private key,也就说明与client通信的正是证书的拥有者,因为public key加密的数据,只有private key才能解密,目前的技术还没发破解。利用这个原理,也能实现session key的交换,加密前的那串随机数就可用作session key,因为除了client和server,没有第三方能获得该数据了。原理很简单,实际使用时会复杂很多,数据经过多次hash,伪随机等的运算,前面提到的client和server端得RN都会参与计算。

4.Server发送ServerHelloDone

5.Client发送ClientKeyExchange

client拿到server的certificate后,就可以开始利用certificate里的public key进行session key的交换了。从图中可以看出,client发送的是130字节的字节流,显然是加过密的。client随机生成48字节的Pre-master secret,padding后用public key加密就得到这130字节的数据发送给server,server解密也能得到Pre-master secret。双方使用pre-master secret, “master secret"常量字节流,前期交换的server端RN和client的RN作为参数,使用一个伪随机函数PRF,其实就是hash之后再hash,最后得到48字节的master secret。master secret再与"key expansion"常量,双方RN经过伪随机函数运算得到key_block,PRF伪随机函数可以可以仿佛循环输出数据,因此我们想得到多少字节都可以,就如Random伪随机函数,给它一个种子,后续用hash计算能得到无数个随机数,如果每次种子相同,得到的序列是一样的,但是这里的输入时48字节的master secret,2个28字节的RN和一个字符串常量,碰撞的可能性是很小的。得到key block后,算法,就从中取出session key,IV(对称算法中使用的初始化向量)等。client和server使用的session key是不一样的,但只要双方都知道对方使用的是什么就行了。这里会取出4个:client/server加密正文的key,client/server计算handshake数据hash的key。

6.Client发送ChangeCipherSpec

client指示Server从现在开始发送的消息都是加密过的

7.Client发送Finished

client发送的加密数据,这个消息非常关键,一是能证明握手数据没有被篡改过,二也能证明自己确实是密钥的拥有者(这里是单边验证,只有server有certificate,server发送的Finished能证明自己含有private key,原理是一样的)。client将之前发送的所有握手消息存入handshake messages缓存,进行MD5和SHA-1两种hash运算,再与前面的master secret和一串常量"client finished"进行PRF伪随机运算得到12字节的verify data,还要经过改进的MD5计算得到加密信息。为什么能证明上述两点呢,前面说了只有密钥的拥有者才能解密得到pre-master key,master key,最后得到key block后,进行hash运算得到的结果才与发送方的一致。

8.Server发送ChangeCipherSpec

Server指示client从现在开始发送的消息都是加密过的

9.Server发送Finishd

与client发送Finished计算方法一致。server发送的Finished里包含hash给client,client会进行校验,如果通过,说明握手过程中的数据没有被第三方篡改过,也说明server是之前交换证书的拥有者。现在双方就可以开始后续通信,进入Application context了。

Linux网络栈之队列

https://zhensheng.im/2017/08/11/%e7%bf%bb%e8%af%91linux%e7%bd%91%e7%bb%9c%e6%a0%88%e4%b9%8b%e9%98%9f%e5%88%97.meow

数据包队列是任何一个网络栈的核心组件,数据包队列实现了异步模块之间的通讯,提升了网络性能,并且拥有影响延迟的副作用。本文的目标,是解释Linux的网络栈中IP数据包在何处排队,新的延迟降低技术如BQL是多么的有趣,以及如何控制缓冲区以降低延迟。

下面这张图片将会贯穿全文,其多个修改版本将会用来解释一些特别的概念。

图片一 – Simplified high level overview of the queues on the transmit path of the Linux 网络栈

驱动队列(Driver Queue,又名环形缓冲)

在内核的IP stack和网络接口控制器(NIC)之间,存在一个驱动队列。这个队列典型地以一个先进先出的环形缓冲区实现 —— 即一个固定大小的缓冲区。驱动队列并不附带数据包数据,而是持有指向内核中名为socket kernel buffers(SKBs)的结构体的描述符,SKBs持有数据包的数据并且在整个内核中使用。

图片 2 – Partially full driver queue with descriptors pointing to SKBs

驱动队列的输入来源是一个为所有数据包排队的IP stack,这些数据包可能是本地生成,或者在一个路由器上,由一个NIC接收然后选路从另一个NIC发出。数据包从IP stack入队到驱动队列后,将会被驱动程序执行出队操作,然后通过数据总线进行传输。

驱动队列之所以存在,是为了保证系统无论在任何需要传输数据, NIC都能立即传输。换言之,驱动队列从硬件上给予了IP stack一个异步数据排队的地方。一个可选的方式是当NIC可以传输数据时,主动向IP stack索取数据,但这种设计模式下,无法实时对NIC响应,浪费了珍贵的传输机会,损失了网络吞吐量。另一个与此相反的方法是IP stack创建一个数据包后,需要同步等待NIC,直到NIC可以发送数据包,这也不是一个好的设计模式,因为在同步等待的过程中IP stack无法执行其它工作。

巨型数据包

绝大多数的NIC都拥有一个固定的最大传输单元(MTU),意思是物理媒介可以传输的最大帧。以太网默认的MTU是1,500字节,但一些以太网络支持上限9,000字节的巨型帧(Jumbo Frames)。在IP 网络栈中,MTU描述了一个可被传输的数据包大小上限。例如,一个应用程序通过TCP socket发送了2,000字节的数据,IP stack就需要把这份数据拆分成数个数据包,以保持单个数据包的小于或等于MTU(1,500)。传输大量数据时,小的MTU将会产生更多分包。

为了避免大量数据包排队,Linux内核实现了数个优化:TCP segmentation offload (TSO), UDP fragmentation offload (UFO) 和 generic segmentation offload (GSO),这些优化机制允许IP stack创建大于出口NIC MTU的数据包。以IPv4为例,可以创建上限为65,536字节的数据包,并且可以入队到驱动队列。在TSO和UFO中,NIC在硬件上实现并负责拆分大数据包,以适合在物理链路上传输。对于没有TSO和UFO支持的NIC,GSO则在软件上实现同样的功能。

前文提到,驱动队列只有固定容量,只能存放固定数量的描述符,由于TSO,UFO和GSO的特性,使得大型的数据包可以加入到驱动队列当中,从而间接地增加了队列的容量。图三与图二的比较,解释了这个概念。

图片 3 – Large packets can be sent to the NIC when TSO, UFO or GSO are enabled. This can greatly increase the number of bytes in the driver queue.

虽然本文的其余部分重点介绍传输路径,但值得注意的是Linux也有工作方式像TSO,UFO和GSO的接收端优化。这些优化的目标也是减少每一个数据包的开销。特别地,generic receive offload (GRO)允许NIC驱动把接收到的数据包组合成一个大型数据包,然后加入IP stack。在转发数据包的时候,为了维护端对端IP数据包的特性,GRO会重新组合接收到的数据包。然而,这只是单端效果,当大型数据包在转发方处拆分时,将会出现多个数据包一次性入队的情况,这种数据包"微型突发"会给网络延迟带来负面影响。

饥饿与延迟

先不讨论必要性与优点,在IP stack和硬件之间的队列描述了两个问题:饥饿与延迟。

如果NIC驱动程序要处理队列,此时队列为空,NIC将会失去一个传输数据的机会,导致系统的生产量降低。这种情况定义为饥饿。需要注意:当操作系统没有任何数据需要传输时,队列为空的话,并不归类为饥饿,而是正常。为了避免饥饿,IP stack在填充驱动队列的同时,NIC驱动程序也要进行出队操作。糟糕的是,队列填满或为空的事件持续的时间会随着系统和外部的情况而变化。例如,在一个繁忙的操作系统上,IP stack很少有机会往驱动队列中入队数据包,这样有很大的几率出现驱动队列为空的情况。拥有一个大容量的驱动队列缓冲区,有利于减少饥饿的几率,提高网络吞吐量。

虽然一个大的队列有利于增加吞吐量,但缺点也很明显:提高了延迟。

图片 4 – Interactive packet (yellow) behind bulk flow packets (blue)

图片4展示了驱动队列几乎被单个高流量(蓝色)的TCP段填满。队列中最后一个数据包来自VoIP或者游戏(黄色)。交互式应用,例如VoIP或游戏会在固定的间隔发送小数据包,占用大量带宽的数据传输会使用高数据包传输速率传输大量数据包,高速率的数据包传输将会在交互式数据包之间插入大量数据包,从而导致这些交互式数据包延迟传输。为了进一步解释这种情况,假设有如下场景:

一个网络接口拥有5 Mbit/sec(或5,000,000 bit/sec)的传输能力

每一个大流量的数据包都是1,500 bytes或12,000 bits。

每一个交互式数据包都是500 bytes。

驱动队列的长度为128。

有127个大流量数据包,还有1个交互式数据包排在队列末尾。

在上述情况下,发送127个大流量的数据包,需要(127 * 12,000) / 5,000,000 = 0.304 秒(以ping的方式来看,延迟值为304毫秒)。如此高的延迟,对于交互式程序来说是无法接受的,然而这还没计算往返时间。前文提到,通过TSO,UFO,GSO技术,大型数据包还可以在队列中排队,这将导致延迟问题更严重。

大的延迟,一般由过大、疏于管理的缓冲区造成,如Bufferbloat。更多关于此现象的细节,可以查阅控制队列延迟(Controlling Queue Delay),以及Bufferbloat项目。

如上所述,为驱动队列选择一个合适的容量是一个Goldilocks问题 – 这个值不能太小,否则损失吞吐量,也不能太大,否则过增延迟。

字节级队列限制(Byte Queue Limits (BQL))

Byte Queue Limits (BQL)是一个在Linux Kernel 3.3.0加入的新特性,以自动解决驱动队列容量问题。BQL通过添加一个协议,计算出的当前情况下避免饥饿的最小数据包缓冲区大小,以决定是否允许继续向驱动队列中入队数据包。根据前文,排队的数据包越少,数据包排队的最大发送延迟就越低。

需要注意,驱动队列的容量并不能被BQL修改,BQL做的只是计算出一个限制值,表示当时有多少的数据可以被排队。任何超过此限制的数据,是等待还是被丢弃,会根据协议而定。

BQL机制在以下两种事件发生时将会触发:数据包入队,数据包传输完成。一个简化的BQL算法版本概括如下IMIT为BQL根据当前情况计算出的限制值。

1
2
3
4
5
6
****
** 数据包入驱动队列后
****

如果队列排队数据包的总数据量超过当前限制值
禁止数据包入驱动队列

这里要清楚,被排队的数据量可以超过LIMIT,因为在TSO,UFO或GSO启用的情况下,一个大型的数据包可以通过单个操作入队,因此LIMIT检查在入队之后才发生,如果你很注重延迟,那么可能需要考虑关闭这些功能,本文后面将会提到如何实现这个需求。

BQL的第二个阶段在硬件完成数据传输后触发(pseudo-code简化版):

1
2
3
4
5
6
7
8
9
10
11
12
13
****
** 当硬件已经完成一批次数据包的发送
** (一个周期结束)
****

如果硬件在一个周期内处于饥饿状态
提高LIMIT

否则,如果硬件在一个周期内都没有进入饥饿状态,并且仍然有数据需要发送
使LIMIT减少"本周期内留下未发送的数据量"

如果驱动队列中排队的数据量小于LIMIT
  允许数据包入驱动队列

如你所见,BQL是以测试设备是否被饥饿为基础实现的。如果设备被饥饿,LIMIT值将会增加,允许更多的数据排队,以减少饥饿,如果设备整个周期内都处于忙碌状态并且队列中仍然有数据需要传输,表明队列容量大于当前系统所需,LIMIT值将会降低,以避免延迟的提升。

BQL对数据排队的影响效果如何?一个真实世界的案例也许可以给你一个感觉。我的一个服务器的驱动队列大小为256个描述符,MTU 1,500字节,意味着最多能有256 * 1,500 = 384,000字节同时排队(TSO,GSO之类的已被关闭,否则这个值将会更高)。然而,由BQL计算的限制值是3,012字节。如你所见,BQL大大地限制了排队数据量。

BQL的一个有趣方面可以从它名字的第一个词思议——byte(字节)。不像驱动队列和大多数的队列容量,BQL直接操作字节,这是因为字节数与数据包数量相比,能更有效地影响数据传输的延迟。

BQL通过限制排队的数据量为避免饥饿的最小需求值以降低网络延迟。对于移动大量在入口NIC的驱动队列处排队的数据包到queueing discipline(QDisc)层,BQL起到了非常重要的影响。QDisc层实现了更复杂的排队策略,下一节介绍Linux QDisc层。

排队规则(Queuing Disciplines (QDisc))

驱动队列是一个很简单的先进先出(FIFO)队列,它平等对待所有数据包,没有区分不同流量数据包的功能。这样的设计优点是保持了驱动程序的简单以及高效。要注意更多高级的以太网络适配器以及绝大多数的无线网络适配器支持多种独立的传输队列,但同样的都是典型的FIFO。较高层的负责选择需要使用的传输队列。

在IP stack和驱动队列之间的是排队规则(queueing discipline(QDisc))层(见图1)。这一层实现了内核的流量管理能力,如流量分类,优先级和速率调整。QDisc层通过一些不透明的tc命令进行配置。QDisc层有三个关键的概念需要理解:QDiscs,classes(类)和filters(过滤器)。

QDisc是Linux对流量队列的一个抽象化,比标准的FIFO队列要复杂得多。这个接口允许QDisc提供复杂的队列管理机制而无需修改IP stack或者NIC驱动。默认地,每一个网络接口都被分配了一个pfifo_fast QDisc,这是一个实现了简单的三频优先方案的队列,排序以数据包的TOS位为基础。尽管这是默认的,pfifo_fast QDisc离最佳选择还很远,因为它默认拥有一个很深的队列(见下文的txqueuelen)并且无法区分流量。

第二个与QDisc关系很密切的概念是类,独立的QDiscs为了以不同方式处理子集流量,可能实现类。例如,分层令牌桶(Hierarchical Token Bucket (HTB))QDisc允许用户配置一个500 Kbps和300 Kbps的类,然后根据需要,把流量归为特定类。需要注意,并非所有QDiscs拥有对多个类的支持——那些被称为类的QDiscs。

过滤器(也被称为分类器),是一个用于流量分类到特定QDisc或类的机制。各种不同的过滤器复杂度不一,u32是一个最通用的也可能是一个最易用的流量过滤器。流量过滤器的文档比较缺乏,不过你可以在此找到使用例子:我的一个QoS脚本。

更多关于QDiscs,classes和filters的信息,可阅LARTC HOWTO,以及tc的man pages。

传输层与排队规则间的缓冲区

在前面的图片中,你可能会发现排队规则层并没有数据包队列。这意思是,网络栈直接放置数据包到排队规则中或者当队列已满时直接放回到更上层(例如socket缓冲区)。这很明显的一个问题是,如果接下来有大量数据需要发送,会发送什么?这种情况会在TCP链接发生大量堵塞或者甚至有些应用程序以其最快的速度发送UDP数据包时出现。对于一个持有单个队列的QDisc,与图4中驱动队列同样的问题将会发生,亦即单个大带宽或者高数据包传输速率流会把整个队列的空间消耗完毕,从而导致丢包,极大影响其它流的延迟。更糟糕的是,这产生了另一个缓冲点,其中可以形成standing queue,使得延迟增加并导致了TCP的RTT和拥塞窗口大小计算问题。Linux默认的pfifo_fast QDisc,由于大多数数据包TOS标记为0,因此基本可以视作单个队列,因此这种现象并不罕见。

Linux 3.6.0(2012-09-30),加入了一个新的特性,称为TCP小型队列,目标是解决上述问题。TCP小型队列限制了每个TCP流每次可在QDisc与驱动队列中排队的字节数。这有一个有趣的影响:内核会更早调度回应用程序,从而允许应用程序以更高效的优先级写入套接字。目前(2012-12-28),其它单个传输流仍然有可能淹没QDisc层。

另一个解决传输层洪水问题的方案是使用具有多个队列的QDisc,理想情况下每个网络流一个队列。随机公平队列(Stochastic Fairness Queueing (SFQ))和延迟控制公平队列(Fair Queueing with Controlled Delay (fq_codel))都有为每个网络流分配一个队列的机制,因此很适合解决这个洪水问题。

如何控制Linux的队列容量

驱动队列

ethtool命令可用于控制以太网设备驱动队列容量。ethtool也提供了底层接口分析,可以启用或关闭IP stack和设备的一些特性。

-g参数可以输出驱动队列的信息:

1
2
3
4
5
6
7
8
9
10
11
12
[root@alpha net-next]# ethtool -g eth0
Ring parameters for eth0:
Pre-set maximums:
RX:        16384
RX Mini:    0
RX Jumbo:    0
TX:        16384
Current hardware settings:
RX:        512
RX Mini:    0
RX Jumbo:    0
TX:        256

你可以从以上的输出看到本NIC的驱动程序默认拥有一个容量为256描述符的传输队列。早期,在Bufferbloat的探索中,这个队列的容量经常建议减少以降低延迟。随着BQL的使用(假设你的驱动程序支持它),再也没有任何必要去修改驱动队列的容量了(如何配置BQL见下文)。

Ethtool也允许你管理优化特性,例如TSO,UFO和GSO。-k参数输出当前的offload设置,-K修改它们。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[dan@alpha ~]$ ethtool -k eth0
Offload parameters for eth0:
rx-checksumming: off
tx-checksumming: off
scatter-gather: off
tcp-segmentation-offload: off
udp-fragmentation-offload: off
generic-segmentation-offload: off
generic-receive-offload: on
large-receive-offload: off
rx-vlan-offload: off
tx-vlan-offload: off
ntuple-filters: off
receive-hashing: off

由于TSO,GSO,UFO和GRO极大的提高了驱动队列中可以排队的字节数,如果你想优化延迟而不是吞吐量,那么你应该关闭这些特性。如果禁用这些特性,除非系统正在处理非常高的数据速率,否则您将不会注意到任何CPU影响或吞吐量降低。

Byte Queue Limits (BQL)

BQL是一个自适应算法,因此一般来说你不需要为此操心。然而,如果你想牺牲数据速率以换得最优延迟,你就需要修改LIMIT的上限值。BQL的状态和设置可以在/sys中NIC的目录找到,在我的服务器上,eth0的BQL目录是:

1
/sys/devices/pci0000:00/0000:00:14.0/net/eth0/queues/tx-0/byte_queue_limits

在该目录下的文件有:

hold_time: 修改LIMIT值的时间间隔,单位为毫秒

inflight: 还没发送且在排队的数据量

limit: BQL计算的LIMIT值,如果NIC驱动不支持BQL,值为0

limit_max: LIMIT的最大值,降低此值可以优化延迟

limit_min: LIMIT的最小值,增高此值可以优化吞吐量

要修改LIMIT的上限值,把你需要的值写入limit_max文件即可,单位为字节:

1
echo "3000" > limit_max

什么是txqueuelen?

在早期的Bufferbload讨论中,经常会提到静态地减少NIC传输队列长度。当前队列长度值可以通过ip和ifconfig命令取得。令人疑惑的是,这两个命令给了传输队列的长度不同的名字:

1
2
3
4
5
6
7
8
9
10
[dan@alpha ~]$ ifconfig eth0
eth0      Link encap:Ethernet  HWaddr 00:18:F3:51:44:10
          inet addr:69.41.199.58  Bcast:69.41.199.63  Mask:255.255.255.248
          inet6 addr: fe80::218:f3ff:fe51:4410/64 Scope:Link
          UP BROADCAST RUNNING MULTICAST  MTU:1500  Metric:1
          RX packets:435033 errors:0 dropped:0 overruns:0 frame:0
          TX packets:429919 errors:0 dropped:0 overruns:0 carrier:0
          collisions:0 txqueuelen:1000
          RX bytes:65651219 (62.6 MiB)  TX bytes:132143593 (126.0 MiB)
          Interrupt:23
1
2
3
4
5
 [dan@alpha ~]$ ip link
1: lo:  mtu 16436 qdisc noqueue state UNKNOWN
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: eth0:  mtu 1500 qdisc pfifo_fast state UP qlen 1000
    link/ether 00:18:f3:51:44:10 brd ff:ff:ff:ff:ff:ff

Linux默认的传输队列长度为1,000个数据包,这是一个很大的缓冲区,尤其在低带宽的情况下。

有趣的问题是,这个变量实际上是控制什么?

我也不清楚,因此我花了点时间深入探索内核源码。我现在能说的,txqueuelen只是用来作为一些排队规则的默认队列长度。例如:

1
2
3
4
5
6
7
pfifo_fast(Linux默认排队规则)
sch_fifo
sch_gred
sch_htb(只有默认队列)
sch_plug
sch_sfb
sch_teql

见图1,txqueuelen参数在排队规则中控制以上列出的队列类型的长度。绝大多数这些排队规则,tc的limit参数默认会覆盖掉txqueuelen。总的来说,如果你不是使用上述的排队规则,或者如果你用limit参数指定了队列长度,那么txqueuelen值就没有任何作用。

顺便一提,我发现一个令人疑惑的地方,ifconfig命令显示了网络接口的底层信息,例如MAC地址,但是txqueuelen却是来自高层的QDisc层,很自然的地,看起来ifconfig会输出驱动队列长度。

传输队列的长度可以使用ip或ifconfig命令修改:

1
[root@alpha dan]# ip link set txqueuelen 500 dev eth0

需要注意,ip命令使用"txqueuelen"但是输出时使用"qlen" —— 另一个不幸的不一致性。

排队规则

正如前文所描述,Linux内核拥有大量的排队规则(QDiscs),每一个都实现了自己的数据包排队方法。讨论如何配置每一个QDiscs已经超出了本文的范围。关于配置这些队列的信息,可以查阅tc的man page(man tc)。你可以使用"man tc qdisc-name"(例如:"man tc htb"或"man tc fq_codel")找到每一个队列的细节。LARTC也是一个很有用的资源,但是缺乏了一些新特性的信息。

以下是一些可能对你使用tc命令有用的建议和技巧:

HTB QDisc实现了一个接收所有未分类数据包的默认队列。一些如DRR QDiscs会直接把未分类的数据包丢进黑洞。使用命令"tc qdisc show",通过direct_packets_stat可以检查有多少数据包未被合适分类。

HTB类分层只适用于分类,对于带宽分配无效。所有带宽分配通过检查Leaves和它们的优先级进行。

QDisc中,使用一个major和一个minor数字作为QDiscs和classes的基本标识,major和minor之间使用英文冒号分隔。tc命令使用十六进制代表这些数字。由于很多字符串,例如10,在十进制和十六进制都是正确的,因此很多用户不知道tc使用十六进制。见我的tc脚本,可以查看我是如何处理这个问题的。

如果你正在使用基于ATM的ADSL(绝大多数的DLS服务是基于ATM,新的变体例如VDSL2可能不是),你很可能需要考虑添加一个"linklayer adsl"的选项。这个统计把IP数据包分解成一组53字节的ATM单元所产生的开销。

如果你正在使用PPPoE,你很可能需要考虑通过"overhead"参数统计PPPoE开销。

TCP小型队列

每个TCP Socket的队列限制可以通过/proc中的文件查看或修改:

1
/proc/sys/net/ipv4/tcp_limit_output_bytes

Linux内核的自旋锁

http://www.wowotech.net/kernel_synchronization/460.html

自旋锁用于处理器之间的互斥,适合保护很短的临界区,并且不允许在临界区睡眠。申请自旋锁的时候,如果自旋锁被其他处理器占有,本处理器自旋等待(也称为忙等待)。

进程、软中断和硬中断都可以使用自旋锁。

自旋锁的实现经历了3个阶段:

(1) 最早的自旋锁是无序竞争的,不保证先申请的进程先获得锁。

(2) 第2个阶段是入场券自旋锁,进程按照申请锁的顺序排队,先申请的进程先获得锁。

(3) 第3个阶段是MCS自旋锁。入场券自旋锁存在性能问题:所有申请锁的处理器在同一个变量上自旋等待,缓存同步的开销大,不适合处理器很多的系统。MCS自旋锁的策略是为每个处理器创建一个变量副本,每个处理器在自己的本地变量上自旋等待,解决了性能问题。

入场券自旋锁和MCS自旋锁都属于排队自旋锁(queued spinlock),进程按照申请锁的顺序排队,先申请的进程先获得锁。

1. 数据结构

自旋锁的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
include/linux/spinlock_types.h
typedef struct spinlock {
	union {
		struct raw_spinlock rlock;
		...
	};
} spinlock_t;

typedef struct raw_spinlock {
	arch_spinlock_t raw_lock;
	...
} raw_spinlock_t;

可以看到,数据类型spinlock对raw_spinlock做了封装,然后数据类型raw_spinlock对arch_spinlock_t做了封装,各种处理器架构需要自定义数据类型arch_spinlock_t。

spinlock和raw_spinlock(原始自旋锁)有什么关系?

Linux内核有一个实时内核分支(开启配置宏CONFIG_PREEMPT_RT)来支持硬实时特性,内核主线只支持软实时。

对于没有打上实时内核补丁的内核,spinlock只是封装raw_spinlock,它们完全一样。如果打上实时内核补丁,那么spinlock使用实时互斥锁保护临界区,在临界区内可以被抢占和睡眠,但raw_spinlock还是自旋锁。

目前主线版本还没有合并实时内核补丁,说不定哪天就会合并进来,为了使代码可以兼容实时内核,最好坚持3个原则:

(1)尽可能使用spinlock。

(2)绝对不允许被抢占和睡眠的地方,使用raw_spinlock,否则使用spinlock。

(3)如果临界区足够小,使用raw_spinlock。

2. 使用方法

定义并且初始化静态自旋锁的方法是:

1
DEFINE_SPINLOCK(x);

在运行时动态初始化自旋锁的方法是:

1
spin_lock_init(x);

申请自旋锁的函数是:

1
2
3
4
5
(1)void spin_lock(spinlock_t *lock); 申请自旋锁,如果锁被其他处理器占有,当前处理器自旋等待。
(2)`void spin_lock_bh(spinlock_t *lock);` 申请自旋锁,并且禁止当前处理器的软中断。
(3)`void spin_lock_irq(spinlock_t *lock);` 申请自旋锁,并且禁止当前处理器的硬中断。
(4)`spin_lock_irqsave(lock, flags);` 申请自旋锁,保存当前处理器的硬中断状态,并且禁止当前处理器的硬中断。
(5)`int spin_trylock(spinlock_t *lock);` 申请自旋锁,如果申请成功,返回1;如果锁被其他处理器占有,当前处理器不等待,立即返回0。

释放自旋锁的函数是:

1
2
3
4
(1)void spin_unlock(spinlock_t *lock);
(2)void spin_unlock_bh(spinlock_t *lock); 释放自旋锁,并且开启当前处理器的软中断。
(3)void spin_unlock_irq(spinlock_t *lock); 释放自旋锁,并且开启当前处理器的硬中断。
(4)void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); 释放自旋锁,并且恢复当前处理器的硬中断状态。

定义并且初始化静态原始自旋锁的方法是:

1
DEFINE_RAW_SPINLOCK(x);

在运行时动态初始化原始自旋锁的方法是:

1
raw_spin_lock_init (x);

申请原始自旋锁的函数是:

1
2
3
4
5
(1)raw_spin_lock(lock) 申请原始自旋锁,如果锁被其他处理器占有,当前处理器自旋等待。
(2)raw_spin_lock_bh(lock) 申请原始自旋锁,并且禁止当前处理器的软中断。
(3)raw_spin_lock_irq(lock) 申请原始自旋锁,并且禁止当前处理器的硬中断。
(4)raw_spin_lock_irqsave(lock, flags) 申请原始自旋锁,保存当前处理器的硬中断状态,并且禁止当前处理器的硬中断。
(5)raw_spin_trylock(lock) 申请原始自旋锁,如果申请成功,返回1;如果锁被其他处理器占有,当前处理器不等待,立即返回0。

释放原始自旋锁的函数是:

1
2
3
4
(1)raw_spin_unlock(lock)
(2)raw_spin_unlock_bh(lock) 释放原始自旋锁,并且开启当前处理器的软中断。
(3)raw_spin_unlock_irq(lock) 释放原始自旋锁,并且开启当前处理器的硬中断。
(4)raw_spin_unlock_irqrestore(lock, flags) 释放原始自旋锁,并且恢复当前处理器的硬中断状态。

3. 入场券自旋锁

入场券自旋锁(ticket spinlock)的算法类似于银行柜台的排队叫号:

1
2
3
(1)锁拥有排队号和服务号,服务号是当前占有锁的进程的排队号。
(2)每个进程申请锁的时候,首先申请一个排队号,然后轮询锁的服务号是否等于自己的排队号,如果等于,表示自己占有锁,可以进入临界区,否则继续轮询。
(3)当进程释放锁时,把服务号加一,下一个进程看到服务号等于自己的排队号,退出自旋,进入临界区。

ARM64架构定义的数据类型arch_spinlock_t如下所示:

1
2
3
4
5
6
7
8
9
10
arch/arm64/include/asm/spinlock_types.h
typedef struct {
#ifdef __AARCH64EB__     /* 大端字节序(高位存放在低地址) */
	u16 next;
	u16 owner;
#else                    /* 小端字节序(低位存放在低地址) */
	u16 owner;
	u16 next;
#endif
} __aligned(4) arch_spinlock_t;

成员next是排队号,成员owner是服务号。

在多处理器系统中,函数spin_lock()负责申请自旋锁,ARM64架构的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
spin_lock() -> raw_spin_lock() -> _raw_spin_lock() -> __raw_spin_lock()  -> do_raw_spin_lock() -> arch_spin_lock()

arch/arm64/include/asm/spinlock.h
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
	unsigned int tmp;
	arch_spinlock_t lockval, newval;

	asm volatile (
	ARM64_LSE_ATOMIC_INSN (
		/* LL/SC */
		"   prfm    pstl1strm, %3\n"
		"1:   ldaxr   %w0, %3\n"
		"   add   %w1, %w0, %w5\n"
		"   stxr   %w2, %w1, %3\n"
		"   cbnz   %w2, 1b\n",
		/* 大系统扩展的原子指令 */
		"   mov   %w2, %w5\n"
		"   ldadda   %w2, %w0, %3\n"
		__nops(3)
	)

	/* 我们得到锁了吗?*/
	eor   %w1, %w0, %w0, ror #16\n"
	cbz   %w1, 3f\n"
	sevl\n"
	2:   wfe\n"
	ldaxrh   %w2, %4\n"
	eor   %w1, %w2, %w0, lsr #16\n"
	cbnz   %w1, 2b\n"
	/* 得到锁,临界区从这里开始*/
	3:"
	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
	: "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
	: "memory");

第6~18行代码,申请排队号,然后把自旋锁的排队号加1,这是一个原子操作,有两种实现方法:

1)第9~13行代码,使用指令ldaxr(带有获取语义的独占加载)和stxr(独占存储)实现,指令ldaxr带有获取语义,后面的加载/存储指令必须在指令ldaxr完成之后开始执行。

2)第15~16行代码,如果处理器支持大系统扩展,那么使用带有获取语义的原子加法指令ldadda实现,指令ldadda带有获取语义,后面的加载/存储指令必须在指令ldadda完成之后开始执行。

第21~22行代码,如果服务号等于当前进程的排队号,进入临界区。

第24~27行代码,如果服务号不等于当前进程的排队号,那么自旋等待。使用指令ldaxrh(带有获取语义的独占加载,h表示halfword,即2字节)读取服务号,指令ldaxrh带有获取语义,后面的加载/存储指令必须在指令ldaxrh完成之后开始执行。

第23行代码,sevl(send event local)指令的功能是发送一个本地事件,避免错过其他处理器释放自旋锁时发送的事件。

第24行代码,wfe(wait for event)指令的功能是使处理器进入低功耗状态,等待事件。

函数spin_unlock()负责释放自旋锁,ARM64架构的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
spin_unlock() -> raw_spin_unlock() -> _raw_spin_unlock() -> __raw_spin_unlock()  -> do_raw_spin_unlock() -> arch_spin_unlock()

arch/arm64/include/asm/spinlock.h
static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
	unsigned long tmp;

	asm volatile(ARM64_LSE_ATOMIC_INSN(
	/* LL/SC */
	"    ldrh   %w1, %0\n"
	"    add   %w1, %w1, #1\n"
	"    stlrh   %w1, %0",
	/* 大多统扩展的原子指令 */
	"    mov   %w1, #1\n"
	"    staddlh   %w1, %0\n"
	__nops(1))
	: "=Q" (lock->owner), "=&r" (tmp)
	:
	: "memory");
}

把自旋锁的服务号加1,有两种实现方法:

(1)第7~9行代码,使用指令ldrh(加载,h表示halfword,即2字节)和stlrh(带有释放语义的存储)实现,指令stlrh带有释放语义,前面的加载/存储指令必须在指令stlrh开始执行之前执行完。因为一次只能有一个进程进入临界区,所以只有一个进程把自旋锁的服务号加1,不需要是原子操作。

(2)第11~12行代码,如果处理器支持大系统扩展,那么使用带有释放语义的原子加法指令staddlh实现,指令staddlh带有释放语义,前面的加载/存储指令必须在指令staddlh开始执行之前执行完。

在单处理器系统中,自旋锁是空的。

1
2
3
4
5
6
7
8
9
10
include/linux/spinlock_types_up.h
typedef struct { } arch_spinlock_t;
函数spin_lock()只是禁止内核抢占。
spin_lock() -> raw_spin_lock() -> _raw_spin_lock()
include/linux/spinlock_api_up.h
#define _raw_spin_lock(lock)             __LOCK(lock)
#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)
#define ___LOCK(lock) \
  do { __acquire(lock); (void)(lock); } while (0)

4. MCS自旋锁

入场券自旋锁存在性能问题:所有等待同一个自旋锁的处理器在同一个变量上自旋等待,申请或者释放锁的时候会修改锁,导致其他处理器存放自旋锁的缓存行失效,在拥有几百甚至几千个处理器的大型系统中,处理器申请自旋锁时竞争可能很激烈,缓存同步的开销很大,导致系统性能大幅度下降。

MCS(MCS是“Mellor-Crummey”和“Scott”这两个发明人的名字的首字母缩写)自旋锁解决了这个缺点,它的策略是为每个处理器创建一个变量副本,每个处理器在申请自旋锁的时候在自己的本地变量上自旋等待,避免缓存同步的开销。

4.1. 传统的MCS自旋锁

传统的MCS自旋锁包含:

(1)一个指针tail指向队列的尾部。

(2)每个处理器对应一个队列节点,即mcs_lock_node结构体,其中成员next指向队列的下一个节点,成员locked指示锁是否被其他处理器占有,如果成员locked的值为1,表示锁被其他处理器占有。

结构体的定义如下所示:

1
2
3
4
5
6
7
8
9
typedef struct __mcs_lock_node {
	struct __mcs_lock_node *next;
	int locked;
} ____cacheline_aligned_in_smp mcs_lock_node;

typedef struct {
	mcs_lock_node *tail;
	mcs_lock_node nodes[NR_CPUS];/* NR_CPUS是处理器的数量 */
} spinlock_t;

其中____cacheline_aligned_in_smp 的作用是:在多处理器系统中,结构体的起始地址和长度都是一级缓存行长度的整数倍。

当没有处理器占有或者等待自旋锁的时候,队列是空的,tail是空指针。

图 4.1 处理器0申请MCS自旋锁

如图 4.1所示,当处理器0申请自旋锁的时候,执行原子交换操作,使tail指向处理器0的mcs_lock_node结构体,并且返回tail的旧值。tail的旧值是空指针,说明自旋锁处于空闲状态,那么处理器0获得自旋锁。

图 4.2 处理器1申请MCS自旋锁

如图 4.2所示,当处理器0占有自旋锁的时候,处理器1申请自旋锁,执行原子交换操作,使tail指向处理器1的mcs_lock_node结构体,并且返回tail的旧值。tail的旧值是处理器0的mcs_lock_node结构体的地址,说明自旋锁被其他处理器占有,那么使处理器0的mcs_lock_node结构体的成员next指向处理器1的mcs_lock_node结构体,把处理器1的mcs_lock_node结构体的成员locked设置为1,然后处理器1在自己的mcs_lock_node结构体的成员locked上面自旋等待,等待成员locked的值变成0。

图 4.3 处理器0释放MCS自旋锁

如图 4.3所示,处理器0释放自旋锁,发现自己的mcs_lock_node结构体的成员next不是空指针,说明有申请者正在等待锁,于是把下一个节点的成员locked设置为0,处理器1获得自旋锁。

处理器1释放自旋锁,发现自己的mcs_lock_node结构体的成员next是空指针,说明自己是最后一个申请者,于是执行原子比较交换操作:如果tail指向自己的mcs_lock_node结构体,那么把tail设置为空指针。

4.2. 小巧的MCS自旋锁

传统的MCS自旋锁存在的缺陷是:结构体的长度太大,因为mcs_lock_node结构体的起始地址和长度都必须是一级缓存行长度的整数倍,所以MCS自旋锁的长度是(一级缓存行长度 + 处理器数量 * 一级缓存行长度),而入场券自旋锁的长度只有4字节。自旋锁被嵌入到内核的很多结构体中,如果自旋锁的长度增加,会导致这些结构体的长度增加。

经过内核社区技术专家的努力,成功地把MCS自旋锁放进4个字节,实现了小巧的MCS自旋锁。自旋锁的定义如下所示:

1
2
3
4
5
include/asm-generic/qspinlock_types.h

typedef struct qspinlock {
	atomic_t  val;
} arch_spinlock_t;

另外,为每个处理器定义1个队列节点数组,如下所示:

1
2
3
4
5
6
7
kernel/locking/qspinlock.c
#ifdef CONFIG_PARAVIRT_SPINLOCKS
#define MAX_NODES  8
#else
#define MAX_NODES  4
#endif
static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);

配置宏CONFIG_PARAVIRT_SPINLOCKS用来启用半虚拟化的自旋锁,给虚拟机使用,本文不考虑这种使用场景。每个处理器需要4个队列节点,原因如下:

(1) 申请自旋锁的函数禁止内核抢占,所以进程在等待自旋锁的过程中不会被其他进程抢占。

(2) 进程在等待自旋锁的过程中可能被软中断抢占,然后软中断等待另一个自旋锁。

(3) 软中断在等待自旋锁的过程中可能被硬中断抢占,然后硬中断等待另一个自旋锁。

(4) 硬中断在等待自旋锁的过程中可能被不可屏蔽中断抢占,然后不可屏蔽中断等待另一个自旋锁。

综上所述,一个处理器最多同时等待4个自旋锁。

和入场券自旋锁相比,MCS自旋锁增加的内存开销是数组mcs_nodes。

队列节点的定义如下所示:

1
2
3
4
5
6
7
kernel/locking/mcs_spinlock.h

struct mcs_spinlock {
	struct mcs_spinlock *next;
	int locked;
	int count;
};

其中成员next指向队列的下一个节点;成员locked指示锁是否被前一个等待者占有,如果值为1,表示锁被前一个等待者占有;成员count是嵌套层数,也就是数组mcs_nodes已分配的数组项的数量。

自旋锁的32个二进制位被划分成4个字段:

(1) locked字段,指示锁已经被占有,长度是一个字节,占用第0~7位。

(2) 一个pending位,占用第8位,第1个等待自旋锁的处理器设置pending位。

(3) index字段,是数组索引,指示队列的尾部节点使用数组mcs_nodes的哪一项。

(4) cpu字段,存放队列的尾部节点的处理器编号,实际存储的值是处理器编号加上1,cpu字段减去1才是真实的处理器编号。

index字段和cpu字段合起来称为tail字段,存放队列的尾部节点的信息,布局分两种情况:

(1) 如果处理器的数量小于2的14次方,那么第9~15位没有使用,第16~17位是index字段,第18~31位是cpu字段。

(2) 如果处理器的数量大于或等于2的14次方,那么第9~10位是index字段,第11~31位是cpu字段。

把MCS自旋锁放进4个字节的关键是:存储处理器编号和数组索引,而不是存储尾部节点的地址。

内核对MCS自旋锁做了优化:第1个等待自旋锁的处理器直接在锁自身上面自旋等待,不是在自己的mcs_spinlock结构体上自旋等待。这个优化带来的好处是:当锁被释放的时候,不需要访问mcs_spinlock结构体的缓存行,相当于减少了一次缓存没命中。后续的处理器在自己的mcs_spinlock结构体上面自旋等待,直到它们移动到队列的首部为止。

自旋锁的pending位进一步扩展这个优化策略。第1个等待自旋锁的处理器简单地设置pending位,不需要使用自己的mcs_spinlock结构体。第2个处理器看到pending被设置,开始创建等待队列,在自己的mcs_spinlock结构体的locked字段上自旋等待。这种做法消除了两个等待者之间的缓存同步,而且第1个等待者没使用自己的mcs_spinlock结构体,减少了一次缓存行没命中。

在多处理器系统中,申请MCS自旋锁的代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spin_lock() -> raw_spin_lock() -> _raw_spin_lock() -> __raw_spin_lock()  -> do_raw_spin_lock() -> arch_spin_lock()

include/asm-generic/qspinlock.h

#define arch_spin_lock(l)         queued_spin_lock(l)

static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
	u32 val;

	val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL);
	if (likely(val == 0))
		return;
	ueued_spin_lock_slowpath(lock, val);

第7行代码,执行带有获取语义的原子比较交换操作,如果锁的值是0,那么把锁的locked字段设置为1。获取语义保证后面的加载/存储指令必须在函数atomic_cmpxchg_acquire()完成之后开始执行。函数atomic_cmpxchg_acquire()返回锁的旧值。

第8~9行代码,如果锁的旧值是0,说明申请锁的时候锁处于空闲状态,那么成功地获得锁。

第10行代码,如果锁的旧值不是0,说明锁不是处于空闲状态,那么执行申请自旋锁的慢速路径。

申请MCS自旋锁的慢速路径如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
kernel/locking/qspinlock.c

void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
	struct mcs_spinlock *prev, *next, *node;
	u32 new, old, tail;
	int idx;

	...
	if (val == _Q_PENDING_VAL) {
		while ((val = atomic_read(&lock->val)) == _Q_PENDING_VAL)
		cpu_relax();
	}

	for (;;) {
		if (val & ~_Q_LOCKED_MASK)
			goto queue;

		new = _Q_LOCKED_VAL;
		if (val == new)
			new |= _Q_PENDING_VAL;

		old = atomic_cmpxchg_acquire(&lock->val, val, new);
		if (old == val)
			break;

		val = old;
	}

	if (new == _Q_LOCKED_VAL)
		return;

	smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK));

	clear_pending_set_locked(lock);
	return;

queue:
	node = this_cpu_ptr(&mcs_nodes[0]);
	idx = node->count++;
	tail = encode_tail(smp_processor_id(), idx);

	node += idx;
	node->locked = 0;
	node->next = NULL;
	...

	if (queued_spin_trylock(lock))
		goto release;

	old = xchg_tail(lock, tail);
	next = NULL;

	if (old & _Q_TAIL_MASK) {
		prev = decode_tail(old);
		smp_read_barrier_depends();

		WRITE_ONCE(prev->next, node);

		...
		arch_mcs_spin_lock_contended(&node->locked);

		next = READ_ONCE(node->next);
		if (next)
			prefetchw(next);
	}

	...
	val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));

locked:
	for (;;) {
		if ((val & _Q_TAIL_MASK) != tail) {
			set_locked(lock);
			break;
		}

		old = atomic_cmpxchg_relaxed(&lock->val, val, _Q_LOCKED_VAL);
		if (old == val)
			goto release;

		val = old;
	}

	if (!next) {
		while (!(next = READ_ONCE(node->next)))
			cpu_relax();
	}

	arch_mcs_spin_unlock_contended(&next->locked);
	...

release:
	__this_cpu_dec(mcs_nodes[0].count);
}

第8~11行代码,如果锁的状态是pending,即{tail=0,pending=1,locked=0},那么等待锁的状态变成locked,即{tail=0,pending=0,locked=1}。

第14~15行代码,如果锁的tail字段不是0或者pending位是1,说明已经有处理器在等待自旋锁,那么跳转到标号queue,本处理器加入等待队列。

第17~21行代码,如果锁处于locked状态,那么把锁的状态设置为locked & pending,即{tail=0,pending=1,locked=1};如果锁处于空闲状态(占有锁的处理器刚刚释放自旋锁),那么把锁的状态设置为locked。

第28~29行代码,如果上一步锁的状态从空闲变成locked,那么成功地获得锁。

第31行代码,等待占有锁的处理器释放自旋锁,即锁的locked字段变成0。

第32行代码,成功地获得锁,把锁的状态从pending改成locked,即清除pending位,把locked字段设置为1。

从第2个等待自旋锁的处理器开始,需要加入等待队列,处理如下:

(1) 第37~43行代码,从本处理器的数组mcs_nodes分配一个数组项,然后初始化。

(2) 第46~47行代码,如果锁处于空闲状态,那么获得锁。

(3) 第49行代码,把自旋锁的tail字段设置为本处理器的队列节点的信息,并且返回前一个队列节点的信息。

(4) 第52行代码,如果本处理器的队列节点不是队列首部,那么处理如下:

1)第56行代码,把前一个队列节点的next字段设置为本处理器的队列节点的地址。

2)第59行代码,本处理器在自己的队列节点的locked字段上面自旋等待,等待locked字段从0变成1,也就是等待本处理器的队列节点移动到队列首部。

(5) 第67行代码,本处理器的队列节点移动到队列首部以后,在锁自身上面自旋等待,等待自旋锁的pending位和locked字段都变成0,也就是等待锁的状态变成空闲。

(6) 锁的状态变成空闲以后,本处理器把锁的状态设置为locked,分两种情况:

1)第71行代码,如果队列还有其他节点,即还有其他处理器在等待锁,那么处理如下:

q第72行代码,把锁的locked字段设置为1。

q第83~86行代码,等待下一个等待者设置本处理器的队列节点的next字段。

q第88行代码,把下一个队列节点的locked字段设置为1。

2)第76行代码,如果队列只有一个节点,即本处理器是唯一的等待者,那么把锁的tail字段设置为0,把locked字段设置为1。

(7) 第92行代码,释放本处理器的队列节点。

释放MCS自旋锁的代码如下所示:

1
2
3
4
5
6
7
8
9
spin_unlock() -> raw_spin_unlock() -> _raw_spin_unlock() -> __raw_spin_unlock()  -> do_raw_spin_unlock() -> arch_spin_unlock()

include/asm-generic/qspinlock.h
#define arch_spin_unlock(l)       queued_spin_unlock(l)

static __always_inline void queued_spin_unlock(struct qspinlock *lock)
{
	(void)atomic_sub_return_release(_Q_LOCKED_VAL, &lock->val);
}

第5行代码,执行带释放语义的原子减法操作,把锁的locked字段设置为0,释放语义保证前面的加载/存储指令在函数atomic_sub_return_release()开始执行之前执行完。

MCS自旋锁的配置宏是CONFIG_ARCH_USE_QUEUED_SPINLOCKS 和CONFIG_QUEUED_SPINLOCKS,目前只有x86处理器架构使用MCS自旋锁,默认开启MCS自旋锁的配置宏,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
arch/x86/kconfig

config X86
	def_bool y
	...
	select ARCH_USE_QUEUED_SPINLOCKS
	...


kernel/kconfig.locks

config ARCH_USE_QUEUED_SPINLOCKS
	bool
config QUEUED_SPINLOCKS
	def_bool y if ARCH_USE_QUEUED_SPINLOCKS
	depends on SMP