data.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package track
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "go.uber.org/zap"
  7. . "m7s.live/engine/v4/common"
  8. "m7s.live/engine/v4/log"
  9. "m7s.live/engine/v4/util"
  10. )
  11. type Data[T any] struct {
  12. Base[T, *DataFrame[T]]
  13. sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁
  14. }
  15. func (dt *Data[T]) Init(n int) {
  16. dt.Base.Init(n, NewDataFrame[T])
  17. }
  18. func (dt *Data[T]) Push(data T) {
  19. if dt.Locker != nil {
  20. dt.Lock()
  21. defer dt.Unlock()
  22. }
  23. curValue := dt.Value
  24. if log.Trace {
  25. dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
  26. }
  27. curValue.Data = data
  28. dt.Step()
  29. }
  30. func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) {
  31. d.Debug("play data track")
  32. reader := DataReader[T]{}
  33. for err = reader.StartRead(d.Ring); err == nil; err = reader.ReadNext() {
  34. if log.Trace {
  35. d.Trace("read data", zap.Uint32("sequence", reader.Value.Sequence))
  36. }
  37. if err = onData(reader.Value); err == nil {
  38. err = ctx.Err()
  39. }
  40. if err != nil {
  41. reader.Value.ReaderLeave()
  42. return
  43. }
  44. }
  45. return
  46. }
  47. func (d *Data[T]) Attach(s IStream) {
  48. d.SetStuff(s)
  49. if err := s.AddTrack(d).Await(); err != nil {
  50. d.Error("attach data track failed", zap.Error(err))
  51. } else {
  52. d.Info("data track attached")
  53. }
  54. }
  55. func (d *Data[T]) LastWriteTime() time.Time {
  56. return d.LastValue.WriteTime
  57. }
  58. func NewDataTrack[T any](name string) (dt *Data[T]) {
  59. dt = &Data[T]{}
  60. dt.Init(10)
  61. dt.SetStuff(name)
  62. return
  63. }
  64. type RecycleData[T util.Recyclable] struct {
  65. Data[T]
  66. }
  67. func (dt *RecycleData[T]) Push(data T) {
  68. if dt.Locker != nil {
  69. dt.Lock()
  70. defer dt.Unlock()
  71. }
  72. curValue := dt.Value
  73. if log.Trace {
  74. dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
  75. }
  76. curValue.Data = data
  77. dt.Step()
  78. if !dt.Value.WriteTime.IsZero() {
  79. dt.Value.Data.Recycle()
  80. }
  81. }
  82. func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) {
  83. dt = &RecycleData[T]{}
  84. dt.Init(10)
  85. dt.SetStuff(name)
  86. return
  87. }
  88. type BytesData struct {
  89. RecycleData[*util.ListItem[util.Buffer]]
  90. Pool util.BytesPool
  91. }
  92. func NewBytesDataTrack(name string) (dt *BytesData) {
  93. dt = &BytesData{
  94. Pool: make(util.BytesPool, 17),
  95. }
  96. dt.Init(10)
  97. dt.SetStuff(name)
  98. return
  99. }