6.824分布式

2023-11-05

MapReduce

例子加深理解
在这里插入图片描述

在这里插入图片描述

你的工作是实现一个分布式的MapReduce,包括两个程序,master 和 worker。只有一个master进程 以及 一个或多个worker进程并行执行。在真实的系统中,工作人员将在许多不同的机器上运行,但在本实验室中,您将在一台机器上运行所有的worker。worker将通过RPC与master通信。每个worker进程将向master请求任务,从一个或多个文件读取任务的输入,执行任务,并将任务的输出写入一个或多个文件。如果一个worker没有在合理的时间内完成任务(对于这个实验室,使用10秒),master应该注意,并把同样的任务交给另一个worker。

开始已经给了一些代码。main/mrcoordinator和main/mrworker.go分别是master和worker 的主例程,不要修改这些代码。

应该在mr/master.go、mr/worker.go和mr/rpc.go中给出实现。

lab1任务,干三件事

  • 写coordinator(master)
  • 写worker
  • 写rpc

GFS

如何将大文件保存到一个单机中:
在这里插入图片描述

  • 索引保存的是硬盘中的位置(chunkid、偏移量)

如何将一个大文件存到不同机器
在这里插入图片描述

  • 索引保存的信息:服务器id、偏移量、chunkid

1. 如何读

  1. client将请求访问的文件名、chunk id(chunk id可以在client段通过偏移量算出)发送给Master。
  2. Master通过文件名去找具体的文件,再通过文件的文件表(保存的是文件每个chunk保存在哪个服务器上)找到该chunk id对应的服务器。接着Master 返回Chunk handle(访问权限)、chunk id对应的服务器
  3. client通过 chunk handle、要访问的数据(chunkid) 发送给ChunkSever服务器进行通信

流程如下图所示:
在这里插入图片描述

2. 如何写

  1. client请求写数据,把chunid告诉Master
  2. Master返回两个信息:Primary的服务器id副本服务器id
  3. client 开始往离他最近的服务器(假设CS0)传输要写入的数据。CS0先缓存(WAL,也就是先写入日志),并向下通知CS3,CS3通知CS5。CS5缓存完向上通知CS3,CS3也缓存完就通知CS0,CS0通知client
  4. client收到CS0的缓存完的信息(说明所有chunkSever都缓存完),就通知Primary的CS(CS3)所有CS缓存完毕。
  5. 接收到client的缓存完毕信息后,Primary(CS3)通知其他CS开始写
  6. 其他CS写完后通知Primary,已经写入到磁盘
  7. 在收到其他CS的Done、以及自己写入磁盘后,Primary发送Done信息给client。

WAL(WriteAhead Logging),先写日志,再写硬盘。保证原子性和持久性

在这里插入图片描述

如果写错了,就告诉client说写错了,让客户端重试,分布式系统不要想着修复,往往越修复错误越多

3. 杂问题

1. 如何减少Master的数据和流量?

但是索引保存的信息:服务器id、偏移量、chunkid这种保存方式会很麻烦,一旦哪个文件修改了,都需要劳烦master更改。所以,为了减少master数据和流量将偏移量保存在服务器上,master只保存chunk对应的服务器信息。如下图所示:在这里插入图片描述

2. 如何发现数据损坏:校验和

chunk(64MB)由一系列的block(64KB)组成,每个block都会保存校验和(checksum)每次读取时通过校验和来判断数据是否损坏,如下图所示
在这里插入图片描述

3. 如何减少chunkServer挂掉带来的损失:副本

通过将chunk保存在3个不同的chunkServer上(也就是创建3个副本),来避免chunk服务器挂掉带来的损失。

4. 如何恢复chunkserver中已经丢失数据的chunk:向Master求助

chunkServer4中丢失(损坏)了chunk03的数据,向Master求助,Master告诉chunkServer4:CS3和CS5也保存了;CS4就会去找CS3或者CS5请求Chunk03。如下图所示:

5. 如何发现ChunkServer挂掉:心跳包

6. ChunkServer挂掉如何恢复数据:修复进程

会有一个修复进程,记录每个少于3个ChunkServer的数据,并按照副本的多少从少到多排序(先修复副本最少的数据)。如下图所示:在这里插入图片描述

7. 如何应对热点:热点平衡进程

热点平衡进程会记录每个chunk的访问频率,以及每个chunkServer的空余空间、带宽等信息。当某个副本过于繁忙,成为热点,会将其复制到更多的ChunkServer上。

那如何选择ChunkServer呢?基于带宽和剩余空间来选择

3. 通过复制来实现容错----Virtual Machines

3.1 复制的基本介绍

复制(replication)能解决fail-stop故障。比如:拔掉网线,硬件故障等,都可以通过复制来解决

fail-stop如果某些东西出了故障,比如说计算机,那么它会单纯的停止运行。

但是复制不能处理软件中的bug和硬件设计中的缺陷。

有两种办法实现复制

  • state transfer
  • replicated state machine(这个讨论比较多)

state tranfer就是发送Primary的状态。比如Primary将自己的完整状态,例如内存,拷贝并发送给Backup。 Backup会保存收到的最近一次状态。当Primary故障了,Backup就可以从它所保存的最新状态开始运行。

replicated state machine类似于redis的主从复制,在获取到初始化的状态后,将每一次输入(称外部操作或者外部事件)也发送给Backup,从而实现同步。 注意:通常来说,如果有两台计算机,如果它们从相同的状态开始,并且它们以相同的顺序,在相同的时间,看到了相同的输入,那么它们会一直互为副本(数据保持一致)。

replicated state machine更常用的原因是:外部事件或操作肯定比整个服务器的状态要小。

但是replicated state machine他会更加复杂一点,需要考虑的东西比较多。而state transfer只是简单的复制整个状态就行,不需要考虑其他事

❓: 随机操作在replicated state machine怎么处理?

