package utils import ( "context" "gd_statistics/common.in/cache" "errors" "fmt" "go.etcd.io/etcd/client" "sync" "time" ) var lockKey = "/gd/lock" var lockValue = "locked" var etcdClient client.Client var keyApi client.KeysAPI var gmutex sync.Mutex var gm = map[string]string{} func SetEtcdKeyApi(c client.Client) { if keyApi == nil { keyApi = client.NewKeysAPI(c) } } func setKey(keyApi client.KeysAPI, key string, value string, prevExist client.PrevExistType) error { opts := &client.SetOptions{ PrevExist: prevExist, TTL: 10 * time.Second, } resp, err := keyApi.Set(context.Background(), key, value, opts) fmt.Printf("set key:%v,%v\n", resp, err) return err } func delKey(keyApi client.KeysAPI, key string) error { resp, err := keyApi.Delete(context.Background(), key, nil) fmt.Printf("del key:%v,%v\n", resp, err) e, _ := err.(client.Error) if e.Code == client.ErrorCodeKeyNotFound { return nil } return err } func lock(realLockKey string) error { var err error err = setKey(keyApi, realLockKey, lockValue, client.PrevNoExist) if err == nil { return nil } return err /* e, _ := err.(client.Error) if e.Code != client.ErrorCodeNodeExist { return err } resp, err := keyApi.Get(context.Background(), realLockKey, nil) if err != nil { return err } watcherOptions := &client.WatcherOptions{ AfterIndex: resp.Index, Recursive: false, } fmt.Printf("wait wait\n") watcher := keyApi.Watcher(realLockKey, watcherOptions) for { resp, err := watcher.Next(context.Background()) if err != nil { return err } if resp.Action == "delete" || resp.Action == "expired" { return errors.New("can lock") } }*/ return nil } func redisLock(key string) error { exist, err := cache.Redis.SetNxEx(key, "-", 10) if exist == false { return errors.New("false") } return err } func redisLockWithTimeout(key string, timeout int) error { if timeout == 0 { timeout = 10 } exist, err := cache.Redis.SetNxEx(key, "-", int64(timeout)) if exist == false { return errors.New("false") } return err } func redisUnlock(key string) error { _, err := cache.Redis.Del(key) return err } func localLock(key string) bool { now := time.Now().Unix() for { gmutex.Lock() if _, ok := gm[key]; ok == false { gm[key] = "-" gmutex.Unlock() return true } gmutex.Unlock() if time.Now().Unix()-now >= 60 { return false } time.Sleep(50 * time.Millisecond) } } func localLockWithTimeout(key string, timeout int) bool { now := time.Now().Unix() for { gmutex.Lock() if _, ok := gm[key]; ok == false { gm[key] = "-" gmutex.Unlock() return true } gmutex.Unlock() if time.Now().Unix()-now >= int64(timeout) { return false } time.Sleep(50 * time.Millisecond) } } func localUnlock(key string) { gmutex.Lock() if _, ok := gm[key]; ok == true { delete(gm, key) } if len(gm) == 0 { gm = map[string]string{} } gmutex.Unlock() } func Lock(flag string) error { realLockKey := lockKey + "/" + flag if localLock(realLockKey) == false { return errors.New("local lock timeout") } count := 0 for { count++ if count > 1000 { localUnlock(realLockKey) return errors.New("max count") } //err := lock(realLockKey) err := redisLock(realLockKey) if err == nil { return nil } time.Sleep(400 * time.Millisecond) /* if err.Error() != "can lock" { return err }*/ } return nil } func LockWithTimeout(flag string, timeout int) error { now := time.Now().Unix() realLockKey := lockKey + "/" + flag if localLockWithTimeout(realLockKey, timeout) == false { return errors.New("local lock timeout") } for { loopNow := time.Now().Unix() //err := lock(realLockKey) err := redisLockWithTimeout(realLockKey, timeout) if err == nil { return nil } if loopNow-now >= int64(timeout) { localUnlock(realLockKey) return errors.New("max count") } time.Sleep(500 * time.Millisecond) /* if err.Error() != "can lock" { return err }*/ } return nil } func UnLock(flag string) error { //return delKey(keyApi, lockKey+"/"+flag) err := redisUnlock(lockKey + "/" + flag) localUnlock(lockKey + "/" + flag) return err }