架构总览
Lab5在KVRaft的基础上实现了数据的分布式存储,把全量数据分成一个个"Shard",分别存储在不同的Group里,每个Group是一群Raft集群,这些Group里的服务器可以通过外部API实时调整他们的加入、退出。
下面这张图描述了ShardKV的架构
一些补充:
- 每个Group之间唯一的通信是Shard Migration,和Raft无关,而一个Group的内部几个Server形成一个Raft集群
- Group N里的KVServer 1是当前Group的raft leader,其余是follower。当Client请求KVServer的时候,会像KVRaft那样遍历这些KVServer,直到找到Leader
- Config的定义如下:
type Config struct {
Num int // config number
Shards [NShards]int // shardId -> gid
Groups map[int][]string // gid -> servers[]
}
- “Shard”,是由Key唯一确定的,课程代码里给的转换方式是:
func key2shard(key string) int {
shard := 0
if len(key) > 0 {
shard = int(key[0])
}
shard %= shardctrler.NShards
return shard
}
5A shardctrler
需要实现这几个接口:Join/Leave/Move/Query。
Join和Leave都是对一个或多个组里的Server进行了增加或者修改,controller需要根据新出现的Group或者删除的Group对所有Group分配的Shard做出管理。这里用了平均分配的办法(生产环境可以计算每个Group里的计算性能加权分配),也就是每个Group分配NShards/NGroups个Shard。
由于提供了Move接口,可能会导致已经本来配平的shards再次被破坏,所以在add或remove前需要preBalance处理,处理过程和最终的Balance处理相似:
把原来的shards数组转化为map(成为shardRev),然后用NShards/NGroups计算出每个Group需要几个Shard,然后考虑用一个队列pool,第一轮循环shardRev把每一组多余的shard回收,第二轮循环shardRev把不足的组补平,然后再把剩下的shard分给一轮循环shardRev。
因为是集群系统,需要消除循环map导致的无序性,所以写了一个双链表维护Node的顺序(类似Java里的LinkedHashMap类),称为orderedMap。
由于需要移动最少次数,所以在shard进入和退出pool的时候也需要根据实际情况定义顺序,所以也是写了一个deque(double end queue)。以及在最后把几个余数shard分配的时候要从shardRev.Keys()的逆序遍历来保证最少次数。
计M为Shard数,N为Group数,这种算法的时间复杂度是O(M+N*log(N)),主要在orderedMap的排序算法上;空间复杂度是O(M+N)。
5B Shard Movement && Challenges
以下的方案顺便把3个challenge也完成了,我的建议是写的时候以实现后2个challenge为目标,否则写到他们的时候可能得大改架构。
如何发现迁移目标
首先解决的一个问题就是当一个server发现自身shard多余或者缺失时,是push还是poll,这里我使用了push。我把Config拉取、检测自身Shard是否多余、是否缺失都用一个loop完成,这里存在一个次序问题,在一个Config完成更新之前,必须确保自身Shard没有多余,然后确保Shard没有缺失,再更新Config。
func (kv *ShardKV) ConfigLoop() {
for !kv.killed() {
if _, isLeader := kv.rf.GetState(); !isLeader {
loopSleep()
continue
}
kv.mu.Lock()
redundantShardIdList := make([]int, 0)
// 检测自己是否有多余Shard,如果有就加到redundantShardIdList里
// ...
for _, shardId := range redundantShardIdList {
// 把每个多余的Shard发到需要的server
}
if len(redundantShardIdList) > 0 {
kv.mu.Unlock()
loopSleep()
continue
}
recv := true
// 检测自己是否收到了所有需要的Shard
// ...
if !recv {
kv.mu.Unlock()
loopSleep()
continue
}
local := kv.curConfig
kv.mu.Unlock()
// 确保了上述两步,才更新Config
remote := kv.mck.Query(local.Num + 1)
// 更新Config
// ...
}
}
用这种方法将就能保证在shard还没有完全获取或者给予完成的时候,不影响已经存在的Shard的服务,也就是后两个challenge的内容。
如何检测Shard多余/缺失?可以在Shard里加入ConfigNum字段标识,这样就可以结合curConfig和prevConfig找出它们了。
type Shard struct {
ConfigNum int
Mp map[string]string
Id int
}
// 多余
for shardId, gid := range kv.prevConfig.Shards {
if gid == kv.gid && kv.curConfig.Shards[shardId] != gid && kv.shardMap[shardId].ConfigNum < kv.curConfig.Num {
redundantShardIdList = append(redundantShardIdList, shardId)
}
}
// 缺失
recv := true
for sid, gid := range kv.prevConfig.Shards {
if gid != kv.gid && kv.curConfig.Shards[sid] == kv.gid && kv.shardMap[sid].ConfigNum < kv.curConfig.Num {
recv = false
}
}
相应地在添加Shard的时候,需要填入对应的ConfigNum。
如何迁移Shard
得到了redundantShardIdList后,就可以向相应的server发送shard了。需要发送以下内容:
type PushShardArgs struct {
Shard Shard
ConfigNum int
SeqMap map[int64]int
ClientGid int
}
比较特别的是SeqMap,因为很有可能在config change的时候某个client从请求server A变成了请求server B,那么就会产生重复请求问题。所以server B需要找到自己的SeqMap和server A发送过来的SeqMap之间的差集,把增量部分加到自己的SeqMap里。
另外ConfigNum也必不可少,接收端比较自己的ConfigNum发现接受的ConfigNum更大的时候,说明还没准备好;如果更小则说明发送端过时,这两种情况都需要返回错误,让发送端继续发送或者放弃发送。
由于需要把Shard同步到Raft集群里,Leader收到请求后要向Raft层转递这个信息,然后在applier端完成最后的插入操作。
applier的代码:
info := cmd.Command.(PushShardInfo)
shard := info.Shard
if kv.curConfig.Num < cmd.ConfigNum {
res.Err = ErrNotReady
} else if shard.ConfigNum < kv.curConfig.Num {
res.Err = ErrConfigStale
} else if kv.shardMap[info.Shard.Id].Mp != nil {
// already received, do nothing
} else {
kv.shardMap[shard.Id] = shard.Clone(shard.ConfigNum)
for cid, rid := range info.SeqMap {
if rid0, ok := kv.seqMap[cid]; !ok || rid0 < rid {
kv.seqMap[cid] = rid
}
}
}
如何更新Config
在上述ConfigLoop的最后,leader给raft层传递一个更新Config的消息即可。注意applier的时候初始化Shard。
如何Garbage Collect
在ConfigLoop里生成的push协程发现返回结果是OK的话,就向raft层传递一个Garbage Collect的消息。
Snapshot
仅需要持久化这几个变量:
func (kv *ShardKV) takeSnapshot(lastIndex int) {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(kv.shardMap)
e.Encode(kv.seqMap)
e.Encode(kv.curConfig)
e.Encode(kv.prevConfig)
kv.rf.Snapshot(lastIndex, w.Bytes())
}
lastIndex是applier层收到的msg的index
Timing is Everything
仔细阅读Test代码会发现,一些Test会在对Config乱改一通之后sleep一段时间,然后直接把用不到的Group给kill掉,接着再去check。于是如果这期间shard migration没有完成的话就会一直not ready,永远没法完成测试。这一开始给我造成了很大困惑,最后还是通过微调timing通过了所有测试。我最终把raft层等待时间设为2000ms,config_loop时间设置为300ms。