token_limit.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. package limit
  2. import (
  3. "gd_auth_check/common.in/cache"
  4. "strconv"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/go-redis/redis"
  9. )
  10. const (
  11. script = `local capacity = tonumber(ARGV[1])
  12. local requested = tonumber(ARGV[2])
  13. local used = tonumber(redis.call("hget", KEYS[1], "used"))
  14. if used == nil then
  15. used = 0
  16. end
  17. local allowed = capacity - used - requested >= 0
  18. if allowed then
  19. redis.call("hincrby", KEYS[1], "used", requested)
  20. end
  21. redis.call("hset", KEYS[1], "capacity", capacity)
  22. return allowed`
  23. resetScript = `local keys = redis.call('keys', table.concat({'` + redisUsedTokenKey + `', KEYS[1] ,'*'}))
  24. if next(keys) ~= nil then
  25. redis.call('del', unpack(redis.call('keys', table.concat({'` + redisUsedTokenKey + `', KEYS[1] ,'*'}))))
  26. end
  27. return redis.call("hmset", KEYS[2], "capacity", ARGV[1], "used", 0)`
  28. pingInterval = time.Millisecond * 100
  29. redisRouterKeyPre = "rate_limit:router:"
  30. redisMerchantKeyPre = "rate_limit:merchant:"
  31. fromMem = "0"
  32. fromRedis = "1"
  33. pingSuccess = "ping.success"
  34. )
  35. type TokenLimiter struct {
  36. // 通容量
  37. burst int
  38. // redis key
  39. tokenKey string
  40. // redi key prefix
  41. prefix string
  42. // 储存容器
  43. store *cache.RedisCache
  44. // lock
  45. rescueLock sync.Mutex
  46. // redis 健康标识
  47. redisAlive uint32
  48. // redis故障时采用进程内 令牌桶限流器。容量 = 总容量 / 2
  49. rescueLimter *Limiter
  50. // redis监控探测任务标识
  51. monitorStarted bool
  52. // 是否更新,用已重载的时候做更新判断
  53. update bool
  54. }
  55. // 初始化令牌桶
  56. func NewTokenLimiter(burst int, prefix, tokenKey string, update bool, store *cache.RedisCache) *TokenLimiter {
  57. rescueBurst := burst / 2
  58. if rescueBurst == 0 {
  59. rescueBurst = 1
  60. }
  61. return &TokenLimiter{
  62. burst: burst,
  63. prefix: prefix,
  64. tokenKey: tokenKey,
  65. store: store,
  66. redisAlive: 1,
  67. rescueLimter: NewLimiter(rescueBurst),
  68. update: update,
  69. }
  70. }
  71. // 获取令牌, 一次获取一个令牌
  72. func (lim *TokenLimiter) Allow() (bool, string) {
  73. ok, tokenArr := lim.reserveN(1)
  74. if ok {
  75. return ok, tokenArr[0]
  76. }
  77. return ok, ""
  78. }
  79. func (lim *TokenLimiter) reserveN(n int) (bool, []string) {
  80. if atomic.LoadUint32(&lim.redisAlive) == 0 {
  81. return lim.rescueLimter.AllowN(n)
  82. }
  83. resp, err := lim.store.Eval(script, []string{
  84. lim.prefix + lim.tokenKey,
  85. }, []string{
  86. strconv.Itoa(lim.burst),
  87. strconv.Itoa(n),
  88. })
  89. // redis allowed == false
  90. // Lua boolean false -> r Nil bulk reply
  91. if err == redis.Nil {
  92. return false, nil
  93. } else if err != nil {
  94. // 执行异常,开启redis健康探测任务
  95. // 同时采用进程内限流器作为兜底
  96. lim.startMonitor()
  97. return lim.rescueLimter.AllowN(n)
  98. }
  99. code, ok := resp.(int64)
  100. if !ok {
  101. // redis 限流执行失败,使用内存限流
  102. // 执行异常,开启redis健康探测任务
  103. // 同时采用进程内限流器作为兜底
  104. lim.startMonitor()
  105. return lim.rescueLimter.AllowN(n)
  106. }
  107. if code == 1 {
  108. tokenArr := make([]string, 0, n)
  109. for i := 0; i < n; i++ {
  110. // 生成令牌
  111. token := getToken(fromRedis)
  112. tokenArr = append(tokenArr, token)
  113. // 存储令牌
  114. storeToken(lim.store, lim.prefix, lim.tokenKey, token)
  115. }
  116. return true, tokenArr
  117. }
  118. return false, nil
  119. }
  120. func (lim *TokenLimiter) startMonitor() {
  121. lim.rescueLock.Lock()
  122. defer lim.rescueLock.Unlock()
  123. if lim.monitorStarted {
  124. return
  125. }
  126. lim.monitorStarted = true
  127. atomic.StoreUint32(&lim.redisAlive, 0)
  128. go lim.waitForRedis()
  129. }
  130. func (lim *TokenLimiter) waitForRedis() {
  131. ticker := time.NewTicker(pingInterval)
  132. defer func() {
  133. ticker.Stop()
  134. lim.rescueLock.Lock()
  135. lim.monitorStarted = false
  136. lim.rescueLock.Unlock()
  137. }()
  138. for range ticker.C {
  139. if lim.store.Ping() {
  140. atomic.StoreUint32(&lim.redisAlive, 1)
  141. // 重置当前桶
  142. // 在redis不可用的时候,某些令牌可能得不到释放,所以redis一旦恢复可用直接重置全部桶
  143. reload(pingSuccess)
  144. return
  145. }
  146. }
  147. }
  148. func (lim *TokenLimiter) Reload() {
  149. lim.rescueLock.Lock()
  150. _, err := lim.store.Eval(resetScript, []string{
  151. lim.tokenKey,
  152. lim.prefix + lim.tokenKey,
  153. }, []string{
  154. strconv.Itoa(lim.burst),
  155. })
  156. if err == redis.Nil {
  157. lim.rescueLock.Unlock()
  158. // 重载失败
  159. lim.startMonitor()
  160. return
  161. }
  162. lim.update = false
  163. lim.rescueLock.Unlock()
  164. return
  165. }
  166. func (lim *TokenLimiter) release(token string) {
  167. from := getTokenfrom(token)
  168. switch from {
  169. case fromRedis:
  170. if !releaseToken(lim.store, lim.prefix, lim.tokenKey, token) {
  171. // redis 限流执行失败
  172. // 执行异常,开启redis健康探测任务
  173. lim.startMonitor()
  174. }
  175. break
  176. case fromMem:
  177. lim.rescueLimter.releaseToken(token)
  178. }
  179. }