client_stats_handler_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package ocgrpc
  16. import (
  17. "reflect"
  18. "testing"
  19. "github.com/google/go-cmp/cmp"
  20. "github.com/google/go-cmp/cmp/cmpopts"
  21. "go.opencensus.io/trace"
  22. "google.golang.org/grpc/codes"
  23. "google.golang.org/grpc/status"
  24. "context"
  25. "go.opencensus.io/metric/metricdata"
  26. "go.opencensus.io/stats/view"
  27. "go.opencensus.io/tag"
  28. "google.golang.org/grpc/stats"
  29. )
  30. func TestClientDefaultCollections(t *testing.T) {
  31. k1, _ := tag.NewKey("k1")
  32. k2, _ := tag.NewKey("k2")
  33. type tagPair struct {
  34. k tag.Key
  35. v string
  36. }
  37. type wantData struct {
  38. v func() *view.View
  39. rows []*view.Row
  40. }
  41. type rpc struct {
  42. tags []tagPair
  43. tagInfo *stats.RPCTagInfo
  44. inPayloads []*stats.InPayload
  45. outPayloads []*stats.OutPayload
  46. end *stats.End
  47. }
  48. type testCase struct {
  49. label string
  50. rpcs []*rpc
  51. wants []*wantData
  52. }
  53. tcs := []testCase{
  54. {
  55. label: "1",
  56. rpcs: []*rpc{
  57. {
  58. []tagPair{{k1, "v1"}},
  59. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  60. []*stats.InPayload{
  61. {Length: 10},
  62. },
  63. []*stats.OutPayload{
  64. {Length: 10},
  65. },
  66. &stats.End{Error: nil},
  67. },
  68. },
  69. wants: []*wantData{
  70. {
  71. func() *view.View { return ClientSentMessagesPerRPCView },
  72. []*view.Row{
  73. {
  74. Tags: []tag.Tag{
  75. {Key: KeyClientMethod, Value: "package.service/method"},
  76. },
  77. Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
  78. },
  79. },
  80. },
  81. {
  82. func() *view.View { return ClientReceivedMessagesPerRPCView },
  83. []*view.Row{
  84. {
  85. Tags: []tag.Tag{
  86. {Key: KeyClientMethod, Value: "package.service/method"},
  87. },
  88. Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
  89. },
  90. },
  91. },
  92. {
  93. func() *view.View { return ClientSentBytesPerRPCView },
  94. []*view.Row{
  95. {
  96. Tags: []tag.Tag{
  97. {Key: KeyClientMethod, Value: "package.service/method"},
  98. },
  99. Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
  100. },
  101. },
  102. },
  103. {
  104. func() *view.View { return ClientReceivedBytesPerRPCView },
  105. []*view.Row{
  106. {
  107. Tags: []tag.Tag{
  108. {Key: KeyClientMethod, Value: "package.service/method"},
  109. },
  110. Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
  111. },
  112. },
  113. },
  114. },
  115. },
  116. {
  117. label: "2",
  118. rpcs: []*rpc{
  119. {
  120. []tagPair{{k1, "v1"}},
  121. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  122. []*stats.InPayload{
  123. {Length: 10},
  124. },
  125. []*stats.OutPayload{
  126. {Length: 10},
  127. {Length: 10},
  128. {Length: 10},
  129. },
  130. &stats.End{Error: nil},
  131. },
  132. {
  133. []tagPair{{k1, "v11"}},
  134. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  135. []*stats.InPayload{
  136. {Length: 10},
  137. {Length: 10},
  138. },
  139. []*stats.OutPayload{
  140. {Length: 10},
  141. {Length: 10},
  142. },
  143. &stats.End{Error: status.Error(codes.Canceled, "canceled")},
  144. },
  145. },
  146. wants: []*wantData{
  147. {
  148. func() *view.View { return ClientSentMessagesPerRPCView },
  149. []*view.Row{
  150. {
  151. Tags: []tag.Tag{
  152. {Key: KeyClientMethod, Value: "package.service/method"},
  153. },
  154. Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5),
  155. },
  156. },
  157. },
  158. {
  159. func() *view.View { return ClientReceivedMessagesPerRPCView },
  160. []*view.Row{
  161. {
  162. Tags: []tag.Tag{
  163. {Key: KeyClientMethod, Value: "package.service/method"},
  164. },
  165. Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5),
  166. },
  167. },
  168. },
  169. },
  170. },
  171. {
  172. label: "3",
  173. rpcs: []*rpc{
  174. {
  175. []tagPair{{k1, "v1"}},
  176. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  177. []*stats.InPayload{
  178. {Length: 1},
  179. },
  180. []*stats.OutPayload{
  181. {Length: 1},
  182. {Length: 1024},
  183. {Length: 65536},
  184. },
  185. &stats.End{Error: nil},
  186. },
  187. {
  188. []tagPair{{k1, "v1"}, {k2, "v2"}},
  189. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  190. []*stats.InPayload{
  191. {Length: 1024},
  192. },
  193. []*stats.OutPayload{
  194. {Length: 4096},
  195. {Length: 16384},
  196. },
  197. &stats.End{Error: status.Error(codes.Canceled, "canceled")},
  198. },
  199. {
  200. []tagPair{{k1, "v11"}, {k2, "v22"}},
  201. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  202. []*stats.InPayload{
  203. {Length: 2048},
  204. {Length: 16384},
  205. },
  206. []*stats.OutPayload{
  207. {Length: 2048},
  208. {Length: 4096},
  209. {Length: 16384},
  210. },
  211. &stats.End{Error: status.Error(codes.Aborted, "aborted")},
  212. },
  213. },
  214. wants: []*wantData{
  215. {
  216. func() *view.View { return ClientSentMessagesPerRPCView },
  217. []*view.Row{
  218. {
  219. Tags: []tag.Tag{
  220. {Key: KeyClientMethod, Value: "package.service/method"},
  221. },
  222. Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2),
  223. },
  224. },
  225. },
  226. {
  227. func() *view.View { return ClientReceivedMessagesPerRPCView },
  228. []*view.Row{
  229. {
  230. Tags: []tag.Tag{
  231. {Key: KeyClientMethod, Value: "package.service/method"},
  232. },
  233. Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2),
  234. },
  235. },
  236. },
  237. {
  238. func() *view.View { return ClientSentBytesPerRPCView },
  239. []*view.Row{
  240. {
  241. Tags: []tag.Tag{
  242. {Key: KeyClientMethod, Value: "package.service/method"},
  243. },
  244. Data: newDistributionData([]int64{0, 0, 0, 0, 2 /*16384*/, 1 /*65536*/, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09),
  245. },
  246. },
  247. },
  248. {
  249. func() *view.View { return ClientReceivedBytesPerRPCView },
  250. []*view.Row{
  251. {
  252. Tags: []tag.Tag{
  253. {Key: KeyClientMethod, Value: "package.service/method"},
  254. },
  255. Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.666667, 2.1459558466666666e+08),
  256. },
  257. },
  258. },
  259. },
  260. },
  261. }
  262. views := []*view.View{
  263. ClientSentBytesPerRPCView,
  264. ClientReceivedBytesPerRPCView,
  265. ClientRoundtripLatencyView,
  266. ClientCompletedRPCsView,
  267. ClientSentMessagesPerRPCView,
  268. ClientReceivedMessagesPerRPCView,
  269. }
  270. for _, tc := range tcs {
  271. // Register views.
  272. if err := view.Register(views...); err != nil {
  273. t.Error(err)
  274. }
  275. h := &ClientHandler{}
  276. h.StartOptions.Sampler = trace.NeverSample()
  277. for _, rpc := range tc.rpcs {
  278. var mods []tag.Mutator
  279. for _, t := range rpc.tags {
  280. mods = append(mods, tag.Upsert(t.k, t.v))
  281. }
  282. ctx, err := tag.New(context.Background(), mods...)
  283. if err != nil {
  284. t.Errorf("%q: NewMap = %v", tc.label, err)
  285. }
  286. encoded := tag.Encode(tag.FromContext(ctx))
  287. ctx = stats.SetTags(context.Background(), encoded)
  288. ctx = h.TagRPC(ctx, rpc.tagInfo)
  289. for _, out := range rpc.outPayloads {
  290. out.Client = true
  291. h.HandleRPC(ctx, out)
  292. }
  293. for _, in := range rpc.inPayloads {
  294. in.Client = true
  295. h.HandleRPC(ctx, in)
  296. }
  297. rpc.end.Client = true
  298. h.HandleRPC(ctx, rpc.end)
  299. }
  300. for _, wantData := range tc.wants {
  301. gotRows, err := view.RetrieveData(wantData.v().Name)
  302. if err != nil {
  303. t.Errorf("%q: RetrieveData(%q) = %v", tc.label, wantData.v().Name, err)
  304. continue
  305. }
  306. for _, gotRow := range gotRows {
  307. if !containsRow(wantData.rows, gotRow) {
  308. t.Errorf("%q: unwanted row for view %q = %v", tc.label, wantData.v().Name, gotRow)
  309. break
  310. }
  311. }
  312. for _, wantRow := range wantData.rows {
  313. if !containsRow(gotRows, wantRow) {
  314. t.Errorf("%q: row missing for view %q; want %v", tc.label, wantData.v().Name, wantRow)
  315. break
  316. }
  317. }
  318. }
  319. // Unregister views to cleanup.
  320. view.Unregister(views...)
  321. }
  322. }
  323. func TestClientRecordExemplar(t *testing.T) {
  324. key, _ := tag.NewKey("test_key")
  325. tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
  326. out := &stats.OutPayload{Length: 2000}
  327. end := &stats.End{Error: nil}
  328. if err := view.Register(ClientSentBytesPerRPCView); err != nil {
  329. t.Error(err)
  330. }
  331. h := &ClientHandler{}
  332. h.StartOptions.Sampler = trace.AlwaysSample()
  333. ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
  334. if err != nil {
  335. t.Error(err)
  336. }
  337. encoded := tag.Encode(tag.FromContext(ctx))
  338. ctx = stats.SetTags(context.Background(), encoded)
  339. ctx = h.TagRPC(ctx, tagInfo)
  340. out.Client = true
  341. h.HandleRPC(ctx, out)
  342. end.Client = true
  343. h.HandleRPC(ctx, end)
  344. span := trace.FromContext(ctx)
  345. if span == nil {
  346. t.Fatal("expected non-nil span, got nil")
  347. }
  348. if !span.IsRecordingEvents() {
  349. t.Errorf("span should be sampled")
  350. }
  351. attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
  352. wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
  353. rows, err := view.RetrieveData(ClientSentBytesPerRPCView.Name)
  354. if err != nil {
  355. t.Fatal("Error RetrieveData ", err)
  356. }
  357. if len(rows) == 0 {
  358. t.Fatal("No data was recorded.")
  359. }
  360. data := rows[0].Data
  361. dis, ok := data.(*view.DistributionData)
  362. if !ok {
  363. t.Fatal("want DistributionData, got ", data)
  364. }
  365. // Only recorded value is 2000, which falls into the second bucket (1024, 2048].
  366. wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
  367. if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
  368. t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
  369. }
  370. for i, e := range dis.ExemplarsPerBucket {
  371. // Only the second bucket should have an exemplar.
  372. if i == 1 {
  373. if diff := cmpExemplar(e, wantExemplar); diff != "" {
  374. t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
  375. }
  376. } else if e != nil {
  377. t.Errorf("want nil exemplar, got %v", e)
  378. }
  379. }
  380. // Unregister views to cleanup.
  381. view.Unregister(ClientSentBytesPerRPCView)
  382. }
  383. // containsRow returns true if rows contain r.
  384. func containsRow(rows []*view.Row, r *view.Row) bool {
  385. for _, x := range rows {
  386. if r.Equal(x) {
  387. return true
  388. }
  389. }
  390. return false
  391. }
  392. // Compare exemplars while ignoring exemplar timestamp, since timestamp is non-deterministic.
  393. func cmpExemplar(got, want *metricdata.Exemplar) string {
  394. return cmp.Diff(got, want, cmpopts.IgnoreFields(metricdata.Exemplar{}, "Timestamp"), cmpopts.IgnoreUnexported(metricdata.Exemplar{}))
  395. }