123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- // Copyright 2018 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package tester
- import (
- "context"
- "fmt"
- "math/rand"
- "reflect"
- "sync"
- "sync/atomic"
- "time"
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/etcdserver"
- "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- "github.com/coreos/etcd/functional/rpcpb"
- "go.uber.org/zap"
- "golang.org/x/time/rate"
- "google.golang.org/grpc"
- )
- type keyStresser struct {
- stype rpcpb.Stresser
- lg *zap.Logger
- m *rpcpb.Member
- keySize int
- keyLargeSize int
- keySuffixRange int
- keyTxnSuffixRange int
- keyTxnOps int
- rateLimiter *rate.Limiter
- wg sync.WaitGroup
- clientsN int
- ctx context.Context
- cancel func()
- cli *clientv3.Client
- emu sync.RWMutex
- ems map[string]int
- paused bool
- // atomicModifiedKeys records the number of keys created and deleted by the stresser.
- atomicModifiedKeys int64
- stressTable *stressTable
- }
- func (s *keyStresser) Stress() error {
- var err error
- s.cli, err = s.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(1 * time.Second))
- if err != nil {
- return fmt.Errorf("%v (%q)", err, s.m.EtcdClientEndpoint)
- }
- s.ctx, s.cancel = context.WithCancel(context.Background())
- s.wg.Add(s.clientsN)
- var stressEntries = []stressEntry{
- {weight: 0.7, f: newStressPut(s.cli, s.keySuffixRange, s.keySize)},
- {
- weight: 0.7 * float32(s.keySize) / float32(s.keyLargeSize),
- f: newStressPut(s.cli, s.keySuffixRange, s.keyLargeSize),
- },
- {weight: 0.07, f: newStressRange(s.cli, s.keySuffixRange)},
- {weight: 0.07, f: newStressRangeInterval(s.cli, s.keySuffixRange)},
- {weight: 0.07, f: newStressDelete(s.cli, s.keySuffixRange)},
- {weight: 0.07, f: newStressDeleteInterval(s.cli, s.keySuffixRange)},
- }
- if s.keyTxnSuffixRange > 0 {
- // adjust to make up ±70% of workloads with writes
- stressEntries[0].weight = 0.35
- stressEntries = append(stressEntries, stressEntry{
- weight: 0.35,
- f: newStressTxn(s.cli, s.keyTxnSuffixRange, s.keyTxnOps),
- })
- }
- s.stressTable = createStressTable(stressEntries)
- s.emu.Lock()
- s.paused = false
- s.ems = make(map[string]int, 100)
- s.emu.Unlock()
- for i := 0; i < s.clientsN; i++ {
- go s.run()
- }
- s.lg.Info(
- "stress START",
- zap.String("stress-type", s.stype.String()),
- zap.String("endpoint", s.m.EtcdClientEndpoint),
- )
- return nil
- }
- func (s *keyStresser) run() {
- defer s.wg.Done()
- for {
- if err := s.rateLimiter.Wait(s.ctx); err == context.Canceled {
- return
- }
- // TODO: 10-second is enough timeout to cover leader failure
- // and immediate leader election. Find out what other cases this
- // could be timed out.
- sctx, scancel := context.WithTimeout(s.ctx, 10*time.Second)
- err, modifiedKeys := s.stressTable.choose()(sctx)
- scancel()
- if err == nil {
- atomic.AddInt64(&s.atomicModifiedKeys, modifiedKeys)
- continue
- }
- switch rpctypes.ErrorDesc(err) {
- case context.DeadlineExceeded.Error():
- // This retries when request is triggered at the same time as
- // leader failure. When we terminate the leader, the request to
- // that leader cannot be processed, and times out. Also requests
- // to followers cannot be forwarded to the old leader, so timing out
- // as well. We want to keep stressing until the cluster elects a
- // new leader and start processing requests again.
- case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
- // This retries when request is triggered at the same time as
- // leader failure and follower nodes receive time out errors
- // from losing their leader. Followers should retry to connect
- // to the new leader.
- case etcdserver.ErrStopped.Error():
- // one of the etcd nodes stopped from failure injection
- // case transport.ErrConnClosing.Desc:
- // // server closed the transport (failure injected node)
- case rpctypes.ErrNotCapable.Error():
- // capability check has not been done (in the beginning)
- case rpctypes.ErrTooManyRequests.Error():
- // hitting the recovering member.
- case context.Canceled.Error():
- // from stresser.Cancel method:
- return
- case grpc.ErrClientConnClosing.Error():
- // from stresser.Cancel method:
- return
- default:
- s.lg.Warn(
- "stress run exiting",
- zap.String("stress-type", s.stype.String()),
- zap.String("endpoint", s.m.EtcdClientEndpoint),
- zap.String("error-type", reflect.TypeOf(err).String()),
- zap.Error(err),
- )
- return
- }
- // only record errors before pausing stressers
- s.emu.Lock()
- if !s.paused {
- s.ems[err.Error()]++
- }
- s.emu.Unlock()
- }
- }
- func (s *keyStresser) Pause() map[string]int {
- return s.Close()
- }
- func (s *keyStresser) Close() map[string]int {
- s.cancel()
- s.cli.Close()
- s.wg.Wait()
- s.emu.Lock()
- s.paused = true
- ess := s.ems
- s.ems = make(map[string]int, 100)
- s.emu.Unlock()
- s.lg.Info(
- "stress STOP",
- zap.String("stress-type", s.stype.String()),
- zap.String("endpoint", s.m.EtcdClientEndpoint),
- )
- return ess
- }
- func (s *keyStresser) ModifiedKeys() int64 {
- return atomic.LoadInt64(&s.atomicModifiedKeys)
- }
- type stressFunc func(ctx context.Context) (err error, modifiedKeys int64)
- type stressEntry struct {
- weight float32
- f stressFunc
- }
- type stressTable struct {
- entries []stressEntry
- sumWeights float32
- }
- func createStressTable(entries []stressEntry) *stressTable {
- st := stressTable{entries: entries}
- for _, entry := range st.entries {
- st.sumWeights += entry.weight
- }
- return &st
- }
- func (st *stressTable) choose() stressFunc {
- v := rand.Float32() * st.sumWeights
- var sum float32
- var idx int
- for i := range st.entries {
- sum += st.entries[i].weight
- if sum >= v {
- idx = i
- break
- }
- }
- return st.entries[idx].f
- }
- func newStressPut(cli *clientv3.Client, keySuffixRange, keySize int) stressFunc {
- return func(ctx context.Context) (error, int64) {
- _, err := cli.Put(
- ctx,
- fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)),
- string(randBytes(keySize)),
- )
- return err, 1
- }
- }
- func newStressTxn(cli *clientv3.Client, keyTxnSuffixRange, txnOps int) stressFunc {
- keys := make([]string, keyTxnSuffixRange)
- for i := range keys {
- keys[i] = fmt.Sprintf("/k%03d", i)
- }
- return writeTxn(cli, keys, txnOps)
- }
- func writeTxn(cli *clientv3.Client, keys []string, txnOps int) stressFunc {
- return func(ctx context.Context) (error, int64) {
- ks := make(map[string]struct{}, txnOps)
- for len(ks) != txnOps {
- ks[keys[rand.Intn(len(keys))]] = struct{}{}
- }
- selected := make([]string, 0, txnOps)
- for k := range ks {
- selected = append(selected, k)
- }
- com, delOp, putOp := getTxnOps(selected[0], "bar00")
- thenOps := []clientv3.Op{delOp}
- elseOps := []clientv3.Op{putOp}
- for i := 1; i < txnOps; i++ { // nested txns
- k, v := selected[i], fmt.Sprintf("bar%02d", i)
- com, delOp, putOp = getTxnOps(k, v)
- txnOp := clientv3.OpTxn(
- []clientv3.Cmp{com},
- []clientv3.Op{delOp},
- []clientv3.Op{putOp},
- )
- thenOps = append(thenOps, txnOp)
- elseOps = append(elseOps, txnOp)
- }
- _, err := cli.Txn(ctx).
- If(com).
- Then(thenOps...).
- Else(elseOps...).
- Commit()
- return err, int64(txnOps)
- }
- }
- func getTxnOps(k, v string) (
- cmp clientv3.Cmp,
- dop clientv3.Op,
- pop clientv3.Op) {
- // if key exists (version > 0)
- cmp = clientv3.Compare(clientv3.Version(k), ">", 0)
- dop = clientv3.OpDelete(k)
- pop = clientv3.OpPut(k, v)
- return cmp, dop, pop
- }
- func newStressRange(cli *clientv3.Client, keySuffixRange int) stressFunc {
- return func(ctx context.Context) (error, int64) {
- _, err := cli.Get(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
- return err, 0
- }
- }
- func newStressRangeInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
- return func(ctx context.Context) (error, int64) {
- start := rand.Intn(keySuffixRange)
- end := start + 500
- _, err := cli.Get(
- ctx,
- fmt.Sprintf("foo%016x", start),
- clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
- )
- return err, 0
- }
- }
- func newStressDelete(cli *clientv3.Client, keySuffixRange int) stressFunc {
- return func(ctx context.Context) (error, int64) {
- _, err := cli.Delete(ctx, fmt.Sprintf("foo%016x", rand.Intn(keySuffixRange)))
- return err, 1
- }
- }
- func newStressDeleteInterval(cli *clientv3.Client, keySuffixRange int) stressFunc {
- return func(ctx context.Context) (error, int64) {
- start := rand.Intn(keySuffixRange)
- end := start + 500
- resp, err := cli.Delete(ctx,
- fmt.Sprintf("foo%016x", start),
- clientv3.WithRange(fmt.Sprintf("foo%016x", end)),
- )
- if err == nil {
- return nil, resp.Deleted
- }
- return err, 0
- }
- }
|