ring.go
package redis

import (
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"gopkg.in/redis.v5/internal"
	"gopkg.in/redis.v5/internal/consistenthash"
	"gopkg.in/redis.v5/internal/hashtag"
	"gopkg.in/redis.v5/internal/pool"
)

var errRingShardsDown = errors.New("redis: all ring shards are down")
// RingOptions are used to configure a ring client and should be
// passed to NewRing.
type RingOptions struct {
	// Map of name => host:port addresses of ring shards.
	Addrs map[string]string

	// Frequency of PING commands sent to check shard availability.
	// A shard is considered down after 3 subsequent failed checks.
	HeartbeatFrequency time.Duration

	// The following options are copied from the Options struct.

	DB       int
	Password string

	MaxRetries int

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	PoolSize           int
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
}

func (opt *RingOptions) init() {
	if opt.HeartbeatFrequency == 0 {
		opt.HeartbeatFrequency = 500 * time.Millisecond
	}
}
func (opt *RingOptions) clientOptions() *Options {
	return &Options{
		DB:       opt.DB,
		Password: opt.Password,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: opt.IdleCheckFrequency,
	}
}
// ringShard pairs a Client with the failure counter the heartbeat uses to
// decide whether the shard is up or down.
type ringShard struct {
	Client *Client
	down   int32
}

func (shard *ringShard) String() string {
	var state string
	if shard.IsUp() {
		state = "up"
	} else {
		state = "down"
	}
	return fmt.Sprintf("%s is %s", shard.Client, state)
}

// IsDown reports whether the shard has failed at least 3 consecutive
// heartbeat checks.
func (shard *ringShard) IsDown() bool {
	const threshold = 3
	return atomic.LoadInt32(&shard.down) >= threshold
}

func (shard *ringShard) IsUp() bool {
	return !shard.IsDown()
}
// Vote votes to set the shard state and returns true if the state was changed.
func (shard *ringShard) Vote(up bool) bool {
	if up {
		changed := shard.IsDown()
		atomic.StoreInt32(&shard.down, 0)
		return changed
	}

	if shard.IsDown() {
		return false
	}

	atomic.AddInt32(&shard.down, 1)
	return shard.IsDown()
}
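// For illustration (not part of the original source): with the threshold of 3
// used by IsDown, a shard is marked down only after three consecutive failed
// votes, and a single successful vote brings it straight back up.
//
//	s := &ringShard{}
//	s.Vote(false) // 1st failed check: still up, returns false
//	s.Vote(false) // 2nd failed check: still up, returns false
//	s.Vote(false) // 3rd failed check: shard is now down, returns true
//	s.Vote(true)  // shard is up again, returns true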
// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
// the ring. When a shard comes back online it is added back to the ring.
// This gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses the shards that are available to it and does no coordination
// when shard state changes.
//
// Ring should be used when you need multiple Redis servers for caching
// and can tolerate losing data when one of the servers dies.
// Otherwise you should use Redis Cluster.
type Ring struct {
	cmdable

	opt       *RingOptions
	nreplicas int

	mu     sync.RWMutex
	hash   *consistenthash.Map
	shards map[string]*ringShard

	cmdsInfoOnce *sync.Once
	cmdsInfo     map[string]*CommandInfo

	closed bool
}
func NewRing(opt *RingOptions) *Ring {
	const nreplicas = 100
	opt.init()
	ring := &Ring{
		opt:          opt,
		nreplicas:    nreplicas,
		hash:         consistenthash.New(nreplicas, nil),
		shards:       make(map[string]*ringShard),
		cmdsInfoOnce: new(sync.Once),
	}
	ring.cmdable.process = ring.Process
	for name, addr := range opt.Addrs {
		clopt := opt.clientOptions()
		clopt.Addr = addr
		ring.addClient(name, NewClient(clopt))
	}
	go ring.heartbeat()
	return ring
}
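// A minimal usage sketch from an importing package (not part of the original
// file); the shard names and addresses are placeholders:
//
//	ring := redis.NewRing(&redis.RingOptions{
//		Addrs: map[string]string{
//			"shard1": "localhost:6379",
//			"shard2": "localhost:6380",
//		},
//	})
//	if err := ring.Set("key", "value", 0).Err(); err != nil {
//		// handle the error
//	}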
// PoolStats returns accumulated connection pool stats.
func (c *Ring) PoolStats() *PoolStats {
	// Hold the read lock so the shards map cannot be cleared by Close while
	// it is being iterated.
	c.mu.RLock()
	defer c.mu.RUnlock()

	var acc PoolStats
	for _, shard := range c.shards {
		s := shard.Client.connPool.Stats()
		acc.Requests += s.Requests
		acc.Hits += s.Hits
		acc.Timeouts += s.Timeouts
		acc.TotalConns += s.TotalConns
		acc.FreeConns += s.FreeConns
	}
	return &acc
}
// ForEachShard concurrently calls the fn on each live shard in the ring.
// It returns the first error if any.
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	// Guard the shards map while goroutines are being started; fn itself
	// runs outside the lock.
	c.mu.RLock()
	for _, shard := range c.shards {
		if shard.IsDown() {
			continue
		}

		wg.Add(1)
		go func(shard *ringShard) {
			defer wg.Done()
			err := fn(shard.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(shard)
	}
	c.mu.RUnlock()
	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
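// For example, to check connectivity of every live shard (an illustrative
// sketch, not part of the original file):
//
//	err := ring.ForEachShard(func(cl *redis.Client) error {
//		return cl.Ping().Err()
//	})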
// cmdInfo lazily loads the COMMAND table from the first reachable shard and
// returns the entry for name, or nil if the table could not be loaded.
func (c *Ring) cmdInfo(name string) *CommandInfo {
	c.cmdsInfoOnce.Do(func() {
		for _, shard := range c.shards {
			cmdsInfo, err := shard.Client.Command().Result()
			if err == nil {
				c.cmdsInfo = cmdsInfo
				return
			}
		}
		// All shards failed; reset the Once so the next call retries.
		c.cmdsInfoOnce = &sync.Once{}
	})
	if c.cmdsInfo == nil {
		return nil
	}
	return c.cmdsInfo[name]
}
func (c *Ring) addClient(name string, cl *Client) {
	c.mu.Lock()
	c.hash.Add(name)
	c.shards[name] = &ringShard{Client: cl}
	c.mu.Unlock()
}

func (c *Ring) shardByKey(key string) (*ringShard, error) {
	key = hashtag.Key(key)

	c.mu.RLock()

	if c.closed {
		c.mu.RUnlock()
		return nil, pool.ErrClosed
	}

	name := c.hash.Get(key)
	if name == "" {
		c.mu.RUnlock()
		return nil, errRingShardsDown
	}

	shard := c.shards[name]
	c.mu.RUnlock()
	return shard, nil
}
// randomShard returns a shard picked by hashing a random key.
func (c *Ring) randomShard() (*ringShard, error) {
	return c.shardByKey(strconv.Itoa(rand.Int()))
}

func (c *Ring) shardByName(name string) (*ringShard, error) {
	if name == "" {
		return c.randomShard()
	}

	c.mu.RLock()
	shard := c.shards[name]
	c.mu.RUnlock()
	return shard, nil
}

// cmdShard routes a command to a shard using the command's first key.
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
	cmdInfo := c.cmdInfo(cmd.name())
	firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
	return c.shardByKey(firstKey)
}

func (c *Ring) Process(cmd Cmder) error {
	shard, err := c.cmdShard(cmd)
	if err != nil {
		cmd.setErr(err)
		return err
	}
	return shard.Client.Process(cmd)
}
// rebalance removes dead shards from the Ring.
func (c *Ring) rebalance() {
	hash := consistenthash.New(c.nreplicas, nil)

	// Build the new hash and swap it in under the lock so the shards map is
	// not read concurrently with Close.
	c.mu.Lock()
	for name, shard := range c.shards {
		if shard.IsUp() {
			hash.Add(name)
		}
	}
	c.hash = hash
	c.mu.Unlock()
}
// heartbeat monitors the state of each shard in the ring.
func (c *Ring) heartbeat() {
	ticker := time.NewTicker(c.opt.HeartbeatFrequency)
	defer ticker.Stop()
	for range ticker.C {
		var rebalance bool

		c.mu.RLock()

		if c.closed {
			c.mu.RUnlock()
			break
		}

		for _, shard := range c.shards {
			err := shard.Client.Ping().Err()
			// A pool timeout is a client-side condition, so it still counts
			// as a successful check of the shard itself.
			if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
				internal.Logf("ring shard state changed: %s", shard)
				rebalance = true
			}
		}

		c.mu.RUnlock()

		if rebalance {
			c.rebalance()
		}
	}
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func (c *Ring) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, shard := range c.shards {
		if err := shard.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	c.hash = nil
	c.shards = nil

	return firstErr
}
// Pipeline creates a pipeline that queues commands and executes them against
// the ring via pipelineExec.
func (c *Ring) Pipeline() *Pipeline {
	pipe := Pipeline{
		exec: c.pipelineExec,
	}
	pipe.cmdable.process = pipe.Process
	pipe.statefulCmdable.process = pipe.Process
	return &pipe
}

// Pipelined runs fn against a temporary pipeline, executes the queued
// commands, and returns them together with the first error, if any.
func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
	return c.Pipeline().pipelined(fn)
}
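// Usage sketch for Pipelined (not part of the original file). Queued commands
// are grouped by shard in pipelineExec below, so the keys may be served by
// different servers within a single call:
//
//	cmds, err := ring.Pipelined(func(pipe *redis.Pipeline) error {
//		pipe.Set("a", "1", 0)
//		pipe.Set("b", "2", 0)
//		return nil
//	})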
// pipelineExec groups the queued commands by the shard that owns their first
// key, pipelines each group to its shard, and retries retryable failures up
// to MaxRetries times. It returns the first error encountered.
func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
	cmdsMap := make(map[string][]Cmder)
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.name())
		name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
		if name != "" {
			name = c.hash.Get(hashtag.Key(name))
		}
		cmdsMap[name] = append(cmdsMap[name], cmd)
	}

	for i := 0; i <= c.opt.MaxRetries; i++ {
		var failedCmdsMap map[string][]Cmder

		for name, cmds := range cmdsMap {
			shard, err := c.shardByName(name)
			if err != nil {
				setCmdsErr(cmds, err)
				if firstErr == nil {
					firstErr = err
				}
				continue
			}

			cn, _, err := shard.Client.conn()
			if err != nil {
				setCmdsErr(cmds, err)
				if firstErr == nil {
					firstErr = err
				}
				continue
			}

			canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
			shard.Client.putConn(cn, err, false)
			if err == nil {
				continue
			}
			if firstErr == nil {
				firstErr = err
			}
			if canRetry && internal.IsRetryableError(err) {
				if failedCmdsMap == nil {
					failedCmdsMap = make(map[string][]Cmder)
				}
				failedCmdsMap[name] = cmds
			}
		}

		if len(failedCmdsMap) == 0 {
			break
		}
		cmdsMap = failedCmdsMap
	}

	return firstErr
}