// Package limit implements a Redis-backed token limiter that falls back to an
// in-process limiter while Redis calls are failing.
package limit

import (
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"gd_auth_check/common.in/cache"

	"github.com/go-redis/redis"
)

const (
	// script tries to take ARGV[2] tokens from the bucket stored in the hash
	// KEYS[1], whose capacity is ARGV[1]. It reads the "used" field (treated
	// as 0 when absent), increments it only when
	// capacity - used - requested >= 0, always rewrites the "capacity" field,
	// and returns whether the request was allowed.
	script = `local capacity = tonumber(ARGV[1]) ` +
		`local requested = tonumber(ARGV[2]) ` +
		`local used = tonumber(redis.call("hget", KEYS[1], "used")) ` +
		`if used == nil then used = 0 end ` +
		`local allowed = capacity - used - requested >= 0 ` +
		`if allowed then redis.call("hincrby", KEYS[1], "used", requested) end ` +
		`redis.call("hset", KEYS[1], "capacity", capacity) ` +
		`return allowed`

	// resetScript deletes every key matching redisUsedTokenKey + KEYS[1] + "*"
	// (if any exist) and then resets the bucket hash KEYS[2] to
	// capacity = ARGV[1], used = 0. It returns the reply of that hmset call.
	resetScript = `local keys = redis.call('keys', table.concat({'` + redisUsedTokenKey + `', KEYS[1] ,'*'})) ` +
		`if next(keys) ~= nil then ` +
		`redis.call('del', unpack(redis.call('keys', table.concat({'` + redisUsedTokenKey + `', KEYS[1] ,'*'})))) ` +
		`end ` +
		`return redis.call("hmset", KEYS[2], "capacity", ARGV[1], "used", 0)`

	// pingInterval is the delay between Redis health probes in waitForRedis.
	pingInterval = time.Millisecond * 100

	// Redis key prefixes; not referenced in this part of the file
	// (presumably passed as the limiter prefix by callers — confirm).
	redisRouterKeyPre   = "rate_limit:router:"
	redisMerchantKeyPre = "rate_limit:merchant:"

	// Token origin markers: fromRedis is handed to getToken for tokens granted
	// through Redis, and release dispatches on the marker recovered from the
	// token by getTokenfrom.
	fromMem   = "0"
	fromRedis = "1"

	// pingSuccess is the argument passed to reload once Redis answers a ping
	// again.
	pingSuccess = "ping.success"
)

// TokenLimiter is a token-bucket limiter whose state lives in Redis, with an
// in-process limiter used as a fallback while Redis calls are failing.
type TokenLimiter struct {
	// Bucket capacity.
	burst int
	// Redis key.
	tokenKey string
	// Redis key prefix.
	prefix string
	// Backing store.
	store *cache.RedisCache
	// Guards monitorStarted and serializes Reload.
	rescueLock sync.Mutex
	// Redis health flag, accessed atomically: 1 = alive, 0 = down.
	redisAlive uint32
	// In-process token-bucket limiter used while Redis is failing.
	// Capacity = total capacity / 2 (at least 1).
	rescueLimter *Limiter
	// Whether the Redis health-probe goroutine is currently running.
	monitorStarted bool
	// Update flag consulted when reloading; cleared by a successful Reload.
	// NOTE(review): the readers of this flag are outside this file — confirm
	// its exact meaning there.
	update bool
}

// NewTokenLimiter builds a limiter for the bucket stored under
// prefix+tokenKey with the given capacity. The limiter starts out assuming
// Redis is alive; the in-process rescue limiter gets half the capacity,
// rounded down but never less than 1.
func NewTokenLimiter(burst int, prefix, tokenKey string, update bool, store *cache.RedisCache) *TokenLimiter {
	rescueBurst := burst / 2
	if rescueBurst == 0 {
		rescueBurst = 1
	}

	return &TokenLimiter{
		burst:        burst,
		prefix:       prefix,
		tokenKey:     tokenKey,
		store:        store,
		redisAlive:   1,
		rescueLimter: NewLimiter(rescueBurst),
		update:       update,
	}
}

