server_stats_handler_test.go 10 KB


  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package ocgrpc
  16. import (
  17. "reflect"
  18. "testing"
  19. "context"
  20. "go.opencensus.io/trace"
  21. "go.opencensus.io/metric/metricdata"
  22. "go.opencensus.io/stats/view"
  23. "go.opencensus.io/tag"
  24. "google.golang.org/grpc/codes"
  25. "google.golang.org/grpc/stats"
  26. "google.golang.org/grpc/status"
  27. )
  28. func TestServerDefaultCollections(t *testing.T) {
  29. k1, _ := tag.NewKey("k1")
  30. k2, _ := tag.NewKey("k2")
  31. type tagPair struct {
  32. k tag.Key
  33. v string
  34. }
  35. type wantData struct {
  36. v func() *view.View
  37. rows []*view.Row
  38. }
  39. type rpc struct {
  40. tags []tagPair
  41. tagInfo *stats.RPCTagInfo
  42. inPayloads []*stats.InPayload
  43. outPayloads []*stats.OutPayload
  44. end *stats.End
  45. }
  46. type testCase struct {
  47. label string
  48. rpcs []*rpc
  49. wants []*wantData
  50. }
  51. tcs := []testCase{
  52. {
  53. "1",
  54. []*rpc{
  55. {
  56. []tagPair{{k1, "v1"}},
  57. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  58. []*stats.InPayload{
  59. {Length: 10},
  60. },
  61. []*stats.OutPayload{
  62. {Length: 10},
  63. },
  64. &stats.End{Error: nil},
  65. },
  66. },
  67. []*wantData{
  68. {
  69. func() *view.View { return ServerReceivedMessagesPerRPCView },
  70. []*view.Row{
  71. {
  72. Tags: []tag.Tag{
  73. {Key: KeyServerMethod, Value: "package.service/method"},
  74. },
  75. Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
  76. },
  77. },
  78. },
  79. {
  80. func() *view.View { return ServerSentMessagesPerRPCView },
  81. []*view.Row{
  82. {
  83. Tags: []tag.Tag{
  84. {Key: KeyServerMethod, Value: "package.service/method"},
  85. },
  86. Data: newDistributionData([]int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 1, 1, 1, 0),
  87. },
  88. },
  89. },
  90. {
  91. func() *view.View { return ServerReceivedBytesPerRPCView },
  92. []*view.Row{
  93. {
  94. Tags: []tag.Tag{
  95. {Key: KeyServerMethod, Value: "package.service/method"},
  96. },
  97. Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
  98. },
  99. },
  100. },
  101. {
  102. func() *view.View { return ServerSentBytesPerRPCView },
  103. []*view.Row{
  104. {
  105. Tags: []tag.Tag{
  106. {Key: KeyServerMethod, Value: "package.service/method"},
  107. },
  108. Data: newDistributionData([]int64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 1, 10, 10, 10, 0),
  109. },
  110. },
  111. },
  112. },
  113. },
  114. {
  115. "2",
  116. []*rpc{
  117. {
  118. []tagPair{{k1, "v1"}},
  119. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  120. []*stats.InPayload{
  121. {Length: 10},
  122. },
  123. []*stats.OutPayload{
  124. {Length: 10},
  125. {Length: 10},
  126. {Length: 10},
  127. },
  128. &stats.End{Error: nil},
  129. },
  130. {
  131. []tagPair{{k1, "v11"}},
  132. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  133. []*stats.InPayload{
  134. {Length: 10},
  135. {Length: 10},
  136. },
  137. []*stats.OutPayload{
  138. {Length: 10},
  139. {Length: 10},
  140. },
  141. &stats.End{Error: status.Error(codes.Canceled, "canceled")},
  142. },
  143. },
  144. []*wantData{
  145. {
  146. func() *view.View { return ServerReceivedMessagesPerRPCView },
  147. []*view.Row{
  148. {
  149. Tags: []tag.Tag{
  150. {Key: KeyServerMethod, Value: "package.service/method"},
  151. },
  152. Data: newDistributionData([]int64{0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 1, 2, 1.5, 0.5),
  153. },
  154. },
  155. },
  156. {
  157. func() *view.View { return ServerSentMessagesPerRPCView },
  158. []*view.Row{
  159. {
  160. Tags: []tag.Tag{
  161. {Key: KeyServerMethod, Value: "package.service/method"},
  162. },
  163. Data: newDistributionData([]int64{0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 2, 2, 3, 2.5, 0.5),
  164. },
  165. },
  166. },
  167. },
  168. },
  169. {
  170. "3",
  171. []*rpc{
  172. {
  173. []tagPair{{k1, "v1"}},
  174. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  175. []*stats.InPayload{
  176. {Length: 1},
  177. },
  178. []*stats.OutPayload{
  179. {Length: 1},
  180. {Length: 1024},
  181. {Length: 65536},
  182. },
  183. &stats.End{Error: nil},
  184. },
  185. {
  186. []tagPair{{k1, "v1"}, {k2, "v2"}},
  187. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  188. []*stats.InPayload{
  189. {Length: 1024},
  190. },
  191. []*stats.OutPayload{
  192. {Length: 4096},
  193. {Length: 16384},
  194. },
  195. &stats.End{Error: status.Error(codes.Aborted, "aborted")},
  196. },
  197. {
  198. []tagPair{{k1, "v11"}, {k2, "v22"}},
  199. &stats.RPCTagInfo{FullMethodName: "/package.service/method"},
  200. []*stats.InPayload{
  201. {Length: 2048},
  202. {Length: 16384},
  203. },
  204. []*stats.OutPayload{
  205. {Length: 2048},
  206. {Length: 4096},
  207. {Length: 16384},
  208. },
  209. &stats.End{Error: status.Error(codes.Canceled, "canceled")},
  210. },
  211. },
  212. []*wantData{
  213. {
  214. func() *view.View { return ServerReceivedMessagesPerRPCView },
  215. []*view.Row{
  216. {
  217. Tags: []tag.Tag{
  218. {Key: KeyServerMethod, Value: "package.service/method"},
  219. },
  220. Data: newDistributionData([]int64{0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 2, 1.333333333, 0.333333333*2),
  221. },
  222. },
  223. },
  224. {
  225. func() *view.View { return ServerSentMessagesPerRPCView },
  226. []*view.Row{
  227. {
  228. Tags: []tag.Tag{
  229. {Key: KeyServerMethod, Value: "package.service/method"},
  230. },
  231. Data: newDistributionData([]int64{0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 2, 3, 2.666666666, 0.333333333*2),
  232. },
  233. },
  234. },
  235. {
  236. func() *view.View { return ServerReceivedBytesPerRPCView },
  237. []*view.Row{
  238. {
  239. Tags: []tag.Tag{
  240. {Key: KeyServerMethod, Value: "package.service/method"},
  241. },
  242. Data: newDistributionData([]int64{1, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 1, 18432, 6485.6666667, 2.1459558466666667e+08),
  243. },
  244. },
  245. },
  246. {
  247. func() *view.View { return ServerSentBytesPerRPCView },
  248. []*view.Row{
  249. {
  250. Tags: []tag.Tag{
  251. {Key: KeyServerMethod, Value: "package.service/method"},
  252. },
  253. Data: newDistributionData([]int64{0, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 0}, 3, 20480, 66561, 36523, 1.355519318e+09),
  254. },
  255. },
  256. },
  257. },
  258. },
  259. }
  260. views := append(DefaultServerViews[:], ServerReceivedMessagesPerRPCView, ServerSentMessagesPerRPCView)
  261. for _, tc := range tcs {
  262. if err := view.Register(views...); err != nil {
  263. t.Fatal(err)
  264. }
  265. h := &ServerHandler{}
  266. h.StartOptions.Sampler = trace.NeverSample()
  267. for _, rpc := range tc.rpcs {
  268. mods := []tag.Mutator{}
  269. for _, t := range rpc.tags {
  270. mods = append(mods, tag.Upsert(t.k, t.v))
  271. }
  272. ctx, err := tag.New(context.Background(), mods...)
  273. if err != nil {
  274. t.Errorf("%q: NewMap = %v", tc.label, err)
  275. }
  276. encoded := tag.Encode(tag.FromContext(ctx))
  277. ctx = stats.SetTags(context.Background(), encoded)
  278. ctx = h.TagRPC(ctx, rpc.tagInfo)
  279. for _, in := range rpc.inPayloads {
  280. h.HandleRPC(ctx, in)
  281. }
  282. for _, out := range rpc.outPayloads {
  283. h.HandleRPC(ctx, out)
  284. }
  285. h.HandleRPC(ctx, rpc.end)
  286. }
  287. for _, wantData := range tc.wants {
  288. gotRows, err := view.RetrieveData(wantData.v().Name)
  289. if err != nil {
  290. t.Errorf("%q: RetrieveData (%q) = %v", tc.label, wantData.v().Name, err)
  291. continue
  292. }
  293. for _, gotRow := range gotRows {
  294. if !containsRow(wantData.rows, gotRow) {
  295. t.Errorf("%q: unwanted row for view %q: %v", tc.label, wantData.v().Name, gotRow)
  296. break
  297. }
  298. }
  299. for _, wantRow := range wantData.rows {
  300. if !containsRow(gotRows, wantRow) {
  301. t.Errorf("%q: missing row for view %q: %v", tc.label, wantData.v().Name, wantRow)
  302. break
  303. }
  304. }
  305. }
  306. // Unregister views to cleanup.
  307. view.Unregister(views...)
  308. }
  309. }
  310. func newDistributionData(countPerBucket []int64, count int64, min, max, mean, sumOfSquaredDev float64) *view.DistributionData {
  311. return &view.DistributionData{
  312. Count: count,
  313. Min: min,
  314. Max: max,
  315. Mean: mean,
  316. SumOfSquaredDev: sumOfSquaredDev,
  317. CountPerBucket: countPerBucket,
  318. }
  319. }
  320. func TestServerRecordExemplar(t *testing.T) {
  321. key, _ := tag.NewKey("test_key")
  322. tagInfo := &stats.RPCTagInfo{FullMethodName: "/package.service/method"}
  323. out := &stats.OutPayload{Length: 2000}
  324. end := &stats.End{Error: nil}
  325. if err := view.Register(ServerSentBytesPerRPCView); err != nil {
  326. t.Error(err)
  327. }
  328. h := &ServerHandler{}
  329. h.StartOptions.Sampler = trace.AlwaysSample()
  330. ctx, err := tag.New(context.Background(), tag.Upsert(key, "test_val"))
  331. if err != nil {
  332. t.Error(err)
  333. }
  334. encoded := tag.Encode(tag.FromContext(ctx))
  335. ctx = stats.SetTags(context.Background(), encoded)
  336. ctx = h.TagRPC(ctx, tagInfo)
  337. out.Client = false
  338. h.HandleRPC(ctx, out)
  339. end.Client = false
  340. h.HandleRPC(ctx, end)
  341. span := trace.FromContext(ctx)
  342. if span == nil {
  343. t.Fatal("expected non-nil span, got nil")
  344. }
  345. if !span.IsRecordingEvents() {
  346. t.Errorf("span should be sampled")
  347. }
  348. attachments := map[string]interface{}{metricdata.AttachmentKeySpanContext: span.SpanContext()}
  349. wantExemplar := &metricdata.Exemplar{Value: 2000, Attachments: attachments}
  350. rows, err := view.RetrieveData(ServerSentBytesPerRPCView.Name)
  351. if err != nil {
  352. t.Fatal("Error RetrieveData ", err)
  353. }
  354. if len(rows) == 0 {
  355. t.Fatal("No data was recorded.")
  356. }
  357. data := rows[0].Data
  358. dis, ok := data.(*view.DistributionData)
  359. if !ok {
  360. t.Fatal("want DistributionData, got ", data)
  361. }
  362. // Only recorded value is 2000, which falls into the second bucket (1024, 2048].
  363. wantBuckets := []int64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
  364. if !reflect.DeepEqual(dis.CountPerBucket, wantBuckets) {
  365. t.Errorf("want buckets %v, got %v", wantBuckets, dis.CountPerBucket)
  366. }
  367. for i, e := range dis.ExemplarsPerBucket {
  368. // Only the second bucket should have an exemplar.
  369. if i == 1 {
  370. if diff := cmpExemplar(e, wantExemplar); diff != "" {
  371. t.Fatalf("Unexpected Exemplar -got +want: %s", diff)
  372. }
  373. } else if e != nil {
  374. t.Errorf("want nil exemplar, got %v", e)
  375. }
  376. }
  377. // Unregister views to cleanup.
  378. view.Unregister(ServerSentBytesPerRPCView)
  379. }