ring_test.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package redis_test
  2. import (
  3. "crypto/rand"
  4. "fmt"
  5. "time"
  6. "github.com/go-redis/redis"
  7. . "github.com/onsi/ginkgo"
  8. . "github.com/onsi/gomega"
  9. )
  10. var _ = Describe("Redis Ring", func() {
  11. const heartbeat = 100 * time.Millisecond
  12. var ring *redis.Ring
  13. setRingKeys := func() {
  14. for i := 0; i < 100; i++ {
  15. err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  16. Expect(err).NotTo(HaveOccurred())
  17. }
  18. }
  19. BeforeEach(func() {
  20. opt := redisRingOptions()
  21. opt.HeartbeatFrequency = heartbeat
  22. ring = redis.NewRing(opt)
  23. err := ring.ForEachShard(func(cl *redis.Client) error {
  24. return cl.FlushDB().Err()
  25. })
  26. Expect(err).NotTo(HaveOccurred())
  27. })
  28. AfterEach(func() {
  29. Expect(ring.Close()).NotTo(HaveOccurred())
  30. })
  31. It("distributes keys", func() {
  32. setRingKeys()
  33. // Both shards should have some keys now.
  34. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
  35. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  36. })
  37. It("distributes keys when using EVAL", func() {
  38. script := redis.NewScript(`
  39. local r = redis.call('SET', KEYS[1], ARGV[1])
  40. return r
  41. `)
  42. var key string
  43. for i := 0; i < 100; i++ {
  44. key = fmt.Sprintf("key%d", i)
  45. err := script.Run(ring, []string{key}, "value").Err()
  46. Expect(err).NotTo(HaveOccurred())
  47. }
  48. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
  49. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  50. })
  51. It("uses single shard when one of the shards is down", func() {
  52. // Stop ringShard2.
  53. Expect(ringShard2.Close()).NotTo(HaveOccurred())
  54. Eventually(func() int {
  55. return ring.Len()
  56. }, "30s").Should(Equal(1))
  57. setRingKeys()
  58. // RingShard1 should have all keys.
  59. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
  60. // Start ringShard2.
  61. var err error
  62. ringShard2, err = startRedis(ringShard2Port)
  63. Expect(err).NotTo(HaveOccurred())
  64. Eventually(func() int {
  65. return ring.Len()
  66. }, "30s").Should(Equal(2))
  67. setRingKeys()
  68. // RingShard2 should have its keys.
  69. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  70. })
  71. It("supports hash tags", func() {
  72. for i := 0; i < 100; i++ {
  73. err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  74. Expect(err).NotTo(HaveOccurred())
  75. }
  76. Expect(ringShard1.Info("keyspace").Val()).ToNot(ContainSubstring("keys="))
  77. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
  78. })
  79. Describe("pipeline", func() {
  80. It("distributes keys", func() {
  81. pipe := ring.Pipeline()
  82. for i := 0; i < 100; i++ {
  83. err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  84. Expect(err).NotTo(HaveOccurred())
  85. }
  86. cmds, err := pipe.Exec()
  87. Expect(err).NotTo(HaveOccurred())
  88. Expect(cmds).To(HaveLen(100))
  89. Expect(pipe.Close()).NotTo(HaveOccurred())
  90. for _, cmd := range cmds {
  91. Expect(cmd.Err()).NotTo(HaveOccurred())
  92. Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
  93. }
  94. // Both shards should have some keys now.
  95. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
  96. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  97. })
  98. It("is consistent with ring", func() {
  99. var keys []string
  100. for i := 0; i < 100; i++ {
  101. key := make([]byte, 64)
  102. _, err := rand.Read(key)
  103. Expect(err).NotTo(HaveOccurred())
  104. keys = append(keys, string(key))
  105. }
  106. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  107. for _, key := range keys {
  108. pipe.Set(key, "value", 0).Err()
  109. }
  110. return nil
  111. })
  112. Expect(err).NotTo(HaveOccurred())
  113. for _, key := range keys {
  114. val, err := ring.Get(key).Result()
  115. Expect(err).NotTo(HaveOccurred())
  116. Expect(val).To(Equal("value"))
  117. }
  118. })
  119. It("supports hash tags", func() {
  120. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  121. for i := 0; i < 100; i++ {
  122. pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  123. }
  124. return nil
  125. })
  126. Expect(err).NotTo(HaveOccurred())
  127. Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
  128. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
  129. })
  130. })
  131. })
  132. var _ = Describe("empty Redis Ring", func() {
  133. var ring *redis.Ring
  134. BeforeEach(func() {
  135. ring = redis.NewRing(&redis.RingOptions{})
  136. })
  137. AfterEach(func() {
  138. Expect(ring.Close()).NotTo(HaveOccurred())
  139. })
  140. It("returns an error", func() {
  141. err := ring.Ping().Err()
  142. Expect(err).To(MatchError("redis: all ring shards are down"))
  143. })
  144. It("pipeline returns an error", func() {
  145. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  146. pipe.Ping()
  147. return nil
  148. })
  149. Expect(err).To(MatchError("redis: all ring shards are down"))
  150. })
  151. })