video.go

package track

import (
	"io"
	"time"

	"github.com/pion/rtp"
	"go.uber.org/zap"
	"m7s.live/engine/v4/codec"
	"m7s.live/engine/v4/common"
	. "m7s.live/engine/v4/common"
	"m7s.live/engine/v4/util"
)

type Video struct {
	Media
	CodecID     codec.VideoCodecID
	GOP         int  // keyframe interval (distance between IDR frames, in frames)
	nalulenSize int  // number of bytes used for the NALU length field in AVCC format, usually 4
	dcChanged   bool // whether the decoder configuration changed, typically caused by a bitrate change
	dtsEst      *DTSEstimator
	lostFlag    bool // whether frames are currently being dropped
	codec.SPSInfo
	ParamaterSets `json:"-" yaml:"-"`
	SPS           []byte              `json:"-" yaml:"-"`
	PPS           []byte              `json:"-" yaml:"-"`
	SEIReader     *DataReader[[]byte] `json:"-" yaml:"-"`
}
func (v *Video) Attach() {
	if v.Attached.CompareAndSwap(false, true) {
		v.Info("attach video track", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
		if err := v.Stream.AddTrack(v).Await(); err != nil {
			v.Error("attach video track failed", zap.Error(err))
			v.Attached.Store(false)
		} else {
			v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
		}
	}
}

func (v *Video) Detach() {
	if v.Attached.CompareAndSwap(true, false) {
		v.Stream.RemoveTrack(v)
	}
}
func (vt *Video) GetName() string {
	if vt.Name == "" {
		return vt.CodecID.String()
	}
	return vt.Name
}
// PlayFullAnnexB subscribes to the stream in Annex-B format, prepending the SPS/PPS header before every I-frame.
// func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) error) error {
// 	for vr := vt.ReadRing(); ctx.Err() == nil; vr.MoveNext() {
// 		vp := vr.Read(ctx)
// 		var data net.Buffers
// 		if vp.IFrame {
// 			data = vt.GetAnnexB()
// 		}
// 		data = append(data, codec.NALU_Delimiter2)
// 		for slice := vp.AUList.Head; slice != nil; slice = slice.Next {
// 			data = append(data, slice.ToBuffers()...)
// 			if slice.Next != nil {
// 				data = append(data, codec.NALU_Delimiter1)
// 			}
// 		}
// 		if err := onMedia(data); err != nil {
// 			// TODO: log err
// 			return err
// 		}
// 	}
// 	return ctx.Err()
// }
func (vt *Video) computeGOP() {
	if vt.IDRing != nil {
		vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence)
		if vt.HistoryRing == nil {
			vt.narrow(vt.GOP)
		}
	}
	vt.AddIDR()
}

func (vt *Video) writeAnnexBSlice(nalu []byte) {
	common.SplitAnnexB(nalu, vt.WriteSliceBytes, codec.NALU_Delimiter1)
}
func (vt *Video) WriteNalu(pts uint32, dts uint32, nalu []byte) {
	if dts == 0 {
		vt.generateTimestamp(pts)
	} else {
		vt.Value.PTS = time.Duration(pts)
		vt.Value.DTS = time.Duration(dts)
	}
	vt.Value.BytesIn += len(nalu)
	vt.WriteSliceBytes(nalu)
	vt.Flush()
}

func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
	if dts == 0 {
		vt.generateTimestamp(pts)
	} else {
		vt.Value.PTS = time.Duration(pts)
		vt.Value.DTS = time.Duration(dts)
	}
	vt.Value.BytesIn += len(frame)
	common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2)
	if vt.Value.AUList.ByteLength > 0 {
		vt.Flush()
	}
}
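
// Hedged usage sketch (the helper name and the 90kHz timebase are assumptions, not part of
// this package): a demuxer producing elementary-stream data could hand one Annex-B access
// unit per call to WriteAnnexB and pass dts=0 when only PTS is known, so generateTimestamp
// estimates DTS; writeAVCCFrame below scales FLV milliseconds by 90, so a 90kHz timebase is
// assumed here as well.
//
//	func feedAccessUnit(vt *Video, pts90k uint32, accessUnit []byte) {
//		vt.WriteAnnexB(pts90k, 0, accessUnit) // dts=0 -> estimate DTS from the PTS sequence
//	}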
func (vt *Video) writeAVCCFrame(ts uint32, r *util.BLLReader, frame *util.BLL) (err error) {
	var cts uint32
	cts, err = r.ReadBE(3)
	if err != nil {
		return err
	}
	vt.Value.PTS = time.Duration(ts+cts) * 90
	vt.Value.DTS = time.Duration(ts) * 90
	var nalulen uint32
	for nalulen, err = r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
		if remain := frame.ByteLength - r.GetOffset(); remain < int(nalulen) {
			vt.Error("read nalu length error", zap.Int("nalulen", int(nalulen)), zap.Int("remain", remain))
			frame.Recycle()
			vt.Value.Reset()
			// return a non-nil error so WriteAVCC does not flush the frame that was just reset
			return io.ErrShortWrite
		}
		vt.AppendAuBytes(r.ReadN(int(nalulen))...)
	}
	return nil
}
func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
	if l := frame.ByteLength; l < 6 {
		vt.Error("AVCC data too short", zap.Int("len", l))
		return io.ErrShortWrite
	}
	r := frame.NewReader()
	b, _ := r.ReadByte()
	isExtHeader := (b >> 4) & 0b1000
	frameType := (b >> 4) & 0b0111
	vt.Value.IFrame = frameType == 1 || frameType == 4
	packetType := b & 0b1111
	if isExtHeader != 0 {
		// enhanced-RTMP extended header: the low nibble is the packet type,
		// followed by the 4-byte codec FourCC
		r.ReadBE(4) // fourcc
		switch packetType {
		case codec.PacketTypeSequenceStart:
			err = vt.SpesificTrack.WriteSequenceHead(frame.ToBytes())
			frame.Recycle()
			return
		case codec.PacketTypeCodedFrames:
			err = vt.SpesificTrack.writeAVCCFrame(ts, r, frame)
		case codec.PacketTypeCodedFramesX:
			// coded frames without a composition-time field: not handled here
		}
	} else {
		b, _ = r.ReadByte() // AVCPacketType: 0 means sequence header
		if b == 0 {
			err = vt.SpesificTrack.WriteSequenceHead(frame.ToBytes())
			frame.Recycle()
			return
		}
		err = vt.SpesificTrack.writeAVCCFrame(ts, r, frame)
	}
	if err == nil {
		vt.Value.WriteAVCC(ts, frame)
		vt.Flush()
	}
	return
}
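
// Layout of the FLV video tag body that WriteAVCC parses above (legacy and enhanced-RTMP
// variants); the field names follow the FLV/enhanced-RTMP specs and are documentation only,
// not symbols defined in this package:
//
//	legacy:   [frameType(4b)|codecID(4b)] [AVCPacketType(1B)] [CTS(3B, BE)] [len(nalulenSize B) NALU]...
//	enhanced: [1(1b)|frameType(3b)|packetType(4b)] [FourCC(4B)] then, for
//	          PacketTypeCodedFrames, [CTS(3B, BE)] [len(nalulenSize B) NALU]...
//
// frameType 1 and 4 are treated as keyframes, matching the vt.Value.IFrame assignment above.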
func (vt *Video) WriteSliceByte(b ...byte) {
	vt.WriteSliceBytes(b)
}
// insertDCRtp inserts the SPS/PPS parameter sets as RTP packets before the I-frame; WebRTC requires this.
func (vt *Video) insertDCRtp() {
	head := vt.Value.RTP.Next
	for _, nalu := range vt.ParamaterSets {
		var packet rtp.Packet
		packet.Version = 2
		packet.PayloadType = vt.PayloadType
		packet.Payload = nalu
		packet.SSRC = vt.SSRC
		packet.Timestamp = uint32(vt.Value.PTS)
		packet.Marker = false
		head.InsertBeforeValue(RTPFrame{Packet: &packet})
	}
}
func (vt *Video) generateTimestamp(ts uint32) {
	if vt.State == TrackStateOffline {
		vt.dtsEst = NewDTSEstimator()
	}
	vt.Value.PTS = time.Duration(ts)
	vt.Value.DTS = time.Duration(vt.dtsEst.Feed(ts))
}

func (vt *Video) SetLostFlag() {
	vt.lostFlag = true
}
func (vt *Video) CompleteAVCC(rv *AVFrame) {
	mem := vt.BytesPool.Get(5)
	b := mem.Value
	if rv.IFrame {
		b[0] = 0x10 | byte(vt.CodecID)
	} else {
		b[0] = 0x20 | byte(vt.CodecID)
	}
	b[1] = 1
	// write the CTS (composition time offset) as a 24-bit big-endian value
	util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
	rv.AVCC.Push(mem)
	rv.AUList.Range(func(au *util.BLL) bool {
		// prefix each access-unit slice with its 4-byte big-endian length
		mem = vt.BytesPool.Get(4)
		util.PutBE(mem.Value, uint32(au.ByteLength))
		rv.AVCC.Push(mem)
		au.Range(func(slice util.Buffer) bool {
			rv.AVCC.Push(vt.BytesPool.GetShell(slice))
			return true
		})
		return true
	})
}
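
// Resulting AVCC layout for one frame, as assembled above:
//
//	[frameType|codecID (1B)] [0x01] [CTS (3B, BE)]   5-byte header
//	[AU length (4B, BE)] [AU bytes] ...              repeated per access-unit slice
//
// For H.264 (codec id 7) the first byte is 0x17 for keyframes and 0x27 otherwise; other
// codec ids follow the same 0x10/0x20 | CodecID pattern.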
func (vt *Video) Flush() {
	rv := vt.Value
	if vt.SEIReader != nil {
		// prepend any pending SEI data as an extra access unit
		if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil {
			var au util.BLL
			au.Push(vt.SpesificTrack.GetNALU_SEI())
			au.Push(vt.BytesPool.GetShell(seiFrame.Data))
			vt.Info("sei", zap.Int("len", len(seiFrame.Data)))
			vt.Value.AUList.UnshiftValue(&au)
		} else if err != nil {
			vt.SEIReader = nil
		}
	}
	if rv.IFrame {
		vt.computeGOP()
		vt.Stream.SetIDR(vt)
	}
	if !vt.Attached.Load() {
		// attach the track only after both an IDR frame and a sequence header have been seen
		if vt.IDRing != nil && vt.SequenceHeadSeq > 0 {
			defer vt.Attach()
		} else {
			rv.Reset()
			return
		}
	}
	if vt.lostFlag {
		// while in the lost state, drop frames until the next keyframe arrives
		if rv.IFrame {
			vt.lostFlag = false
		} else {
			rv.Reset()
			return
		}
	}
	vt.Media.Flush()
	vt.dcChanged = false
}
func (vt *Video) WriteSequenceHead(sh []byte) {
	vt.Media.WriteSequenceHead(sh)
	vt.dcChanged = true
}
/*
The first NALU of an Access Unit uses a 4-byte start code.
As an example, the JM reference encoder can generate a bitstream like the following
(do not use JM 8.6, which does not follow the standard in this respect):
SPS         (4-byte start code)
PPS         (4-byte start code)
SEI         (4-byte start code)
I0(slice0)  (4-byte start code)
I0(slice1)  (3-byte start code)
P1(slice0)  (4-byte start code)
P1(slice1)  (3-byte start code)
P2(slice0)  (4-byte start code)
P2(slice1)  (3-byte start code)
I0(slice0) is the first slice of the first frame (an I-frame) of the sequence and the first
NALU of its Access Unit, so it gets a 4-byte start code, whereas I0(slice1) is the second
slice of that frame and therefore gets a 3-byte start code. The same applies to P1(slice0)
and P1(slice1).
*/
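
// Illustration of the two start codes described above (assuming, as their use in WriteAnnexB
// and the PlayFullAnnexB sketch suggests, that codec.NALU_Delimiter2 is the 4-byte code and
// codec.NALU_Delimiter1 the 3-byte code):
//
//	0x00 0x00 0x00 0x01   4-byte start code - first NALU of an Access Unit
//	0x00 0x00 0x01        3-byte start code - subsequent NALUs in the same Access Unit
//
// This is why WriteAnnexB first splits the incoming frame on the 4-byte delimiter and
// writeAnnexBSlice then splits each piece on the 3-byte delimiter.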