example_instrumentation_test.go 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package redis_test
  2. import (
  3. "fmt"
  4. "sync/atomic"
  5. "time"
  6. redis "gopkg.in/redis.v5"
  7. )
  8. func Example_instrumentation() {
  9. ring := redis.NewRing(&redis.RingOptions{
  10. Addrs: map[string]string{
  11. "shard1": ":6379",
  12. },
  13. })
  14. ring.ForEachShard(func(client *redis.Client) error {
  15. wrapRedisProcess(client)
  16. return nil
  17. })
  18. for {
  19. ring.Ping()
  20. }
  21. }
  22. func wrapRedisProcess(client *redis.Client) {
  23. const precision = time.Microsecond
  24. var count, avgDur uint32
  25. go func() {
  26. for _ = range time.Tick(3 * time.Second) {
  27. n := atomic.LoadUint32(&count)
  28. dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision
  29. fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
  30. }
  31. }()
  32. client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
  33. return func(cmd redis.Cmder) error {
  34. start := time.Now()
  35. err := oldProcess(cmd)
  36. dur := time.Since(start)
  37. const decay = float64(1) / 100
  38. ms := float64(dur / precision)
  39. for {
  40. avg := atomic.LoadUint32(&avgDur)
  41. newAvg := uint32((1-decay)*float64(avg) + decay*ms)
  42. if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) {
  43. break
  44. }
  45. }
  46. atomic.AddUint32(&count, 1)
  47. return err
  48. }
  49. })
  50. }