pubsub_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392
  1. package redis_test
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "time"
  7. "gopkg.in/redis.v5"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. var _ = Describe("PubSub", func() {
  12. var client *redis.Client
  13. BeforeEach(func() {
  14. client = redis.NewClient(redisOptions())
  15. Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
  16. })
  17. AfterEach(func() {
  18. Expect(client.Close()).NotTo(HaveOccurred())
  19. })
  20. It("should support pattern matching", func() {
  21. pubsub, err := client.PSubscribe("mychannel*")
  22. Expect(err).NotTo(HaveOccurred())
  23. defer pubsub.Close()
  24. {
  25. msgi, err := pubsub.ReceiveTimeout(time.Second)
  26. Expect(err).NotTo(HaveOccurred())
  27. subscr := msgi.(*redis.Subscription)
  28. Expect(subscr.Kind).To(Equal("psubscribe"))
  29. Expect(subscr.Channel).To(Equal("mychannel*"))
  30. Expect(subscr.Count).To(Equal(1))
  31. }
  32. {
  33. msgi, err := pubsub.ReceiveTimeout(time.Second)
  34. Expect(err.(net.Error).Timeout()).To(Equal(true))
  35. Expect(msgi).To(BeNil())
  36. }
  37. n, err := client.Publish("mychannel1", "hello").Result()
  38. Expect(err).NotTo(HaveOccurred())
  39. Expect(n).To(Equal(int64(1)))
  40. Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
  41. {
  42. msgi, err := pubsub.ReceiveTimeout(time.Second)
  43. Expect(err).NotTo(HaveOccurred())
  44. subscr := msgi.(*redis.Message)
  45. Expect(subscr.Channel).To(Equal("mychannel1"))
  46. Expect(subscr.Pattern).To(Equal("mychannel*"))
  47. Expect(subscr.Payload).To(Equal("hello"))
  48. }
  49. {
  50. msgi, err := pubsub.ReceiveTimeout(time.Second)
  51. Expect(err).NotTo(HaveOccurred())
  52. subscr := msgi.(*redis.Subscription)
  53. Expect(subscr.Kind).To(Equal("punsubscribe"))
  54. Expect(subscr.Channel).To(Equal("mychannel*"))
  55. Expect(subscr.Count).To(Equal(0))
  56. }
  57. stats := client.PoolStats()
  58. Expect(stats.Requests - stats.Hits).To(Equal(uint32(2)))
  59. })
  60. It("should pub/sub channels", func() {
  61. channels, err := client.PubSubChannels("mychannel*").Result()
  62. Expect(err).NotTo(HaveOccurred())
  63. Expect(channels).To(BeEmpty())
  64. pubsub, err := client.Subscribe("mychannel", "mychannel2")
  65. Expect(err).NotTo(HaveOccurred())
  66. defer pubsub.Close()
  67. channels, err = client.PubSubChannels("mychannel*").Result()
  68. Expect(err).NotTo(HaveOccurred())
  69. Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
  70. channels, err = client.PubSubChannels("").Result()
  71. Expect(err).NotTo(HaveOccurred())
  72. Expect(channels).To(BeEmpty())
  73. channels, err = client.PubSubChannels("*").Result()
  74. Expect(err).NotTo(HaveOccurred())
  75. Expect(len(channels)).To(BeNumerically(">=", 2))
  76. })
  77. It("should return the numbers of subscribers", func() {
  78. pubsub, err := client.Subscribe("mychannel", "mychannel2")
  79. Expect(err).NotTo(HaveOccurred())
  80. defer pubsub.Close()
  81. channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
  82. Expect(err).NotTo(HaveOccurred())
  83. Expect(channels).To(Equal(map[string]int64{
  84. "mychannel": 1,
  85. "mychannel2": 1,
  86. "mychannel3": 0,
  87. }))
  88. })
  89. It("should return the numbers of subscribers by pattern", func() {
  90. num, err := client.PubSubNumPat().Result()
  91. Expect(err).NotTo(HaveOccurred())
  92. Expect(num).To(Equal(int64(0)))
  93. pubsub, err := client.PSubscribe("*")
  94. Expect(err).NotTo(HaveOccurred())
  95. defer pubsub.Close()
  96. num, err = client.PubSubNumPat().Result()
  97. Expect(err).NotTo(HaveOccurred())
  98. Expect(num).To(Equal(int64(1)))
  99. })
  100. It("should pub/sub", func() {
  101. pubsub, err := client.Subscribe("mychannel", "mychannel2")
  102. Expect(err).NotTo(HaveOccurred())
  103. defer pubsub.Close()
  104. {
  105. msgi, err := pubsub.ReceiveTimeout(time.Second)
  106. Expect(err).NotTo(HaveOccurred())
  107. subscr := msgi.(*redis.Subscription)
  108. Expect(subscr.Kind).To(Equal("subscribe"))
  109. Expect(subscr.Channel).To(Equal("mychannel"))
  110. Expect(subscr.Count).To(Equal(1))
  111. }
  112. {
  113. msgi, err := pubsub.ReceiveTimeout(time.Second)
  114. Expect(err).NotTo(HaveOccurred())
  115. subscr := msgi.(*redis.Subscription)
  116. Expect(subscr.Kind).To(Equal("subscribe"))
  117. Expect(subscr.Channel).To(Equal("mychannel2"))
  118. Expect(subscr.Count).To(Equal(2))
  119. }
  120. {
  121. msgi, err := pubsub.ReceiveTimeout(time.Second)
  122. Expect(err.(net.Error).Timeout()).To(Equal(true))
  123. Expect(msgi).NotTo(HaveOccurred())
  124. }
  125. n, err := client.Publish("mychannel", "hello").Result()
  126. Expect(err).NotTo(HaveOccurred())
  127. Expect(n).To(Equal(int64(1)))
  128. n, err = client.Publish("mychannel2", "hello2").Result()
  129. Expect(err).NotTo(HaveOccurred())
  130. Expect(n).To(Equal(int64(1)))
  131. Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
  132. {
  133. msgi, err := pubsub.ReceiveTimeout(time.Second)
  134. Expect(err).NotTo(HaveOccurred())
  135. subscr := msgi.(*redis.Message)
  136. Expect(subscr.Channel).To(Equal("mychannel"))
  137. Expect(subscr.Payload).To(Equal("hello"))
  138. }
  139. {
  140. msgi, err := pubsub.ReceiveTimeout(time.Second)
  141. Expect(err).NotTo(HaveOccurred())
  142. msg := msgi.(*redis.Message)
  143. Expect(msg.Channel).To(Equal("mychannel2"))
  144. Expect(msg.Payload).To(Equal("hello2"))
  145. }
  146. {
  147. msgi, err := pubsub.ReceiveTimeout(time.Second)
  148. Expect(err).NotTo(HaveOccurred())
  149. subscr := msgi.(*redis.Subscription)
  150. Expect(subscr.Kind).To(Equal("unsubscribe"))
  151. Expect(subscr.Channel).To(Equal("mychannel"))
  152. Expect(subscr.Count).To(Equal(1))
  153. }
  154. {
  155. msgi, err := pubsub.ReceiveTimeout(time.Second)
  156. Expect(err).NotTo(HaveOccurred())
  157. subscr := msgi.(*redis.Subscription)
  158. Expect(subscr.Kind).To(Equal("unsubscribe"))
  159. Expect(subscr.Channel).To(Equal("mychannel2"))
  160. Expect(subscr.Count).To(Equal(0))
  161. }
  162. stats := client.PoolStats()
  163. Expect(stats.Requests - stats.Hits).To(Equal(uint32(2)))
  164. })
  165. It("should ping/pong", func() {
  166. pubsub, err := client.Subscribe("mychannel")
  167. Expect(err).NotTo(HaveOccurred())
  168. defer pubsub.Close()
  169. _, err = pubsub.ReceiveTimeout(time.Second)
  170. Expect(err).NotTo(HaveOccurred())
  171. err = pubsub.Ping("")
  172. Expect(err).NotTo(HaveOccurred())
  173. msgi, err := pubsub.ReceiveTimeout(time.Second)
  174. Expect(err).NotTo(HaveOccurred())
  175. pong := msgi.(*redis.Pong)
  176. Expect(pong.Payload).To(Equal(""))
  177. })
  178. It("should ping/pong with payload", func() {
  179. pubsub, err := client.Subscribe("mychannel")
  180. Expect(err).NotTo(HaveOccurred())
  181. defer pubsub.Close()
  182. _, err = pubsub.ReceiveTimeout(time.Second)
  183. Expect(err).NotTo(HaveOccurred())
  184. err = pubsub.Ping("hello")
  185. Expect(err).NotTo(HaveOccurred())
  186. msgi, err := pubsub.ReceiveTimeout(time.Second)
  187. Expect(err).NotTo(HaveOccurred())
  188. pong := msgi.(*redis.Pong)
  189. Expect(pong.Payload).To(Equal("hello"))
  190. })
  191. It("should multi-ReceiveMessage", func() {
  192. pubsub, err := client.Subscribe("mychannel")
  193. Expect(err).NotTo(HaveOccurred())
  194. defer pubsub.Close()
  195. err = client.Publish("mychannel", "hello").Err()
  196. Expect(err).NotTo(HaveOccurred())
  197. err = client.Publish("mychannel", "world").Err()
  198. Expect(err).NotTo(HaveOccurred())
  199. msg, err := pubsub.ReceiveMessage()
  200. Expect(err).NotTo(HaveOccurred())
  201. Expect(msg.Channel).To(Equal("mychannel"))
  202. Expect(msg.Payload).To(Equal("hello"))
  203. msg, err = pubsub.ReceiveMessage()
  204. Expect(err).NotTo(HaveOccurred())
  205. Expect(msg.Channel).To(Equal("mychannel"))
  206. Expect(msg.Payload).To(Equal("world"))
  207. })
  208. It("should ReceiveMessage after timeout", func() {
  209. timeout := 100 * time.Millisecond
  210. pubsub, err := client.Subscribe("mychannel")
  211. Expect(err).NotTo(HaveOccurred())
  212. defer pubsub.Close()
  213. done := make(chan bool, 1)
  214. go func() {
  215. defer GinkgoRecover()
  216. defer func() {
  217. done <- true
  218. }()
  219. time.Sleep(timeout + 100*time.Millisecond)
  220. n, err := client.Publish("mychannel", "hello").Result()
  221. Expect(err).NotTo(HaveOccurred())
  222. Expect(n).To(Equal(int64(1)))
  223. }()
  224. msg, err := pubsub.ReceiveMessageTimeout(timeout)
  225. Expect(err).NotTo(HaveOccurred())
  226. Expect(msg.Channel).To(Equal("mychannel"))
  227. Expect(msg.Payload).To(Equal("hello"))
  228. Eventually(done).Should(Receive())
  229. stats := client.PoolStats()
  230. Expect(stats.Requests).To(Equal(uint32(3)))
  231. Expect(stats.Hits).To(Equal(uint32(1)))
  232. })
  233. expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
  234. cn, _, err := pubsub.Pool().Get()
  235. Expect(err).NotTo(HaveOccurred())
  236. cn.SetNetConn(&badConn{
  237. readErr: io.EOF,
  238. writeErr: io.EOF,
  239. })
  240. pubsub.Pool().Put(cn)
  241. done := make(chan bool, 1)
  242. go func() {
  243. defer GinkgoRecover()
  244. defer func() {
  245. done <- true
  246. }()
  247. time.Sleep(100 * time.Millisecond)
  248. err := client.Publish("mychannel", "hello").Err()
  249. Expect(err).NotTo(HaveOccurred())
  250. }()
  251. msg, err := pubsub.ReceiveMessage()
  252. Expect(err).NotTo(HaveOccurred())
  253. Expect(msg.Channel).To(Equal("mychannel"))
  254. Expect(msg.Payload).To(Equal("hello"))
  255. Eventually(done).Should(Receive())
  256. stats := client.PoolStats()
  257. Expect(stats.Requests).To(Equal(uint32(4)))
  258. Expect(stats.Hits).To(Equal(uint32(1)))
  259. }
  260. It("Subscribe should reconnect on ReceiveMessage error", func() {
  261. pubsub, err := client.Subscribe("mychannel")
  262. Expect(err).NotTo(HaveOccurred())
  263. defer pubsub.Close()
  264. expectReceiveMessageOnError(pubsub)
  265. })
  266. It("PSubscribe should reconnect on ReceiveMessage error", func() {
  267. pubsub, err := client.PSubscribe("mychannel")
  268. Expect(err).NotTo(HaveOccurred())
  269. defer pubsub.Close()
  270. expectReceiveMessageOnError(pubsub)
  271. })
  272. It("should return on Close", func() {
  273. pubsub, err := client.Subscribe("mychannel")
  274. Expect(err).NotTo(HaveOccurred())
  275. defer pubsub.Close()
  276. var wg sync.WaitGroup
  277. wg.Add(1)
  278. go func() {
  279. defer GinkgoRecover()
  280. wg.Done()
  281. defer wg.Done()
  282. _, err := pubsub.ReceiveMessage()
  283. Expect(err).To(HaveOccurred())
  284. Expect(err).To(SatisfyAny(
  285. MatchError("redis: client is closed"),
  286. MatchError("use of closed network connection"), // Go 1.4
  287. ))
  288. }()
  289. wg.Wait()
  290. wg.Add(1)
  291. err = pubsub.Close()
  292. Expect(err).NotTo(HaveOccurred())
  293. wg.Wait()
  294. })
  295. It("should ReceiveMessage without a subscription", func() {
  296. timeout := 100 * time.Millisecond
  297. pubsub, err := client.Subscribe()
  298. Expect(err).NotTo(HaveOccurred())
  299. defer pubsub.Close()
  300. go func() {
  301. defer GinkgoRecover()
  302. time.Sleep(2 * timeout)
  303. err = pubsub.Subscribe("mychannel")
  304. Expect(err).NotTo(HaveOccurred())
  305. err := client.Publish("mychannel", "hello").Err()
  306. Expect(err).NotTo(HaveOccurred())
  307. }()
  308. msg, err := pubsub.ReceiveMessageTimeout(timeout)
  309. Expect(err).NotTo(HaveOccurred())
  310. Expect(msg.Channel).To(Equal("mychannel"))
  311. Expect(msg.Payload).To(Equal("hello"))
  312. })
  313. })