对于这一类问题的统一答案是,Primary会执行这些指令,并将结果发送给Backup。Backup不会执行这些指令,而是在应该执行指令的地方,等着Primary告诉它,正确的答案是什么。

需要关注的知识点:

  • 同步的频率:同步频率太快性能变差,同步太慢导致出现故障时,丢失的数据过多
  • 切换的方案:在发生故障时,如何切换到副本上
  • 异常:在切换过程中,必然会出现异常现象,必须找到一种方法去解决
  • 新的副本:在Primary故障后,一个副本需替代成为新的Primary,此时,需要一个添加一个新的副本。

3.2 VMwareFT的特点

绝大多数的复制都是类似于GFS的软件层级的复制(更高效),而VMware FT(这篇论文)的复制是最底层的复制(系统层面)(包括复制内存的内容,复制寄存器的内容等),所以VMware FT需要考虑更加细节的东西(比如说需要确保中断在Primary和Backup的相同位置执行)。

但是VMware FT的优势在于:它从底层机器级别实现复制,因此它不关心你在机器上运行什么样的软件,它就是复制底层的寄存器和内存。所以基本可以实现所有软件的复制。VMware FT就是可以让任何软件都具备容错性的魔法棒

3.3 VMwareFT的工作原理

基本工作流程

事实上,Primary和Backup是运行在两个不同的物理服务器的VMM(虚拟机软件)上:在一个物理服务器上有一个虚拟机,这个虚拟机跑了某个操作系统(或者某个应用程序),我们将之指定为Primary。在第二个物理服务器上,运行了相同的VMM,和一个相同的虚拟机作为Backup。

在这里插入图片描述

他们处在同一个局域网内(LAN),该局域网还有一些客户端(用于与多副本服务进行交互的计算机),如下图所示
在这里插入图片描述

基本的工作流程是,我们假设这两个副本,或者说这两个虚拟机:Primary和Backup,互为副本。某些我们服务的客户端,向Primary发送了一个请求,这个请求以网络数据包(network packet)的形式发出。

这个网络数据包产生一个中断(interrupt),之后这个中断送到了Primary的VMM。VMM看到热点(a hot)可以发现这是一个发给我们的多副本服务的一个输入,所以这里VMM会做两件事情:

  • 在虚拟机的操作系统上模拟网络数据包到达的中断,将相应的数据送到Primary的副本
  • VMM在将网络数据包拷贝一份,并通过网络送给Backup虚拟机所在的VMM(replicated state machine)。Backup虚拟机所在的VMM收到网络数据包后也会发送给backup。

Log Channel:将Primary到Backup之间同步的数据流通道称之为Log Channel。
Log Event/entry:从Primary发往Backup的事件被称为Log Event/Entry。

如何回复应答?

  • 在Primary虚拟机里面,服务会生成一个回复报文,并通过VMM在虚拟机内模拟的虚拟网卡发出。之后VMM可以看到这个报文,它会实际的将这个报文发送给客户端。
  • 而在Backup中,所生成的回复报文会被丢弃。

注意:当网卡有新的数据包到达时,VMM不会允许直接将网卡的数据包通过DMA传到Primary虚拟机中,因为这样就失去了对Primary的时序控制。 准确的做法是:

  • 网卡会将数据包拷贝到VMM的内存,并利用中断提醒VMM有新的数据包到达。VMM自身在将整个网络数据包拷贝到Primary虚拟机的内存,在模拟中断发送给Primary,并记录中断时的指令序号(为的是Backup在同样的位置执行中断)。同时在讲网络数据包和指令序号发送给Backup。

Backup如何检测Primary故障停止?

在Primary中,会通过周期性的时钟中断生成Log Entry来发送给Backup。在某个时间段内,如果没有收到中断,说明Primary故障停止(fail-stop)了。

此时,Backup虚拟机会上线,他不会在等待来自Primary的Log Channel的事件。并且,Backup的VMM会让后续的客户端请求发往Backup虚拟机,而不是Primary虚拟机,Backup的VMM不再会丢弃Backup虚拟机的输出。此时Backup虚拟机接管了服务,并称为Primary。

Non-Deterministic Events:非确定性事件的处理

非确定性事件有很多,主要有以下几种:

  • 客户端的输入:客户端的输入可能任意时间到达,是不可预测的,所输入的内容也是不可预测的。这里的系统输入的只能是网络数据包。所以当我们说输入的时候,我们实际上是指接收到了一个网络数据包。而一个网络数据包对于我们来说有两部分,一个是数据包中的数据,另一个是提示数据包送达了的中断。输入的内容,以及中断的时机(Primary和Backup需要在同一个指令位置触发中断)是需要考虑的
  • 随机指令(比如获取随机数)
  • 多核并发:并发会导致指令交替执行,没办法保证Primary和Backup在同样的交替情况下执行,而这会导致执行指令的顺序是不可预测的。因此, 在这篇论文中,只考虑单核的情况,多核会采用state transfer 来实现容错 。

3.4 Ouput Rule

在这个系统中,唯一的输出就是对于客户端请求的响应。

Output Rule:直到Backup的VMM确认收到了相应的Log entry后,Primary虚拟机才允许将生成输出发送。 其实核心思想就是确保Backup能接收到了Log entry,才发送输出信息。

所以,几乎每一个复制系统都有这个问题,在某个时间点,Primary必须要停下来等待Backup,这对于性能是实打实的限制。每次生成一个输出时,都需要至少等待5毫秒,等Backup确认收到了这个Log entry,然后VMM才能将Primary输出发送到网络。

这就是VMwareFT的缺陷,所以大家更喜欢在软件层面实现复制。因为这样的复制可以理解每一个操作含义,这样就不需要在每个网络数据包到达时都暂停一下(例如,如果是软件层面的复制,在读操作就可以不暂停,甚至写操作也可以放缓冲区,主机持续运行,参考redis)

3.5 Test-and-Set服务器

Test-and-Set服务器作用是:只要某个副本想上线接管服务时,他就需要去询问Test-and-Set服务器。主要是为了解决脑裂的。

