kk Blog —— 通用基础

date [-d @int|str] [+%s|"+%F %T"]

gro收包

linux kernel 网络协议栈之GRO(Generic receive offload)

gro会合并多个gso_size不同的包, 会将gso_size设置成第一个包的gso_size.

如果此时把这个包发出去,那么就会导致不满足: skb->gso_size * (skb->segs-1) < skb->len <= skb->gso_size * skb->segs

那么后面的三个函数就有可能出错

一、tcp_shift_skb_data

1
2
3
4
5
6
7
mss = skb->gso_size
len = len/mss * mss

|---|-------|-------|
 mss    |
        V
|---|---|

二、tcp_mark_head_lost

1
2
3
4
5
6
len = (packets - cnt) * mss

|--------|--|--|
   mss   |
         V
|--------|--------|

三、tcp_match_skb_to_sack

1
2
3
4
5
6
7
8
new_len = (pkt_len/mm)*mss
in_sack = 1
pkt_len = new_len

|---|-------|-------|
 mss    |
        V
|---|---|

修改

加入发包队列前

1
2
3
skb_shinfo(skb)->gso_size = 0;
skb_shinfo(skb)->gso_segs = 0;
skb_shinfo(skb)->gso_type = 0;

tcp_trim_head BUG

http://kernel.opensuse.org/cgit/kernel/commit/?id=5b35e1e6e9ca651e6b291c96d1106043c9af314a

author Neal Cardwell ncardwell@google.com 2012-01-28 17:29:46 (GMT)
committer David S. Miller davem@davemloft.net 2012-01-30 17:42:58 (GMT)
commit 5b35e1e6e9ca651e6b291c96d1106043c9af314a (patch)
tree d18caadee5e93dc45d0c5fa2c530537cfa14586c
parent 4acb41903b2f99f3dffd4c3df9acc84ca5942cb2 (diff)

tcp: fix tcp_trim_head() to adjust segment count with skb MSS

This commit fixes tcp_trim_head() to recalculate the number of segments in the skb with the skb’s existing MSS, so trimming the head causes the skb segment count to be monotonically non-increasing - it should stay the same or go down, but not increase.

Previously tcp_trim_head() used the current MSS of the connection. But if there was a decrease in MSS between original transmission and ACK (e.g. due to PMTUD), this could cause tcp_trim_head() to counter-intuitively increase the segment count when trimming bytes off the head of an skb. This violated assumptions in tcp_tso_acked() that tcp_trim_head() only decreases the packet count, so that packets_acked in tcp_tso_acked() could underflow, leading tcp_clean_rtx_queue() to pass u32 pkts_acked values as large as 0xffffffff to ca_ops->pkts_acked().

As an aside, if tcp_trim_head() had really wanted the skb to reflect the current MSS, it should have called tcp_set_skb_tso_segs() unconditionally, since a decrease in MSS would mean that a single-packet skb should now be sliced into multiple segments.

Signed-off-by: Neal Cardwell ncardwell@google.com
Acked-by: Nandita Dukkipati nanditad@google.com
Acked-by: Ilpo Järvinen ilpo.jarvinen@helsinki.fi
Signed-off-by: David S. Miller davem@davemloft.net

1 files changed, 2 insertions, 4 deletions

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
diff --git a/net/ipv4/tcp_output.c b/net/ipv4/tcp_output.c
index 8c8de27..4ff3b6d 100644
--- a/net/ipv4/tcp_output.c
+++ b/net/ipv4/tcp_output.c
@@ -1141,11 +1141,9 @@ int tcp_trim_head(struct sock *sk, struct sk_buff *skb, u32 len)
  sk_mem_uncharge(sk, len);
  sock_set_flag(sk, SOCK_QUEUE_SHRUNK);
- /* Any change of skb->len requires recalculation of tso
-  * factor and mss.
-  */
+ /* Any change of skb->len requires recalculation of tso factor. */
  if (tcp_skb_pcount(skb) > 1)
-     tcp_set_skb_tso_segs(sk, skb, tcp_current_mss(sk));
+     tcp_set_skb_tso_segs(sk, skb, tcp_skb_mss(skb));
  return 0;
 }

会出现tp->packets_out不正确, 导致sk_write_queue为空时却掉tcp_rearm_rto(),判断tp->packets_out不为0,启动重传定时器,然后重传时取出的是list_head的地址,不是skb的地址,导致后面异常。

如果sk_write_queue异常

  • 注意,以下情况内核都不可能产生,纯属假设

一、连续的SYN/FIN

1
2
|---FIN---|---SYN/FIN---|
    skb       next_skb
  • 内核不可能出现是因为:发送FIN包后就不再发包。所以FIN包只可能在sk_write_queue的最后一个包

假设skb和next_skb发出去后都丢了,那tcp_retransmit_skb会重传skb, 重传的时候会调用tcp_retrans_try_collapse尝试去和下一个包合并。

skb和next_skb合并过程:
先检查一些条件,然后

1
2
3
4
...
skb_copy_from_linear_data(next_skb, skb_put(skb, next_skb_size), next_skb_size);
...
TCP_SKB_CB(skb)->end_seq = TCP_SKB_CB(next_skb)->end_seq;

也就是skb->len += next_skb->len; skb->end_seq = next_skb->end_seq;

假设:

1
2
skb->len = 0;      skb->seq = 10;      skb->end_seq = 10 + FIN = 11;
next_skb->len = 0; next_skb->seq = 11; next_skb->end_seq = 11 + SYN/FIN = 12;

那么合并后:

1
skb->len = 0;      skb->seq = 10;      skb->end_seq = 12;

很明显不正常了,正常情况下:skb->len <= skb->end_seq - skb->seq <= skb->len+1

