reader-av.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package track
  2. import (
  3. "errors"
  4. "time"
  5. "go.uber.org/zap"
  6. "m7s.live/engine/v4/common"
  7. "m7s.live/engine/v4/log"
  8. )
  9. const (
  10. READSTATE_INIT = iota
  11. READSTATE_FIRST
  12. READSTATE_NORMAL
  13. )
  14. const (
  15. SUBMODE_REAL = iota
  16. SUBMODE_NOJUMP
  17. SUBMODE_BUFFER
  18. )
  19. var ErrDiscard = errors.New("discard")
  20. type AVRingReader struct {
  21. RingReader[any, *common.AVFrame]
  22. mode int
  23. Track *Media
  24. State byte
  25. FirstSeq uint32
  26. StartTs time.Duration
  27. FirstTs time.Duration
  28. SkipTs time.Duration //ms
  29. beforeJump time.Duration
  30. ConfSeq int
  31. startTime time.Time
  32. AbsTime uint32
  33. Delay uint32
  34. *log.Logger
  35. }
  36. func (r *AVRingReader) DecConfChanged() bool {
  37. return r.ConfSeq != r.Track.SequenceHeadSeq
  38. }
  39. func NewAVRingReader(t *Media) *AVRingReader {
  40. t.Debug("reader +1", zap.Int32("count", t.ReaderCount.Add(1)))
  41. return &AVRingReader{
  42. Track: t,
  43. }
  44. }
  45. func (r *AVRingReader) readFrame() (err error) {
  46. err = r.ReadNext()
  47. if err != nil {
  48. return err
  49. }
  50. // 超过一半的缓冲区大小,说明Reader太慢,需要丢帧
  51. if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence {
  52. r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Value.Sequence))
  53. return r.Read(r.Track.IDRing)
  54. }
  55. return
  56. }
  57. func (r *AVRingReader) ReadFrame(mode int) (err error) {
  58. r.mode = mode
  59. switch r.State {
  60. case READSTATE_INIT:
  61. r.Info("start read", zap.Int("mode", mode))
  62. startRing := r.Track.Ring
  63. if r.Track.IDRing != nil {
  64. startRing = r.Track.IDRing
  65. } else {
  66. r.Warn("no IDRring")
  67. }
  68. switch mode {
  69. case SUBMODE_REAL:
  70. if r.Track.IDRing != nil {
  71. r.State = READSTATE_FIRST
  72. } else {
  73. r.State = READSTATE_NORMAL
  74. }
  75. case SUBMODE_NOJUMP:
  76. r.State = READSTATE_NORMAL
  77. case SUBMODE_BUFFER:
  78. if r.Track.HistoryRing != nil {
  79. startRing = r.Track.HistoryRing
  80. }
  81. r.State = READSTATE_NORMAL
  82. }
  83. if err = r.StartRead(startRing); err != nil {
  84. return
  85. }
  86. r.startTime = time.Now()
  87. if r.FirstTs == 0 {
  88. r.FirstTs = r.Value.Timestamp
  89. }
  90. r.SkipTs = r.FirstTs - r.StartTs
  91. r.FirstSeq = r.Value.Sequence
  92. r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
  93. case READSTATE_FIRST:
  94. if r.Track.IDRing.Value.Sequence != r.FirstSeq {
  95. if err = r.Read(r.Track.IDRing); err != nil {
  96. return
  97. }
  98. r.SkipTs = r.Value.Timestamp - r.beforeJump - r.StartTs - 10*time.Millisecond
  99. r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs))
  100. r.State = READSTATE_NORMAL
  101. } else {
  102. if err = r.readFrame(); err != nil {
  103. return
  104. }
  105. r.beforeJump = r.Value.Timestamp - r.FirstTs
  106. // 防止过快消费
  107. if fast := r.beforeJump - time.Since(r.startTime); fast > 0 && fast < time.Second {
  108. time.Sleep(fast)
  109. }
  110. }
  111. case READSTATE_NORMAL:
  112. if err = r.readFrame(); err != nil {
  113. return
  114. }
  115. }
  116. r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds())
  117. if r.AbsTime == 0 {
  118. r.AbsTime = 1
  119. }
  120. // r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds())
  121. r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
  122. // fmt.Println(r.Track.Name, r.Delay)
  123. // fmt.Println(r.Track.Name, r.Value.Sequence, r.Delay, r.AbsTime)
  124. return
  125. }
  126. func (r *AVRingReader) GetPTS32() uint32 {
  127. return uint32((r.Value.PTS - r.SkipTs*90/time.Millisecond))
  128. }
  129. func (r *AVRingReader) GetDTS32() uint32 {
  130. return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond))
  131. }
  132. func (r *AVRingReader) ResetAbsTime() {
  133. r.SkipTs = r.Value.Timestamp
  134. r.AbsTime = 1
  135. }