stats_common.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package ocgrpc
  16. import (
  17. "context"
  18. "strconv"
  19. "strings"
  20. "sync/atomic"
  21. "time"
  22. "go.opencensus.io/metric/metricdata"
  23. ocstats "go.opencensus.io/stats"
  24. "go.opencensus.io/stats/view"
  25. "go.opencensus.io/tag"
  26. "go.opencensus.io/trace"
  27. "google.golang.org/grpc/codes"
  28. "google.golang.org/grpc/grpclog"
  29. "google.golang.org/grpc/stats"
  30. "google.golang.org/grpc/status"
  31. )
  32. type grpcInstrumentationKey string
  33. // rpcData holds the instrumentation RPC data that is needed between the start
  34. // and end of an call. It holds the info that this package needs to keep track
  35. // of between the various GRPC events.
  36. type rpcData struct {
  37. // reqCount and respCount has to be the first words
  38. // in order to be 64-aligned on 32-bit architectures.
  39. sentCount, sentBytes, recvCount, recvBytes int64 // access atomically
  40. // startTime represents the time at which TagRPC was invoked at the
  41. // beginning of an RPC. It is an appoximation of the time when the
  42. // application code invoked GRPC code.
  43. startTime time.Time
  44. method string
  45. }
  46. // The following variables define the default hard-coded auxiliary data used by
  47. // both the default GRPC client and GRPC server metrics.
  48. var (
  49. DefaultBytesDistribution = view.Distribution(1024, 2048, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824, 4294967296)
  50. DefaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 50000, 100000)
  51. DefaultMessageCountDistribution = view.Distribution(1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536)
  52. )
  53. // Server tags are applied to the context used to process each RPC, as well as
  54. // the measures at the end of each RPC.
  55. var (
  56. KeyServerMethod, _ = tag.NewKey("grpc_server_method")
  57. KeyServerStatus, _ = tag.NewKey("grpc_server_status")
  58. )
  59. // Client tags are applied to measures at the end of each RPC.
  60. var (
  61. KeyClientMethod, _ = tag.NewKey("grpc_client_method")
  62. KeyClientStatus, _ = tag.NewKey("grpc_client_status")
  63. )
  64. var (
  65. rpcDataKey = grpcInstrumentationKey("opencensus-rpcData")
  66. )
  67. func methodName(fullname string) string {
  68. return strings.TrimLeft(fullname, "/")
  69. }
  70. // statsHandleRPC processes the RPC events.
  71. func statsHandleRPC(ctx context.Context, s stats.RPCStats) {
  72. switch st := s.(type) {
  73. case *stats.Begin, *stats.OutHeader, *stats.InHeader, *stats.InTrailer, *stats.OutTrailer:
  74. // do nothing for client
  75. case *stats.OutPayload:
  76. handleRPCOutPayload(ctx, st)
  77. case *stats.InPayload:
  78. handleRPCInPayload(ctx, st)
  79. case *stats.End:
  80. handleRPCEnd(ctx, st)
  81. default:
  82. grpclog.Infof("unexpected stats: %T", st)
  83. }
  84. }
  85. func handleRPCOutPayload(ctx context.Context, s *stats.OutPayload) {
  86. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  87. if !ok {
  88. if grpclog.V(2) {
  89. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  90. }
  91. return
  92. }
  93. atomic.AddInt64(&d.sentBytes, int64(s.Length))
  94. atomic.AddInt64(&d.sentCount, 1)
  95. }
  96. func handleRPCInPayload(ctx context.Context, s *stats.InPayload) {
  97. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  98. if !ok {
  99. if grpclog.V(2) {
  100. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  101. }
  102. return
  103. }
  104. atomic.AddInt64(&d.recvBytes, int64(s.Length))
  105. atomic.AddInt64(&d.recvCount, 1)
  106. }
  107. func handleRPCEnd(ctx context.Context, s *stats.End) {
  108. d, ok := ctx.Value(rpcDataKey).(*rpcData)
  109. if !ok {
  110. if grpclog.V(2) {
  111. grpclog.Infoln("Failed to retrieve *rpcData from context.")
  112. }
  113. return
  114. }
  115. elapsedTime := time.Since(d.startTime)
  116. var st string
  117. if s.Error != nil {
  118. s, ok := status.FromError(s.Error)
  119. if ok {
  120. st = statusCodeToString(s)
  121. }
  122. } else {
  123. st = "OK"
  124. }
  125. latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
  126. attachments := getSpanCtxAttachment(ctx)
  127. if s.Client {
  128. ocstats.RecordWithOptions(ctx,
  129. ocstats.WithTags(
  130. tag.Upsert(KeyClientMethod, methodName(d.method)),
  131. tag.Upsert(KeyClientStatus, st)),
  132. ocstats.WithAttachments(attachments),
  133. ocstats.WithMeasurements(
  134. ClientSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
  135. ClientSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
  136. ClientReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
  137. ClientReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
  138. ClientRoundtripLatency.M(latencyMillis)))
  139. } else {
  140. ocstats.RecordWithOptions(ctx,
  141. ocstats.WithTags(
  142. tag.Upsert(KeyServerStatus, st),
  143. ),
  144. ocstats.WithAttachments(attachments),
  145. ocstats.WithMeasurements(
  146. ServerSentBytesPerRPC.M(atomic.LoadInt64(&d.sentBytes)),
  147. ServerSentMessagesPerRPC.M(atomic.LoadInt64(&d.sentCount)),
  148. ServerReceivedMessagesPerRPC.M(atomic.LoadInt64(&d.recvCount)),
  149. ServerReceivedBytesPerRPC.M(atomic.LoadInt64(&d.recvBytes)),
  150. ServerLatency.M(latencyMillis)))
  151. }
  152. }
  153. func statusCodeToString(s *status.Status) string {
  154. // see https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
  155. switch c := s.Code(); c {
  156. case codes.OK:
  157. return "OK"
  158. case codes.Canceled:
  159. return "CANCELLED"
  160. case codes.Unknown:
  161. return "UNKNOWN"
  162. case codes.InvalidArgument:
  163. return "INVALID_ARGUMENT"
  164. case codes.DeadlineExceeded:
  165. return "DEADLINE_EXCEEDED"
  166. case codes.NotFound:
  167. return "NOT_FOUND"
  168. case codes.AlreadyExists:
  169. return "ALREADY_EXISTS"
  170. case codes.PermissionDenied:
  171. return "PERMISSION_DENIED"
  172. case codes.ResourceExhausted:
  173. return "RESOURCE_EXHAUSTED"
  174. case codes.FailedPrecondition:
  175. return "FAILED_PRECONDITION"
  176. case codes.Aborted:
  177. return "ABORTED"
  178. case codes.OutOfRange:
  179. return "OUT_OF_RANGE"
  180. case codes.Unimplemented:
  181. return "UNIMPLEMENTED"
  182. case codes.Internal:
  183. return "INTERNAL"
  184. case codes.Unavailable:
  185. return "UNAVAILABLE"
  186. case codes.DataLoss:
  187. return "DATA_LOSS"
  188. case codes.Unauthenticated:
  189. return "UNAUTHENTICATED"
  190. default:
  191. return "CODE_" + strconv.FormatInt(int64(c), 10)
  192. }
  193. }
  194. func getSpanCtxAttachment(ctx context.Context) metricdata.Attachments {
  195. attachments := map[string]interface{}{}
  196. span := trace.FromContext(ctx)
  197. if span == nil {
  198. return attachments
  199. }
  200. spanCtx := span.SpanContext()
  201. if spanCtx.IsSampled() {
  202. attachments[metricdata.AttachmentKeySpanContext] = spanCtx
  203. }
  204. return attachments
  205. }