|
- // Copyright 2018 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package tester
- import (
- "context"
- "fmt"
- "time"
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- "github.com/coreos/etcd/functional/rpcpb"
- "go.uber.org/zap"
- "google.golang.org/grpc"
- )
- type leaseExpireChecker struct {
- ctype rpcpb.Checker
- lg *zap.Logger
- m *rpcpb.Member
- ls *leaseStresser
- cli *clientv3.Client
- }
- func newLeaseExpireChecker(ls *leaseStresser) Checker {
- return &leaseExpireChecker{
- ctype: rpcpb.Checker_LEASE_EXPIRE,
- lg: ls.lg,
- m: ls.m,
- ls: ls,
- }
- }
- func (lc *leaseExpireChecker) Type() rpcpb.Checker {
- return lc.ctype
- }
- func (lc *leaseExpireChecker) EtcdClientEndpoints() []string {
- return []string{lc.m.EtcdClientEndpoint}
- }
- func (lc *leaseExpireChecker) Check() error {
- if lc.ls == nil {
- return nil
- }
- if lc.ls != nil &&
- (lc.ls.revokedLeases == nil ||
- lc.ls.aliveLeases == nil ||
- lc.ls.shortLivedLeases == nil) {
- return nil
- }
- cli, err := lc.m.CreateEtcdClient(grpc.WithBackoffMaxDelay(time.Second))
- if err != nil {
- return fmt.Errorf("%v (%q)", err, lc.m.EtcdClientEndpoint)
- }
- defer func() {
- if cli != nil {
- cli.Close()
- }
- }()
- lc.cli = cli
- if err := lc.check(true, lc.ls.revokedLeases.leases); err != nil {
- return err
- }
- if err := lc.check(false, lc.ls.aliveLeases.leases); err != nil {
- return err
- }
- return lc.checkShortLivedLeases()
- }
- const leaseExpireCheckerTimeout = 10 * time.Second
- // checkShortLivedLeases ensures leases expire.
- func (lc *leaseExpireChecker) checkShortLivedLeases() error {
- ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)
- errc := make(chan error)
- defer cancel()
- for leaseID := range lc.ls.shortLivedLeases.leases {
- go func(id int64) {
- errc <- lc.checkShortLivedLease(ctx, id)
- }(leaseID)
- }
- var errs []error
- for range lc.ls.shortLivedLeases.leases {
- if err := <-errc; err != nil {
- errs = append(errs, err)
- }
- }
- return errsToError(errs)
- }
- func (lc *leaseExpireChecker) checkShortLivedLease(ctx context.Context, leaseID int64) (err error) {
- // retry in case of transient failure or lease is expired but not yet revoked due to the fact that etcd cluster didn't have enought time to delete it.
- var resp *clientv3.LeaseTimeToLiveResponse
- for i := 0; i < retries; i++ {
- resp, err = lc.getLeaseByID(ctx, leaseID)
- // lease not found, for ~v3.1 compatibilities, check ErrLeaseNotFound
- if (err == nil && resp.TTL == -1) || (err != nil && rpctypes.Error(err) == rpctypes.ErrLeaseNotFound) {
- return nil
- }
- if err != nil {
- lc.lg.Debug(
- "retrying; Lease TimeToLive failed",
- zap.Int("retries", i),
- zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
- zap.Error(err),
- )
- continue
- }
- if resp.TTL > 0 {
- dur := time.Duration(resp.TTL) * time.Second
- lc.lg.Debug(
- "lease has not been expired, wait until expire",
- zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
- zap.Int64("ttl", resp.TTL),
- zap.Duration("wait-duration", dur),
- )
- time.Sleep(dur)
- } else {
- lc.lg.Debug(
- "lease expired but not yet revoked",
- zap.Int("retries", i),
- zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
- zap.Int64("ttl", resp.TTL),
- zap.Duration("wait-duration", time.Second),
- )
- time.Sleep(time.Second)
- }
- if err = lc.checkLease(ctx, false, leaseID); err != nil {
- continue
- }
- return nil
- }
- return err
- }
- func (lc *leaseExpireChecker) checkLease(ctx context.Context, expired bool, leaseID int64) error {
- keysExpired, err := lc.hasKeysAttachedToLeaseExpired(ctx, leaseID)
- if err != nil {
- lc.lg.Warn(
- "hasKeysAttachedToLeaseExpired failed",
- zap.String("endpoint", lc.m.EtcdClientEndpoint),
- zap.Error(err),
- )
- return err
- }
- leaseExpired, err := lc.hasLeaseExpired(ctx, leaseID)
- if err != nil {
- lc.lg.Warn(
- "hasLeaseExpired failed",
- zap.String("endpoint", lc.m.EtcdClientEndpoint),
- zap.Error(err),
- )
- return err
- }
- if leaseExpired != keysExpired {
- return fmt.Errorf("lease %v expiration mismatch (lease expired=%v, keys expired=%v)", leaseID, leaseExpired, keysExpired)
- }
- if leaseExpired != expired {
- return fmt.Errorf("lease %v expected expired=%v, got %v", leaseID, expired, leaseExpired)
- }
- return nil
- }
- func (lc *leaseExpireChecker) check(expired bool, leases map[int64]time.Time) error {
- ctx, cancel := context.WithTimeout(context.Background(), leaseExpireCheckerTimeout)
- defer cancel()
- for leaseID := range leases {
- if err := lc.checkLease(ctx, expired, leaseID); err != nil {
- return err
- }
- }
- return nil
- }
- // TODO: handle failures from "grpc.FailFast(false)"
- func (lc *leaseExpireChecker) getLeaseByID(ctx context.Context, leaseID int64) (*clientv3.LeaseTimeToLiveResponse, error) {
- return lc.cli.TimeToLive(
- ctx,
- clientv3.LeaseID(leaseID),
- clientv3.WithAttachedKeys(),
- )
- }
- func (lc *leaseExpireChecker) hasLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
- // keep retrying until lease's state is known or ctx is being canceled
- for ctx.Err() == nil {
- resp, err := lc.getLeaseByID(ctx, leaseID)
- if err != nil {
- // for ~v3.1 compatibilities
- if rpctypes.Error(err) == rpctypes.ErrLeaseNotFound {
- return true, nil
- }
- } else {
- return resp.TTL == -1, nil
- }
- lc.lg.Warn(
- "hasLeaseExpired getLeaseByID failed",
- zap.String("endpoint", lc.m.EtcdClientEndpoint),
- zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
- zap.Error(err),
- )
- }
- return false, ctx.Err()
- }
- // The keys attached to the lease has the format of "<leaseID>_<idx>" where idx is the ordering key creation
- // Since the format of keys contains about leaseID, finding keys base on "<leaseID>" prefix
- // determines whether the attached keys for a given leaseID has been deleted or not
- func (lc *leaseExpireChecker) hasKeysAttachedToLeaseExpired(ctx context.Context, leaseID int64) (bool, error) {
- resp, err := lc.cli.Get(ctx, fmt.Sprintf("%d", leaseID), clientv3.WithPrefix())
- if err != nil {
- lc.lg.Warn(
- "hasKeysAttachedToLeaseExpired failed",
- zap.String("endpoint", lc.m.EtcdClientEndpoint),
- zap.String("lease-id", fmt.Sprintf("%016x", leaseID)),
- zap.Error(err),
- )
- return false, err
- }
- return len(resp.Kvs) == 0, nil
- }
|