一文掌握常见的限流算法:计数器、漏桶、令牌桶等

限流(Rate Limiting),也称流量控制。是指系统在面临高并发,或者大流量请求的情况下,限制新的请求对系统的访问,从而保证系统的稳定性。限流会导致部分用户请求处理不及时或者被拒,这就影响了用户体验。所以一般需要在系统稳定和用户体验之间平衡一下。

常见的限流算法包括固定窗口计数器算法、滑动窗口计数器算法、漏桶算法、令牌桶算法、基于用户的限流和动态限流。其中固定窗口计数器算法、滑动窗口计数器算法由都属于计数器算法。

以下将逐一介绍这些算法并附上 Go 语言的代码示例。

1. 固定窗口计数器算法

固定窗口计数器算法属于计数器算法中的一种。固定窗口计数器算法通过对请求进行计数,限制在固定时间窗口内的请求数。每个窗口开始时计数器被重置,遍历时间限制内的请求次数。

优点:实现简单,容易理解。缺点:窗口边界可能造成突发流量,将大量请求集中在窗口切换的瞬间。

假设一个接口 1s 中最多请求 100 次。最开始设置一个计数器 count=0,来一个请求count+1,1s 之内 count<=100 的请求可以正常访问,count>100 的请求则被拒绝,1s 之后 count 被重置为 0,重新开始计数。

固定窗口的问题是容易出现“突刺现象”,例如在 1s 内,100 个请求都是在前 100ms 过来的,那么后面的 900ms 的请求都会被拒绝,而系统此时是空闲的。另外还有“临界问题”,如果 100 个请求是在后 100ms 过来的,而下一个 1s 的 100 个请求在前 100ms 过来,此时系统在这 200ms 内就需要处理 200 个请求,跟我们想要的不符合。

到这里我们很容易想到,1s 这个范围太大了,缩小一些就更好了,这种把固定窗口拆成更多个小窗口的做法就是滑动窗口算法了。

Go代码示例:

复制
package main import ( "sync" "time" ) type FixedWindowCounter struct { mu sync.Mutex requestCount int limit int window time.Duration resetTime time.Time } func NewFixedWindowCounter(limit int, window time.Duration) *FixedWindowCounter { return &FixedWindowCounter{ limit: limit, window: window, resetTime: time.Now(), } } func (fw *FixedWindowCounter) Allow() bool { fw.mu.Lock() defer fw.mu.Unlock() now := time.Now() if now.Sub(fw.resetTime) >= fw.window { fw.requestCount = 0 fw.resetTime = now } if fw.requestCount < fw.limit { fw.requestCount++ return true } return false }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.
2. 滑动窗口计数器算法

滑动窗口计数器算法属于计数器算法中的一种。滑动窗口的思想是将固定窗口拆成更多个小窗口,随着时间的推移,窗口不断的滑动,统计也在不断的变化。窗口拆分的越多,滑动就会越平滑,统计就会越精确,所消耗的资源就会越多。滑动窗口如果只拆为1个窗口,就退化为固定窗口。

优点:比固定窗口算法更平滑,减少请求的突发性。缺点:实现较复杂。

Go代码示例:

