reader-av.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package track
  2. import (
  3. "errors"
  4. "time"
  5. "fmt"
  6. "go.uber.org/zap"
  7. "m7s.live/engine/v4/common"
  8. "m7s.live/engine/v4/log"
  9. )
  10. const (
  11. READSTATE_INIT = iota
  12. READSTATE_FIRST
  13. READSTATE_NORMAL
  14. )
  15. const (
  16. SUBMODE_REAL = iota
  17. SUBMODE_NOJUMP
  18. SUBMODE_BUFFER
  19. )
  20. var ErrDiscard = errors.New("discard")
  21. type AVRingReader struct {
  22. RingReader[any, *common.AVFrame]
  23. mode int
  24. Track *Media
  25. State byte
  26. FirstSeq uint32
  27. StartTs time.Duration
  28. FirstTs time.Duration
  29. SkipTs time.Duration //ms
  30. beforeJump time.Duration
  31. ConfSeq int
  32. startTime time.Time
  33. AbsTime uint32
  34. Delay uint32
  35. *log.Logger
  36. }
  37. func (r *AVRingReader) DecConfChanged() bool {
  38. return r.ConfSeq != r.Track.SequenceHeadSeq
  39. }
  40. func NewAVRingReader(t *Media) *AVRingReader {
  41. t.Debug("reader +1", zap.Int32("count", t.ReaderCount.Add(1)))
  42. return &AVRingReader{
  43. Track: t,
  44. }
  45. }
  46. func (r *AVRingReader) readFrame() (err error) {
  47. err = r.ReadNext()
  48. if err != nil {
  49. return err
  50. }
  51. // 超过一半的缓冲区大小,说明Reader太慢,需要丢帧
  52. if r.mode != SUBMODE_BUFFER && r.State == READSTATE_NORMAL && r.Track.LastValue.Sequence-r.Value.Sequence > uint32(r.Track.Size/2) && r.Track.IDRing != nil && r.Track.IDRing.Value.Sequence > r.Value.Sequence {
  53. r.Warn("reader too slow", zap.Uint32("lastSeq", r.Track.LastValue.Sequence), zap.Uint32("seq", r.Value.Sequence))
  54. return r.Read(r.Track.IDRing)
  55. }
  56. return
  57. }
  58. func (r *AVRingReader) ReadFrame(mode int) (err error) {
  59. // TODO 抓取panic
  60. defer func() {
  61. if r := recover(); r != nil {
  62. err = fmt.Errorf("ReadFrame Panic")
  63. }
  64. }()
  65. r.mode = mode
  66. switch r.State {
  67. case READSTATE_INIT:
  68. r.Info("start read", zap.Int("mode", mode))
  69. startRing := r.Track.Ring
  70. if r.Track.IDRing != nil {
  71. startRing = r.Track.IDRing
  72. } else {
  73. r.Warn("no IDRring")
  74. }
  75. switch mode {
  76. case SUBMODE_REAL:
  77. if r.Track.IDRing != nil {
  78. r.State = READSTATE_FIRST
  79. } else {
  80. r.State = READSTATE_NORMAL
  81. }
  82. case SUBMODE_NOJUMP:
  83. r.State = READSTATE_NORMAL
  84. case SUBMODE_BUFFER:
  85. if r.Track.HistoryRing != nil {
  86. startRing = r.Track.HistoryRing
  87. }
  88. r.State = READSTATE_NORMAL
  89. }
  90. if err = r.StartRead(startRing); err != nil {
  91. return
  92. }
  93. r.startTime = time.Now()
  94. if r.FirstTs == 0 {
  95. r.FirstTs = r.Value.Timestamp
  96. }
  97. r.SkipTs = r.FirstTs - r.StartTs
  98. r.FirstSeq = r.Value.Sequence
  99. r.Info("first frame read", zap.Duration("firstTs", r.FirstTs), zap.Uint32("firstSeq", r.FirstSeq))
  100. case READSTATE_FIRST:
  101. if r.Track.IDRing.Value.Sequence != r.FirstSeq {
  102. if err = r.Read(r.Track.IDRing); err != nil {
  103. return
  104. }
  105. r.SkipTs = r.Value.Timestamp - r.beforeJump - r.StartTs - 10*time.Millisecond
  106. r.Info("jump", zap.Uint32("skipSeq", r.Track.IDRing.Value.Sequence-r.FirstSeq), zap.Duration("skipTs", r.SkipTs))
  107. r.State = READSTATE_NORMAL
  108. } else {
  109. if err = r.readFrame(); err != nil {
  110. return
  111. }
  112. r.beforeJump = r.Value.Timestamp - r.FirstTs
  113. // 防止过快消费
  114. if fast := r.beforeJump - time.Since(r.startTime); fast > 0 && fast < time.Second {
  115. time.Sleep(fast)
  116. }
  117. }
  118. case READSTATE_NORMAL:
  119. if err = r.readFrame(); err != nil {
  120. return
  121. }
  122. }
  123. r.AbsTime = uint32((r.Value.Timestamp - r.SkipTs).Milliseconds())
  124. if r.AbsTime == 0 {
  125. r.AbsTime = 1
  126. }
  127. // r.Delay = uint32((r.Track.LastValue.Timestamp - r.Value.Timestamp).Milliseconds())
  128. r.Delay = uint32(r.Track.LastValue.Sequence - r.Value.Sequence)
  129. // fmt.Println(r.Track.Name, r.Delay)
  130. // fmt.Println(r.Track.Name, r.State, r.Value.Timestamp, r.SkipTs, r.AbsTime)
  131. return
  132. }
  133. func (r *AVRingReader) GetPTS32() uint32 {
  134. return uint32((r.Value.PTS - r.SkipTs*90/time.Millisecond))
  135. }
  136. func (r *AVRingReader) GetDTS32() uint32 {
  137. return uint32((r.Value.DTS - r.SkipTs*90/time.Millisecond))
  138. }
  139. func (r *AVRingReader) ResetAbsTime() {
  140. r.SkipTs = r.Value.Timestamp
  141. r.AbsTime = 1
  142. }