扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
随着互联网的发展,在处理流量的方法也不仅仅为 first-come,first-served,而在共享网络中实现流量管理的基本机制就是排队。而公平算法则是实现在优先级队列中基于哪些策略来排队的”公平队列“。Token Bucket
则是为公平排队提供了替代方案。Fair Queue 与 Token Bucket的区别主要在,对于Fair Queue来讲,如果请求者目前空闲,Queue会将该请求者的带宽分配给其他请求者;而 Token Bucket 则是分配给请求者的带宽是带宽的上限。
创新互联专注于瑶海企业网站建设,响应式网站开发,商城网站开发。瑶海网站建设公司,为瑶海等地区提供建站服务。全流程定制网站开发,专业设计,全程项目跟踪,创新互联专业和态度为您提供的服务
通过例子了解算法原理
假设出站带宽是 4个数据包/ms,此时有一个需求为,为一个特定的发送端 A 来分配 1个数据包/ms的带宽。此时可以使用公平排队的方法分给发送 A 25%的带宽。
此时存在的问题是我们希望可以灵活地允许 A 的数据包以无规则的时间间隔发送。例如假设 A 在每个数据包发送后等待1毫秒后再开始下一个数据包的发送。
显然sence2是合理的,这个场景的解决方法就是令牌桶算法,规定 A 的配额,允许指定平均速率和突发容量。当数据包不符合令牌桶规范,那么就认为其不合理,此时会做出一下相应:
delay 被称为 整形 shaping
, shaping 是指在某个时间间隔内发送超过 Bc(Committed Burst)的大小,Bc 在这里指桶的尺寸。由于数据流量是突发性的,当在一段时间内不活动后,再次激活后的在一个间隔内发送的数量大于 Bc ,那么额外的流量被称为Be (burst excess)。
将流量丢弃或标记超额流量,保持在一个流量速率限制称为 管制 policing
。
令牌桶的定义是指,有一个桶,以稳定的速度填充令牌;桶中的任何一个溢出都会被丢弃。当要发送一个数据包,需要能够从桶中取出一个令牌;如果桶是空的那么此时数据包是不合规的数据包,必须进行 delay
, drop
, mark
操作。如果桶是满的,则会发送与桶容量相对应的突发(短时间内的高带宽传输),这是桶是空的。
令牌桶的规范:\(TB(r,B_{max})\)
那么公式则表示,桶以指定的速率填充令牌,最大为 \(B_{max}\) 。这就说明了为了使大小为 S 的数据包合规,桶内必须至少有 S 个令牌,即 \(B \ge S\),否则数据包不合规,在发送时,桶为 \(B=B-S\)
场景1:假设令牌桶规范为 \(TB(\frac{1}{3}\ packet/ms, 4\ packet)\),桶最初是满的,数据包在以下时间到达 [0, 0, 0, 2, 3, 6, 9, 12]
在处理完所有 T=0
的数据包后,桶中还剩 1 个令牌。到第四个数据包 T=2
到达时,桶内已经有1个令牌 + \(\frac{2}{3}\) 个令牌;当发送完第四个数据包时,桶内令牌数为 \(\frac{2}{3}\) 。到 T=3
数据包时,桶内令牌为1,满足发送第 5 个数据包。万松完成后桶是空的,在后面 6 9 12时,都满足3/ms 一个数据包,都可以发送成功
场景2:另外一个实例,在同样的令牌桶规范下 \(TB(\frac{1}{3}, 4)\),数据包到达时间为 [0, 0, 0, 0, 12, 12, 12, 12, 24, 24, 24, 24]
,可以看到在这个场景下,数据到达为3个突发,每个突发4个数据包,此时每次发送完成后桶被清空,当再次填满时需要12ms,此时另外一组突发达。故这组数据是合规的。、
场景3:在同样的令牌桶规范下 \(TB(\frac{1}{3}, 4)\),数据包到达时间为 [0, 1, 2, 3, 4, 5]
, 这组数据是不合规的
用表格形式表示如下:
数据包到达时间 | 0 | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|---|
发送前桶内令牌 | 4 | 3 \(\frac{1}{3}\) | 2 \(\frac{2}{3}\) | 2 | 1 \(\frac{1}{3}\) | \(\frac{2}{3}\) |
发送后桶内令牌 | 3 | 2 \(\frac{1}{3}\) | 1 \(\frac{2}{3}\) | 1 | \(\frac{1}{3}\) | \(\frac{2}{3}\) |
如果一个数据包在桶中没有足够的令牌来发送它时到达,可以进行整形或管制,整形使数据包等到足够的令牌积累。管制会丢弃数据包。或者发送方可以立即发送数据包,但将其标记为不合规。
漏桶 (leaky bucket)是一种临时存储可变数量的请求并将它们组织成设定速率输出的数据包的方法。漏桶的概念与令牌桶比起是相反的,漏桶可以理解为是一个具有恒定服务时间的队列。
由下图可以看出,漏桶的概念是一个底部有孔的桶。无论水进入桶的速度是多少,它都会以恒定的速度通过孔从桶中泄漏出来。如果桶中没有水,则流速为零,如果桶已满,则多余的水溢出并丢失。
和令牌桶一样,漏桶用于流量整形和流量管制
Leaky | Token |
---|---|
桶中存放的是所有到达的数据包,必须入桶 | 桶中存放的是定期生成的令牌 |
桶以恒定速率泄漏 | 桶有最大容量 \(B_{max}\) |
突发流量入桶转换为恒定流量发送 | 发送数据包需要小号对应的token |
token较leaky的优势:
在golang中,内置的 rate
包实现了一个令牌桶算法,通过 rate.NewLimiter(r,B)
进行构造。与公式\(TB(r,B_{max})\) 意思相同。
type Limiter struct {
limit Limit // 向桶中放置令牌的速率
burst int // 桶的容量
mu sync.Mutex
tokens float64 // 可用令牌容量
last time.Time // 上次放入token的时间
lastEvent time.Time
}
Limiter中带有三种方法, Allow
、Reserve
、Wait
分别表示Token Bucket中的 shaping
和 policing
:
drop
delay
Reserve()
返回了 ReserveN(time.Now(), 1)
ReserveN()
无论如何都会返回一个 Reservation,指定了调用者在 n 个事件发生之前必须等待多长时间。Delay()
方法返回了需要等待的时间,如果时间为0则不需要等待Cancel()
将取消等待wait/waitN
reserveN()
拿到token是合规的,并消耗掉tokenAllowN 为截止到某一时刻,当前桶内桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之不消费 Token,false。
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok // 由于仅需要一个合规否,顾合规的通过,不合规的丢弃
}
reserveN()
是三个行为的核心,AllowN中指定的为 0 ,因为 maxFutureReserve
是最大的等待时间,AllowN给定的是0,即如果突发大的情况下丢弃额外的 Bc。
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
// 这里拿到的是now,上次更新token时间和桶内token数量
now, last, tokens := lim.advance(now)
// 计算剩余的token
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// 确定是否合规,n是token
// token 的数量要小于桶的容量,并且 等待时间小于最大等待时间
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
在reserveN中调用了一个 advance()
函数,
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) { // 计算上次放入token是否在传入now之前
last = now
}
// 当 last 很旧时,避免在下面进行 delta 溢出。
// maxElapsed 计算装满需要多少时间
maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last) // 上次装入到现在的时差
if elapsed > maxElapsed { // 上次如果放入token时间超长,就让他与装满时间相等
elapsed = maxElapsed // 即,让桶为满的
}
// 装桶的动作,下面函数表示,elapsed时间内可以生成多少个token
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta // 当前的token
if burst := float64(lim.burst); tokens > burst {
tokens = burst // 这里表示token溢出,让他装满就好
}
return now, last, tokens
}
reserveN()
拿到token是合规的,并消耗掉tokenfunc (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
if n > lim.burst && lim.limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
}
// 外部已取消
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// 三个方法的核心,这里给定了deatline
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
在 rate.limiter
中,支持调整速率和桶大小,这样就可以根据现有环境和条件,来动态的改变 Token生成速率和桶容量
SetLimit(Limit)
更改生成 Token 的速率SetBurst(int)
改变桶容量package main
import (
"log"
"strconv"
"time"
"golang.org/x/time/rate"
)
func main() {
timeLayout := "2006-01-02:15:04:05.0000"
limiter := rate.NewLimiter(1, 5) // BT(1,5)
log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
length := 20 // 一共请求20次
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string, r *rate.Limiter) {
err := limiter.Allow()
if !err {
ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
}
time.Sleep(time.Duration(5) * time.Millisecond)
ch <- "Task-" + taskId + " run success " + time.Now().Format(timeLayout)
return
}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
}
for _, ch := range chs {
log.Println("task start at " + <-ch)
}
}
通过执行结果可以看出,在突发为20的情况下,allow仅允许了获得token的事件执行,,这种场景下实现了流量整形的特性。
package main
import (
"context"
"log"
"strconv"
"time"
"golang.org/x/time/rate"
)
func main() {
timeLayout := "2006-01-02:15:04:05.0000"
limiter := rate.NewLimiter(1, 5) // BT(1,5)
log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
length := 20 // 一共请求20次
chs := make([]chan string, length)
for i := 0; i < length; i++ {
chs[i] = make(chan string, 1)
go func(taskId string, ch chan string, r *rate.Limiter) {
err := limiter.Wait(context.TODO())
if err != nil {
ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
}
ch <- "Task-" + taskId + " run success " + time.Now().Format(timeLayout)
return
}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
}
for _, ch := range chs {
log.Println("task start at " + <-ch)
}
}
结果可以看出,在大突发的情况下,在拿到token的任务会立即执行,没有拿到token的会等待拿到token后继续执行,这种场景下实现了流量管制的特性
Reference
tokenbucket
QoS Policing
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流