Lab2 Key/Value Server

lab2要求实现一个线性一致的KV数据库,可能会存在不稳定的网络(比如高延迟、丢包等情况),但是server不会crash。

难点在于client由于超时重传(类似于TCP),可能会发送一个请求多次,server接收到的这些重复请求也不一定是连续的。此时就需要server维护一个hashMap用于记录请求ID以及对应的第一次成功的返回结果。 hashSet显然需要及时清理。我采用的方案是client在确认发包成功后,继续发送携带请求ID的ResolveRPC直到成功,server通过ResovleRPC把hashMap中的请求ID删除。删除操作是幂等的,不需要保证线性一致。 例如,一个完整的PUT请求如下:

server维护done = make(map[int64]string)

● client发送PUT(x,1),ID=1

● server判断ID是否存在于done。如果存在,返回里面的值;否则处理并把结果存在done里

● client收到之后发送Resolve(1)

● server执行delete(done, 1)

一次上述的操作是原子性的。

Lab4 Fault-tolerant KVServer

基于RAFT协议实现一个Fault-tolerant KVServer

Client实现

所有的读写请求都必须直接从client发送给leader server。有两种情况client不知道leader是谁:

  • 第一次发送请求
  • server易主/crash,返回报错

此时client就需要不断重新尝试另一个server,直到请求被成功接收。注意到不同的命令有相同的发送方式,可以抽象出一层来专门用于发送RPC,我这里使用了策略模式。

Server实现

如何返回RPC结果

对于RPC接受的一个Op,leader需要将它通过Raft的Start接口达成共识。对于写请求(GET),client只需要知道它成功就行;对于读请求(PUT/APPEND),client还需要知道结果。

我们用一个专门的协程applier循环读取Raft commit上来的Op。如果是读请求,不做处理;如果是写请求,则更新KVmap。这样每一个Op都对应着一个独立的commandIndex,定义一个这样的map:

resultChMap map[int]chan opResult

RPC协程与applier协程通过这个map里的管道通信,以返回RPC结果。

要注意RPC在返回结果后需要销毁对应的通道。

幂等性设计

raft层commitIndex幂等

raft有可能提交log过时的commitIndex,因此要注意在真正apply的时候添加如下判断

 msg.CommandIndex > kv.lastApplied

但这样可能会造成过时请求得不到channel的信息而发生阻塞,所以还需要在RPC处添加timeout。

ticker := time.NewTicker(time.Duration(CHANNEL_TIMEOUT) * time.Millisecond)
defer ticker.Stop()

select {
case opResult := <-ch:
    if opResult.err != "" {
        reply.Err = Err(opResult.err)
    }
case <-ticker.C:
    reply.Err = NOTALEADER
}

client请求幂等

由于网络故障等原因,server接收到的命令可能是重复的,并且client只认最新结果(这和TCP超时重传有所差异)。对于读请求,并不会使得KVmap状态发生改变;但是对于写请求,就必须保证它的幂等性。

考虑维护这样一个map,key是clientID,value是序列号(从0开始增长)

seqMap      map[int64]int

applier每次读取到一个Op就判断一下序列号,如果Op序列号是旧序列号+1,则可以正常处理写请求(或者no-op的读请求)。

func (kv *KVServer) processSeq(op Op) bool {
	seq0, ok := kv.seqMap[op.ClientID]
	if !ok {
		kv.seqMap[op.ClientID] = op.Seq
		return true
	}
	if !(seq0+1 == op.Seq) {
		return false
	}
	kv.seqMap[op.ClientID]++
	return true
}
// applier处理RAFT层传上来的Op逻辑:
if kv.processSeq(op) {
    k := op.OpKey
    switch op.OpType {
    case PUTOP:
        {
            kv.mp[k] = op.OpValue
        }
    case APPENDOP:
        {
            kv.mp[k] = kv.mp[k] + op.OpValue
        }
    }
}

Snapshot

理解snapshot的数据流传递至关重要:

  • server发现Raft State太大,leader对其raft层发送snapshot指令,传递一些关于server状态的信息
  • raft层向persister保存snapshot和raft状态
  • raft层向follower发送InstallSnapshotRPC
  • follower向server层传递snapshot,server层将snapshot里的server状态应用于自身。

对于server检查raft state是否达到snapshot阈值的时机,在接收到applyMsg并更新了状态后是比较合适的。不应该在start后立即检查,因为此时还没有立刻达成共识。

server需要持久化的状态如下:

  • lastApplied
  • mp(即KVMap)
  • seqMap

拓展

一个上网冲浪搜集到的拓展,目前我还没实现

Read-only Operations Optimize

论文中第8节讲述其实现在处理读请求的时候,可以不用往log里加东西。这需要实现两点:

  • leader需要确保拥有最新的commit log。这可以在每一个leader选举成功后commit一个no-op实现。
  • leader在处理读请求的时候需要确保自身还是leader,这就需要leader在处理前与大多数节点确认自己仍是leader。