stresser.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "time"
  18. "github.com/coreos/etcd/functional/rpcpb"
  19. "go.uber.org/zap"
  20. )
  21. // Stresser defines stressing client operations.
  22. type Stresser interface {
  23. // Stress starts to stress the etcd cluster
  24. Stress() error
  25. // Pause stops the stresser from sending requests to etcd. Resume by calling Stress.
  26. Pause() map[string]int
  27. // Close releases all of the Stresser's resources.
  28. Close() map[string]int
  29. // ModifiedKeys reports the number of keys created and deleted by stresser
  30. ModifiedKeys() int64
  31. }
  32. // newStresser creates stresser from a comma separated list of stresser types.
  33. func newStresser(clus *Cluster, m *rpcpb.Member) (stressers []Stresser) {
  34. stressers = make([]Stresser, len(clus.Tester.Stressers))
  35. for i, stype := range clus.Tester.Stressers {
  36. clus.lg.Info(
  37. "creating stresser",
  38. zap.String("type", stype),
  39. zap.String("endpoint", m.EtcdClientEndpoint),
  40. )
  41. switch stype {
  42. case "KV":
  43. // TODO: Too intensive stressing clients can panic etcd member with
  44. // 'out of memory' error. Put rate limits in server side.
  45. stressers[i] = &keyStresser{
  46. stype: rpcpb.Stresser_KV,
  47. lg: clus.lg,
  48. m: m,
  49. keySize: int(clus.Tester.StressKeySize),
  50. keyLargeSize: int(clus.Tester.StressKeySizeLarge),
  51. keySuffixRange: int(clus.Tester.StressKeySuffixRange),
  52. keyTxnSuffixRange: int(clus.Tester.StressKeySuffixRangeTxn),
  53. keyTxnOps: int(clus.Tester.StressKeyTxnOps),
  54. clientsN: int(clus.Tester.StressClients),
  55. rateLimiter: clus.rateLimiter,
  56. }
  57. case "LEASE":
  58. stressers[i] = &leaseStresser{
  59. stype: rpcpb.Stresser_LEASE,
  60. lg: clus.lg,
  61. m: m,
  62. numLeases: 10, // TODO: configurable
  63. keysPerLease: 10, // TODO: configurable
  64. rateLimiter: clus.rateLimiter,
  65. }
  66. case "ELECTION_RUNNER":
  67. reqRate := 100
  68. args := []string{
  69. "election",
  70. fmt.Sprintf("%v", time.Now().UnixNano()), // election name as current nano time
  71. "--dial-timeout=10s",
  72. "--endpoints", m.EtcdClientEndpoint,
  73. "--total-client-connections=10",
  74. "--rounds=0", // runs forever
  75. "--req-rate", fmt.Sprintf("%v", reqRate),
  76. }
  77. stressers[i] = newRunnerStresser(
  78. rpcpb.Stresser_ELECTION_RUNNER,
  79. m.EtcdClientEndpoint,
  80. clus.lg,
  81. clus.Tester.RunnerExecPath,
  82. args,
  83. clus.rateLimiter,
  84. reqRate,
  85. )
  86. case "WATCH_RUNNER":
  87. reqRate := 100
  88. args := []string{
  89. "watcher",
  90. "--prefix", fmt.Sprintf("%v", time.Now().UnixNano()), // prefix all keys with nano time
  91. "--total-keys=1",
  92. "--total-prefixes=1",
  93. "--watch-per-prefix=1",
  94. "--endpoints", m.EtcdClientEndpoint,
  95. "--rounds=0", // runs forever
  96. "--req-rate", fmt.Sprintf("%v", reqRate),
  97. }
  98. stressers[i] = newRunnerStresser(
  99. rpcpb.Stresser_WATCH_RUNNER,
  100. m.EtcdClientEndpoint,
  101. clus.lg,
  102. clus.Tester.RunnerExecPath,
  103. args,
  104. clus.rateLimiter,
  105. reqRate,
  106. )
  107. case "LOCK_RACER_RUNNER":
  108. reqRate := 100
  109. args := []string{
  110. "lock-racer",
  111. fmt.Sprintf("%v", time.Now().UnixNano()), // locker name as current nano time
  112. "--endpoints", m.EtcdClientEndpoint,
  113. "--total-client-connections=10",
  114. "--rounds=0", // runs forever
  115. "--req-rate", fmt.Sprintf("%v", reqRate),
  116. }
  117. stressers[i] = newRunnerStresser(
  118. rpcpb.Stresser_LOCK_RACER_RUNNER,
  119. m.EtcdClientEndpoint,
  120. clus.lg,
  121. clus.Tester.RunnerExecPath,
  122. args,
  123. clus.rateLimiter,
  124. reqRate,
  125. )
  126. case "LEASE_RUNNER":
  127. args := []string{
  128. "lease-renewer",
  129. "--ttl=30",
  130. "--endpoints", m.EtcdClientEndpoint,
  131. }
  132. stressers[i] = newRunnerStresser(
  133. rpcpb.Stresser_LEASE_RUNNER,
  134. m.EtcdClientEndpoint,
  135. clus.lg,
  136. clus.Tester.RunnerExecPath,
  137. args,
  138. clus.rateLimiter,
  139. 0,
  140. )
  141. }
  142. }
  143. return stressers
  144. }