main.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. // etcd-proxy is a proxy layer that simulates various network conditions.
  15. package main
  16. import (
  17. "context"
  18. "flag"
  19. "fmt"
  20. "net/http"
  21. "net/url"
  22. "os"
  23. "os/signal"
  24. "syscall"
  25. "time"
  26. "github.com/coreos/etcd/pkg/proxy"
  27. "go.uber.org/zap"
  28. )
  29. var from string
  30. var to string
  31. var httpPort int
  32. var verbose bool
  33. func main() {
  34. // TODO: support TLS
  35. flag.StringVar(&from, "from", "localhost:23790", "Address URL to proxy from.")
  36. flag.StringVar(&to, "to", "localhost:2379", "Address URL to forward.")
  37. flag.IntVar(&httpPort, "http-port", 2378, "Port to serve etcd-proxy API.")
  38. flag.BoolVar(&verbose, "verbose", false, "'true' to run proxy in verbose mode.")
  39. flag.Usage = func() {
  40. fmt.Fprintf(os.Stderr, "Usage of %q:\n", os.Args[0])
  41. fmt.Fprintln(os.Stderr, `
  42. etcd-proxy simulates various network conditions for etcd testing purposes.
  43. See README.md for more examples.
  44. Example:
  45. # build etcd
  46. $ ./build
  47. $ ./bin/etcd
  48. # build etcd-proxy
  49. $ make build-etcd-proxy
  50. # to test etcd with proxy layer
  51. $ ./bin/etcd-proxy --help
  52. $ ./bin/etcd-proxy --from localhost:23790 --to localhost:2379 --http-port 2378 --verbose
  53. $ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:2379 put foo bar
  54. $ ETCDCTL_API=3 ./bin/etcdctl --endpoints localhost:23790 put foo bar`)
  55. flag.PrintDefaults()
  56. }
  57. flag.Parse()
  58. cfg := proxy.ServerConfig{
  59. From: url.URL{Scheme: "tcp", Host: from},
  60. To: url.URL{Scheme: "tcp", Host: to},
  61. }
  62. if verbose {
  63. cfg.Logger = zap.NewExample()
  64. }
  65. p := proxy.NewServer(cfg)
  66. <-p.Ready()
  67. defer p.Close()
  68. mux := http.NewServeMux()
  69. mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
  70. w.Write([]byte(fmt.Sprintf("proxying [%s -> %s]\n", p.From(), p.To())))
  71. })
  72. mux.HandleFunc("/delay-tx", func(w http.ResponseWriter, req *http.Request) {
  73. switch req.Method {
  74. case http.MethodGet:
  75. w.Write([]byte(fmt.Sprintf("current send latency %v\n", p.LatencyTx())))
  76. case http.MethodPut, http.MethodPost:
  77. if err := req.ParseForm(); err != nil {
  78. w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error())))
  79. return
  80. }
  81. lat, err := time.ParseDuration(req.PostForm.Get("latency"))
  82. if err != nil {
  83. w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error())))
  84. return
  85. }
  86. rv, err := time.ParseDuration(req.PostForm.Get("random-variable"))
  87. if err != nil {
  88. w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error())))
  89. return
  90. }
  91. p.DelayTx(lat, rv)
  92. w.Write([]byte(fmt.Sprintf("added send latency %v±%v (current latency %v)\n", lat, rv, p.LatencyTx())))
  93. case http.MethodDelete:
  94. lat := p.LatencyTx()
  95. p.UndelayTx()
  96. w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat)))
  97. default:
  98. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  99. }
  100. })
  101. mux.HandleFunc("/delay-rx", func(w http.ResponseWriter, req *http.Request) {
  102. switch req.Method {
  103. case http.MethodGet:
  104. w.Write([]byte(fmt.Sprintf("current receive latency %v\n", p.LatencyRx())))
  105. case http.MethodPut, http.MethodPost:
  106. if err := req.ParseForm(); err != nil {
  107. w.Write([]byte(fmt.Sprintf("wrong form %q\n", err.Error())))
  108. return
  109. }
  110. lat, err := time.ParseDuration(req.PostForm.Get("latency"))
  111. if err != nil {
  112. w.Write([]byte(fmt.Sprintf("wrong latency form %q\n", err.Error())))
  113. return
  114. }
  115. rv, err := time.ParseDuration(req.PostForm.Get("random-variable"))
  116. if err != nil {
  117. w.Write([]byte(fmt.Sprintf("wrong random-variable form %q\n", err.Error())))
  118. return
  119. }
  120. p.DelayRx(lat, rv)
  121. w.Write([]byte(fmt.Sprintf("added receive latency %v±%v (current latency %v)\n", lat, rv, p.LatencyRx())))
  122. case http.MethodDelete:
  123. lat := p.LatencyRx()
  124. p.UndelayRx()
  125. w.Write([]byte(fmt.Sprintf("removed latency %v\n", lat)))
  126. default:
  127. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  128. }
  129. })
  130. mux.HandleFunc("/pause-tx", func(w http.ResponseWriter, req *http.Request) {
  131. switch req.Method {
  132. case http.MethodPut, http.MethodPost:
  133. p.PauseTx()
  134. w.Write([]byte(fmt.Sprintf("paused forwarding [%s -> %s]\n", p.From(), p.To())))
  135. case http.MethodDelete:
  136. p.UnpauseTx()
  137. w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s -> %s]\n", p.From(), p.To())))
  138. default:
  139. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  140. }
  141. })
  142. mux.HandleFunc("/pause-rx", func(w http.ResponseWriter, req *http.Request) {
  143. switch req.Method {
  144. case http.MethodPut, http.MethodPost:
  145. p.PauseRx()
  146. w.Write([]byte(fmt.Sprintf("paused forwarding [%s <- %s]\n", p.From(), p.To())))
  147. case http.MethodDelete:
  148. p.UnpauseRx()
  149. w.Write([]byte(fmt.Sprintf("unpaused forwarding [%s <- %s]\n", p.From(), p.To())))
  150. default:
  151. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  152. }
  153. })
  154. mux.HandleFunc("/blackhole-tx", func(w http.ResponseWriter, req *http.Request) {
  155. switch req.Method {
  156. case http.MethodPut, http.MethodPost:
  157. p.BlackholeTx()
  158. w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s -> %s]\n", p.From(), p.To())))
  159. case http.MethodDelete:
  160. p.UnblackholeTx()
  161. w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s -> %s]\n", p.From(), p.To())))
  162. default:
  163. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  164. }
  165. })
  166. mux.HandleFunc("/blackhole-rx", func(w http.ResponseWriter, req *http.Request) {
  167. switch req.Method {
  168. case http.MethodPut, http.MethodPost:
  169. p.BlackholeRx()
  170. w.Write([]byte(fmt.Sprintf("blackholed; dropping packets [%s <- %s]\n", p.From(), p.To())))
  171. case http.MethodDelete:
  172. p.UnblackholeRx()
  173. w.Write([]byte(fmt.Sprintf("unblackholed; restart forwarding [%s <- %s]\n", p.From(), p.To())))
  174. default:
  175. w.Write([]byte(fmt.Sprintf("unsupported method %q\n", req.Method)))
  176. }
  177. })
  178. srv := &http.Server{
  179. Addr: fmt.Sprintf(":%d", httpPort),
  180. Handler: mux,
  181. }
  182. defer srv.Close()
  183. sig := make(chan os.Signal, 1)
  184. signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
  185. defer signal.Stop(sig)
  186. go func() {
  187. s := <-sig
  188. fmt.Printf("\n\nreceived signal %q, shutting down HTTP server\n\n", s)
  189. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  190. err := srv.Shutdown(ctx)
  191. cancel()
  192. fmt.Printf("gracefully stopped HTTP server with %v\n\n", err)
  193. os.Exit(0)
  194. }()
  195. fmt.Printf("\nserving HTTP server http://localhost:%d\n\n", httpPort)
  196. err := srv.ListenAndServe()
  197. fmt.Printf("HTTP server exit with error %v\n", err)
  198. }