pubsub.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package cache
  2. import (
  3. "errors"
  4. "sync"
  5. "github.com/go-redis/redis"
  6. )
  // Package-level handler registry. It is shared by every RedisCache value
  // because it lives at package scope rather than on the struct.
  var (
  	// subMx serializes access to subMap.
  	subMx sync.Mutex

  	// subMap associates a channel name with the callback that receives the
  	// payload of messages arriving on that channel.
  	subMap = make(map[string]func(string))
  )
  9. func (p *RedisCache) Publish(channel string, message interface{}) (int64, error) {
  10. if p.isCluster {
  11. return p.cluster.Publish(channel, message).Result()
  12. }
  13. return p.client.Publish(channel, message).Result()
  14. }
  15. func (p *RedisCache) RegisterFunc(channel string, f func(string)) {
  16. subMx.Lock()
  17. defer subMx.Unlock()
  18. subMap[channel] = f
  19. }
  20. func (p *RedisCache) Subscribe(channels ...string) *redis.PubSub {
  21. if p.isCluster {
  22. return p.cluster.Subscribe(channels...)
  23. }
  24. return p.client.Subscribe(channels...)
  25. }
  26. func (p *RedisCache) SubscribeAndHandle(channels ...string) error {
  27. var pubsub *redis.PubSub
  28. if p.isCluster {
  29. pubsub = p.cluster.Subscribe(channels...)
  30. } else {
  31. pubsub = p.client.Subscribe(channels...)
  32. }
  33. if pubsub == nil {
  34. return errors.New("subscribe failed")
  35. }
  36. ch := pubsub.Channel()
  37. for msg := range ch {
  38. if msg == nil {
  39. return nil
  40. }
  41. subMx.Lock()
  42. if f, ok := subMap[msg.Channel]; ok {
  43. f(msg.Payload)
  44. }
  45. subMx.Unlock()
  46. }
  47. return nil
  48. }