etcd-raft的线性一致读方法一:ReadIndex

说明

在分布式系统中,存在多种一致性模型,诸如严格一致性、线性一致性、顺序一致性等。不同的一致性模型给应用提供的数据保证也不同,其代价也不一样,一致性越强,代价越高。但是一致性越强,对应用的使用也就越友好。关于一致性模型的更多描述可参考: https:// aphyr.com/posts/313-str ong-consistency-models

问题背景

在分布式系统中,etcd-raft的实现模式是Leader + Followers,即存在一个Leader和多个Follower,所有的更新请求都经由Leader处理,Leader再通过日志同步的方式复制到Follower节点。

读请求的处理则没有这种限制:所有的节点(Leader与Followers)都可以处理用户的读请求,但是由于以下几种原因,导致从不同的节点读数据可能会出现不一致:

  • Leader和Follower之间存在状态差:这是因为更新总是从Leader复制到Follower,因此,Follower的状态总的落后于Leader,不仅于此,Follower之间的状态也可能存在差异。因此,如果不作特殊处理,从集群不同的节点上读数据,读出的结果可能不相同;
  • 假如限制总是从某个特点节点读数据,一般是Leader,但是如果旧的Leader和集群其他节点出现了网络分区,其他节点选出了新的Leader,但是旧Leader并没有感知到新的Leader,于是出现了所谓的脑裂现象,旧的Leader依然认为自己是主,但是它上面的数据已经是过时的了,如果客户端的请求分别被旧的和新的Leader分别处理,其得到的结果也会不一致。

如果不对读流程作任何的特殊处理,上述限制就会导致一个非一致性的读。而线性一致性的两个要求: 一致性读读最新数据 更是无从谈起。

实现

原理

etcd-raft通过一种称为ReadIndex的机制来实现线性一致读,其基本原理也很简单:Leader节点在处理读请求时,首先需要与集群多数节点确认自己依然是Leader,然后读取已经被应用到应用状态机的最新数据。

基本原理包含了两方面内容:

  • Leader首先通过某种机制确认自己依然是Leader;
  • Leader需要给客户端返回最近已应用的数据:即最新被应用到状态机的数据。

数据结构

ReadState

type ReadState struct {
    Index      uint64
    RequestCtx []byte
}

ReadState负责记录每个客户端的读请求的状态,其中包含:

  • RequestCtx :客户端读请求的唯一标识,一般使用request id
  • Index :表示读请求产生时当前节点的Commit点

ReadState最终会被返回给应用(通过Ready),由应用负责处理客户端的读请求,而且,应用需要根据该请求发起时的节点Commit信息决定返回何时的数据。

readIndexStatus

type readIndexStatus struct {
   req   pb.Message
   index uint64 
   acks  map[uint64]struct{}
}

readIndexStatus用来追踪Leader向Followers发送的心跳信息的响应,其中:

  • req :表示原始的ReadIndex请求,这是应用在处理客户端读请求时向底层raft协议核心处理层发起的ReadIndex请求;
  • index :表示Leader当前的Commit信息;
  • acks :记录Followers的响应,某个Follower如果确认了Leader的心跳消息,acks便会记录一个

readOnly

type readOnly struct {
   option           ReadOnlyOption
   pendingReadIndex map[string]*readIndexStatus
   readIndexQueue   []string
}

readOnly管理全局的读ReadIndex请求,其中:

  • option :暂时不确定含义;
  • pendingReadIndex :保存所有的待处理的ReadIndex请求,实现上是一个map,其中key为请求的唯一标识,一般为节点为请求生成的唯一request id;
  • readIndexQueue :所有的ReadIndex的数组

关键流程

客户端读请求处理

由于etcd-raft自带的示例应用并没有实现线性一致性读,因此,选择etcd-server作为例子来说明如何通过read index方法来实现线性一致读。

etcd-server在启动时会创建一个后台协程,运行的方法是: linearizableReadLoop ,如下:

func (s *EtcdServer) Start() { 
   s.start()
   s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) }) 
   s.goAttach(s.purgeFile)
   s.goAttach(func() { monitorFileDescriptor(s.stopping) }) 
   s.goAttach(s.monitorVersions)
   s.goAttach(s.linearizableReadLoop)
   s.goAttach(s.monitorKVHash)
}

这个协程干了什么呢?其实很简单,等着有读请求的信号,并且在有信号来的时候调用底层的raft核心协议处理层来获取信号发生时刻的commit index,如下:

func (s *EtcdServer) linearizableReadLoop() {
    var rs raft.ReadState
    for {
        ctx := make([]byte, 8)
        binary.BigEndian.PutUint64(ctx, s.reqIDGen.Next())

        // 主要是等待readwaitc上的信号 
        select {
        case <-s.readwaitc: case="" <-s.stopping:="" return="" }="" 有信号到来="" nextnr="" :="newNotifier()" s.readmu.lock()="" nr="" s.readnotifier="nextnr" s.readmu.unlock()="" 准备读取当前节点的commit="" index="" cctx,="" cancel="" s.cfg.reqtimeout())="" if="" err="" ctx);="" !="nil" {="" ......="" cancel()="" 等待底层返回当次读请求时的commit="" 增加了超时控制="" for="" !timeout="" &&="" !done="" 判读应用当前已经应用至状态机的index是否小于已commit的index="" 如果是,等待状态机提交至commit="" ai="" <="" rs.index="" select="" <-s.applywait.wait(rs.index):="" 至此,commit="" index已经被应用至状态机="" 可以通知读请求从状态机中返回数据了="" nr.notify(nil)="" } 
 