为什么会产生脑裂
Primary和Backup都在运行,但是它们之间的网络出现了问题,同时它们各自又能够与一些客户端通信。这时,它们都会以为对方挂了,自己需要上线并接管服务。

只要是涉及到网络,都有可能出现脑裂的现象,这不是系统设计的问题。所以需要通过Test-and-Set服务解决:向一个外部的第三方权威机构求证,来决定Primary还是Backup允许上线。这里的第三方就是Test-and-Set服务。

具体来说:Test-and-Set服务不运行在Primary和Backup的物理服务器上,VMware FT需要通过网络支持Test-and-Set服务。这个服务会在内存中保留一些标志位,当你向它发送一个Test-and-Set请求,它会设置标志位,并且返回旧的值。Primary和Backup都需要获取Test-and-Set标志位,这有点像一个锁。为了能够上线,它们或许会同时发送一个Test-and-Set请求,给Test-and-Set服务。当第一个请求送达时,Test-and-Set服务会说,这个标志位之前是0,现在是1。第二个请求送达时,Test-and-Set服务会说,标志位已经是1了,你不允许成为Primary。对于这个Test-and-Set服务,我们可以认为运行在单台服务器。当网络出现故障,并且两个副本都认为对方已经挂了时,Test-and-Set服务就是一个仲裁官,决定了两个副本中哪一个应该上线。

4. Go的简单应用

go主要的同步方式有四种

  • sync.Mutex
  • sync.NewCond(&mu) // 条件变量
  • make(chan bool, size) // channel。适用于生产者消费者模型。尽量避免使用,坑多
  • sync.WaitGroup // 计数信号量

sync.WaitGroup用法

var wg sync.WaitGroup
wg.Add(1)// wg 信号量加1
wg.Done()// wg 信号量-1,注意 wg.Add(-1)和wg.Done是完全等价的
wg.Wait()//当wg不为0时,阻塞。只有当wg为0时,才继续执行


在使用goroutine时使用匿名函数

func main(){
	var wg sync.WaitGroup
	for i:=0; i<5; i++{
		wg.Add(1)
		//goroutine 匿名函数(闭包)
		go func(x int){
	        // 协程执行任务
	        sendRPC(x)
	        wg.Done() // 减少计数器
	    }(i)//将参数传入匿名函数
    // 阻塞主进程,直到所有协程完成任务(计数器为0)
	wg.Wait()
}


func sendRPC(i int){
	println(i)
}

上述代码可以用在raft选举中,等待所有候选人投完票,主协程在继续往后执行。或者利用条件变量(效率更高,不需要等待所有人投完票才看结果,只需要yes占半数就能胜任leader,如下代码所示)

func main() {
	count:=1//投yes的数量。注意自己也会投自己一票
	finished:=1//已完成投票的数量。
	total:=10//假设节点总数为total
	var mu sync.Mutex
	cond := sync.NewCond(&mu)

	//开始进行选举
	for i:=0; i<total-1; i++{
		//协程运行
		go func() {
			vote := RequestVote()//投票RPC
			mu.Lock()
			defer mu.Unlock()
			if vote{
				count++
			}
			finished++
			cond.Broadcast()
		}()
	}
	mu.Lock()
	for count <= total/2 && finished<total{
		cond.Wait()//每当一个协程完成投票,就会解除阻塞一次,判断是否满足条件,如果满足条件了,就跳出循环
	}
	if(count>total/2){
		fmt.Println("success!","Yes=", count)
		//改变该follower状态
	}else {
		fmt.Println("lost!","Yes=", count)
	}
	mu.Unlock() 

}
func RequestVote() bool {
	rand.Seed(time.Now().UnixNano()) // 设置随机数种子
	return rand.Intn(2) == 0 // 生成 0 或者 1,对应 true 或者 false
}

在某个函数调用rpc调用函数不要使用锁锁住rpc调用函数,会造成死锁

在某个函数调用rpc调用函数不要使用锁锁住rpc调用函数(也就是一般来讲不要在 RPC 调用期间持有锁),会造成死锁。

意思是在等待其他peer响应时(调用rpc等对方回复消息),就不应该持有锁

Debug方法

方法1
util.go 文件中对打印 log 进行了封装,只有 Debug 参数等于 1 时才输出 log。

好处就是在代码中任何地方都可以加打印 log(如何用请看下面示例),例如每个节点的变化。而需要关闭时只改一个参数即可(我之前都是一个个删除)。

~~some code
DPrintf("这里打印输出信息,只有当把util.go的Debug参数=1时,才会输出这行信息")

方法2
ctrl + \,Go 会终止程序,退出所有协程并打印 stacktrace,从stacktrace 中就能找到可能出问题的地方。

方法3
打开race检测看是否存在并发冲突
go test -run 2A // 正常运行测试案例 go test -race -run 2A // 开启race检测

5. Raft

共识:分布式系统中多个节点之间,彼此对某个状态达成一致结果的过程。换句话说就是,一群分布式的机器得在一些问题上统一口径,例如现在所有节点都认同,现在A正拥有一个锁。

例如:

  • 逻辑时间的共识,来决定事件发生的顺序
  • 互斥性共识,决定谁正拥有访问的共享资源的权限
  • 协调者共识,谁是当下的leader

通过共识算法实现分布式锁

如何实现复制?
首先,领导者通过日志复制(AppendEntries)RPC消息,将日志项复制到集群其他节点上

接着,如果领导者接收到大多数的复制成功响应后,它将日志项应用到它的状态机,并返回成功给客户端。如果领导者没有接收到大多数的复制成功响应,那么就返回错误给客户端

领导者将日志项应用到它的状态机,怎么没通知跟随者应用日志项呢?

因为领导者的日志复制RPC消息或心跳消息,包含了当前最大的、将会被提交(应用到状态机)的日志项索引值。所以通过日志复制RPC消息或心跳消息,跟随者就可以知道领导者的日志提交位置信息
请添加图片描述

日志复制过程:

  1. 接收到客户端请求后,领导者基于客户端请求中的指令,创建一个新日志项,并附加到本地日志中
  2. 领导者通过日志复制RPC,将新的日志复制到其他的服务器
  3. 当领导者将日志项成功复制到大多数服务器的时候,领导者会将这条日志项应用到它的状态机中
  4. 领导者将执行的结果返回给客户端
  5. 当跟随者接收到心跳消息,或者新的日志复制RPC消息后,如果跟随者发现领导者已经提交了某条日志项,而它还没应用,那么跟随者就将这条日志项应用到本地的状态机上

在Raft算法中,服务器节点间的沟通联络采用的是远程过程调用(RPC),在领导者选举中,需要用到这两类的RPC:

  • 请求投票(RequestVote)RPC:是由候选人在选举期间发起,通知各节点进行投票
  • 日志复制(AppendEntries)RPC:是由领导者发起,用来复制日志和提供心跳消息

State:每个节点所需要存储的变量

在这里插入图片描述

基本不怎么变的变量:

  • currentTerm :每个节点目前的任期
  • voted for:票投给了谁(选举Leader时用到的)
  • log[]:保存的是操作和收到该log entry时leader的任期

常变的变量:

  • commit Index:当前服务器,最大的已经提交的log entry的索引号(该索引号体现在log[]中)
  • lastApplied:被应用到状态机里的最大的索引号。(注意,log entry先提交,提交后,再被应用到状态机)

Leader才需要的变量,每次选举后都需要重新初始化的变量:

  • nextIndex[]:长度等于所有节点的数量。对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值+1)
  • matchIndex[]:它记录了每个 follower 的 log 被Leader同化的最高位置,也就是要发送给该follower的 logs 的前一个 log 的索引(初始化为Leader最后的索引值)

