TinyKv Project2 PartB RaftKV

2023-05-16

TinyKv Project2 PartB RaftKV

  • 前言
  • Project2 PartB RaftKV 文档翻译
  • PartB 到底想让我们做什么?
  • 分析要实现的函数到底要干什么事情
    • proposeRaftCommand 将上层命令打包成日志发送给raft
    • HandleRaftReady 处理已经Ready的数据
    • SaveReadyState - 持久化
    • Append - 持久化日志
  • 结构体与一些流程介绍
  • make project2b 看看~

前言

  partB部分主要是做持久化相关的内容,Build a fault-tolerant KV server on top of Raft:在Raft基础之上构建一个容错KV服务器。

Project2 PartB RaftKV 文档翻译

  在这一部分中,你将使用 Part A 中实现的 Raft 模块建立一个容错的KV存储服务。服务将是一个复制状态机,由几个使用 Raft 进行复制的KV服务器组成。KV服务应该继续处理客户的请求,只要大多数的服务器是活的并且可以通信,尽管有其他的故障或网络分区发生。

  在 Project1 中,你已经实现了一个独立的kv服务器,所以你应该已经熟悉了kv服务器的 API 和 Storage 接口。

  在介绍代码之前,你需要先了解三个术语:Store、Peer 和 Region,它们定义在proto/proto/metapb.proto 中。

  • Store 代表 tinykv-server 的一个实例
  • Peer 代表运行在 Store 上的 Raft 节点
  • Region 是 Peer 的集合,也叫 Raft 组

在这里插入图片描述

  为了简单起见,在 Project2 的集群中,一个 Store 上只有一个 Peer,一个 Region。所以现在不需要考虑Region的范围。多个区域将在 Project3 中进一步引入。

代码

  首先,你应该看看位于 kv/storage/raft_storage/raft_server.go 中的 RaftStorage 的代码,它也实现了存储接口。与 StandaloneStorage 直接从底层引擎写入或读取不同,它首先将每个写入或读取请求发送到 Raft,然后在 Raft 提交请求后对底层引擎进行实际写入和读取。通过这种方式,它可以保持多个 Stores 之间的一致性。

  RaftStorage 主要创建一个 Raftstore 来驱动 Raft。当调用 Reader 或 Write 函数时,它实际上是通过 channel (该通道的接收者是 raftWorker 的 raftCh)向 raftstore 发送 proto/proto/raft_cmdpb.proto 中定义的 RaftCmdRequest,其中有四种基本的命令类型(Get/Put/Delete/Snap),在 Raft 提交并应用该命令后返回响应。而读写函数的 kvrpc.Context 参数现在很有用,它从客户端的角度携带了 Region 信息,并作为 RaftCmdRequest 的 Head 传递。也许这些信息是错误的或过时的,所以 raftstore 需要检查它们并决定是否提出请求。

然后,这里是TinyKV的核心 - raftstore。这个结构有点复杂,你可以阅读TiKV的参考资料,让你对这个设计有更好的了解。

https://pingcap.com/blog-cn/the-design-and-implementation-of-multi-raft/#raftstore (中文版本)
https://pingcap.com/blog/2017-08-15-multi-raft/#raftstore (英文版本)

  raftstore 的入口是 Raftstore,见 kv/raftstore/raftstore.go。它启动了一些Worker 来异步处理特定的任务,现在大部分都没有用到,所以你可以直接忽略它们。你所需要关注的是 raftWorker。(kv/raftstore/raft_worker.go)

  整个过程分为两部分:raft worker 轮询 raftCh 以获得消息,这些消息包括驱动 Raft 模块的基本 tick 和作为 Raft 日志项的 Raft 命令;它从 Raft 模块获得并处理 ready,包括发送raft消息、持久化状态、将提交的日志项应用到状态机。一旦应用,将响应返回给客户。

实现 peer storage

  peer storage 是通过 Part A 中的存储接口进行交互,但是除了 raft 日志之外,peer storage 还管理着其他持久化的元数据,这对于重启后恢复到一致的状态机非常重要。此外,在 proto/proto/raft_serverpb.proto 中定义了三个重要状态。

  • RaftLocalState:用于存储当前 Raft hard state 和 Last Log Index。
  • RaftApplyState。用于存储 Raft applied 的 Last Log Index 和一些 truncated Log 信息。
  • RegionLocalState。用于存储 Region 信息和该 Store 上的 Peer State。Normal表示该 peer 是正常的,Tombstone表示该 peer 已从 Region 中移除,不能加入Raft 组。

