kk Blog —— 通用基础


date [-d @int|str] [+%s|"+%F %T"]
netstat -ltunp

Linux时间子系统之八:动态时钟框架(CONFIG_NO_HZ、tickless)

http://blog.csdn.net/DroidPhone/article/details/8112948

在前面章节的讨论中,我们一直基于一个假设:Linux中的时钟事件都是由一个周期时钟提供,不管系统中的clock_event_device是工作于周期触发模式,还是工作于单触发模式,也不管定时器系统是工作于低分辨率模式,还是高精度模式,内核都竭尽所能,用不同的方式提供周期时钟,以产生定期的tick事件,tick事件或者用于全局的时间管理(jiffies和时间的更新),或者用于本地cpu的进程统计、时间轮定时器框架等等。周期性时钟虽然简单有效,但是也带来了一些缺点,尤其在系统的功耗上,因为就算系统目前无事可做,也必须定期地发出时钟事件,激活系统。为此,内核的开发者提出了动态时钟这一概念,我们可以通过内核的配置项CONFIG_NO_HZ来激活特性。有时候这一特性也被叫做tickless,不过还是把它称呼为动态时钟比较合适,因为并不是真的没有tick事件了,只是在系统无事所做的idle阶段,我们可以通过停止周期时钟来达到降低系统功耗的目的,只要有进程处于活动状态,时钟事件依然会被周期性地发出。

在动态时钟正确工作之前,系统需要切换至动态时钟模式,而要切换至动态时钟模式,需要一些前提条件,最主要的一条就是cpu的时钟事件设备必须要支持单触发模式,当条件满足时,系统切换至动态时钟模式,接着,由idle进程决定是否可以停止周期时钟,退出idle进程时则需要恢复周期时钟。

1. 数据结构

在上一章的内容里,我们曾经提到,切换到高精度模式后,高精度定时器系统需要使用一个高精度定时器来模拟传统的周期时钟,其中利用了tick_sched结构中的一些字段,事实上,tick_sched结构也是实现动态时钟的一个重要的数据结构,在smp系统中,内核会为每个cpu都定义一个tick_sched结构,这通过一个percpu全局变量tick_cpu_sched来实现,它在kernel/time/tick-sched.c中定义:

1
2
3
4
/* 
 * Per cpu nohz control structure 
 */
static DEFINE_PER_CPU(struct tick_sched, tick_cpu_sched);

tick_sched结构在include/linux/tick.h中定义,我们看看tick_sched结构的详细定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct tick_sched {
	struct hrtimer          sched_timer;
	unsigned long           check_clocks;
	enum tick_nohz_mode     nohz_mode;
	ktime_t             idle_tick;
	int             inidle;
	int             tick_stopped;
	unsigned long           idle_jiffies;
	unsigned long           idle_calls;
	unsigned long           idle_sleeps;
	int             idle_active;
	ktime_t             idle_entrytime;
	ktime_t             idle_waketime;
	ktime_t             idle_exittime;
	ktime_t             idle_sleeptime;
	ktime_t             iowait_sleeptime;
	ktime_t             sleep_length;
	unsigned long           last_jiffies;
	unsigned long           next_jiffies;
	ktime_t             idle_expires;
	int             do_timer_last;
};

sched_timer 该字段用于在高精度模式下,模拟周期时钟的一个hrtimer,请参看Linux时间子系统之六:高精度定时器(HRTIMER)的原理和实现。

check_clocks 该字段用于实现clock_event_device和clocksource的异步通知机制,帮助系统切换至高精度模式或者是动态时钟模式。

nohz_mode 保存动态时钟的工作模式,基于低分辨率和高精度模式下,动态时钟的实现稍有不同,根据模式它可以是以下的值:

NOHZ_MODE_INACTIVE  系统动态时钟尚未激活
NOHZ_MODE_LOWRES  系统工作于低分辨率模式下的动态时钟
NOHZ_MODE_HIGHRES  系统工作于高精度模式下的动态时钟

idle_tick 该字段用于保存停止周期时钟是的内核时间,当退出idle时要恢复周期时钟,需要使用该时间,以保持系统中时间线(jiffies)的正确性。

tick_stopped 该字段用于表明idle状态的周期时钟已经停止。

idle_jiffies 系统进入idle时的jiffies值,用于信息统计。

idle_calls 系统进入idle的统计次数。

idle_sleeps 系统进入idle且成功停掉周期时钟的次数。

idle_active 表明目前系统是否处于idle状态中。

idle_entrytime 系统进入idle的时刻。

idle_waketime idle状态被打断的时刻。

idle_exittime 系统退出idle的时刻。

idle_sleeptime 累计各次idle中停止周期时钟的总时间。

sleep_length 本次idle中停止周期时钟的时间。

last_jiffies 系统中最后一次周期时钟的jiffies值。

next_jiffies 预计下一次周期时钟的jiffies。

idle_expires 进入idle后,下一个最先到期的定时器时刻。

我们知道,根据系统目前的工作模式,系统提供周期时钟(tick)的方式会有所不同,当处于低分辨率模式时,由cpu的tick_device提供周期时钟,而当处于高精度模式时,是由一个高精度定时器来提供周期时钟,下面我们分别讨论一下在两种模式下的动态时钟实现方式。

2. 低分辨率下的动态时钟

回看之前一篇文章:Linux时间子系统之四:定时器的引擎:clock_event_device中的关于tick_device一节,不管tick_device的工作模式(周期触发或者是单次触发),tick_device所关联的clock_event_device的事件回调处理函数都是:tick_handle_periodic,不管当前是否处于idle状态,他都会精确地按HZ数来提供周期性的tick事件,这不符合动态时钟的要求,所以,要使动态时钟发挥作用,系统首先要切换至支持动态时钟的工作模式:NOHZ_MODE_LOWRES 。

2.1 切换至动态时钟模式

动态时钟模式的切换过程的前半部分和切换至高精度定时器模式所经过的路径是一样的,请参考:Linux时间子系统之六:高精度定时器(HRTIMER)的原理和实现。这里再简单描述一下过程:系统工作于周期时钟模式,定期地发出tick事件中断,tick事件中断触发定时器软中断:TIMER_SOFTIRQ,执行软中断处理函数run_timer_softirq,run_timer_softirq调用hrtimer_run_pending函数:

1
2
3
4
5
6
7
8
void hrtimer_run_pending(void)
{
	if (hrtimer_hres_active())
		return;
		......
	if (tick_check_oneshot_change(!hrtimer_is_hres_enabled()))
		hrtimer_switch_to_hres();
}

tick_check_oneshot_change函数的参数决定了现在是要切换至低分辨率动态时钟模式,还是高精度定时器模式,我们现在假设系统不支持高精度定时器模式,hrtimer_is_hres_enabled会直接返回false,对应的tick_check_oneshot_change函数的参数则是true,表明需要切换至动态时钟模式。tick_check_oneshot_change在检查过timekeeper和clock_event_device都具备动态时钟的条件后,通过tick_nohz_switch_to_nohz函数切换至动态时钟模式:

