video.go 7.8 KB


  1. package track
  2. import (
  3. "io"
  4. "time"
  5. "github.com/pion/rtp"
  6. "go.uber.org/zap"
  7. "m7s.live/engine/v4/codec"
  8. "m7s.live/engine/v4/common"
  9. . "m7s.live/engine/v4/common"
  10. "m7s.live/engine/v4/util"
  11. )
  12. type Video struct {
  13. Media
  14. CodecID codec.VideoCodecID
  15. GOP int //关键帧间隔
  16. nalulenSize int //avcc格式中表示nalu长度的字节数,通常为4
  17. dcChanged bool //解码器配置是否改变了,一般由于变码率导致
  18. dtsEst *DTSEstimator
  19. lostFlag bool // 是否丢帧
  20. codec.SPSInfo
  21. ParamaterSets `json:"-" yaml:"-"`
  22. SPS []byte `json:"-" yaml:"-"`
  23. PPS []byte `json:"-" yaml:"-"`
  24. SEIReader *DataReader[[]byte] `json:"-" yaml:"-"`
  25. }
  26. func (v *Video) Attach() {
  27. if v.Attached.CompareAndSwap(false, true) {
  28. v.Info("attach video track", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
  29. if err := v.Stream.AddTrack(v).Await(); err != nil {
  30. v.Error("attach video track failed", zap.Error(err))
  31. v.Attached.Store(false)
  32. } else {
  33. v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
  34. }
  35. }
  36. }
  37. func (v *Video) Detach() {
  38. if v.Attached.CompareAndSwap(true, false) {
  39. v.Stream.RemoveTrack(v)
  40. }
  41. }
  42. func (vt *Video) GetName() string {
  43. if vt.Name == "" {
  44. return vt.CodecID.String()
  45. }
  46. return vt.Name
  47. }
  48. // PlayFullAnnexB 订阅annex-b格式的流数据,每一个I帧增加sps、pps头
  49. // func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) error) error {
  50. // for vr := vt.ReadRing(); ctx.Err() == nil; vr.MoveNext() {
  51. // vp := vr.Read(ctx)
  52. // var data net.Buffers
  53. // if vp.IFrame {
  54. // data = vt.GetAnnexB()
  55. // }
  56. // data = append(data, codec.NALU_Delimiter2)
  57. // for slice := vp.AUList.Head; slice != nil; slice = slice.Next {
  58. // data = append(data, slice.ToBuffers()...)
  59. // if slice.Next != nil {
  60. // data = append(data, codec.NALU_Delimiter1)
  61. // }
  62. // }
  63. // if err := onMedia(data); err != nil {
  64. // // TODO: log err
  65. // return err
  66. // }
  67. // }
  68. // return ctx.Err()
  69. // }
  70. func (vt *Video) computeGOP() {
  71. if vt.IDRing != nil {
  72. vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence)
  73. if vt.HistoryRing == nil {
  74. vt.narrow(vt.GOP)
  75. }
  76. }
  77. vt.AddIDR()
  78. // var n int
  79. // for i := 0; i < len(vt.BytesPool); i++ {
  80. // n += vt.BytesPool[i].Length
  81. // }
  82. // println(n)
  83. }
  84. func (vt *Video) writeAnnexBSlice(nalu []byte) {
  85. common.SplitAnnexB(nalu, vt.WriteSliceBytes, codec.NALU_Delimiter1)
  86. }
  87. func (vt *Video) WriteNalu(pts uint32, dts uint32, nalu []byte) {
  88. if dts == 0 {
  89. vt.generateTimestamp(pts)
  90. } else {
  91. vt.Value.PTS = time.Duration(pts)
  92. vt.Value.DTS = time.Duration(dts)
  93. }
  94. vt.Value.BytesIn += len(nalu)
  95. vt.WriteSliceBytes(nalu)
  96. vt.Flush()
  97. }
  98. func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
  99. if dts == 0 {
  100. vt.generateTimestamp(pts)
  101. } else {
  102. vt.Value.PTS = time.Duration(pts)
  103. vt.Value.DTS = time.Duration(dts)
  104. }
  105. vt.Value.BytesIn += len(frame)
  106. common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2)
  107. if vt.Value.AUList.ByteLength > 0 {
  108. vt.Flush()
  109. }
  110. }
  111. func (vt *Video) writeAVCCFrame(ts uint32, r *util.BLLReader, frame *util.BLL) (err error) {
  112. var cts uint32
  113. cts, err = r.ReadBE(3)
  114. if err != nil {
  115. return err
  116. }
  117. vt.Value.PTS = time.Duration(ts+cts) * 90
  118. vt.Value.DTS = time.Duration(ts) * 90
  119. var nalulen uint32
  120. for nalulen, err = r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
  121. if remain := frame.ByteLength - r.GetOffset(); remain < int(nalulen) {
  122. vt.Error("read nalu length error", zap.Int("nalulen", int(nalulen)), zap.Int("remain", remain))
  123. frame.Recycle()
  124. vt.Value.Reset()
  125. return
  126. }
  127. vt.AppendAuBytes(r.ReadN(int(nalulen))...)
  128. }
  129. return nil
  130. }
  131. func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
  132. if l := frame.ByteLength; l < 6 {
  133. vt.Error("AVCC data too short", zap.Int("len", l))
  134. return io.ErrShortWrite
  135. }
  136. // bbb := util.Buffer(frame.ToBytes()[5:])
  137. r := frame.NewReader()
  138. b, _ := r.ReadByte()
  139. isExtHeader := (b >> 4) & 0b1000
  140. frameType := (b >> 4) & 0b0111
  141. vt.Value.IFrame = frameType == 1 || frameType == 4
  142. packetType := b & 0b1111
  143. if isExtHeader != 0 {
  144. r.ReadBE(4) // fourcc
  145. switch packetType {
  146. case codec.PacketTypeSequenceStart:
  147. err = vt.SpesificTrack.WriteSequenceHead(frame.ToBytes())
  148. frame.Recycle()
  149. return
  150. case codec.PacketTypeCodedFrames:
  151. err = vt.SpesificTrack.writeAVCCFrame(ts, r, frame)
  152. case codec.PacketTypeCodedFramesX:
  153. }
  154. } else {
  155. b, _ = r.ReadByte() //sequence frame flag
  156. if b == 0 {
  157. err = vt.SpesificTrack.WriteSequenceHead(frame.ToBytes())
  158. frame.Recycle()
  159. return
  160. }
  161. err = vt.SpesificTrack.writeAVCCFrame(ts, r, frame)
  162. }
  163. if err == nil {
  164. vt.Value.WriteAVCC(ts, frame)
  165. vt.Flush()
  166. }
  167. return
  168. }
  169. func (vt *Video) WriteSliceByte(b ...byte) {
  170. // fmt.Println("write slice byte", b)
  171. vt.WriteSliceBytes(b)
  172. }
  173. // 在I帧前面插入sps pps webrtc需要
  174. func (vt *Video) insertDCRtp() {
  175. head := vt.Value.RTP.Next
  176. for _, nalu := range vt.ParamaterSets {
  177. var packet rtp.Packet
  178. packet.Version = 2
  179. packet.PayloadType = vt.PayloadType
  180. packet.Payload = nalu
  181. packet.SSRC = vt.SSRC
  182. packet.Timestamp = uint32(vt.Value.PTS)
  183. packet.Marker = false
  184. head.InsertBeforeValue(RTPFrame{Packet: &packet})
  185. }
  186. }
  187. func (vt *Video) generateTimestamp(ts uint32) {
  188. if vt.State == TrackStateOffline {
  189. vt.dtsEst = NewDTSEstimator()
  190. }
  191. vt.Value.PTS = time.Duration(ts)
  192. vt.Value.DTS = time.Duration(vt.dtsEst.Feed(ts))
  193. }
  194. func (vt *Video) SetLostFlag() {
  195. vt.lostFlag = true
  196. }
  197. func (vt *Video) CompleteAVCC(rv *AVFrame) {
  198. mem := vt.BytesPool.Get(5)
  199. b := mem.Value
  200. if rv.IFrame {
  201. b[0] = 0x10 | byte(vt.CodecID)
  202. } else {
  203. b[0] = 0x20 | byte(vt.CodecID)
  204. }
  205. b[1] = 1
  206. // println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS)
  207. // 写入CTS
  208. util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
  209. rv.AVCC.Push(mem)
  210. // if rv.AVCC.ByteLength != 5 {
  211. // panic("error")
  212. // }
  213. // var tmp = 0
  214. rv.AUList.Range(func(au *util.BLL) bool {
  215. mem = vt.BytesPool.Get(4)
  216. // println(au.ByteLength)
  217. util.PutBE(mem.Value, uint32(au.ByteLength))
  218. rv.AVCC.Push(mem)
  219. au.Range(func(slice util.Buffer) bool {
  220. rv.AVCC.Push(vt.BytesPool.GetShell(slice))
  221. return true
  222. })
  223. // tmp += 4 + au.ByteLength
  224. // if rv.AVCC.ByteLength != 5+tmp {
  225. // panic("error")
  226. // }
  227. return true
  228. })
  229. }
  230. func (vt *Video) Flush() {
  231. rv := vt.Value
  232. if vt.SEIReader != nil {
  233. if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil {
  234. var au util.BLL
  235. au.Push(vt.SpesificTrack.GetNALU_SEI())
  236. au.Push(vt.BytesPool.GetShell(seiFrame.Data))
  237. vt.Value.AUList.UnshiftValue(&au)
  238. } else if err != nil {
  239. vt.SEIReader = nil
  240. }
  241. }
  242. if rv.IFrame {
  243. vt.computeGOP()
  244. vt.Stream.SetIDR(vt)
  245. }
  246. if !vt.Attached.Load() {
  247. if vt.IDRing != nil && vt.SequenceHeadSeq > 0 {
  248. defer vt.Attach()
  249. } else {
  250. rv.Reset()
  251. return
  252. }
  253. }
  254. if vt.lostFlag {
  255. if rv.IFrame {
  256. vt.lostFlag = false
  257. } else {
  258. rv.Reset()
  259. return
  260. }
  261. }
  262. vt.Media.Flush()
  263. vt.dcChanged = false
  264. }
  265. func (vt *Video) WriteSequenceHead(sh []byte) {
  266. vt.Media.WriteSequenceHead(sh)
  267. vt.dcChanged = true
  268. }
  269. /*
  270. Access Unit的首个nalu是4字节起始码。
  271. 这里举个例子说明,用JM可以生成这样一段码流(不要使用JM8.6,它在这部分与标准不符),这个码流可以见本楼附件:
  272. SPS (4字节头)
  273. PPS (4字节头)
  274. SEI (4字节头)
  275. I0(slice0) (4字节头)
  276. I0(slice1) (3字节头)
  277. P1(slice0) (4字节头)
  278. P1(slice1) (3字节头)
  279. P2(slice0) (4字节头)
  280. P2(slice1) (3字节头)
  281. I0(slice0)是序列第一帧(I帧)的第一个slice,是当前Access Unit的首个nalu,所以是4字节头。而I0(slice1)表示第一帧的第二个slice,所以是3字节头。P1(slice0) 、P1(slice1)同理。
  282. */