这时如果来了ack 11,那么会再重传合并后的skb,然后会调用tcp_trim_head(struct ws_st_sock sk, struct sk_buff skb, u32 len),参数len = tp->snd_una - TCP_SKB_CB(skb)->seq = 1,但skb->len = 0;

tcp_trim_head函数中会:

1
skb->len -= len;

这时skb->len = (U32)-1 = 0xFFFFFFFF,skb->len错误后,再调用skb_copy之类的就会访问越界,报BUG。

1
2
3
4
5
 821 struct sk_buff *skb_copy(const struct sk_buff *skb, gfp_t gfp_mask)
 822 {
		......
 835         if (skb_copy_bits(skb, -headerlen, n->head, headerlen + skb->len))
 836                 BUG();

二、write_queue的skb->end_seq > next_skb->seq可能的问题

  • 内核用tp->write_seq控制,保证了write_queue的skb->end_seq == next_skb->seq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
skb:       |------------------|
next_skb:  |---------------------|

假设skb已经发送出去,并被ack了,这时tp->snd_una = skb->end_seq
此时再发送next_skb,并且mss变小了,需要对next_skb分包,分包后如下:

skb:       |------------------|
next_skb:  |-------|-------:-----|
              skb1       skb2

next_skb 被分成了两个包,skb1->len = mss, skb1->gso_segs = 1; skb2->len > mss, skb2->gso_segs = 2;
skb1, skb2发送出去,丢了,然后重传skb1,
此时 skb1->end_seq < tp->snd_una

2092 int tcp_retransmit_skb(struct sock *sk, struct sk_buff *skb)
2093 {
		......
2111         if (before(TCP_SKB_CB(skb)->seq, tp->snd_una)) {
2112                 if (before(TCP_SKB_CB(skb)->end_seq, tp->snd_una))
2113                         BUG();

三、write_queue的skb->end_seq > next_skb->seq可能的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
skb:       |------------------|
next_skb:  |---------------------|

skb, next_skb 发送出去丢了,重传,调用tcp_retrans_try_collapse合并。
合并后:skb->len += next_skb->len; skb->end_seq = next_skb->end_seq;

假设   skb->len = 100;      skb->seq = 0;      skb->end_seq = 100;
      next_skb->len = 120  next_skb->seq = 0; next_skb->end_seq = 120;
合并后 skb->len = 200;      skb->seq = 0;      skb->end_seq = 120;

发送合并后的skb,再丢包,再重传,mss = 150,skb->len > mss, 会分包
      skb->len = 150;      skb->seq = 0;      skb->end_seq = 150;
      next_skb->len = 50;  next_skb->seq = 150; next_skb->end_seq = 120;
也就是出现了next_skb->seq > next_skb->end_seq
(此时如果ack skb也会把next_skb一起清了,因为next_skb->end_seq < skb->end_seq)

这时如果skb再重传分包,分成skb3,skb4
	skb3->len = 130;   skb3->seq = 0;   skb3->end_seq = 130;
	skb4->len = 20;    skb4->seq = 130; skb4->end_seq = 150;

这时ack了skb3,tp->snd_una = 130 (虽然next_skb->end_seq < skb3->end_seq, 但skb4->end_seq > skb3->end_seq, 所以不会把next_skb清掉)
重传skb4,skb5,此时skb5->end_seq < tp->snd_una

2092 int tcp_retransmit_skb(struct sock *sk, struct sk_buff *skb)
2093 {
		......
2111         if (before(TCP_SKB_CB(skb)->seq, tp->snd_una)) {
2112                 if (before(TCP_SKB_CB(skb)->end_seq, tp->snd_una))
2113                         BUG();

四、write_queue的skb->end_seq > next_skb->seq可能的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
skb:       |------------------|
next_skb:  |---------------------|

发送 skb,next_skb
接收到 sack:|---------------------|

调用tcp_sacktag_walk() ---> tcp_shift_skb_data() 将多个被sack的包合并成一个。
合并过程:
	skb->len += next_skb->len; skb->end_seq += next_skb->len;
那么就会合并出一超出原来end_seq的包:
           |----------------------------------------|
然后再ack:  |----------------------|
这时把合并出的包trim掉一部分,剩skb7:  |-----------------|

再发包skb_new:                     |-------|
这时tp->snd_nxt = skb_new->end_seq
再重传skb7, 并分包:                 |----------|------|
分包时skb7->end_seq > tp->snd_nxt, 所以不会调整tp->packets_out,
但ack到来时(tcp_clean_rtx_queue)tp->packets_out却会减去分包后的gso_segs。
导致tp->packets_out < 0, 但sk_write_queue却是空的。
tcp_rearm_rto()判断tp->packets_out不为0,启动重传定时器,然后重传时取出的是list_head的地址,不是skb的地址,导致后面异常。
代码:
 974 int tcp_fragment(struct sock *sk, struct sk_buff *skb, u32 len,
 975                  unsigned int mss_now)
 976 {
	......
1047         if (!before(tp->snd_nxt, TCP_SKB_CB(buff)->end_seq)) {
1048                 int diff = old_factor - tcp_skb_pcount(skb) -
1049                         tcp_skb_pcount(buff);
1050 
1051                 if (diff)
1052                         tcp_adjust_pcount(sk, skb, diff);
1053         }

五(发现好像没错)、write_queue的skb->end_seq > next_skb->seq可能的问题

  • 内核用tp->write_seq控制,保证了write_queue的skb->end_seq == next_skb->seq
1
2
3
4
5
6
7
8
9
10
11
12
skb:       |------------------|
next_skb:  |---------------------|

假设skb已经发送出去,这时tp->snd_nxt = skb->end_seq
发送next_skb时mss变小了,需要对next_skb分包,分包后如下:

skb:       |------------------|
next_skb:  |-------|-------:-----|
              skb1       skb2
next_skb 被分成了两个包,skb1->len = mss, skb1->gso_segs = 1; skb2->len > mss, skb2->gso_segs = 2;

然后将skb1, skb2发送出去, tp->packets_out += 3; 这时假设ack了skb,清掉skb1和skb2的一个mss,。。。没错。。。

中断子系统之(八):softirq

http://www.wowotech.net/linux_kenrel/soft-irq.html

一、前言

对于中断处理而言,linux将其分成了两个部分,一个叫做中断handler(top half),是全程关闭中断的,另外一部分是deferable task(bottom half),属于不那么紧急需要处理的事情。在执行bottom half的时候,是开中断的。有多种bottom half的机制,例如:softirq、tasklet、workqueue或是直接创建一个kernel thread来执行bottom half(这在旧的kernel驱动中常见,现在,一个理智的driver厂商是不会这么做的)。本文主要讨论softirq机制。由于tasklet是基于softirq的,因此本文也会提及tasklet,但主要是从需求层面考虑,不会涉及其具体的代码实现。

在普通的驱动中一般是不会用到softirq,但是由于驱动经常使用的tasklet是基于softirq的,因此,了解softirq机制有助于撰写更优雅的driver。softirq不能动态分配,都是静态定义的。内核已经定义了若干种softirq number,例如网络数据的收发、block设备的数据访问(数据量大,通信带宽高),timer的deferable task(时间方面要求高)。本文的第二章讨论了softirq和tasklet这两种机制有何不同,分别适用于什么样的场景。第三章描述了一些context的概念,这是要理解后续内容的基础。第四章是进入softirq的实现,对比hard irq来解析soft irq的注册、触发,调度的过程。

  • 注:本文中的linux kernel的版本是3.14

二、为何有softirq和tasklet

1、为何有top half和bottom half

中断处理模块是任何OS中最重要的一个模块,对系统的性能会有直接的影响。想像一下:如果在通过U盘进行大量数据拷贝的时候,你按下一个key,需要半秒的时间才显示出来,这个场景是否让你崩溃?因此,对于那些复杂的、需要大量数据处理的硬件中断,我们不能让handler中处理完一切再恢复现场(handler是全程关闭中断的),而是仅仅在handler中处理一部分,具体包括:
(1)有实时性要求的
(2)和硬件相关的。例如ack中断,read HW FIFO to ram等
(3)如果是共享中断,那么获取硬件中断状态以便判断是否是本中断发生

除此之外,其他的内容都是放到bottom half中处理。在把中断处理过程划分成top half和bottom half之后,关中断的top half被瘦身,可以非常快速的执行完毕,大大减少了系统关中断的时间,提高了系统的性能。

我们可以基于下面的系统进一步的进行讨论:

当网卡控制器的FIFO收到的来自以太网的数据的时候(例如半满的时候,可以软件设定),可以将该事件通过irq signal送达Interrupt Controller。Interrupt Controller可以把中断分发给系统中的Processor A or B。

NIC的中断处理过程大概包括:

1
mask and ack interrupt controller--->ack NIC--->copy FIFO to ram--->handle Data in the ram--->unmask interrupt controller

我们先假设Processor A处理了这个网卡中断事件,于是NIC的中断handler在Processor A上欢快的执行,这时候,Processor A的本地中断是disable的。NIC的中断handler在执行的过程中,网络数据仍然源源不断的到来,但是,如果NIC的中断handler不操作NIC的寄存器来ack这个中断的话,NIC是不会触发下一次中断的。还好,我们的NIC interrupt handler总是在最开始就会ack,因此,这不会导致性能问题。ack之后,NIC已经具体再次trigger中断的能力。当Processor A上的handler 在处理接收来自网络的数据的时候,NIC的FIFO很可能又收到新的数据,并trigger了中断,这时候,Interrupt controller还没有umask,因此,即便还有Processor B(也就是说有处理器资源),中断控制器也无法把这个中断送达处理器系统。因此,只能眼睁睁的看着NIC FIFO填满数据,数据溢出,或者向对端发出拥塞信号,无论如何,整体的系统性能是受到严重的影响。

注意:对于新的interrupt controller,可能没有mask和umask操作,但是原理是一样的,只不过NIC的handler执行完毕要发生EOI而已。

要解决上面的问题,最重要的是尽快的执行完中断handler,打开中断,unmask IRQ(或者发送EOI),方法就是把耗时的handle Data in the ram这个步骤踢出handler,让其在bottom half中执行。

2、为何有softirq和tasklet

OK,linux kernel已经把中断处理分成了top half和bottom half,看起来已经不错了,那为何还要提供softirq、tasklet和workqueue这些bottom half机制,linux kernel本来就够复杂了,bottom half还来添乱。实际上,在早期的linux kernel还真是只有一个bottom half机制,简称BH,简单好用,但是性能不佳。后来,linux kernel的开发者开发了task queue机制,试图来替代BH,当然,最后task queue也消失在内核代码中了。现在的linux kernel提供了三种bottom half的机制,来应对不同的需求。

workqueue和softirq、tasklet有本质的区别:workqueue运行在process context,而softirq和tasklet运行在interrupt context。因此,出现workqueue是不奇怪的,在有sleep需求的场景中,defering task必须延迟到kernel thread中执行,也就是说必须使用workqueue机制。softirq和tasklet是怎么回事呢?从本质上将,bottom half机制的设计有两方面的需求,一个是性能,一个是易用性。设计一个通用的bottom half机制来满足这两个需求非常的困难,因此,内核提供了softirq和tasklet两种机制。softirq更倾向于性能,而tasklet更倾向于易用性。

我们还是进入实际的例子吧,还是使用上一节的系统图。在引入softirq之后,网络数据的处理如下:

关中断:

1
mask and ack interrupt controller--->ack NIC--->copy FIFO to ram--->raise softirq--->unmask interrupt controller

开中断:在softirq上下文中进行handle Data in the ram的动作

同样的,我们先假设Processor A处理了这个网卡中断事件,很快的完成了基本的HW操作后,raise softirq。在返回中断现场前,会检查softirq的触发情况,因此,后续网络数据处理的softirq在processor A上执行。在执行过程中,NIC硬件再次触发中断,Interrupt controller将该中断分发给processor B,执行动作和Processor A是类似的,因此,最后,网络数据处理的softirq在processor B上执行。

为了性能,同一类型的softirq有可能在不同的CPU上并发执行,这给使用者带来了极大的痛苦,因为驱动工程师在撰写softirq的回调函数的时候要考虑重入,考虑并发,要引入同步机制。但是,为了性能,我们必须如此。

当网络数据处理的softirq同时在Processor A和B上运行的时候,网卡中断又来了(可能是10G的网卡吧)。这时候,中断分发给processor A,这时候,processor A上的handler仍然会raise softirq,但是并不会调度该softirq。也就是说,softirq在一个CPU上是串行执行的。这种情况下,系统性能瓶颈是CPU资源,需要增加更多的CPU来解决该问题。

如果是tasklet的情况会如何呢?为何tasklet性能不如softirq呢?如果一个tasklet在processor A上被调度执行,那么它永远也不会同时在processor B上执行,也就是说,tasklet是串行执行的(注:不同的tasklet还是会并发的),不需要考虑重入的问题。我们还是用网卡这个例子吧(注意:这个例子仅仅是用来对比,实际上,网络数据是使用softirq机制的),同样是上面的系统结构图。假设使用tasklet,网络数据的处理如下:

关中断:

1
mask and ack interrupt controller--->ack NIC--->copy FIFO to ram--->schedule tasklet--->unmask interrupt controller

开中断:在softirq上下文中(一般使用TASKLET_SOFTIRQ这个softirq)进行handle Data in the ram的动作

同样的,我们先假设Processor A处理了这个网卡中断事件,很快的完成了基本的HW操作后,schedule tasklet(同时也就raise TASKLET_SOFTIRQ softirq)。在返回中断现场前,会检查softirq的触发情况,因此,在TASKLET_SOFTIRQ softirq的handler中,获取tasklet相关信息并在processor A上执行该tasklet的handler。在执行过程中,NIC硬件再次触发中断,Interrupt controller将该中断分发给processor B,执行动作和Processor A是类似的,虽然TASKLET_SOFTIRQ softirq在processor B上可以执行,但是,在检查tasklet的状态的时候,如果发现该tasklet在其他processor上已经正在运行,那么该tasklet不会被处理,一直等到在processor A上的tasklet处理完,在processor B上的这个tasklet才能被执行。这样的串行化操作虽然对驱动工程师是一个福利,但是对性能而言是极大的损伤。

三、理解softirq需要的基础知识(各种context)

1、preempt_count

为了更好的理解下面的内容,我们需要先看看一些基础知识:一个task的thread info数据结构定义如下(只保留和本场景相关的内容):

1
2
3
4
5
struct thread_info { 
	......
	int preempt_count;    /* 0 => preemptable, <0 => bug */
	......
};

preempt_count这个成员被用来判断当前进程是否可以被抢占。如果preempt_count不等于0(可能是代码调用preempt_disable显式的禁止了抢占,也可能是处于中断上下文等),说明当前不能进行抢占,如果preempt_count等于0,说明已经具备了抢占的条件(当然具体是否要抢占当前进程还是要看看thread info中的flag成员是否设定了_TIF_NEED_RESCHED这个标记,可能是当前的进程的时间片用完了,也可能是由于中断唤醒了优先级更高的进程)。 具体preempt_count的数据格式可以参考下图:

preemption count用来记录当前被显式的禁止抢占的次数,也就是说,每调用一次preempt_disable,preemption count就会加一,调用preempt_enable,该区域的数值会减去一。preempt_disable和preempt_enable必须成对出现,可以嵌套,最大嵌套的深度是255。

hardirq count描述当前中断handler嵌套的深度。对于ARM平台的linux kernel,其中断部分的代码如下:

1
2
3
4
5
6
7
8
9
10
void handle_IRQ(unsigned int irq, struct pt_regs *regs)
{
	struct pt_regs *old_regs = set_irq_regs(regs);

	irq_enter(); 
	generic_handle_irq(irq);

	irq_exit();
	set_irq_regs(old_regs);
}

通用的IRQ handler被irq_enter和irq_exit这两个函数包围。irq_enter说明进入到IRQ context,而irq_exit则说明退出IRQ context。在irq_enter函数中会调用preempt_count_add(HARDIRQ_OFFSET),为hardirq count的bit field增加1。在irq_exit函数中,会调用preempt_count_sub(HARDIRQ_OFFSET),为hardirq count的bit field减去1。hardirq count占用了4个bit,说明硬件中断handler最大可以嵌套15层。在旧的内核中,hardirq count占用了12个bit,支持4096个嵌套。当然,在旧的kernel中还区分fast interrupt handler和slow interrupt handler,中断handler最大可以嵌套的次数理论上等于系统IRQ的个数。在实际中,这个数目不可能那么大(内核栈就受不了),因此,即使系统支持了非常大的中断个数,也不可能各个中断依次嵌套,达到理论的上限。基于这样的考虑,后来内核减少了hardirq count占用bit数目,改成了10个bit(在general arch的代码中修改为10,实际上,各个arch可以redefine自己的hardirq count的bit数)。但是,当内核大佬们决定废弃slow interrupt handler的时候,实际上,中断的嵌套已经不会发生了。因此,理论上,hardirq count要么是0,要么是1。不过呢,不能总拿理论说事,实际上,万一有写奇葩或者老古董driver在handler中打开中断,那么这时候中断嵌套还是会发生的,但是,应该不会太多(一个系统中怎么可能有那么多奇葩呢?呵呵),因此,目前hardirq count占用了4个bit,应付15个奇葩driver是妥妥的。

对softirq count进行操作有两个场景:

(1)也是在进入soft irq handler之前给 softirq count加一,退出soft irq handler之后给 softirq count减去一。由于soft irq handler在一个CPU上是不会并发的,总是串行执行,因此,这个场景下只需要一个bit就够了,也就是上图中的bit 8。通过该bit可以知道当前task是否在sofirq context。

(2)由于内核同步的需求,进程上下文需要禁止softirq。这时候,kernel提供了local_bf_enable和local_bf_disable这样的接口函数。这部分的概念是和preempt disable/enable类似的,占用了bit9~15,最大可以支持127次嵌套。

2、一个task的各种上下文

看完了preempt_count之后,我们来介绍各种context:

1
2
3
4
5
#define in_irq()        (hardirq_count())
#define in_softirq()        (softirq_count())
#define in_interrupt()        (irq_count())

#define in_serving_softirq()    (softirq_count() & SOFTIRQ_OFFSET)

这里首先要介绍的是一个叫做IRQ context的术语。这里的IRQ context其实就是hard irq context,也就是说明当前正在执行中断handler(top half),只要preempt_count中的hardirq count大于0(=1是没有中断嵌套,如果大于1,说明有中断嵌套),那么就是IRQ context。

softirq context并没有那么的直接,一般人会认为当sofirq handler正在执行的时候就是softirq context。这样说当然没有错,sofirq handler正在执行的时候,会增加softirq count,当然是softirq context。不过,在其他context的情况下,例如进程上下文中,有有可能因为同步的要求而调用local_bh_disable,这时候,通过local_bh_disable/enable保护起来的代码也是执行在softirq context中。当然,这时候其实并没有正在执行softirq handler。如果你确实想知道当前是否正在执行softirq handler,in_serving_softirq可以完成这个使命,这是通过操作preempt_count的bit 8来完成的。

所谓中断上下文,就是IRQ context + softirq context+NMI context。

四、softirq机制

softirq和hardirq(就是硬件中断啦)是对应的,因此softirq的机制可以参考hardirq对应理解,当然softirq是纯软件的,不需要硬件参与。

1、softirq number

和IRQ number一样,对于软中断,linux kernel也是用一个softirq number唯一标识一个softirq,具体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
enum
{
	HI_SOFTIRQ=0,
	TIMER_SOFTIRQ,
	NET_TX_SOFTIRQ,
	NET_RX_SOFTIRQ,
	BLOCK_SOFTIRQ,
	BLOCK_IOPOLL_SOFTIRQ,
	TASKLET_SOFTIRQ,
	SCHED_SOFTIRQ,
	HRTIMER_SOFTIRQ,
	RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */

	NR_SOFTIRQS
};

HI_SOFTIRQ用于高优先级的tasklet,TASKLET_SOFTIRQ用于普通的tasklet。TIMER_SOFTIRQ是for software timer的(所谓software timer就是说该timer是基于系统tick的)。NET_TX_SOFTIRQ和NET_RX_SOFTIRQ是用于网卡数据收发的。BLOCK_SOFTIRQ和BLOCK_IOPOLL_SOFTIRQ是用于block device的。SCHED_SOFTIRQ用于多CPU之间的负载均衡的。HRTIMER_SOFTIRQ用于高精度timer的。RCU_SOFTIRQ是处理RCU的。这些具体使用情景分析会在各自的子系统中分析,本文只是描述softirq的工作原理。

2、softirq描述符

我们前面已经说了,softirq是静态定义的,也就是说系统中有一个定义softirq描述符的数组,而softirq number就是这个数组的index。这个概念和早期的静态分配的中断描述符概念是类似的。具体定义如下:

1
2
3
4
5
6
struct softirq_action
{
	void    (*action)(struct softirq_action *);
};

static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;

系统支持多少个软中断,静态定义的数组就会有多少个entry。____cacheline_aligned保证了在SMP的情况下,softirq_vec是对齐到cache line的。softirq描述符非常简单,只有一个action成员,表示如果触发了该softirq,那么应该调用action回调函数来处理这个soft irq。对于硬件中断而言,其mask、ack等都是和硬件寄存器相关并封装在irq chip函数中,对于softirq,没有硬件寄存器,只有“软件寄存器”,定义如下:

1
2
3
4
5
6
7
8
typedef struct {
	unsigned int __softirq_pending;
#ifdef CONFIG_SMP
	unsigned int ipi_irqs[NR_IPI];
#endif
} ____cacheline_aligned irq_cpustat_t;

irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

ipi_irqs这个成员用于处理器之间的中断,我们留到下一个专题来描述。__softirq_pending就是这个“软件寄存器”。softirq采用谁触发,谁负责处理的。例如:当一个驱动的硬件中断被分发给了指定的CPU,并且在该中断handler中触发了一个softirq,那么该CPU负责调用该softirq number对应的action callback来处理该软中断。因此,这个“软件寄存器”应该是每个CPU拥有一个(专业术语叫做banked register)。为了性能,irq_stat中的每一个entry被定义对齐到cache line。

3、如何注册一个softirq

通过调用open_softirq接口函数可以注册softirq的action callback函数,具体如下:

1
2
3
4
void open_softirq(int nr, void (*action)(struct softirq_action *))
{
	softirq_vec[nr].action = action;
}

softirq_vec是一个多CPU之间共享的数据,不过,由于所有的注册都是在系统初始化的时候完成的,那时候,系统是串行执行的。此外,softirq是静态定义的,每个entry(或者说每个softirq number)都是固定分配的,因此,不需要保护。

4、如何触发softirq?

在linux kernel中,可以调用raise_softirq这个接口函数来触发本地CPU上的softirq,具体如下:

1
2
3
4
5
6
7
8
void raise_softirq(unsigned int nr)
{
	unsigned long flags;

	local_irq_save(flags);
	raise_softirq_irqoff(nr);
	local_irq_restore(flags);
}

虽然大部分的使用场景都是在中断handler中(也就是说关闭本地CPU中断)来执行softirq的触发动作,但是,这不是全部,在其他的上下文中也可以调用raise_softirq。因此,触发softirq的接口函数有两个版本,一个是raise_softirq,有关中断的保护,另外一个是raise_softirq_irqoff,调用者已经关闭了中断,不需要关中断来保护“soft irq status register”。

所谓trigger softirq,就是在__softirq_pending(也就是上面说的soft irq status register)的某个bit置一。从上面的定义可知,__softirq_pending是per cpu的,因此不需要考虑多个CPU的并发,只要disable本地中断,就可以确保对,__softirq_pending操作的原子性。

具体raise_softirq_irqoff的代码如下:

1
2
3
4
5
6
7
inline void raise_softirq_irqoff(unsigned int nr)
{
	__raise_softirq_irqoff(nr); ---------- (1)

	if (!in_interrupt())
		wakeup_softirqd();      ---------- (2)
}

(1)__raise_softirq_irqoff函数设定本CPU上的__softirq_pending的某个bit等于1,具体的bit是由soft irq number(nr参数)指定的。

(2)如果在中断上下文,我们只要set __softirq_pending的某个bit就OK了,在中断返回的时候自然会进行软中断的处理。但是,如果在context上下文调用这个函数的时候,我们必须要调用wakeup_softirqd函数用来唤醒本CPU上的softirqd这个内核线程。具体softirqd的内容请参考下一个章节。

5、disable/enable softirq

在linux kernel中,可以使用local_irq_disable和local_irq_enable来disable和enable本CPU中断。和硬件中断一样,软中断也可以disable,接口函数是local_bh_disable和local_bh_enable。虽然和想像的local_softirq_enable/disable有些出入,不过bh这个名字更准确反应了该接口函数的意涵,因为local_bh_disable/enable函数就是用来disable/enable bottom half的,这里就包括softirq和tasklet。

先看disable吧,毕竟禁止bottom half比较简单:

1
2
3
4
5
6
7
8
9
10
static inline void local_bh_disable(void)
{
	__local_bh_disable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}

static __always_inline void __local_bh_disable_ip(unsigned long ip, unsigned int cnt)
{
	preempt_count_add(cnt);
	barrier();
}

看起来disable bottom half比较简单,就是讲current thread info上的preempt_count成员中的softirq count的bit field9~15加上一就OK了。barrier是优化屏障(Optimization barrier),会在内核同步系列文章中描述。

enable函数比较复杂,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static inline void local_bh_enable(void)
{
	__local_bh_enable_ip(_THIS_IP_, SOFTIRQ_DISABLE_OFFSET);
}

void __local_bh_enable_ip(unsigned long ip, unsigned int cnt)
{
	WARN_ON_ONCE(in_irq() || irqs_disabled()); --------- (1)

	preempt_count_sub(cnt - 1);                --------- (2)

	if (unlikely(!in_interrupt() && local_softirq_pending())) {  ------- (3)
		do_softirq();
	}

	preempt_count_dec();                       --------- (4)
	preempt_check_resched();
}

(1)disable/enable bottom half是一种内核同步机制。在硬件中断的handler(top half)中,不应该调用disable/enable bottom half函数来保护共享数据,因为bottom half其实是不可能抢占top half的。同样的,soft irq也不会抢占另外一个soft irq的执行,也就是说,一旦一个softirq handler被调度执行(无论在哪一个processor上),那么,本地的softirq handler都无法抢占其运行,要等到当前的softirq handler运行完毕后,才能执行下一个soft irq handler。注意:上面我们说的是本地,是local,softirq handler是可以在多个CPU上同时运行的,但是,linux kernel中没有disable all softirq的接口函数(就好像没有disable all CPU interrupt的接口一样,注意体会local_bh_enable/disable中的local的含义)。

说了这么多,一言以蔽之,local_bh_enable/disable是给进程上下文使用的,用于防止softirq handler抢占local_bh_enable/disable之间的临界区的。

irqs_disabled接口函数可以获知当前本地CPU中断是否是disable的,如果返回1,那么当前是disable 本地CPU的中断的。如果irqs_disabled返回1,有可能是下面这样的代码造成的:

1
2
3
4
5
6
7
8
9
local_irq_disable();
......
local_bh_disable();

......

local_bh_enable();
......
local_irq_enable();

本质上,关本地中断是一种比关本地bottom half更强劲的锁,关本地中断实际上是禁止了top half和bottom half抢占当前进程上下文的运行。也许你会说:这也没有什么,就是有些浪费,至少代码逻辑没有问题。但事情没有这么简单,在local_bh_enable--->do_softirq--->__do_softirq中,有一条无条件打开当前中断的操作,也就是说,原本想通过local_irq_disable/local_irq_enable保护的临界区被破坏了,其他的中断handler可以插入执行,从而无法保证local_irq_disable/local_irq_enable保护的临界区的原子性,从而破坏了代码逻辑。

in_irq()这个函数如果不等于0的话,说明local_bh_enable被irq_enter和irq_exit包围,也就是说在中断handler中调用了local_bh_enable/disable。这道理是和上面类似的,这里就不再详细描述了。

(2)在local_bh_disable中我们为preempt_count增加了SOFTIRQ_DISABLE_OFFSET,在local_bh_enable函数中应该减掉同样的数值。这一步,我们首先减去了(SOFTIRQ_DISABLE_OFFSET-1),为何不一次性的减去SOFTIRQ_DISABLE_OFFSET呢?考虑下面运行在进程上下文的代码场景:

1
2
3
4
5
6
7
8
......

local_bh_disable

...需要被保护的临界区...

local_bh_enable
......

在临界区内,有进程context 和softirq共享的数据,因此,在进程上下文中使用local_bh_enable/disable进行保护。假设在临界区代码执行的时候,发生了中断,由于代码并没有阻止top half的抢占,因此中断handler会抢占当前正在执行的thread。在中断handler中,我们raise了softirq,在返回中断现场的时候,由于disable了bottom half,因此虽然触发了softirq,但是不会调度执行。因此,代码返回临界区继续执行,直到local_bh_enable。一旦enable了bottom half,那么之前raise的softirq就需要调度执行了,因此,这也是为什么在local_bh_enable会调用do_softirq函数。

调用do_softirq函数来处理pending的softirq的时候,当前的task是不能被抢占的,因为一旦被抢占,下一次该task被调度运行的时候很可能在其他的CPU上去了(还记得吗?softirq的pending 寄存器是per cpu的)。因此,我们不能一次性的全部减掉,那样的话有可能preempt_count等于0,那样就允许抢占了。因此,这里减去了(SOFTIRQ_DISABLE_OFFSET-1),既保证了softirq count的bit field9~15被减去了1,又保持了preempt disable的状态。

(3)如果当前不是interrupt context的话,并且有pending的softirq,那么调用do_softirq函数来处理软中断。

(4)该来的总会来,在step 2中我们少减了1,这里补上,其实也就是preempt count-1。

(5)在softirq handler中很可能wakeup了高优先级的任务,这里最好要检查一下,看看是否需要进行调度,确保高优先级的任务得以调度执行。

5、如何处理一个被触发的soft irq

我们说softirq是一种defering task的机制,也就是说top half没有做的事情,需要延迟到bottom half中来执行。那么具体延迟到什么时候呢?这是本节需要讲述的内容,也就是说soft irq是如何调度执行的。

在上一节已经描述一个softirq被调度执行的场景,本节主要关注在中断返回现场时候调度softirq的场景。我们来看中断退出的代码,具体如下:

1
2
3
4
5
6
7
8
void irq_exit(void)
{
	......
	if (!in_interrupt() && local_softirq_pending())
		invoke_softirq();

	......
}

代码中“!in_interrupt()”这个条件可以确保下面的场景不会触发sotfirq的调度:

(1)中断handler是嵌套的。也就是说本次irq_exit是退出到上一个中断handler。当然,在新的内核中,这种情况一般不会发生,因为中断handler都是关中断执行的。

(2)本次中断是中断了softirq handler的执行。也就是说本次irq_exit是不是退出到进程上下文,而是退出到上一个softirq context。这一点也保证了在一个CPU上的softirq是串行执行的(注意:多个CPU上还是有可能并发的)

我们继续看invoke_softirq的代码:

1
2
3
4
5
6
7
8
9
10
11
12
static inline void invoke_softirq(void)
{
	if (!force_irqthreads) {
#ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
		__do_softirq();
#else
		do_softirq_own_stack();
#endif
	} else {
		wakeup_softirqd();
	}
}

force_irqthreads是和强制线程化相关的,主要用于interrupt handler的调试(一般而言,在线程环境下比在中断上下文中更容易收集调试数据)。如果系统选择了对所有的interrupt handler进行线程化处理,那么softirq也没有理由在中断上下文中处理(中断handler都在线程中执行了,softirq怎么可能在中断上下文中执行)。本身invoke_softirq这个函数是在中断上下文中被调用的,如果强制线程化,那么系统中所有的软中断都在sofirq的daemon进程中被调度执行。

如果没有强制线程化,softirq的处理也分成两种情况,主要是和softirq执行的时候使用的stack相关。如果arch支持单独的IRQ STACK,这时候,由于要退出中断,因此irq stack已经接近全空了(不考虑中断栈嵌套的情况,因此新内核下,中断不会嵌套),因此直接调用__do_softirq()处理软中断就OK了,否则就调用do_softirq_own_stack函数在softirq自己的stack上执行。当然对ARM而言,softirq的处理就是在当前的内核栈上执行的,因此do_softirq_own_stack的调用就是调用__do_softirq(),代码如下(删除了部分无关代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
asmlinkage void __do_softirq(void)
{
......
	pending = local_softirq_pending();  ----------- 获取softirq pending的状态
	__local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET); ---- 标识下面的代码是正在处理softirq
	cpu = smp_processor_id();
restart:
	set_softirq_pending(0);  ------------- 清除pending标志
	local_irq_enable();      ------------- 打开中断,softirq handler是开中断执行的
	h = softirq_vec;         ------------- 获取软中断描述符指针

	while ((softirq_bit = ffs(pending))) { --------- 寻找pending中第一个被设定为1的bit
		unsigned int vec_nr;
		int prev_count;

		h += softirq_bit - 1; ----------- 指向pending的那个软中断描述符
		vec_nr = h - softirq_vec; ------- 获取soft irq number
		h->action(h);         ----------- 指向softirq handler
		h++;
		pending >>= softirq_bit;
	}

	local_irq_disable();      ----------- 打开中断

	pending = local_softirq_pending(); ------ (注1)
	if (pending) {
		if (time_before(jiffies, end) && !need_resched() &&
			--max_restart)
			goto restart;

		wakeup_softirqd();
	}
	__local_bh_enable(SOFTIRQ_OFFSET); ----------- 标识softirq处理完毕
}

(注1)再次检查softirq pending,有可能上面的softirq handler在执行过程中,发生了中断,又raise了softirq。如果的确如此,那么我们需要跳转到restart那里重新处理soft irq。当然,也不能总是在这里不断的loop,因此linux kernel设定了下面的条件:

(1)softirq的处理时间没有超过2个ms

(2)上次的softirq中没有设定TIF_NEED_RESCHED,也就是说没有有高优先级任务需要调度

(3)loop的次数小于 10次

因此,只有同时满足上面三个条件,程序才会跳转到restart那里重新处理soft irq。否则wakeup_softirqd就OK了。这样的设计也是一个平衡的方案。一方面照顾了调度延迟:本来,发生一个中断,系统期望在限定的时间内调度某个进程来处理这个中断,如果softirq handler不断触发,其实linux kernel是无法保证调度延迟时间的。另外一方面,也照顾了硬件的thoughput:已经预留了一定的时间来处理softirq。

内核源码分析之linux内核栈

http://www.cnblogs.com/liangning/p/3879177.html

基于3.16-rc4

在3.16-rc4内核源码中,内核给每个进程分配的内核栈大小为8KB。这个内核栈被称为异常栈,在进程的内核空间运行时或者执行异常处理程序时,使用的都是异常栈,看下异常栈的代码(include/linux/sched.h):

1
2
3
4
union thread_union {
	struct thread_info thread_info;
	unsigned long stack[THREAD_SIZE/sizeof(long)];
};

THREAD_SIZE值为8KB,因此内核为进程的异常栈(内核栈)分配了两个页框大小(页框大小4KB)。另外,进程的thread_info结构体保存在栈顶部。

此外,内核为每个cpu分配一个硬中断栈和一个软中断栈(这两个栈也是内核栈),用来执行中断服务例程和下半部(软中断),看看代码(arch/x86/kernel/irq_32.c)。这两个栈属于cpu,不属于进程,这和异常栈是有区别的。

1
2
DEFINE_PER_CPU(struct irq_stack *, hardirq_stack);
DEFINE_PER_CPU(struct irq_stack *, softirq_stack);

定义了两个数组hardirq_stack和softirq_stack,每个数组元素对应一个cpu,指向了该cpu的硬中断栈或者软中断栈。再来看下struct irq_stack结构体(arch/x86/include/asm/processor.h):

1
2
3
struct irq_stack {
	u32                     stack[THREAD_SIZE/sizeof(u32)];
} __aligned(THREAD_SIZE);

可见,硬中断栈和软中断栈的大小均为8KB。

内核在执行中断处理程序时,在do_IRQ函数中会调用handle_irq函数,在handle_irq函数中要进行堆栈切换,代码如下(arch/x86/kernel/irq_32.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
bool handle_irq(unsigned irq, struct pt_regs *regs)
{
	struct irq_desc *desc;
	int overflow;

	overflow = check_stack_overflow();

	desc = irq_to_desc(irq);
	if (unlikely(!desc))
	return false;

	if (user_mode_vm(regs) || !execute_on_irq_stack(overflow, desc, irq)) {
		if (unlikely(overflow))
			print_stack_overflow();
		desc->handle_irq(irq, desc);
	}

	return true;
}

第12行中执行execute_on_irq_stack函数来判断是否需要堆栈切换,如果不需要,则执行if体的中断服务例程,即在当前堆栈中执行中断服务例程,如果需要切换堆栈,则在execute_on_irq_stack函数中切换堆栈并在该函数中(新堆栈中)执行中断服务例程。下面看下execute_on_irq_stack代码(arch/x86/kernel/irq_32.c):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
static inline int
execute_on_irq_stack(int overflow, struct irq_desc *desc, int irq)
{
	struct irq_stack *curstk, *irqstk;
	u32 *isp, *prev_esp, arg1, arg2;

	curstk = (struct irq_stack *) current_stack();
	irqstk = __this_cpu_read(hardirq_stack);

	/*
	 * this is where we switch to the IRQ stack. However, if we are
	 * already using the IRQ stack (because we interrupted a hardirq
	 * handler) we can't do that and just have to keep using the
	 * current stack (which is the irq stack already after all)
	 */
	if (unlikely(curstk == irqstk))
	    return 0;

	isp = (u32 *) ((char *)irqstk + sizeof(*irqstk));

	/* Save the next esp at the bottom of the stack */
	prev_esp = (u32 *)irqstk;
	*prev_esp = current_stack_pointer;

	if (unlikely(overflow))
	    call_on_stack(print_stack_overflow, isp);

	asm volatile("xchgl    %%ebx,%%esp    \n"
	         "call    *%%edi        \n"
	         "movl    %%ebx,%%esp    \n"
	         : "=a" (arg1), "=d" (arg2), "=b" (isp)
	         :  "0" (irq),   "1" (desc),  "2" (isp),
	        "D" (desc->handle_irq)
	         : "memory", "cc", "ecx");
	return 1;
}

第7行获取当前堆栈的指针,第8行获取本地cpu的硬中断栈指针,第16行对二者进行比较,如果相等,则不需要切换堆栈(说明当前堆栈就是硬中断栈,也说明是在中断处理程序中时又发生了中断)。如果不相等,就要进行堆栈切换,第22-23行将当前堆栈指针保存在将要切换到的堆栈中(用于返回)。第28行,交换ebx和esp寄存器的值(实现了堆栈切换,将中断栈指针给了esp),第29行跳转到相应的中断服务例程,第30行从中断服务例程返回后,又将原来的堆栈指针赋给esp,切换到原先堆栈。第33行将中断服务例程函数名存放在%edi中。