http://simohayha.iteye.com/blog/720850
Receive packet steering简称rps,是google贡献给linux kernel的一个patch,主要的功能是解决多核情况下,网络协议栈的软中断的负载均衡。这里的负载均衡也就是指能够将软中断均衡的放在不同的cpu核心上运行。
简介在这里:
http://lwn.net/Articles/362339/
linux现在网卡的驱动支持两种模式,一种是NAPI,一种是非NAPI模式,这两种模式的区别,我前面的blog都有介绍,这里就再次简要的介绍下。
在NAPI中,中断收到数据包后调用__napi_schedule调度软中断,然后软中断处理函数中会调用注册的poll回掉函数中调用netif_receive_skb将数据包发送到3层,没有进行任何的软中断负载均衡。
在非NAPI中,中断收到数据包后调用netif_rx,这个函数会将数据包保存到input_pkt_queue,然后调度软中断,这里为了兼容NAPI的驱动,他的poll方法默认是process_backlog,最终这个函数会从input_pkt_queue中取得数据包然后发送到3层。
通过比较我们可以看到,不管是NAPI还是非NAPI的话都无法做到软中断的负载均衡,因为软中断此时都是运行在在硬件中断相应的cpu上。也就是说如果始终是cpu0相应网卡的硬件中断,那么始终都是cpu0在处理软中断,而此时cpu1就被浪费了,因为无法并行的执行多个软中断。
google的这个patch的基本原理是这样的,根据数据包的源地址,目的地址以及目的和源端口(这里它是将两个端口组合成一个4字节的无符数进行计算的,后面会看到)计算出一个hash值,然后根据这个hash值来选择软中断运行的cpu,从上层来看,也就是说将每个连接和cpu绑定,并通过这个hash值,来均衡软中断在多个cpu上。
这个介绍比较简单,我们来看代码是如何实现的。
它这里主要是hook了两个内核的函数,一个是netif_rx主要是针对非NAPI的驱动,一个是netif_receive_skb这个主要是针对NAPI的驱动,这两个函数我前面blog都有介绍过,想了解可以看我前面的blog,现在这里我只介绍打过patch的实现。
在看netif_rx和netif_receive_skb之前,我们先来看这个patch中两个重要的函数get_rps_cpu和enqueue_to_backlog,我们一个个看。
先来看相关的两个数据结构,首先是netdev_rx_queue,它表示对应的接收队列,因为有的网卡可能硬件上就支持多队列的模式,此时对应就会有多个rx队列,这个结构是挂载在net_device中的,也就是每个网络设备最终都会有一个或者多个rx队列。这个结构在sys文件系统中的表示类似这样的/sys/class/net//queues/rx- 几个队列就是rx-n.
1
2
3
4
5
6
7
8
9
10
struct netdev_rx_queue {
// 保存了当前队列的rps map
struct rps_map *rps_map;
// 对应的kobject
struct kobject kobj;
// 指向第一个rx队列
struct netdev_rx_queue *first;
// 引用计数
atomic_t count;
} ____cacheline_aligned_in_smp;
然后就是rps_map,其实这个也就是保存了能够执行数据包的cpu。
1
2
3
4
5
6
7
8
struct rps_map {
// cpu的个数,也就是cpus数组的个数
unsigned int len;
// RCU锁
struct rcu_head rcu;
// 保存了cpu的id.
u16 cpus[0];
};
看完上面的结构,我们来看函数的实现。
get_rps_cpu主要是通过传递进来的skb然后来选择这个skb所应该被处理的cpu。它的逻辑很简单,就是通过skb计算hash,然后通过hash从对应的队列的rps_mapping中取得对应的cpu id。
这里有个要注意的就是这个hash值是可以交给硬件网卡去计算的,作者自己说是最好交由硬件去计算这个hash值,因为如果是软件计算的话会导致CPU 缓存不命中,带来一定的性能开销。
还有就是rps_mapping这个值是可以通过sys 文件系统设置的,位置在这里:
/sys/class/net//queues/rx-/rps_cpus 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb)
{
struct ipv6hdr *ip6;
struct iphdr *ip;
struct netdev_rx_queue *rxqueue;
struct rps_map *map;
int cpu = -1;
u8 ip_proto;
u32 addr1, addr2, ports, ihl;
// rcu锁
rcu_read_lock();
// 取得设备对应的rx 队列
if (skb_rx_queue_recorded(skb)) {
..........................................
rxqueue = dev->_rx + index;
} else
rxqueue = dev->_rx;
if (!rxqueue->rps_map)
goto done;
// 如果硬件已经计算,则跳过计算过程
if (skb->rxhash)
goto got_hash; /* Skip hash computation on packet header */
switch (skb->protocol) {
case __constant_htons(ETH_P_IP):
if (!pskb_may_pull(skb, sizeof(*ip)))
goto done;
// 得到计算hash的几个值
ip = (struct iphdr *) skb->data;
ip_proto = ip->protocol;
// 两个地址
addr1 = ip->saddr;
addr2 = ip->daddr;
// 得到ip头
ihl = ip->ihl;
break;
case __constant_htons(ETH_P_IPV6):
..........................................
break;
default:
goto done;
}
ports = 0;
switch (ip_proto) {
case IPPROTO_TCP:
case IPPROTO_UDP:
case IPPROTO_DCCP:
case IPPROTO_ESP:
case IPPROTO_AH:
case IPPROTO_SCTP:
case IPPROTO_UDPLITE:
if (pskb_may_pull(skb, (ihl * 4) + 4))
// 我们知道tcp头的前4个字节就是源和目的端口,因此这里跳过ip头得到tcp头的前4个字节
ports = *((u32 *) (skb->data + (ihl * 4)));
break;
default:
break;
}
// 计算hash
skb->rxhash = jhash_3words(addr1, addr2, ports, hashrnd);
if (!skb->rxhash)
skb->rxhash = 1;
got_hash:
// 通过rcu得到对应rps map
map = rcu_dereference(rxqueue->rps_map);
if (map) {
// 取得对应的cpu
u16 tcpu = map->cpus[((u64) skb->rxhash * map->len) >> 32];
// 如果cpu是online的,则返回计算出的这个cpu,否则跳出循环。
if (cpu_online(tcpu)) {
cpu = tcpu;
goto done;
}
}
done:
rcu_read_unlock();
// 如果上面失败,则返回-1.
return cpu;
}
然后是enqueue_to_backlog这个方法,首先我们知道在每个cpu都有一个softnet结构,而他有一个input_pkt_queue的队列,以前这个主要是用于非NAPi的驱动的,而这个patch则将这个队列也用与NAPI的处理中了。也就是每个cpu现在都会有一个input_pkt_queue队列,用于保存需要处理的数据包队列。这个队列作用现在是,如果发现不属于当前cpu处理的数据包,则我们可以直接将数据包挂载到他所属的cpu的input_pkt_queue中。
enqueue_to_backlog接受一个skb和cpu为参数,通过cpu来判断skb如何处理。要么加入所属的input_pkt_queue中,要么schecule 软中断。
还有个要注意就是我们知道NAPI为了兼容非NAPI模式,有个backlog的napi_struct结构,也就是非NAPI驱动会schedule backlog这个napi结构,而在enqueue_to_backlog中则是利用了这个结构,也就是它会schedule backlog,因为它会将数据放到input_pkt_queue中,而backlog的pool方法process_backlog就是从input_pkt_queue中取得数据然后交给上层处理。
这里还有一个会用到结构就是 rps_remote_softirq_cpus,它主要是保存了当前cpu上需要去另外的cpu schedule 软中断的cpu 掩码。因为我们可能将要处理的数据包放到了另外的cpu的input queue上,因此我们需要schedule 另外的cpu上的napi(也就是软中断),所以我们需要保存对应的cpu掩码,以便于后面遍历,然后schedule。
而这里为什么mask有两个元素,注释写的很清楚:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
* This structure holds the per-CPU mask of CPUs for which IPIs are scheduled
* to be sent to kick remote softirq processing. There are two masks since
* the sending of IPIs must be done with interrupts enabled. The select field
* indicates the current mask that enqueue_backlog uses to schedule IPIs.
* select is flipped before net_rps_action is called while still under lock,
* net_rps_action then uses the non-selected mask to send the IPIs and clears
* it without conflicting with enqueue_backlog operation.
*/
struct rps_remote_softirq_cpus {
// 对应的cpu掩码
cpumask_t mask[2];
// 表示应该使用的数组索引
int select;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static int enqueue_to_backlog(struct sk_buff *skb, int cpu)
{
struct softnet_data *queue;
unsigned long flags;
// 取出传递进来的cpu的softnet-data结构
queue = &per_cpu(softnet_data, cpu);
local_irq_save(flags);
__get_cpu_var(netdev_rx_stat).total++;
// 自旋锁
spin_lock(&queue->input_pkt_queue.lock);
// 如果保存的队列还没到上限
if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
// 如果当前队列的输入队列长度不为空
if (queue->input_pkt_queue.qlen) {
enqueue:
// 将数据包加入到input_pkt_queue中,这里会有一个小问题,我们后面再说。
__skb_queue_tail(&queue->input_pkt_queue, skb);
spin_unlock_irqrestore(&queue->input_pkt_queue.lock,
flags);
return NET_RX_SUCCESS;
}
/* Schedule NAPI for backlog device */
// 如果可以调度软中断
if (napi_schedule_prep(&queue->backlog)) {
// 首先判断数据包该不该当前的cpu处理
if (cpu != smp_processor_id()) {
// 如果不该,
struct rps_remote_softirq_cpus *rcpus =
&__get_cpu_var(rps_remote_softirq_cpus);
cpu_set(cpu, rcpus->mask[rcpus->select]);
__raise_softirq_irqoff(NET_RX_SOFTIRQ);
} else
// 如果就是应该当前cpu处理,则直接schedule 软中断,这里可以看到传递进去的是backlog
__napi_schedule(&queue->backlog);
}
goto enqueue;
}
spin_unlock(&queue->input_pkt_queue.lock);
__get_cpu_var(netdev_rx_stat).dropped++;
local_irq_restore(flags);
kfree_skb(skb);
return NET_RX_DROP;
}
这里会有一个小问题,那就是假设此时一个属于cpu0的包进入处理,此时我们运行在cpu1,此时将数据包加入到input队列,然后cpu0上面刚好又来了一个cpu0需要处理的数据包,此时由于qlen不为0则又将数据包加入到input队列中,我们会发现cpu0上的napi没机会进行调度了。
google的patch对这个是这样处理的,在软中断处理函数中当数据包处理完毕,会调用net_rps_action来调度前面保存到其他cpu上的input队列。
下面就是代码片断(net_rx_action)
1
2
3
4
5
6
7
8
9
10
// 得到对应的rcpus.
rcpus = &__get_cpu_var(rps_remote_softirq_cpus);
select = rcpus->select;
// 翻转select,防止和enqueue_backlog冲突
rcpus->select ^= 1;
// 打开中断,此时下面的调度才会起作用.
local_irq_enable();
// 这个函数里面调度对应的远程cpu的napi.
net_rps_action(&rcpus->mask[select]);
然后就是net_rps_action,这个函数很简单,就是遍历所需要处理的cpu,然后调度napi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void net_rps_action(cpumask_t *mask)
{
int cpu;
/* Send pending IPI's to kick RPS processing on remote cpus. */
// 遍历
for_each_cpu_mask_nr(cpu, *mask) {
struct softnet_data *queue = &per_cpu(softnet_data, cpu);
if (cpu_online(cpu))
// 到对应的cpu调用csd方法。
__smp_call_function_single(cpu, &queue->csd, 0);
}
// 清理mask
cpus_clear(*mask);
}
上面我们看到会调用csd方法,而上面的csd回掉就是被初始化为trigger_softirq函数。
1
2
3
4
5
6
7
static void trigger_softirq(void *data)
{
struct softnet_data *queue = data;
// 调度napi可以看到依旧是backlog 这个napi结构体。
__napi_schedule(&queue->backlog);
__get_cpu_var(netdev_rx_stat).received_rps++;
}
上面的函数都分析完毕了,剩下的就很简单了。
首先来看netif_rx如何被修改的,它被修改的很简单,首先是得到当前skb所应该被处理的cpu id,然后再通过比较这个cpu和当前正在处理的cpu id进行比较来做不同的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int netif_rx(struct sk_buff *skb)
{
int cpu;
/* if netpoll wants it, pretend we never saw it */
if (netpoll_rx(skb))
return NET_RX_DROP;
if (!skb->tstamp.tv64)
net_timestamp(skb);
// 得到cpu id。
cpu = get_rps_cpu(skb->dev, skb);
if (cpu < 0)
cpu = smp_processor_id();
// 通过cpu进行队列不同的处理
return enqueue_to_backlog(skb, cpu);
}
然后是netif_receive_skb,这里patch将内核本身的这个函数改写为__netif_receive_skb。然后当返回值小于0,则说明不需要对队列进行处理,此时直接发送到3层。
1
2
3
4
5
6
7
8
9
10
11
int netif_receive_skb(struct sk_buff *skb)
{
int cpu;
cpu = get_rps_cpu(skb->dev, skb);
if (cpu < 0)
return __netif_receive_skb(skb);
else
return enqueue_to_backlog(skb, cpu);
}
最后来总结一下,可以看到input_pkt_queue是一个FIFO的队列,而且如果当qlen有值的时候,也就是在另外的cpu有数据包放到input_pkt_queue中,则当前cpu不会调度napi,而是将数据包放到input_pkt_queue中,然后等待trigger_softirq来调度napi。
因此这个patch完美的解决了软中断在多核下的均衡问题,并且没有由于是同一个连接会map到相同的cpu,并且input_pkt_queue的使用,因此乱序的问题也不会出现。