123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package mvcc
- import (
- "fmt"
- "math"
- "github.com/coreos/etcd/mvcc/mvccpb"
- "github.com/coreos/etcd/pkg/adt"
- )
// watchBatchMaxRevs caps how many distinct revisions may be sent to an
// unsynced watcher in one batch. It is a variable rather than a constant
// so that tests can override it.
var watchBatchMaxRevs = 1000
- type eventBatch struct {
- // evs is a batch of revision-ordered events
- evs []mvccpb.Event
- // revs is the minimum unique revisions observed for this batch
- revs int
- // moreRev is first revision with more events following this batch
- moreRev int64
- }
- func (eb *eventBatch) add(ev mvccpb.Event) {
- if eb.revs > watchBatchMaxRevs {
- // maxed out batch size
- return
- }
- if len(eb.evs) == 0 {
- // base case
- eb.revs = 1
- eb.evs = append(eb.evs, ev)
- return
- }
- // revision accounting
- ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
- evRev := ev.Kv.ModRevision
- if evRev > ebRev {
- eb.revs++
- if eb.revs > watchBatchMaxRevs {
- eb.moreRev = evRev
- return
- }
- }
- eb.evs = append(eb.evs, ev)
- }
- type watcherBatch map[*watcher]*eventBatch
- func (wb watcherBatch) add(w *watcher, ev mvccpb.Event) {
- eb := wb[w]
- if eb == nil {
- eb = &eventBatch{}
- wb[w] = eb
- }
- eb.add(ev)
- }
- // newWatcherBatch maps watchers to their matched events. It enables quick
- // events look up by watcher.
- func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
- if len(wg.watchers) == 0 {
- return nil
- }
- wb := make(watcherBatch)
- for _, ev := range evs {
- for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
- if ev.Kv.ModRevision >= w.minRev {
- // don't double notify
- wb.add(w, ev)
- }
- }
- }
- return wb
- }
- type watcherSet map[*watcher]struct{}
- func (w watcherSet) add(wa *watcher) {
- if _, ok := w[wa]; ok {
- panic("add watcher twice!")
- }
- w[wa] = struct{}{}
- }
- func (w watcherSet) union(ws watcherSet) {
- for wa := range ws {
- w.add(wa)
- }
- }
- func (w watcherSet) delete(wa *watcher) {
- if _, ok := w[wa]; !ok {
- panic("removing missing watcher!")
- }
- delete(w, wa)
- }
- type watcherSetByKey map[string]watcherSet
- func (w watcherSetByKey) add(wa *watcher) {
- set := w[string(wa.key)]
- if set == nil {
- set = make(watcherSet)
- w[string(wa.key)] = set
- }
- set.add(wa)
- }
- func (w watcherSetByKey) delete(wa *watcher) bool {
- k := string(wa.key)
- if v, ok := w[k]; ok {
- if _, ok := v[wa]; ok {
- delete(v, wa)
- if len(v) == 0 {
- // remove the set; nothing left
- delete(w, k)
- }
- return true
- }
- }
- return false
- }
- // watcherGroup is a collection of watchers organized by their ranges
- type watcherGroup struct {
- // keyWatchers has the watchers that watch on a single key
- keyWatchers watcherSetByKey
- // ranges has the watchers that watch a range; it is sorted by interval
- ranges adt.IntervalTree
- // watchers is the set of all watchers
- watchers watcherSet
- }
- func newWatcherGroup() watcherGroup {
- return watcherGroup{
- keyWatchers: make(watcherSetByKey),
- ranges: adt.NewIntervalTree(),
- watchers: make(watcherSet),
- }
- }
- // add puts a watcher in the group.
- func (wg *watcherGroup) add(wa *watcher) {
- wg.watchers.add(wa)
- if wa.end == nil {
- wg.keyWatchers.add(wa)
- return
- }
- // interval already registered?
- ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
- if iv := wg.ranges.Find(ivl); iv != nil {
- iv.Val.(watcherSet).add(wa)
- return
- }
- // not registered, put in interval tree
- ws := make(watcherSet)
- ws.add(wa)
- wg.ranges.Insert(ivl, ws)
- }
- // contains is whether the given key has a watcher in the group.
- func (wg *watcherGroup) contains(key string) bool {
- _, ok := wg.keyWatchers[key]
- return ok || wg.ranges.Intersects(adt.NewStringAffinePoint(key))
- }
- // size gives the number of unique watchers in the group.
- func (wg *watcherGroup) size() int { return len(wg.watchers) }
- // delete removes a watcher from the group.
- func (wg *watcherGroup) delete(wa *watcher) bool {
- if _, ok := wg.watchers[wa]; !ok {
- return false
- }
- wg.watchers.delete(wa)
- if wa.end == nil {
- wg.keyWatchers.delete(wa)
- return true
- }
- ivl := adt.NewStringAffineInterval(string(wa.key), string(wa.end))
- iv := wg.ranges.Find(ivl)
- if iv == nil {
- return false
- }
- ws := iv.Val.(watcherSet)
- delete(ws, wa)
- if len(ws) == 0 {
- // remove interval missing watchers
- if ok := wg.ranges.Delete(ivl); !ok {
- panic("could not remove watcher from interval tree")
- }
- }
- return true
- }
- // choose selects watchers from the watcher group to update
- func (wg *watcherGroup) choose(maxWatchers int, curRev, compactRev int64) (*watcherGroup, int64) {
- if len(wg.watchers) < maxWatchers {
- return wg, wg.chooseAll(curRev, compactRev)
- }
- ret := newWatcherGroup()
- for w := range wg.watchers {
- if maxWatchers <= 0 {
- break
- }
- maxWatchers--
- ret.add(w)
- }
- return &ret, ret.chooseAll(curRev, compactRev)
- }
- func (wg *watcherGroup) chooseAll(curRev, compactRev int64) int64 {
- minRev := int64(math.MaxInt64)
- for w := range wg.watchers {
- if w.minRev > curRev {
- // after network partition, possibly choosing future revision watcher from restore operation
- // with watch key "proxy-namespace__lostleader" and revision "math.MaxInt64 - 2"
- // do not panic when such watcher had been moved from "synced" watcher during restore operation
- if !w.restore {
- panic(fmt.Errorf("watcher minimum revision %d should not exceed current revision %d", w.minRev, curRev))
- }
- // mark 'restore' done, since it's chosen
- w.restore = false
- }
- if w.minRev < compactRev {
- select {
- case w.ch <- WatchResponse{WatchID: w.id, CompactRevision: compactRev}:
- w.compacted = true
- wg.delete(w)
- default:
- // retry next time
- }
- continue
- }
- if minRev > w.minRev {
- minRev = w.minRev
- }
- }
- return minRev
- }
- // watcherSetByKey gets the set of watchers that receive events on the given key.
- func (wg *watcherGroup) watcherSetByKey(key string) watcherSet {
- wkeys := wg.keyWatchers[key]
- wranges := wg.ranges.Stab(adt.NewStringAffinePoint(key))
- // zero-copy cases
- switch {
- case len(wranges) == 0:
- // no need to merge ranges or copy; reuse single-key set
- return wkeys
- case len(wranges) == 0 && len(wkeys) == 0:
- return nil
- case len(wranges) == 1 && len(wkeys) == 0:
- return wranges[0].Val.(watcherSet)
- }
- // copy case
- ret := make(watcherSet)
- ret.union(wg.keyWatchers[key])
- for _, item := range wranges {
- ret.union(item.Val.(watcherSet))
- }
- return ret
- }
|