任期(Term)

Raft算法中每个任期由单调递增的数字(任期编号)标识,任期编号是随着选举的举行而变化的

  1. 跟随者在等待领导者心跳信息超时后,推举自己为候选人时,会增加自己的任期编号,比如节点A的任期编号为0,那么在推举自己为候选人时,会将自己的任期编号增加为1
  2. 如果一个服务器节点,发现自己的任期编号比其他节点小,那么它会更新自己的任期编号到较大的编号值,比如节点B的任期编号是0,当收到来自节点A的请求投票RPC消息时,因为消息中包含了节点A的任期编号,且编号为1,那么节点B将把自己的任期编号更新为1
  3. 如果一个候选人或者领导者,发现自己的任期编号比其他节点小,那么它会立即恢复成跟随者状态(卸任)。 比如分区错误恢复后,任期编号为3的领导者节点B,收到来自新领导者的包含任期编号为4的心跳消息,那么节点B将立即恢复成跟随者状态
  4. 如果一个节点接收到一个包含较小的任期编号值的请求,那么它会直接拒绝这个请求。 比如节点C的任期编号为4,收到包含任期编号为3的请求投票RPC消息,那么它将拒绝这个消息

日志复制(AppendEntries)RPC

在这里插入图片描述
该方法是由领导者发起,用来复制日志和提供心跳消息

发送的变量包括:

  • term:leader的任期值
  • leaderId:leader的id
  • PrevLogIndex:表示当前要复制的日志项的前面一条日志项的索引值(用于定位要在哪里开始append领导者的日志)。
  • PrevLogTerm:表示当前要复制的日志项的前面一条日志项的任期编号。
  • entries[]:所需要发送给follower的log entry(可以保存多个,如果只是发心跳包,则可以为空)
  • leaderCommit:leader目前commit到的索引号。如果follower的commit index小于leader的commit index,则follower就会更新他们的commit index,并应用到自己服务器的状态机中

发送后,返回给leader的变量:

  • term:如果所返回的term大于leader的term,则leader自动卸任
  • success:如果follower匹配了PrevLogIndex和PrevLogTerm,就返回true,否则返回false

规则:

  1. 如果leader Term < rf.term,那么reply.success=false
  2. 如果
  3. 如果follower的entry存在冲突(比如,index相同,但是term不同),删除这些entry并同步leader的entry

如何实现follower日志和leader日志保持一致性?

在Raft算法中,领导者通过强制跟随者直接复制自己的日志项,处理不一致日志。也就是说,Raft是通过以领导者的日志为准,来实现各节点日志的一致性的

  1. 首先,领导者通过日志复制RPC的一致性检查,找到跟随者节点上与自己相同日志项的最大索引值(PrevLogIndex)。也就是说,这个索引值之前的日志,领导者和跟随者是一致的,之后的日志是不一致的
  2. 然后,领导者强制跟随者更新覆盖不一致的日志项,实现日志的一致

领导者通过日志复制RPC一致性检查,找到跟随者节点上与自己相同日志项的最大索引值,然后复制并更新覆盖该索引值之后的日志项,实现了各节点日志的一致。跟随者中的不一致日志项会被领导者的日志覆盖,而且领导者从来不会覆盖或者删除自己的日志

请求投票(RequestVote)RPC

在这里插入图片描述
发送的变量:

  • term:当前我的任期
  • candidateId:当前发送方的id
  • lastLogIndex:最大的日志索引号
  • lastLogTerm:最大日志索引号对应的任期

返回的结果:

  • term:所返回的term。如果大于当前自身term,就会更新自己
  • voteGranted:当返回true说明获得选票

注意:日志完整性更高的follower(日志索引项比candidate更大)是不会投票给candidate的。

持久化 Persistence

解决服务器故障的两种方式

  • 将坏的服务器从集群中摘除,并替换一个全新的空的服务器
  • 持久化

持久化的存储可以确保当服务器重启时,服务器可以找到相应的数据,并将其加载到内存中。这样可以使得服务器在故障并重启后,继续重启之前的状态。 当你更改了被标记为持久化的某个数据,服务器应该将更新写入到磁盘。

