123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- // Copyright 2017 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package v2v3
- import (
- "context"
- "strings"
- "github.com/coreos/etcd/clientv3"
- etcdErr "github.com/coreos/etcd/error"
- "github.com/coreos/etcd/store"
- )
- func (s *v2v3Store) Watch(prefix string, recursive, stream bool, sinceIndex uint64) (store.Watcher, error) {
- ctx, cancel := context.WithCancel(s.ctx)
- wch := s.c.Watch(
- ctx,
- // TODO: very pricey; use a single store-wide watch in future
- s.pfx,
- clientv3.WithPrefix(),
- clientv3.WithRev(int64(sinceIndex)),
- clientv3.WithCreatedNotify(),
- clientv3.WithPrevKV())
- resp, ok := <-wch
- if err := resp.Err(); err != nil || !ok {
- cancel()
- return nil, etcdErr.NewError(etcdErr.EcodeRaftInternal, prefix, 0)
- }
- evc, donec := make(chan *store.Event), make(chan struct{})
- go func() {
- defer func() {
- close(evc)
- close(donec)
- }()
- for resp := range wch {
- for _, ev := range s.mkV2Events(resp) {
- k := ev.Node.Key
- if recursive {
- if !strings.HasPrefix(k, prefix) {
- continue
- }
- // accept events on hidden keys given in prefix
- k = strings.Replace(k, prefix, "/", 1)
- // ignore hidden keys deeper than prefix
- if strings.Contains(k, "/_") {
- continue
- }
- }
- if !recursive && k != prefix {
- continue
- }
- select {
- case evc <- ev:
- case <-ctx.Done():
- return
- }
- if !stream {
- return
- }
- }
- }
- }()
- return &v2v3Watcher{
- startRev: resp.Header.Revision,
- evc: evc,
- donec: donec,
- cancel: cancel,
- }, nil
- }
- func (s *v2v3Store) mkV2Events(wr clientv3.WatchResponse) (evs []*store.Event) {
- ak := s.mkActionKey()
- for _, rev := range mkRevs(wr) {
- var act, key *clientv3.Event
- for _, ev := range rev {
- if string(ev.Kv.Key) == ak {
- act = ev
- } else if key != nil && len(key.Kv.Key) < len(ev.Kv.Key) {
- // use longest key to ignore intermediate new
- // directories from Create.
- key = ev
- } else if key == nil {
- key = ev
- }
- }
- v2ev := &store.Event{
- Action: string(act.Kv.Value),
- Node: s.mkV2Node(key.Kv),
- PrevNode: s.mkV2Node(key.PrevKv),
- EtcdIndex: mkV2Rev(wr.Header.Revision),
- }
- evs = append(evs, v2ev)
- }
- return evs
- }
- func mkRevs(wr clientv3.WatchResponse) (revs [][]*clientv3.Event) {
- var curRev []*clientv3.Event
- for _, ev := range wr.Events {
- if curRev != nil && ev.Kv.ModRevision != curRev[0].Kv.ModRevision {
- revs = append(revs, curRev)
- curRev = nil
- }
- curRev = append(curRev, ev)
- }
- if curRev != nil {
- revs = append(revs, curRev)
- }
- return revs
- }
- type v2v3Watcher struct {
- startRev int64
- evc chan *store.Event
- donec chan struct{}
- cancel context.CancelFunc
- }
- func (w *v2v3Watcher) StartIndex() uint64 { return mkV2Rev(w.startRev) }
- func (w *v2v3Watcher) Remove() {
- w.cancel()
- <-w.donec
- }
- func (w *v2v3Watcher) EventChan() chan *store.Event { return w.evc }
|