这些状态被存储在两个badger实例中:raftdb 和 kvdb。

  • raftdb 存储 raft 日志和 RaftLocalState。
  • kvdb 在不同的列族中存储键值数据,RegionLocalState 和 RaftApplyState。你可以把 kvdb 看作是Raft论文中提到的状态机。

  你可能想知道为什么 TinyKV 需要两个 badger 实例。实际上,它只能使用一个badger 来存储 Raft 日志和状态机数据。分成两个实例只是为了与TiKV的设计保持一致。

  这些元数据应该在 PeerStorage 中创建和更新。当创建 PeerStorage 时,见kv/raftstore/peer_storage.go 。它初始化这个 Peer 的 RaftLocalState、RaftApplyState,或者在重启的情况下从底层引擎获得之前的值。注意,RAFT_INIT_LOG_TERM 和 RAFT_INIT_LOG_INDEX 的值都是5(只要大于1),但不是0。之所以不设置为0,是为了区别于 peer 在更改 conf 后被动创建的情况。你现在可能还不太明白,所以只需记住它,细节将在 project3b 中描述,当你实现 conf change 时。

  在这部分你需要实现的代码只有一个函数 PeerStorage.SaveReadyState,这个函数的作用是将 raft.Ready 中的数据保存到 badger 中,包括追加日志和保存 Raft 硬状态。

  要追加日志,只需将 raft.Ready.Entries 处的所有日志保存到 raftdb,并删除之前追加的任何日志,这些日志永远不会被提交。同时,更新 peer storage 的RaftLocalState 并将其保存到 raftdb。

  保存硬状态也很容易,只要更新peer storage 的 RaftLocalState.HardState 并保存到raftdb。

提示:

  • 使用WriteBatch来一次性保存这些状态。
  • 关于如何读写这些状态,请参见 peer_storage.go 的其他函数。

实现Raft ready 过程

  在 Project2 的 PartA ,你已经建立了一个基于 tick 的 Raft 模块。现在你需要编写驱动它的外部流程。大部分代码已经在 kv/raftstore/peer_msg_handler.go 和kv/raftstore/peer.go 下实现。所以你需要学习这些代码,完成proposalRaftCommand 和 HandleRaftReady 的逻辑。下面是对该框架的一些解释。

  Raft RawNode 已经用 PeerStorage 创建并存储在 peer 中。在 raft Worker 中,你可以看到它接收了 peer 并通过 peerMsgHandler 将其包装起来。peerMsgHandler主要有两个功能:一个是 HandleMsgs,另一个是 HandleRaftReady。

  HandleMsgs 处理所有从 raftCh 收到的消息,包括调用 RawNode.Tick() 驱动Raft的MsgTypeTick、包装来自客户端请求的 MsgTypeRaftCmd 和 Raft peer 之间传送的MsgTypeRaftMessage。所有的消息类型都在 kv/raftstore/message/msg.go 中定义。你可以查看它的细节,其中一些将在下面的部分中使用。

  在消息被处理后,Raft 节点应该有一些状态更新。所以 HandleRaftReady 应该从Raft 模块获得Ready,并做相应的动作,如持久化日志,应用已提交的日志,并通过网络向其他 peer 发送 raft 消息。

在一个伪代码中,raftstore 使用 Raft:

