ring-writer.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package common
  2. import (
  3. "sync/atomic"
  4. "m7s.live/engine/v4/util"
  5. )
  6. type emptyLocker struct{}
  7. func (emptyLocker) Lock() {}
  8. func (emptyLocker) Unlock() {}
  9. var EmptyLocker emptyLocker
// RingWriter is the writing side of a ring of frames of type F.
//
// The embedded ring pointer designates the node currently being written; Step
// moves it forward. Nodes removed from the ring by Reduce are kept in a
// private pool (see Recycle) and handed back out by Glow before any new node
// is constructed.
type RingWriter[T any, F IDataFrame[T]] struct {
	// Node the writer currently points at. Set by Init and advanced by Step.
	*util.Ring[F] `json:"-" yaml:"-"`
	// ReaderCount is an atomic counter exposed through GetReaderCount. It is
	// not modified anywhere in this file.
	ReaderCount atomic.Int32 `json:"-" yaml:"-"`
	// pool holds the nodes handed to Recycle until Glow reuses them; nil when
	// the pool is empty.
	pool *util.Ring[F]
	// poolSize counts the nodes added by Recycle and not yet taken by Glow.
	poolSize int
	// Size is the node count as tracked by Init, Glow and Reduce.
	Size int
	// LastValue is the frame that was current before the latest Step; right
	// after Init it is the same as the current frame.
	LastValue F
	// constructor builds the frame stored in every node made by create.
	constructor func() F
}
  19. func (rb *RingWriter[T, F]) create(n int) (ring *util.Ring[F]) {
  20. ring = util.NewRing[F](n)
  21. for p, i := ring, n; i > 0; p, i = p.Next(), i-1 {
  22. p.Value = rb.constructor()
  23. p.Value.Init()
  24. }
  25. return
  26. }
  27. func (rb *RingWriter[T, F]) Init(n int, constructor func() F) *RingWriter[T, F] {
  28. rb.constructor = constructor
  29. rb.Ring = rb.create(n)
  30. rb.Size = n
  31. rb.LastValue = rb.Value
  32. return rb
  33. }
  34. // func (rb *RingBuffer[T, F]) MoveNext() F {
  35. // rb.LastValue = rb.Value
  36. // rb.Ring = rb.Next()
  37. // return rb.Value
  38. // }
  39. func (rb *RingWriter[T, F]) Glow(size int) (newItem *util.Ring[F]) {
  40. if size < rb.poolSize {
  41. newItem = rb.pool.Unlink(size)
  42. rb.poolSize -= size
  43. } else if size == rb.poolSize {
  44. newItem = rb.pool
  45. rb.poolSize = 0
  46. rb.pool = nil
  47. } else {
  48. newItem = rb.create(size - rb.poolSize).Link(rb.pool)
  49. rb.poolSize = 0
  50. rb.pool = nil
  51. }
  52. rb.Link(newItem)
  53. rb.Size += size
  54. return
  55. }
  56. func (rb *RingWriter[T, F]) Recycle(r *util.Ring[F]) {
  57. rb.poolSize++
  58. r.Value.Init()
  59. r.Value.Reset()
  60. if rb.pool == nil {
  61. rb.pool = r
  62. } else {
  63. rb.pool.Link(r)
  64. }
  65. }
  66. func (rb *RingWriter[T, F]) Reduce(size int) {
  67. r := rb.Unlink(size)
  68. if size > 1 {
  69. for p := r.Next(); p != r; {
  70. next := p.Next() //先保存下一个节点
  71. if p.Value.Discard() == 0 {
  72. rb.Recycle(p.Prev().Unlink(1))
  73. } else {
  74. // fmt.Println("Reduce", p.Value.ReaderCount())
  75. }
  76. p = next
  77. }
  78. }
  79. if r.Value.Discard() == 0 {
  80. rb.Recycle(r)
  81. }
  82. rb.Size -= size
  83. return
  84. }
  85. func (rb *RingWriter[T, F]) Step() (normal bool) {
  86. rb.LastValue.Broadcast() // 防止订阅者还在等待
  87. rb.LastValue = rb.Value
  88. nextSeq := rb.LastValue.GetSequence() + 1
  89. next := rb.Next()
  90. if normal = next.Value.StartWrite(); normal {
  91. next.Value.Reset()
  92. rb.Ring = next
  93. } else {
  94. rb.Reduce(1) //抛弃还有订阅者的节点
  95. rb.Ring = rb.Glow(1) //补充一个新节点
  96. rb.Value.StartWrite()
  97. }
  98. rb.Value.SetSequence(nextSeq)
  99. rb.LastValue.Ready()
  100. return
  101. }
  102. func (rb *RingWriter[T, F]) GetReaderCount() int32 {
  103. return rb.ReaderCount.Load()
  104. }