123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- package track
- import (
- "context"
- "sync"
- "time"
- "go.uber.org/zap"
- . "m7s.live/engine/v4/common"
- "m7s.live/engine/v4/log"
- "m7s.live/engine/v4/util"
- )
- type Data[T any] struct {
- Base[T, *DataFrame[T]]
- sync.Locker `json:"-" yaml:"-"` // 写入锁,可选,单一协程写入可以不加锁
- }
- func (dt *Data[T]) Init(n int) {
- dt.Base.Init(n, NewDataFrame[T])
- }
- func (dt *Data[T]) Push(data T) {
- if dt.Locker != nil {
- dt.Lock()
- defer dt.Unlock()
- }
- curValue := dt.Value
- if log.Trace {
- dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
- }
- curValue.Data = data
- dt.Step()
- }
- func (d *Data[T]) Play(ctx context.Context, onData func(*DataFrame[T]) error) (err error) {
- d.Debug("play data track")
- reader := DataReader[T]{}
- for err = reader.StartRead(d.Ring); err == nil; err = reader.ReadNext() {
- if log.Trace {
- d.Trace("read data", zap.Uint32("sequence", reader.Value.Sequence))
- }
- if err = onData(reader.Value); err == nil {
- err = ctx.Err()
- }
- if err != nil {
- reader.Value.ReaderLeave()
- return
- }
- }
- return
- }
- func (d *Data[T]) Attach(s IStream) {
- d.SetStuff(s)
- if err := s.AddTrack(d).Await(); err != nil {
- d.Error("attach data track failed", zap.Error(err))
- } else {
- d.Info("data track attached")
- }
- }
- func (d *Data[T]) LastWriteTime() time.Time {
- return d.LastValue.WriteTime
- }
- func NewDataTrack[T any](name string) (dt *Data[T]) {
- dt = &Data[T]{}
- dt.Init(10)
- dt.SetStuff(name)
- return
- }
- type RecycleData[T util.Recyclable] struct {
- Data[T]
- }
- func (dt *RecycleData[T]) Push(data T) {
- if dt.Locker != nil {
- dt.Lock()
- defer dt.Unlock()
- }
- curValue := dt.Value
- if log.Trace {
- dt.Trace("push data", zap.Uint32("sequence", curValue.Sequence))
- }
- curValue.Data = data
- dt.Step()
- if !dt.Value.WriteTime.IsZero() {
- dt.Value.Data.Recycle()
- }
- }
- func NewRecycleDataTrack[T util.Recyclable](name string) (dt *RecycleData[T]) {
- dt = &RecycleData[T]{}
- dt.Init(10)
- dt.SetStuff(name)
- return
- }
- type BytesData struct {
- RecycleData[*util.ListItem[util.Buffer]]
- Pool util.BytesPool
- }
- func NewBytesDataTrack(name string) (dt *BytesData) {
- dt = &BytesData{
- Pool: make(util.BytesPool, 17),
- }
- dt.Init(10)
- dt.SetStuff(name)
- return
- }
|