cluster_test.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
  1. package redis_test
  2. import (
  3. "fmt"
  4. "net"
  5. "strconv"
  6. "strings"
  7. "sync"
  8. "testing"
  9. "time"
  10. "gopkg.in/redis.v5"
  11. "gopkg.in/redis.v5/internal/hashtag"
  12. . "github.com/onsi/ginkgo"
  13. . "github.com/onsi/gomega"
  14. )
  15. type clusterScenario struct {
  16. ports []string
  17. nodeIds []string
  18. processes map[string]*redisProcess
  19. clients map[string]*redis.Client
  20. }
  21. func (s *clusterScenario) masters() []*redis.Client {
  22. result := make([]*redis.Client, 3)
  23. for pos, port := range s.ports[:3] {
  24. result[pos] = s.clients[port]
  25. }
  26. return result
  27. }
  28. func (s *clusterScenario) slaves() []*redis.Client {
  29. result := make([]*redis.Client, 3)
  30. for pos, port := range s.ports[3:] {
  31. result[pos] = s.clients[port]
  32. }
  33. return result
  34. }
  35. func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
  36. addrs := make([]string, len(s.ports))
  37. for i, port := range s.ports {
  38. addrs[i] = net.JoinHostPort("127.0.0.1", port)
  39. }
  40. opt.Addrs = addrs
  41. return redis.NewClusterClient(opt)
  42. }
  43. func startCluster(scenario *clusterScenario) error {
  44. // Start processes and collect node ids
  45. for pos, port := range scenario.ports {
  46. process, err := startRedis(port, "--cluster-enabled", "yes")
  47. if err != nil {
  48. return err
  49. }
  50. client := redis.NewClient(&redis.Options{
  51. Addr: ":" + port,
  52. })
  53. info, err := client.ClusterNodes().Result()
  54. if err != nil {
  55. return err
  56. }
  57. scenario.processes[port] = process
  58. scenario.clients[port] = client
  59. scenario.nodeIds[pos] = info[:40]
  60. }
  61. // Meet cluster nodes
  62. for _, client := range scenario.clients {
  63. err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
  64. if err != nil {
  65. return err
  66. }
  67. }
  68. // Bootstrap masters
  69. slots := []int{0, 5000, 10000, 16384}
  70. for pos, master := range scenario.masters() {
  71. err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
  72. if err != nil {
  73. return err
  74. }
  75. }
  76. // Bootstrap slaves
  77. for idx, slave := range scenario.slaves() {
  78. masterId := scenario.nodeIds[idx]
  79. // Wait until master is available
  80. err := eventually(func() error {
  81. s := slave.ClusterNodes().Val()
  82. wanted := masterId
  83. if !strings.Contains(s, wanted) {
  84. return fmt.Errorf("%q does not contain %q", s, wanted)
  85. }
  86. return nil
  87. }, 10*time.Second)
  88. if err != nil {
  89. return err
  90. }
  91. err = slave.ClusterReplicate(masterId).Err()
  92. if err != nil {
  93. return err
  94. }
  95. }
  96. // Wait until all nodes have consistent info
  97. for _, client := range scenario.clients {
  98. err := eventually(func() error {
  99. res, err := client.ClusterSlots().Result()
  100. if err != nil {
  101. return err
  102. }
  103. wanted := []redis.ClusterSlot{
  104. {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
  105. {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
  106. {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
  107. }
  108. return assertSlotsEqual(res, wanted)
  109. }, 30*time.Second)
  110. if err != nil {
  111. return err
  112. }
  113. }
  114. return nil
  115. }
  116. func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
  117. outer_loop:
  118. for _, s2 := range wanted {
  119. for _, s1 := range slots {
  120. if slotEqual(s1, s2) {
  121. continue outer_loop
  122. }
  123. }
  124. return fmt.Errorf("%v not found in %v", s2, slots)
  125. }
  126. return nil
  127. }
  128. func slotEqual(s1, s2 redis.ClusterSlot) bool {
  129. if s1.Start != s2.Start {
  130. return false
  131. }
  132. if s1.End != s2.End {
  133. return false
  134. }
  135. if len(s1.Nodes) != len(s2.Nodes) {
  136. return false
  137. }
  138. for i, n1 := range s1.Nodes {
  139. if n1.Addr != s2.Nodes[i].Addr {
  140. return false
  141. }
  142. }
  143. return true
  144. }
  145. func stopCluster(scenario *clusterScenario) error {
  146. for _, client := range scenario.clients {
  147. if err := client.Close(); err != nil {
  148. return err
  149. }
  150. }
  151. for _, process := range scenario.processes {
  152. if err := process.Close(); err != nil {
  153. return err
  154. }
  155. }
  156. return nil
  157. }
  158. //------------------------------------------------------------------------------
  159. var _ = Describe("ClusterClient", func() {
  160. var opt *redis.ClusterOptions
  161. var client *redis.ClusterClient
  162. assertClusterClient := func() {
  163. It("should CLUSTER SLOTS", func() {
  164. res, err := client.ClusterSlots().Result()
  165. Expect(err).NotTo(HaveOccurred())
  166. Expect(res).To(HaveLen(3))
  167. wanted := []redis.ClusterSlot{
  168. {0, 4999, []redis.ClusterNode{{"", "127.0.0.1:8220"}, {"", "127.0.0.1:8223"}}},
  169. {5000, 9999, []redis.ClusterNode{{"", "127.0.0.1:8221"}, {"", "127.0.0.1:8224"}}},
  170. {10000, 16383, []redis.ClusterNode{{"", "127.0.0.1:8222"}, {"", "127.0.0.1:8225"}}},
  171. }
  172. Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
  173. })
  174. It("should CLUSTER NODES", func() {
  175. res, err := client.ClusterNodes().Result()
  176. Expect(err).NotTo(HaveOccurred())
  177. Expect(len(res)).To(BeNumerically(">", 400))
  178. })
  179. It("should CLUSTER INFO", func() {
  180. res, err := client.ClusterInfo().Result()
  181. Expect(err).NotTo(HaveOccurred())
  182. Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
  183. })
  184. It("should CLUSTER KEYSLOT", func() {
  185. hashSlot, err := client.ClusterKeySlot("somekey").Result()
  186. Expect(err).NotTo(HaveOccurred())
  187. Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
  188. })
  189. It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
  190. n, err := client.ClusterCountFailureReports(cluster.nodeIds[0]).Result()
  191. Expect(err).NotTo(HaveOccurred())
  192. Expect(n).To(Equal(int64(0)))
  193. })
  194. It("should CLUSTER COUNTKEYSINSLOT", func() {
  195. n, err := client.ClusterCountKeysInSlot(10).Result()
  196. Expect(err).NotTo(HaveOccurred())
  197. Expect(n).To(Equal(int64(0)))
  198. })
  199. It("should CLUSTER SAVECONFIG", func() {
  200. res, err := client.ClusterSaveConfig().Result()
  201. Expect(err).NotTo(HaveOccurred())
  202. Expect(res).To(Equal("OK"))
  203. })
  204. It("should CLUSTER SLAVES", func() {
  205. nodesList, err := client.ClusterSlaves(cluster.nodeIds[0]).Result()
  206. Expect(err).NotTo(HaveOccurred())
  207. Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
  208. Expect(nodesList).Should(HaveLen(1))
  209. })
  210. It("should GET/SET/DEL", func() {
  211. val, err := client.Get("A").Result()
  212. Expect(err).To(Equal(redis.Nil))
  213. Expect(val).To(Equal(""))
  214. val, err = client.Set("A", "VALUE", 0).Result()
  215. Expect(err).NotTo(HaveOccurred())
  216. Expect(val).To(Equal("OK"))
  217. val, err = client.Get("A").Result()
  218. Expect(err).NotTo(HaveOccurred())
  219. Expect(val).To(Equal("VALUE"))
  220. cnt, err := client.Del("A").Result()
  221. Expect(err).NotTo(HaveOccurred())
  222. Expect(cnt).To(Equal(int64(1)))
  223. })
  224. It("returns pool stats", func() {
  225. Expect(client.PoolStats()).To(BeAssignableToTypeOf(&redis.PoolStats{}))
  226. })
  227. It("removes idle connections", func() {
  228. stats := client.PoolStats()
  229. Expect(stats.TotalConns).NotTo(BeZero())
  230. Expect(stats.FreeConns).NotTo(BeZero())
  231. time.Sleep(2 * time.Second)
  232. stats = client.PoolStats()
  233. Expect(stats.TotalConns).To(BeZero())
  234. Expect(stats.FreeConns).To(BeZero())
  235. })
  236. It("follows redirects", func() {
  237. Expect(client.Set("A", "VALUE", 0).Err()).NotTo(HaveOccurred())
  238. slot := hashtag.Slot("A")
  239. Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
  240. val, err := client.Get("A").Result()
  241. Expect(err).NotTo(HaveOccurred())
  242. Expect(val).To(Equal("VALUE"))
  243. })
  244. It("returns an error when there are no attempts left", func() {
  245. opt := redisClusterOptions()
  246. opt.MaxRedirects = -1
  247. client := cluster.clusterClient(opt)
  248. slot := hashtag.Slot("A")
  249. Expect(client.SwapSlotNodes(slot)).To(Equal([]string{"127.0.0.1:8224", "127.0.0.1:8221"}))
  250. err := client.Get("A").Err()
  251. Expect(err).To(HaveOccurred())
  252. Expect(err.Error()).To(ContainSubstring("MOVED"))
  253. Expect(client.Close()).NotTo(HaveOccurred())
  254. })
  255. It("distributes keys", func() {
  256. for i := 0; i < 100; i++ {
  257. err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  258. Expect(err).NotTo(HaveOccurred())
  259. }
  260. wanted := []string{"keys=31", "keys=29", "keys=40"}
  261. for i, master := range cluster.masters() {
  262. Expect(master.Info().Val()).To(ContainSubstring(wanted[i]))
  263. }
  264. })
  265. It("distributes keys when using EVAL", func() {
  266. script := redis.NewScript(`
  267. local r = redis.call('SET', KEYS[1], ARGV[1])
  268. return r
  269. `)
  270. var key string
  271. for i := 0; i < 100; i++ {
  272. key = fmt.Sprintf("key%d", i)
  273. err := script.Run(client, []string{key}, "value").Err()
  274. Expect(err).NotTo(HaveOccurred())
  275. }
  276. wanted := []string{"keys=31", "keys=29", "keys=40"}
  277. for i, master := range cluster.masters() {
  278. Expect(master.Info().Val()).To(ContainSubstring(wanted[i]))
  279. }
  280. })
  281. It("supports Watch", func() {
  282. var incr func(string) error
  283. // Transactionally increments key using GET and SET commands.
  284. incr = func(key string) error {
  285. err := client.Watch(func(tx *redis.Tx) error {
  286. n, err := tx.Get(key).Int64()
  287. if err != nil && err != redis.Nil {
  288. return err
  289. }
  290. _, err = tx.Pipelined(func(pipe *redis.Pipeline) error {
  291. pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
  292. return nil
  293. })
  294. return err
  295. }, key)
  296. if err == redis.TxFailedErr {
  297. return incr(key)
  298. }
  299. return err
  300. }
  301. var wg sync.WaitGroup
  302. for i := 0; i < 100; i++ {
  303. wg.Add(1)
  304. go func() {
  305. defer GinkgoRecover()
  306. defer wg.Done()
  307. err := incr("key")
  308. Expect(err).NotTo(HaveOccurred())
  309. }()
  310. }
  311. wg.Wait()
  312. n, err := client.Get("key").Int64()
  313. Expect(err).NotTo(HaveOccurred())
  314. Expect(n).To(Equal(int64(100)))
  315. })
  316. Describe("pipelining", func() {
  317. var pipe *redis.Pipeline
  318. assertPipeline := func() {
  319. keys := []string{"A", "B", "C", "D", "E", "F", "G"}
  320. It("follows redirects", func() {
  321. for _, key := range keys {
  322. slot := hashtag.Slot(key)
  323. client.SwapSlotNodes(slot)
  324. }
  325. for i, key := range keys {
  326. pipe.Set(key, key+"_value", 0)
  327. pipe.Expire(key, time.Duration(i+1)*time.Hour)
  328. }
  329. cmds, err := pipe.Exec()
  330. Expect(err).NotTo(HaveOccurred())
  331. Expect(cmds).To(HaveLen(14))
  332. if opt.RouteByLatency {
  333. return
  334. }
  335. for _, key := range keys {
  336. slot := hashtag.Slot(key)
  337. client.SwapSlotNodes(slot)
  338. }
  339. for _, key := range keys {
  340. pipe.Get(key)
  341. pipe.TTL(key)
  342. }
  343. cmds, err = pipe.Exec()
  344. Expect(err).NotTo(HaveOccurred())
  345. Expect(cmds).To(HaveLen(14))
  346. for i, key := range keys {
  347. get := cmds[i*2].(*redis.StringCmd)
  348. Expect(get.Val()).To(Equal(key + "_value"))
  349. ttl := cmds[(i*2)+1].(*redis.DurationCmd)
  350. Expect(ttl.Val()).To(BeNumerically("~", time.Duration(i+1)*time.Hour, time.Second))
  351. }
  352. })
  353. It("works with missing keys", func() {
  354. pipe.Set("A", "A_value", 0)
  355. pipe.Set("C", "C_value", 0)
  356. _, err := pipe.Exec()
  357. Expect(err).NotTo(HaveOccurred())
  358. a := pipe.Get("A")
  359. b := pipe.Get("B")
  360. c := pipe.Get("C")
  361. cmds, err := pipe.Exec()
  362. Expect(err).To(Equal(redis.Nil))
  363. Expect(cmds).To(HaveLen(3))
  364. Expect(a.Err()).NotTo(HaveOccurred())
  365. Expect(a.Val()).To(Equal("A_value"))
  366. Expect(b.Err()).To(Equal(redis.Nil))
  367. Expect(b.Val()).To(Equal(""))
  368. Expect(c.Err()).NotTo(HaveOccurred())
  369. Expect(c.Val()).To(Equal("C_value"))
  370. })
  371. }
  372. Describe("Pipeline", func() {
  373. BeforeEach(func() {
  374. pipe = client.Pipeline()
  375. })
  376. AfterEach(func() {
  377. Expect(pipe.Close()).NotTo(HaveOccurred())
  378. })
  379. assertPipeline()
  380. })
  381. Describe("TxPipeline", func() {
  382. BeforeEach(func() {
  383. pipe = client.TxPipeline()
  384. })
  385. AfterEach(func() {
  386. Expect(pipe.Close()).NotTo(HaveOccurred())
  387. })
  388. assertPipeline()
  389. })
  390. })
  391. It("calls fn for every master node", func() {
  392. for i := 0; i < 10; i++ {
  393. Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
  394. }
  395. err := client.ForEachMaster(func(master *redis.Client) error {
  396. return master.FlushDb().Err()
  397. })
  398. Expect(err).NotTo(HaveOccurred())
  399. for _, client := range cluster.masters() {
  400. keys, err := client.Keys("*").Result()
  401. Expect(err).NotTo(HaveOccurred())
  402. Expect(keys).To(HaveLen(0))
  403. }
  404. })
  405. }
  406. Describe("default ClusterClient", func() {
  407. BeforeEach(func() {
  408. opt = redisClusterOptions()
  409. client = cluster.clusterClient(opt)
  410. _ = client.ForEachMaster(func(master *redis.Client) error {
  411. return master.FlushDb().Err()
  412. })
  413. })
  414. AfterEach(func() {
  415. Expect(client.Close()).NotTo(HaveOccurred())
  416. })
  417. assertClusterClient()
  418. })
  419. Describe("ClusterClient with RouteByLatency", func() {
  420. BeforeEach(func() {
  421. opt = redisClusterOptions()
  422. opt.RouteByLatency = true
  423. client = cluster.clusterClient(opt)
  424. _ = client.ForEachMaster(func(master *redis.Client) error {
  425. return master.FlushDb().Err()
  426. })
  427. })
  428. AfterEach(func() {
  429. client.FlushDb()
  430. Expect(client.Close()).NotTo(HaveOccurred())
  431. })
  432. assertClusterClient()
  433. })
  434. })
  435. var _ = Describe("ClusterClient without nodes", func() {
  436. var client *redis.ClusterClient
  437. BeforeEach(func() {
  438. client = redis.NewClusterClient(&redis.ClusterOptions{})
  439. })
  440. AfterEach(func() {
  441. Expect(client.Close()).NotTo(HaveOccurred())
  442. })
  443. It("returns an error", func() {
  444. err := client.Ping().Err()
  445. Expect(err).To(MatchError("redis: cluster has no nodes"))
  446. })
  447. It("pipeline returns an error", func() {
  448. _, err := client.Pipelined(func(pipe *redis.Pipeline) error {
  449. pipe.Ping()
  450. return nil
  451. })
  452. Expect(err).To(MatchError("redis: cluster has no nodes"))
  453. })
  454. })
  455. var _ = Describe("ClusterClient without valid nodes", func() {
  456. var client *redis.ClusterClient
  457. BeforeEach(func() {
  458. client = redis.NewClusterClient(&redis.ClusterOptions{
  459. Addrs: []string{redisAddr},
  460. })
  461. })
  462. AfterEach(func() {
  463. Expect(client.Close()).NotTo(HaveOccurred())
  464. })
  465. It("returns an error", func() {
  466. err := client.Ping().Err()
  467. Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  468. })
  469. It("pipeline returns an error", func() {
  470. _, err := client.Pipelined(func(pipe *redis.Pipeline) error {
  471. pipe.Ping()
  472. return nil
  473. })
  474. Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  475. })
  476. })
  477. var _ = Describe("ClusterClient timeout", func() {
  478. var client *redis.ClusterClient
  479. AfterEach(func() {
  480. _ = client.Close()
  481. })
  482. testTimeout := func() {
  483. It("Ping timeouts", func() {
  484. err := client.Ping().Err()
  485. Expect(err).To(HaveOccurred())
  486. Expect(err.(net.Error).Timeout()).To(BeTrue())
  487. })
  488. It("Pipeline timeouts", func() {
  489. _, err := client.Pipelined(func(pipe *redis.Pipeline) error {
  490. pipe.Ping()
  491. return nil
  492. })
  493. Expect(err).To(HaveOccurred())
  494. Expect(err.(net.Error).Timeout()).To(BeTrue())
  495. })
  496. It("Tx timeouts", func() {
  497. err := client.Watch(func(tx *redis.Tx) error {
  498. return tx.Ping().Err()
  499. })
  500. Expect(err).To(HaveOccurred())
  501. Expect(err.(net.Error).Timeout()).To(BeTrue())
  502. })
  503. It("Tx Pipeline timeouts", func() {
  504. err := client.Watch(func(tx *redis.Tx) error {
  505. _, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
  506. pipe.Ping()
  507. return nil
  508. })
  509. return err
  510. })
  511. Expect(err).To(HaveOccurred())
  512. Expect(err.(net.Error).Timeout()).To(BeTrue())
  513. })
  514. }
  515. Context("read timeout", func() {
  516. BeforeEach(func() {
  517. opt := redisClusterOptions()
  518. opt.ReadTimeout = time.Nanosecond
  519. opt.WriteTimeout = -1
  520. client = cluster.clusterClient(opt)
  521. })
  522. testTimeout()
  523. })
  524. Context("write timeout", func() {
  525. BeforeEach(func() {
  526. opt := redisClusterOptions()
  527. opt.ReadTimeout = time.Nanosecond
  528. opt.WriteTimeout = -1
  529. client = cluster.clusterClient(opt)
  530. })
  531. testTimeout()
  532. })
  533. Context("network timeout", func() {
  534. const pause = time.Second
  535. BeforeEach(func() {
  536. opt := redisClusterOptions()
  537. opt.ReadTimeout = 100 * time.Millisecond
  538. opt.WriteTimeout = 100 * time.Millisecond
  539. opt.MaxRedirects = 1
  540. client = cluster.clusterClient(opt)
  541. err := client.ForEachNode(func(client *redis.Client) error {
  542. return client.ClientPause(pause).Err()
  543. })
  544. Expect(err).NotTo(HaveOccurred())
  545. })
  546. AfterEach(func() {
  547. Eventually(func() error {
  548. return client.ForEachNode(func(client *redis.Client) error {
  549. return client.Ping().Err()
  550. })
  551. }, 2*pause).ShouldNot(HaveOccurred())
  552. })
  553. testTimeout()
  554. })
  555. })
  556. //------------------------------------------------------------------------------
  557. func BenchmarkRedisClusterPing(b *testing.B) {
  558. if testing.Short() {
  559. b.Skip("skipping in short mode")
  560. }
  561. cluster := &clusterScenario{
  562. ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
  563. nodeIds: make([]string, 6),
  564. processes: make(map[string]*redisProcess, 6),
  565. clients: make(map[string]*redis.Client, 6),
  566. }
  567. if err := startCluster(cluster); err != nil {
  568. b.Fatal(err)
  569. }
  570. defer stopCluster(cluster)
  571. client := cluster.clusterClient(redisClusterOptions())
  572. defer client.Close()
  573. b.ResetTimer()
  574. b.RunParallel(func(pb *testing.PB) {
  575. for pb.Next() {
  576. if err := client.Ping().Err(); err != nil {
  577. b.Fatal(err)
  578. }
  579. }
  580. })
  581. }