server_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597
  1. package ochttp
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "crypto/tls"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "net/http/httptest"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. "golang.org/x/net/http2"
  18. "go.opencensus.io/stats/view"
  19. "go.opencensus.io/trace"
  20. )
  21. func httpHandler(statusCode, respSize int) http.Handler {
  22. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  23. w.WriteHeader(statusCode)
  24. body := make([]byte, respSize)
  25. w.Write(body)
  26. })
  27. }
  28. func updateMean(mean float64, sample, count int) float64 {
  29. if count == 1 {
  30. return float64(sample)
  31. }
  32. return mean + (float64(sample)-mean)/float64(count)
  33. }
  34. func TestHandlerStatsCollection(t *testing.T) {
  35. if err := view.Register(DefaultServerViews...); err != nil {
  36. t.Fatalf("Failed to register ochttp.DefaultServerViews error: %v", err)
  37. }
  38. views := []string{
  39. "opencensus.io/http/server/request_count",
  40. "opencensus.io/http/server/latency",
  41. "opencensus.io/http/server/request_bytes",
  42. "opencensus.io/http/server/response_bytes",
  43. }
  44. // TODO: test latency measurements?
  45. tests := []struct {
  46. name, method, target string
  47. count, statusCode, reqSize, respSize int
  48. }{
  49. {"get 200", "GET", "http://opencensus.io/request/one", 10, 200, 512, 512},
  50. {"post 503", "POST", "http://opencensus.io/request/two", 5, 503, 1024, 16384},
  51. {"no body 302", "GET", "http://opencensus.io/request/three", 2, 302, 0, 0},
  52. }
  53. totalCount, meanReqSize, meanRespSize := 0, 0.0, 0.0
  54. for _, test := range tests {
  55. t.Run(test.name, func(t *testing.T) {
  56. body := bytes.NewBuffer(make([]byte, test.reqSize))
  57. r := httptest.NewRequest(test.method, test.target, body)
  58. w := httptest.NewRecorder()
  59. mux := http.NewServeMux()
  60. mux.Handle("/request/", httpHandler(test.statusCode, test.respSize))
  61. h := &Handler{
  62. Handler: mux,
  63. StartOptions: trace.StartOptions{
  64. Sampler: trace.NeverSample(),
  65. },
  66. }
  67. for i := 0; i < test.count; i++ {
  68. h.ServeHTTP(w, r)
  69. totalCount++
  70. // Distributions do not track sum directly, we must
  71. // mimic their behaviour to avoid rounding failures.
  72. meanReqSize = updateMean(meanReqSize, test.reqSize, totalCount)
  73. meanRespSize = updateMean(meanRespSize, test.respSize, totalCount)
  74. }
  75. })
  76. }
  77. for _, viewName := range views {
  78. v := view.Find(viewName)
  79. if v == nil {
  80. t.Errorf("view not found %q", viewName)
  81. continue
  82. }
  83. rows, err := view.RetrieveData(viewName)
  84. if err != nil {
  85. t.Error(err)
  86. continue
  87. }
  88. if got, want := len(rows), 1; got != want {
  89. t.Errorf("len(%q) = %d; want %d", viewName, got, want)
  90. continue
  91. }
  92. data := rows[0].Data
  93. var count int
  94. var sum float64
  95. switch data := data.(type) {
  96. case *view.CountData:
  97. count = int(data.Value)
  98. case *view.DistributionData:
  99. count = int(data.Count)
  100. sum = data.Sum()
  101. default:
  102. t.Errorf("Unknown data type: %v", data)
  103. continue
  104. }
  105. if got, want := count, totalCount; got != want {
  106. t.Fatalf("%s = %d; want %d", viewName, got, want)
  107. }
  108. // We can only check sum for distribution views.
  109. switch viewName {
  110. case "opencensus.io/http/server/request_bytes":
  111. if got, want := sum, meanReqSize*float64(totalCount); got != want {
  112. t.Fatalf("%s = %g; want %g", viewName, got, want)
  113. }
  114. case "opencensus.io/http/server/response_bytes":
  115. if got, want := sum, meanRespSize*float64(totalCount); got != want {
  116. t.Fatalf("%s = %g; want %g", viewName, got, want)
  117. }
  118. }
  119. }
  120. }
  121. type testResponseWriterHijacker struct {
  122. httptest.ResponseRecorder
  123. }
  124. func (trw *testResponseWriterHijacker) Hijack() (net.Conn, *bufio.ReadWriter, error) {
  125. return nil, nil, nil
  126. }
  127. func TestUnitTestHandlerProxiesHijack(t *testing.T) {
  128. tests := []struct {
  129. w http.ResponseWriter
  130. hasHijack bool
  131. }{
  132. {httptest.NewRecorder(), false},
  133. {nil, false},
  134. {new(testResponseWriterHijacker), true},
  135. }
  136. for i, tt := range tests {
  137. tw := &trackingResponseWriter{writer: tt.w}
  138. w := tw.wrappedResponseWriter()
  139. _, ttHijacker := w.(http.Hijacker)
  140. if want, have := tt.hasHijack, ttHijacker; want != have {
  141. t.Errorf("#%d Hijack got %t, want %t", i, have, want)
  142. }
  143. }
  144. }
  145. // Integration test with net/http to ensure that our Handler proxies to its
  146. // response the call to (http.Hijack).Hijacker() and that that successfully
  147. // passes with HTTP/1.1 connections. See Issue #642
  148. func TestHandlerProxiesHijack_HTTP1(t *testing.T) {
  149. cst := httptest.NewServer(&Handler{
  150. Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  151. var writeMsg func(string)
  152. defer func() {
  153. err := recover()
  154. writeMsg(fmt.Sprintf("Proto=%s\npanic=%v", r.Proto, err != nil))
  155. }()
  156. conn, _, _ := w.(http.Hijacker).Hijack()
  157. writeMsg = func(msg string) {
  158. fmt.Fprintf(conn, "%s 200\nContentLength: %d", r.Proto, len(msg))
  159. fmt.Fprintf(conn, "\r\n\r\n%s", msg)
  160. conn.Close()
  161. }
  162. }),
  163. })
  164. defer cst.Close()
  165. testCases := []struct {
  166. name string
  167. tr *http.Transport
  168. want string
  169. }{
  170. {
  171. name: "http1-transport",
  172. tr: new(http.Transport),
  173. want: "Proto=HTTP/1.1\npanic=false",
  174. },
  175. {
  176. name: "http2-transport",
  177. tr: func() *http.Transport {
  178. tr := new(http.Transport)
  179. http2.ConfigureTransport(tr)
  180. return tr
  181. }(),
  182. want: "Proto=HTTP/1.1\npanic=false",
  183. },
  184. }
  185. for _, tc := range testCases {
  186. c := &http.Client{Transport: &Transport{Base: tc.tr}}
  187. res, err := c.Get(cst.URL)
  188. if err != nil {
  189. t.Errorf("(%s) unexpected error %v", tc.name, err)
  190. continue
  191. }
  192. blob, _ := ioutil.ReadAll(res.Body)
  193. res.Body.Close()
  194. if g, w := string(blob), tc.want; g != w {
  195. t.Errorf("(%s) got = %q; want = %q", tc.name, g, w)
  196. }
  197. }
  198. }
  199. // Integration test with net/http, x/net/http2 to ensure that our Handler proxies
  200. // to its response the call to (http.Hijack).Hijacker() and that that crashes
  201. // since http.Hijacker and HTTP/2.0 connections are incompatible, but the
  202. // detection is only at runtime and ensure that we can stream and flush to the
  203. // connection even after invoking Hijack(). See Issue #642.
  204. func TestHandlerProxiesHijack_HTTP2(t *testing.T) {
  205. cst := httptest.NewUnstartedServer(&Handler{
  206. Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  207. if _, ok := w.(http.Hijacker); ok {
  208. conn, _, err := w.(http.Hijacker).Hijack()
  209. if conn != nil {
  210. data := fmt.Sprintf("Surprisingly got the Hijacker() Proto: %s", r.Proto)
  211. fmt.Fprintf(conn, "%s 200\nContent-Length:%d\r\n\r\n%s", r.Proto, len(data), data)
  212. conn.Close()
  213. return
  214. }
  215. switch {
  216. case err == nil:
  217. fmt.Fprintf(w, "Unexpectedly did not encounter an error!")
  218. default:
  219. fmt.Fprintf(w, "Unexpected error: %v", err)
  220. case strings.Contains(err.(error).Error(), "Hijack"):
  221. // Confirmed HTTP/2.0, let's stream to it
  222. for i := 0; i < 5; i++ {
  223. fmt.Fprintf(w, "%d\n", i)
  224. w.(http.Flusher).Flush()
  225. }
  226. }
  227. } else {
  228. // Confirmed HTTP/2.0, let's stream to it
  229. for i := 0; i < 5; i++ {
  230. fmt.Fprintf(w, "%d\n", i)
  231. w.(http.Flusher).Flush()
  232. }
  233. }
  234. }),
  235. })
  236. cst.TLS = &tls.Config{NextProtos: []string{"h2"}}
  237. cst.StartTLS()
  238. defer cst.Close()
  239. if wantPrefix := "https://"; !strings.HasPrefix(cst.URL, wantPrefix) {
  240. t.Fatalf("URL got = %q wantPrefix = %q", cst.URL, wantPrefix)
  241. }
  242. tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
  243. http2.ConfigureTransport(tr)
  244. c := &http.Client{Transport: tr}
  245. res, err := c.Get(cst.URL)
  246. if err != nil {
  247. t.Fatalf("Unexpected error %v", err)
  248. }
  249. blob, _ := ioutil.ReadAll(res.Body)
  250. res.Body.Close()
  251. if g, w := string(blob), "0\n1\n2\n3\n4\n"; g != w {
  252. t.Errorf("got = %q; want = %q", g, w)
  253. }
  254. }
  255. func TestEnsureTrackingResponseWriterSetsStatusCode(t *testing.T) {
  256. // Ensure that the trackingResponseWriter always sets the spanStatus on ending the span.
  257. // Because we can only examine the Status after exporting, this test roundtrips a
  258. // couple of requests and then later examines the exported spans.
  259. // See Issue #700.
  260. exporter := &spanExporter{cur: make(chan *trace.SpanData, 1)}
  261. trace.RegisterExporter(exporter)
  262. defer trace.UnregisterExporter(exporter)
  263. tests := []struct {
  264. res *http.Response
  265. want trace.Status
  266. }{
  267. {res: &http.Response{StatusCode: 200}, want: trace.Status{Code: trace.StatusCodeOK, Message: `OK`}},
  268. {res: &http.Response{StatusCode: 500}, want: trace.Status{Code: trace.StatusCodeUnknown, Message: `UNKNOWN`}},
  269. {res: &http.Response{StatusCode: 403}, want: trace.Status{Code: trace.StatusCodePermissionDenied, Message: `PERMISSION_DENIED`}},
  270. {res: &http.Response{StatusCode: 401}, want: trace.Status{Code: trace.StatusCodeUnauthenticated, Message: `UNAUTHENTICATED`}},
  271. {res: &http.Response{StatusCode: 429}, want: trace.Status{Code: trace.StatusCodeResourceExhausted, Message: `RESOURCE_EXHAUSTED`}},
  272. }
  273. for _, tt := range tests {
  274. t.Run(tt.want.Message, func(t *testing.T) {
  275. ctx := context.Background()
  276. prc, pwc := io.Pipe()
  277. go func() {
  278. pwc.Write([]byte("Foo"))
  279. pwc.Close()
  280. }()
  281. inRes := tt.res
  282. inRes.Body = prc
  283. tr := &traceTransport{
  284. base: &testResponseTransport{res: inRes},
  285. formatSpanName: spanNameFromURL,
  286. startOptions: trace.StartOptions{
  287. Sampler: trace.AlwaysSample(),
  288. },
  289. }
  290. req, err := http.NewRequest("POST", "https://example.org", bytes.NewReader([]byte("testing")))
  291. if err != nil {
  292. t.Fatalf("NewRequest error: %v", err)
  293. }
  294. req = req.WithContext(ctx)
  295. res, err := tr.RoundTrip(req)
  296. if err != nil {
  297. t.Fatalf("RoundTrip error: %v", err)
  298. }
  299. _, _ = ioutil.ReadAll(res.Body)
  300. res.Body.Close()
  301. cur := <-exporter.cur
  302. if got, want := cur.Status, tt.want; got != want {
  303. t.Fatalf("SpanData:\ngot = (%#v)\nwant = (%#v)", got, want)
  304. }
  305. })
  306. }
  307. }
  308. type spanExporter struct {
  309. sync.Mutex
  310. cur chan *trace.SpanData
  311. }
  312. var _ trace.Exporter = (*spanExporter)(nil)
  313. func (se *spanExporter) ExportSpan(sd *trace.SpanData) {
  314. se.Lock()
  315. se.cur <- sd
  316. se.Unlock()
  317. }
  318. type testResponseTransport struct {
  319. res *http.Response
  320. }
  321. var _ http.RoundTripper = (*testResponseTransport)(nil)
  322. func (rb *testResponseTransport) RoundTrip(*http.Request) (*http.Response, error) {
  323. return rb.res, nil
  324. }
  325. func TestHandlerImplementsHTTPPusher(t *testing.T) {
  326. cst := setupAndStartServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  327. pusher, ok := w.(http.Pusher)
  328. if !ok {
  329. w.Write([]byte("false"))
  330. return
  331. }
  332. err := pusher.Push("/static.css", &http.PushOptions{
  333. Method: "GET",
  334. Header: http.Header{"Accept-Encoding": r.Header["Accept-Encoding"]},
  335. })
  336. if err != nil && false {
  337. // TODO: (@odeke-em) consult with Go stdlib for why trying
  338. // to configure even an HTTP/2 server and HTTP/2 transport
  339. // still return http.ErrNotSupported even without using ochttp.Handler.
  340. http.Error(w, err.Error(), http.StatusBadRequest)
  341. return
  342. }
  343. w.Write([]byte("true"))
  344. }), asHTTP2)
  345. defer cst.Close()
  346. tests := []struct {
  347. rt http.RoundTripper
  348. wantBody string
  349. }{
  350. {
  351. rt: h1Transport(),
  352. wantBody: "false",
  353. },
  354. {
  355. rt: h2Transport(),
  356. wantBody: "true",
  357. },
  358. {
  359. rt: &Transport{Base: h1Transport()},
  360. wantBody: "false",
  361. },
  362. {
  363. rt: &Transport{Base: h2Transport()},
  364. wantBody: "true",
  365. },
  366. }
  367. for i, tt := range tests {
  368. c := &http.Client{Transport: &Transport{Base: tt.rt}}
  369. res, err := c.Get(cst.URL)
  370. if err != nil {
  371. t.Errorf("#%d: Unexpected error %v", i, err)
  372. continue
  373. }
  374. body, _ := ioutil.ReadAll(res.Body)
  375. _ = res.Body.Close()
  376. if g, w := string(body), tt.wantBody; g != w {
  377. t.Errorf("#%d: got = %q; want = %q", i, g, w)
  378. }
  379. }
  380. }
  381. const (
  382. isNil = "isNil"
  383. hang = "hang"
  384. ended = "ended"
  385. nonNotifier = "nonNotifier"
  386. asHTTP1 = false
  387. asHTTP2 = true
  388. )
  389. func setupAndStartServer(hf func(http.ResponseWriter, *http.Request), isHTTP2 bool) *httptest.Server {
  390. cst := httptest.NewUnstartedServer(&Handler{
  391. Handler: http.HandlerFunc(hf),
  392. })
  393. if isHTTP2 {
  394. http2.ConfigureServer(cst.Config, new(http2.Server))
  395. cst.TLS = cst.Config.TLSConfig
  396. cst.StartTLS()
  397. } else {
  398. cst.Start()
  399. }
  400. return cst
  401. }
  402. func insecureTLS() *tls.Config { return &tls.Config{InsecureSkipVerify: true} }
  403. func h1Transport() *http.Transport { return &http.Transport{TLSClientConfig: insecureTLS()} }
  404. func h2Transport() *http.Transport {
  405. tr := &http.Transport{TLSClientConfig: insecureTLS()}
  406. http2.ConfigureTransport(tr)
  407. return tr
  408. }
  409. type concurrentBuffer struct {
  410. sync.RWMutex
  411. bw *bytes.Buffer
  412. }
  413. func (cw *concurrentBuffer) Write(b []byte) (int, error) {
  414. cw.Lock()
  415. defer cw.Unlock()
  416. return cw.bw.Write(b)
  417. }
  418. func (cw *concurrentBuffer) String() string {
  419. cw.Lock()
  420. defer cw.Unlock()
  421. return cw.bw.String()
  422. }
  423. func handleCloseNotify(outLog io.Writer) http.HandlerFunc {
  424. return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  425. cn, ok := w.(http.CloseNotifier)
  426. if !ok {
  427. fmt.Fprintln(outLog, nonNotifier)
  428. return
  429. }
  430. ch := cn.CloseNotify()
  431. if ch == nil {
  432. fmt.Fprintln(outLog, isNil)
  433. return
  434. }
  435. <-ch
  436. fmt.Fprintln(outLog, ended)
  437. })
  438. }
  439. func TestHandlerImplementsHTTPCloseNotify(t *testing.T) {
  440. http1Log := &concurrentBuffer{bw: new(bytes.Buffer)}
  441. http1Server := setupAndStartServer(handleCloseNotify(http1Log), asHTTP1)
  442. http2Log := &concurrentBuffer{bw: new(bytes.Buffer)}
  443. http2Server := setupAndStartServer(handleCloseNotify(http2Log), asHTTP2)
  444. defer http1Server.Close()
  445. defer http2Server.Close()
  446. tests := []struct {
  447. url string
  448. want string
  449. }{
  450. {url: http1Server.URL, want: nonNotifier},
  451. {url: http2Server.URL, want: ended},
  452. }
  453. transports := []struct {
  454. name string
  455. rt http.RoundTripper
  456. }{
  457. {name: "http2+ochttp", rt: &Transport{Base: h2Transport()}},
  458. {name: "http1+ochttp", rt: &Transport{Base: h1Transport()}},
  459. {name: "http1-ochttp", rt: h1Transport()},
  460. {name: "http2-ochttp", rt: h2Transport()},
  461. }
  462. // Each transport invokes one of two server types, either HTTP/1 or HTTP/2
  463. for _, trc := range transports {
  464. // Try out all the transport combinations
  465. for i, tt := range tests {
  466. req, err := http.NewRequest("GET", tt.url, nil)
  467. if err != nil {
  468. t.Errorf("#%d: Unexpected error making request: %v", i, err)
  469. continue
  470. }
  471. // Using a timeout to ensure that the request is cancelled and the server
  472. // if its handler implements CloseNotify will see this as the client leaving.
  473. ctx, cancel := context.WithTimeout(context.Background(), 80*time.Millisecond)
  474. defer cancel()
  475. req = req.WithContext(ctx)
  476. client := &http.Client{Transport: trc.rt}
  477. res, err := client.Do(req)
  478. if err != nil && !strings.Contains(err.Error(), "context deadline exceeded") {
  479. t.Errorf("#%d: %sClient Unexpected error %v", i, trc.name, err)
  480. continue
  481. }
  482. if res != nil && res.Body != nil {
  483. io.CopyN(ioutil.Discard, res.Body, 5)
  484. _ = res.Body.Close()
  485. }
  486. }
  487. }
  488. // Wait for a couple of milliseconds for the GoAway frames to be properly propagated
  489. <-time.After(200 * time.Millisecond)
  490. wantHTTP1Log := strings.Repeat("ended\n", len(transports))
  491. wantHTTP2Log := strings.Repeat("ended\n", len(transports))
  492. if g, w := http1Log.String(), wantHTTP1Log; g != w {
  493. t.Errorf("HTTP1Log got\n\t%q\nwant\n\t%q", g, w)
  494. }
  495. if g, w := http2Log.String(), wantHTTP2Log; g != w {
  496. t.Errorf("HTTP2Log got\n\t%q\nwant\n\t%q", g, w)
  497. }
  498. }
  499. func TestIgnoreHealthz(t *testing.T) {
  500. var spans int
  501. ts := httptest.NewServer(&Handler{
  502. Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  503. span := trace.FromContext(r.Context())
  504. if span != nil {
  505. spans++
  506. }
  507. fmt.Fprint(w, "ok")
  508. }),
  509. StartOptions: trace.StartOptions{
  510. Sampler: trace.AlwaysSample(),
  511. },
  512. })
  513. defer ts.Close()
  514. client := &http.Client{}
  515. for _, path := range []string{"/healthz", "/_ah/health"} {
  516. resp, err := client.Get(ts.URL + path)
  517. if err != nil {
  518. t.Fatalf("Cannot GET %q: %v", path, err)
  519. }
  520. b, err := ioutil.ReadAll(resp.Body)
  521. if err != nil {
  522. t.Fatalf("Cannot read body for %q: %v", path, err)
  523. }
  524. if got, want := string(b), "ok"; got != want {
  525. t.Fatalf("Body for %q = %q; want %q", path, got, want)
  526. }
  527. resp.Body.Close()
  528. }
  529. if spans > 0 {
  530. t.Errorf("Got %v spans; want no spans", spans)
  531. }
  532. }