for {
  select {
  case <-s.Ticker:
    Node.Tick()
  default:
    if Node.HasReady() {
      rd := Node.Ready()
      saveToStorage(rd.State, rd.Entries, rd.Snapshot)
      send(rd.Messages)
      for _, entry := range rd.CommittedEntries {
        process(entry)
      }
      s.Node.Advance(rd)
    }
}

在这之后,整个读或写的过程将是这样的:

  • 客户端调用 RPC RawGet/RawPut/RawDelete/RawScan
  • RPC 处理程序调用 RaftStorage 的相关方法
  • RaftStorage 向 raftstore 发送一个 Raft 命令请求,并等待响应
  • RaftStore 将 Raft 命令请求作为 Raft Log 提出。
  • Raft 模块添加该日志,并由 PeerStorage 持久化。
  • Raft 模块提交该日志
  • Raft Worker 在处理 Raft Ready 时执行 Raft 命令,并通过 callback 返回响应。
  • RaftStorage 接收来自 callback 的响应,并返回给 RPC 处理程序。
  • RPC 处理程序进行一些操作并将 RPC 响应返回给客户。

  你应该运行 make project2b 来通过所有的测试。整个测试正在运行一个模拟集群,包括多个 TinyKV 实例和一个模拟网络。它执行一些读和写的操作,并检查返回值是否符合预期。

  要注意的是,错误处理是通过测试的一个重要部分。你可能已经注意到,在proto/proto/errorpb.proto 中定义了一些错误,错误是 gRPC 响应的一个字段。同时,在 kv/raftstore/util/error.go 中定义了实现 error 接口的相应错误,所以你可以把它们作为函数的返回值。

  这些错误主要与 Region 有关。所以它也是 RaftCmdResponse 的 RaftResponseHeader 的一个成员。当提出一个请求或应用一个命令时,可能会出现一些错误。如果是这样,你应该返回带有错误的 Raft 命令响应,然后错误将被进一步传递给 gRPC 响应。你可以使用 kv/raftstore/cmd_resp.go 中提供的 BindErrResp,在返回带有错误的响应时,将这些错误转换成 errorpb.proto 中定义的错误。

  在这个阶段,你可以考虑这些错误,其他的将在 Project3 中处理。

  • ErrNotLeader:raft 命令是在一个 Follower 上提出的。所以用它来让客户端尝试其他 peer。
  • ErrStaleCommand:可能由于领导者的变化,一些日志没有被提交,就被新的领导者的日志所覆盖。但是客户端并不知道,仍然在等待响应。所以你应该返回这个命令,让客户端知道并再次重试该命令。

提示:

  • PeerStorage 实现了 Raft 模块的存储接口,你应该使用提供的SaveRaftReady() 方法来持久化Raft的相关状态。
  • 使用 engine_util 中的 WriteBatch 来进行原子化的多次写入,例如,你需要确保在一个写入批次中应用提交的日志并更新应用的索引。
  • 使用 Transport 向其他 peer 发送 raft 消息,它在 GlobalContext 中。
  • 服务器不应该完成 RPC,如果它不是多数节点的一部分,并且没有最新的数据。你可以直接把获取操作放到 Raft 日志中,或者实现 Raft 论文第8节中描述的对只读操作的优化。
  • 在应用日志时,不要忘记更新和持久化应用状态机。
  • 你可以像TiKV那样以异步的方式应用已提交的Raft日志条目。这不是必须的,虽然对提高性能是一个很大的提升。
  • 提出命令时记录命令的 callback,应用后返回 callback。
  • 对于 snap 命令的响应,应该明确设置 badger Txn 为 callback。

PartB 到底想让我们做什么?

  Build a fault-tolerant KV server on top of Raft(在Raft基础之上构建一个容错KV服务器。)

  在partA中,我们实现了raft协议,并封装了一层与上层交互的接口。那么在partB中,我们就需要去实现真正的存储,将日志持久化,将apply的日志放入状态机去执行并返回结果。

  在此介绍三个名词,在后面的project中会用到

在这里插入图片描述

  • Store:每一个节点服务器叫做一个 store,也就是一个节点上面只有一个 Store。代码里面叫 RaftStore,后面统一使用 RaftStore 代称。

  • Peer:一个 RaftStore 里面会包含多个 peer,一个 RaftStore 里面的所有 peer 公用同一个底层存储,也就是多个 peer 公用同一个 badger 实例,一个peer对应一个raft节点。

  • Region:一个 Region 叫做一个 Raft group,即同属一个 raft 集群,一个 region 包含多个 peer,这些 peer 散落在不同的 RaftStore 上。

  在partB中,就三个函数需要我们去实现,分别在peer_msg_handler.gopeer_storage.go 中。

在这里插入图片描述

  这里将 Rawnode-Raft-RaftLog 统称为 raft 层,把要实现的部分称为 peer 层。peer 层首先接收来自 client 的 RaftCmdRequest,其中包含着不同的命令请求,接着它会把这些请求逐一以 entry 的形式传递给 raft 层,当然,这个 peer 应该是 Leader,不然 client 会找下一个 peer 继续试。raft 层收到条目后,会在集群内部进行同步,这就是 project2a 的内容。同步的过程中,peer 层会不时询问 raft 层有哪些已经同步好的 entry 可以拿来应用(执行)?哪些 entry 需要持久化?有没有快照需要应用?等等。 如果有ready返回回来,那就根据其中的内容去做持久化,应用快照,执行apply到状态机等等

//将 client 的请求包装成 entry 传递给 raft 层
func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
	err := d.preProposeRaftCommand(msg)
	if err != nil {
		cb.Done(ErrResp(err))
		return
	}
	// Your Code Here (2B).
}
// HandleRaftReady 处理 rawNode 传递来的 Ready
//HandleRaftReady 对这些 entries 进行 apply(即执行底层读写命令)
//每执行完一次 apply,都需要对 proposals 中的相应 Index 的 proposal 进行 callback 回应(调用 cb.Done())
//然后从中删除这个 proposal。
func (d *peerMsgHandler) HandleRaftReady() {
	if d.stopped {
		return
	}
	// Your Code Here (2B).
}
// Save memory states to disk.
//	将内存状态保存到磁盘。持久化
//将 raft.Ready 中的数据保存到 badger 中,包括追加日志和保存 Raft 硬状态。
// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
//不要在此函数中修改ready,这是稍后正确推进ready对象的要求。
// 处理 Ready 中的 Entries 和 HardState 数据

func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
	// Hint: you may call `Append()` and `ApplySnapshot()` in this function
	// Your Code Here (2B/2C).
}
// Append the given entries to the raft log and update ps.raftState also delete log entries that will
// never be committed
// 负责将 Ready 中的 entries 持久化到 raftDB 中去,然后更新 RaftLoaclState 的状态。同时,如果底层存储有冲突条目,则将其删除。
// 将给定的条目附加到raft日志并更新ps.raftState,同时删除永远不会提交的日志条目
func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
	// Your Code Here (2B).

}

分析要实现的函数到底要干什么事情

