static bool tcp_small_queue_check(struct sock *sk, const struct sk_buff *skb,
unsigned int factor)
{
unsigned int limit;
limit = max(2 * skb->truesize, sk->sk_pacing_rate >> 10); // ~1 ms of data at the pacing rate, or twice the current skb size, whichever is larger
limit = min_t(u32, limit, sysctl_tcp_limit_output_bytes); // capped by the sysctl, 256 KB by default
limit <<= factor; // doubled on the retransmit path (factor == 1)
if (atomic_read(&sk->sk_wmem_alloc) > limit) { // bytes still queued in qdisc/device exceed the limit
set_bit(TSQ_THROTTLED, &tcp_sk(sk)->tsq_flags); // flag this sock as having failed the TSQ check
/* It is possible TX completion already happened
* before we set TSQ_THROTTLED, so we must
* test again the condition.
*/
smp_mb__after_atomic();
if (atomic_read(&sk->sk_wmem_alloc) > limit)
return true;
}
return false;
}
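To put numbers on the limit: sk_pacing_rate is in bytes per second, so sk_pacing_rate >> 10 is roughly one millisecond's worth of data (dividing by 1024 instead of 1000). The sketch below is userspace arithmetic only, not kernel code, and every name in it is invented for the illustration.

/* Illustration only: userspace arithmetic mirroring the limit computation above.
 * All names here are local to this example, not kernel symbols.
 */
#include <stdio.h>

static unsigned int tsq_limit(unsigned int truesize,    /* skb->truesize */
			      unsigned int pacing_rate, /* bytes per second */
			      unsigned int cap,         /* sysctl_tcp_limit_output_bytes, 256 KB default */
			      unsigned int factor)      /* 1 on the retransmit path, else 0 */
{
	unsigned int limit = 2 * truesize;

	if ((pacing_rate >> 10) > limit)
		limit = pacing_rate >> 10; /* ~1 ms of data at the pacing rate */
	if (limit > cap)
		limit = cap;               /* never exceed the sysctl cap */
	return limit << factor;
}

int main(void)
{
	/* ~100 Mbit/s pacing (12.5 MB/s), 4 KB skb, default 256 KB cap: prints 12207 */
	printf("%u\n", tsq_limit(4096, 12500000, 262144, 0));
	return 0;
}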
static int tcp_transmit_skb(struct sock *sk, struct sk_buff *skb, int clone_it,
gfp_t gfp_mask)
{
...
skb->destructor = skb_is_tcp_pure_ack(skb) ? __sock_wfree : tcp_wfree;
...
}
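For context, here is the neighbouring code in tcp_transmit_skb(), condensed from kernels of this era (exact lines vary by version): the skb is charged to sk->sk_wmem_alloc right where the destructor is installed, which is why sk_wmem_alloc measures how much of this flow's data is still sitting in the qdisc and driver queues. Pure ACKs carry no payload and are not worth throttling, so they get the plain __sock_wfree() destructor instead of tcp_wfree().

/* Condensed context around the destructor assignment (details vary by kernel version) */
skb_orphan(skb);
skb->sk = sk;
skb->destructor = skb_is_tcp_pure_ack(skb) ? __sock_wfree : tcp_wfree;
atomic_add(skb->truesize, &sk->sk_wmem_alloc); /* released later by the destructor */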
/*
* Write buffer destructor automatically called from kfree_skb.
* We can't xmit new skbs from this context, as we might already
* hold qdisc lock.
*/
void tcp_wfree(struct sk_buff *skb)
{
struct sock *sk = skb->sk;
struct tcp_sock *tp = tcp_sk(sk);
int wmem;
/* Keep one reference on sk_wmem_alloc.
* Will be released by sk_free() from here or tcp_tasklet_func()
*/
wmem = atomic_sub_return(skb->truesize - 1, &sk->sk_wmem_alloc);
/* If this softirq is serviced by ksoftirqd, we are likely under stress.
* Wait until our queues (qdisc + devices) are drained.
* This gives :
* - less callbacks to tcp_write_xmit(), reducing stress (batches)
* - chance for incoming ACK (processed by another cpu maybe)
* to migrate this flow (skb->ooo_okay will be eventually set)
*/
if (wmem >= SKB_TRUESIZE(1) && this_cpu_ksoftirqd() == current)
goto out;
if (test_and_clear_bit(TSQ_THROTTLED, &tp->tsq_flags) && // proceed only if TSQ_THROTTLED was set (and clear it)
!test_and_set_bit(TSQ_QUEUED, &tp->tsq_flags)) { // and TSQ_QUEUED was not yet set, so the sock is queued at most once
unsigned long flags;
struct tsq_tasklet *tsq;
/* queue this socket to tasklet queue */
local_irq_save(flags);
tsq = this_cpu_ptr(&tsq_tasklet);
list_add(&tp->tsq_node, &tsq->head); // add this sock to the per-cpu list
tasklet_schedule(&tsq->tasklet); // the tasklet will run later in softirq context
local_irq_restore(flags);
return;
}
out:
sk_free(sk);
}
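For completeness, here is a simplified sketch of tcp_tasklet_func(), the handler registered in tcp_tasklet_init() below. It is paraphrased and abridged from kernels of this era (see net/ipv4/tcp_output.c for the exact code): the tasklet splices the per-cpu list, and for each queued socket either pushes more data right away or, if the socket is currently locked by a process, defers the work to tcp_release_cb().

/* Simplified sketch of tcp_tasklet_func(); abridged, details vary by kernel version */
static void tcp_tasklet_func(unsigned long data)
{
	struct tsq_tasklet *tsq = (struct tsq_tasklet *)data;
	struct tcp_sock *tp, *n;
	unsigned long flags;
	LIST_HEAD(list);

	local_irq_save(flags);
	list_splice_init(&tsq->head, &list); /* grab the per-cpu queue atomically */
	local_irq_restore(flags);

	list_for_each_entry_safe(tp, n, &list, tsq_node) {
		struct sock *sk = (struct sock *)tp;

		list_del(&tp->tsq_node);
		bh_lock_sock(sk);
		if (!sock_owned_by_user(sk))
			tcp_tsq_handler(sk); /* calls tcp_write_xmit() to push more skbs */
		else
			set_bit(TCP_TSQ_DEFERRED, &tp->tsq_flags); /* tcp_release_cb() will do it */
		bh_unlock_sock(sk);

		clear_bit(TSQ_QUEUED, &tp->tsq_flags);
		sk_free(sk); /* drop the reference kept by tcp_wfree() */
	}
}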
/* TCP SMALL QUEUES (TSQ)
*
* TSQ goal is to keep small amount of skbs per tcp flow in tx queues (qdisc+dev)
* to reduce RTT and bufferbloat.
* We do this using a special skb destructor (tcp_wfree).
*
* It's important that tcp_wfree() can be replaced by sock_wfree() in the event skb
* needs to be reallocated in a driver.
* The invariant being skb->truesize subtracted from sk->sk_wmem_alloc
*
* Since transmit from skb destructor is forbidden, we use a tasklet
* to process all sockets that eventually need to send more skbs.
* We use one tasklet per cpu, with its own queue of sockets.
*/
struct tsq_tasklet {
struct tasklet_struct tasklet;
struct list_head head; /* queue of tcp sockets */
};
static DEFINE_PER_CPU(struct tsq_tasklet, tsq_tasklet);
void __init tcp_tasklet_init(void)
{
int i;
for_each_possible_cpu(i) {
struct tsq_tasklet *tsq = &per_cpu(tsq_tasklet, i);
INIT_LIST_HEAD(&tsq->head);
tasklet_init(&tsq->tasklet,
tcp_tasklet_func,
(unsigned long)tsq);
}
}
void __init tcp_init(void)
{
...
tcp_tasklet_init();
}
static void tcp_push(struct sock *sk, int flags, int mss_now,
int nonagle, int size_goal)
{
...
if (tcp_should_autocork(sk, skb, size_goal)) {
// hold the data back: mark TSQ_THROTTLED and return instead of sending
/* avoid atomic op if TSQ_THROTTLED bit is already set */
if (!test_bit(TSQ_THROTTLED, &tp->tsq_flags)) {
NET_INC_STATS(sock_net(sk), LINUX_MIB_TCPAUTOCORKING);
set_bit(TSQ_THROTTLED, &tp->tsq_flags);
}
/* It is possible TX completion already happened
* before we set TSQ_THROTTLED.
*/
if (atomic_read(&sk->sk_wmem_alloc) > skb->truesize)
return;
}
if (flags & MSG_MORE) // the application says more data is coming soon, so cork: do not send a small packet now
nonagle = TCP_NAGLE_CORK;
__tcp_push_pending_frames(sk, mss_now, nonagle); // ultimately calls tcp_write_xmit()
}
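The autocorking test referenced above is short enough to quote. Roughly, for kernels of this vintage (the exact form varies by version), it holds the skb back only when it can still grow toward size_goal, autocorking is enabled, it is not the only skb on the write queue, and this flow already has data sitting in the qdisc/driver queues:

/* Roughly, for kernels of this era (exact form varies by version) */
static bool tcp_should_autocork(struct sock *sk, struct sk_buff *skb,
				int size_goal)
{
	return skb->len < size_goal &&                /* the skb can still be filled further */
	       sysctl_tcp_autocorking &&              /* net.ipv4.tcp_autocorking is enabled */
	       skb != tcp_write_queue_head(sk) &&     /* not the only skb on the write queue */
	       atomic_read(&sk->sk_wmem_alloc) > skb->truesize; /* something is already queued below us */
}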