circonus-gometrics.go 9.3 KB


  1. // Copyright 2016 Circonus, Inc. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package circonusgometrics provides instrumentation for your applications in the form
  5. // of counters, gauges and histograms and allows you to publish them to
  6. // Circonus
  7. //
  8. // Counters
  9. //
  10. // A counter is a monotonically-increasing, unsigned, 64-bit integer used to
  11. // represent the number of times an event has occurred. By tracking the deltas
  12. // between measurements of a counter over intervals of time, an aggregation
  13. // layer can derive rates, acceleration, etc.
  14. //
  15. // Gauges
  16. //
  17. // A gauge returns instantaneous measurements of something using signed, 64-bit
  18. // integers. This value does not need to be monotonic.
  19. //
  20. // Histograms
  21. //
  22. // A histogram tracks the distribution of a stream of values (e.g. the number of
  23. // seconds it takes to handle requests). Circonus can calculate complex
  24. // analytics on these.
  25. //
  26. // Reporting
  27. //
  28. // A periodic push to a Circonus httptrap is configurable.
  29. package circonusgometrics
  30. import (
  31. "bufio"
  32. "bytes"
  33. "fmt"
  34. "io/ioutil"
  35. "log"
  36. "os"
  37. "strconv"
  38. "strings"
  39. "sync"
  40. "time"
  41. "github.com/circonus-labs/circonus-gometrics/api"
  42. "github.com/circonus-labs/circonus-gometrics/checkmgr"
  43. "github.com/pkg/errors"
  44. )
  // defaultFlushInterval is the flush period used when no interval is
  // configured. It is kept as a duration string ("10s" == 10 * time.Second)
  // so it can be parsed the same way as a user-supplied interval.
  const defaultFlushInterval = "10s"
  // Metric defines an individual metric: a short type code together with the
  // metric's value. The struct tags fix the JSON field names to "_type" and
  // "_value".
  type Metric struct {
  	// Type is the metric type code (for example "L", "n" or "s").
  	Type string `json:"_type"`
  	// Value is the metric's value; its concrete type depends on the metric.
  	Value interface{} `json:"_value"`
  }
  53. // Metrics holds host metrics
  54. type Metrics map[string]Metric
  55. // Config options for circonus-gometrics
  56. type Config struct {
  57. Log *log.Logger
  58. Debug bool
  59. ResetCounters string // reset/delete counters on flush (default true)
  60. ResetGauges string // reset/delete gauges on flush (default true)
  61. ResetHistograms string // reset/delete histograms on flush (default true)
  62. ResetText string // reset/delete text on flush (default true)
  63. // API, Check and Broker configuration options
  64. CheckManager checkmgr.Config
  65. // how frequenly to submit metrics to Circonus, default 10 seconds.
  66. // Set to 0 to disable automatic flushes and call Flush manually.
  67. Interval string
  68. }
  69. type prevMetrics struct {
  70. metrics *Metrics
  71. metricsmu sync.Mutex
  72. ts time.Time
  73. }
  74. // CirconusMetrics state
  75. type CirconusMetrics struct {
  76. Log *log.Logger
  77. Debug bool
  78. resetCounters bool
  79. resetGauges bool
  80. resetHistograms bool
  81. resetText bool
  82. flushInterval time.Duration
  83. flushing bool
  84. flushmu sync.Mutex
  85. packagingmu sync.Mutex
  86. check *checkmgr.CheckManager
  87. lastMetrics *prevMetrics
  88. counters map[string]uint64
  89. cm sync.Mutex
  90. counterFuncs map[string]func() uint64
  91. cfm sync.Mutex
  92. gauges map[string]interface{}
  93. gm sync.Mutex
  94. gaugeFuncs map[string]func() int64
  95. gfm sync.Mutex
  96. histograms map[string]*Histogram
  97. hm sync.Mutex
  98. text map[string]string
  99. tm sync.Mutex
  100. textFuncs map[string]func() string
  101. tfm sync.Mutex
  102. }
  103. // NewCirconusMetrics returns a CirconusMetrics instance
  104. func NewCirconusMetrics(cfg *Config) (*CirconusMetrics, error) {
  105. return New(cfg)
  106. }
  107. // New returns a CirconusMetrics instance
  108. func New(cfg *Config) (*CirconusMetrics, error) {
  109. if cfg == nil {
  110. return nil, errors.New("invalid configuration (nil)")
  111. }
  112. cm := &CirconusMetrics{
  113. counters: make(map[string]uint64),
  114. counterFuncs: make(map[string]func() uint64),
  115. gauges: make(map[string]interface{}),
  116. gaugeFuncs: make(map[string]func() int64),
  117. histograms: make(map[string]*Histogram),
  118. text: make(map[string]string),
  119. textFuncs: make(map[string]func() string),
  120. lastMetrics: &prevMetrics{},
  121. }
  122. // Logging
  123. {
  124. cm.Debug = cfg.Debug
  125. cm.Log = cfg.Log
  126. if cm.Debug && cm.Log == nil {
  127. cm.Log = log.New(os.Stderr, "", log.LstdFlags)
  128. }
  129. if cm.Log == nil {
  130. cm.Log = log.New(ioutil.Discard, "", log.LstdFlags)
  131. }
  132. }
  133. // Flush Interval
  134. {
  135. fi := defaultFlushInterval
  136. if cfg.Interval != "" {
  137. fi = cfg.Interval
  138. }
  139. dur, err := time.ParseDuration(fi)
  140. if err != nil {
  141. return nil, errors.Wrap(err, "parsing flush interval")
  142. }
  143. cm.flushInterval = dur
  144. }
  145. // metric resets
  146. cm.resetCounters = true
  147. if cfg.ResetCounters != "" {
  148. setting, err := strconv.ParseBool(cfg.ResetCounters)
  149. if err != nil {
  150. return nil, errors.Wrap(err, "parsing reset counters")
  151. }
  152. cm.resetCounters = setting
  153. }
  154. cm.resetGauges = true
  155. if cfg.ResetGauges != "" {
  156. setting, err := strconv.ParseBool(cfg.ResetGauges)
  157. if err != nil {
  158. return nil, errors.Wrap(err, "parsing reset gauges")
  159. }
  160. cm.resetGauges = setting
  161. }
  162. cm.resetHistograms = true
  163. if cfg.ResetHistograms != "" {
  164. setting, err := strconv.ParseBool(cfg.ResetHistograms)
  165. if err != nil {
  166. return nil, errors.Wrap(err, "parsing reset histograms")
  167. }
  168. cm.resetHistograms = setting
  169. }
  170. cm.resetText = true
  171. if cfg.ResetText != "" {
  172. setting, err := strconv.ParseBool(cfg.ResetText)
  173. if err != nil {
  174. return nil, errors.Wrap(err, "parsing reset text")
  175. }
  176. cm.resetText = setting
  177. }
  178. // check manager
  179. {
  180. cfg.CheckManager.Debug = cm.Debug
  181. cfg.CheckManager.Log = cm.Log
  182. check, err := checkmgr.New(&cfg.CheckManager)
  183. if err != nil {
  184. return nil, errors.Wrap(err, "creating new check manager")
  185. }
  186. cm.check = check
  187. }
  188. // start background initialization
  189. cm.check.Initialize()
  190. // if automatic flush is enabled, start it.
  191. // NOTE: submit will jettison metrics until initialization has completed.
  192. if cm.flushInterval > time.Duration(0) {
  193. go func() {
  194. for range time.NewTicker(cm.flushInterval).C {
  195. cm.Flush()
  196. }
  197. }()
  198. }
  199. return cm, nil
  200. }
  201. // Start deprecated NOP, automatic flush is started in New if flush interval > 0.
  202. func (m *CirconusMetrics) Start() {
  203. // nop
  204. }
  205. // Ready returns true or false indicating if the check is ready to accept metrics
  206. func (m *CirconusMetrics) Ready() bool {
  207. return m.check.IsReady()
  208. }
  209. func (m *CirconusMetrics) packageMetrics() (map[string]*api.CheckBundleMetric, Metrics) {
  210. m.packagingmu.Lock()
  211. defer m.packagingmu.Unlock()
  212. if m.Debug {
  213. m.Log.Println("[DEBUG] Packaging metrics")
  214. }
  215. counters, gauges, histograms, text := m.snapshot()
  216. newMetrics := make(map[string]*api.CheckBundleMetric)
  217. output := make(Metrics, len(counters)+len(gauges)+len(histograms)+len(text))
  218. for name, value := range counters {
  219. send := m.check.IsMetricActive(name)
  220. if !send && m.check.ActivateMetric(name) {
  221. send = true
  222. newMetrics[name] = &api.CheckBundleMetric{
  223. Name: name,
  224. Type: "numeric",
  225. Status: "active",
  226. }
  227. }
  228. if send {
  229. output[name] = Metric{Type: "L", Value: value}
  230. }
  231. }
  232. for name, value := range gauges {
  233. send := m.check.IsMetricActive(name)
  234. if !send && m.check.ActivateMetric(name) {
  235. send = true
  236. newMetrics[name] = &api.CheckBundleMetric{
  237. Name: name,
  238. Type: "numeric",
  239. Status: "active",
  240. }
  241. }
  242. if send {
  243. output[name] = Metric{Type: m.getGaugeType(value), Value: value}
  244. }
  245. }
  246. for name, value := range histograms {
  247. send := m.check.IsMetricActive(name)
  248. if !send && m.check.ActivateMetric(name) {
  249. send = true
  250. newMetrics[name] = &api.CheckBundleMetric{
  251. Name: name,
  252. Type: "histogram",
  253. Status: "active",
  254. }
  255. }
  256. if send {
  257. output[name] = Metric{Type: "n", Value: value.DecStrings()}
  258. }
  259. }
  260. for name, value := range text {
  261. send := m.check.IsMetricActive(name)
  262. if !send && m.check.ActivateMetric(name) {
  263. send = true
  264. newMetrics[name] = &api.CheckBundleMetric{
  265. Name: name,
  266. Type: "text",
  267. Status: "active",
  268. }
  269. }
  270. if send {
  271. output[name] = Metric{Type: "s", Value: value}
  272. }
  273. }
  274. m.lastMetrics.metricsmu.Lock()
  275. defer m.lastMetrics.metricsmu.Unlock()
  276. m.lastMetrics.metrics = &output
  277. m.lastMetrics.ts = time.Now()
  278. return newMetrics, output
  279. }
  280. // PromOutput returns lines of metrics in prom format
  281. func (m *CirconusMetrics) PromOutput() (*bytes.Buffer, error) {
  282. m.lastMetrics.metricsmu.Lock()
  283. defer m.lastMetrics.metricsmu.Unlock()
  284. if m.lastMetrics.metrics == nil {
  285. return nil, errors.New("no metrics available")
  286. }
  287. var b bytes.Buffer
  288. w := bufio.NewWriter(&b)
  289. ts := m.lastMetrics.ts.UnixNano() / int64(time.Millisecond)
  290. for name, metric := range *m.lastMetrics.metrics {
  291. switch metric.Type {
  292. case "n":
  293. if strings.HasPrefix(fmt.Sprintf("%v", metric.Value), "[H[") {
  294. continue // circonus histogram != prom "histogram" (aka percentile)
  295. }
  296. case "s":
  297. continue // text metrics unsupported
  298. }
  299. fmt.Fprintf(w, "%s %v %d\n", name, metric.Value, ts)
  300. }
  301. err := w.Flush()
  302. if err != nil {
  303. return nil, errors.Wrap(err, "flushing metric buffer")
  304. }
  305. return &b, err
  306. }
  307. // FlushMetrics flushes current metrics to a structure and returns it (does NOT send to Circonus)
  308. func (m *CirconusMetrics) FlushMetrics() *Metrics {
  309. m.flushmu.Lock()
  310. if m.flushing {
  311. m.flushmu.Unlock()
  312. return &Metrics{}
  313. }
  314. m.flushing = true
  315. m.flushmu.Unlock()
  316. _, output := m.packageMetrics()
  317. m.flushmu.Lock()
  318. m.flushing = false
  319. m.flushmu.Unlock()
  320. return &output
  321. }
  322. // Flush metrics kicks off the process of sending metrics to Circonus
  323. func (m *CirconusMetrics) Flush() {
  324. m.flushmu.Lock()
  325. if m.flushing {
  326. m.flushmu.Unlock()
  327. return
  328. }
  329. m.flushing = true
  330. m.flushmu.Unlock()
  331. newMetrics, output := m.packageMetrics()
  332. if len(output) > 0 {
  333. m.submit(output, newMetrics)
  334. } else {
  335. if m.Debug {
  336. m.Log.Println("[DEBUG] No metrics to send, skipping")
  337. }
  338. }
  339. m.flushmu.Lock()
  340. m.flushing = false
  341. m.flushmu.Unlock()
  342. }