123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- package track
- import (
- "io"
- "time"
- //"fmt"
- "github.com/pion/rtp"
- "go.uber.org/zap"
- "m7s.live/engine/v4/codec"
- "m7s.live/engine/v4/common"
- . "m7s.live/engine/v4/common"
- "m7s.live/engine/v4/util"
- )
- type Video struct {
- Media
- CodecID codec.VideoCodecID
- GOP int //关键帧间隔
- nalulenSize int //avcc格式中表示nalu长度的字节数,通常为4
- dcChanged bool //解码器配置是否改变了,一般由于变码率导致
- dtsEst *DTSEstimator
- lostFlag bool // 是否丢帧
- codec.SPSInfo
- ParamaterSets `json:"-" yaml:"-"`
- SPS []byte `json:"-" yaml:"-"`
- PPS []byte `json:"-" yaml:"-"`
- SEIReader *DataReader[[]byte] `json:"-" yaml:"-"`
- }
- func (v *Video) Attach() {
- //fmt.Println("VIDEO ATTACH 111111111111111111111111111111111111111111111111111")
- if v.Attached.CompareAndSwap(false, true) {
- v.Info("attach video track", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
- if err := v.Stream.AddTrack(v).Await(); err != nil {
- v.Error("attach video track failed", zap.Error(err))
- v.Attached.Store(false)
- } else {
- v.Info("video track attached", zap.Uint("width", v.Width), zap.Uint("height", v.Height))
- }
- }
- }
- func (v *Video) Detach() {
- if v.Attached.CompareAndSwap(true, false) {
- v.Stream.RemoveTrack(v)
- }
- }
- func (vt *Video) GetName() string {
- if vt.Name == "" {
- return vt.CodecID.String()
- }
- return vt.Name
- }
- // PlayFullAnnexB 订阅annex-b格式的流数据,每一个I帧增加sps、pps头
- // func (vt *Video) PlayFullAnnexB(ctx context.Context, onMedia func(net.Buffers) error) error {
- // for vr := vt.ReadRing(); ctx.Err() == nil; vr.MoveNext() {
- // vp := vr.Read(ctx)
- // var data net.Buffers
- // if vp.IFrame {
- // data = vt.GetAnnexB()
- // }
- // data = append(data, codec.NALU_Delimiter2)
- // for slice := vp.AUList.Head; slice != nil; slice = slice.Next {
- // data = append(data, slice.ToBuffers()...)
- // if slice.Next != nil {
- // data = append(data, codec.NALU_Delimiter1)
- // }
- // }
- // if err := onMedia(data); err != nil {
- // // TODO: log err
- // return err
- // }
- // }
- // return ctx.Err()
- // }
- func (vt *Video) computeGOP() {
- if vt.IDRing != nil {
- vt.GOP = int(vt.Value.Sequence - vt.IDRing.Value.Sequence)
- if vt.HistoryRing == nil {
- vt.narrow(vt.GOP)
- }
- }
- vt.AddIDR()
- // var n int
- // for i := 0; i < len(vt.BytesPool); i++ {
- // n += vt.BytesPool[i].Length
- // }
- // println(n)
- }
- func (vt *Video) writeAnnexBSlice(nalu []byte) {
- common.SplitAnnexB(nalu, vt.WriteSliceBytes, codec.NALU_Delimiter1)
- }
- func (vt *Video) WriteNalu(pts uint32, dts uint32, nalu []byte) {
- if dts == 0 {
- vt.generateTimestamp(pts)
- } else {
- vt.Value.PTS = time.Duration(pts)
- vt.Value.DTS = time.Duration(dts)
- }
- vt.Value.BytesIn += len(nalu)
- vt.WriteSliceBytes(nalu)
- //fmt.Println("WriteNalu 11111111111111111111111")
- vt.Flush()
- }
- func (vt *Video) WriteAnnexB(pts uint32, dts uint32, frame []byte) {
- if dts == 0 {
- vt.generateTimestamp(pts)
- } else {
- vt.Value.PTS = time.Duration(pts)
- vt.Value.DTS = time.Duration(dts)
- }
- vt.Value.BytesIn += len(frame)
- common.SplitAnnexB(frame, vt.writeAnnexBSlice, codec.NALU_Delimiter2)
- if vt.Value.AUList.ByteLength > 0 {
- //fmt.Println("WriteAnnexB 11111111111111111111111")
- vt.Flush()
- }
- }
- func (vt *Video) writeAVCCFrame(ts uint32, r *util.BLLReader, frame *util.BLL) (err error) {
- var cts uint32
- cts, err = r.ReadBE(3)
- if err != nil {
- return err
- }
- vt.Value.PTS = time.Duration(ts+cts) * 90
- vt.Value.DTS = time.Duration(ts) * 90
- var nalulen uint32
- for nalulen, err = r.ReadBE(vt.nalulenSize); err == nil; nalulen, err = r.ReadBE(vt.nalulenSize) {
- if remain := frame.ByteLength - r.GetOffset(); remain < int(nalulen) {
- vt.Error("read nalu length error", zap.Int("nalulen", int(nalulen)), zap.Int("remain", remain))
- frame.Recycle()
- vt.Value.Reset()
- return
- }
- vt.AppendAuBytes(r.ReadN(int(nalulen))...)
- }
- return nil
- }
- func (vt *Video) WriteAVCC(ts uint32, frame *util.BLL) (err error) {
- if l := frame.ByteLength; l < 6 {
- vt.Error("AVCC data too short", zap.Int("len", l))
- return io.ErrShortWrite
- }
- // bbb := util.Buffer(frame.ToBytes()[5:])
- r := frame.NewReader()
- b, _ := r.ReadByte()
- isExtHeader := (b >> 4) & 0b1000
- frameType := (b >> 4) & 0b0111
- vt.Value.IFrame = frameType == 1 || frameType == 4
- packetType := b & 0b1111
- if isExtHeader != 0 {
- r.ReadBE(4) // fourcc
- switch packetType {
- case codec.PacketTypeSequenceStart:
- err = vt.SpesificTrack.WriteSequenceHead(frame.ToBytes())
- frame.Recycle()
- return
- case codec.PacketTypeCodedFrames:
- err = vt.SpesificTrack.writeAVCCFrame(ts, r, frame)
- case codec.PacketTypeCodedFramesX:
- }
- } else {
- b, _ = r.ReadByte() //sequence frame flag
- if b == 0 {
- err = vt.SpesificTrack.WriteSequenceHead(frame.ToBytes())
- frame.Recycle()
- return
- }
- err = vt.SpesificTrack.writeAVCCFrame(ts, r, frame)
- }
- if err == nil {
- vt.Value.WriteAVCC(ts, frame)
- //fmt.Println("WriteAVCC 11111111111111111111111")
- vt.Flush()
- }
- return
- }
- func (vt *Video) WriteSliceByte(b ...byte) {
- // fmt.Println("write slice byte", b)
- vt.WriteSliceBytes(b)
- }
- // 在I帧前面插入sps pps webrtc需要
- func (vt *Video) insertDCRtp() {
- head := vt.Value.RTP.Next
- for _, nalu := range vt.ParamaterSets {
- var packet rtp.Packet
- packet.Version = 2
- packet.PayloadType = vt.PayloadType
- packet.Payload = nalu
- packet.SSRC = vt.SSRC
- packet.Timestamp = uint32(vt.Value.PTS)
- packet.Marker = false
- head.InsertBeforeValue(RTPFrame{Packet: &packet})
- }
- }
- func (vt *Video) generateTimestamp(ts uint32) {
- if vt.State == TrackStateOffline {
- vt.dtsEst = NewDTSEstimator()
- }
- vt.Value.PTS = time.Duration(ts)
- vt.Value.DTS = time.Duration(vt.dtsEst.Feed(ts))
- }
- func (vt *Video) SetLostFlag() {
- vt.lostFlag = true
- }
- func (vt *Video) CompleteAVCC(rv *AVFrame) {
- mem := vt.BytesPool.Get(5)
- b := mem.Value
- if rv.IFrame {
- b[0] = 0x10 | byte(vt.CodecID)
- } else {
- b[0] = 0x20 | byte(vt.CodecID)
- }
- b[1] = 1
- // println(rv.PTS < rv.DTS, "\t", rv.PTS, "\t", rv.DTS, "\t", rv.PTS-rv.DTS)
- // vt.Info("cts", zap.Uint32("cts", uint32((rv.PTS-rv.DTS)/90)))
- // 写入CTS
- util.PutBE(b[2:5], (rv.PTS-rv.DTS)/90)
- rv.AVCC.Push(mem)
- // if rv.AVCC.ByteLength != 5 {
- // panic("error")
- // }
- // var tmp = 0
- rv.AUList.Range(func(au *util.BLL) bool {
- mem = vt.BytesPool.Get(4)
- // println(au.ByteLength)
- util.PutBE(mem.Value, uint32(au.ByteLength))
- rv.AVCC.Push(mem)
- au.Range(func(slice util.Buffer) bool {
- rv.AVCC.Push(vt.BytesPool.GetShell(slice))
- return true
- })
- // tmp += 4 + au.ByteLength
- // if rv.AVCC.ByteLength != 5+tmp {
- // panic("error")
- // }
- return true
- })
- }
- func (vt *Video) Flush() {
- rv := vt.Value
- if vt.SEIReader != nil {
- if seiFrame, err := vt.SEIReader.TryRead(); seiFrame != nil {
- var au util.BLL
- au.Push(vt.SpesificTrack.GetNALU_SEI())
- au.Push(vt.BytesPool.GetShell(seiFrame.Data))
- vt.Info("sei", zap.Int("len", len(seiFrame.Data)))
- vt.Value.AUList.UnshiftValue(&au)
- } else if err != nil {
- vt.SEIReader = nil
- }
- }
- if rv.IFrame {
- vt.computeGOP()
- vt.Stream.SetIDR(vt)
- }
- if !vt.Attached.Load() {
- if vt.IDRing != nil && vt.SequenceHeadSeq > 0 {
- defer vt.Attach()
- //fmt.Println("1111111111111111111111111111 vt attach")
- } else {
- rv.Reset()
- return
- }
- }
- if vt.lostFlag {
- if rv.IFrame {
- vt.lostFlag = false
- } else {
- rv.Reset()
- return
- }
- }
- vt.Media.Flush()
- vt.dcChanged = false
- }
- func (vt *Video) WriteSequenceHead(sh []byte) {
- vt.Media.WriteSequenceHead(sh)
- vt.dcChanged = true
- }
- /*
- Access Unit的首个nalu是4字节起始码。
- 这里举个例子说明,用JM可以生成这样一段码流(不要使用JM8.6,它在这部分与标准不符),这个码流可以见本楼附件:
- SPS (4字节头)
- PPS (4字节头)
- SEI (4字节头)
- I0(slice0) (4字节头)
- I0(slice1) (3字节头)
- P1(slice0) (4字节头)
- P1(slice1) (3字节头)
- P2(slice0) (4字节头)
- P2(slice1) (3字节头)
- I0(slice0)是序列第一帧(I帧)的第一个slice,是当前Access Unit的首个nalu,所以是4字节头。而I0(slice1)表示第一帧的第二个slice,所以是3字节头。P1(slice0) 、P1(slice1)同理。
- */
|