限流(Rate Limiting),也称流量控制。是指系统在面临高并发,或者大流量请求的情况下,限制新的请求对系统的访问,从而保证系统的稳定性。限流会导致部分用户请求处理不及时或者被拒,这就影响了用户体验。所以一般需要在系统稳定和用户体验之间平衡一下。
常见的限流算法包括固定窗口计数器算法、滑动窗口计数器算法、漏桶算法、令牌桶算法、基于用户的限流和动态限流。其中固定窗口计数器算法、滑动窗口计数器算法由都属于计数器算法。

以下将逐一介绍这些算法并附上 Go 语言的代码示例。
1. 固定窗口计数器算法
固定窗口计数器算法属于计数器算法中的一种。固定窗口计数器算法通过对请求进行计数,限制在固定时间窗口内的请求数。每个窗口开始时计数器被重置,遍历时间限制内的请求次数。
优点:实现简单,容易理解。缺点:窗口边界可能造成突发流量,将大量请求集中在窗口切换的瞬间。
假设一个接口 1s 中最多请求 100 次。最开始设置一个计数器 count=0,来一个请求count+1,1s 之内 count<=100 的请求可以正常访问,count>100 的请求则被拒绝,1s 之后 count 被重置为 0,重新开始计数。

固定窗口的问题是容易出现“突刺现象”,例如在 1s 内,100 个请求都是在前 100ms 过来的,那么后面的 900ms 的请求都会被拒绝,而系统此时是空闲的。另外还有“临界问题”,如果 100 个请求是在后 100ms 过来的,而下一个 1s 的 100 个请求在前 100ms 过来,此时系统在这 200ms 内就需要处理 200 个请求,跟我们想要的不符合。

