前言
定时器的实现通常使用有序数据结构来实现,一般通过红黑树、跳表、最小堆、时间轮来实现。
其中又以最小堆最容易实现,红黑树最难实现。
Skynet 选择时间轮的原因估计是多线程,时间轮的插入平均复杂度比其他几个都要低,非常适用于多线程场景。
本篇就简单剖析一下 Skynet 实现的 TimingWheel。以下代码为方便阅读有删减。
时间轮
首先实现上是采用数组 + 链表的形式进行实现。
先定义了一个链表,存放了过期时间,从 *tail
可以看出,此结构为尾插法,毕竟后插入的定时器后执行,很合理。
1 2 3 4 5 6 7 8 9
| struct timer_node { struct timer_node *next; uint32_t expire; };
struct link_list { struct timer_node head; struct timer_node *tail; };
|
时间轮数据结构中含有一把自旋锁,时间轮在框架中会被多线程访问,又由于插入的时候冲突的粒度比较小,所以用自旋锁而不是互斥锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| #define TIME_NEAR_SHIFT 12 #define TIME_NEAR (1 << TIME_NEAR_SHIFT) #define TIME_LEVEL_SHIFT 5 #define TIME_LEVEL (1 << TIME_LEVEL_SHIFT) #define TIME_NEAR_MASK (TIME_NEAR-1) #define TIME_LEVEL_MASK (TIME_LEVEL-1)
struct timer { struct link_list near[TIME_NEAR]; struct link_list t[4][TIME_LEVEL]; struct spinlock lock; uint32_t time; uint64_t current; uint64_t current_point; };
|
从中可以看出,Skynet 的时间轮有5个层级,其中会执行的那层 为 near
数组,其他的4层均不会被执行到。
之所以要分为5层,是为了节约内存,不然你完全可以定义一个巨大的数组,每个槽位表示每秒要执行的任务。
其中第一层大小为 1 << 12
即 4096
,2-5层为 1<<5
即 32
。
可以看出定时器最大值为 12 + (4 * 5) = 32 位。
每当遍历完整个 near
数组后,则从下面几层中取出一个槽位,将其填充到 near
数组继续模拟计时。
大致了解数据结构之后,再来看初始化逻辑。
定时器初始化
1 2 3 4 5 6 7
| void skynet_timer_init(void) { TI = timer_create_timer(); uint32_t current = 0; TI->current = current; TI->current_point = gettime(); }
|
均为简单的初始化链表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| static struct timer * timer_create_timer() { struct timer *r=(struct timer *)skynet_malloc(sizeof(struct timer)); memset(r,0,sizeof(*r));
int i,j;
for (i=0;i<TIME_NEAR;i++) { link_clear(&r->near[i]); }
for (i=0;i<4;i++) { for (j=0;j<TIME_LEVEL;j++) { link_clear(&r->t[i][j]); } }
SPIN_INIT(r)
r->current = 0;
return r; }
|
其中 gettime
使用了 clock_gettime
而且还是单调时间,避免系统时间被修改。 clock_gettime
的时间精度为纳秒,此函数进行换算后,最后精度为毫秒,而且还是10毫秒,此时我们可以猜测时间轮的精度为10ms。
1 2 3 4 5 6 7 8 9
| static uint64_t gettime() { uint64_t t; struct timespec ti; clock_gettime(CLOCK_MONOTONIC, &ti); t = (uint64_t)ti.tv_sec * 1000; t += ti.tv_nsec / 1000000; return t; }
|
定时器更新
定时器更新由定时器线程去执行,每隔 100 微秒(也就是 0.1毫秒) 触发一次定时器更新,之所以外面调用是 0.1 毫秒,而时间轮精度为10 毫秒,是为了留足时间给定时器回调函数执行,否则某些函数执行时间过长,可能会导致定时器越来越晚触发。
1 2 3 4 5 6 7
| static void * thread_timer(void *p) { for (;;) { skynet_updatetime(); usleep(100); } }
|
skynet_updatetime
还考虑到时间倒流的问题,虽然我认为是不会触发,因为 clock_gettime
取的是 CLOCK_MONOTONIC
的时间,即系统启动后至今的时间,不会倒流。
之所以此处 要判断 (cp != TI->current_point)
是因为 update
的间隔为 0.1 毫秒,而时间轮精度为10 毫秒,可能 update
执行的时候 还没到定时的最小精度,最终触发 timer_update
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| int skynet_updatetime(void) { int count = 0; uint64_t cp = gettime(); if(cp < TI->current_point) { skynet_error(NULL, "ERROR: time diff error: change from %lld to %lld", cp, TI->current_point); TI->current_point = cp; } else if (cp != TI->current_point) { uint32_t diff = (uint32_t)(cp - TI->current_point); TI->current_point = cp; TI->current += diff; int i; for (i=0;i<diff;i++) { count += timer_update(TI); } } return count; }
|
时间轮执行
先加自旋锁,然后进行执行 timeout 为 0 的回调,否则后面进行 转移 2-4 层的时间轮,将 near
层时间轮覆盖后,就再也执行不到了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| static int timer_update(struct timer *T) { int count = 0;
SPIN_LOCK(T);
count += timer_execute(T);
timer_shift(T);
count += timer_execute(T);
SPIN_UNLOCK(T); return count; }
|
执行逻辑很简单,用当前 time(为tick) % 4096 找到需要执行的槽位的链表,在代码中为了提升性能用了位运算 & 实现。这里还能注意到小细节,执行回调函数链表时不需要加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| static inline int timer_execute(struct timer *T) { int count = 0; int idx = T->time & TIME_NEAR_MASK;
while (T->near[idx].head.next) { struct timer_node *current = link_clear(&T->near[idx]); SPIN_UNLOCK(T); count += dispatch_list(current); SPIN_LOCK(T); } return count; }
|
时间轮 Shift 操作
该函数由于用了大量位运算,所以看起来会比较难看,可以先从 while 的条件开始看,先是用当前 tick 也就是 ct % 4096,如果为 0,则说明 near
层的槽位已经全部走完 此时走完了 4096 * 10 毫秒,大约是40.96秒。接下来就是要找到正确的层次,然后从层次中找到正确的槽位,将其填充到 near
数组,即 ct / 4096 % 32,若为 0,则说明不在当前层次,还能再 除以一个 32。总之将位运算代码 & 翻译为 取余,>> 翻译为 除法,此函数的逻辑便不言自明。
同时 T→time 会一直递增,最后溢出回到0,uint32 的最大值为 4294967295
溢出为 0,则说明正确的值为 4294967296
那么 用该值 / 4096 / 32 / 32 / 32 / 32 发现 为 1,这就说明我们此时需要将 第四层的第0个槽位挪到 near
数组。 非常简单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| static void timer_shift(struct timer *T) { int mask = TIME_NEAR; uint32_t ct = ++T->time; if (ct == 0) { move_list(T, 3, 0); } else { uint32_t time = ct >> TIME_NEAR_SHIFT; int i=0;
while ((ct & (mask-1))==0) { int idx=time & TIME_LEVEL_MASK; if (idx!=0) { move_list(T, i, idx); break; } mask <<= TIME_LEVEL_SHIFT; time >>= TIME_LEVEL_SHIFT; ++i; } } }
|
时间轮添加
timer_add
告诉了我们定时器传进来的参数 time
并不是要计时的值,而是多少个 tick
,比如 time
传入 10 时,则以为这 10个 tick
也就是 10 * 10 毫秒后到时。 expire
表示的就是多少个 tick
后到时。
1 2 3 4 5 6 7 8 9 10 11 12
| static void timer_add(struct timer *T,void *arg,size_t sz,int time) { struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz); memcpy(node+1,arg,sz);
SPIN_LOCK(T);
node->expire=time+T->time; add_node(T,node);
SPIN_UNLOCK(T); }
|
又是一大坨位运算,就是判断 expire
应该插入到哪一层,如果将其改写,则需要大量的 if 判断。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| static void add_node(struct timer *T,struct timer_node *node) { uint32_t time=node->expire; uint32_t current_time=T->time;
if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) { link(&T->near[time&TIME_NEAR_MASK],node); } else { int i; uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT; for (i=0;i<3;i++) { if ((time|(mask-1))==(current_time|(mask-1))) { break; } mask <<= TIME_LEVEL_SHIFT; } link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node); } }
|