123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- package limit
- import (
- "gd_auth_check/common.in/cache"
- "strconv"
- "sync"
- "sync/atomic"
- "time"
- "github.com/go-redis/redis"
- )
- const (
- script = `local capacity = tonumber(ARGV[1])
- local requested = tonumber(ARGV[2])
- local used = tonumber(redis.call("hget", KEYS[1], "used"))
- if used == nil then
- used = 0
- end
- local allowed = capacity - used - requested >= 0
- if allowed then
- redis.call("hincrby", KEYS[1], "used", requested)
- end
- redis.call("hset", KEYS[1], "capacity", capacity)
- return allowed`
- resetScript = `local keys = redis.call('keys', table.concat({'` + redisUsedTokenKey + `', KEYS[1] ,'*'}))
- if next(keys) ~= nil then
- redis.call('del', unpack(redis.call('keys', table.concat({'` + redisUsedTokenKey + `', KEYS[1] ,'*'}))))
- end
- return redis.call("hmset", KEYS[2], "capacity", ARGV[1], "used", 0)`
- pingInterval = time.Millisecond * 100
- redisRouterKeyPre = "rate_limit:router:"
- redisMerchantKeyPre = "rate_limit:merchant:"
- fromMem = "0"
- fromRedis = "1"
- pingSuccess = "ping.success"
- )
- type TokenLimiter struct {
- // 通容量
- burst int
- // redis key
- tokenKey string
- // redi key prefix
- prefix string
- // 储存容器
- store *cache.RedisCache
- // lock
- rescueLock sync.Mutex
- // redis 健康标识
- redisAlive uint32
- // redis故障时采用进程内 令牌桶限流器。容量 = 总容量 / 2
- rescueLimter *Limiter
- // redis监控探测任务标识
- monitorStarted bool
- // 是否更新,用已重载的时候做更新判断
- update bool
- }
- // 初始化令牌桶
- func NewTokenLimiter(burst int, prefix, tokenKey string, update bool, store *cache.RedisCache) *TokenLimiter {
- rescueBurst := burst / 2
- if rescueBurst == 0 {
- rescueBurst = 1
- }
- return &TokenLimiter{
- burst: burst,
- prefix: prefix,
- tokenKey: tokenKey,
- store: store,
- redisAlive: 1,
- rescueLimter: NewLimiter(rescueBurst),
- update: update,
- }
- }
- // 获取令牌, 一次获取一个令牌
- func (lim *TokenLimiter) Allow() (bool, string) {
- ok, tokenArr := lim.reserveN(1)
- if ok {
- return ok, tokenArr[0]
- }
- return ok, ""
- }
- func (lim *TokenLimiter) reserveN(n int) (bool, []string) {
- if atomic.LoadUint32(&lim.redisAlive) == 0 {
- return lim.rescueLimter.AllowN(n)
- }
- resp, err := lim.store.Eval(script, []string{
- lim.prefix + lim.tokenKey,
- }, []string{
- strconv.Itoa(lim.burst),
- strconv.Itoa(n),
- })
- // redis allowed == false
- // Lua boolean false -> r Nil bulk reply
- if err == redis.Nil {
- return false, nil
- } else if err != nil {
- // 执行异常,开启redis健康探测任务
- // 同时采用进程内限流器作为兜底
- lim.startMonitor()
- return lim.rescueLimter.AllowN(n)
- }
- code, ok := resp.(int64)
- if !ok {
- // redis 限流执行失败,使用内存限流
- // 执行异常,开启redis健康探测任务
- // 同时采用进程内限流器作为兜底
- lim.startMonitor()
- return lim.rescueLimter.AllowN(n)
- }
- if code == 1 {
- tokenArr := make([]string, 0, n)
- for i := 0; i < n; i++ {
- // 生成令牌
- token := getToken(fromRedis)
- tokenArr = append(tokenArr, token)
- // 存储令牌
- storeToken(lim.store, lim.prefix, lim.tokenKey, token)
- }
- return true, tokenArr
- }
- return false, nil
- }
- func (lim *TokenLimiter) startMonitor() {
- lim.rescueLock.Lock()
- defer lim.rescueLock.Unlock()
- if lim.monitorStarted {
- return
- }
- lim.monitorStarted = true
- atomic.StoreUint32(&lim.redisAlive, 0)
- go lim.waitForRedis()
- }
- func (lim *TokenLimiter) waitForRedis() {
- ticker := time.NewTicker(pingInterval)
- defer func() {
- ticker.Stop()
- lim.rescueLock.Lock()
- lim.monitorStarted = false
- lim.rescueLock.Unlock()
- }()
- for range ticker.C {
- if lim.store.Ping() {
- atomic.StoreUint32(&lim.redisAlive, 1)
- // 重置当前桶
- // 在redis不可用的时候,某些令牌可能得不到释放,所以redis一旦恢复可用直接重置全部桶
- reload(pingSuccess)
- return
- }
- }
- }
- func (lim *TokenLimiter) Reload() {
- lim.rescueLock.Lock()
- _, err := lim.store.Eval(resetScript, []string{
- lim.tokenKey,
- lim.prefix + lim.tokenKey,
- }, []string{
- strconv.Itoa(lim.burst),
- })
- if err == redis.Nil {
- lim.rescueLock.Unlock()
- // 重载失败
- lim.startMonitor()
- return
- }
- lim.update = false
- lim.rescueLock.Unlock()
- return
- }
- func (lim *TokenLimiter) release(token string) {
- from := getTokenfrom(token)
- switch from {
- case fromRedis:
- if !releaseToken(lim.store, lim.prefix, lim.tokenKey, token) {
- // redis 限流执行失败
- // 执行异常,开启redis健康探测任务
- lim.startMonitor()
- }
- break
- case fromMem:
- lim.rescueLimter.releaseToken(token)
- }
- }
|