proposeRaftCommand 将上层命令打包成日志发送给raft

  该方法会将 client 的请求包装成 entry 传递给 raft 层。client 会传来一个 msg(类型为 *raft_cmdpb.RaftCmdRequest)。其中,该结构中包含了 Requests 字段和 AdminRequest 字段,即两种命令请求。在 project2B 中,暂时只用到 Requests 字段。核心分为两步:

  1. 在请求发过来的时候,同时会附带一个回调函数,我们需要记录这个回调函数与本条日志的索引,等待apply这条记录到状态机执行完后,就回调它,并将返回值返回。简单来说,就是封装回调
  2. 将这个Msg序列化,通过昨天的partA的rawnode封装的Propse接口将消息传递进去

  proposals 是 封装的回调结构体,是一个切片。当上层想要让 peer 执行一些命令时,会发送一个 RaftCmdRequest 给该 peer,而这个 RaftCmdRequest 里有很多个 Request,需要底层执行的命令就在这些 Requset 中。这些 Request 会被封装成一个 entry 交给 peer 去在集群中同步。当 entry 被 apply 时,对应的命令就会被执行。

  上层怎么知道底层的这些命令真的被执行并且得到命令的执行结果呢?这就是 callback 的作用。每当 peer 接收到 RaftCmdRequest 时,就会给里面的每一个 Request 一个 callback,然后封装成 proposal,其中 term 就为该 Request 对应 entry 生成时的 term 以及 index。

  当 rawNode 返回一个 Ready 回去时,说明上述那些 entries 已经完成了同步,因此上层就可以通过 HandleRaftReady 对这些 entries 进行 apply(即执行底层读写命令)。每执行完一次 apply,都需要对 proposals 中的相应 Index 的 proposal 进行 callback 回应(调用 cb.Done()),表示这条命令已经完成了(如果是 Get 命令还会返回取到的 value),然后从中删除这个 proposal。

//将 client 的请求包装成 entry 传递给 raft 层
func (d *peerMsgHandler) proposeRaftCommand(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
	err := d.preProposeRaftCommand(msg)
	if err != nil {
		cb.Done(ErrResp(err))
		return
	}
	// Your Code Here (2B).
	if msg.Requests != nil {
		d.proposeRequest(msg, cb)
	} else {
		//暂时不管
		//d.proposeAdminRequest(msg, cb)
	}
}
func (d *peerMsgHandler) proposeRequest(msg *raft_cmdpb.RaftCmdRequest, cb *message.Callback) {
	//1. 封装回调,等待log被apply的时候调用
	//后续相应的 entry 执行完毕后,响应该 proposal,即 callback.Done( );
	d.proposals = append(d.proposals, &proposal{
		index: d.RaftGroup.Raft.RaftLog.LastIndex() + 1,
		term:  d.RaftGroup.Raft.Term,
		cb:    cb,
	})
	//2. 序列化RaftCmdRequest
	data, err := msg.Marshal()
	if err != nil {
		log.Panic(err)
	}
	//3. 将该字节流包装成 entry 传递给下层raft MessageType_MsgPropose
	err = d.RaftGroup.Propose(data)
	if err != nil {
		log.Panic(err)
	}
}

HandleRaftReady 处理已经Ready的数据

该方法用来处理 rawNode 传递来的 Ready。主要分为 5 步:

  1. 判断是否有新的 Ready,没有就什么都不处理;
  2. 调用 SaveReadyState 将 Ready 中需要持久化的内容保存到 badger。如果 Ready 中存在 snapshot,则应用它;
  3. 然后调用 d.Send() 方法将 Ready 中的 Msg 发送出去;
  4. 应用 Ready 中的 CommittedEntries;
  5. 调用 d.RaftGroup.Advance() 推进 RawNode;
// HandleRaftReady 处理 rawNode 传递来的 Ready
//HandleRaftReady 对这些 entries 进行 apply(即执行底层读写命令)
//每执行完一次 apply,都需要对 proposals 中的相应 Index 的 proposal 进行 callback 回应(调用 cb.Done())
//然后从中删除这个 proposal。
func (d *peerMsgHandler) HandleRaftReady() {
	if d.stopped {
		return
	}
	// Your Code Here (2B).
	//1. 判断是否有新的 Ready,没有就什么都不处理;
	if !d.RaftGroup.HasReady() {
		return
	}

	ready := d.RaftGroup.Ready()
	//2. 调用 SaveReadyState 将 Ready 中需要持久化的内容保存到 badger。
	//如果 Ready 中存在 snapshot,则应用它;
	//保存 unstable entries, hard state, snapshot
	_, err := d.peerStorage.SaveReadyState(&ready)
	if err != nil {
		log.Panic(err)
	}

	//3. 调用 d.Send() 方法将 Ready 中的 Msg 发送出去;
	d.Send(d.ctx.trans, ready.Messages)

	//4. 应用待apply的日志,即实际去执行
	if len(ready.CommittedEntries) > 0 {

		kvWB := &engine_util.WriteBatch{}
		for _, ent := range ready.CommittedEntries {
			kvWB = d.processCommittedEntry(&ent, kvWB)
			// 节点有可能在 processCommittedEntry 返回之后就销毁了
			// 如果销毁了需要直接返回,保证对这个节点而言不会再 DB 中写入数据
			if d.stopped {
				return
			}
		}
		// 更新 RaftApplyState
		lastEntry := ready.CommittedEntries[len(ready.CommittedEntries)-1]
		d.peerStorage.applyState.AppliedIndex = lastEntry.Index
		if err := kvWB.SetMeta(meta.ApplyStateKey(d.regionId), d.peerStorage.applyState); err != nil {
			log.Panic(err)
		}
		// 在这里一次性执行所有的 Command 操作和 ApplyState 更新操作
		kvWB.MustWriteToDB(d.peerStorage.Engines.Kv)
	}
	//5. 调用 d.RaftGroup.Advance() 推进 RawNode,更新 raft 状态
	d.RaftGroup.Advance(ready)
}