复制
package main import ( "container/list" "sync" "time" ) type SlidingWindowCounter struct { mu sync.Mutex events *list.List limit int window time.Duration } func NewSlidingWindowCounter(limit int, window time.Duration) *SlidingWindowCounter { return &SlidingWindowCounter{ events: list.New(), limit: limit, window: window, } } func (sw *SlidingWindowCounter) Allow() bool { sw.mu.Lock() defer sw.mu.Unlock() now := time.Now() // 清理过期的事件 for sw.events.Len() > 0 { if sw.events.Front().Value.(time.Time).Add(sw.window).Before(now) { sw.events.Remove(sw.events.Front()) } else { break } } if sw.events.Len() < sw.limit { sw.events.PushBack(now) return true } return false }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.
3. 漏桶算法

漏桶算法通过一个固定速率的漏桶完成请求,任何超出桶容量的请求将被拒绝。请求以固定速率从桶中出桶。

优点:能够平滑处理流量,避免突发请求。缺点:如果桶满了,则请求会被立即拒绝。

漏桶算法的思想是将请求先放到一个桶中,然后像滴水一样不断的从中取出请求执行,桶满则溢,后面的请求会被拒绝。当漏斗满了,多余的水就被直接丢弃了。

漏桶算法的特点是流入速度不确定,但是流出速度是确定的,漏桶可以很平滑,均衡的处理请求,但是无法应对短暂的突发流量。

Go代码示例:

复制
package main import ( "sync" "time" ) type LeakyBucket struct { mu sync.Mutex capacity int available int rate time.Duration lastTime time.Time } func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket { return &LeakyBucket{ capacity: capacity, available: capacity, rate: rate, lastTime: time.Now(), } } func (lb *LeakyBucket) Allow() bool { lb.mu.Lock() defer lb.mu.Unlock() now := time.Now() elapsed := now.Sub(lb.lastTime) // 更新可用令牌 lb.available += int(elapsed / lb.rate) if lb.available > lb.capacity { lb.available = lb.capacity } lb.lastTime = now if lb.available > 0 { lb.available-- return true } return false }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.
4. 令牌桶算法

令牌桶算法的思想是不断的生成令牌放到一个桶中,请求到来时到桶中申请令牌,申请得到就执行,申请不到就拒绝。如果桶中的令牌满了,新生成的令牌也会丢弃。

优点:允许突发流量,控制能力更强。缺点:稍微复杂。

与漏桶不同的是,令牌桶是流入速度确定(生成令牌的速度),流出速度不确定,所以它不像漏桶一样可以均衡的处理请求,但是由于有令牌桶这个缓冲,一旦有突增的流量,令牌桶里已有的令牌可以短暂的应对突发流量。

由于流出速度是不限制的,此时桶中已有的令牌都可以被申请到,请求一下子就会到我们的服务,给系统带来一定的压力,所以桶的大小需要合适,不宜过大。

举个例子:令牌桶的大小是 1000,每秒放 100 个令牌,经过一段时间后,请求有空闲时,桶里的令牌就会积压,最终保存了满 1000 个令牌,由于某刻流量突增,有 1000 个请求到来,此时能申请到 1000 个令牌,所有请求都会放行,最终达到我们的系统,如果令牌桶过大,系统可能会承受不了这波请求。

令牌桶算法可以说是对漏桶算法的改进。漏桶算法能限制请求的速率。而令牌桶算法在限制请求速率的同时还允许一定程度的突发调用。

过程如下:

一直放令牌,如果令牌桶达到上限则丢弃令牌,假设每秒放 10 个,可以应对一定程度的流量激增,如此时令牌桶有 100 个令牌,突然发生 200 次调用,则此时最开始的 100 次请求可以正常调用,后续的请求才会以 10个/s 的速率来调用。

Go代码示例:

复制
package main import ( "sync" "time" ) type TokenBucket struct { mu sync.Mutex capacity int tokens int rate time.Duration lastTime time.Time } func NewTokenBucket(capacity int, rate time.Duration) *TokenBucket { return &TokenBucket{ capacity: capacity, tokens: capacity, rate: rate, lastTime: time.Now(), } } func (tb *TokenBucket) Allow() bool { tb.mu.Lock() defer tb.mu.Unlock() now := time.Now() elapsed := now.Sub(tb.lastTime) // 计算可用令牌数 tb.tokens += int(elapsed / tb.rate) if tb.tokens > tb.capacity { tb.tokens = tb.capacity } tb.lastTime = now if tb.tokens > 0 { tb.tokens-- return true } return false }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.
5. 基于用户的限流

基于用户的限流策略允许对不同用户设置不同的请求频率限制。可以使用上述任意算法作为基础,根据用户身份进行控制。

Go代码示例:

复制
package main import ( "sync" "time" ) type UserRateLimiter struct { mu sync.Mutex userLimits map[string]int userCounts map[string]int limit int window time.Duration resetTime map[string]time.Time } func NewUserRateLimiter(limit int, window time.Duration) *UserRateLimiter { return &UserRateLimiter{ userLimits: make(map[string]int), userCounts: make(map[string]int), limit: limit, window: window, resetTime: make(map[string]time.Time), } } func (url *UserRateLimiter) Allow(userId string) bool { url.mu.Lock() defer url.mu.Unlock() now := time.Now() if _, exists := url.resetTime[userId]; !exists { url.resetTime[userId] = now } if now.Sub(url.resetTime[userId]) >= url.window { url.userCounts[userId] = 0 url.resetTime[userId] = now } if url.userCounts[userId] < url.limit { url.userCounts[userId]++ return true } return false }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.
6. 动态限流

动态限流算法根据系统的实时性能和负载情况动态调整限流策略。具体实现可以结合上述算法,尤其是令牌桶算法。

Go代码示例:

复制
package main import ( "sync" "time" ) type DynamicRateLimiter struct { mu sync.Mutex currentRate int maxRate int minRate int rateChange time.Duration lastChange time.Time } func NewDynamicRateLimiter(maxRate, minRate int, rateChange time.Duration) *DynamicRateLimiter { return &DynamicRateLimiter{ currentRate: maxRate, maxRate: maxRate, minRate: minRate, rateChange: rateChange, lastChange: time.Now(), } } func (dr *DynamicRateLimiter) AdjustRate(load int) { dr.mu.Lock() defer dr.mu.Unlock() now := time.Now() if now.Sub(dr.lastChange) < dr.rateChange { return } if load > dr.currentRate { dr.currentRate-- if dr.currentRate < dr.minRate { dr.currentRate = dr.minRate } } else { dr.currentRate++ if dr.currentRate > dr.maxRate { dr.currentRate = dr.maxRate } } dr.lastChange = now } func (dr *DynamicRateLimiter) Allow() bool { // 这里可以使用任意一种算法实现,根据dr.currentRate来限制请求 // 简单示例,返回 true 表示请求被允许 return true }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.
总结

尽管限流算法在实现上各有不同,但它们的核心目标是确保系统在高并发情况下能够高效、稳定地运行。选择合适的限流算法需要根据具体业务需求、流量特征及系统架构来进行相应评估。

THE END
本站服务器由亿华云赞助提供-企业级高防云服务器