// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package mvcc

import (
	"context"
	"encoding/binary"
	"errors"
	"hash/crc32"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/mvcc/mvccpb"
	"github.com/coreos/etcd/pkg/schedule"

	"github.com/coreos/pkg/capnslog"
)

var (
	keyBucketName  = []byte("key")
	metaBucketName = []byte("meta")

	consistentIndexKeyName  = []byte("consistent_index")
	scheduledCompactKeyName = []byte("scheduledCompactRev")
	finishedCompactKeyName  = []byte("finishedCompactRev")

	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
	ErrCanceled  = errors.New("mvcc: watcher is canceled")
	ErrClosed    = errors.New("mvcc: closed")

	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "mvcc")
)

const (
	// markedRevBytesLen is the byte length of a marked revision.
	// The first `revBytesLen` bytes represent a normal revision.
	// The last byte is the mark.
	markedRevBytesLen      = revBytesLen + 1
	markBytePosition       = markedRevBytesLen - 1
	markTombstone     byte = 't'
)

var restoreChunkKeys = 10000 // non-const for testing

// ConsistentIndexGetter is an interface that wraps the ConsistentIndex method.
// A consistent index is the offset of an entry in a consistent replicated log.
type ConsistentIndexGetter interface {
	// ConsistentIndex returns the consistent index of the currently executing entry.
	ConsistentIndex() uint64
}

type store struct {
	ReadView
	WriteView

	// consistentIndex caches the "consistent_index" key's value. Accessed
	// through atomics, so it must be 64-bit aligned.
	consistentIndex uint64

	// mu is read-locked for txns and write-locked for non-txn store changes.
	mu sync.RWMutex

	ig ConsistentIndexGetter

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMu protects currentRev and compactMainRev.
	// Locked at the end of a write txn and released after the write txn unlocks.
	// Locked before locking a read txn and released after locking it.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	// bytesBuf8 is a byte slice of length 8
	// to avoid a repetitive allocation in saveIndex.
	bytesBuf8 []byte

	fifoSched schedule.Scheduler

	stopc chan struct{}
}

// NewStore returns a new store. It is useful for creating a store inside the
// mvcc package; externally it should only be used for testing.
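//
// A minimal usage sketch (the backend path below is an illustrative
// assumption; a nil lessor and index getter are tolerated by the store):
//
//	b := backend.NewDefaultBackend("/tmp/test.db")
//	s := NewStore(b, nil, nil)
//	defer s.Close()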
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
	s := &store{
		b:       b,
		ig:      ig,
		kvindex: newTreeIndex(),

		le: le,

		currentRev:     1,
		compactMainRev: -1,

		bytesBuf8: make([]byte, 8),
		fifoSched: schedule.NewFIFOScheduler(),

		stopc: make(chan struct{}),
	}
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	if s.le != nil {
		s.le.SetRangeDeleter(func() lease.TxnDelete { return s.Write() })
	}

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafeCreateBucket(keyBucketName)
	tx.UnsafeCreateBucket(metaBucketName)
	tx.Unlock()
	s.b.ForceCommit()

	if err := s.restore(); err != nil {
		// TODO: return the error instead of panicking here?
		panic("failed to recover store from backend")
	}

	return s
}
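
// compactBarrier closes ch to unblock waiters on a compaction. If ctx is nil
// or already canceled, it reschedules itself on the FIFO scheduler instead,
// unless the store is stopping, in which case ch is left open.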
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	if ctx == nil || ctx.Err() != nil {
		s.mu.Lock()
		select {
		case <-s.stopc:
		default:
			f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
			s.fifoSched.Schedule(f)
		}
		s.mu.Unlock()
		return
	}
	close(ch)
}
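
// Hash force-commits pending backend writes and returns the hash of the
// entire backend (minus DefaultIgnores) together with the current revision.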
func (s *store) Hash() (hash uint32, revision int64, err error) {
	start := time.Now()

	s.b.ForceCommit()
	h, err := s.b.Hash(DefaultIgnores)

	hashDurations.Observe(time.Since(start).Seconds())
	return h, s.currentRev, err
}
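
// HashByRev hashes all MVCC revisions up to the given revision (0 means the
// current revision), skipping revisions that a pending compaction is about to
// delete. It returns ErrCompacted or ErrFutureRev for out-of-range revisions.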
func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
	start := time.Now()

	s.mu.RLock()
	s.revMu.RLock()
	compactRev, currentRev = s.compactMainRev, s.currentRev
	s.revMu.RUnlock()

	if rev > 0 && rev <= compactRev {
		s.mu.RUnlock()
		return 0, 0, compactRev, ErrCompacted
	} else if rev > 0 && rev > currentRev {
		s.mu.RUnlock()
		return 0, currentRev, 0, ErrFutureRev
	}

	if rev == 0 {
		rev = currentRev
	}
	keep := s.kvindex.Keep(rev)

	tx := s.b.ReadTx()
	tx.Lock()
	defer tx.Unlock()
	s.mu.RUnlock()

	upper := revision{main: rev + 1}
	lower := revision{main: compactRev + 1}
	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	h.Write(keyBucketName)
	err = tx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
		kr := bytesToRev(k)
		if !upper.GreaterThan(kr) {
			return nil
		}
		// skip revisions that are scheduled for deletion due to compaction;
		// don't skip if no compaction is pending (keep is empty).
		if lower.GreaterThan(kr) && len(keep) > 0 {
			if _, ok := keep[kr]; !ok {
				return nil
			}
		}
		h.Write(k)
		h.Write(v)
		return nil
	})
	hash = h.Sum32()

	hashRevDurations.Observe(time.Since(start).Seconds())
	return hash, currentRev, compactRev, err
}
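
// Compact schedules a compaction of all superseded revisions up to rev.
// The returned channel is closed once the scheduled compaction finishes,
// so callers that need to block can wait on it, e.g.:
//
//	ch, err := s.Compact(rev)
//	if err == nil {
//		<-ch // wait for the compaction job to complete
//	}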
func (s *store) Compact(rev int64) (<-chan struct{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.revMu.Lock()
	defer s.revMu.Unlock()

	if rev <= s.compactMainRev {
		ch := make(chan struct{})
		f := func(ctx context.Context) { s.compactBarrier(ctx, ch) }
		s.fifoSched.Schedule(f)
		return ch, ErrCompacted
	}
	if rev > s.currentRev {
		return nil, ErrFutureRev
	}

	start := time.Now()

	s.compactMainRev = rev

	rbytes := newRevBytes()
	revToBytes(revision{main: rev}, rbytes)

	tx := s.b.BatchTx()
	tx.Lock()
	tx.UnsafePut(metaBucketName, scheduledCompactKeyName, rbytes)
	tx.Unlock()
	// ensure that desired compaction is persisted
	s.b.ForceCommit()

	keep := s.kvindex.Compact(rev)
	ch := make(chan struct{})
	var j = func(ctx context.Context) {
		if ctx.Err() != nil {
			s.compactBarrier(ctx, ch)
			return
		}
		if !s.scheduleCompaction(rev, keep) {
			s.compactBarrier(nil, ch)
			return
		}
		close(ch)
	}

	s.fifoSched.Schedule(j)

	indexCompactionPauseDurations.Observe(float64(time.Since(start) / time.Millisecond))

	return ch, nil
}

// DefaultIgnores is a map of keys to ignore in hash checking.
var DefaultIgnores map[backend.IgnoreKey]struct{}

func init() {
	DefaultIgnores = map[backend.IgnoreKey]struct{}{
		// consistent index might be changed due to v2 internal sync, which
		// is not controllable by the user.
		{Bucket: string(metaBucketName), Key: string(consistentIndexKeyName)}: {},
	}
}
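
// Commit persists the consistent index into the meta bucket and
// force-commits all pending backend writes to disk.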
func (s *store) Commit() {
	s.mu.Lock()
	defer s.mu.Unlock()

	tx := s.b.BatchTx()
	tx.Lock()
	s.saveIndex(tx)
	tx.Unlock()
	s.b.ForceCommit()
}
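
// Restore stops the running store, swaps in the given backend, and rebuilds
// the in-memory index and revision state from it.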
func (s *store) Restore(b backend.Backend) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	close(s.stopc)
	s.fifoSched.Stop()

	atomic.StoreUint64(&s.consistentIndex, 0)
	s.b = b
	s.kvindex = newTreeIndex()
	s.currentRev = 1
	s.compactMainRev = -1
	s.fifoSched = schedule.NewFIFOScheduler()
	s.stopc = make(chan struct{})

	return s.restore()
}
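
// restore rebuilds the tree index, lease attachments, and revision
// bookkeeping from the backend, and resumes any compaction that was
// scheduled but had not finished before the store went down.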
func (s *store) restore() error {
	b := s.b
	reportDbTotalSizeInBytesMu.Lock()
	reportDbTotalSizeInBytes = func() float64 { return float64(b.Size()) }
	reportDbTotalSizeInBytesMu.Unlock()
	reportDbTotalSizeInUseInBytesMu.Lock()
	reportDbTotalSizeInUseInBytes = func() float64 { return float64(b.SizeInUse()) }
	reportDbTotalSizeInUseInBytesMu.Unlock()

	min, max := newRevBytes(), newRevBytes()
	revToBytes(revision{main: 1}, min)
	revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

	keyToLease := make(map[string]lease.LeaseID)

	// restore index
	tx := s.b.BatchTx()
	tx.Lock()

	_, finishedCompactBytes := tx.UnsafeRange(metaBucketName, finishedCompactKeyName, nil, 0)
	if len(finishedCompactBytes) != 0 {
		s.compactMainRev = bytesToRev(finishedCompactBytes[0]).main
		plog.Printf("restore compact to %d", s.compactMainRev)
	}
	_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
	scheduledCompact := int64(0)
	if len(scheduledCompactBytes) != 0 {
		scheduledCompact = bytesToRev(scheduledCompactBytes[0]).main
	}

	// index keys concurrently as they're loaded in from tx
	keysGauge.Set(0)
	rkvc, revc := restoreIntoIndex(s.kvindex)
	for {
		keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
		if len(keys) == 0 {
			break
		}
		// rkvc blocks if the total number of pending keys exceeds the
		// restore chunk size, to keep keys from consuming too much memory.
		restoreChunk(rkvc, keys, vals, keyToLease)
		if len(keys) < restoreChunkKeys {
			// a partial set implies this is the final set
			break
		}
		// the next set begins after where this one ended
		newMin := bytesToRev(keys[len(keys)-1][:revBytesLen])
		newMin.sub++
		revToBytes(newMin, min)
	}
	close(rkvc)
	s.currentRev = <-revc

	// keys in the range [compacted revision - N, compaction] might all be
	// deleted due to compaction. In that case the correct revision is the
	// compaction revision, not the largest revision we have seen.
	if s.currentRev < s.compactMainRev {
		s.currentRev = s.compactMainRev
	}
	if scheduledCompact <= s.compactMainRev {
		scheduledCompact = 0
	}

	for key, lid := range keyToLease {
		if s.le == nil {
			panic("no lessor to attach lease")
		}
		err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
		if err != nil {
			plog.Errorf("unexpected Attach error: %v", err)
		}
	}

	tx.Unlock()

	if scheduledCompact != 0 {
		s.Compact(scheduledCompact)
		plog.Printf("resume scheduled compaction at %d", scheduledCompact)
	}

	return nil
}
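
// revKeyValue carries one backend entry through the restore pipeline: the
// raw revision key, the decoded KeyValue, and the key as a string for map
// lookups.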
type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}
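
// restoreIntoIndex starts a goroutine that consumes revKeyValues and rebuilds
// idx from them. It returns the channel to send entries on and a channel that
// yields the highest main revision seen once the input channel is closed.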
func restoreIntoIndex(idx index) (chan<- revKeyValue, <-chan int64) {
	rkvc, revc := make(chan revKeyValue, restoreChunkKeys), make(chan int64, 1)
	go func() {
		currentRev := int64(1)
		defer func() { revc <- currentRev }()
		// restore the tree index from the unordered stream of index entries.
		kiCache := make(map[string]*keyIndex, restoreChunkKeys)
		for rkv := range rkvc {
			ki, ok := kiCache[rkv.kstr]
			// purge part of kiCache when it is full but the key is still missing
			if !ok && len(kiCache) >= restoreChunkKeys {
				i := 10
				for k := range kiCache {
					delete(kiCache, k)
					if i--; i == 0 {
						break
					}
				}
			}
			// cache miss, fetch from tree index if there
			if !ok {
				ki = &keyIndex{key: rkv.kv.Key}
				if idxKey := idx.KeyIndex(ki); idxKey != nil {
					kiCache[rkv.kstr], ki = idxKey, idxKey
					ok = true
				}
			}
			rev := bytesToRev(rkv.key)
			currentRev = rev.main
			if ok {
				if isTombstone(rkv.key) {
					ki.tombstone(rev.main, rev.sub)
					continue
				}
				ki.put(rev.main, rev.sub)
			} else if !isTombstone(rkv.key) {
				ki.restore(revision{rkv.kv.CreateRevision, 0}, rev, rkv.kv.Version)
				idx.Insert(ki)
				kiCache[rkv.kstr] = ki
			}
		}
	}()
	return rkvc, revc
}
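
// restoreChunk unmarshals one chunk of backend keys and values, tracks the
// latest lease attachment (or detachment, for tombstones) for each key, and
// feeds the decoded entries into kvc.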
func restoreChunk(kvc chan<- revKeyValue, keys, vals [][]byte, keyToLease map[string]lease.LeaseID) {
	for i, key := range keys {
		rkv := revKeyValue{key: key}
		if err := rkv.kv.Unmarshal(vals[i]); err != nil {
			plog.Fatalf("cannot unmarshal event: %v", err)
		}
		rkv.kstr = string(rkv.kv.Key)
		if isTombstone(key) {
			delete(keyToLease, rkv.kstr)
		} else if lid := lease.LeaseID(rkv.kv.Lease); lid != lease.NoLease {
			keyToLease[rkv.kstr] = lid
		} else {
			delete(keyToLease, rkv.kstr)
		}
		kvc <- rkv
	}
}
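
// Close stops the store's background scheduler; it does not close the backend.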
func (s *store) Close() error {
	close(s.stopc)
	s.fifoSched.Stop()
	return nil
}
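
// saveIndex writes the current consistent index into the meta bucket and
// caches it atomically; the caller must already hold the lock on tx.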
func (s *store) saveIndex(tx backend.BatchTx) {
	if s.ig == nil {
		return
	}
	bs := s.bytesBuf8
	ci := s.ig.ConsistentIndex()
	binary.BigEndian.PutUint64(bs, ci)
	// put the index into the underlying backend
	// tx has been locked in TxnBegin, so there is no need to lock it again
	tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs)
	atomic.StoreUint64(&s.consistentIndex, ci)
}
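
// ConsistentIndex returns the cached consistent index, falling back to
// reading it from the backend's meta bucket on first use.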
func (s *store) ConsistentIndex() uint64 {
	if ci := atomic.LoadUint64(&s.consistentIndex); ci > 0 {
		return ci
	}
	tx := s.b.BatchTx()
	tx.Lock()
	defer tx.Unlock()
	_, vs := tx.UnsafeRange(metaBucketName, consistentIndexKeyName, nil, 0)
	if len(vs) == 0 {
		return 0
	}
	v := binary.BigEndian.Uint64(vs[0])
	atomic.StoreUint64(&s.consistentIndex, v)
	return v
}

// appendMarkTombstone appends the tombstone mark to normal revision bytes.
func appendMarkTombstone(b []byte) []byte {
	if len(b) != revBytesLen {
		plog.Panicf("cannot append mark to non normal revision bytes")
	}
	return append(b, markTombstone)
}

// isTombstone checks whether the revision bytes are a tombstone.
func isTombstone(b []byte) bool {
	return len(b) == markedRevBytesLen && b[markBytePosition] == markTombstone
}