SaveReadyState - 持久化

PeerStorage.go顾名思义需要负责该 peer 的存储。TinyKV 这里提供了两个 badger 实例,分别是 raftDB 和 kvDB,每个实例负责存储不同的内容:

raftDB 存储:

  • Ready 中需要 stable 的 entries。
  • RaftLocalState

kvDB 存储:

  • RaftApplyState
  • RegionLocalState

SaveReadyState方法用来持久化 Ready 中的数据。

  1. 通过 raft.isEmptySnap() 方法判断是否存在 Snapshot,如果有,则调用 ApplySnapshot() 方法应用;
  2. 调用 Append() 将需要持久化的 entries 保存到 raftDB;
  3. 保存 ready 中的 HardState 到 ps.raftState.HardState,注意先使用 raft.isEmptyHardState() 进行判空;
  4. 持久化 RaftLocalState 到 raftDB;
  5. 通过 raftWB.WriteToDB 和 kvWB.WriteToDB 进行底层写入。
// Save memory states to disk.
//	将内存状态保存到磁盘。持久化
//将 raft.Ready 中的数据保存到 badger 中,包括追加日志和保存 Raft 硬状态。
// Do not modify ready in this function, this is a requirement to advance the ready object properly later.
//不要在此函数中修改ready,这是稍后正确推进ready对象的要求。
// 处理 Ready 中的 Entries 和 HardState 数据

func (ps *PeerStorage) SaveReadyState(ready *raft.Ready) (*ApplySnapResult, error) {
	// Hint: you may call `Append()` and `ApplySnapshot()` in this function
	// Your Code Here (2B/2C).
	raftWB := &engine_util.WriteBatch{}
	var result *ApplySnapResult
	var err error
	// 1. 通过 raft.isEmptySnap() 方法判断是否存在 Snapshot,如果有,则调用ApplySnapshot() 方法应用;

	// 2. 调用 Append() 将需要持久化的 entries 保存到 raftDB;
	if err = ps.Append(ready.Entries, raftWB); err != nil {
		log.Panic(err)
	}
	// 3. 保存 ready 中的 HardState 到 ps.raftState.HardState,注意先使用raft.isEmptyHardState() 进行判空;
	if !raft.IsEmptyHardState(ready.HardState) {
		*ps.raftState.HardState = ready.HardState
	}
	// 4. 持久化 RaftLocalState 到 raftDB;
	if err = raftWB.SetMeta(meta.RaftStateKey(ps.region.Id), ps.raftState); err != nil {
		log.Panic(err)
	}
	// 5. 通过 raftWB.WriteToDB 和 kvWB.WriteToDB 进行原子的写入到存储引擎
	raftWB.MustWriteToDB(ps.Engines.Raft)
	return result, nil
}

Append - 持久化日志

  该方法供 SaveReadyState() 调用,负责将 Ready 中的 entries 持久化到 raftDB 中去,然后更新 RaftLoaclState 的状态。同时,如果底层存储有冲突条目,则将其删除。

// Append the given entries to the raft log and update ps.raftState also delete log entries that will
// never be committed
// 负责将 Ready 中的 entries 持久化到 raftDB 中去,然后更新 RaftLoaclState 的状态。同时,如果底层存储有冲突条目,则将其删除。
// 将给定的条目附加到raft日志并更新ps.raftState,同时删除永远不会提交的日志条目
func (ps *PeerStorage) Append(entries []eraftpb.Entry, raftWB *engine_util.WriteBatch) error {
	// Your Code Here (2B).
	if len(entries) == 0 {
		return nil
	}
	// 将所有的 Entry 都添加到 WriteBatch 中
	for _, ent := range entries {
		if err := raftWB.SetMeta(meta.RaftLogKey(ps.region.Id, ent.Index), &ent); err != nil {
			log.Panic(err)
		}
	}

	currLastTerm, currLastIndex := entries[len(entries)-1].Term, entries[len(entries)-1].Index
	prevLastIndex, _ := ps.LastIndex() // prevLastIndex 对应 RaftLog 中的 stabled
	for index := currLastIndex + 1; index <= prevLastIndex; index++ {
		raftWB.DeleteMeta(meta.RaftLogKey(ps.region.Id, index))
	}
	ps.raftState.LastTerm, ps.raftState.LastIndex = currLastTerm, currLastIndex // 更新 RaftLocalState
	return nil
}

结构体与一些流程介绍

  相信在上面的介绍中,partB想让我们做什么读者已经清楚了。下面介绍一些结构体

RaftStore 会在节点启动的时候被创建,它负责维护在该节点上所有的 region 和对应的 peer。RaftStore 使用 storeMeta 维护该节点所有的元数据。

