ring-writer.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package util
  2. import (
  3. "sync/atomic"
  4. )
  5. type emptyLocker struct{}
  6. func (emptyLocker) Lock() {}
  7. func (emptyLocker) Unlock() {}
  8. var EmptyLocker emptyLocker
  9. type IDataFrame[T any] interface {
  10. Init() // 初始化
  11. Reset() // 重置数据,复用内存
  12. Ready() // 标记为可读取
  13. ReaderEnter() int32 // 读取者数量+1
  14. ReaderLeave() int32 // 读取者数量-1
  15. StartWrite() bool // 开始写入
  16. SetSequence(uint32) // 设置序号
  17. GetSequence() uint32 // 获取序号
  18. ReaderCount() int32 // 读取者数量
  19. Discard() int32 // 如果写入时还有读取者没有离开则废弃该帧,剥离RingBuffer,防止并发读写
  20. IsDiscarded() bool // 是否已废弃
  21. IsWriting() bool // 是否正在写入
  22. Wait() // 阻塞等待可读取
  23. Broadcast() // 广播可读取
  24. }
  25. type RingWriter[T any, F IDataFrame[T]] struct {
  26. *Ring[F] `json:"-" yaml:"-"`
  27. ReaderCount atomic.Int32 `json:"-" yaml:"-"`
  28. pool *Ring[F]
  29. poolSize int
  30. Size int
  31. LastValue F
  32. constructor func() F
  33. }
  34. func (rb *RingWriter[T, F]) create(n int) (ring *Ring[F]) {
  35. ring = NewRing[F](n)
  36. for p, i := ring, n; i > 0; p, i = p.Next(), i-1 {
  37. p.Value = rb.constructor()
  38. p.Value.Init()
  39. }
  40. return
  41. }
  42. func (rb *RingWriter[T, F]) Init(n int, constructor func() F) *RingWriter[T, F] {
  43. rb.constructor = constructor
  44. rb.Ring = rb.create(n)
  45. rb.Size = n
  46. rb.LastValue = rb.Value
  47. return rb
  48. }
  49. // func (rb *RingBuffer[T, F]) MoveNext() F {
  50. // rb.LastValue = rb.Value
  51. // rb.Ring = rb.Next()
  52. // return rb.Value
  53. // }
  54. func (rb *RingWriter[T, F]) Glow(size int) (newItem *Ring[F]) {
  55. if size < rb.poolSize {
  56. newItem = rb.pool.Unlink(size)
  57. rb.poolSize -= size
  58. } else if size == rb.poolSize {
  59. newItem = rb.pool
  60. rb.poolSize = 0
  61. rb.pool = nil
  62. } else {
  63. newItem = rb.create(size - rb.poolSize).Link(rb.pool)
  64. rb.poolSize = 0
  65. rb.pool = nil
  66. }
  67. rb.Link(newItem)
  68. rb.Size += size
  69. return
  70. }
  71. func (rb *RingWriter[T, F]) Recycle(r *Ring[F]) {
  72. rb.poolSize++
  73. r.Value.Init()
  74. r.Value.Reset()
  75. if rb.pool == nil {
  76. rb.pool = r
  77. } else {
  78. rb.pool.Link(r)
  79. }
  80. }
  81. func (rb *RingWriter[T, F]) Reduce(size int) {
  82. r := rb.Unlink(size)
  83. if size > 1 {
  84. for p := r.Next(); p != r; {
  85. next := p.Next() //先保存下一个节点
  86. if p.Value.Discard() == 0 {
  87. rb.Recycle(p.Prev().Unlink(1))
  88. } else {
  89. // fmt.Println("Reduce", p.Value.ReaderCount())
  90. }
  91. p = next
  92. }
  93. }
  94. if r.Value.Discard() == 0 {
  95. rb.Recycle(r)
  96. }
  97. rb.Size -= size
  98. return
  99. }
  100. func (rb *RingWriter[T, F]) Step() (normal bool) {
  101. rb.LastValue.Broadcast() // 防止订阅者还在等待
  102. rb.LastValue = rb.Value
  103. nextSeq := rb.LastValue.GetSequence() + 1
  104. next := rb.Next()
  105. if normal = next.Value.StartWrite(); normal {
  106. next.Value.Reset()
  107. rb.Ring = next
  108. } else {
  109. rb.Reduce(1) //抛弃还有订阅者的节点
  110. rb.Ring = rb.Glow(1) //补充一个新节点
  111. rb.Value.StartWrite()
  112. }
  113. rb.Value.SetSequence(nextSeq)
  114. rb.LastValue.Ready()
  115. return
  116. }
  117. func (rb *RingWriter[T, F]) GetReaderCount() int32 {
  118. return rb.ReaderCount.Load()
  119. }