这里的linearizableReadLoop相当于是一把锁,控制读请求何时可以从状态机中读取最新数据。

我们上面说到linearizableReadLoop是在收到信号的时候进行read index控制,那该信号又是由谁触发呢?

信号的直接来源是 linearizableReadNotify :

func (s *EtcdServer) linearizableReadNotify(ctx context.Context) error {
    s.readMu.RLock()
    nc := s.readNotifier
    s.readMu.RUnlock()

    // 这里发送信号 
    select {
    case s.readwaitc <- struct{}{}:="" default:="" }="" 等待read="" state结束="" select="" {="" case="" <-nc.c:="" return="" nc.err="" <-ctx.done():="" ctx.err()="" <-s.done:="" errstopped="" } 
 

linearizableReadNotify的实现非常简单,这里就不再赘述,linearizableReadNotify会在读请求中被调用,如下:

func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
    if !r.Serializable {
        err := s.linearizableReadNotify(ctx)
        if err != nil {
            return nil, err
        }
    }
    ......
}

上面我们就以etcd-server为例分析了客户端的读请求是如何在服务端被处理。由于增加了read index过程而导致了比正常的请求处理增加了复杂性。接下来,我们要看看raft的核心协议处理层如何处理这种ReadIndex请求。

核心协议层处理ReadIndex请求

也即:客户端的读请求也需要被参与到协议的核心处理层。为此,核心处理层提供了一个接口专门处理读,ReadIndex:

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
    return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}

func (n *node) step(ctx context.Context, m pb.Message) error {
    ch := n.recvc
    if m.Type == pb.MsgProp {
        ch = n.propc
    }
    select {
    case ch <- m:="" return="" nil="" case="" <-ctx.done():="" ctx.err()="" <-n.done:="" errstopped="" }="" } 
 

ReadIndex的消息最终被发往了recvc这个channel,后台协程会接管这个消息并调用核心的协议处理函数:

func (n *node) run(r *raft) {
    for {
        select {
        case m := <-n.recvc: _,="" ok="" :="r.prs[m.From]" if="" ||="" !isresponsemsg(m.type)="" {="" r.step(m)="" }="" case="" ...:="" } 
 

如果节点是Leader,那么该消息最终的处理函数是 stepLeader

func stepLeader(r *raft, m pb.Message) {
    switch m.Type {
    case pb.MsgReadIndex:
        if r.quorum() > 1 {
            if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
            return
        }
        switch r.readOnly.option {
        // 关注的是该分支 
        case ReadOnlySafe:
            r.readOnly.addRequest(r.raftLog.committed, m)
            r.bcastHeartbeatWithCtx(m.Entries[0].Data)
        case ReadOnlyLeaseBased:
            ......
        }
    }
}

本文讨论的ReadIndex方案对应的是ReadOnlySafe这个分支,其中addRequest()会把这个读请求到达时的leader的commit index保存起来,并且维护一些状态信息,而bcastHeartbeatWithCtx()则向其他Followers节点发送心跳消息MsgHeartbeat,而当leader收到心跳响应消息MsgHeartbeatResp时处理为:

func stepLeader(r *raft, m pb.Message) {
    ......
    case pb.MsgHeartbeatResp:
        ackCount := r.readOnly.recvAck(m)
        if ackCount < r.quorum() {
            return
        }
        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            req := rs.req 
            if req.From == None || req.From == r.id { 
                // from local member
                r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
            } else {
                // 如果是follower节点发来的ReadIndex请求
                // 给它返回响应
                r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
            }
        }
    }
}

如果接收到了多数派的心跳响应,则会从刚才保存的信息中将对应读请求当时的commit index和请求id拿出来,填充到ReadState中,该结构最终会被返回给调用ReadIndex的应用。

需要说明的是:r.readOnly.advance(m)会返回m以前的所有的request id(m其实也就是应用调用ReadIndex时的请求id,而且请求id是单调递增的,因此当在某个时刻确认了请求id为m的主信息,那么m之前的主信息也认为是此),因此,上面代码的rss其实是一个slice。

以上我们其实是假设该读请求是发生在leader上面的,leader通过心跳确认自己是主,然后再保证读时刻的commit index被应用至状态机后返回读结果给客户端,这就保证了客户端得到的结果一定是最新的。

那假如读请求是发生在follower上呢?对于etcd-server应用来说,它照样是遵循上面的请求处理流程,应用层会调用协议的核心处理层的ReadIndex方法,但是核心协议层识别当前节点是follower,那请求处理流程是:

func stepFollower(r *raft, m pb.Message) {
    switch m.Type {
    ......
    case pb.MsgReadIndex:
        if r.lead == None {
            return
        }
        m.To = r.lead 
        r.send(m)
    case pb.MsgReadIndexResp:
        r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
    ......
}

可以发现,如果是在Follower节点上执行ReadIndex,那么它必须先要向Leader去查询commit index,然后收到响应后在创建ReadState记录commit信息,后续的处理和Leader别无二致。

总结

经过上面一通复杂处理,就达到了效果:无论从主还是从上去读,保证读到的数据都一致(都是在主上被commit后的状态)

参考

稿源:分布式和存储的那些事 (源链) | 关于 | 阅读提示

本站遵循[CC BY-NC-SA 4.0]。如您有版权、意见投诉等问题,请通过eMail联系我们处理。
酷辣虫 » 后端存储 » etcd-raft的线性一致读方法一:ReadIndex

喜欢 (0)or分享给?

专业 x 专注 x 聚合 x 分享 CC BY-NC-SA 4.0

使用声明 | 英豪名录