type storeMeta struct {
	sync.RWMutex
	/// region start key -> region
	regionRanges *btree.BTree
	/// region_id -> region
	regions map[uint64]*metapb.Region
	/// `MsgRequestVote` messages from newly split Regions shouldn't be dropped if there is no
	/// such Region in this store now. So the messages are recorded temporarily and will be handled later.
	pendingVotes []*rspb.RaftMessage
}
  • sync.RWMutex ,因为一个 RaftStore 上面的多个 region 可以同时接收请求,所以为了防止它们同时修改 storeMeta 元数据,这里会提供一个锁。
  • regionRanges,用于快速定位某一个 key 在哪个 region 中。
  • regions,region id 映射 region 结构体。
  • pendingVotes,自己不用管,已经写好了。

  RaftStore 被启动后,其首先会加载 peers,然后将 peers 注册进 router。加载 peer 的时候如果底层存储着之前 peer 信息,那么根据存储的信息加载,否则就新建。之后会启动一系列的 worker。RaftStore 的主要工作你可以认为就是接收来自其他节点的 msg,然后根据 msg 里面的 region id 将其路由到指定 region 的 peer 上。同时 RaftStore 也会将 peer 获取需要发送给其他节点的信息,发到其他的 RaftStore 上。

  可以看到run函数显示将Msg读出来然后调用HandleMsg,如果是raftcmd,最终会调用到我们写的proposeRaftCommand中。将Msg全部转发下去之后,就调用HandleRaftReady去处理Ready的数据了。

// run runs raft commands.
// On each loop, raft commands are batched by channel buffer.
// After commands are handled, we collect apply messages by peers, make a applyBatch, send it to apply channel.
//run运行raft命令。
//在每个循环中,raft命令由通道缓冲区进行批处理。
//处理完命令后,我们通过对等方收集apply消息,制作applyBatch,并将其发送到apply通道。
//整个过程分为两部分:raft worker 轮询 raftCh 以获得消息,
//这些消息包括驱动 Raft 模块的基本 tick 和作为 Raft 日志项的 Raft 命令;
//它从 Raft 模块获得并处理 ready,包括发送raft消息、持久化状态、将提交的日志项应用到状态机。
//一旦应用,将响应返回给客户。
func (rw *raftWorker) run(closeCh <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	var msgs []message.Msg
	for {
		msgs = msgs[:0]
		select {
		case <-closeCh:
			return
		case msg := <-rw.raftCh:
			msgs = append(msgs, msg)
		}
		pending := len(rw.raftCh)
		for i := 0; i < pending; i++ {
			msgs = append(msgs, <-rw.raftCh)
		}
		peerStateMap := make(map[uint64]*peerState)
		for _, msg := range msgs {
			peerState := rw.getPeerState(peerStateMap, msg.RegionID)
			if peerState == nil {
				continue
			}
			newPeerMsgHandler(peerState.peer, rw.ctx).HandleMsg(msg)
		}
		for _, peerState := range peerStateMap {
			newPeerMsgHandler(peerState.peer, rw.ctx).HandleRaftReady()
		}
	}
}

HandleMsg(msg message.Msg) 负责分类处理各种 msg。

  • MsgTypeRaftMessage,来自外部接收的 msg,在接收 msg 前会进行一系列的检查,最后通过 RawNode 的 Step() 方法直接输入,不需要回复 proposal。
  • MsgTypeRaftCmd,通常都是从 client 或自身发起的请求,比如 Admin 管理命令,read/write 等请求,需要回复 proposal。
  • MsgTypeTick,驱动 RawNode 的 tick 用。
  • MsgTypeSplitRegion,触发 region split,在 project 3B split 中会用到。
  • MsgTypeRegionApproximateSize,修改自己的 ApproximateSize 属性,不用管。
  • MsgTypeGcSnap,清理已经安装完的 snapshot。
  • MsgTypeStart,启动 peer,新建的 peer 的时候需要发送这个请求启动 peer,在 Project3B split 中会遇到。
func (d *peerMsgHandler) HandleMsg(msg message.Msg) {
	switch msg.Type {
	case message.MsgTypeRaftMessage:
		raftMsg := msg.Data.(*rspb.RaftMessage)
		if err := d.onRaftMsg(raftMsg); err != nil {
			log.Errorf("%s handle raft message error %v", d.Tag, err)
		}
	case message.MsgTypeRaftCmd:
		raftCMD := msg.Data.(*message.MsgRaftCmd)
		d.proposeRaftCommand(raftCMD.Request, raftCMD.Callback)
	case message.MsgTypeTick:
		d.onTick()
	case message.MsgTypeSplitRegion:
		split := msg.Data.(*message.MsgSplitRegion)
		log.Infof("%s on split with %v", d.Tag, split.SplitKey)
		d.onPrepareSplitRegion(split.RegionEpoch, split.SplitKey, split.Callback)
	case message.MsgTypeRegionApproximateSize:
		d.onApproximateRegionSize(msg.Data.(uint64))
	case message.MsgTypeGcSnap:
		gcSnap := msg.Data.(*message.MsgGCSnap)
		d.onGCSnap(gcSnap.Snaps)
	case message.MsgTypeStart:
		d.startTicker()
	}
}

make project2b 看看~

pass

