spanstore.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright 2017, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package trace
  15. import (
  16. "sync"
  17. "time"
  18. "go.opencensus.io/internal"
  19. )
  // Bucket sizing limits for the per-name span stores.
  const (
  	// defaultBucketSize is the latency and error bucket size given to a
  	// span store that is created without an explicit configuration.
  	defaultBucketSize = 10

  	// maxBucketSize is the largest bucket size ConfigureBucketSizes will
  	// accept; larger requested sizes are reduced to this value.
  	maxBucketSize = 100000
  )
  24. var (
  25. ssmu sync.RWMutex // protects spanStores
  26. spanStores = make(map[string]*spanStore)
  27. )
  28. // This exists purely to avoid exposing internal methods used by z-Pages externally.
  29. type internalOnly struct{}
  30. func init() {
  31. //TODO(#412): remove
  32. internal.Trace = &internalOnly{}
  33. }
  34. // ReportActiveSpans returns the active spans for the given name.
  35. func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
  36. s := spanStoreForName(name)
  37. if s == nil {
  38. return nil
  39. }
  40. var out []*SpanData
  41. s.mu.Lock()
  42. defer s.mu.Unlock()
  43. for span := range s.active {
  44. out = append(out, span.makeSpanData())
  45. }
  46. return out
  47. }
  48. // ReportSpansByError returns a sample of error spans.
  49. //
  50. // If code is nonzero, only spans with that status code are returned.
  51. func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
  52. s := spanStoreForName(name)
  53. if s == nil {
  54. return nil
  55. }
  56. var out []*SpanData
  57. s.mu.Lock()
  58. defer s.mu.Unlock()
  59. if code != 0 {
  60. if b, ok := s.errors[code]; ok {
  61. for _, sd := range b.buffer {
  62. if sd == nil {
  63. break
  64. }
  65. out = append(out, sd)
  66. }
  67. }
  68. } else {
  69. for _, b := range s.errors {
  70. for _, sd := range b.buffer {
  71. if sd == nil {
  72. break
  73. }
  74. out = append(out, sd)
  75. }
  76. }
  77. }
  78. return out
  79. }
  80. // ConfigureBucketSizes sets the number of spans to keep per latency and error
  81. // bucket for different span names.
  82. func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
  83. for _, bc := range bcs {
  84. latencyBucketSize := bc.MaxRequestsSucceeded
  85. if latencyBucketSize < 0 {
  86. latencyBucketSize = 0
  87. }
  88. if latencyBucketSize > maxBucketSize {
  89. latencyBucketSize = maxBucketSize
  90. }
  91. errorBucketSize := bc.MaxRequestsErrors
  92. if errorBucketSize < 0 {
  93. errorBucketSize = 0
  94. }
  95. if errorBucketSize > maxBucketSize {
  96. errorBucketSize = maxBucketSize
  97. }
  98. spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
  99. }
  100. }
  101. // ReportSpansPerMethod returns a summary of what spans are being stored for each span name.
  102. func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
  103. out := make(map[string]internal.PerMethodSummary)
  104. ssmu.RLock()
  105. defer ssmu.RUnlock()
  106. for name, s := range spanStores {
  107. s.mu.Lock()
  108. p := internal.PerMethodSummary{
  109. Active: len(s.active),
  110. }
  111. for code, b := range s.errors {
  112. p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
  113. ErrorCode: code,
  114. Size: b.size(),
  115. })
  116. }
  117. for i, b := range s.latency {
  118. min, max := latencyBucketBounds(i)
  119. p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
  120. MinLatency: min,
  121. MaxLatency: max,
  122. Size: b.size(),
  123. })
  124. }
  125. s.mu.Unlock()
  126. out[name] = p
  127. }
  128. return out
  129. }
  130. // ReportSpansByLatency returns a sample of successful spans.
  131. //
  132. // minLatency is the minimum latency of spans to be returned.
  133. // maxLatency, if nonzero, is the maximum latency of spans to be returned.
  134. func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
  135. s := spanStoreForName(name)
  136. if s == nil {
  137. return nil
  138. }
  139. var out []*SpanData
  140. s.mu.Lock()
  141. defer s.mu.Unlock()
  142. for i, b := range s.latency {
  143. min, max := latencyBucketBounds(i)
  144. if i+1 != len(s.latency) && max <= minLatency {
  145. continue
  146. }
  147. if maxLatency != 0 && maxLatency < min {
  148. continue
  149. }
  150. for _, sd := range b.buffer {
  151. if sd == nil {
  152. break
  153. }
  154. if minLatency != 0 || maxLatency != 0 {
  155. d := sd.EndTime.Sub(sd.StartTime)
  156. if d < minLatency {
  157. continue
  158. }
  159. if maxLatency != 0 && d > maxLatency {
  160. continue
  161. }
  162. }
  163. out = append(out, sd)
  164. }
  165. }
  166. return out
  167. }
  168. // spanStore keeps track of spans stored for a particular span name.
  169. //
  170. // It contains all active spans; a sample of spans for failed requests,
  171. // categorized by error code; and a sample of spans for successful requests,
  172. // bucketed by latency.
  173. type spanStore struct {
  174. mu sync.Mutex // protects everything below.
  175. active map[*Span]struct{}
  176. errors map[int32]*bucket
  177. latency []bucket
  178. maxSpansPerErrorBucket int
  179. }
  180. // newSpanStore creates a span store.
  181. func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
  182. s := &spanStore{
  183. active: make(map[*Span]struct{}),
  184. latency: make([]bucket, len(defaultLatencies)+1),
  185. maxSpansPerErrorBucket: errorBucketSize,
  186. }
  187. for i := range s.latency {
  188. s.latency[i] = makeBucket(latencyBucketSize)
  189. }
  190. return s
  191. }
  192. // spanStoreForName returns the spanStore for the given name.
  193. //
  194. // It returns nil if it doesn't exist.
  195. func spanStoreForName(name string) *spanStore {
  196. var s *spanStore
  197. ssmu.RLock()
  198. s, _ = spanStores[name]
  199. ssmu.RUnlock()
  200. return s
  201. }
  202. // spanStoreForNameCreateIfNew returns the spanStore for the given name.
  203. //
  204. // It creates it if it didn't exist.
  205. func spanStoreForNameCreateIfNew(name string) *spanStore {
  206. ssmu.RLock()
  207. s, ok := spanStores[name]
  208. ssmu.RUnlock()
  209. if ok {
  210. return s
  211. }
  212. ssmu.Lock()
  213. defer ssmu.Unlock()
  214. s, ok = spanStores[name]
  215. if ok {
  216. return s
  217. }
  218. s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
  219. spanStores[name] = s
  220. return s
  221. }
  222. // spanStoreSetSize resizes the spanStore for the given name.
  223. //
  224. // It creates it if it didn't exist.
  225. func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
  226. ssmu.RLock()
  227. s, ok := spanStores[name]
  228. ssmu.RUnlock()
  229. if ok {
  230. s.resize(latencyBucketSize, errorBucketSize)
  231. return
  232. }
  233. ssmu.Lock()
  234. defer ssmu.Unlock()
  235. s, ok = spanStores[name]
  236. if ok {
  237. s.resize(latencyBucketSize, errorBucketSize)
  238. return
  239. }
  240. s = newSpanStore(name, latencyBucketSize, errorBucketSize)
  241. spanStores[name] = s
  242. }
  243. func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
  244. s.mu.Lock()
  245. for i := range s.latency {
  246. s.latency[i].resize(latencyBucketSize)
  247. }
  248. for _, b := range s.errors {
  249. b.resize(errorBucketSize)
  250. }
  251. s.maxSpansPerErrorBucket = errorBucketSize
  252. s.mu.Unlock()
  253. }
  254. // add adds a span to the active bucket of the spanStore.
  255. func (s *spanStore) add(span *Span) {
  256. s.mu.Lock()
  257. s.active[span] = struct{}{}
  258. s.mu.Unlock()
  259. }
  260. // finished removes a span from the active set, and adds a corresponding
  261. // SpanData to a latency or error bucket.
  262. func (s *spanStore) finished(span *Span, sd *SpanData) {
  263. latency := sd.EndTime.Sub(sd.StartTime)
  264. if latency < 0 {
  265. latency = 0
  266. }
  267. code := sd.Status.Code
  268. s.mu.Lock()
  269. delete(s.active, span)
  270. if code == 0 {
  271. s.latency[latencyBucket(latency)].add(sd)
  272. } else {
  273. if s.errors == nil {
  274. s.errors = make(map[int32]*bucket)
  275. }
  276. if b := s.errors[code]; b != nil {
  277. b.add(sd)
  278. } else {
  279. b := makeBucket(s.maxSpansPerErrorBucket)
  280. s.errors[code] = &b
  281. b.add(sd)
  282. }
  283. }
  284. s.mu.Unlock()
  285. }