首先,该函数通过tick_switch_to_oneshot函数把tick_device的工作模式设置为单触发模式,并把它的中断事件回调函数置换为tick_nohz_handler,接着把tick_sched结构中的模式字段设置为NOHZ_MODE_LOWRES:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static void tick_nohz_switch_to_nohz(void)
{
	struct tick_sched *ts = &__get_cpu_var(tick_cpu_sched);
	ktime_t next;

	if (!tick_nohz_enabled)
		return;

	local_irq_disable();
	if (tick_switch_to_oneshot(tick_nohz_handler)) {
		local_irq_enable();
		return;
	}

	ts->nohz_mode = NOHZ_MODE_LOWRES;

然后,初始化tick_sched结构中的sched_timer定时器,
通过tick_init_jiffy_update获取下一次tick事件的时间并初始化全局变量last_jiffies_update,
以便后续可以正确地更新jiffies计数值,最后,把下一次tick事件的时间编程到tick_device中,
到此,系统完成了到低分辨率动态时钟的切换过程。

	hrtimer_init(&ts->sched_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
	/* Get the next period */
	next = tick_init_jiffy_update();

	for (;;) {
		hrtimer_set_expires(&ts->sched_timer, next);
		if (!tick_program_event(next, 0))
			break;
		next = ktime_add(next, tick_period);
	}
	local_irq_enable();
}

上面的代码中,明明现在没有切换至高精度模式,为什么要初始化tick_sched结构中的高精度定时器?原因并不是要使用它的定时功能,而是想重用hrtimer代码中的hrtimer_forward函数,利用这个函数来计算下一次tick事件的时间。

2.2 低分辨率动态时钟下的事件中断处理函数

上一节提到,当切换至低分辨率动态时钟模式后,tick_device的事件中断处理函数会被设置为tick_nohz_handler,总体来说,它和周期时钟模式的事件处理函数tick_handle_periodic所完成的工作大致类似:更新时间、更新jiffies计数值、调用update_process_time更新进程信息和触发定时器软中断等等,最后重新编程tick_device,使得它在下一个正确的tick时刻再次触发本函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void tick_nohz_handler(struct clock_event_device *dev)
{
		......
	dev->next_event.tv64 = KTIME_MAX;

	if (unlikely(tick_do_timer_cpu == TICK_DO_TIMER_NONE))
		tick_do_timer_cpu = cpu;

	/* Check, if the jiffies need an update */
	if (tick_do_timer_cpu == cpu)
		tick_do_update_jiffies64(now);
		......  
	if (ts->tick_stopped) {
		touch_softlockup_watchdog();
		ts->idle_jiffies++;
	}

	update_process_times(user_mode(regs));
	profile_tick(CPU_PROFILING);

	while (tick_nohz_reprogram(ts, now)) {
		now = ktime_get();
		tick_do_update_jiffies64(now);
	}
}

因为现在工作于动态时钟模式,所以,tick时钟可能在idle进程中被停掉不止一个tick周期,所以当该函数被再次触发时,离上一次触发的时间可能已经不止一个tick周期,tick_nohz_reprogram对tick_device进行编程时必须正确地处理这一情况,它利用了前面所说的hrtimer_forward函数来实现这一特性:

1
2
3
4
5
static int tick_nohz_reprogram(struct tick_sched *ts, ktime_t now)
{
	hrtimer_forward(&ts->sched_timer, now, tick_period);
	return tick_program_event(hrtimer_get_expires(&ts->sched_timer), 0);
}
2.3 动态时钟:停止周期tick时钟事件

开启动态时钟模式后,周期时钟的开启和关闭由idle进程控制,idle进程内最终是一个循环,循环的一开始通过tick_nohz_idle_enter检测是否允许关闭周期时钟若干时间,然后进入低功耗的idle模式,当有中断事件使得cpu退出低功耗idle模式后,判断是否有新的进程被激活从而需要重新调度,如果需要则通过tick_nohz_idle_exit重新启用周期时钟,然后重新进行进程调度,等待下一次idle的发生,我们可以用下图来表示:

图2.3.1 idle进程中的动态时钟处理

停止周期时钟的时机在tick_nohz_idle_enter函数中,它把主要的工作交由tick_nohz_stop_sched_tick函数来完成。内核也不是每次进入tick_nohz_stop_sched_tick都会停止周期时钟,那么什么时候才会停止?我们想一想,这时候既然idle进程在运行,说明系统中的其他进程都在等待某种事件,系统处于无事所做的状态,唯一要处理的就是中断,除了定时器中断,其它的中断我们无法预测它会何时发生,但是我们可以知道最先一个到期的定时器的到期时间,也就是说,在该时间到期前,产生周期时钟是没有必要的,我们可以据此推算出周期时钟可以停止的tick数,然后重新对tick_device进行编程,使得在最早一个定时器到期前都不会产生周期时钟,实际上,tick_nohz_stop_sched_tick还做了一些限制:当下一个定时器的到期时间与当前jiffies值只相差1时,不会停止周期时钟,当定时器的到期时间与当前的jiffies值相差的时间大于timekeeper允许的最大idle时间时,则下一个tick时刻被设置timekeeper允许的最大idle时间,这主要是为了防止太长时间不去更新timekeeper中的系统时间,有可能导致clocksource的溢出问题。tick_nohz_stop_sched_tick函数体看起来很长,实现的也就是上述的逻辑,所以这里就不贴它的代码了,有兴趣的读者可以自行阅读内核的代码:kernel/time/tick-sched.c。

看了动态时钟的停止过程和tick_nohz_handler的实现方式,其实还有一个情况没有处理:当系统进入idle进程后,周期时钟被停止若干个tick周期,当这若干个tick周期到期后,tick事件必然会产生,tick_nohz_handler被触发调用,然后最先到期的定时器被处理。但是在tick_nohz_handler的最后,tick_device一定会被编程为紧跟着的下一个tick周期的时刻被触发,如果刚才的定时器处理后,并没有激活新的进程,我们的期望是周期时钟可以用下一个新的定时器重新计算可以停止的时间,而不是下一个tick时刻,但是tick_nohz_handler却仅仅简单地把tick_device的到期时间设为下一个周期的tick时刻,这导致了周期时钟被恢复,显然这不是我们想要的。为了处理这种情况,内核使用了一点小伎俩,我们知道定时器是在软中断中执行的,所以内核在irq_exit中的软件中断处理完后,加入了一小段代码,kernel/softirq.c :

1
2
3
4
5
6
7
8
9
10
11
12
13
void irq_exit(void)
{
		......
	if (!in_interrupt() && local_softirq_pending())
		invoke_softirq();

#ifdef CONFIG_NO_HZ
	/* Make sure that timer wheel updates are propagated */
	if (idle_cpu(smp_processor_id()) && !in_interrupt() && !need_resched())
		tick_nohz_irq_exit();
#endif
		......
}

关键的调用是tick_nohz_irq_exit:

1
2
3
4
5
6
7
8
9
void tick_nohz_irq_exit(void)
{
	struct tick_sched *ts = &__get_cpu_var(tick_cpu_sched);

	if (!ts->inidle)
		return;

	tick_nohz_stop_sched_tick(ts);
}

tick_nohz_irq_exit再次调用了tick_nohz_stop_sched_tick函数,使得系统有机会再次停止周期时钟若干个tick周期。

2.3 动态时钟:重新开启周期tick时钟事件

回到图2.3.1,当在idle进程中停止周期时钟后,在某一时刻,有新的进程被激活,在重新调度前,tick_nohz_idle_exit会被调用,该函数负责恢复被停止的周期时钟。tick_nohz_idle_exit最终会调用tick_nohz_restart函数,由tick_nohz_restart函数最后完成恢复周期时钟的工作。函数并不复杂:先是把上一次停止周期时钟的时刻设置到tick_sched结构的sched_timer定时器中,然后在通过hrtimer_forward函数把该定时器的到期时刻设置为当前时间的下一个tick时刻,对于高精度模式,启动该定时器即可,对于低分辨率模式,使用该时间对tick_device重新编程,最后通过tick_do_update_jiffies64更新jiffies数值,为了防止此时正在一个tick时刻的边界,可能当前时刻正好刚刚越过了该到期时间,函数使用了一个while循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void tick_nohz_restart(struct tick_sched *ts, ktime_t now)
{
	hrtimer_cancel(&ts->sched_timer);
	hrtimer_set_expires(&ts->sched_timer, ts->idle_tick);

	while (1) {
		/* Forward the time to expire in the future */
		hrtimer_forward(&ts->sched_timer, now, tick_period);

		if (ts->nohz_mode == NOHZ_MODE_HIGHRES) {
			hrtimer_start_expires(&ts->sched_timer,
						  HRTIMER_MODE_ABS_PINNED);
			/* Check, if the timer was already in the past */
			if (hrtimer_active(&ts->sched_timer))
				break;
		} else {
			if (!tick_program_event(
				hrtimer_get_expires(&ts->sched_timer), 0))
				break;
		}
		/* Reread time and update jiffies */
		now = ktime_get();
		tick_do_update_jiffies64(now);
	}
}

3. 高精度模式下的动态时钟

高精度模式和低分辨率模式的主要区别是在切换过程中,怎样切换到高精度模式,我已经在上一篇文章中做了说明,切换到高精度模式后,动态时钟的开启和关闭和低分辨率模式下没有太大的区别,也是通过tick_nohz_stop_sched_tick和tick_nohz_restart来控制,在这两个函数中,分别判断了当前的两种模式:

NOHZ_MODE_HIGHRES
NOHZ_MODE_LOWRES

如果是NOHZ_MODE_HIGHRES则对tick_sched结构的sched_timer定时器进行设置,如果是NOHZ_MODE_LOWRES,则直接对tick_device进行操作。

4. 动态时钟对中断的影响

在进入和退出中断时,因为动态时钟的关系,中断系统需要作出一些配合。先说中断发生于周期时钟停止期间,如果不做任何处理,中断服务程序中如果要访问jiffies计数值,可能得到一个滞后的jiffies值,因为正常状态下,jiffies值会在恢复周期时钟时正确地更新,所以,为了防止这种情况发生,在进入中断的irq_enter期间,tick_check_idle会被调用:

1
2
3
4
5
void tick_check_idle(int cpu)
{
	tick_check_oneshot_broadcast(cpu);
	tick_check_nohz(cpu);
}

tick_check_nohz函数的最重要的作用就是更新jiffies计数值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static inline void tick_check_nohz(int cpu)
{
	struct tick_sched *ts = &per_cpu(tick_cpu_sched, cpu);
	ktime_t now;

	if (!ts->idle_active && !ts->tick_stopped)
		return;
	now = ktime_get();
	if (ts->idle_active)
		tick_nohz_stop_idle(cpu, now);
	if (ts->tick_stopped) {
		tick_nohz_update_jiffies(now);
		tick_nohz_kick_tick(cpu, now);
	}
}

另外一种情况是在退出定时器中断时,需要重新评估周期时钟的运行状况,这一点已经在2.3节中做了说明,这里就不在赘述了。

Linux时间子系统之七:定时器的应用--msleep(),hrtimer_nanosleep()

http://blog.csdn.net/DroidPhone/article/details/8104433

我们已经在前面几章介绍了低分辨率定时器和高精度定时器的实现原理,内核为了方便其它子系统,在时间子系统中提供了一些用于延时或调度的API,例如msleep,hrtimer_nanosleep等等,这些API基于低分辨率定时器或高精度定时器来实现,本章的内容就是讨论这些方便、好用的API是如何利用定时器系统来完成所需的功能的。

1. msleep

msleep相信大家都用过,它可能是内核用使用最广泛的延时函数之一,它会使当前进程被调度并让出cpu一段时间,因为这一特性,它不能用于中断上下文,只能用于进程上下文中。要想在中断上下文中使用延时函数,请使用会阻塞cpu的无调度版本mdelay。msleep的函数原型如下:

1
void msleep(unsigned int msecs)

延时的时间由参数msecs指定,单位是毫秒,事实上,msleep的实现基于低分辨率定时器,所以msleep的实际精度只能也是1/HZ级别。内核还提供了另一个比较类似的延时函数msleep_interruptible:

1
unsigned long msleep_interruptible(unsigned int msecs)

延时的单位同样毫秒数,它们的区别如下:

函数                    延时单位        返回值          是否可被信号中断
msleep                  毫秒            无              否
msleep_interruptible    毫秒            未完成的毫秒数  是

最主要的区别就是msleep会保证所需的延时一定会被执行完,而msleep_interruptible则可以在延时进行到一半时被信号打断而退出延时,剩余的延时数则通过返回值返回。两个函数最终的代码都会到达schedule_timeout函数,它们的调用序列如下图所示:

图1.1 两个延时函数的调用序列

下面我们看看schedule_timeout函数的实现,函数首先处理两种特殊情况,一种是传入的延时jiffies数是个负数,则打印一句警告信息,然后马上返回,另一种是延时jiffies数是MAX_SCHEDULE_TIMEOUT,表明需要一直延时,直接执行调度即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
signed long __sched schedule_timeout(signed long timeout)
{
	struct timer_list timer;
	unsigned long expire;

	switch (timeout)
	{
	case MAX_SCHEDULE_TIMEOUT:
		schedule();
		goto out;
	default:
		if (timeout < 0) {
			printk(KERN_ERR "schedule_timeout: wrong timeout "
				"value %lx\n", timeout);
			dump_stack();
			current->state = TASK_RUNNING;
			goto out;
		}
	}

然后计算到期的jiffies数,并在堆栈上建立一个低分辨率定时器,把到期时间设置到该定时器中,启动定时器后,通过schedule把当前进程调度出cpu的运行队列:

1
2
3
4
5
expire = timeout + jiffies;

setup_timer_on_stack(&timer, process_timeout, (unsigned long)current);
__mod_timer(&timer, expire, false, TIMER_NOT_PINNED);
schedule();

到这个时候,进程已经被调度走,那它如何返回继续执行?我们看到定时器的到期回调函数是process_timeout,参数是当前进程的task_struct指针,看看它的实现:

1
2
3
4
static void process_timeout(unsigned long __data)
{
	wake_up_process((struct task_struct *)__data);
}

噢,没错,定时器一旦到期,进程会被唤醒并继续执行:

1
2
3
4
5
6
7
8
9
10
	del_singleshot_timer_sync(&timer);

	/* Remove the timer from the object tracker */
	destroy_timer_on_stack(&timer);

	timeout = expire - jiffies;

 out:
	return timeout < 0 ? 0 : timeout;
}

schedule返回后,说明要不就是定时器到期,要不就是因为其它时间导致进程被唤醒,函数要做的就是删除在堆栈上建立的定时器,返回剩余未完成的jiffies数。

说完了关键的schedule_timeout函数,我们看看msleep如何实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
signed long __sched schedule_timeout_uninterruptible(signed long timeout)
{
	__set_current_state(TASK_UNINTERRUPTIBLE);
	return schedule_timeout(timeout);
}

void msleep(unsigned int msecs)
{
	unsigned long timeout = msecs_to_jiffies(msecs) + 1;

	while (timeout)
		timeout = schedule_timeout_uninterruptible(timeout);
}

msleep先是把毫秒转换为jiffies数,通过一个while循环保证所有的延时被执行完毕,延时操作通过schedule_timeout_uninterruptible函数完成,它仅仅是在把进程的状态修改为TASK_UNINTERRUPTIBLE后,调用上述的schedule_timeout来完成具体的延时操作,TASK_UNINTERRUPTIBLE状态保证了msleep不会被信号唤醒,也就意味着在msleep期间,进程不能被kill掉。

看看msleep_interruptible的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
signed long __sched schedule_timeout_interruptible(signed long timeout)
{
	__set_current_state(TASK_INTERRUPTIBLE);
	return schedule_timeout(timeout);
}

unsigned long msleep_interruptible(unsigned int msecs)
{
	unsigned long timeout = msecs_to_jiffies(msecs) + 1;

	while (timeout && !signal_pending(current))
		timeout = schedule_timeout_interruptible(timeout);
	return jiffies_to_msecs(timeout);
}

msleep_interruptible通过schedule_timeout_interruptible中转,schedule_timeout_interruptible的唯一区别就是把进程的状态设置为了TASK_INTERRUPTIBLE,说明在延时期间有信号通知,while循环会马上终止,剩余的jiffies数被转换成毫秒返回。实际上,你也可以利用schedule_timeout_interruptible或schedule_timeout_uninterruptible构造自己的延时函数,同时,内核还提供了另外一个类似的函数,不用我解释,看代码就知道它的用意了:

1
2
3
4
5
signed long __sched schedule_timeout_killable(signed long timeout)
{
	__set_current_state(TASK_KILLABLE);
	return schedule_timeout(timeout);
}

2. hrtimer_nanosleep

第一节讨论的msleep函数基于时间轮定时系统,只能提供毫秒级的精度,实际上,它的精度取决于HZ的配置值,如果HZ小于1000,它甚至无法达到毫秒级的精度,要想得到更为精确的延时,我们自然想到的是要利用高精度定时器来实现。没错,linux为用户空间提供了一个api:nanosleep,它能提供纳秒级的延时精度,该用户空间函数对应的内核实现是sys_nanosleep,它的工作交由高精度定时器系统的hrtimer_nanosleep函数实现,最终的大部分工作则由do_nanosleep完成。调用过程如下图所示:

图 2.1 nanosleep的调用过程

与msleep的实现相类似,hrtimer_nanosleep函数首先在堆栈中创建一个高精度定时器,设置它的到期时间,然后通过do_nanosleep完成最终的延时工作,当前进程在挂起相应的延时时间后,退出do_nanosleep函数,销毁堆栈中的定时器并返回0值表示执行成功。不过do_nanosleep可能在没有达到所需延时数量时由于其它原因退出,如果出现这种情况,hrtimer_nanosleep的最后部分把剩余的延时时间记入进程的restart_block中,并返回ERESTART_RESTARTBLOCK错误代码,系统或者用户空间可以根据此返回值决定是否重新调用nanosleep以便把剩余的延时继续执行完成。下面是hrtimer_nanosleep的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
long hrtimer_nanosleep(struct timespec *rqtp, struct timespec __user *rmtp,
			   const enum hrtimer_mode mode, const clockid_t clockid)
{
	struct restart_block *restart;
	struct hrtimer_sleeper t;
	int ret = 0;
	unsigned long slack;

	slack = current->timer_slack_ns;
	if (rt_task(current))
		slack = 0;

	hrtimer_init_on_stack(&t.timer, clockid, mode);
	hrtimer_set_expires_range_ns(&t.timer, timespec_to_ktime(*rqtp), slack);
	if (do_nanosleep(&t, mode))
		goto out;

	/* Absolute timers do not update the rmtp value and restart: */
	if (mode == HRTIMER_MODE_ABS) {
		ret = -ERESTARTNOHAND;
		goto out;
	}

	if (rmtp) {
		ret = update_rmtp(&t.timer, rmtp);
		if (ret <= 0)
			goto out;
	}

	restart = ¤t_thread_info()->restart_block;
	restart->fn = hrtimer_nanosleep_restart;
	restart->nanosleep.clockid = t.timer.base->clockid;
	restart->nanosleep.rmtp = rmtp;
	restart->nanosleep.expires = hrtimer_get_expires_tv64(&t.timer);

	ret = -ERESTART_RESTARTBLOCK;
out:
	destroy_hrtimer_on_stack(&t.timer);
	return ret;
}

接着我们看看do_nanosleep的实现代码,它首先通过hrtimer_init_sleeper函数,把定时器的回调函数设置为hrtimer_wakeup,把当前进程的task_struct结构指针保存在hrtimer_sleeper结构的task字段中:

1
2
3
4
5
6
7
8
9
10
void hrtimer_init_sleeper(struct hrtimer_sleeper *sl, struct task_struct *task)
{
	sl->timer.function = hrtimer_wakeup;
	sl->task = task;
}
EXPORT_SYMBOL_GPL(hrtimer_init_sleeper);

static int __sched do_nanosleep(struct hrtimer_sleeper *t, enum hrtimer_mode mode)
{
	hrtimer_init_sleeper(t, current);

然后,通过一个do/while循环内:启动定时器,挂起当前进程,等待定时器或其它事件唤醒进程。这里的循环体实现比较怪异,它使用hrtimer_active函数间接地判断定时器是否到期,如果hrtimer_active返回false,说明定时器已经过期,然后把hrtimer_sleeper结构的task字段设置为NULL,从而导致循环体的结束,另一个结束条件是当前进程收到了信号事件,所以,当因为是定时器到期而退出时,do_nanosleep返回true,否则返回false,上述的hrtimer_nanosleep正是利用了这一特性来决定它的返回值。以下是do_nanosleep循环体的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
	do {
		set_current_state(TASK_INTERRUPTIBLE);
		hrtimer_start_expires(&t->timer, mode);
		if (!hrtimer_active(&t->timer))
			t->task = NULL;

		if (likely(t->task))
			schedule();

		hrtimer_cancel(&t->timer);
		mode = HRTIMER_MODE_ABS;

	} while (t->task && !signal_pending(current));

	__set_current_state(TASK_RUNNING);

	return t->task == NULL;
}

除了hrtimer_nanosleep,高精度定时器系统还提供了几种用于延时/挂起进程的api:

schedule_hrtimeout    使得当前进程休眠指定的时间,使用CLOCK_MONOTONIC计时系统;
schedule_hrtimeout_range    使得当前进程休眠指定的时间范围,使用CLOCK_MONOTONIC计时系统;
schedule_hrtimeout_range_clock    使得当前进程休眠指定的时间范围,可以自行指定计时系统;
usleep_range 使得当前进程休眠指定的微妙数,使用CLOCK_MONOTONIC计时系统;

它们之间的调用关系如下:

图 2.2 schedule_hrtimeout_xxxx系列函数

最终,所有的实现都会进入到schedule_hrtimeout_range_clock函数。需要注意的是schedule_hrtimeout_xxxx系列函数在调用前,最好利用set_current_state函数先设置进程的状态,在这些函数返回前,进城的状态会再次被设置为TASK_RUNNING。如果事先把状态设置为TASK_UNINTERRUPTIBLE,它们会保证函数返回前一定已经经过了所需的延时时间,如果事先把状态设置为TASK_INTERRUPTIBLE,则有可能在尚未到期时由其它信号唤醒进程从而导致函数返回。主要实现该功能的函数schedule_hrtimeout_range_clock和前面的do_nanosleep函数实现原理基本一致。大家可以自行参考内核的代码,它们位于:kernel/hrtimer.c。

Linux时间子系统之六:高精度定时器(HRTIMER)的原理和实现

http://blog.csdn.net/DroidPhone/article/details/8074892

上一篇文章,我介绍了传统的低分辨率定时器的实现原理。而随着内核的不断演进,大牛们已经对这种低分辨率定时器的精度不再满足,而且,硬件也在不断地发展,系统中的定时器硬件的精度也越来越高,这也给高分辨率定时器的出现创造了条件。内核从2.6.16开始加入了高精度定时器架构。在实现方式上,内核的高分辨率定时器的实现代码几乎没有借用低分辨率定时器的数据结构和代码,内核文档给出的解释主要有以下几点:

低分辨率定时器的代码和jiffies的关系太过紧密,并且默认按32位进行设计,并且它的代码已经经过长时间的优化,目前的使用也是没有任何错误,如果硬要基于它来实现高分辨率定时器,势必会打破原有的时间轮概念,并且会引入一大堆#if–#else判断;

虽然大部分时间里,时间轮可以实现O(1)时间复杂度,但是当有进位发生时,不可预测的O(N)定时器级联迁移时间,这对于低分辨率定时器来说问题不大,可是它大大地影响了定时器的精度;

低分辨率定时器几乎是为“超时”而设计的,并为此对它进行了大量的优化,对于这些以“超时”未目的而使用定时器,它们大多数期望在超时到来之前获得正确的结果,然后删除定时器,精确时间并不是它们主要的目的,例如网络通信、设备IO等等。

为此,内核为高精度定时器重新设计了一套软件架构,它可以为我们提供纳秒级的定时精度,以满足对精确时间有迫切需求的应用程序或内核驱动,例如多媒体应用,音频设备的驱动程序等等。以下的讨论用hrtimer(high resolution timer)表示高精度定时器。

1. 如何组织hrtimer?

我们知道,低分辨率定时器使用5个链表数组来组织timer_list结构,形成了著名的时间轮概念,对于高分辨率定时器,我们期望组织它们的数据结构至少具备以下条件:

稳定而且快速的查找能力;
快速地插入和删除定时器的能力;
排序功能;

内核的开发者考察了多种数据结构,例如基数树、哈希表等等,最终他们选择了红黑树(rbtree)来组织hrtimer,红黑树已经以库的形式存在于内核中,并被成功地使用在内存管理子系统和文件系统中,随着系统的运行,hrtimer不停地被创建和销毁,新的hrtimer按顺序被插入到红黑树中,树的最左边的节点就是最快到期的定时器,内核用一个hrtimer结构来表示一个高精度定时器:

1
2
3
4
5
6
7
8
struct hrtimer {
	struct timerqueue_node      node;
	ktime_t             _softexpires;
	enum hrtimer_restart        (*function)(struct hrtimer *);
	struct hrtimer_clock_base   *base;
	unsigned long           state;
		......
};

定时器的到期时间用ktime_t来表示,_softexpires字段记录了时间,定时器一旦到期,function字段指定的回调函数会被调用,该函数的返回值为一个枚举值,它决定了该hrtimer是否需要被重新激活:

1
2
3
4
enum hrtimer_restart {
	HRTIMER_NORESTART,  /* Timer is not restarted */
	HRTIMER_RESTART,    /* Timer must be restarted */
};

state字段用于表示hrtimer当前的状态,有几下几种位组合:

1
2
3
4
#define HRTIMER_STATE_INACTIVE  0x00  // 定时器未激活
#define HRTIMER_STATE_ENQUEUED  0x01  // 定时器已经被排入红黑树中
#define HRTIMER_STATE_CALLBACK  0x02  // 定时器的回调函数正在被调用
#define HRTIMER_STATE_MIGRATE   0x04  // 定时器正在CPU之间做迁移

hrtimer的到期时间可以基于以下几种时间基准系统:

1
2
3
4
5
6
enum  hrtimer_base_type {
	HRTIMER_BASE_MONOTONIC,  // 单调递增的monotonic时间,不包含休眠时间
	HRTIMER_BASE_REALTIME,   // 平常使用的墙上真实时间
	HRTIMER_BASE_BOOTTIME,   // 单调递增的boottime,包含休眠时间
	HRTIMER_MAX_CLOCK_BASES, // 用于后续数组的定义
};

和低分辨率定时器一样,处于效率和上锁的考虑,每个cpu单独管理属于自己的hrtimer,为此,专门定义了一个结构hrtimer_cpu_base:

1
2
3
4
struct hrtimer_cpu_base {
		......
	struct hrtimer_clock_base   clock_base[HRTIMER_MAX_CLOCK_BASES];
};

其中,clock_base数组为每种时间基准系统都定义了一个hrtimer_clock_base结构,它的定义如下:

1
2
3
4
5
6
7
8
9
struct hrtimer_clock_base {
	struct hrtimer_cpu_base *cpu_base;  // 指向所属cpu的hrtimer_cpu_base结构
		......
	struct timerqueue_head  active;     // 红黑树,包含了所有使用该时间基准系统的hrtimer
	ktime_t         resolution; // 时间基准系统的分辨率
	ktime_t         (*get_time)(void); // 获取该基准系统的时间函数
	ktime_t         softirq_time;// 当用jiffies
	ktime_t         offset;      // 
};

active字段是一个timerqueue_head结构,它实际上是对rbtree的进一步封装:

1
2
3
4
5
6
7
8
9
struct timerqueue_node {
	struct rb_node node;  // 红黑树的节点
	ktime_t expires;      // 该节点代表队hrtimer的到期时间,与hrtimer结构中的_softexpires稍有不同
};

struct timerqueue_head {
	struct rb_root head;          // 红黑树的根节点
	struct timerqueue_node *next; // 该红黑树中最早到期的节点,也就是最左下的节点
};

timerqueue_head结构在红黑树的基础上,增加了一个next字段,用于保存树中最先到期的定时器节点,实际上就是树的最左下方的节点,有了next字段,当到期事件到来时,系统不必遍历整个红黑树,只要取出next字段对应的节点进行处理即可。timerqueue_node用于表示一个hrtimer节点,它在标准红黑树节点rb_node的基础上增加了expires字段,该字段和hrtimer中的softexpires字段一起,设定了hrtimer的到期时间的一个范围,hrtimer可以在hrtimer.softexpires至timerqueue_node.expires之间的任何时刻到期,我们也称timerqueue_node.expires为硬过期时间(hard),意思很明显:到了此时刻,定时器一定会到期,有了这个范围可以选择,定时器系统可以让范围接近的多个定时器在同一时刻同时到期,这种设计可以降低进程频繁地被hrtimer进行唤醒。经过以上的讨论,我们可以得出以下的图示,它表明了每个cpu上的hrtimer是如何被组织在一起的:

图 1.1 每个cpu的hrtimer组织结构

总结一下:

每个cpu有一个hrtimer_cpu_base结构;
hrtimer_cpu_base结构管理着3种不同的时间基准系统的hrtimer,分别是:实时时间,启动时间和单调时间;
每种时间基准系统通过它的active字段(timerqueue_head结构指针),指向它们各自的红黑树;
红黑树上,按到期时间进行排序,最先到期的hrtimer位于最左下的节点,并被记录在active.next字段中;
3中时间基准的最先到期时间可能不同,所以,它们之中最先到期的时间被记录在hrtimer_cpu_base的expires_next字段中。

2. hrtimer如何运转

hrtimer的实现需要一定的硬件基础,它的实现依赖于我们前几章介绍的timekeeper和clock_event_device,如果你对timekeeper和clock_event_device不了解请参考以下文章:Linux时间子系统之三:时间的维护者:timekeeper,Linux时间子系统之四:定时器的引擎:clock_event_device。hrtimer系统需要通过timekeeper获取当前的时间,计算与到期时间的差值,并根据该差值,设定该cpu的tick_device(clock_event_device)的下一次的到期时间,时间一到,在clock_event_device的事件回调函数中处理到期的hrtimer。现在你或许有疑问:前面在介绍clock_event_device时,我们知道,每个cpu有自己的tick_device,通常用于周期性地产生进程调度和时间统计的tick事件,这里又说要用tick_device调度hrtimer系统,通常cpu只有一个tick_device,那他们如何协调工作?这个问题也一度困扰着我,如果再加上NO_HZ配置带来tickless特性,你可能会更晕。这里我们先把这个疑问放下,我将在后面的章节中来讨论这个问题,现在我们只要先知道,一旦开启了hrtimer,tick_device所关联的clock_event_device的事件回调函数会被修改为:hrtimer_interrupt,并且会被设置成工作于CLOCK_EVT_MODE_ONESHOT单触发模式。

2.1 添加一个hrtimer

要添加一个hrtimer,系统提供了一些api供我们使用,首先我们需要定义一个hrtimer结构的实例,然后用hrtimer_init函数对它进行初始化,它的原型如下:

1
2
void hrtimer_init(struct hrtimer *timer, clockid_t which_clock,
			 enum hrtimer_mode mode);

which_clock可以是CLOCK_REALTIME、CLOCK_MONOTONIC、CLOCK_BOOTTIME中的一种,mode则可以是相对时间HRTIMER_MODE_REL,也可以是绝对时间HRTIMER_MODE_ABS。设定回调函数:

1
timer.function = hr_callback;

如果定时器无需指定一个到期范围,可以在设定回调函数后直接使用hrtimer_start激活该定时器:

1
2
int hrtimer_start(struct hrtimer *timer, ktime_t tim,
			 const enum hrtimer_mode mode);

如果需要指定到期范围,则可以使用hrtimer_start_range_ns激活定时器:

1
2
hrtimer_start_range_ns(struct hrtimer *timer, ktime_t tim,
			unsigned long range_ns, const enum hrtimer_mode mode);

要取消一个hrtimer,使用hrtimer_cancel:

1
int hrtimer_cancel(struct hrtimer *timer);

以下两个函数用于推后定时器的到期时间:

1
2
3
4
5
6
7
8
9
extern u64
hrtimer_forward(struct hrtimer *timer, ktime_t now, ktime_t interval);

/* Forward a hrtimer so it expires after the hrtimer's current now */
static inline u64 hrtimer_forward_now(struct hrtimer *timer,
					  ktime_t interval)
{
	return hrtimer_forward(timer, timer->base->get_time(), interval);
}

以下几个函数用于获取定时器的当前状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static inline int hrtimer_active(const struct hrtimer *timer)
{
	return timer->state != HRTIMER_STATE_INACTIVE;
}

static inline int hrtimer_is_queued(struct hrtimer *timer)
{
	return timer->state & HRTIMER_STATE_ENQUEUED;
}

static inline int hrtimer_callback_running(struct hrtimer *timer)
{
	return timer->state & HRTIMER_STATE_CALLBACK;
}

hrtimer_init最终会进入__hrtimer_init函数,该函数的主要目的是初始化hrtimer的base字段,同时初始化作为红黑树的节点的node字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void __hrtimer_init(struct hrtimer *timer, clockid_t clock_id,
			   enum hrtimer_mode mode)
{
	struct hrtimer_cpu_base *cpu_base;
	int base;

	memset(timer, 0, sizeof(struct hrtimer));

	cpu_base = &__raw_get_cpu_var(hrtimer_bases);

	if (clock_id == CLOCK_REALTIME && mode != HRTIMER_MODE_ABS)
		clock_id = CLOCK_MONOTONIC;

	base = hrtimer_clockid_to_base(clock_id);
	timer->base = &cpu_base->clock_base[base];
	timerqueue_init(&timer->node);
		......
}

hrtimer_start和hrtimer_start_range_ns最终会把实际的工作交由__hrtimer_start_range_ns来完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
int __hrtimer_start_range_ns(struct hrtimer *timer, ktime_t tim,
		unsigned long delta_ns, const enum hrtimer_mode mode,
		int wakeup)
{
	......        
	/* 取得hrtimer_clock_base指针 */
	base = lock_hrtimer_base(timer, &flags); 
	/* 如果已经在红黑树中,先移除它: */
	ret = remove_hrtimer(timer, base); ......
	/* 如果是相对时间,则需要加上当前时间,因为内部是使用绝对时间 */
	if (mode & HRTIMER_MODE_REL) {
			tim = ktime_add_safe(tim, new_base->get_time());
			......
	} 
	/* 设置到期的时间范围 */
	hrtimer_set_expires_range_ns(timer, tim, delta_ns);
	...... 
	/* 把hrtime按到期时间排序,加入到对应时间基准系统的红黑树中 */
	/* 如果该定时器的是最早到期的,将会返回true */
	leftmost = enqueue_hrtimer(timer, new_base);
	/*
	* Only allow reprogramming if the new base is on this CPU.
	* (it might still be on another CPU if the timer was pending)
	*
	* XXX send_remote_softirq() ? 
	* 定时器比之前的到期时间要早,所以需要重新对tick_device进行编程,重新设定的的到期时间 
	*/
	if (leftmost && new_base->cpu_base == &__get_cpu_var(hrtimer_bases))
			hrtimer_enqueue_reprogram(timer, new_base, wakeup);
	unlock_hrtimer_base(timer, &flags);
	return ret;
}
2.2 hrtimer的到期处理

高精度定时器系统有3个入口可以对到期定时器进行处理,它们分别是:

没有切换到高精度模式时,在每个jiffie的tick事件中断中进行查询和处理;
在HRTIMER_SOFTIRQ软中断中进行查询和处理;
切换到高精度模式后,在每个clock_event_device的到期事件中断中进行查询和处理;

低精度模式 因为系统并不是一开始就会支持高精度模式,而是在系统启动后的某个阶段,等待所有的条件都满足后,才会切换到高精度模式,当系统还没有切换到高精度模式时,所有的高精度定时器运行在低精度模式下,在每个jiffie的tick事件中断中进行到期定时器的查询和处理,显然这时候的精度和低分辨率定时器是一样的(HZ级别)。低精度模式下,每个tick事件中断中,hrtimer_run_queues函数会被调用,由它完成定时器的到期处理。hrtimer_run_queues首先判断目前高精度模式是否已经启用,如果已经切换到了高精度模式,什么也不做,直接返回:

1
2
3
4
5
void hrtimer_run_queues(void)
{

	if (hrtimer_hres_active())
		return;

如果hrtimer_hres_active返回false,说明目前处于低精度模式下,则继续处理,它用一个for循环遍历各个时间基准系统,查询每个hrtimer_clock_base对应红黑树的左下节点,判断它的时间是否到期,如果到期,通过__run_hrtimer函数,对到期定时器进行处理,包括:调用定时器的回调函数、从红黑树中移除该定时器、根据回调函数的返回值决定是否重新启动该定时器等等:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
for (index = 0; index < HRTIMER_MAX_CLOCK_BASES; index++) {
	base = &cpu_base->clock_base[index];
	if (!timerqueue_getnext(&base->active))
		continue;

	if (gettime) {
		hrtimer_get_softirq_time(cpu_base);
		gettime = 0;
	}

	raw_spin_lock(&cpu_base->lock);

	while ((node = timerqueue_getnext(&base->active))) {
		struct hrtimer *timer;

		timer = container_of(node, struct hrtimer, node);
		if (base->softirq_time.tv64 <=
				hrtimer_get_expires_tv64(timer))
			break;

		__run_hrtimer(timer, &base->softirq_time);
	}
	raw_spin_unlock(&cpu_base->lock);
}

上面的timerqueue_getnext函数返回红黑树中的左下节点,之所以可以在while循环中使用该函数,是因为__run_hrtimer会在移除旧的左下节点时,新的左下节点会被更新到base->active->next字段中,使得循环可以继续执行,直到没有新的到期定时器为止。

高精度模式 切换到高精度模式后,原来给cpu提供tick事件的tick_device(clock_event_device)会被高精度定时器系统接管,它的中断事件回调函数被设置为hrtimer_interrupt,红黑树中最左下的节点的定时器的到期时间被编程到该clock_event_device中,这样每次clock_event_device的中断意味着至少有一个高精度定时器到期。另外,当timekeeper系统中的时间需要修正,或者clock_event_device的到期事件时间被重新编程时,系统会发出HRTIMER_SOFTIRQ软中断,软中断的处理函数run_hrtimer_softirq最终也会调用hrtimer_interrupt函数对到期定时器进行处理,所以在这里我们只要讨论hrtimer_interrupt函数的实现即可。

hrtimer_interrupt函数的前半部分和低精度模式下的hrtimer_run_queues函数完成相同的事情:它用一个for循环遍历各个时间基准系统,查询每个hrtimer_clock_base对应红黑树的左下节点,判断它的时间是否到期,如果到期,通过__run_hrtimer函数,对到期定时器进行处理,所以我们只讨论后半部分,在处理完所有到期定时器后,下一个到期定时器的到期时间保存在变量expires_next中,接下来的工作就是把这个到期时间编程到tick_device中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
void hrtimer_interrupt(struct clock_event_device *dev)
{
		......
	for (i = 0; i < HRTIMER_MAX_CLOCK_BASES; i++) {
				......
		while ((node = timerqueue_getnext(&base->active))) {
						......
			if (basenow.tv64 < hrtimer_get_softexpires_tv64(timer)) {
				ktime_t expires;

				expires = ktime_sub(hrtimer_get_expires(timer),
							base->offset);
				if (expires.tv64 < expires_next.tv64)
					expires_next = expires;
				break;
			}

			__run_hrtimer(timer, &basenow);
		}
	}

	/* 
	 * Store the new expiry value so the migration code can verify 
	 * against it. 
	 */
	cpu_base->expires_next = expires_next;
	raw_spin_unlock(&cpu_base->lock);

	/* Reprogramming necessary ? */
	if (expires_next.tv64 == KTIME_MAX ||
		!tick_program_event(expires_next, 0)) {
		cpu_base->hang_detected = 0;
		return;
	}

如果这时的tick_program_event返回了非0值,表示过期时间已经在当前时间的前面,这通常由以下原因造成:

	系统正在被调试跟踪,导致时间在走,程序不走;
	定时器的回调函数花了太长的时间;
	系统运行在虚拟机中,而虚拟机被调度导致停止运行;

为了避免这些情况的发生,接下来系统提供3次机会,重新执行前面的循环,处理到期的定时器:


	raw_spin_lock(&cpu_base->lock);
	now = hrtimer_update_base(cpu_base);
	cpu_base->nr_retries++;
	if (++retries < 3)
		goto retry;

如果3次循环后还无法完成到期处理,系统不再循环,转为计算本次总循环的时间,
然后把tick_device的到期时间强制设置为当前时间加上本次的总循环时间,不过推后的时间被限制在100ms以内:


	delta = ktime_sub(now, entry_time);
	if (delta.tv64 > cpu_base->max_hang_time.tv64)
		cpu_base->max_hang_time = delta;
	/* 
	 * Limit it to a sensible value as we enforce a longer 
	 * delay. Give the CPU at least 100ms to catch up. 
	 */
	if (delta.tv64 > 100 * NSEC_PER_MSEC)
		expires_next = ktime_add_ns(now, 100 * NSEC_PER_MSEC);
	else
		expires_next = ktime_add(now, delta);
	tick_program_event(expires_next, 1);
	printk_once(KERN_WARNING "hrtimer: interrupt took %llu ns\n",
			ktime_to_ns(delta));
}

3. 切换到高精度模式

上面提到,尽管内核配置成支持高精度定时器,但并不是一开始就工作于高精度模式,系统在启动的开始阶段,还是按照传统的模式在运行:tick_device按HZ频率定期地产生tick事件,这时的hrtimer工作在低分辨率模式,到期事件在每个tick事件中断中由hrtimer_run_queues函数处理,同时,在低分辨率定时器(时间轮)的软件中断TIMER_SOFTIRQ中,hrtimer_run_pending会被调用,系统在这个函数中判断系统的条件是否满足切换到高精度模式,如果条件满足,则会切换至高分辨率模式,另外提一下,NO_HZ模式也是在该函数中判断并切换。

1
2
3
4
5
6
7
8
void hrtimer_run_pending(void)
{
	if (hrtimer_hres_active())
		return;
		......
	if (tick_check_oneshot_change(!hrtimer_is_hres_enabled()))
		hrtimer_switch_to_hres();
}

因为不管系统是否工作于高精度模式,每个TIMER_SOFTIRQ期间,该函数都会被调用,所以函数一开始先用hrtimer_hres_active判断目前高精度模式是否已经激活,如果已经激活,则说明之前的调用中已经切换了工作模式,不必再次切换,直接返回。hrtimer_hres_active很简单:

1
2
3
4
5
6
7
8
DEFINE_PER_CPU(struct hrtimer_cpu_base, hrtimer_bases) = {
		......
}

static inline int hrtimer_hres_active(void)
{
	return __this_cpu_read(hrtimer_bases.hres_active);
}

hrtimer_run_pending函数接着通过tick_check_oneshot_change判断系统是否可以切换到高精度模式,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int tick_check_oneshot_change(int allow_nohz)
{
	struct tick_sched *ts = &__get_cpu_var(tick_cpu_sched);

	if (!test_and_clear_bit(0, &ts->check_clocks))
		return 0;

	if (ts->nohz_mode != NOHZ_MODE_INACTIVE)
		return 0;

	if (!timekeeping_valid_for_hres() || !tick_is_oneshot_available())
		return 0;

	if (!allow_nohz)
		return 1;

	tick_nohz_switch_to_nohz();
	return 0;
}

函数的一开始先判断check_clock标志的第0位是否被置位,如果没有置位,说明系统中没有注册符合要求的时钟事件设备,函数直接返回,check_clock标志由clocksource和clock_event_device系统的notify系统置位,当系统中有更高精度的clocksource被注册和选择后,或者有更精确的支持CLOCK_EVT_MODE_ONESHOT模式的clock_event_device被注册时,通过它们的notify函数,check_clock标志的第0为会置位。

如果tick_sched结构中的nohz_mode字段不是NOHZ_MODE_INACTIVE,表明系统已经切换到其它模式,直接返回。nohz_mode的取值有3种:

NOHZ_MODE_INACTIVE    // 未启用NO_HZ模式
NOHZ_MODE_LOWRES      // 启用NO_HZ模式,hrtimer工作于低精度模式下
NOHZ_MODE_HIGHRES     // 启用NO_HZ模式,hrtimer工作于高精度模式下

接下来的timerkeeping_valid_for_hres判断timekeeper系统是否支持高精度模式,tick_is_oneshot_available判断tick_device是否支持CLOCK_EVT_MODE_ONESHOT模式。如果都满足要求,则继续往下判断。allow_nohz是函数的参数,为true表明可以切换到NOHZ_MODE_LOWRES 模式,函数将进入tick_nohz_switch_to_nohz,切换至NOHZ_MODE_LOWRES 模式,这里我们传入的allow_nohz是表达式:

1
(!hrtimer_is_hres_enabled())

所以当系统不允许高精度模式时,将会在tick_check_oneshot_change函数内,通过tick_nohz_switch_to_nohz切换至NOHZ_MODE_LOWRES 模式,如果系统允许高精度模式,传入的allow_nohz参数为false,tick_check_oneshot_change函数返回1,回到上面的hrtimer_run_pending函数,hrtimer_switch_to_hres函数将会被调用,已完成切换到NOHZ_MODE_HIGHRES高精度模式。好啦,真正的切换函数找到了,我们看一看它如何切换:

首先,它通过hrtimer_cpu_base中的hres_active字段判断该cpu是否已经切换至高精度模式,如果是则直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static int hrtimer_switch_to_hres(void)
{
	int i, cpu = smp_processor_id();
	struct hrtimer_cpu_base *base = &per_cpu(hrtimer_bases, cpu);
	unsigned long flags;

	if (base->hres_active)
		return 1;

接着,通过tick_init_highres函数接管tick_device关联的clock_event_device:

	local_irq_save(flags);

	if (tick_init_highres()) {
		local_irq_restore(flags);
		printk(KERN_WARNING "Could not switch to high resolution "
					"mode on CPU %d\n", cpu);
		return 0;
	}

tick_init_highres函数把tick_device切换到CLOCK_EVT_FEAT_ONESHOT模式,同时把clock_event_device的回调handler设置为hrtimer_interrupt,这样设置以后,tick_device的中断回调将由hrtimer_interrupt接管,hrtimer_interrupt在上面已经讨论过,它将完成高精度定时器的调度和到期处理。

接着,设置hres_active标志,以表明高精度模式已经切换,然后把3个时间基准系统的resolution字段设为KTIME_HIGH_RES:

1
2
3
	base->hres_active = 1;
	for (i = 0; i < HRTIMER_MAX_CLOCK_BASES; i++)
		base->clock_base[i].resolution = KTIME_HIGH_RES;

最后,因为tick_device被高精度定时器接管,它将不会再提供原有的tick事件机制,所以需要由高精度定时器系统模拟一个tick事件设备,继续为系统提供tick事件能力,这个工作由tick_setup_sched_timer函数完成。因为刚刚完成切换,tick_device的到期时间并没有被正确地设置为下一个到期定时器的时间,这里使用retrigger_next_event函数,传入参数NULL,使得tick_device立刻产生到期中断,hrtimer_interrupt被调用一次,然后下一个到期的定时器的时间会编程到tick_device中,从而完成了到高精度模式的切换:

1
2
3
4
5
	tick_setup_sched_timer();
	/* "Retrigger" the interrupt to get things going */
	retrigger_next_event(NULL);
	local_irq_restore(flags);
	return 1;

整个切换过程可以用下图表示:

图3.1 低精度模式切换至高精度模式

4. 模拟tick事件

根据上一节的讨论,当系统切换到高精度模式后,tick_device被高精度定时器系统接管,不再定期地产生tick事件,我们知道,到目前的版本为止(V3.4),内核还没有彻底废除jiffies机制,系统还是依赖定期到来的tick事件,供进程调度系统和时间更新等操作,大量存在的低精度定时器也仍然依赖于jiffies的计数,所以,尽管tick_device被接管,高精度定时器系统还是要想办法继续提供定期的tick事件。为了达到这一目的,内核使用了一个取巧的办法:既然高精度模式已经启用,可以定义一个hrtimer,把它的到期时间设定为一个jiffy的时间,当这个hrtimer到期时,在这个hrtimer的到期回调函数中,进行和原来的tick_device同样的操作,然后把该hrtimer的到期时间顺延一个jiffy周期,如此反复循环,完美地模拟了原有tick_device的功能。下面我们看看具体点代码是如何实现的。

在kernel/time/tick-sched.c中,内核定义了一个per_cpu全局变量:tick_cpu_sched,从而为每个cpu提供了一个tick_sched结构, 该结构主要用于管理NO_HZ配置下的tickless处理,因为模拟tick事件与tickless有很强的相关性,所以高精度定时器系统也利用了该结构的以下字段,用于完成模拟tick事件的操作:

1
2
3
4
5
6
struct tick_sched {
	struct hrtimer          sched_timer;
	unsigned long           check_clocks;
	enum tick_nohz_mode     nohz_mode;
		......
};

sched_timer就是要用于模拟tick事件的hrtimer,check_clock上面几节已经讨论过,用于notify系统通知hrtimer系统需要检查是否切换到高精度模式,nohz_mode则用于表示当前的工作模式。

上一节提到,用于切换至高精度模式的函数是hrtimer_switch_to_hres,在它的最后,调用了函数tick_setup_sched_timer,该函数的作用就是设置一个用于模拟tick事件的hrtimer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void tick_setup_sched_timer(void)
{
	struct tick_sched *ts = &__get_cpu_var(tick_cpu_sched);
	ktime_t now = ktime_get();

	/* 
	 * Emulate tick processing via per-CPU hrtimers: 
	 */
	hrtimer_init(&ts->sched_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
	ts->sched_timer.function = tick_sched_timer;

	/* Get the next period (per cpu) */
	hrtimer_set_expires(&ts->sched_timer, tick_init_jiffy_update());

	for (;;) {
		hrtimer_forward(&ts->sched_timer, now, tick_period);
		hrtimer_start_expires(&ts->sched_timer,
					  HRTIMER_MODE_ABS_PINNED);
		/* Check, if the timer was already in the past */
		if (hrtimer_active(&ts->sched_timer))
			break;
		now = ktime_get();
	}

#ifdef CONFIG_NO_HZ
	if (tick_nohz_enabled)
		ts->nohz_mode = NOHZ_MODE_HIGHRES;
#endif
}

该函数首先初始化该cpu所属的tick_sched结构中sched_timer字段,把该hrtimer的回调函数设置为tick_sched_timer,然后把它的到期时间设定为下一个jiffy时刻,返回前把工作模式设置为NOHZ_MODE_HIGHRES,表明是利用高精度模式实现NO_HZ。

接着我们关注一下hrtimer的回调函数tick_sched_timer,我们知道,系统中的jiffies计数,时间更新等是全局操作,在smp系统中,只有一个cpu负责该工作,所以在tick_sched_timer的一开始,先判断当前cpu是否负责更新jiffies和时间,如果是,则执行更新操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static enum hrtimer_restart tick_sched_timer(struct hrtimer *timer)
{
		......

#ifdef CONFIG_NO_HZ
	if (unlikely(tick_do_timer_cpu == TICK_DO_TIMER_NONE))
		tick_do_timer_cpu = cpu;
#endif

	/* Check, if the jiffies need an update */
	if (tick_do_timer_cpu == cpu)
		tick_do_update_jiffies64(now);

然后,利用regs指针确保当前是在中断上下文中,然后调用update_process_timer:

	if (regs) {
				   ......
		update_process_times(user_mode(regs));
		......
	}

最后,把hrtimer的到期时间推进一个tick周期,返回HRTIMER_RESTART表明该hrtimer需要再次启动,以便产生下一个tick事件。

	hrtimer_forward(timer, now, tick_period);

	return HRTIMER_RESTART;
}

关于update_process_times,如果你你感兴趣,回看一下本系列关于clock_event_device的那一章:Linux时间子系统之四:定时器的引擎:clock_event_device中的第5小节,对比一下模拟tick事件的hrtimer的回调函数tick_sched_timer和切换前tick_device的回调函数tick_handle_periodic,它们是如此地相像,实际上,它们几乎完成了一样的工作。

Linux时间子系统之五:低分辨率定时器的原理和实现

http://blog.csdn.net/DroidPhone/article/details/8051405

利用定时器,我们可以设定在未来的某一时刻,触发一个特定的事件。所谓低分辨率定时器,是指这种定时器的计时单位基于jiffies值的计数,也就是说,它的精度只有1/HZ,假如你的内核配置的HZ是1000,那意味着系统中的低分辨率定时器的精度就是1ms。早期的内核版本中,内核并不支持高精度定时器,理所当然只能使用这种低分辨率定时器,我们有时候把这种基于HZ的定时器机制成为时间轮:time wheel。虽然后来出现了高分辨率定时器,但它只是内核的一个可选配置项,所以直到目前最新的内核版本,这种低分辨率定时器依然被大量地使用着。

1. 定时器的使用方法

在讨论定时器的实现原理之前,我们先看看如何使用定时器。要在内核编程中使用定时器,首先我们要定义一个time_list结构,该结构在include/Linux/timer.h中定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
struct timer_list {
	/* 
	 * All fields that change during normal runtime grouped to the 
	 * same cacheline 
	 */
	struct list_head entry;
	unsigned long expires;
	struct tvec_base *base;

	void (*function)(unsigned long);
	unsigned long data;

	int slack;
		......
};

entry  字段用于把一组定时器组成一个链表,至于内核如何对定时器进行分组,我们会在后面进行解释。

expires  字段指出了该定时器的到期时刻,也就是期望定时器到期时刻的jiffies计数值。

base  每个cpu拥有一个自己的用于管理定时器的tvec_base结构,该字段指向该定时器所属的cpu所对应tvec_base结构。

function  字段是一个函数指针,定时器到期时,系统将会调用该回调函数,用于响应该定时器的到期事件。

data  该字段用于上述回调函数的参数。

slack  对有些对到期时间精度不太敏感的定时器,到期时刻允许适当地延迟一小段时间,该字段用于计算每次延迟的HZ数。

要定义一个timer_list,我们可以使用静态和动态两种办法,静态方法使用DEFINE_TIMER宏:

1
#define DEFINE_TIMER(_name, _function, _expires, _data)

该宏将得到一个名字为name,并分别用function,expires,data参数填充timer_list的相关字段。

如果要使用动态的方法,则可以自己声明一个timer_list结构,然后手动初始化它的各个字段:

1
2
3
4
5
6
struct timer_list timer;
......
init_timer(&timer);
timer.function = _function;
timer.expires = _expires;
timer.data = _data;

要激活一个定时器,我们只要调用add_timer即可:

1
add_timer(&timer);

要修改定时器的到期时间,我们只要调用mod_timer即可:

1
mod_timer(&timer, jiffies+50);

要移除一个定时器,我们只要调用del_timer即可:

1
del_timer(&timer);

定时器系统还提供了以下这些API供我们使用:

1
2
3
4
5
void add_timer_on(struct timer_list *timer, int cpu);  // 在指定的cpu上添加定时器
int mod_timer_pending(struct timer_list *timer, unsigned long expires);  //  只有当timer已经处在激活状态时,才修改timer的到期时刻
int mod_timer_pinned(struct timer_list *timer, unsigned long expires);  //  当
void set_timer_slack(struct timer_list *time, int slack_hz);  //  设定timer允许的到期时刻的最大延迟,用于对精度不敏感的定时器
int del_timer_sync(struct timer_list *timer);  //  如果该timer正在被处理中,则等待timer处理完成才移除该timer

2. 定时器的软件架构

低分辨率定时器是基于HZ来实现的,也就是说,每个tick周期,都有可能有定时器到期,关于tick如何产生,请参考:Linux时间子系统之四:定时器的引擎:clock_event_device。系统中有可能有成百上千个定时器,难道在每个tick中断中遍历一下所有的定时器,检查它们是否到期?内核当然不会使用这么笨的办法,它使用了一个更聪明的办法:按定时器的到期时间对定时器进行分组。因为目前的多核处理器使用越来越广泛,连智能手机的处理器动不动就是4核心,内核对多核处理器有较好的支持,低分辨率定时器在实现时也充分地考虑了多核处理器的支持和优化。为了较好地利用cache line,也为了避免cpu之间的互锁,内核为多核处理器中的每个cpu单独分配了管理定时器的相关数据结构和资源,每个cpu独立地管理属于自己的定时器。

2.1 定时器的分组

首先,内核为每个cpu定义了一个tvec_base结构指针:

1
static DEFINE_PER_CPU(struct tvec_base *, tvec_bases) = &boot_tvec_bases;

tvec_base结构的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct tvec_base {
	spinlock_t lock;
	struct timer_list *running_timer;
	unsigned long timer_jiffies;
	unsigned long next_timer;
	struct tvec_root tv1;
	struct tvec tv2;
	struct tvec tv3;
	struct tvec tv4;
	struct tvec tv5;
} ____cacheline_aligned;

running_timer  该字段指向当前cpu正在处理的定时器所对应的timer_list结构。

timer_jiffies  该字段表示当前cpu定时器所经历过的jiffies数,大多数情况下,该值和jiffies计数值相等,当cpu的idle状态连续持续了多个jiffies时间时,当退出idle状态时,jiffies计数值就会大于该字段,在接下来的tick中断后,定时器系统会让该字段的值追赶上jiffies值。

next_timer  该字段指向该cpu下一个即将到期的定时器。

tv1--tv5  这5个字段用于对定时器进行分组,实际上,tv1--tv5都是一个链表数组,其中tv1的数组大小为TVR_SIZE, tv2 tv3 tv4 tv5的数组大小为TVN_SIZE,根据CONFIG_BASE_SMALL配置项的不同,它们有不同的大小:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define TVN_BITS (CONFIG_BASE_SMALL ? 4 : 6)
#define TVR_BITS (CONFIG_BASE_SMALL ? 6 : 8)
#define TVN_SIZE (1 << TVN_BITS)
#define TVR_SIZE (1 << TVR_BITS)
#define TVN_MASK (TVN_SIZE - 1)
#define TVR_MASK (TVR_SIZE - 1)

struct tvec {
	struct list_head vec[TVN_SIZE];
};

struct tvec_root {
	struct list_head vec[TVR_SIZE];
};

默认情况下,没有使能CONFIG_BASE_SMALL,TVR_SIZE的大小是256,TVN_SIZE的大小则是64,当需要节省内存空间时,也可以使能CONFIG_BASE_SMALL,这时TVR_SIZE的大小是64,TVN_SIZE的大小则是16,以下的讨论我都是基于没有使能CONFIG_BASE_SMALL的情况。当有一个新的定时器要加入时,系统根据定时器到期的jiffies值和timer_jiffies字段的差值来决定该定时器被放入tv1至tv5中的哪一个数组中,最终,系统中所有的定时器的组织结构如下图所示:

图 2.1.1 定时器在系统中的组织结构

2.2 定时器的添加

要加入一个新的定时器,我们可以通过api函数add_timer或mod_timer来完成,最终的工作会交由internal_add_timer函数来处理。该函数按以下步骤进行处理:

计算定时器到期时间和所属cpu的tvec_base结构中的timer_jiffies字段的差值,记为idx;

根据idx的值,选择该定时器应该被放到tv1–tv5中的哪一个链表数组中,可以认为tv1-tv5分别占据一个32位数的不同比特位,tv1占据最低的8位,tv2占据紧接着的6位,然后tv3再占位,以此类推,最高的6位分配给tv5。最终的选择规则如下表所示:

1
2
3
4
5
6
链表数组     idx范围
tv1   0-255(2^8)
tv2   256--16383(2^14)
tv3   16384--1048575(2^20)
tv4   1048576--67108863(2^26)
tv5   67108864--4294967295(2^32)

确定链表数组后,接着要确定把该定时器放入数组中的哪一个链表中,如果时间差idx小于256,按规则要放入tv1中,因为tv1包含了256个链表,所以可以简单地使用timer_list.expires的低8位作为数组的索引下标,把定时器链接到tv1中相应的链表中即可。如果时间差idx的值在256–18383之间,则需要把定时器放入tv2中,同样的,使用timer_list.expires的8–14位作为数组的索引下标,把定时器链接到tv2中相应的链表中,。定时器要加入tv3 tv4 tv5使用同样的原理。经过这样分组后的定时器,在后续的tick事件中,系统可以很方便地定位并取出相应的到期定时器进行处理。以上的讨论都体现在internal_add_timer的代码中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static void internal_add_timer(struct tvec_base *base, struct timer_list *timer)
{
	unsigned long expires = timer->expires;
	unsigned long idx = expires - base->timer_jiffies;
	struct list_head *vec;

	if (idx < TVR_SIZE) {
		int i = expires & TVR_MASK;
		vec = base->tv1.vec + i;
	} else if (idx < 1 << (TVR_BITS + TVN_BITS)) {
		int i = (expires >> TVR_BITS) & TVN_MASK;
		vec = base->tv2.vec + i;
	} else if (idx < 1 << (TVR_BITS + 2 * TVN_BITS)) {
		int i = (expires >> (TVR_BITS + TVN_BITS)) & TVN_MASK;
		vec = base->tv3.vec + i;
	} else if (idx < 1 << (TVR_BITS + 3 * TVN_BITS)) {
		int i = (expires >> (TVR_BITS + 2 * TVN_BITS)) & TVN_MASK;
		vec = base->tv4.vec + i;
	} else if ((signed long) idx < 0) {
				......
	} else {
				......
		i = (expires >> (TVR_BITS + 3 * TVN_BITS)) & TVN_MASK;
		vec = base->tv5.vec + i;
	}
	list_add_tail(&timer->entry, vec);
}
2.2 定时器的到期处理

经过2.1节的处理后,系统中的定时器按到期时间有规律地放置在tv1–tv5各个链表数组中,其中tv1中放置着在接下来的256个jiffies即将到期的定时器列表,需要注意的是,并不是tv1.vec[0]中放置着马上到期的定时器列表,tv1.vec[1]中放置着将在jiffies+1到期的定时器列表。因为base.timer_jiffies的值一直在随着系统的运行而动态地增加,原则上是每个tick事件会加1,base.timer_jiffies代表者该cpu定时器系统当前时刻,定时器也是动态地加入头256个链表tv1中,按2.1节的讨论,定时器加入tv1中使用的下标索引是定时器到期时间expires的低8位,所以假设当前的base.timer_jiffies值是0x34567826,则马上到期的定时器是在tv1.vec[0x26]中,如果这时候系统加入一个在jiffies值0x34567828到期的定时器,他将会加入到tv1.vec[0x28]中,运行两个tick后,base.timer_jiffies的值会变为0x34567828,很显然,在每次tick事件中,定时器系统只要以base.timer_jiffies的低8位作为索引,取出tv1中相应的链表,里面正好包含了所有在该jiffies值到期的定时器列表。

那什么时候处理tv2–tv5中的定时器?每当base.timer_jiffies的低8位为0值时,这表明base.timer_jiffies的第8-13位有进位发生,这6位正好代表着tv2,这时只要按base.timer_jiffies的第8-13位的值作为下标,移出tv2中对应的定时器链表,然后用internal_add_timer把它们从新加入到定时器系统中来,因为这些定时器一定会在接下来的256个tick期间到期,所以它们肯定会被加入到tv1数组中,这样就完成了tv2往tv1迁移的过程。同样地,当base.timer_jiffies的第8-13位为0时,这表明base.timer_jiffies的第14-19位有进位发生,这6位正好代表着tv3,按base.timer_jiffies的第14-19位的值作为下标,移出tv3中对应的定时器链表,然后用internal_add_timer把它们从新加入到定时器系统中来,显然它们会被加入到tv2中,从而完成tv3到tv2的迁移,tv4,tv5的处理可以以此作类推。具体迁移的代码如下,参数index为事先计算好的高一级tv的需要迁移的数组索引:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int cascade(struct tvec_base *base, struct tvec *tv, int index)
{
	/* cascade all the timers from tv up one level */
	struct timer_list *timer, *tmp;
	struct list_head tv_list;

	list_replace_init(tv->vec + index, &tv_list);  //  移除需要迁移的链表

	/* 
	 * We are removing _all_ timers from the list, so we 
	 * don't have to detach them individually. 
	 */
	list_for_each_entry_safe(timer, tmp, &tv_list, entry) {
		BUG_ON(tbase_get_base(timer->base) != base);
				//  重新加入到定时器系统中,实际上将会迁移到下一级的tv数组中
		internal_add_timer(base, timer);  
	}

	return index;
}

每个tick事件到来时,内核会在tick定时中断处理期间激活定时器软中断:TIMER_SOFTIRQ,关于软件中断,请参考另一篇博文:Linux中断(interrupt)子系统之五:软件中断(softIRQ。TIMER_SOFTIRQ的执行函数是__run_timers,它实现了本节讨论的逻辑,取出tv1中到期的定时器,执行定时器的回调函数,由此可见,低分辨率定时器的回调函数是执行在软件中断上下文中的,这点在写定时器的回调函数时需要注意。__run_timers的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static inline void __run_timers(struct tvec_base *base)
{
	struct timer_list *timer;

	spin_lock_irq(&base->lock);
		/* 同步jiffies,在NO_HZ情况下,base->timer_jiffies可能落后不止一个tick  */
	while (time_after_eq(jiffies, base->timer_jiffies)) {  
		struct list_head work_list;
		struct list_head *head = &work_list;
				/*  计算到期定时器链表在tv1中的索引  */
		int index = base->timer_jiffies & TVR_MASK;  

		/* 
		 * /*  tv2--tv5定时器列表迁移处理  */
		 */
		if (!index &&
			(!cascade(base, &base->tv2, INDEX(0))) &&              
				(!cascade(base, &base->tv3, INDEX(1))) &&      
					!cascade(base, &base->tv4, INDEX(2)))  
			cascade(base, &base->tv5, INDEX(3));  
				/*  该cpu定时器系统运行时间递增一个tick  */                 
		++base->timer_jiffies;  
				/*  取出到期的定时器链表  */                                       
		list_replace_init(base->tv1.vec + index, &work_list);
				/*  遍历所有的到期定时器  */          
		while (!list_empty(head)) {                                    
			void (*fn)(unsigned long);
			unsigned long data;

			timer = list_first_entry(head, struct timer_list,entry);
			fn = timer->function;
			data = timer->data;

			timer_stats_account_timer(timer);

			base->running_timer = timer;    /*  标记正在处理的定时器  */
			detach_timer(timer, 1);

			spin_unlock_irq(&base->lock);
			call_timer_fn(timer, fn, data);  /*  调用定时器的回调函数  */
			spin_lock_irq(&base->lock);
		}
	}
	base->running_timer = NULL;
	spin_unlock_irq(&base->lock);
}

通过上面的讨论,我们可以发现,内核的低分辨率定时器的实现非常精妙,既实现了大量定时器的管理,又实现了快速的O(1)查找到期定时器的能力,利用巧妙的数组结构,使得只需在间隔256个tick时间才处理一次迁移操作,5个数组就好比是5个齿轮,它们随着base->timer_jifffies的增长而不停地转动,每次只需处理第一个齿轮的某一个齿节,低一级的齿轮转动一圈,高一级的齿轮转动一个齿,同时自动把即将到期的定时器迁移到上一个齿轮中,所以低分辨率定时器通常又被叫做时间轮:time wheel。事实上,它的实现是一个很好的空间换时间软件算法。

3. 定时器软件中断

系统初始化时,start_kernel会调用定时器系统的初始化函数init_timers:

1
2
3
4
5
6
7
8
9
10
11
void __init init_timers(void)
{      
	int err = timer_cpu_notify(&timers_nb, (unsigned long)CPU_UP_PREPARE, 
				(void *)(long)smp_processor_id());

	init_timer_stats();

	BUG_ON(err != NOTIFY_OK);
	register_cpu_notifier(&timers_nb);  /* 注册cpu notify,以便在hotplug时在cpu之间进行定时器的迁移 */
	open_softirq(TIMER_SOFTIRQ, run_timer_softirq);
}

可见,open_softirq把run_timer_softirq注册为TIMER_SOFTIRQ的处理函数,另外,当cpu的每个tick事件到来时,在事件处理中断中,update_process_times会被调用,该函数会进一步调用run_local_timers,run_local_timers会触发TIMER_SOFTIRQ软中断:

1
2
3
4
5
void run_local_timers(void)
{
	hrtimer_run_queues();
	raise_softirq(TIMER_SOFTIRQ);
}

TIMER_SOFTIRQ的处理函数是run_timer_softirq:

1
2
3
4
5
6
7
8
9
static void run_timer_softirq(struct softirq_action *h)
{
	struct tvec_base *base = __this_cpu_read(tvec_bases);

	hrtimer_run_pending();

	if (time_after_eq(jiffies, base->timer_jiffies))
		__run_timers(base);
}

好啦,终于看到__run_timers函数了,2.2节已经介绍过,正是这个函数完成了对到期定时器的处理工作,也完成了时间轮的不停转动。

Linux时间子系统之四:定时器的引擎:clock_event_device

http://blog.csdn.net/DroidPhone/article/details/8017604

早期的内核版本中,进程的调度基于一个称之为tick的时钟滴答,通常使用时钟中断来定时地产生tick信号,每次tick定时中断都会进行进程的统计和调度,并对tick进行计数,记录在一个jiffies变量中,定时器的设计也是基于jiffies。这时候的内核代码中,几乎所有关于时钟的操作都是在machine级的代码中实现,很多公共的代码要在每个平台上重复实现。随后,随着通用时钟框架的引入,内核需要支持高精度的定时器,为此,通用时间框架为定时器硬件定义了一个标准的接口:clock_event_device,machine级的代码只要按这个标准接口实现相应的硬件控制功能,剩下的与平台无关的特性则统一由通用时间框架层来实现。

1. 时钟事件软件架构

本系列文章的第一节中,我们曾经讨论了时钟源设备:clocksource,现在又来一个时钟事件设备:clock_event_device,它们有何区别?看名字,好像都是给系统提供时钟的设备,实际上,clocksource不能被编程,没有产生事件的能力,它主要被用于timekeeper来实现对真实时间进行精确的统计,而clock_event_device则是可编程的,它可以工作在周期触发或单次触发模式,系统可以对它进行编程,以确定下一次事件触发的时间,clock_event_device主要用于实现普通定时器和高精度定时器,同时也用于产生tick事件,供给进程调度子系统使用。时钟事件设备与通用时间框架中的其他模块的关系如下图所示:

与clocksource一样,系统中可以存在多个clock_event_device,系统会根据它们的精度和能力,选择合适的clock_event_device对系统提供时钟事件服务。在smp系统中,为了减少处理器间的通信开销,基本上每个cpu都会具备一个属于自己的本地clock_event_device,独立地为该cpu提供时钟事件服务,smp中的每个cpu基于本地的clock_event_device,建立自己的tick_device,普通定时器和高精度定时器。

在软件架构上看,clock_event_device被分为了两层,与硬件相关的被放在了machine层,而与硬件无关的通用代码则被集中到了通用时间框架层,这符合内核对软件的设计需求,平台的开发者只需实现平台相关的接口即可,无需关注复杂的上层时间框架。

tick_device是基于clock_event_device的进一步封装,用于代替原有的时钟滴答中断,给内核提供tick事件,以完成进程的调度和进程信息统计,负载平衡和时间更新等操作。

2. 时钟事件设备相关数据结构

2.1 struct clock_event_device

时钟事件设备的核心数据结构是clock_event_device结构,它代表着一个时钟硬件设备,该设备就好像是一个具有事件触发能力(通常就是指中断)的clocksource,它不停地计数,当计数值达到预先编程设定的数值那一刻,会引发一个时钟事件中断,继而触发该设备的事件处理回调函数,以完成对时钟事件的处理。clock_event_device结构的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
struct clock_event_device {
	void            (*event_handler)(struct clock_event_device *);
	int         (*set_next_event)(unsigned long evt,
						  struct clock_event_device *);
	int         (*set_next_ktime)(ktime_t expires,
						  struct clock_event_device *);
	ktime_t         next_event;
	u64         max_delta_ns;
	u64         min_delta_ns;
	u32         mult;
	u32         shift;
	enum clock_event_mode   mode;
	unsigned int        features;
	unsigned long       retries;

	void            (*broadcast)(const struct cpumask *mask);
	void            (*set_mode)(enum clock_event_mode mode,
						struct clock_event_device *);
	unsigned long       min_delta_ticks;
	unsigned long       max_delta_ticks;

	const char      *name;
	int         rating;
	int         irq;
	const struct cpumask    *cpumask;
	struct list_head    list;
} ____cacheline_aligned;

event_handler  该字段是一个回调函数指针,通常由通用框架层设置,在时间中断到来时,machine底层的的中断服务程序会调用该回调,框架层利用该回调实现对时钟事件的处理。

set_next_event  设置下一次时间触发的时间,使用类似于clocksource的cycle计数值(离现在的cycle差值)作为参数。

set_next_ktime  设置下一次时间触发的时间,直接使用ktime时间作为参数。

max_delta_ns  可设置的最大时间差,单位是纳秒。

min_delta_ns  可设置的最小时间差,单位是纳秒。

mult shift  与clocksource中的类似,只不过是用于把纳秒转换为cycle。

mode  该时钟事件设备的工作模式,两种主要的工作模式分别是:
	CLOCK_EVT_MODE_PERIODIC  周期触发模式,设置后按给定的周期不停地触发事件;
	CLOCK_EVT_MODE_ONESHOT  单次触发模式,只在设置好的触发时刻触发一次;

set_mode  函数指针,用于设置时钟事件设备的工作模式。

rating  表示该设备的精度等级。

list  系统中注册的时钟事件设备用该字段挂在全局链表变量clockevent_devices上。
2.2 全局变量clockevent_devices

系统中所有注册的clock_event_device都会挂在该链表下面,它在kernel/time/clockevents.c中定义:

1
static LIST_HEAD(clockevent_devices);
2.3 全局变量clockevents_chain

通用时间框架初始化时会注册一个通知链(NOTIFIER),当系统中的时钟时间设备的状态发生变化时,利用该通知链通知系统的其它模块。

1
2
/* Notification for clock events */
static RAW_NOTIFIER_HEAD(clockevents_chain);

3. clock_event_device的初始化和注册

每一个machine,都要定义一个自己的machine_desc结构,该结构定义了该machine的一些最基本的特性,其中需要设定一个sys_timer结构指针,machine级的代码负责定义sys_timer结构,sys_timer的声明很简单:

1
2
3
4
5
6
7
8
struct sys_timer {
	void            (*init)(void);
	void            (*suspend)(void);
	void            (*resume)(void);
#ifdef CONFIG_ARCH_USES_GETTIMEOFFSET
	unsigned long       (*offset)(void);
#endif
};

通常,我们至少要定义它的init字段,系统初始化阶段,该init回调会被调用,该init回调函数的主要作用就是完成系统中的clocksource和clock_event_device的硬件初始化工作,以samsung的exynos4为例,在V3.4内核的代码树中,machine_desc的定义如下:

1
2
3
4
5
6
7
8
9
10
11
MACHINE_START(SMDK4412, "SMDK4412")
	/* Maintainer: Kukjin Kim <kgene.kim@samsung.com> */
	/* Maintainer: Changhwan Youn <chaos.youn@samsung.com> */
	.atag_offset    = 0x100,
	.init_irq   = exynos4_init_irq,
	.map_io     = smdk4x12_map_io,
	.handle_irq = gic_handle_irq,
	.init_machine   = smdk4x12_machine_init,
	.timer      = &exynos4_timer,
	.restart    = exynos4_restart,
MACHINE_END

定义的sys_timer是exynos4_timer,它的定义和init回调定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void __init exynos4_timer_init(void)
{
	if (soc_is_exynos4210())
		mct_int_type = MCT_INT_SPI;
	else
		mct_int_type = MCT_INT_PPI;

	exynos4_timer_resources();
	exynos4_clocksource_init();
	exynos4_clockevent_init();
}

struct sys_timer exynos4_timer = {
	.init       = exynos4_timer_init,
};

exynos4_clockevent_init函数显然是初始化和注册clock_event_device的合适时机,在这里,它注册了一个rating为250的clock_event_device,并把它指定给cpu0:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static struct clock_event_device mct_comp_device = {
	.name       = "mct-comp",
	.features       = CLOCK_EVT_FEAT_PERIODIC | CLOCK_EVT_FEAT_ONESHOT,
	.rating     = 250,
	.set_next_event = exynos4_comp_set_next_event,
	.set_mode   = exynos4_comp_set_mode,
};
......
static void exynos4_clockevent_init(void)
{
	clockevents_calc_mult_shift(&mct_comp_device, clk_rate, 5);
		......
	mct_comp_device.cpumask = cpumask_of(0);
	clockevents_register_device(&mct_comp_device);

	setup_irq(EXYNOS4_IRQ_MCT_G0, &mct_comp_event_irq);
}

因为这个阶段其它cpu核尚未开始工作,所以该clock_event_device也只是在启动阶段给系统提供服务,实际上,因为exynos4是一个smp系统,具备2-4个cpu核心,前面说过,smp系统中,通常会使用各个cpu的本地定时器来为每个cpu单独提供时钟事件服务,继续翻阅代码,在系统初始化的后段,kernel_init会被调用,它会调用smp_prepare_cpus,其中会调用percpu_timer_setup函数,在arch/arm/kernel/smp.c中,为每个cpu定义了一个clock_event_device:

1
2
3
4
/* 
 * Timer (local or broadcast) support 
 */
static DEFINE_PER_CPU(struct clock_event_device, percpu_clockevent);

percpu_timer_setup最终会调用exynos4_local_timer_setup函数完成对本地clock_event_device的初始化工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int __cpuinit exynos4_local_timer_setup(struct clock_event_device *evt)
{
	......
	evt->name = mevt->name;
	evt->cpumask = cpumask_of(cpu);
	evt->set_next_event = exynos4_tick_set_next_event;
	evt->set_mode = exynos4_tick_set_mode;
	evt->features = CLOCK_EVT_FEAT_PERIODIC | CLOCK_EVT_FEAT_ONESHOT;
	evt->rating = 450;

	clockevents_calc_mult_shift(evt, clk_rate / (TICK_BASE_CNT + 1), 5);
	......
	clockevents_register_device(evt);
	......
	enable_percpu_irq(EXYNOS_IRQ_MCT_LOCALTIMER, 0);
	......
	return 0;
}

由此可见,每个cpu的本地clock_event_device的rating是450,比启动阶段的250要高,显然,之前注册给cpu0的精度要高,系统会用本地clock_event_device替换掉原来分配给cpu0的clock_event_device,至于怎么替换?我们先停一停,到这里我们一直在讨论machine级别的初始化和注册,让我们回过头来,看看框架层的初始化。在继续之前,让我们看看整个clock_event_device的初始化的调用序列图:

图3.1 clock_event_device的系统初始化

由上面的图示可以看出,框架层的初始化步骤很简单,又start_kernel开始,调用tick_init,它位于kernel/time/tick-common.c中,也只是简单地调用clockevents_register_notifier,同时把类型为notifier_block的tick_notifier作为参数传入,回看2.3节,clockevents_register_notifier注册了一个通知链,这样,当系统中的clock_event_device状态发生变化时(新增,删除,挂起,唤醒等等),tick_notifier中的notifier_call字段中设定的回调函数tick_notify就会被调用。接下来start_kernel调用了time_init函数,该函数通常定义在体系相关的代码中,正如前面所讨论的一样,它主要完成machine级别对时钟系统的初始化工作,最终通过clockevents_register_device注册系统中的时钟事件设备,把每个时钟时间设备挂在clockevent_device全局链表上,最后通过clockevent_do_notify触发框架层事先注册好的通知链,其实就是调用了tick_notify函数,我们主要关注CLOCK_EVT_NOTIFY_ADD通知,其它通知请自行参考代码,下面是tick_notify的简化版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int tick_notify(struct notifier_block *nb, unsigned long reason,
				   void *dev)
{
	switch (reason) {

	case CLOCK_EVT_NOTIFY_ADD:
		return tick_check_new_device(dev);

	case CLOCK_EVT_NOTIFY_BROADCAST_ON:
	case CLOCK_EVT_NOTIFY_BROADCAST_OFF:
	case CLOCK_EVT_NOTIFY_BROADCAST_FORCE:
			......
	case CLOCK_EVT_NOTIFY_BROADCAST_ENTER:
	case CLOCK_EVT_NOTIFY_BROADCAST_EXIT:
			......
	case CLOCK_EVT_NOTIFY_CPU_DYING:
			......
	case CLOCK_EVT_NOTIFY_CPU_DEAD:
			......
	case CLOCK_EVT_NOTIFY_SUSPEND:
			......
	case CLOCK_EVT_NOTIFY_RESUME:
			......
	}

	return NOTIFY_OK;
}

可见,对于新注册的clock_event_device,会发出CLOCK_EVT_NOTIFY_ADD通知,最终会进入函数:tick_check_new_device,这个函数比对当前cpu所使用的与新注册的clock_event_device之间的特性,如果认为新的clock_event_device更好,则会进行切换工作。下一节将会详细的讨论该函数。到这里,每个cpu已经有了自己的clock_event_device,在这以后,框架层的代码会根据内核的配置项(CONFIG_NO_HZ、CONFIG_HIGH_RES_TIMERS),对注册的clock_event_device进行不同的设置,从而为系统的tick和高精度定时器提供服务,这些内容我们留在本系列的后续文章进行讨论。

4. tick_device

当内核没有配置成支持高精度定时器时,系统的tick由tick_device产生,tick_device其实是clock_event_device的简单封装,它内嵌了一个clock_event_device指针和它的工作模式:

1
2
3
4
struct tick_device {
	struct clock_event_device *evtdev;
	enum tick_device_mode mode;
};

在kernel/time/tick-common.c中,定义了一个per-cpu的tick_device全局变量,tick_cpu_device:

1
2
3
4
/* 
 * Tick devices 
 */
DEFINE_PER_CPU(struct tick_device, tick_cpu_device);

前面曾经说过,当machine的代码为每个cpu注册clock_event_device时,通知回调函数tick_notify会被调用,进而进入tick_check_new_device函数,下面让我们看看该函数如何工作,首先,该函数先判断注册的clock_event_device是否可用于本cpu,然后从per-cpu变量中取出本cpu的tick_device:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static int tick_check_new_device(struct clock_event_device *newdev)
{
		......
	cpu = smp_processor_id();
	if (!cpumask_test_cpu(cpu, newdev->cpumask))
		goto out_bc;

	td = &per_cpu(tick_cpu_device, cpu);
	curdev = td->evtdev;

如果不是本地clock_event_device,会做进一步的判断:如果不能把irq绑定到本cpu,则放弃处理,如果本cpu已经有了一个本地clock_event_device,也放弃处理:

	if (!cpumask_equal(newdev->cpumask, cpumask_of(cpu))) {
			   ......
		if (!irq_can_set_affinity(newdev->irq))
			goto out_bc;
			   ......
		if (curdev && cpumask_equal(curdev->cpumask, cpumask_of(cpu)))
			goto out_bc;
	}

反之,如果本cpu已经有了一个clock_event_device,则根据是否支持单触发模式和它的rating值,决定是否替换原来旧的clock_event_device:

	if (curdev) {
		if ((curdev->features & CLOCK_EVT_FEAT_ONESHOT) &&
			!(newdev->features & CLOCK_EVT_FEAT_ONESHOT))
			goto out_bc;  // 新的不支持单触发,但旧的支持,所以不能替换
		if (curdev->rating >= newdev->rating)
			goto out_bc;  // 旧的比新的精度高,不能替换
	}

在这些判断都通过之后,说明或者来cpu还没有绑定tick_device,或者是新的更好,需要替换:

	if (tick_is_broadcast_device(curdev)) {
		clockevents_shutdown(curdev);
		curdev = NULL;
	}
	clockevents_exchange_device(curdev, newdev);
	tick_setup_device(td, newdev, cpu, cpumask_of(cpu));

上面的tick_setup_device函数负责重新绑定当前cpu的tick_device和新注册的clock_event_device,如果发现是当前cpu第一次注册tick_device,就把它设置为TICKDEV_MODE_PERIODIC模式,如果是替换旧的tick_device,则根据新的tick_device的特性,设置为TICKDEV_MODE_PERIODIC或TICKDEV_MODE_ONESHOT模式。可见,在系统的启动阶段,tick_device是工作在周期触发模式的,直到框架层在合适的时机,才会开启单触发模式,以便支持NO_HZ和HRTIMER。

5. tick事件的处理–最简单的情况

clock_event_device最基本的应用就是实现tick_device,然后给系统定期地产生tick事件,通用时间框架对clock_event_device和tick_device的处理相当复杂,因为涉及配置项:CONFIG_NO_HZ和CONFIG_HIGH_RES_TIMERS的组合,两个配置项就有4种组合,这四种组合的处理都有所不同,所以这里我先只讨论最简单的情况:

1
2
CONFIG_NO_HZ == 0;
CONFIG_HIGH_RES_TIMERS == 0;

在这种配置模式下,我们回到上一节的tick_setup_device函数的最后:

1
2
3
4
if (td->mode == TICKDEV_MODE_PERIODIC)
	tick_setup_periodic(newdev, 0);
else
	tick_setup_oneshot(newdev, handler, next_event);

因为启动期间,第一个注册的tick_device必然工作在TICKDEV_MODE_PERIODIC模式,所以tick_setup_periodic会设置clock_event_device的事件回调字段event_handler为tick_handle_periodic,工作一段时间后,就算有新的支持TICKDEV_MODE_ONESHOT模式的clock_event_device需要替换,再次进入tick_setup_device函数,tick_setup_oneshot的handler参数也是之前设置的tick_handle_periodic函数,所以我们考察tick_handle_periodic即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void tick_handle_periodic(struct clock_event_device *dev)
{
	int cpu = smp_processor_id();
	ktime_t next;

	tick_periodic(cpu);

	if (dev->mode != CLOCK_EVT_MODE_ONESHOT)
		return;

	next = ktime_add(dev->next_event, tick_period);
	for (;;) {
		if (!clockevents_program_event(dev, next, false))
			return;
		if (timekeeping_valid_for_hres())
			tick_periodic(cpu);
		next = ktime_add(next, tick_period);
	}
}

该函数首先调用tick_periodic函数,完成tick事件的所有处理,如果是周期触发模式,处理结束,如果工作在单触发模式,则计算并设置下一次的触发时刻,这里用了一个循环,是为了防止当该函数被调用时,clock_event_device中的计时实际上已经经过了不止一个tick周期,这时候,tick_periodic可能被多次调用,使得jiffies和时间可以被正确地更新。tick_periodic的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static void tick_periodic(int cpu)
{
	if (tick_do_timer_cpu == cpu) {
		write_seqlock(&xtime_lock);

		/* Keep track of the next tick event */
		tick_next_period = ktime_add(tick_next_period, tick_period);

		do_timer(1);
		write_sequnlock(&xtime_lock);
	}

	update_process_times(user_mode(get_irq_regs()));
	profile_tick(CPU_PROFILING);
}

如果当前cpu负责更新时间,则通过do_timer进行以下操作:

更新jiffies_64变量;
更新墙上时钟;
每10个tick,更新一次cpu的负载信息;

调用update_peocess_times,完成以下事情:

更新进程的时间统计信息;
触发TIMER_SOFTIRQ软件中断,以便系统处理传统的低分辨率定时器;
检查rcu的callback;
通过scheduler_tick触发调度系统进行进程统计和调度工作;