rm -rf /tmp/*test-raftstore*
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestBasic2B|| true
=== RUN   TestBasic2B
--- PASS: TestBasic2B (25.71s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	25.765s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestConcurrent2B|| true
=== RUN   TestConcurrent2B
--- PASS: TestConcurrent2B (35.32s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	35.354s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestUnreliable2B|| true
=== RUN   TestUnreliable2B
--- PASS: TestUnreliable2B (33.55s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	33.616s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestOnePartition2B|| true
=== RUN   TestOnePartition2B
--- PASS: TestOnePartition2B (3.59s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	3.639s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestManyPartitionsOneClient2B|| true
=== RUN   TestManyPartitionsOneClient2B
--- PASS: TestManyPartitionsOneClient2B (24.82s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	24.897s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestManyPartitionsManyClients2B|| true
=== RUN   TestManyPartitionsManyClients2B
--- PASS: TestManyPartitionsManyClients2B (25.36s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	25.431s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestPersistOneClient2B|| true
=== RUN   TestPersistOneClient2B
--- PASS: TestPersistOneClient2B (29.36s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	29.405s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestPersistConcurrent2B|| true
=== RUN   TestPersistConcurrent2B
--- PASS: TestPersistConcurrent2B (44.34s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	44.423s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestPersistConcurrentUnreliable2B|| true
=== RUN   TestPersistConcurrentUnreliable2B
--- PASS: TestPersistConcurrentUnreliable2B (44.30s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	44.363s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestPersistPartition2B|| true
=== RUN   TestPersistPartition2B
--- PASS: TestPersistPartition2B (35.76s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	35.828s
GO111MODULE=on go test -v --count=1 --parallel=1 -p=1 ./kv/test_raftstore -run ^TestPersistPartitionUnreliable2B|| true
=== RUN   TestPersistPartitionUnreliable2B
--- PASS: TestPersistPartitionUnreliable2B (34.59s)
PASS
ok  	github.com/pingcap-incubator/tinykv/kv/test_raftstore	34.673s
rm -rf /tmp/*test-raftstore*

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

TinyKv Project2 PartB RaftKV 的相关文章

  • 大端小端和C实现大小端字节序的转化

    大端小端 小端就是低位字节放在内存的低地址端 xff0c 高位字节放在内存的高地址端 大端就是高位字节放在内存的低地址端 xff0c 低位字节放在内存的高地址端 举一个例子 xff0c 比如数字0x12 34 56 78 xff08 注意7
  • 【计算机图形学/实时渲染】 阴影(GAMES202)

    阴影 对于静态的物体 xff0c 可以使用Lightmap烘焙的方法来获取物体的影子 xff08 静态阴影 xff09 xff0c 而对于动态的物体 xff0c 一般采用的是Shadowmap的技术 光照贴图 xff08 Lightmap
  • 解决Mingw-w64下载太慢问题

    官网下载太慢了 xff0c 我们只用换一个镜像源就可以 1 点击Problems Downloading 2 切换香港的
  • 嵌入式Linux开发8——UART串口通讯

    1 背景知识 1 1 UART通讯格式 串口全称叫做串行接口 xff0c 通常也叫做 COM 接口 xff0c 串行接口指的是数据一个一个的顺序传输 xff0c 通信线路简单 使用两条线即可实现双向通信 xff0c 一条用于发送 xff0c
  • 二叉树笔记

    二叉树 二叉搜索 xff08 排序 查找 xff09 树 二叉查找树 xff08 Binary Search Tree xff09 xff0c xff08 又 xff1a 二叉搜索树 xff0c 二叉排序树 xff09 它或者是一棵空树 x
  • C++面试常见题目

    C 43 43 面试常见题目 c 43 43 编译过程自动类型推导auto和decltype重载 重写 xff08 覆盖 xff09 和隐藏的区别C 43 43 构造函数和析构函数能调用虚函数吗volatile关键词运算符重载格式noexe
  • 计算机网络面试常问问题

    C 43 43 面试 计算机网络常见问题 计算机网络常见问题TCP IP协议笔记TCPTCP的特点及目的序列号与确认应答提高可靠性为什么是三次握手和四次挥手滑动窗口流量控制拥塞控制TCP粘包问题 httphttp和https的区别https
  • Trajectory generation for quadrotor while tracking a moving target in cluttered environment

    四旋翼在杂波环境下跟踪运动目标的轨迹生成 摘要1 文章主要贡献2 前言2 1 轨迹公式2 2 实现结构 3 跟踪轨迹生成3 1 标称路径点生成3 2 可行路径点生成3 3 安全飞行走廊生成3 4 代价函数3 5 强制约束3 6 求解跟踪轨迹

随机推荐

  • 翻译-Frustum PointNets for 3D Object Detection from RGB-D Data

    Frustum PointNets for 3D Object Detection from RGB D Data 摘要介绍相关工作从RGB D数据中检测三维物体基于前视图图像的方法 xff1a 基于鸟瞰图的方法 基于3D的方法 点云的深度
  • Online Trajectory Generation of a MAV for Chasing a Moving Target in 3D Dense Environments

    微型无人机的在线轨迹生成 xff0c 用于在3D密集环境中追踪运动目标 摘要一 介绍二 相关工作A 在障碍物环境中追逐B 通过预先规划安全地生成轨迹 三 问题陈述A 问题设置B 能力C 命名 IV 视点生成A 可见度指标B 具有安全性和可见
  • 配置目标跟踪开源项目traj_gen_vis踩过的坑

    项目地址 https github com icsl Jeon traj gen vis 安装依赖需注意的问题 traj gen with qpoases 需安装ros分支的代码 xff08 这个作者并没有指出 xff0c 坑 xff09
  • cmake arm-none-eabi-gcc for stm32 cpp project

    尝试把原有的stm32工程F1canBootloader用cmake来管理 xff0c 遇到了以下几个坑 xff1a 1 报错 xff0c undefined reference to 96 dso handle 39 CMakeFiles
  • 网络攻防之wireshark抓取登录信息

    使用wireshark抓取登录信息 简介 xff1a Wireshark xff08 前称Ethereal xff09 是一个网络封包分析软件 网络封包分析软件的功能是撷取网络封包 xff0c 并尽可能显示出最为详细的网络封包资料 Wire
  • 头文件互相包含所引发的的问题(深入剖析)

    今天写程序出现了一个让人蛋疼的错误 xff0c 后来发现是由于头文件互相包含所引起的 原本只是简单的以为头文件互相包含只会触发 xff0c 头文件的递归包含 即 xff0c A包含B xff0c 所以才A的头文件里会将B的头文件内容拷贝过来
  • C++11异步操作future和aysnc 、function和bind

    C 43 43 11异步操作future和aysnc function和bind 前言异步操作std future和std aysnc 介绍std future和std aysnc的使用Demostd packaged task 介绍std
  • C++文件服务器项目—FastDFS—1

    C 43 43 文件服务器项目 FastDFS 1 前言1 项目架构2 分布式文件系统2 1 传统文件系统2 2 分布式文件系统 3 FastDFS介绍3 1 fdfs概述3 2 fdfs框架中的三个角色3 3 fdfs三个角色之间的关系3
  • C++文件服务器项目—Redis—2

    C 43 43 文件服务器项目 Redis 2 前言1 数据库类型1 1 基本概念1 2 关系 非关系型数据库搭配使用 2 redis基础知识点2 1 redis安装2 2 redis中的两个角色2 3 redis中数据的组织格式2 4 r
  • C++文件服务器项目—Nginx—3

    C 43 43 文件服务器项目 Nginx 3 前言1 Nginx一些基本概念1 1 Nginx初步认识1 2 正向代理概念理解1 3 反向代理概念理解 2 Nginx的安装与配置2 1 Nginx与相关依赖库的安装2 2 Nginx相关的
  • C++文件服务器项目—FastCGI—4

    C 43 43 文件服务器项目 FastCGI 4 前言1 CGI 概念理解2 FastCGI 概念理解3 FastCGI和spawn fcgi安装4 FastCGI和 Nginx的关系5 Nginx数据转发 修改配置文件6 spawn f
  • C++文件服务器项目—Nginx+FastDFS插件—5

    C 43 43 文件服务器项目 Nginx 43 FastDFS插件 5 前言1 文件上传下载流程1 1 文件上传流程1 2 文件下载流程1 3 文件下载优化流程 2 Nginx和fastDFS插件2 1 安装Nginx和fastdfs n
  • C++文件服务器项目—数据库表设计 与 后端接口设计—6

    C 43 43 文件服务器项目 数据库表的设计 6 前言1 数据库建表1 1 用户信息表 user info1 2 文件信息表 file info1 3 用户文件列表表 user file list1 4 用户文件数量表 user file
  • C语言中宏定义的使用

    1 引言 预处理命令可以改变程序设计环境 提高编程效率 它们并不是 C 语言本身的组成部分 不能直接对 它们进行编译 必须在对程序进行编译之前 先对程序中这些特殊的命令进行 预处理 经过预处理后 程序就不再包括预处理命令了 最后再由编译程序
  • C++文件服务器项目—项目总结与反向代理—7

    C 43 43 文件服务器项目 项目总结与反向代理 7 1 项目总结2 项目提炼3 web服务器的反向代理4 存储节点的反向代理 组件介绍基本写完了 xff0c 后续进行深入 本专栏知识点是通过零声教育的线上课学习 xff0c 进行梳理总结
  • https相关内容

    https相关内容 前言基础概念理解https传输过程 前言 本文写https相关内容 xff0c 持续补充 基础概念理解 对称加密 加解密秘钥是同一个 非对称加密 公钥 私钥 sa gt 公钥私钥都是两个数字ecc gt 椭圆曲线 两个点
  • TinyKv介绍

    TinyKv介绍 前言tinykv架构代码结构如何去写TinyKv参考内容 前言 开一个新坑 xff0c 将tinykv的4个project全部实现 虽然今天我点进去看的时候就萌生退意 好在没有放弃之前 xff0c 把project1完成了
  • TinyKv Project1 Standalone KV

    TinyKv Project1 Standalone KV 前言Project1 StandaloneKV 文档翻译文档的重点内容StandAloneStorageWriteReader Server单元测试 前言 project1还是比较
  • TinyKv Project2 PartA RaftKV

    TinyKv Project2a RaftKV 前言Project2 RaftKV 文档翻译Project2A重点内容抛出RaftLogRaftLog结构体字段详解RaftLog核心函数详解 RaftRaft 驱动规则Msg的作用与含义Ms
  • TinyKv Project2 PartB RaftKV

    TinyKv Project2 PartB RaftKV 前言Project2 PartB RaftKV 文档翻译PartB 到底想让我们做什么 xff1f 分析要实现的函数到底要干什么事情proposeRaftCommand 将上层命令打