bufferpool.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package utils
  2. import (
  3. "bytes"
  4. "sort"
  5. "sync"
  6. "sync/atomic"
  7. )
  // Tuning parameters for the size-class statistics kept by Pool.
  const (
  	// minBitSize is log2 of the smallest size class; 2**6=64 is a CPU
  	// cache line size.
  	minBitSize = 6

  	// steps is the number of size classes; each class is twice as large
  	// as the previous one.
  	steps = 20

  	// minSize is the size of the smallest class, in bytes.
  	minSize = 1 << minBitSize

  	// maxSize is the size of the largest class, in bytes.
  	maxSize = minSize << (steps - 1)

  	// calibrateCallsThreshold is the number of Put calls recorded for a
  	// single size class above which the pool is recalibrated.
  	calibrateCallsThreshold = 42000

  	// maxPercentile is the fraction of recorded calls that the most
  	// frequent size classes must cover when the retention limit is chosen.
  	maxPercentile = 0.95
  )
  16. // Pool represents byte buffer pool.
  17. //
  18. // Distinct pools may be used for distinct types of byte buffers.
  19. // Properly determined byte buffer types with their own pools may help reducing
  20. // memory waste.
  21. type Pool struct {
  22. calls [steps]uint64
  23. calibrating uint64
  24. defaultSize uint64
  25. maxSize uint64
  26. pool sync.Pool
  27. }
  28. var defaultPool Pool
  29. // Get returns an empty byte buffer from the pool.
  30. //
  31. // Got byte buffer may be returned to the pool via Put call.
  32. // This reduces the number of memory allocations required for byte buffer
  33. // management.
  34. func Get() *bytes.Buffer { return defaultPool.Get() }
  35. // Get returns new byte buffer with zero length.
  36. //
  37. // The byte buffer may be returned to the pool via Put after the use
  38. // in order to minimize GC overhead.
  39. func (p *Pool) Get() *bytes.Buffer {
  40. v := p.pool.Get()
  41. if v != nil {
  42. return v.(*bytes.Buffer)
  43. }
  44. return bytes.NewBuffer(make([]byte, 0, atomic.LoadUint64(&p.defaultSize)))
  45. }
  46. // Put returns byte buffer to the pool.
  47. //
  48. // bytes.Buffer.B mustn't be touched after returning it to the pool.
  49. // Otherwise data races will occur.
  50. func Put(b *bytes.Buffer) { b.Reset(); defaultPool.Put(b) }
  51. // Put releases byte buffer obtained via Get to the pool.
  52. //
  53. // The buffer mustn't be accessed after returning to the pool.
  54. func (p *Pool) Put(b *bytes.Buffer) {
  55. idx := index(b.Len())
  56. if atomic.AddUint64(&p.calls[idx], 1) > calibrateCallsThreshold {
  57. p.calibrate()
  58. }
  59. maxSize := int(atomic.LoadUint64(&p.maxSize))
  60. if maxSize == 0 || b.Cap() <= maxSize {
  61. b.Reset()
  62. p.pool.Put(b)
  63. }
  64. }
  65. func (p *Pool) calibrate() {
  66. if !atomic.CompareAndSwapUint64(&p.calibrating, 0, 1) {
  67. return
  68. }
  69. a := make(callSizes, 0, steps)
  70. var callsSum uint64
  71. for i := uint64(0); i < steps; i++ {
  72. calls := atomic.SwapUint64(&p.calls[i], 0)
  73. callsSum += calls
  74. a = append(a, callSize{
  75. calls: calls,
  76. size: minSize << i,
  77. })
  78. }
  79. sort.Sort(a)
  80. defaultSize := a[0].size
  81. maxSize := defaultSize
  82. maxSum := uint64(float64(callsSum) * maxPercentile)
  83. callsSum = 0
  84. for i := 0; i < steps; i++ {
  85. if callsSum > maxSum {
  86. break
  87. }
  88. callsSum += a[i].calls
  89. size := a[i].size
  90. if size > maxSize {
  91. maxSize = size
  92. }
  93. }
  94. atomic.StoreUint64(&p.defaultSize, defaultSize)
  95. atomic.StoreUint64(&p.maxSize, maxSize)
  96. atomic.StoreUint64(&p.calibrating, 0)
  97. }
  // callSize pairs a size class with the number of calls recorded for it.
  type callSize struct {
  	calls uint64 // number of recorded calls
  	size  uint64 // size of the class, in bytes
  }

  // callSizes is a list of callSize entries that sorts by call count,
  // highest first.
  type callSizes []callSize

  // Len reports the number of entries.
  func (cs callSizes) Len() int { return len(cs) }

  // Less reports whether entry i has more calls than entry j.
  func (cs callSizes) Less(i, j int) bool {
  	return cs[j].calls < cs[i].calls
  }

  // Swap exchanges entries i and j.
  func (cs callSizes) Swap(i, j int) {
  	tmp := cs[i]
  	cs[i] = cs[j]
  	cs[j] = tmp
  }
  112. func index(n int) int {
  113. n--
  114. n >>= minBitSize
  115. idx := 0
  116. for n > 0 {
  117. n >>= 1
  118. idx++
  119. }
  120. if idx >= steps {
  121. idx = steps - 1
  122. }
  123. return idx
  124. }