近期工作的内容主要是设计一个通用的游戏排行榜服务器, 通用的含义指的是同时为多个项目组提供一套通用的解决方案, 经过不断的调研, 最终写下此文.
本文将会讲述 排行榜的多种实现方式, 不过最终我选择了 SkipList 作为底层的数据结构, 因此会着重讲SkipList.
先来讲一下排行榜的主要功能.
排行榜主要功能
- 更新/删除/获取 一个 Key 在某个 排行榜 上的 排名, 及展示信息
- 获取 一个排行榜 任意区间(例如排名区间) 的展示信息
排行榜排序方案
游戏中的排行榜的排序方案无非就两种
- 实时排序, 即玩家数值变化的瞬间, 就完成了对它的排序
- 非实时排序, 亦可称之为定时排序, 玩家数据变动不立即排序, 而是直到策划所配的时间点到的时候才进行排序
这两种排序方案没有好坏之分, 主要是看策划他想要什么? (或许他自己也不知道想要什么, 他都想试试看, 他只顾着他自己)
数据结构的抉择
首先, 因为我们要提供给多个项目组一套解决方案, 每个项目组需要的排序方案大概率是不同的, 因此 排除了定时排序, 如果一开始就往定时排序的思路上想, 到时候哪个项目组突然说要实时排序, 改动起来会非常痛苦, 而且定时排序实现起来比较简单, 直接到点快排就完事了.
那么 排除了 定时排序, 就只能朝着实时排序的思路上走.
业界一般采用两种数据结构进行实现
- RB Tree 红黑树
- 优势:
- 时间复杂度的常数相对 SkipList 低
- 内存占用也相对较低
- 不足:
- SkipList 跳表
或许 堆也可以加进去, 取topK, 但是超出了k项, 就不知道排名.
SkipList 在 redis 的有序集合中也有用到, 这时候可能会有人问 为什么你不考虑直接用 redis 呢?
我考虑的点主要有以下几点
- 引入 redis 会增加部署难度
- 引入 redis 仅仅只用来做排序, 是不是太笨重了?
- redis 的默认 SkipList 实现是从小到大, 而我们知道一般是分值越靠前的越经常访问, 如果从小到大 查找靠前的效率可能不如从大到小, 但是 redis 写死排序顺序了, 我不开心
- 现在游戏服务器为了避免宕机的风险, 都会把服务器拆的特别细, 但是这就会带来一个问题, 本来一件事情能在一个地方集中处理完, 现在要多走几条链路才能做完, 效率会变低, 如果再加入一个 redis, 链路只会更长
不过, 我们实现的时候可以参考 redis 的 SkipList.
排行榜服务实现
首先 我们要抽象出 key 和 value 这两个概念, key 你可以理解为 playerID, 那之所以叫 key 是因为 有的排行榜它不是以角色为单位, 它也有可能是以宠物为单位, 此时的 key 就是 petID. value 就是分值, 不过这里不能简单地就一个分值, 试想一下, 如果分值相同了怎么办?
以下代码均为伪代码, 随手一写.
1 2 3 4 5 6 7 8 9 10
| struct Value { uint64_t score; int64_t operatorTime; };
struct Data { uint64_t key; Value value; };
|
Data 表示的就是 这个 key 用于排序的信息, 那么这时候有个问题 如果 这个 key 同时出现在 A, B 两个排行榜, 那么 key 就可能造成数据冗余, 占用内存, 这个时候 就要改造 Data 这一 结构.
1 2 3 4 5 6
| struct Data { uint64_t key; Value unordered_map<uint32_t, Value> value; };
|
这时候 如果一个 key 出现在多个排行榜 只需要在 这个 map 里 通过 排行榜ID 去找属于这个排行榜的分值就行了.
此时又出现了一个问题, player key 和 pet key 有可能相同 通过 key 来查找数据 显然是不合理的.
这时候就要引入一个 DataType 可以理解为 key 的类型, 它与 排行榜LeaderBoard 绑定在一起.
1 2 3 4 5 6 7 8 9 10
| typedef uint32_t DataType; vector<Data> DataArray;
struct LeaderBoard { uint32_t leaderBoardID; DataType dataType; SkipList list; }; score = DataArray[leaderBoard.dataType].value[leaderboard.leaderBoard.ID].score;
|
这样就能通过 排行榜 反查到其旗下的排行信息.
讲清楚了排行榜的设计, 接下来就是重头戏 SkipList 的实现, 这里我 拿出 Redis 的源码来讲解.
SkipList 原理
其实你可以这么理解, 普通的链表 搜索一个东西 要遍历一次 时间复杂度为 O(n), 要想提高搜索速度, 就要尽可能跳过多的节点, 那么给它建多条索引链就行了.
1 2 3 4 5 6 7 8 9 10 11
| ┌────────┐ .─. .─. .─. │ Head ├──▶(Min───────────────▶ 7 ──────────────▶99 ) Level 3 └────┬───┘ `─' `┬' `┬' │ │ │ ┌────▼───┐ .─. .─. .▼. .─. .▼. │ Head ├──▶(Min──▶( 3 ────────▶ 7 ──▶30 ────────▶99 ) Level 2 └────┬───┘ `─' `┬' `┬' `┬' `┬' │ │ │ │ │ ┌────▼───┐ .─. .▼. .─. .▼. .▼. .─. .▼. │ Head ├──▶(Min──▶( 3 ──▶ 9 ──▶ 7 ──▶30 ──▶90 ──▶99 ) Level 1 └────────┘ `─' `─' `─' `─' `─' `─' `─'
|
从最上面的索引链表开始搜 30 的排名 可以像下面这样搜索
1 2 3 4 5 6 7 8 9 10 11
| ┌────────┐ .─. .─. .─. │ Head ├──▶(Min──────3────────▶ 7 ──────────────▶99 ) Level 3 └────┬───┘ `─' `┬' `┬' │ │ │ ┌────▼───┐ .─. .─. .▼. .─. .▼. │ Head ├──▶(Min──▶( 3 ────────▶ 7 1▶30 ────────▶99 ) Level 2 └────┬───┘ `─' `┬' `┬' `┬' `┬' │ │ │ │ │ ┌────▼───┐ .─. .▼. .─. .▼. .▼. .─. .▼. │ Head ├──▶(Min──▶( 3 ──▶ 9 ──▶ 7 ──▶30 ──▶90 ──▶99 ) Level 1 └────────┘ `─' `─' `─' `─' `─' `─' `─'
|
min 后面 要跨越 span = 3 才能到 7, 7要跨越 span = 1 才能到 30, 因此 30 从小到大为第四名.
至此, 我们知道 skiplist 本质就是多个索引 快速查找, 同时维护了一个 span值, 来快速得到 排名
Redis 中 SkipList 的实现
首先是要确定, 我们提升一个节点到索引链的概率, 和 要建多少条索引链, redis 中 最大层级为32层, 每次提升节点的概率为 0.25.
1 2 3 4 5 6 7 8 9 10
| #define ZSKIPLIST_MAXLEVEL 32 #define ZSKIPLIST_P 0.25 int zslRandomLevel(void) { int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; }
|
通过这个随机算法, 可以粗略的计算
- 层数为 1, 概率为 1 - p
- 层数为 2, 概率为 (1 - p)*p
- 层数为 3, 概率为 (1 - p)pp
平均层数为 (1-p)+ (1-p)*p + (1-p)pp + … = 1 / 1 - p
如果按照 redis 的提升概率 0.25 来算, 那么空间上膨胀 (4 / 3) 即 1.33倍.
下面来看 SkipList 的定义.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| typedef struct zskiplistNode { robj *obj; double score; struct zskiplistNode *backward; struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;
typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level;
} zskiplist;
|
Insert 插入操作
主要是从最高层, 依次向下找 新来的值应该插入到哪, 然后 保存到 update 这个数组中, 这样就不用每找一层就改一层.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) { zskiplistNode *update[32], *x; unsigned int rank[32]; int i, level;
x = zsl->header; for (i = zsl->level-1; i >= 0; i--) {
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;
x = x->level[i].forward; } update[i] = x; }
level = zslRandomLevel();
if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; }
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x;
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; }
for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; }
x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else zsl->tail = x;
zsl->length++;
return x; }
|
GetRank 获取排名
一样从最上层开始找… 然后累计 span 值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| unsigned long zslGetRank(zskiplist *zsl, double score, robj *o) { zskiplistNode *x; unsigned long rank = 0; int i;
x = zsl->header; for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && compareStringObjects(x->level[i].forward->obj,o) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward; }
if (x->obj && equalStringObjects(x->obj,o)) { return rank; } }
return 0; }
|
GetElementByRank 根据排名找对象
用 traversed 统计意境走过的步长, 这个思路还是比较清奇的…
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) { zskiplistNode *x; unsigned long traversed = 0; int i;
x = zsl->header; for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank) { traversed += x->level[i].span; x = x->level[i].forward; }
if (traversed == rank) { return x; }
}
return NULL; }
|
至此, Redis 中的 SkipList 已经基本了解完了.
排行榜的另类做法
其实如果你知道 玩家的分值的区间, 例如皇室战争的皇冠杯数大致的范围你是知道的, 那你其实可以这么来做
1 2 3 4 5 6 7 8 9 10
| vector<vector<uint32_t>> array; array[皇冠数].push_back(👑的key); 这样要取出其排名, 只需要累加前面的 数组就可以了..
假设 分值为 1 - 5 [5] [4] [3] [2] [1] k1 k4 k2 k5 k7 k3 k6
那你要得到 k2 的排名, 你只需要 累加 [5] + [4] 的大小就能算出来
|