ring_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. package redis_test
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/go-redis/redis"
  11. . "github.com/onsi/ginkgo"
  12. . "github.com/onsi/gomega"
  13. )
  14. var _ = Describe("Redis Ring", func() {
  15. const heartbeat = 100 * time.Millisecond
  16. var ring *redis.Ring
  17. setRingKeys := func() {
  18. for i := 0; i < 100; i++ {
  19. err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  20. Expect(err).NotTo(HaveOccurred())
  21. }
  22. }
  23. BeforeEach(func() {
  24. opt := redisRingOptions()
  25. opt.HeartbeatFrequency = heartbeat
  26. ring = redis.NewRing(opt)
  27. err := ring.ForEachShard(func(cl *redis.Client) error {
  28. return cl.FlushDB().Err()
  29. })
  30. Expect(err).NotTo(HaveOccurred())
  31. })
  32. AfterEach(func() {
  33. Expect(ring.Close()).NotTo(HaveOccurred())
  34. })
  35. It("distributes keys", func() {
  36. setRingKeys()
  37. // Both shards should have some keys now.
  38. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
  39. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  40. })
  41. It("distributes keys when using EVAL", func() {
  42. script := redis.NewScript(`
  43. local r = redis.call('SET', KEYS[1], ARGV[1])
  44. return r
  45. `)
  46. var key string
  47. for i := 0; i < 100; i++ {
  48. key = fmt.Sprintf("key%d", i)
  49. err := script.Run(ring, []string{key}, "value").Err()
  50. Expect(err).NotTo(HaveOccurred())
  51. }
  52. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
  53. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  54. })
  55. It("uses single shard when one of the shards is down", func() {
  56. // Stop ringShard2.
  57. Expect(ringShard2.Close()).NotTo(HaveOccurred())
  58. Eventually(func() int {
  59. return ring.Len()
  60. }, "30s").Should(Equal(1))
  61. setRingKeys()
  62. // RingShard1 should have all keys.
  63. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
  64. // Start ringShard2.
  65. var err error
  66. ringShard2, err = startRedis(ringShard2Port)
  67. Expect(err).NotTo(HaveOccurred())
  68. Eventually(func() int {
  69. return ring.Len()
  70. }, "30s").Should(Equal(2))
  71. setRingKeys()
  72. // RingShard2 should have its keys.
  73. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  74. })
  75. It("supports hash tags", func() {
  76. for i := 0; i < 100; i++ {
  77. err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  78. Expect(err).NotTo(HaveOccurred())
  79. }
  80. Expect(ringShard1.Info("keyspace").Val()).ToNot(ContainSubstring("keys="))
  81. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
  82. })
  83. It("propagates process for WithContext", func() {
  84. var fromWrap []string
  85. wrapper := func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
  86. return func(cmd redis.Cmder) error {
  87. fromWrap = append(fromWrap, cmd.Name())
  88. return oldProcess(cmd)
  89. }
  90. }
  91. ctx := context.Background()
  92. ring = ring.WithContext(ctx)
  93. ring.WrapProcess(wrapper)
  94. ring.Ping()
  95. Expect(fromWrap).To(Equal([]string{"ping"}))
  96. ring.Ping()
  97. Expect(fromWrap).To(Equal([]string{"ping", "ping"}))
  98. })
  99. Describe("pipeline", func() {
  100. It("distributes keys", func() {
  101. pipe := ring.Pipeline()
  102. for i := 0; i < 100; i++ {
  103. err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  104. Expect(err).NotTo(HaveOccurred())
  105. }
  106. cmds, err := pipe.Exec()
  107. Expect(err).NotTo(HaveOccurred())
  108. Expect(cmds).To(HaveLen(100))
  109. Expect(pipe.Close()).NotTo(HaveOccurred())
  110. for _, cmd := range cmds {
  111. Expect(cmd.Err()).NotTo(HaveOccurred())
  112. Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
  113. }
  114. // Both shards should have some keys now.
  115. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
  116. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  117. })
  118. It("is consistent with ring", func() {
  119. var keys []string
  120. for i := 0; i < 100; i++ {
  121. key := make([]byte, 64)
  122. _, err := rand.Read(key)
  123. Expect(err).NotTo(HaveOccurred())
  124. keys = append(keys, string(key))
  125. }
  126. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  127. for _, key := range keys {
  128. pipe.Set(key, "value", 0).Err()
  129. }
  130. return nil
  131. })
  132. Expect(err).NotTo(HaveOccurred())
  133. for _, key := range keys {
  134. val, err := ring.Get(key).Result()
  135. Expect(err).NotTo(HaveOccurred())
  136. Expect(val).To(Equal("value"))
  137. }
  138. })
  139. It("supports hash tags", func() {
  140. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  141. for i := 0; i < 100; i++ {
  142. pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  143. }
  144. return nil
  145. })
  146. Expect(err).NotTo(HaveOccurred())
  147. Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
  148. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
  149. })
  150. })
  151. })
  152. var _ = Describe("empty Redis Ring", func() {
  153. var ring *redis.Ring
  154. BeforeEach(func() {
  155. ring = redis.NewRing(&redis.RingOptions{})
  156. })
  157. AfterEach(func() {
  158. Expect(ring.Close()).NotTo(HaveOccurred())
  159. })
  160. It("returns an error", func() {
  161. err := ring.Ping().Err()
  162. Expect(err).To(MatchError("redis: all ring shards are down"))
  163. })
  164. It("pipeline returns an error", func() {
  165. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  166. pipe.Ping()
  167. return nil
  168. })
  169. Expect(err).To(MatchError("redis: all ring shards are down"))
  170. })
  171. })
  172. var _ = Describe("Ring watch", func() {
  173. const heartbeat = 100 * time.Millisecond
  174. var ring *redis.Ring
  175. BeforeEach(func() {
  176. opt := redisRingOptions()
  177. opt.HeartbeatFrequency = heartbeat
  178. ring = redis.NewRing(opt)
  179. err := ring.ForEachShard(func(cl *redis.Client) error {
  180. return cl.FlushDB().Err()
  181. })
  182. Expect(err).NotTo(HaveOccurred())
  183. })
  184. AfterEach(func() {
  185. Expect(ring.Close()).NotTo(HaveOccurred())
  186. })
  187. It("should Watch", func() {
  188. var incr func(string) error
  189. // Transactionally increments key using GET and SET commands.
  190. incr = func(key string) error {
  191. err := ring.Watch(func(tx *redis.Tx) error {
  192. n, err := tx.Get(key).Int64()
  193. if err != nil && err != redis.Nil {
  194. return err
  195. }
  196. _, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
  197. pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
  198. return nil
  199. })
  200. return err
  201. }, key)
  202. if err == redis.TxFailedErr {
  203. return incr(key)
  204. }
  205. return err
  206. }
  207. var wg sync.WaitGroup
  208. for i := 0; i < 100; i++ {
  209. wg.Add(1)
  210. go func() {
  211. defer GinkgoRecover()
  212. defer wg.Done()
  213. err := incr("key")
  214. Expect(err).NotTo(HaveOccurred())
  215. }()
  216. }
  217. wg.Wait()
  218. n, err := ring.Get("key").Int64()
  219. Expect(err).NotTo(HaveOccurred())
  220. Expect(n).To(Equal(int64(100)))
  221. })
  222. It("should discard", func() {
  223. err := ring.Watch(func(tx *redis.Tx) error {
  224. cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  225. pipe.Set("key1", "hello1", 0)
  226. pipe.Discard()
  227. pipe.Set("key2", "hello2", 0)
  228. return nil
  229. })
  230. Expect(err).NotTo(HaveOccurred())
  231. Expect(cmds).To(HaveLen(1))
  232. return err
  233. }, "key1", "key2")
  234. Expect(err).NotTo(HaveOccurred())
  235. get := ring.Get("key1")
  236. Expect(get.Err()).To(Equal(redis.Nil))
  237. Expect(get.Val()).To(Equal(""))
  238. get = ring.Get("key2")
  239. Expect(get.Err()).NotTo(HaveOccurred())
  240. Expect(get.Val()).To(Equal("hello2"))
  241. })
  242. It("returns no error when there are no commands", func() {
  243. err := ring.Watch(func(tx *redis.Tx) error {
  244. _, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
  245. return err
  246. }, "key")
  247. Expect(err).NotTo(HaveOccurred())
  248. v, err := ring.Ping().Result()
  249. Expect(err).NotTo(HaveOccurred())
  250. Expect(v).To(Equal("PONG"))
  251. })
  252. It("should exec bulks", func() {
  253. const N = 20000
  254. err := ring.Watch(func(tx *redis.Tx) error {
  255. cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  256. for i := 0; i < N; i++ {
  257. pipe.Incr("key")
  258. }
  259. return nil
  260. })
  261. Expect(err).NotTo(HaveOccurred())
  262. Expect(len(cmds)).To(Equal(N))
  263. for _, cmd := range cmds {
  264. Expect(cmd.Err()).NotTo(HaveOccurred())
  265. }
  266. return err
  267. }, "key")
  268. Expect(err).NotTo(HaveOccurred())
  269. num, err := ring.Get("key").Int64()
  270. Expect(err).NotTo(HaveOccurred())
  271. Expect(num).To(Equal(int64(N)))
  272. })
  273. It("should Watch/Unwatch", func() {
  274. var C, N int
  275. err := ring.Set("key", "0", 0).Err()
  276. Expect(err).NotTo(HaveOccurred())
  277. perform(C, func(id int) {
  278. for i := 0; i < N; i++ {
  279. err := ring.Watch(func(tx *redis.Tx) error {
  280. val, err := tx.Get("key").Result()
  281. Expect(err).NotTo(HaveOccurred())
  282. Expect(val).NotTo(Equal(redis.Nil))
  283. num, err := strconv.ParseInt(val, 10, 64)
  284. Expect(err).NotTo(HaveOccurred())
  285. cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  286. pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
  287. return nil
  288. })
  289. Expect(cmds).To(HaveLen(1))
  290. return err
  291. }, "key")
  292. if err == redis.TxFailedErr {
  293. i--
  294. continue
  295. }
  296. Expect(err).NotTo(HaveOccurred())
  297. }
  298. })
  299. val, err := ring.Get("key").Int64()
  300. Expect(err).NotTo(HaveOccurred())
  301. Expect(val).To(Equal(int64(C * N)))
  302. })
  303. It("should close Tx without closing the client", func() {
  304. err := ring.Watch(func(tx *redis.Tx) error {
  305. _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  306. pipe.Ping()
  307. return nil
  308. })
  309. return err
  310. }, "key")
  311. Expect(err).NotTo(HaveOccurred())
  312. Expect(ring.Ping().Err()).NotTo(HaveOccurred())
  313. })
  314. It("respects max size on multi", func() {
  315. perform(1000, func(id int) {
  316. var ping *redis.StatusCmd
  317. err := ring.Watch(func(tx *redis.Tx) error {
  318. cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  319. ping = pipe.Ping()
  320. return nil
  321. })
  322. Expect(err).NotTo(HaveOccurred())
  323. Expect(cmds).To(HaveLen(1))
  324. return err
  325. }, "key")
  326. Expect(err).NotTo(HaveOccurred())
  327. Expect(ping.Err()).NotTo(HaveOccurred())
  328. Expect(ping.Val()).To(Equal("PONG"))
  329. })
  330. ring.ForEachShard(func(cl *redis.Client) error {
  331. pool := cl.Pool()
  332. Expect(pool.Len()).To(BeNumerically("<=", 10))
  333. Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
  334. Expect(pool.Len()).To(Equal(pool.IdleLen()))
  335. return nil
  336. })
  337. })
  338. })
  339. var _ = Describe("Ring Tx timeout", func() {
  340. const heartbeat = 100 * time.Millisecond
  341. var ring *redis.Ring
  342. AfterEach(func() {
  343. _ = ring.Close()
  344. })
  345. testTimeout := func() {
  346. It("Tx timeouts", func() {
  347. err := ring.Watch(func(tx *redis.Tx) error {
  348. return tx.Ping().Err()
  349. }, "foo")
  350. Expect(err).To(HaveOccurred())
  351. Expect(err.(net.Error).Timeout()).To(BeTrue())
  352. })
  353. It("Tx Pipeline timeouts", func() {
  354. err := ring.Watch(func(tx *redis.Tx) error {
  355. _, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
  356. pipe.Ping()
  357. return nil
  358. })
  359. return err
  360. }, "foo")
  361. Expect(err).To(HaveOccurred())
  362. Expect(err.(net.Error).Timeout()).To(BeTrue())
  363. })
  364. }
  365. const pause = 5 * time.Second
  366. Context("read/write timeout", func() {
  367. BeforeEach(func() {
  368. opt := redisRingOptions()
  369. opt.ReadTimeout = 250 * time.Millisecond
  370. opt.WriteTimeout = 250 * time.Millisecond
  371. opt.HeartbeatFrequency = heartbeat
  372. ring = redis.NewRing(opt)
  373. err := ring.ForEachShard(func(client *redis.Client) error {
  374. return client.ClientPause(pause).Err()
  375. })
  376. Expect(err).NotTo(HaveOccurred())
  377. })
  378. AfterEach(func() {
  379. _ = ring.ForEachShard(func(client *redis.Client) error {
  380. defer GinkgoRecover()
  381. Eventually(func() error {
  382. return client.Ping().Err()
  383. }, 2*pause).ShouldNot(HaveOccurred())
  384. return nil
  385. })
  386. })
  387. testTimeout()
  388. })
  389. })