需要持久化的数据有:

  1. cuttrentTerm
  2. votedFor
  3. log[]

1和2是为了防止产生2个ledaer,而3是为了保存应用程序(使用raft接口的应用)的状态

每次添加一个Log entry,更新currentTerm或者更新votedFor,你或许都需要持久化存储这些数据

注意

在leader将log entry应用到状态机后,才会返回响应发送给client

如果你赢得了选举,你需要立刻发送一条AppendEntries RPC消息给其他所有的服务器。

为什么Raft系统这么关注Log,Log究竟起了什么作用?
  1. Log是一种记录leader按顺序执行的操作的机制。这对于复制状态机(replicated state machine)而言至关重要,对于这些复制状态机来说,所有副本不仅要执行相同的操作,还需要用相同的顺序执行这些操作,以确保副本的一致性(这里忘记了可以看回3.3)。
  2. 对于follower而言,log还是一个暂存操作的地方,因为follower不确定,这些操作是否被应用到状态机了(这些操作可能会被丢弃)
  3. 对于leader而言,log可以保存leader的操作记录,在follower离线重上线后,leader能够向Follower重传因为离线所丢失了相应的操作,通过log。
  4. 帮助重启的服务器恢复状态。对于一个重启的服务器来说,会使用存储在磁盘中的Log。每个Raft节点都需要将Log写入到它的磁盘中,这样它故障重启之后,Log还能保留。**重启后的服务器会通过log来从头执行里头的操作,从而恢复到离线前的状态,并继续以这个状态运行。**所以,Log也会被用来持久化存储操作,服务器可以依赖这些操作来恢复状态。

raft接口介绍

这一部分简单介绍一下应用层和Raft层之间的接口。假设我们的应用程序是一个key-value数据库,下面一层是Raft层。

在Raft集群中,每一个副本上,这两层之间主要有两个接口。

接口一(KV层->raft):key-value层用来转发客户端请求的接口。如果客户端发送一个请求给key-value层,key-value层会将这个请求转发给Raft层,并说:请将这个请求存放在Log中的某处。如下图所示
请添加图片描述
这个接口实际上是个函数调用,称之为Start函数。这个函数只接收一个参数,就是客户端请求。key-value层说:我接到了这个请求,请把它存在Log中,并在committed之后告诉我。

接口二(raft->KV层):当大多数服务器(超半数)将请求保存到大多数服务器的log时,Raft层会通知key-value层:你刚刚在Start函数中传给我的请求已经commit了。 Raft层通知的,不一定是最近一次Start函数传入的请求。如下图所示请添加图片描述

这个向上的接口时通过向go语言中的channel发送信息来实现的, raft从channel的一端写,key-value层从另一端读。所以这里有个叫做applyChannel的channel,通过它raft可以发送apply消息告知key-value层可以执行该请求了! 。

注意:在raft发送apply消息消息之前,kv层还并没有真正执行客户端的命令,他必须等到raft接口发送了apply信息后,才能执行。然后返回reply给客户端

当然,key-value层需要知道从applyChannel中读取的消息,匹配之前调用的哪个Start函数,所以Start函数的返回需要有足够的信息给key-value层,这样才能完成对应。Start函数的返回值包括,这个请求将会存放在Log中的位置(index)。这个请求不一定能commit成功,但是如果commit成功的话,会存放在这个index位置。同时,它还会返回当前的任期号(term)和一些其它我们现在还不太关心的内容。

在apply消息中,将会包含请求(command)和对应的Log位置(index)

MIT老师讲解Leader Election

每次follower收到AppendEntries消息(心跳)都会重置raft节点的选举定时器,以压制其成为candidate。

如何设置每个节点的超时时间 (如果该结点在规定时间内没有收到心跳了,该结点就发送投票RPC,请求成为leader)

  • 超时时间应该是一个随机值,但是需要考虑超时时间随机的上限和下限。
  • 超时时间的下限:两倍的心跳间隔(从leader发出,到follower接收的时间称为一个心跳间隔)或者三倍的心跳间隔
  • 超时时间的上限:超时时间的上限影响了系统的恢复时间。老师没说具体应该设置多少
  • 还需要考虑不同节点的超时时间差 gap,该时间差必须满足第一个开始选举的节点能够收到其他人的投票(而不是请求投票rpc刚发出,其他人也成为candidate,导致没法给第一个成为candidate的投票),这里的gap至少大于发送一条rpc所需要的往返时间。这个gap如下图的S2到S3的距离
    请添加图片描述
  • 注意,每次节点重置自己的选举定时器时,都需要重新随机一个超时时间。(感觉加上这个的话,第三点不好实现)

考虑一个特殊场景:

假设网线故障了,旧的Leader在一个网络分区中,这个网络分区中有一些客户端和少数(未过半)的服务器。在网络的另一个分区中,有着过半的服务器,这些服务器选出了一个新的Leader。旧的Leader会怎样,或者说为什么旧的Leader不会执行错误的操作?
这里看起来有两个潜在的问题。第一个问题是,如果一个Leader在一个网络分区中,并且这个网络分区没有过半的服务器。那么下次客户端发送请求时,这个在少数分区的Leader,它会发出AppendEntries消息。但是因为它在少数分区,即使包括它自己,它也凑不齐过半服务器,所以它永远不会commit这个客户端请求,它永远不会执行这个请求,它也永远不会响应客户端,并告诉客户端它已经执行了这个请求。所以,如果一个旧的Leader在一个不同的网络分区中,客户端或许会发送一个请求给这个旧的Leader,但是客户端永远也不能从这个Leader获得响应。所以没有客户端会认为这个旧的Leader执行了任何操作。另一个更奇怪的问题是,有可能Leader在向一部分Followers发完AppendEntries消息之后就故障了,所以这个Leader还没决定commit这个请求。

Lab2A项目要求

