// reader-data.go (1.2 KB)
  1. package track
  2. import (
  3. "m7s.live/engine/v4/common"
  4. "m7s.live/engine/v4/util"
  5. )
  6. type RingReader[T any, F common.IDataFrame[T]] struct {
  7. *util.Ring[F]
  8. Count int // 读取的帧数
  9. }
  10. func (r *RingReader[T, F]) StartRead(ring *util.Ring[F]) (err error) {
  11. r.Ring = ring
  12. if r.Value.IsDiscarded() {
  13. return ErrDiscard
  14. }
  15. if r.Value.IsWriting() {
  16. // t := time.Now()
  17. r.Value.Wait()
  18. // log.Info("wait", time.Since(t))
  19. }
  20. r.Count++
  21. r.Value.ReaderEnter()
  22. return
  23. }
  24. func (r *RingReader[T, F]) TryRead() (f F, err error) {
  25. if r.Count > 0 {
  26. preValue := r.Value
  27. if preValue.IsDiscarded() {
  28. preValue.ReaderLeave()
  29. err = ErrDiscard
  30. return
  31. }
  32. if r.Next().Value.IsWriting() {
  33. return
  34. }
  35. defer preValue.ReaderLeave()
  36. r.Ring = r.Next()
  37. } else {
  38. if r.Value.IsWriting() {
  39. return
  40. }
  41. }
  42. if r.Value.IsDiscarded() {
  43. err = ErrDiscard
  44. return
  45. }
  46. r.Count++
  47. f = r.Value
  48. r.Value.ReaderEnter()
  49. return
  50. }
  51. func (r *RingReader[T, F]) ReadNext() (err error) {
  52. return r.Read(r.Next())
  53. }
  54. func (r *RingReader[T, F]) Read(ring *util.Ring[F]) (err error) {
  55. preValue := r.Value
  56. defer preValue.ReaderLeave()
  57. if preValue.IsDiscarded() {
  58. return ErrDiscard
  59. }
  60. return r.StartRead(ring)
  61. }
  62. type DataReader[T any] struct {
  63. RingReader[T, *common.DataFrame[T]]
  64. }