frame.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package common
  2. import (
  3. "bytes"
  4. "io"
  5. "net"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. "github.com/pion/rtp"
  10. "m7s.live/engine/v4/codec"
  11. "m7s.live/engine/v4/log"
  12. "m7s.live/engine/v4/util"
  13. )
  // SplitAnnexB cuts frame into the pieces separated by delimiter and calls
  // process once, in order, for every non-empty piece. Empty pieces (a leading
  // delimiter, a trailing delimiter, or two delimiters back to back) are skipped.
  //
  // An empty delimiter cannot split anything: bytes.Cut matches it at offset 0
  // without consuming any input, which would loop forever. In that case the
  // whole frame is handed to process as a single piece.
  func SplitAnnexB[T ~[]byte](frame T, process func(T), delimiter []byte) {
      if len(delimiter) == 0 {
          if len(frame) > 0 {
              process(frame)
          }
          return
      }
      for rest := frame; len(rest) > 0; {
          var piece T
          // The "found" result is not needed: when the delimiter is absent,
          // Cut returns the whole input as piece and an empty rest, which
          // ends the loop.
          piece, rest, _ = bytes.Cut(rest, delimiter)
          if len(piece) > 0 {
              process(piece)
          }
      }
  }
  21. type RTPFrame struct {
  22. *rtp.Packet
  23. Raw []byte
  24. }
  25. func (r *RTPFrame) H264Type() (naluType codec.H264NALUType) {
  26. return naluType.Parse(r.Payload[0])
  27. }
  28. func (r *RTPFrame) H265Type() (naluType codec.H265NALUType) {
  29. return naluType.Parse(r.Payload[0])
  30. }
  31. func (r *RTPFrame) Unmarshal(raw []byte) *RTPFrame {
  32. if r.Packet == nil {
  33. r.Packet = &rtp.Packet{}
  34. }
  35. if err := r.Packet.Unmarshal(raw); err != nil {
  36. log.Error(err)
  37. return nil
  38. }
  39. return r
  40. }
  // IDataFrame is the set of operations a data frame must provide: lifecycle
  // (Init/Reset), reader accounting, write/discard state, sequence numbering
  // and readiness signalling.
  type IDataFrame[T any] interface {
      // Init initializes the frame.
      Init()
      // Reset resets the data so that the memory can be reused.
      Reset()
      // Ready marks the frame as readable.
      Ready()
      // ReaderEnter increments the reader count by one.
      ReaderEnter() int32
      // ReaderLeave decrements the reader count by one.
      ReaderLeave() int32
      // StartWrite begins writing.
      StartWrite() bool
      // SetSequence sets the sequence number.
      SetSequence(uint32)
      // GetSequence returns the sequence number.
      GetSequence() uint32
      // ReaderCount returns the number of readers.
      ReaderCount() int32
      // Discard abandons the frame: if readers have not yet left when a write
      // starts, the frame is discarded and detached from the RingBuffer to
      // prevent concurrent reading and writing.
      Discard() int32
      // IsDiscarded reports whether the frame has been discarded.
      IsDiscarded() bool
      // IsWriting reports whether the frame is being written.
      IsWriting() bool
      // Wait blocks until the frame is readable.
      Wait()
      // Broadcast announces that the frame is readable.
      Broadcast()
  }
  // DataFrame is a generic frame carrying a payload of type T together with
  // timing, sequencing and reader-tracking state. It embeds sync.Cond, so Wait
  // and Broadcast are promoted, and the Cond's L field is also used by the
  // methods of this type as the "discarded" marker (L == nil).
  type DataFrame[T any] struct {
      // DeltaTime is the timestamp delta relative to the previous frame, in milliseconds.
      DeltaTime uint32
      // WriteTime is when the frame was written; it can be used to order two frames.
      WriteTime time.Time
      // Sequence is the frame's sequence number within a Track.
      Sequence uint32
      // BytesIn is the number of input bytes, used to compute BPS.
      BytesIn int
      // CanRead reports whether the frame is readable.
      CanRead bool `json:"-" yaml:"-"`
      // readerCount is the number of readers currently on this frame.
      readerCount atomic.Int32 `json:"-" yaml:"-"`
      // Data is the frame payload.
      Data      T `json:"-" yaml:"-"`
      sync.Cond `json:"-" yaml:"-"`
  }
  67. func NewDataFrame[T any]() *DataFrame[T] {
  68. return &DataFrame[T]{}
  69. }
  70. func (df *DataFrame[T]) IsWriting() bool {
  71. return !df.CanRead
  72. }
  73. func (df *DataFrame[T]) IsDiscarded() bool {
  74. return df.L == nil
  75. }
  76. func (df *DataFrame[T]) Discard() int32 {
  77. df.L = nil //标记为废弃
  78. return df.readerCount.Load()
  79. }
  80. func (df *DataFrame[T]) SetSequence(sequence uint32) {
  81. df.Sequence = sequence
  82. }
  83. func (df *DataFrame[T]) GetSequence() uint32 {
  84. return df.Sequence
  85. }
  86. func (df *DataFrame[T]) ReaderEnter() int32 {
  87. return df.readerCount.Add(1)
  88. }
  89. func (df *DataFrame[T]) ReaderCount() int32 {
  90. return df.readerCount.Load()
  91. }
  92. func (df *DataFrame[T]) ReaderLeave() int32 {
  93. return df.readerCount.Add(-1)
  94. }
  95. func (df *DataFrame[T]) StartWrite() bool {
  96. if df.readerCount.Load() > 0 {
  97. df.Discard() //标记为废弃
  98. return false
  99. } else {
  100. df.CanRead = false //标记为正在写入
  101. return true
  102. }
  103. }
  104. func (df *DataFrame[T]) Ready() {
  105. df.WriteTime = time.Now()
  106. df.CanRead = true //标记为可读取
  107. df.Broadcast()
  108. }
  109. func (df *DataFrame[T]) Init() {
  110. df.L = EmptyLocker
  111. }
  112. func (df *DataFrame[T]) Reset() {
  113. df.BytesIn = 0
  114. df.DeltaTime = 0
  115. }
  116. type AVFrame struct {
  117. DataFrame[any]
  118. IFrame bool
  119. PTS time.Duration
  120. DTS time.Duration
  121. Timestamp time.Duration // 绝对时间戳
  122. ADTS *util.ListItem[util.Buffer] `json:"-" yaml:"-"` // ADTS头
  123. AVCC util.BLL `json:"-" yaml:"-"` // 打包好的AVCC格式(MPEG-4格式、Byte-Stream Format)
  124. RTP util.List[RTPFrame] `json:"-" yaml:"-"`
  125. AUList util.BLLs `json:"-" yaml:"-"` // 裸数据
  126. }
  127. func NewAVFrame() *AVFrame {
  128. return &AVFrame{}
  129. }
  130. func (av *AVFrame) WriteAVCC(ts uint32, frame *util.BLL) {
  131. if ts == 0 {
  132. ts = 1
  133. }
  134. av.Timestamp = time.Duration(ts) * time.Millisecond
  135. av.BytesIn += frame.ByteLength
  136. for {
  137. item := frame.Shift()
  138. if item == nil {
  139. break
  140. }
  141. av.AVCC.Push(item)
  142. }
  143. // frame.Transfer(&av.AVCC)
  144. // frame.ByteLength = 0
  145. }
  146. // Reset 重置数据,复用内存
  147. func (av *AVFrame) Reset() {
  148. av.RTP.Recycle()
  149. av.AVCC.Recycle()
  150. av.AUList.Recycle()
  151. if av.ADTS != nil {
  152. av.ADTS.Recycle()
  153. av.ADTS = nil
  154. }
  155. av.Timestamp = 0
  156. av.IFrame = false
  157. av.DataFrame.Reset()
  158. }
  // ParamaterSets is an ordered list of byte slices, one per parameter set.
  // Its methods emit each entry prefixed with an Annex B NALU delimiter.
  type ParamaterSets [][]byte
  160. func (v ParamaterSets) GetAnnexB() (r net.Buffers) {
  161. for _, v := range v {
  162. r = append(r, codec.NALU_Delimiter2, v)
  163. }
  164. return
  165. }
  166. func (v ParamaterSets) WriteAnnexBTo(w io.Writer) (n int, err error) {
  167. var n1, n2 int
  168. for _, v := range v {
  169. if n1, err = w.Write(codec.NALU_Delimiter2); err != nil {
  170. return
  171. }
  172. n += n1
  173. if n2, err = w.Write(v); err != nil {
  174. return
  175. }
  176. n += n2
  177. }
  178. return
  179. }