实现一个领导者选举和心跳:选举一个领导者,如果出现超时,则新leader接管

  1. 通过go test -run 2A运行测试
  2. 遵循论文的图2。实现发送和接收RequestVote RPC,与选举相关的服务器规则以及与领导者选举相关的状态。
  3. 将图2状态关于领导者选举的添加到raft struct中。还需要定义一个结构来保存日志信息
  4. 填写RequestVoteArgsRequestVoteReply结构。修改make函数:当一段时间没有收到其他peer信息,将通过发送requestvote rpc来启动leader 选举。
  5. 因为要实现心跳,所以请定义AppendEntries RPC结构(尽管您可能还不需要所有参数),并让leader定期发送它们。编写一个AppendEntries RPC处理程序方法,该方法可以重置选举超时。
  6. 确保超时时间尽量不同
  7. 要求:leader发送心跳每秒不超过十次(100毫秒发一次心跳)
  8. 要求:在五秒内选出新leader(超时时间的上限为5s内)。注意,leader election可能会有多轮(相同的超时时间或者数据包丢失),所以即使要多轮选举,也必须在5s内完成
  9. 要使用比论文150-300毫秒更长的选举超时时间,但不能太长了,否则无法在5s内完成选举
  10. 记得要使用rand,rand很好用
  11. 你需要编写定期执行或延时执行的代码。最简单的方法是创建一个带有循环的 goroutine,并在循环中调用 time.Sleep();可以参考 Make() 函数创建的 ticker() goroutine。
  12. 如果您的代码在通过测试时遇到问题,请再次阅读论文的图2;领导人选举的完整逻辑分布在图的多个部分。
  13. 不要忘记实现GetState()。
  14. 当测试程序永久关闭一个 Raft 实例时,它会调用 rf.Kill()。你可以使用 rf.killed() 来检查是否调用了 Kill()。为了避免死亡的 Raft 实例打印混乱的消息,可能需要在所有循环中都进行这个检查。
  15. 在 Go RPC 中,只有以大写字母开头的字段才会被发送。子结构体也必须具有以大写字母开头的字段名(例如,数组中的日志记录的字段)。labgob 包会对此发出警告,请不要忽略这些警告。

请务必通过2A测试,以便您看到以下内容:

$ go test -run 2A
Test (2A): initial election ...
  ... Passed --   3.5  3   58   16840    0
Test (2A): election after network failure ...
  ... Passed --   5.4  3  118   25269    0
Test (2A): multiple elections ...
  ... Passed --   7.3  7  624  138014    0
PASS
ok  	6.5840/raft	16.265s
$

所有测试花费超过600秒,单独的测试超过120秒都算失败。

Lab2A项目总结

难点:
由于我设计思路是:A节点发送投票rpc后,主线程就会沉睡,等待半数赞成票或者全部投票投完才会苏醒。这样的结果就是某个节点可能断开链接了,导致A节点没办法收到所有的投票结果,并且没法半数赞成,导致一直阻塞。所以需要设置一个协程负责监督计时,当超过选举时间了,那么就唤醒主线程成为Follower

//该协程负责监控这次选举是否超时,如果超时就通知主线程不阻塞,
go func() {
	timer :=time.NewTimer(300 * time.Millisecond)
	<-timer.C
	rf.mu.Lock()
	//把节点设置回Follower
	if rf.state !=Leader {
		rf.state = Follower
		rf.votedFor = -1
		rf.BroadcastHeartbeat(false)
	}
	cond.Broadcast()//时间到了通知主线程
	rf.mu.Unlock()
}()

Lab2B项目要求

  • 补充start函数
  • 投票部分需要判断日志完整性:日志完整性更高的follower(日志索引项比candidate更大)是不会投票给candidate的
  • 如果某个节点赢得了选举,该结点需要立刻发送一条AppendEntries RPC消息给其他所有的服务器
  • 心跳包prevlogIndex的PrevLogTerm的匹配问题
  • 使用协程来监控matchIndex数量过半的log,将这些log commit

额外补充(2B)

  • 新leader需要识别某些命令是提交的,不能被丢弃的(通过commitIndex)
  • 用nextIndex来校准。每次F返回fasle 就减少nextIndex(回退一个log entry),然后重新发送心跳包。当L发现F返回true后,就会更新nextIndex = F的log 的长度
  • 由于一条一条回退log entry太慢了,可以让F 以Term为单位进行会退,而不是每次只回退一条log entry。需要F在拒绝AppendEntries的信息时,reply带上三个额外信息:
    1. XTerm: 这个是Follower中与Leader冲突的Log对应的任期号。在之前有介绍Leader会在prevLogTerm中带上本地Log记录中前一条Log的任期号。如果Follower在对应位置的任期号不匹配,它会拒绝Leader的AppendEntries消息,并将自己的任期号放在XTerm中。如果Follower在对应位置没有Log,那么这里会返回 -1。
    2. XIndex: 这个是Follower中,对应任期号为XTerm的第一条Log条目的槽位号。
    3. XLen: 如果Follower在对应位置没有Log,那么XTerm会返回-1,XLen表示空白的Log槽位数。
  • MatchIndex用于:提交日志到状态机
  • NextIndex用于:日志校准

LAB2B项目总结

难点:
log复制的相关问题特别难:具体来说:ABC三个节点,A为Leader,此时日志状态为

A [1, 1, 1, 1, 1]
B [1]
C [1]

A在发送完第一个entry后,就由于网络问题,自己划为一个分区,此时B节点由于选举超时,成为leader,发送entry给C(A由于还在分区内,无法接收到B的rpc)

A [1, 1, 1, 1, 1]
B [1, 2]
C [1, 2]

此时A成功恢复网络,B将修改A内的日志

A [1, 2]
B [1, 2]
C [1, 2]

而此时B陷入沉睡,C由于选举超时成为Ledaer,并发送entry给A

A [1, 2, 3]
B [1, 2]
C [1, 2, 3]

此时,B突然苏醒,给A发送entry,如果此时没有处理,那么A就会修改索引为2的位置,将3改为2。而这个行为是不对的,因为B已经是老的leader了,没资格修改term为3的数据。这个bug找了修改了好久!

