kube-proxy使用了k8s官方工具库中的BoundedFrequencyRunner实现基于事件及时间间隔的配置同步
1 BounderFrequencyRunner构建
1.1 相关核心代码
// name: Runner名称
// func():目标方法
// minInterval:最小时间间隔(2次目标方法调用的最小间隔)
// maxInterval:最大时间间隔(2次目标方法调用的最大间隔,当没有事件时,以最大时间间隔执行)
// burstRuns:允许突发
func NewBoundedFrequencyRunner(name string, fn func(), minInterval, maxInterval time.Duration, burstRuns int) *BoundedFrequencyRunner {
timer := &realTimer{timer: time.NewTimer(0)} // will tick immediately
<-timer.C() // consume the first tick
return construct(name, fn, minInterval, maxInterval, burstRuns, timer)
}
//construct方法中基于最小时间间隔构建了令牌桶限速器,关键代码如下
qps := float32(time.Second) / float32(minInterval)
bfr.limiter = flowcontrol.NewTokenBucketRateLimiterWithClock(qps, burstRuns, timer)
func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
limiter := rate.NewLimiter(rate.Limit(qps), burst)
return newTokenBucketRateLimiter(limiter, c, qps)
}
1.2 qps计算测试
func main() {
duration32 :=float32(time.Second) / float32(3*time.Second)
duration64 :=float64(time.Second) / float64(3*time.Second)
duration32To64 := float64(duration32)
fmt.Println(duration32)
fmt.Println(duration64)
fmt.Println(duration32To64)
}
0.33333334
0.3333333333333333
0.3333333432674408
结合代码及上述测试结果,可以发现,计算qps时产生了精度丢失,而在转换类型时,又产生了一次精度丢失,导致了实际使用的limit值大于期望值
注意:实际使用的limit值大于期望值
2 BounderFrequencyRunner Loop
BoundedFrequencyRunner 通过Loop方法运行
2.1 相关核心代码
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
klog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
klog.V(3).Infof("%s Loop stopping", bfr.name)
return
//基于时间
case <-bfr.timer.C():
bfr.tryRun()
// 基于事件
case <-bfr.run:
bfr.tryRun()
// 基于事件
case <-bfr.retry:
bfr.doRetry()
}
}
}
3 tryRun
3.1 相关核心代码
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
// 处理逻辑1,限速令牌桶中至少有1个token
if bfr.limiter.TryAccept() {
// We're allowed to run the function right now.
bfr.fn()
bfr.lastRun = bfr.timer.Now()
bfr.timer.Stop()
bfr.timer.Reset(bfr.maxInterval)
klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
return
}
// 处理逻辑2,限速令牌桶中少于1个token
// It can't run right now, figure out when it can run next.
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.timer.Remaining() // time to next scheduled run
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
// It's hard to avoid race conditions in the unit tests unless we always reset
// the timer here, even when it's unchanged
if nextPossible < nextScheduled {
nextScheduled = nextPossible
}
bfr.timer.Stop()
//下一次往time管道中填充数据的时间为:当前时间+nextScheduled
//当nextScheduled为负值时,下次调度时间永远小于当前值会引起Loop死循环
bfr.timer.Reset(nextScheduled)
}
资源事件或周期性定时器,都会触发tryRun方法,在该方法中,处理分为2种情况:
(1)限速令牌桶中至少有1个token,进入if中处理逻辑,此时同步方法被调用,进行配置同步处理下发
(2)限速令牌桶中少于1个token。在按最大时间间隔调用tryRun情况,可能会出现本来应该进行处理逻辑1处理,因为limit的精度误差导致了token偏差,而进行了处理逻辑2处理的情况。此时,elapsed值为最大时间间隔,那么nextPossible值必然小于0。这个时候会导致nextScheduled值为负数,进而导致Loop死循环,cpu占用100%。
4 advance
调用逻辑:TryAccept->AllowN->reserveN→advance
advance方法基于时间计算限速结果
4.1 相关核心代码
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
//计算还需要多少时间,把令牌桶填满
// Avoid making delta overflow below when last is very old.
// brust令牌桶的大小,maxElapsed=把桶填满的时间(生成剩余容量token需要多久)
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
// 如果TryAccept执行时间间隔大于把桶填满的时间,则时间间隔应该为maxElapsed,即不能桶溢出(如果只是按间隔运行,maxElapsed为1/qps)
if elapsed > maxElapsed {
elapsed = maxElapsed
}
// Calculate the new number of tokens, due to time that passed.
// 根据时间间隔,计算现在桶里面应该有多少令牌
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
//如果当前tokens值大于了桶容量,则当前桶中token数量为burst,即不能桶溢出
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
//用来计算生成指定数量token需要多少时间
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Nanosecond * time.Duration(1e9*seconds)
}
// 用来计算指定时间间隔可以生成多少token
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
// Split the integer and fractional parts ourself to minimize rounding errors.
// See golang.org/issues/34861.
sec := float64(d/time.Second) * float64(limit)
nsec := float64(d%time.Second) * float64(limit)
return sec + nsec/1e9
}
在没有事件,只按最大时间间隔执行时,elapsed值等于lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)(没事件时,maxElapsed=把桶填满的时间,elapsed= maxInterVal,这里把桶填满的最大时间为minInterval值,因此maxElapsed总是小于elapsed, elapsed=1/qps),且当最小时间间隔为2的指数时,maxElapsed=1/qps=最小时间间隔,delta为1。而当最小时间间隔不是2的指数时,maxElapsed=1/qps<最小时间间隔, delta值不为1。这个时候token会产生偏差值即delta-1.(这是由float32和float64的存储格式决定的。qps=1/minInterval会比实际值偏大,elapsed会比实际值偏小,delta值会比实际值偏小)
5 reserveN
该方法计算当前tokens值,并返回是否允许Accept结果
5.1 相关核心代码
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
// 对于TryAccept()调用,这里传入的n是1,这里是减去1个token
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
// 如果tokens值大于0,则表明令牌桶中有1个令牌,TryAccept()这个时候应该为true
// 如果tokens值小于0,则计算产生差值token需要的时间间隔,如果时间间隔小于1纳秒,则,TryAccept()为Ture,否则为false
if tokens < 0 {
//计算waitDuration时候会用到,这个值可能会比实际值偏小,偏小不会影响ACCEPT
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
//lim.burst值由创建Runner时传入的值决定,这里是1
//maxFutureReserve 值为0
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
// 这里token值实际上会不断累加每次的delta-1偏差值,直到这个偏差大于1纳秒,会引起本该Accept的没有Accept
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
reserveN在调用lim.advance(now),获取到tokens后会进行判断,判断当前桶中是否可以取出一个令牌。其做法是,用token值减去1,如果token减去1的值大于0,则表明当前桶中有令牌。如果token减去1的值小于0时,则计算产生这个token差的时间,如果这个时间小于1纳秒,则还是认为这次可以accept。在精度丢失情况下,delta值不是1,导致了每次tokens值都出现了delta-1的偏差,这个偏差累加到1纳秒时,会导致本该Accept的情况,没有Accept。导致了nextScheduled为负值
6 问题分析
由于浮点数精度问题,导致实际作用的limit值大于期望值,导致基于limit得到的elapsed偏小,导致基于elapsed得到的token偏小,进而使得某次不该限制的调用,被拒绝,导致了下次调度时间小于当前时间值,导致了死循环,导致了cpu 100%
7 问题复现
在最小时间间隔为3秒,最大时间间隔为10秒时,在运行11天14小时后,cpu 100%。
11天14小时 = 1000800秒 =1000800秒/10秒/周期= 100080周期
7.1 相关代码修改
1 tryRun
func (bfr *BoundedFrequencyRunner) tryRun() {
bfr.mu.Lock()
defer bfr.mu.Unlock()
//if bfr.limiter.TryAccept() {
// // We're allowed to run the function right now.
// bfr.fn()
// bfr.lastRun = bfr.timer.Now()
// bfr.timer.Stop()
// bfr.timer.Reset(bfr.maxInterval)
// klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval)
// return
//}
//打印出第1个TryAccept为false时的周期值
for i := 1; i < 100000000; i++ {
if !bfr.limiter.TryAccept() {
klog.Infof("----un accept %v", i)
break
}
}
// It can't run right now, figure out when it can run next.
elapsed := bfr.timer.Since(bfr.lastRun) // how long since last run
nextPossible := bfr.minInterval - elapsed // time to next possible run
nextScheduled := bfr.timer.Remaining() // time to next scheduled run
klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled)
if nextPossible < 0 {
klog.Infof("----nextScheduled %v < 0", nextScheduled)
return
}
// It's hard to avoid race conditions in the unit tests unless we always reset
// the timer here, even when it's unchanged
if nextPossible < nextScheduled {
nextScheduled = nextPossible
}
bfr.timer.Stop()
bfr.timer.Reset(nextScheduled)
}
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Avoid making delta overflow below when last is very old.
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
//elapsed := now.Sub(last)
//if elapsed > maxElapsed {
// elapsed = maxElapsed
//}
// 因为elapsed 为最大时间间隔,必然大于maxElapsed值,这里直接赋值为maxElapsed
elapsed := maxElapsed
// Calculate the new number of tokens, due to time that passed.
delta := lim.limit.tokensFromDuration(elapsed)
fmt.Printf("delta %v \n", delta)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
func main() {
runner := NewBoundedFrequencyRunner("vmELBSyncs", syncVMELBs, 3*time.Second, 10*time.Second, 1)
go runner.Loop(wait.NeverStop)
select {
}
}
8 解决方案
由于不能直接修改官方库,且修改后难以验证,这里直接选择使用修改参数的方式解决,即minInterval值为2的指数。
9 ISSUES
https://github.com/kubernetes/kubernetes/issues/103286
https://go-review.googlesource.com/c/time/+/336469/1/rate/rate.go#387
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed.
// 这里不再计算maxElapsed,因为burst判断,可以保证tokens不会溢出
// 计算maxElapsed,会因为limit精度问题,导致maxElapsed偏小,进而导致delta偏小,进而导致tokens偏小
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
// durationFromTokens is a unit conversion function from the number of tokens to the duration
// of time it takes to accumulate them at a rate of limit tokens per second.
func (limit Limit) durationFromTokens(tokens float64) time.Duration {
seconds := tokens / float64(limit)
return time.Duration(float64(time.Second) * seconds)
}
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
//这里相较于原先其实没有变化,只是进行了方法的封装
return d.Seconds() * float64(limit)
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)