main_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package redis_test
  2. import (
  3. "errors"
  4. "fmt"
  5. "net"
  6. "os"
  7. "os/exec"
  8. "path/filepath"
  9. "sync"
  10. "testing"
  11. "time"
  12. "github.com/go-redis/redis"
  13. . "github.com/onsi/ginkgo"
  14. . "github.com/onsi/gomega"
  15. )
  16. const (
  17. redisPort = "6380"
  18. redisAddr = ":" + redisPort
  19. redisSecondaryPort = "6381"
  20. )
  21. const (
  22. ringShard1Port = "6390"
  23. ringShard2Port = "6391"
  24. ringShard3Port = "6392"
  25. )
  26. const (
  27. sentinelName = "mymaster"
  28. sentinelMasterPort = "8123"
  29. sentinelSlave1Port = "8124"
  30. sentinelSlave2Port = "8125"
  31. sentinelPort = "8126"
  32. )
  33. var (
  34. redisMain *redisProcess
  35. ringShard1, ringShard2, ringShard3 *redisProcess
  36. sentinelMaster, sentinelSlave1, sentinelSlave2, sentinel *redisProcess
  37. )
  38. var cluster = &clusterScenario{
  39. ports: []string{"8220", "8221", "8222", "8223", "8224", "8225"},
  40. nodeIds: make([]string, 6),
  41. processes: make(map[string]*redisProcess, 6),
  42. clients: make(map[string]*redis.Client, 6),
  43. }
  44. var _ = BeforeSuite(func() {
  45. var err error
  46. redisMain, err = startRedis(redisPort)
  47. Expect(err).NotTo(HaveOccurred())
  48. ringShard1, err = startRedis(ringShard1Port)
  49. Expect(err).NotTo(HaveOccurred())
  50. ringShard2, err = startRedis(ringShard2Port)
  51. Expect(err).NotTo(HaveOccurred())
  52. ringShard3, err = startRedis(ringShard3Port)
  53. Expect(err).NotTo(HaveOccurred())
  54. sentinelMaster, err = startRedis(sentinelMasterPort)
  55. Expect(err).NotTo(HaveOccurred())
  56. sentinel, err = startSentinel(sentinelPort, sentinelName, sentinelMasterPort)
  57. Expect(err).NotTo(HaveOccurred())
  58. sentinelSlave1, err = startRedis(
  59. sentinelSlave1Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
  60. Expect(err).NotTo(HaveOccurred())
  61. sentinelSlave2, err = startRedis(
  62. sentinelSlave2Port, "--slaveof", "127.0.0.1", sentinelMasterPort)
  63. Expect(err).NotTo(HaveOccurred())
  64. Expect(startCluster(cluster)).NotTo(HaveOccurred())
  65. })
  66. var _ = AfterSuite(func() {
  67. Expect(redisMain.Close()).NotTo(HaveOccurred())
  68. Expect(ringShard1.Close()).NotTo(HaveOccurred())
  69. Expect(ringShard2.Close()).NotTo(HaveOccurred())
  70. Expect(ringShard3.Close()).NotTo(HaveOccurred())
  71. Expect(sentinel.Close()).NotTo(HaveOccurred())
  72. Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
  73. Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
  74. Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
  75. Expect(stopCluster(cluster)).NotTo(HaveOccurred())
  76. })
  77. func TestGinkgoSuite(t *testing.T) {
  78. RegisterFailHandler(Fail)
  79. RunSpecs(t, "go-redis")
  80. }
  81. //------------------------------------------------------------------------------
  82. func redisOptions() *redis.Options {
  83. return &redis.Options{
  84. Addr: redisAddr,
  85. DB: 15,
  86. DialTimeout: 10 * time.Second,
  87. ReadTimeout: 30 * time.Second,
  88. WriteTimeout: 30 * time.Second,
  89. PoolSize: 10,
  90. PoolTimeout: 30 * time.Second,
  91. IdleTimeout: 500 * time.Millisecond,
  92. IdleCheckFrequency: 500 * time.Millisecond,
  93. }
  94. }
  95. func redisClusterOptions() *redis.ClusterOptions {
  96. return &redis.ClusterOptions{
  97. DialTimeout: 10 * time.Second,
  98. ReadTimeout: 30 * time.Second,
  99. WriteTimeout: 30 * time.Second,
  100. PoolSize: 10,
  101. PoolTimeout: 30 * time.Second,
  102. IdleTimeout: 500 * time.Millisecond,
  103. IdleCheckFrequency: 500 * time.Millisecond,
  104. }
  105. }
  106. func redisRingOptions() *redis.RingOptions {
  107. return &redis.RingOptions{
  108. Addrs: map[string]string{
  109. "ringShardOne": ":" + ringShard1Port,
  110. "ringShardTwo": ":" + ringShard2Port,
  111. },
  112. DialTimeout: 10 * time.Second,
  113. ReadTimeout: 30 * time.Second,
  114. WriteTimeout: 30 * time.Second,
  115. PoolSize: 10,
  116. PoolTimeout: 30 * time.Second,
  117. IdleTimeout: 500 * time.Millisecond,
  118. IdleCheckFrequency: 500 * time.Millisecond,
  119. }
  120. }
  121. func performAsync(n int, cbs ...func(int)) *sync.WaitGroup {
  122. var wg sync.WaitGroup
  123. for _, cb := range cbs {
  124. for i := 0; i < n; i++ {
  125. wg.Add(1)
  126. go func(cb func(int), i int) {
  127. defer GinkgoRecover()
  128. defer wg.Done()
  129. cb(i)
  130. }(cb, i)
  131. }
  132. }
  133. return &wg
  134. }
  135. func perform(n int, cbs ...func(int)) {
  136. wg := performAsync(n, cbs...)
  137. wg.Wait()
  138. }
  139. func eventually(fn func() error, timeout time.Duration) error {
  140. errCh := make(chan error, 1)
  141. done := make(chan struct{})
  142. exit := make(chan struct{})
  143. go func() {
  144. for {
  145. err := fn()
  146. if err == nil {
  147. close(done)
  148. return
  149. }
  150. select {
  151. case errCh <- err:
  152. default:
  153. }
  154. select {
  155. case <-exit:
  156. return
  157. case <-time.After(timeout / 100):
  158. }
  159. }
  160. }()
  161. select {
  162. case <-done:
  163. return nil
  164. case <-time.After(timeout):
  165. close(exit)
  166. select {
  167. case err := <-errCh:
  168. return err
  169. default:
  170. return fmt.Errorf("timeout after %s without an error", timeout)
  171. }
  172. }
  173. }
  174. func execCmd(name string, args ...string) (*os.Process, error) {
  175. cmd := exec.Command(name, args...)
  176. if testing.Verbose() {
  177. cmd.Stdout = os.Stdout
  178. cmd.Stderr = os.Stderr
  179. }
  180. return cmd.Process, cmd.Start()
  181. }
  182. func connectTo(port string) (*redis.Client, error) {
  183. client := redis.NewClient(&redis.Options{
  184. Addr: ":" + port,
  185. })
  186. err := eventually(func() error {
  187. return client.Ping().Err()
  188. }, 30*time.Second)
  189. if err != nil {
  190. return nil, err
  191. }
  192. return client, nil
  193. }
  194. type redisProcess struct {
  195. *os.Process
  196. *redis.Client
  197. }
  198. func (p *redisProcess) Close() error {
  199. if err := p.Kill(); err != nil {
  200. return err
  201. }
  202. err := eventually(func() error {
  203. if err := p.Client.Ping().Err(); err != nil {
  204. return nil
  205. }
  206. return errors.New("client is not shutdown")
  207. }, 10*time.Second)
  208. if err != nil {
  209. return err
  210. }
  211. p.Client.Close()
  212. return nil
  213. }
  214. var (
  215. redisServerBin, _ = filepath.Abs(filepath.Join("testdata", "redis", "src", "redis-server"))
  216. redisServerConf, _ = filepath.Abs(filepath.Join("testdata", "redis.conf"))
  217. )
  218. func redisDir(port string) (string, error) {
  219. dir, err := filepath.Abs(filepath.Join("testdata", "instances", port))
  220. if err != nil {
  221. return "", err
  222. }
  223. if err := os.RemoveAll(dir); err != nil {
  224. return "", err
  225. }
  226. if err := os.MkdirAll(dir, 0775); err != nil {
  227. return "", err
  228. }
  229. return dir, nil
  230. }
  231. func startRedis(port string, args ...string) (*redisProcess, error) {
  232. dir, err := redisDir(port)
  233. if err != nil {
  234. return nil, err
  235. }
  236. if err = exec.Command("cp", "-f", redisServerConf, dir).Run(); err != nil {
  237. return nil, err
  238. }
  239. baseArgs := []string{filepath.Join(dir, "redis.conf"), "--port", port, "--dir", dir}
  240. process, err := execCmd(redisServerBin, append(baseArgs, args...)...)
  241. if err != nil {
  242. return nil, err
  243. }
  244. client, err := connectTo(port)
  245. if err != nil {
  246. process.Kill()
  247. return nil, err
  248. }
  249. return &redisProcess{process, client}, err
  250. }
  251. func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
  252. dir, err := redisDir(port)
  253. if err != nil {
  254. return nil, err
  255. }
  256. process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
  257. if err != nil {
  258. return nil, err
  259. }
  260. client, err := connectTo(port)
  261. if err != nil {
  262. process.Kill()
  263. return nil, err
  264. }
  265. for _, cmd := range []*redis.StatusCmd{
  266. redis.NewStatusCmd("SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "1"),
  267. redis.NewStatusCmd("SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
  268. redis.NewStatusCmd("SENTINEL", "SET", masterName, "failover-timeout", "1000"),
  269. redis.NewStatusCmd("SENTINEL", "SET", masterName, "parallel-syncs", "1"),
  270. } {
  271. client.Process(cmd)
  272. if err := cmd.Err(); err != nil {
  273. process.Kill()
  274. return nil, err
  275. }
  276. }
  277. return &redisProcess{process, client}, nil
  278. }
  279. //------------------------------------------------------------------------------
  280. type badConnError string
  281. func (e badConnError) Error() string { return string(e) }
  282. func (e badConnError) Timeout() bool { return false }
  283. func (e badConnError) Temporary() bool { return false }
  284. type badConn struct {
  285. net.TCPConn
  286. readDelay, writeDelay time.Duration
  287. readErr, writeErr error
  288. }
  289. var _ net.Conn = &badConn{}
  290. func (cn *badConn) Read([]byte) (int, error) {
  291. if cn.readDelay != 0 {
  292. time.Sleep(cn.readDelay)
  293. }
  294. if cn.readErr != nil {
  295. return 0, cn.readErr
  296. }
  297. return 0, badConnError("bad connection")
  298. }
  299. func (cn *badConn) Write([]byte) (int, error) {
  300. if cn.writeDelay != 0 {
  301. time.Sleep(cn.writeDelay)
  302. }
  303. if cn.writeErr != nil {
  304. return 0, cn.writeErr
  305. }
  306. return 0, badConnError("bad connection")
  307. }