A [1, 2, 2] ❌
B [1, 2, 2]
C [1, 2, 3]

LAB 2C项目要求

  • 补充raft.go中的persist()(保存状态)和readPersist()(恢复状态)函数。
  • 在需要持久化的地方添加函数调用rf.persist();只要修改了 rf.currentTermrf.votedForrf.log的地方,都调用rf.persist()就可以了。

2C比较简单,补充下函数,再调用下rf.persist()就过了。没什么难点。

5. 强一致性简单介绍

如果我们能构造满足下面两个要求的这么一个序列,并且生成的图没有带环,那么就说明是强一致的

  1. 序列中的请求的顺序与实际时间匹配
  2. 每个读请求看到的都是序列中前一个写请求写入的值

如下图所示,就是一个强一致性的例子
请添加图片描述

相同请求不执行2次原则:(可以当作难点)

客户端在发送请求时,可能该请求由于网络或者其他原因,导致结果没有回传,那么客户端将会重新发送该请求给同一个leader/或者其他服务器。

所以,要求服务器有两个功能:

  1. 有能力过滤出重复的请求
  2. 能记住第一次请求的回复

解决方法是,服务器会根据请求的唯一号或者其他的客户端信息来保存一个表。这样服务器可以记住,哦,我之前看过这个请求,并且执行过它,我会发送一个相同的回复给它,因为我不想执行相同的请求两次

这将在Lab3中完成这样的机制,服务器发现了重复的请求,并将之前的回复重新发给客户端。

在这里插入图片描述

6 Zookeeper

Zookeeper是读写分离:主服务器负责写,从服务器负责读。

那么如何保证读请求一定返回最新的数据(线性一致)?

实际上,Zookeeper并不要求返回最新的写入数据。 Zookeeper允许客户端将读请求发送给任意副本,并由副本根据自己的状态来响应读请求。副本的Log可能并没有拥有最新的条目,所以尽管系统中可能有一些更新的数据,这个副本可能还是会返回旧的数据。

一致性保证

