pubsub.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package redis
  2. import (
  3. "fmt"
  4. "net"
  5. "sync"
  6. "time"
  7. "gopkg.in/redis.v5/internal"
  8. "gopkg.in/redis.v5/internal/pool"
  9. )
  10. // PubSub implements Pub/Sub commands as described in
  11. // http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
  12. // multiple goroutines.
  13. type PubSub struct {
  14. base baseClient
  15. cmd *Cmd
  16. mu sync.Mutex
  17. channels []string
  18. patterns []string
  19. }
  20. func (c *PubSub) conn() (*pool.Conn, bool, error) {
  21. cn, isNew, err := c.base.conn()
  22. if err != nil {
  23. return nil, false, err
  24. }
  25. if isNew {
  26. c.resubscribe()
  27. }
  28. return cn, isNew, nil
  29. }
  30. func (c *PubSub) putConn(cn *pool.Conn, err error) {
  31. c.base.putConn(cn, err, true)
  32. }
  33. func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
  34. args := make([]interface{}, 1+len(channels))
  35. args[0] = redisCmd
  36. for i, channel := range channels {
  37. args[1+i] = channel
  38. }
  39. cmd := NewSliceCmd(args...)
  40. cn, _, err := c.conn()
  41. if err != nil {
  42. return err
  43. }
  44. cn.SetWriteTimeout(c.base.opt.WriteTimeout)
  45. err = writeCmd(cn, cmd)
  46. c.putConn(cn, err)
  47. return err
  48. }
  49. // Subscribes the client to the specified channels.
  50. func (c *PubSub) Subscribe(channels ...string) error {
  51. err := c.subscribe("SUBSCRIBE", channels...)
  52. if err == nil {
  53. c.channels = appendIfNotExists(c.channels, channels...)
  54. }
  55. return err
  56. }
  57. // Subscribes the client to the given patterns.
  58. func (c *PubSub) PSubscribe(patterns ...string) error {
  59. err := c.subscribe("PSUBSCRIBE", patterns...)
  60. if err == nil {
  61. c.patterns = appendIfNotExists(c.patterns, patterns...)
  62. }
  63. return err
  64. }
  65. // Unsubscribes the client from the given channels, or from all of
  66. // them if none is given.
  67. func (c *PubSub) Unsubscribe(channels ...string) error {
  68. err := c.subscribe("UNSUBSCRIBE", channels...)
  69. if err == nil {
  70. c.channels = remove(c.channels, channels...)
  71. }
  72. return err
  73. }
  74. // Unsubscribes the client from the given patterns, or from all of
  75. // them if none is given.
  76. func (c *PubSub) PUnsubscribe(patterns ...string) error {
  77. err := c.subscribe("PUNSUBSCRIBE", patterns...)
  78. if err == nil {
  79. c.patterns = remove(c.patterns, patterns...)
  80. }
  81. return err
  82. }
  83. func (c *PubSub) Close() error {
  84. return c.base.Close()
  85. }
  86. func (c *PubSub) Ping(payload ...string) error {
  87. args := []interface{}{"PING"}
  88. if len(payload) == 1 {
  89. args = append(args, payload[0])
  90. }
  91. cmd := NewCmd(args...)
  92. cn, _, err := c.conn()
  93. if err != nil {
  94. return err
  95. }
  96. cn.SetWriteTimeout(c.base.opt.WriteTimeout)
  97. err = writeCmd(cn, cmd)
  98. c.putConn(cn, err)
  99. return err
  100. }
  101. // Message received after a successful subscription to channel.
  102. type Subscription struct {
  103. // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
  104. Kind string
  105. // Channel name we have subscribed to.
  106. Channel string
  107. // Number of channels we are currently subscribed to.
  108. Count int
  109. }
  110. func (m *Subscription) String() string {
  111. return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
  112. }
  113. // Message received as result of a PUBLISH command issued by another client.
  114. type Message struct {
  115. Channel string
  116. Pattern string
  117. Payload string
  118. }
  119. func (m *Message) String() string {
  120. return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
  121. }
  122. // Pong received as result of a PING command issued by another client.
  123. type Pong struct {
  124. Payload string
  125. }
  126. func (p *Pong) String() string {
  127. if p.Payload != "" {
  128. return fmt.Sprintf("Pong<%s>", p.Payload)
  129. }
  130. return "Pong"
  131. }
  132. func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
  133. switch reply := reply.(type) {
  134. case string:
  135. return &Pong{
  136. Payload: reply,
  137. }, nil
  138. case []interface{}:
  139. switch kind := reply[0].(string); kind {
  140. case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
  141. return &Subscription{
  142. Kind: kind,
  143. Channel: reply[1].(string),
  144. Count: int(reply[2].(int64)),
  145. }, nil
  146. case "message":
  147. return &Message{
  148. Channel: reply[1].(string),
  149. Payload: reply[2].(string),
  150. }, nil
  151. case "pmessage":
  152. return &Message{
  153. Pattern: reply[1].(string),
  154. Channel: reply[2].(string),
  155. Payload: reply[3].(string),
  156. }, nil
  157. case "pong":
  158. return &Pong{
  159. Payload: reply[1].(string),
  160. }, nil
  161. default:
  162. return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
  163. }
  164. default:
  165. return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
  166. }
  167. }
  168. // ReceiveTimeout acts like Receive but returns an error if message
  169. // is not received in time. This is low-level API and most clients
  170. // should use ReceiveMessage.
  171. func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
  172. if c.cmd == nil {
  173. c.cmd = NewCmd()
  174. }
  175. cn, _, err := c.conn()
  176. if err != nil {
  177. return nil, err
  178. }
  179. cn.SetReadTimeout(timeout)
  180. err = c.cmd.readReply(cn)
  181. c.putConn(cn, err)
  182. if err != nil {
  183. return nil, err
  184. }
  185. return c.newMessage(c.cmd.Val())
  186. }
  187. // Receive returns a message as a Subscription, Message, Pong or error.
  188. // See PubSub example for details. This is low-level API and most clients
  189. // should use ReceiveMessage.
  190. func (c *PubSub) Receive() (interface{}, error) {
  191. return c.ReceiveTimeout(0)
  192. }
  193. // ReceiveMessage returns a Message or error ignoring Subscription or Pong
  194. // messages. It automatically reconnects to Redis Server and resubscribes
  195. // to channels in case of network errors.
  196. func (c *PubSub) ReceiveMessage() (*Message, error) {
  197. return c.receiveMessage(5 * time.Second)
  198. }
  199. func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
  200. var errNum uint
  201. for {
  202. msgi, err := c.ReceiveTimeout(timeout)
  203. if err != nil {
  204. if !internal.IsNetworkError(err) {
  205. return nil, err
  206. }
  207. errNum++
  208. if errNum < 3 {
  209. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  210. err := c.Ping()
  211. if err != nil {
  212. internal.Logf("PubSub.Ping failed: %s", err)
  213. }
  214. }
  215. } else {
  216. // 3 consequent errors - connection is broken or
  217. // Redis Server is down.
  218. // Sleep to not exceed max number of open connections.
  219. time.Sleep(time.Second)
  220. }
  221. continue
  222. }
  223. // Reset error number, because we received a message.
  224. errNum = 0
  225. switch msg := msgi.(type) {
  226. case *Subscription:
  227. // Ignore.
  228. case *Pong:
  229. // Ignore.
  230. case *Message:
  231. return msg, nil
  232. default:
  233. return nil, fmt.Errorf("redis: unknown message: %T", msgi)
  234. }
  235. }
  236. }
  237. func (c *PubSub) resubscribe() {
  238. if len(c.channels) > 0 {
  239. if err := c.Subscribe(c.channels...); err != nil {
  240. internal.Logf("Subscribe failed: %s", err)
  241. }
  242. }
  243. if len(c.patterns) > 0 {
  244. if err := c.PSubscribe(c.patterns...); err != nil {
  245. internal.Logf("PSubscribe failed: %s", err)
  246. }
  247. }
  248. }
  249. func remove(ss []string, es ...string) []string {
  250. if len(es) == 0 {
  251. return ss[:0]
  252. }
  253. for _, e := range es {
  254. for i, s := range ss {
  255. if s == e {
  256. ss = append(ss[:i], ss[i+1:]...)
  257. break
  258. }
  259. }
  260. }
  261. return ss
  262. }
  263. func appendIfNotExists(ss []string, es ...string) []string {
  264. loop:
  265. for _, e := range es {
  266. for _, s := range ss {
  267. if s == e {
  268. continue loop
  269. }
  270. }
  271. ss = append(ss, e)
  272. }
  273. return ss
  274. }