123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- package utils
- import (
- "context"
- "gd_statistics/common.in/cache"
- "errors"
- "fmt"
- "go.etcd.io/etcd/client"
- "sync"
- "time"
- )
- var lockKey = "/gd/lock"
- var lockValue = "locked"
- var etcdClient client.Client
- var keyApi client.KeysAPI
- var gmutex sync.Mutex
- var gm = map[string]string{}
- func SetEtcdKeyApi(c client.Client) {
- if keyApi == nil {
- keyApi = client.NewKeysAPI(c)
- }
- }
- func setKey(keyApi client.KeysAPI, key string, value string, prevExist client.PrevExistType) error {
- opts := &client.SetOptions{
- PrevExist: prevExist,
- TTL: 10 * time.Second,
- }
- resp, err := keyApi.Set(context.Background(), key, value, opts)
- fmt.Printf("set key:%v,%v\n", resp, err)
- return err
- }
- func delKey(keyApi client.KeysAPI, key string) error {
- resp, err := keyApi.Delete(context.Background(), key, nil)
- fmt.Printf("del key:%v,%v\n", resp, err)
- e, _ := err.(client.Error)
- if e.Code == client.ErrorCodeKeyNotFound {
- return nil
- }
- return err
- }
- func lock(realLockKey string) error {
- var err error
- err = setKey(keyApi, realLockKey, lockValue, client.PrevNoExist)
- if err == nil {
- return nil
- }
- return err
- /*
- e, _ := err.(client.Error)
- if e.Code != client.ErrorCodeNodeExist {
- return err
- }
- resp, err := keyApi.Get(context.Background(), realLockKey, nil)
- if err != nil {
- return err
- }
- watcherOptions := &client.WatcherOptions{
- AfterIndex: resp.Index,
- Recursive: false,
- }
- fmt.Printf("wait wait\n")
- watcher := keyApi.Watcher(realLockKey, watcherOptions)
- for {
- resp, err := watcher.Next(context.Background())
- if err != nil {
- return err
- }
- if resp.Action == "delete" || resp.Action == "expired" {
- return errors.New("can lock")
- }
- }*/
- return nil
- }
- func redisLock(key string) error {
- exist, err := cache.Redis.SetNxEx(key, "-", 10)
- if exist == false {
- return errors.New("false")
- }
- return err
- }
- func redisLockWithTimeout(key string, timeout int) error {
- if timeout == 0 {
- timeout = 10
- }
- exist, err := cache.Redis.SetNxEx(key, "-", int64(timeout))
- if exist == false {
- return errors.New("false")
- }
- return err
- }
- func redisUnlock(key string) error {
- _, err := cache.Redis.Del(key)
- return err
- }
- func localLock(key string) bool {
- now := time.Now().Unix()
- for {
- gmutex.Lock()
- if _, ok := gm[key]; ok == false {
- gm[key] = "-"
- gmutex.Unlock()
- return true
- }
- gmutex.Unlock()
- if time.Now().Unix()-now >= 60 {
- return false
- }
- time.Sleep(50 * time.Millisecond)
- }
- }
- func localLockWithTimeout(key string, timeout int) bool {
- now := time.Now().Unix()
- for {
- gmutex.Lock()
- if _, ok := gm[key]; ok == false {
- gm[key] = "-"
- gmutex.Unlock()
- return true
- }
- gmutex.Unlock()
- if time.Now().Unix()-now >= int64(timeout) {
- return false
- }
- time.Sleep(50 * time.Millisecond)
- }
- }
- func localUnlock(key string) {
- gmutex.Lock()
- if _, ok := gm[key]; ok == true {
- delete(gm, key)
- }
- if len(gm) == 0 {
- gm = map[string]string{}
- }
- gmutex.Unlock()
- }
- func Lock(flag string) error {
- realLockKey := lockKey + "/" + flag
- if localLock(realLockKey) == false {
- return errors.New("local lock timeout")
- }
- count := 0
- for {
- count++
- if count > 1000 {
- localUnlock(realLockKey)
- return errors.New("max count")
- }
- //err := lock(realLockKey)
- err := redisLock(realLockKey)
- if err == nil {
- return nil
- }
- time.Sleep(400 * time.Millisecond)
- /*
- if err.Error() != "can lock" {
- return err
- }*/
- }
- return nil
- }
- func LockWithTimeout(flag string, timeout int) error {
- now := time.Now().Unix()
- realLockKey := lockKey + "/" + flag
- if localLockWithTimeout(realLockKey, timeout) == false {
- return errors.New("local lock timeout")
- }
- for {
- loopNow := time.Now().Unix()
- //err := lock(realLockKey)
- err := redisLockWithTimeout(realLockKey, timeout)
- if err == nil {
- return nil
- }
- if loopNow-now >= int64(timeout) {
- localUnlock(realLockKey)
- return errors.New("max count")
- }
- time.Sleep(500 * time.Millisecond)
- /*
- if err.Error() != "can lock" {
- return err
- }*/
- }
- return nil
- }
- func UnLock(flag string) error {
- //return delKey(keyApi, lockKey+"/"+flag)
- err := redisUnlock(lockKey + "/" + flag)
- localUnlock(lockKey + "/" + flag)
- return err
- }
|