有两个主要的要求来保证一致性:

  1. 写请求是线性一致的
  2. 任何一个客户端的请求,都会按照客户端所发送的请求的顺序来执行(FIFO(First In First Out)客户端序列)。并且请求可以是异步的(不需要等前一个请求的结果返回,才能发送下一个写请求,具体来说:
    • 写请求的FIFO指的是:按照客户端发送的顺序来执行
    • 读请求的FIFO指的是:后续的读请求,必须要在不早于之前的读请求对应的Log点执行。也就是一个客户端发起了两个读请求,如果第一个读请求在Log中的一个位置执行,那么第二个读请求只允许在第一个读请求对应的位置或者更后的位置执行。
    • 对于读请求的特殊情况:如果一个客户端正在与一个副本交互,客户端发送了一些读请求给这个副本,之后这个副本故障了,客户端需要将读请求发送给另一个副本。这时,尽管客户端切换到了一个新的副本,FIFO客户端序列仍然有效,即尽管客户端切换到了一个新的副本,客户端的在新的副本的读请求,必须在之前其他副本读请求的Log这个点或者之后的点执行。 所以这意味着,客户端需要维护一个索引号,用来表示当前Log中执行点的前一条Log条目号,当客户端发送请求时,需要将这个索引号一同发送。

更进一步,FIFO客户端请求序列是对一个客户端的所有读请求,写请求生效。所以,如果我发送一个写请求给Leader,在Leader commit这个请求之前需要消耗一些时间,所以我现在给Leader发了一个写请求,而Leader还没有处理完它,或者commit它。之后,我发送了一个读请求给某个副本。这个读请求需要暂缓一下,以确保FIFO客户端请求序列。读请求需要暂缓,直到这个副本发现之前的写请求已经执行了。这是FIFO客户端请求序列的必然结果,(对于某个特定的客户端)读写请求是线性一致的。

可以通过使用命令sync,先把所有写操作复制到副本后,再发送读请求,这样读到的数据一定是最新的。

7. Lab3

Lab3A

要求:

  1. client.go文件中的Clerk的Put/Append/Get方法中添加RPC发送代码
  2. server.go文件中实现PutAppend()和Get()的RPC处理程序
  3. 补充Op结构体
  4. 处理返回给服务器端的applyCh(需要解封装)请添加图片描述

请添加图片描述
我的实现是服务器协程将日志放入 raft 层去同步后即注册一个 channel 去阻塞等待,接着 apply 协程监控 applyCh,在得到 raft 层已经 commit 的日志后,apply 协程首先将其 apply 到状态机中,接着根据 index 得到对应的 channel ,最后将状态机执行的结果 push 到 channel 中,这使得服务器协程能够解除阻塞并回复结果给客户端。

整个Lab3的整体流程

客户端:
客户端通过RPC把需要put/append/get的信息发送给服务器。

服务器:
每个服务器开启一个raft协议,客户端通过调用服务器的函数将信息通过调用raft的start函数下发到raft中,raft在通过一致性协议,将信息通过日志的形式保存在raft日志中。接着,raft会通过applyChannel告诉服务器,我已经将信息保存在raft日志中,服务器中的一个负责监控applyChannel的协程就会接收到,该协程会做两件事:

  1. 检查是否是第一次发送该命令,如果是的话就保存到数据库(put/append),
  2. 并返回给之前哪个调用服务器函数的协程:由他返回结果给客户端。

相同请求不执行2次原则:(可以当作难点)

客户端在发送请求时,可能该请求由于网络或者其他原因,导致结果没有回传,那么客户端将会重新发送该请求给同一个leader/或者其他服务器。

所以,要求服务器有两个功能:

  1. 有能力过滤出重复的请求
  2. 能记住第一次请求的回复

解决方法是,服务器会记录每个命令的序号(序号是递增的),并且通过map结构保存每个客户端已经执行命令的最大序号。这样服务器可以记住,哦,我之前执行过这个请求,我会发送一个相同的回复给它,因为我不想执行相同的请求两次

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

6.824分布式 的相关文章

  • ansible 重新启动 2.1.1.0 失败

    我一直在尝试创建一个非常简单的 Ansible 剧本 它将重新启动服务器并等待它回来 我过去在 Ansible 1 9 上有一个可以运行的 但我最近升级到 2 1 1 0 并且失败了 我正在重新启动的主机名为 idm IP 为 192 16
  • Locale.getDefault() 始终返回 en

    unix 机器上的服务器始终使用 en 作为默认区域设置 以下是区域设置输出 LANG en US LC CTYPE C LC NUMERIC C LC TIME C LC COLLATE C LC MONETARY C LC MESSAG
  • 使用 find - 删除除任何一个之外的所有文件/目录(在 Linux 中)

    如果我们想删除我们使用的所有文件和目录 rm rf 但是 如果我希望一次性删除除一个特定文件之外的所有文件和目录怎么办 有什么命令可以做到这一点吗 rm rf 可以轻松地一次性删除 甚至可以删除我最喜欢的文件 目录 提前致谢 find ht
  • 从 PL/SQL 调用 shell 脚本,但 shell 以 grid 用户而非 oracle 身份执行

    我正在尝试使用 Runtime getRuntime exec 从 Oracle 数据库内部执行 shell 脚本 在 Red Hat 5 5 上运行的 Oracle 11 2 0 4 EE CREATE OR REPLACE proced
  • 如何通过替换为空页映射来取消映射 mmap 文件

    Linux 用户空间有没有办法用空页面 映射自 dev null 或者可能是一个空页面 重复映射到从文件映射的页面的顶部 对于上下文 我想找到这个 JDK bug 的修复 https bugs openjdk java net browse
  • 应用程序无缘无故地被杀死。怀疑 BSS 高。如何调试呢?

    我已经在CentOs6 6中成功运行我的应用程序 最近 硬件 主板和内存 更新了 我的应用程序现在毫无理由地被杀死 root localhost PktBlaster PktBlaster Killed 文件和 ldd 输出 root lo
  • 如何禁用 GNOME 桌面屏幕锁定? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 如何阻止 GNOME 桌面在几分钟空闲时间后锁定屏幕 我已经尝试过官方手册了在红帽 https access redhat com doc
  • Jenkins中找不到环境变量

    我想在詹金斯中设置很多变量 我试过把它们放进去 bashrc bash profile and profile of the jenkins用户 但 Jenkins 在构建发生时找不到它们 唯一有效的方法是将所有环境变量放入Jenkinsf
  • Linux中的CONFIG_OF是什么?

    我看到它在很多地方被广泛使用 但不明白在什么场景下我需要使用它 What is 配置 OF OF 的全名是什么 打开固件 这是很久以前发明的 当时苹果公司正在生产基于 PowerPC CPU 的笔记本电脑 而 Sun Microsystem
  • 大多数 Linux 系统头文件与 C++ 兼容吗?

    大多数 Linux 系统头文件 API C 兼容吗 今天我试图做这样的事情 include
  • 从 Python 调用 PARI/GP

    我想打电话PARI GP http pari math u bordeaux fr dochtml gpman html仅从Python计算函数nextprime n 对于不同的n是我定义的 不幸的是我无法得到帕里蟒蛇 http code
  • 使用 sed 更新 xml 属性(Windows + cygwin 和 Linux)?

    我需要使用 sed 命令对 xml 文件进行更新 但我在这方面遇到了麻烦 它需要在 Windows 使用 cygwin 和 Linux 上运行 XML 具有以下元素
  • Linux 中的动态环境变量?

    Linux 中是否可以通过某种方式拥有动态环境变量 我有一个网络服务器 网站遵循以下布局 site qa production 我想要一个环境变量 例如 APPLICATION ENV 当我在 qa 目录中时设置为 qa 当我在生产目录中时
  • 加载数据infile,Windows和Linux的区别

    我有一个需要导入到 MySQL 表的文件 这是我的命令 LOAD DATA LOCAL INFILE C test csv INTO TABLE logs fields terminated by LINES terminated BY n
  • 如何有效截断文件头?

    大家都知道truncate file size 函数 通过截断文件尾部将文件大小更改为给定大小 但是如何做同样的事情 只截断文件的尾部和头部呢 通常 您必须重写整个文件 最简单的方法是跳过前几个字节 将其他所有内容复制到临时文件中 并在完成
  • linux perf:如何解释和查找热点

    我尝试了linux perf https perf wiki kernel org index php Main Page今天很实用 但在解释其结果时遇到了困难 我习惯了 valgrind 的 callgrind 这当然是与基于采样的 pe
  • arm64和armhf有什么区别?

    Raspberry Pi Type 3 具有 64 位 CPU 但其架构不是arm64 but armhf 有什么区别arm64 and armhf armhf代表 arm hard float 是给定的名称Debian 端口 https
  • 如何在 Linux shell 中将十六进制转换为 ASCII 字符?

    假设我有一个字符串5a 这是 ASCII 字母的十六进制表示Z 我需要找到一个 Linux shell 命令 它将接受一个十六进制字符串并输出该十六进制字符串代表的 ASCII 字符 所以如果我这样做 echo 5a command im
  • Pyaudio 安装错误 - “命令‘gcc’失败,退出状态 1”

    我正在运行 Ubuntu 11 04 Python 2 7 1 并想安装 Pyaudio 于是我跑了 sudo easy install pyaudio 在终端中 进程退出并显示以下错误消息 Searching for pyaudio Re
  • 将 PDF 转换为 600dpi 的 TIFF 和 jpg 96 dpi

    我想使用 ImageMagick 从 Python 脚本将 pdf 转换为 600 dpi 的 tiff 和 96 dpi 的 jpg 我使用 imagemagick 命令行完成了这项任务 但我想使用python中的Imagemagick将

随机推荐