// rpcz.go
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package zpages
  16. import (
  17. "fmt"
  18. "io"
  19. "log"
  20. "math"
  21. "net/http"
  22. "sort"
  23. "sync"
  24. "text/tabwriter"
  25. "time"
  26. "go.opencensus.io/plugin/ocgrpc"
  27. "go.opencensus.io/stats/view"
  28. )
  29. const bytesPerKb = 1024
  30. var (
  31. programStartTime = time.Now()
  32. mu sync.Mutex // protects snaps
  33. snaps = make(map[methodKey]*statSnapshot)
  34. // viewType lists the views we are interested in for RPC stats.
  35. // A view's map value indicates whether that view contains data for received
  36. // RPCs.
  37. viewType = map[*view.View]bool{
  38. ocgrpc.ClientCompletedRPCsView: false,
  39. ocgrpc.ClientSentBytesPerRPCView: false,
  40. ocgrpc.ClientSentMessagesPerRPCView: false,
  41. ocgrpc.ClientReceivedBytesPerRPCView: false,
  42. ocgrpc.ClientReceivedMessagesPerRPCView: false,
  43. ocgrpc.ClientRoundtripLatencyView: false,
  44. ocgrpc.ServerCompletedRPCsView: true,
  45. ocgrpc.ServerReceivedBytesPerRPCView: true,
  46. ocgrpc.ServerReceivedMessagesPerRPCView: true,
  47. ocgrpc.ServerSentBytesPerRPCView: true,
  48. ocgrpc.ServerSentMessagesPerRPCView: true,
  49. ocgrpc.ServerLatencyView: true,
  50. }
  51. )
  52. func registerRPCViews() {
  53. views := make([]*view.View, 0, len(viewType))
  54. for v := range viewType {
  55. views = append(views, v)
  56. }
  57. if err := view.Register(views...); err != nil {
  58. log.Printf("error subscribing to views: %v", err)
  59. }
  60. view.RegisterExporter(snapExporter{})
  61. }
  62. func rpczHandler(w http.ResponseWriter, r *http.Request) {
  63. w.Header().Set("Content-Type", "text/html; charset=utf-8")
  64. WriteHTMLRpczPage(w)
  65. }
  66. // WriteHTMLRpczPage writes an HTML document to w containing per-method RPC stats.
  67. func WriteHTMLRpczPage(w io.Writer) {
  68. if err := headerTemplate.Execute(w, headerData{Title: "RPC Stats"}); err != nil {
  69. log.Printf("zpages: executing template: %v", err)
  70. }
  71. WriteHTMLRpczSummary(w)
  72. if err := footerTemplate.Execute(w, nil); err != nil {
  73. log.Printf("zpages: executing template: %v", err)
  74. }
  75. }
  76. // WriteHTMLRpczSummary writes HTML to w containing per-method RPC stats.
  77. //
  78. // It includes neither a header nor footer, so you can embed this data in other pages.
  79. func WriteHTMLRpczSummary(w io.Writer) {
  80. mu.Lock()
  81. if err := statsTemplate.Execute(w, getStatsPage()); err != nil {
  82. log.Printf("zpages: executing template: %v", err)
  83. }
  84. mu.Unlock()
  85. }
  86. // WriteTextRpczPage writes formatted text to w containing per-method RPC stats.
  87. func WriteTextRpczPage(w io.Writer) {
  88. mu.Lock()
  89. defer mu.Unlock()
  90. page := getStatsPage()
  91. for i, sg := range page.StatGroups {
  92. switch i {
  93. case 0:
  94. fmt.Fprint(w, "Sent:\n")
  95. case 1:
  96. fmt.Fprint(w, "\nReceived:\n")
  97. }
  98. tw := tabwriter.NewWriter(w, 6, 8, 1, ' ', 0)
  99. fmt.Fprint(tw, "Method\tCount\t\t\tAvgLat\t\t\tMaxLat\t\t\tRate\t\t\tIn (MiB/s)\t\t\tOut (MiB/s)\t\t\tErrors\t\t\n")
  100. fmt.Fprint(tw, "\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\tMin\tHr\tTot\n")
  101. for _, s := range sg.Snapshots {
  102. fmt.Fprintf(tw, "%s\t%d\t%d\t%d\t%v\t%v\t%v\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%.2f\t%d\t%d\t%d\n",
  103. s.Method,
  104. s.CountMinute,
  105. s.CountHour,
  106. s.CountTotal,
  107. s.AvgLatencyMinute,
  108. s.AvgLatencyHour,
  109. s.AvgLatencyTotal,
  110. s.RPCRateMinute,
  111. s.RPCRateHour,
  112. s.RPCRateTotal,
  113. s.InputRateMinute/bytesPerKb,
  114. s.InputRateHour/bytesPerKb,
  115. s.InputRateTotal/bytesPerKb,
  116. s.OutputRateMinute/bytesPerKb,
  117. s.OutputRateHour/bytesPerKb,
  118. s.OutputRateTotal/bytesPerKb,
  119. s.ErrorsMinute,
  120. s.ErrorsHour,
  121. s.ErrorsTotal)
  122. }
  123. tw.Flush()
  124. }
  125. }
  126. // headerData contains data for the header template.
  127. type headerData struct {
  128. Title string
  129. }
  130. // statsPage aggregates stats on the page for 'sent' and 'received' categories
  131. type statsPage struct {
  132. StatGroups []*statGroup
  133. }
  134. // statGroup aggregates snapshots for a directional category
  135. type statGroup struct {
  136. Direction string
  137. Snapshots []*statSnapshot
  138. }
  139. func (s *statGroup) Len() int {
  140. return len(s.Snapshots)
  141. }
  142. func (s *statGroup) Swap(i, j int) {
  143. s.Snapshots[i], s.Snapshots[j] = s.Snapshots[j], s.Snapshots[i]
  144. }
  145. func (s *statGroup) Less(i, j int) bool {
  146. return s.Snapshots[i].Method < s.Snapshots[j].Method
  147. }
  148. // statSnapshot holds the data items that are presented in a single row of RPC
  149. // stat information.
  150. type statSnapshot struct {
  151. // TODO: compute hour/minute values from cumulative
  152. Method string
  153. Received bool
  154. CountMinute uint64
  155. CountHour uint64
  156. CountTotal uint64
  157. AvgLatencyMinute time.Duration
  158. AvgLatencyHour time.Duration
  159. AvgLatencyTotal time.Duration
  160. RPCRateMinute float64
  161. RPCRateHour float64
  162. RPCRateTotal float64
  163. InputRateMinute float64
  164. InputRateHour float64
  165. InputRateTotal float64
  166. OutputRateMinute float64
  167. OutputRateHour float64
  168. OutputRateTotal float64
  169. ErrorsMinute uint64
  170. ErrorsHour uint64
  171. ErrorsTotal uint64
  172. }
  173. type methodKey struct {
  174. method string
  175. received bool
  176. }
  177. type snapExporter struct{}
  178. func (s snapExporter) ExportView(vd *view.Data) {
  179. received, ok := viewType[vd.View]
  180. if !ok {
  181. return
  182. }
  183. if len(vd.Rows) == 0 {
  184. return
  185. }
  186. ageSec := float64(time.Now().Sub(programStartTime)) / float64(time.Second)
  187. computeRate := func(maxSec, x float64) float64 {
  188. dur := ageSec
  189. if maxSec > 0 && dur > maxSec {
  190. dur = maxSec
  191. }
  192. return x / dur
  193. }
  194. convertTime := func(ms float64) time.Duration {
  195. if math.IsInf(ms, 0) || math.IsNaN(ms) {
  196. return 0
  197. }
  198. return time.Duration(float64(time.Millisecond) * ms)
  199. }
  200. haveResetErrors := make(map[string]struct{})
  201. mu.Lock()
  202. defer mu.Unlock()
  203. for _, row := range vd.Rows {
  204. var method string
  205. for _, tag := range row.Tags {
  206. if tag.Key == ocgrpc.KeyClientMethod || tag.Key == ocgrpc.KeyServerMethod {
  207. method = tag.Value
  208. break
  209. }
  210. }
  211. key := methodKey{method: method, received: received}
  212. s := snaps[key]
  213. if s == nil {
  214. s = &statSnapshot{Method: method, Received: received}
  215. snaps[key] = s
  216. }
  217. var (
  218. sum float64
  219. count float64
  220. )
  221. switch v := row.Data.(type) {
  222. case *view.CountData:
  223. sum = float64(v.Value)
  224. count = float64(v.Value)
  225. case *view.DistributionData:
  226. sum = v.Sum()
  227. count = float64(v.Count)
  228. case *view.SumData:
  229. sum = v.Value
  230. count = v.Value
  231. }
  232. // Update field of s corresponding to the view.
  233. switch vd.View {
  234. case ocgrpc.ClientCompletedRPCsView:
  235. if _, ok := haveResetErrors[method]; !ok {
  236. haveResetErrors[method] = struct{}{}
  237. s.ErrorsTotal = 0
  238. }
  239. for _, tag := range row.Tags {
  240. if tag.Key == ocgrpc.KeyClientStatus && tag.Value != "OK" {
  241. s.ErrorsTotal += uint64(count)
  242. }
  243. }
  244. case ocgrpc.ClientRoundtripLatencyView:
  245. s.AvgLatencyTotal = convertTime(sum / count)
  246. case ocgrpc.ClientSentBytesPerRPCView:
  247. s.OutputRateTotal = computeRate(0, sum)
  248. case ocgrpc.ClientReceivedBytesPerRPCView:
  249. s.InputRateTotal = computeRate(0, sum)
  250. case ocgrpc.ClientSentMessagesPerRPCView:
  251. s.CountTotal = uint64(count)
  252. s.RPCRateTotal = computeRate(0, count)
  253. case ocgrpc.ClientReceivedMessagesPerRPCView:
  254. // currently unused
  255. case ocgrpc.ServerCompletedRPCsView:
  256. if _, ok := haveResetErrors[method]; !ok {
  257. haveResetErrors[method] = struct{}{}
  258. s.ErrorsTotal = 0
  259. }
  260. for _, tag := range row.Tags {
  261. if tag.Key == ocgrpc.KeyServerStatus && tag.Value != "OK" {
  262. s.ErrorsTotal += uint64(count)
  263. }
  264. }
  265. case ocgrpc.ServerLatencyView:
  266. s.AvgLatencyTotal = convertTime(sum / count)
  267. case ocgrpc.ServerSentBytesPerRPCView:
  268. s.OutputRateTotal = computeRate(0, sum)
  269. case ocgrpc.ServerReceivedMessagesPerRPCView:
  270. s.CountTotal = uint64(count)
  271. s.RPCRateTotal = computeRate(0, count)
  272. case ocgrpc.ServerSentMessagesPerRPCView:
  273. // currently unused
  274. }
  275. }
  276. }
  277. func getStatsPage() *statsPage {
  278. sentStats := statGroup{Direction: "Sent"}
  279. receivedStats := statGroup{Direction: "Received"}
  280. for key, sg := range snaps {
  281. if key.received {
  282. receivedStats.Snapshots = append(receivedStats.Snapshots, sg)
  283. } else {
  284. sentStats.Snapshots = append(sentStats.Snapshots, sg)
  285. }
  286. }
  287. sort.Sort(&sentStats)
  288. sort.Sort(&receivedStats)
  289. return &statsPage{
  290. StatGroups: []*statGroup{&sentStats, &receivedStats},
  291. }
  292. }