case.go 7.6 KB


  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package tester
  15. import (
  16. "fmt"
  17. "math/rand"
  18. "time"
  19. "github.com/coreos/etcd/functional/rpcpb"
  20. "go.uber.org/zap"
  21. )
  22. // Case defines failure/test injection interface.
  23. // To add a test case:
  24. // 1. implement "Case" interface
  25. // 2. define fail case name in "rpcpb.Case"
  26. type Case interface {
  27. // Inject injeccts the failure into the testing cluster at the given
  28. // round. When calling the function, the cluster should be in health.
  29. Inject(clus *Cluster) error
  30. // Recover recovers the injected failure caused by the injection of the
  31. // given round and wait for the recovery of the testing cluster.
  32. Recover(clus *Cluster) error
  33. // Desc returns a description of the failure
  34. Desc() string
  35. // TestCase returns "rpcpb.Case" enum type.
  36. TestCase() rpcpb.Case
  37. }
  38. type injectMemberFunc func(*Cluster, int) error
  39. type recoverMemberFunc func(*Cluster, int) error
  40. type caseByFunc struct {
  41. desc string
  42. rpcpbCase rpcpb.Case
  43. injectMember injectMemberFunc
  44. recoverMember recoverMemberFunc
  45. }
  46. func (c *caseByFunc) Desc() string {
  47. if c.desc != "" {
  48. return c.desc
  49. }
  50. return c.rpcpbCase.String()
  51. }
  52. func (c *caseByFunc) TestCase() rpcpb.Case {
  53. return c.rpcpbCase
  54. }
  55. type caseFollower struct {
  56. caseByFunc
  57. last int
  58. lead int
  59. }
  60. func (c *caseFollower) updateIndex(clus *Cluster) error {
  61. lead, err := clus.GetLeader()
  62. if err != nil {
  63. return err
  64. }
  65. c.lead = lead
  66. n := len(clus.Members)
  67. if c.last == -1 { // first run
  68. c.last = clus.rd % n
  69. if c.last == c.lead {
  70. c.last = (c.last + 1) % n
  71. }
  72. } else {
  73. c.last = (c.last + 1) % n
  74. if c.last == c.lead {
  75. c.last = (c.last + 1) % n
  76. }
  77. }
  78. return nil
  79. }
  80. func (c *caseFollower) Inject(clus *Cluster) error {
  81. if err := c.updateIndex(clus); err != nil {
  82. return err
  83. }
  84. return c.injectMember(clus, c.last)
  85. }
  86. func (c *caseFollower) Recover(clus *Cluster) error {
  87. return c.recoverMember(clus, c.last)
  88. }
  89. func (c *caseFollower) Desc() string {
  90. if c.desc != "" {
  91. return c.desc
  92. }
  93. return c.rpcpbCase.String()
  94. }
  95. func (c *caseFollower) TestCase() rpcpb.Case {
  96. return c.rpcpbCase
  97. }
  98. type caseLeader struct {
  99. caseByFunc
  100. last int
  101. lead int
  102. }
  103. func (c *caseLeader) updateIndex(clus *Cluster) error {
  104. lead, err := clus.GetLeader()
  105. if err != nil {
  106. return err
  107. }
  108. c.lead = lead
  109. c.last = lead
  110. return nil
  111. }
  112. func (c *caseLeader) Inject(clus *Cluster) error {
  113. if err := c.updateIndex(clus); err != nil {
  114. return err
  115. }
  116. return c.injectMember(clus, c.last)
  117. }
  118. func (c *caseLeader) Recover(clus *Cluster) error {
  119. return c.recoverMember(clus, c.last)
  120. }
  121. func (c *caseLeader) TestCase() rpcpb.Case {
  122. return c.rpcpbCase
  123. }
  124. type caseQuorum struct {
  125. caseByFunc
  126. injected map[int]struct{}
  127. }
  128. func (c *caseQuorum) Inject(clus *Cluster) error {
  129. c.injected = pickQuorum(len(clus.Members))
  130. for idx := range c.injected {
  131. if err := c.injectMember(clus, idx); err != nil {
  132. return err
  133. }
  134. }
  135. return nil
  136. }
  137. func (c *caseQuorum) Recover(clus *Cluster) error {
  138. for idx := range c.injected {
  139. if err := c.recoverMember(clus, idx); err != nil {
  140. return err
  141. }
  142. }
  143. return nil
  144. }
  145. func (c *caseQuorum) Desc() string {
  146. if c.desc != "" {
  147. return c.desc
  148. }
  149. return c.rpcpbCase.String()
  150. }
  151. func (c *caseQuorum) TestCase() rpcpb.Case {
  152. return c.rpcpbCase
  153. }
  154. func pickQuorum(size int) (picked map[int]struct{}) {
  155. picked = make(map[int]struct{})
  156. r := rand.New(rand.NewSource(time.Now().UnixNano()))
  157. quorum := size/2 + 1
  158. for len(picked) < quorum {
  159. idx := r.Intn(size)
  160. picked[idx] = struct{}{}
  161. }
  162. return picked
  163. }
  164. type caseAll caseByFunc
  165. func (c *caseAll) Inject(clus *Cluster) error {
  166. for i := range clus.Members {
  167. if err := c.injectMember(clus, i); err != nil {
  168. return err
  169. }
  170. }
  171. return nil
  172. }
  173. func (c *caseAll) Recover(clus *Cluster) error {
  174. for i := range clus.Members {
  175. if err := c.recoverMember(clus, i); err != nil {
  176. return err
  177. }
  178. }
  179. return nil
  180. }
  181. func (c *caseAll) Desc() string {
  182. if c.desc != "" {
  183. return c.desc
  184. }
  185. return c.rpcpbCase.String()
  186. }
  187. func (c *caseAll) TestCase() rpcpb.Case {
  188. return c.rpcpbCase
  189. }
  190. // caseUntilSnapshot injects a failure/test and waits for a snapshot event
  191. type caseUntilSnapshot struct {
  192. desc string
  193. rpcpbCase rpcpb.Case
  194. Case
  195. }
  196. // all delay failure cases except the ones failing with latency
  197. // greater than election timeout (trigger leader election and
  198. // cluster keeps operating anyways)
  199. var slowCases = map[rpcpb.Case]bool{
  200. rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER: true,
  201. rpcpb.Case_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
  202. rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ONE_FOLLOWER_UNTIL_TRIGGER_SNAPSHOT: true,
  203. rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER: true,
  204. rpcpb.Case_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
  205. rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_LEADER_UNTIL_TRIGGER_SNAPSHOT: true,
  206. rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_QUORUM: true,
  207. rpcpb.Case_RANDOM_DELAY_PEER_PORT_TX_RX_ALL: true,
  208. }
  209. func (c *caseUntilSnapshot) Inject(clus *Cluster) error {
  210. if err := c.Case.Inject(clus); err != nil {
  211. return err
  212. }
  213. snapshotCount := clus.Members[0].Etcd.SnapshotCount
  214. now := time.Now()
  215. clus.lg.Info(
  216. "trigger snapshot START",
  217. zap.String("desc", c.Desc()),
  218. zap.Int64("etcd-snapshot-count", snapshotCount),
  219. )
  220. // maxRev may fail since failure just injected, retry if failed.
  221. startRev, err := clus.maxRev()
  222. for i := 0; i < 10 && startRev == 0; i++ {
  223. startRev, err = clus.maxRev()
  224. }
  225. if startRev == 0 {
  226. return err
  227. }
  228. lastRev := startRev
  229. // healthy cluster could accept 1000 req/sec at least.
  230. // 3x time to trigger snapshot.
  231. retries := int(snapshotCount) / 1000 * 3
  232. if v, ok := slowCases[c.TestCase()]; v && ok {
  233. // slow network takes more retries
  234. retries *= 5
  235. }
  236. for i := 0; i < retries; i++ {
  237. lastRev, err = clus.maxRev()
  238. // If the number of proposals committed is bigger than snapshot count,
  239. // a new snapshot should have been created.
  240. diff := lastRev - startRev
  241. if diff > snapshotCount {
  242. clus.lg.Info(
  243. "trigger snapshot PASS",
  244. zap.Int("retries", i),
  245. zap.String("desc", c.Desc()),
  246. zap.Int64("committed-entries", diff),
  247. zap.Int64("etcd-snapshot-count", snapshotCount),
  248. zap.Int64("start-revision", startRev),
  249. zap.Int64("last-revision", lastRev),
  250. zap.Duration("took", time.Since(now)),
  251. )
  252. return nil
  253. }
  254. dur := time.Second
  255. if diff < 0 || err != nil {
  256. dur = 3 * time.Second
  257. }
  258. clus.lg.Info(
  259. "trigger snapshot PROGRESS",
  260. zap.Int("retries", i),
  261. zap.Int64("committed-entries", diff),
  262. zap.Int64("etcd-snapshot-count", snapshotCount),
  263. zap.Int64("start-revision", startRev),
  264. zap.Int64("last-revision", lastRev),
  265. zap.Duration("took", time.Since(now)),
  266. zap.Error(err),
  267. )
  268. time.Sleep(dur)
  269. }
  270. return fmt.Errorf("cluster too slow: only %d commits in %d retries", lastRev-startRev, retries)
  271. }
  272. func (c *caseUntilSnapshot) Desc() string {
  273. if c.desc != "" {
  274. return c.desc
  275. }
  276. if c.rpcpbCase.String() != "" {
  277. return c.rpcpbCase.String()
  278. }
  279. return c.Case.Desc()
  280. }
  281. func (c *caseUntilSnapshot) TestCase() rpcpb.Case {
  282. return c.rpcpbCase
  283. }