123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 |
- package cache
- import (
- "errors"
- "sync"
- "github.com/go-redis/redis"
- )
// Package-level subscription registry, shared by every RedisCache value.
var (
	// subMx guards all reads and writes of subMap.
	subMx sync.Mutex
	// subMap maps a channel name to the handler that receives the payload of
	// each message delivered on that channel.
	subMap = make(map[string]func(string))
)
- func (p *RedisCache) Publish(channel string, message interface{}) (int64, error) {
- if p.isCluster {
- return p.cluster.Publish(channel, message).Result()
- }
- return p.client.Publish(channel, message).Result()
- }
- func (p *RedisCache) RegisterFunc(channel string, f func(string)) {
- subMx.Lock()
- defer subMx.Unlock()
- subMap[channel] = f
- }
- func (p *RedisCache) Subscribe(channels ...string) *redis.PubSub {
- if p.isCluster {
- return p.cluster.Subscribe(channels...)
- }
- return p.client.Subscribe(channels...)
- }
- func (p *RedisCache) SubscribeAndHandle(channels ...string) error {
- var pubsub *redis.PubSub
- if p.isCluster {
- pubsub = p.cluster.Subscribe(channels...)
- } else {
- pubsub = p.client.Subscribe(channels...)
- }
- if pubsub == nil {
- return errors.New("subscribe failed")
- }
- ch := pubsub.Channel()
- for msg := range ch {
- if msg == nil {
- return nil
- }
- subMx.Lock()
- if f, ok := subMap[msg.Channel]; ok {
- f(msg.Payload)
- }
- subMx.Unlock()
- }
- return nil
- }
|