key.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package recipe
  15. import (
  16. "context"
  17. "fmt"
  18. "strings"
  19. "time"
  20. v3 "github.com/coreos/etcd/clientv3"
  21. "github.com/coreos/etcd/clientv3/concurrency"
  22. )
  23. // RemoteKV is a key/revision pair created by the client and stored on etcd
  24. type RemoteKV struct {
  25. kv v3.KV
  26. key string
  27. rev int64
  28. val string
  29. }
  30. func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) {
  31. return newKV(kv, key, "", leaseID)
  32. }
  33. func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
  34. rev, err := putNewKV(kv, key, val, leaseID)
  35. if err != nil {
  36. return nil, err
  37. }
  38. return &RemoteKV{kv, key, rev, val}, nil
  39. }
  40. func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
  41. for {
  42. newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
  43. rev, err := putNewKV(kv, newKey, val, 0)
  44. if err == nil {
  45. return &RemoteKV{kv, newKey, rev, val}, nil
  46. }
  47. if err != ErrKeyExists {
  48. return nil, err
  49. }
  50. }
  51. }
  52. // putNewKV attempts to create the given key, only succeeding if the key did
  53. // not yet exist.
  54. func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
  55. cmp := v3.Compare(v3.Version(key), "=", 0)
  56. req := v3.OpPut(key, val, v3.WithLease(leaseID))
  57. txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
  58. if err != nil {
  59. return 0, err
  60. }
  61. if !txnresp.Succeeded {
  62. return 0, ErrKeyExists
  63. }
  64. return txnresp.Header.Revision, nil
  65. }
  66. // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
  67. // prefix and value. Note: a bookkeeping node __<prefix> is also allocated.
  68. func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
  69. resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
  70. if err != nil {
  71. return nil, err
  72. }
  73. // add 1 to last key, if any
  74. newSeqNum := 0
  75. if len(resp.Kvs) != 0 {
  76. fields := strings.Split(string(resp.Kvs[0].Key), "/")
  77. _, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
  78. if serr != nil {
  79. return nil, serr
  80. }
  81. newSeqNum++
  82. }
  83. newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
  84. // base prefix key must be current (i.e., <=) with the server update;
  85. // the base key is important to avoid the following:
  86. // N1: LastKey() == 1, start txn.
  87. // N2: new Key 2, new Key 3, Delete Key 2
  88. // N1: txn succeeds allocating key 2 when it shouldn't
  89. baseKey := "__" + prefix
  90. // current revision might contain modification so +1
  91. cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1)
  92. reqPrefix := v3.OpPut(baseKey, "")
  93. reqnewKey := v3.OpPut(newKey, val)
  94. txn := kv.Txn(context.TODO())
  95. txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit()
  96. if err != nil {
  97. return nil, err
  98. }
  99. if !txnresp.Succeeded {
  100. return newSequentialKV(kv, prefix, val)
  101. }
  102. return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
  103. }
  104. func (rk *RemoteKV) Key() string { return rk.key }
  105. func (rk *RemoteKV) Revision() int64 { return rk.rev }
  106. func (rk *RemoteKV) Value() string { return rk.val }
  107. func (rk *RemoteKV) Delete() error {
  108. if rk.kv == nil {
  109. return nil
  110. }
  111. _, err := rk.kv.Delete(context.TODO(), rk.key)
  112. rk.kv = nil
  113. return err
  114. }
  115. func (rk *RemoteKV) Put(val string) error {
  116. _, err := rk.kv.Put(context.TODO(), rk.key, val)
  117. return err
  118. }
  119. // EphemeralKV is a new key associated with a session lease
  120. type EphemeralKV struct{ RemoteKV }
  121. // newEphemeralKV creates a new key/value pair associated with a session lease
  122. func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
  123. k, err := newKV(s.Client(), key, val, s.Lease())
  124. if err != nil {
  125. return nil, err
  126. }
  127. return &EphemeralKV{*k}, nil
  128. }
  129. // newUniqueEphemeralKey creates a new unique valueless key associated with a session lease
  130. func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
  131. return newUniqueEphemeralKV(s, prefix, "")
  132. }
  133. // newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
  134. func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
  135. for {
  136. newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
  137. ek, err = newEphemeralKV(s, newKey, val)
  138. if err == nil || err != ErrKeyExists {
  139. break
  140. }
  141. }
  142. return ek, err
  143. }