// Allow tries to acquire a single token. It returns true and the token on
// success, or false and an empty string when no token is available.
func (lim *TokenLimiter) Allow() (bool, string) {
	ok, tokens := lim.reserveN(1)
	if !ok {
		return false, ""
	}
	return true, tokens[0]
}

// reserveN tries to acquire n tokens. While Redis is flagged as down, or when
// the Redis call fails, the request is served by the in-process rescue
// limiter instead.
func (lim *TokenLimiter) reserveN(n int) (bool, []string) {
	if atomic.LoadUint32(&lim.redisAlive) == 0 {
		return lim.rescueLimter.AllowN(n)
	}

	resp, err := lim.store.Eval(
		script,
		[]string{lim.prefix + lim.tokenKey},
		[]string{strconv.Itoa(lim.burst), strconv.Itoa(n)},
	)

	// The script returned allowed == false: a Lua boolean false is converted
	// to a nil bulk reply, which the client reports as redis.Nil.
	if err == redis.Nil {
		return false, nil
	}

	code, isInt := resp.(int64)
	if err != nil || !isInt {
		// The Redis call failed or produced an unexpected reply: start the
		// health probe and use the in-process limiter as a fallback.
		lim.startMonitor()
		return lim.rescueLimter.AllowN(n)
	}

	if code != 1 {
		return false, nil
	}

	tokens := make([]string, 0, n)
	for i := 0; i < n; i++ {
		// Generate the token, then persist it.
		token := getToken(fromRedis)
		tokens = append(tokens, token)
		storeToken(lim.store, lim.prefix, lim.tokenKey, token)
	}
	return true, tokens
}

// startMonitor flags Redis as down and launches the health-probe goroutine,
// unless one is already running.
func (lim *TokenLimiter) startMonitor() {
	lim.rescueLock.Lock()
	defer lim.rescueLock.Unlock()

	if lim.monitorStarted {
		return
	}

	lim.monitorStarted = true
	atomic.StoreUint32(&lim.redisAlive, 0)

	go lim.waitForRedis()
}

// waitForRedis pings Redis every pingInterval until a ping succeeds, then
// flags Redis as alive again and triggers a reload. On exit it stops the
// ticker and clears monitorStarted.
func (lim *TokenLimiter) waitForRedis() {
	ticker := time.NewTicker(pingInterval)
	defer func() {
		ticker.Stop()
		lim.rescueLock.Lock()
		lim.monitorStarted = false
		lim.rescueLock.Unlock()
	}()

	for range ticker.C {
		if !lim.store.Ping() {
			continue
		}

		atomic.StoreUint32(&lim.redisAlive, 1)
		// Reset the buckets: while Redis was unavailable some tokens may never
		// have been released, so everything is reset as soon as Redis is
		// reachable again.
		reload(pingSuccess)
		return
	}
}

// Reload resets this limiter's bucket in Redis through resetScript. On
// success the update flag is cleared; on any error the Redis health monitor
// is started and the flag is left untouched.
func (lim *TokenLimiter) Reload() {
	lim.rescueLock.Lock()
	_, err := lim.store.Eval(
		resetScript,
		[]string{lim.tokenKey, lim.prefix + lim.tokenKey},
		[]string{strconv.Itoa(lim.burst)},
	)
	if err == nil {
		lim.update = false
	}
	lim.rescueLock.Unlock()

	if err != nil {
		// Reload failed. startMonitor acquires rescueLock itself, so it must
		// only be called after the lock has been released.
		lim.startMonitor()
	}
}

// release returns a token to the limiter it was issued by, chosen from the
// origin marker that getTokenfrom extracts from the token.
func (lim *TokenLimiter) release(token string) {
	switch getTokenfrom(token) {
	case fromRedis:
		if !releaseToken(lim.store, lim.prefix, lim.tokenKey, token) {
			// Releasing through Redis failed: start the health probe.
			lim.startMonitor()
		}
	case fromMem:
		lim.rescueLimter.releaseToken(token)
	}
}