到这里我们很容易想到,1s 这个范围太大了,缩小一些就更好了,这种把固定窗口拆成更多个小窗口的做法就是滑动窗口算法了。
Go代码示例:
复制
package main
import (
"sync"
"time"
)
type FixedWindowCounter struct {
mu sync.Mutex
requestCount int
limit int
window time.Duration
resetTime time.Time
}
func NewFixedWindowCounter(limit int, window time.Duration) *FixedWindowCounter {
return &FixedWindowCounter{
limit: limit,
window: window,
resetTime: time.Now(),
}
}
func (fw *FixedWindowCounter) Allow() bool {
fw.mu.Lock()
defer fw.mu.Unlock()
now := time.Now()
if now.Sub(fw.resetTime) >= fw.window {
fw.requestCount = 0
fw.resetTime = now
}
if fw.requestCount < fw.limit {
fw.requestCount++
return true
}
return false
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.
2. 滑动窗口计数器算法
滑动窗口计数器算法属于计数器算法中的一种。滑动窗口的思想是将固定窗口拆成更多个小窗口,随着时间的推移,窗口不断的滑动,统计也在不断的变化。窗口拆分的越多,滑动就会越平滑,统计就会越精确,所消耗的资源就会越多。滑动窗口如果只拆为1个窗口,就退化为固定窗口。

优点:比固定窗口算法更平滑,减少请求的突发性。缺点:实现较复杂。
Go代码示例:
复制
package main
import (
"container/list"
"sync"
"time"
)
type SlidingWindowCounter struct {
mu sync.Mutex
events *list.List
limit int
window time.Duration
}
func NewSlidingWindowCounter(limit int, window time.Duration) *SlidingWindowCounter {
return &SlidingWindowCounter{
events: list.New(),
limit: limit,
window: window,
}
}
func (sw *SlidingWindowCounter) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
// 清理过期的事件
for sw.events.Len() > 0 {
if sw.events.Front().Value.(time.Time).Add(sw.window).Before(now) {
sw.events.Remove(sw.events.Front())
} else {
break
}
}
if sw.events.Len() < sw.limit {
sw.events.PushBack(now)
return true
}
return false
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.
3. 漏桶算法
漏桶算法通过一个固定速率的漏桶完成请求,任何超出桶容量的请求将被拒绝。请求以固定速率从桶中出桶。
优点:能够平滑处理流量,避免突发请求。缺点:如果桶满了,则请求会被立即拒绝。
漏桶算法的思想是将请求先放到一个桶中,然后像滴水一样不断的从中取出请求执行,桶满则溢,后面的请求会被拒绝。当漏斗满了,多余的水就被直接丢弃了。

漏桶算法的特点是流入速度不确定,但是流出速度是确定的,漏桶可以很平滑,均衡的处理请求,但是无法应对短暂的突发流量。
Go代码示例:
复制
package main
import (
"sync"
"time"
)
type LeakyBucket struct {
mu sync.Mutex
capacity int
available int
rate time.Duration
lastTime time.Time
}
func NewLeakyBucket(capacity int, rate time.Duration) *LeakyBucket {
return &LeakyBucket{
capacity: capacity,
available: capacity,
rate: rate,
lastTime: time.Now(),
}
}
func (lb *LeakyBucket) Allow() bool {
lb.mu.Lock()
defer lb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(lb.lastTime)
// 更新可用令牌
lb.available += int(elapsed / lb.rate)
if lb.available > lb.capacity {
lb.available = lb.capacity
}
lb.lastTime = now
if lb.available > 0 {
lb.available--
return true
}
return false
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.
4. 令牌桶算法
令牌桶算法的思想是不断的生成令牌放到一个桶中,请求到来时到桶中申请令牌,申请得到就执行,申请不到就拒绝。如果桶中的令牌满了,新生成的令牌也会丢弃。

优点:允许突发流量,控制能力更强。缺点:稍微复杂。
与漏桶不同的是,令牌桶是流入速度确定(生成令牌的速度),流出速度不确定,所以它不像漏桶一样可以均衡的处理请求,但是由于有令牌桶这个缓冲,一旦有突增的流量,令牌桶里已有的令牌可以短暂的应对突发流量。
由于流出速度是不限制的,此时桶中已有的令牌都可以被申请到,请求一下子就会到我们的服务,给系统带来一定的压力,所以桶的大小需要合适,不宜过大。
举个例子:令牌桶的大小是 1000,每秒放 100 个令牌,经过一段时间后,请求有空闲时,桶里的令牌就会积压,最终保存了满 1000 个令牌,由于某刻流量突增,有 1000 个请求到来,此时能申请到 1000 个令牌,所有请求都会放行,最终达到我们的系统,如果令牌桶过大,系统可能会承受不了这波请求。
令牌桶算法可以说是对漏桶算法的改进。漏桶算法能限制请求的速率。而令牌桶算法在限制请求速率的同时还允许一定程度的突发调用。
过程如下:
一直放令牌,如果令牌桶达到上限则丢弃令牌,假设每秒放 10 个,可以应对一定程度的流量激增,如此时令牌桶有 100 个令牌,突然发生 200 次调用,则此时最开始的 100 次请求可以正常调用,后续的请求才会以 10个/s 的速率来调用。
Go代码示例:
复制
package main
import (
"sync"
"time"
)
type TokenBucket struct {
mu sync.Mutex
capacity int
tokens int
rate time.Duration
lastTime time.Time
}
func NewTokenBucket(capacity int, rate time.Duration) *TokenBucket {
return &TokenBucket{
capacity: capacity,
tokens: capacity,
rate: rate,
lastTime: time.Now(),
}
}
func (tb *TokenBucket) Allow() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(tb.lastTime)
// 计算可用令牌数
tb.tokens += int(elapsed / tb.rate)
if tb.tokens > tb.capacity {
tb.tokens = tb.capacity
}
tb.lastTime = now
if tb.tokens > 0 {
tb.tokens--
return true
}
return false
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.
5. 基于用户的限流
基于用户的限流策略允许对不同用户设置不同的请求频率限制。可以使用上述任意算法作为基础,根据用户身份进行控制。
Go代码示例:
复制
package main
import (
"sync"
"time"
)
type UserRateLimiter struct {
mu sync.Mutex
userLimits map[string]int
userCounts map[string]int
limit int
window time.Duration
resetTime map[string]time.Time
}
func NewUserRateLimiter(limit int, window time.Duration) *UserRateLimiter {
return &UserRateLimiter{
userLimits: make(map[string]int),
userCounts: make(map[string]int),
limit: limit,
window: window,
resetTime: make(map[string]time.Time),
}
}
func (url *UserRateLimiter) Allow(userId string) bool {
url.mu.Lock()
defer url.mu.Unlock()
now := time.Now()
if _, exists := url.resetTime[userId]; !exists {
url.resetTime[userId] = now
}
if now.Sub(url.resetTime[userId]) >= url.window {
url.userCounts[userId] = 0
url.resetTime[userId] = now
}
if url.userCounts[userId] < url.limit {
url.userCounts[userId]++
return true
}
return false
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.
6. 动态限流
动态限流算法根据系统的实时性能和负载情况动态调整限流策略。具体实现可以结合上述算法,尤其是令牌桶算法。
Go代码示例:
复制
package main
import (
"sync"
"time"
)
type DynamicRateLimiter struct {
mu sync.Mutex
currentRate int
maxRate int
minRate int
rateChange time.Duration
lastChange time.Time
}
func NewDynamicRateLimiter(maxRate, minRate int, rateChange time.Duration) *DynamicRateLimiter {
return &DynamicRateLimiter{
currentRate: maxRate,
maxRate: maxRate,
minRate: minRate,
rateChange: rateChange,
lastChange: time.Now(),
}
}
func (dr *DynamicRateLimiter) AdjustRate(load int) {
dr.mu.Lock()
defer dr.mu.Unlock()
now := time.Now()
if now.Sub(dr.lastChange) < dr.rateChange {
return
}
if load > dr.currentRate {
dr.currentRate--
if dr.currentRate < dr.minRate {
dr.currentRate = dr.minRate
}
} else {
dr.currentRate++
if dr.currentRate > dr.maxRate {
dr.currentRate = dr.maxRate
}
}
dr.lastChange = now
}
func (dr *DynamicRateLimiter) Allow() bool {
// 这里可以使用任意一种算法实现,根据dr.currentRate来限制请求
// 简单示例,返回 true 表示请求被允许
return true
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.
总结
尽管限流算法在实现上各有不同,但它们的核心目标是确保系统在高并发情况下能够高效、稳定地运行。选择合适的限流算法需要根据具体业务需求、流量特征及系统架构来进行相应评估。