123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321 |
- package track
- import (
- "time"
- "unsafe"
- "github.com/pion/rtp"
- "go.uber.org/zap"
- . "m7s.live/engine/v4/common"
- "m7s.live/engine/v4/config"
- "m7s.live/engine/v4/log"
- "m7s.live/engine/v4/util"
- )
- var deltaDTSRange time.Duration = 90 * 10000 // 超过 10 秒
- type 流速控制 struct {
- 起始时间戳 time.Duration
- 起始dts time.Duration
- 等待上限 time.Duration
- 起始时间 time.Time
- }
- func (p *流速控制) 重置(绝对时间戳 time.Duration, dts time.Duration) {
- p.起始时间 = time.Now()
- p.起始时间戳 = 绝对时间戳
- p.起始dts = dts
- // println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳)
- }
- func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) time.Duration {
- if dts < p.起始dts {
- dts += (1 << 32)
- }
- return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90
- }
- func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) (等待了 time.Duration) {
- 数据时间差, 实际时间差 := 绝对时间戳-p.起始时间戳, time.Since(p.起始时间)
- // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05"))
- // if 实际时间差 > 数据时间差 {
- // p.重置(绝对时间戳)
- // return
- // }
- // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep
- if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond {
- // fmt.Println("过快毫秒", 过快.Milliseconds())
- // println("过快毫秒", p.name, 过快.Milliseconds())
- if 过快 > p.等待上限 {
- 等待了 = p.等待上限
- } else {
- 等待了 = 过快
- }
- time.Sleep(等待了)
- } else if 过快 < -100*time.Millisecond {
- // fmt.Println("过慢毫秒", 过快.Milliseconds())
- // p.重置(绝对时间戳, dts)
- // println("过慢毫秒", p.name, 过快.Milliseconds())
- }
- return
- }
- type SpesificTrack interface {
- CompleteRTP(*AVFrame)
- CompleteAVCC(*AVFrame)
- WriteSliceBytes([]byte)
- WriteRTPFrame(*util.ListItem[RTPFrame])
- generateTimestamp(uint32)
- WriteSequenceHead([]byte) error
- writeAVCCFrame(uint32, *util.BLLReader, *util.BLL) error
- GetNALU_SEI() *util.ListItem[util.Buffer]
- Flush()
- }
- type IDRingList struct {
- IDRList util.List[*util.Ring[*AVFrame]]
- IDRing *util.Ring[*AVFrame]
- HistoryRing *util.Ring[*AVFrame]
- }
- func (p *IDRingList) AddIDR(IDRing *util.Ring[*AVFrame]) {
- p.IDRList.PushValue(IDRing)
- p.IDRing = IDRing
- }
- func (p *IDRingList) ShiftIDR() {
- p.IDRList.Shift()
- p.HistoryRing = p.IDRList.Next.Value
- }
- // Media 基础媒体Track类
- type Media struct {
- Base[any, *AVFrame]
- BufferTime time.Duration //发布者配置中的缓冲时间(时光回溯)
- PayloadType byte
- IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
- SSRC uint32
- SampleRate uint32
- BytesPool util.BytesPool `json:"-" yaml:"-"`
- RtpPool util.Pool[RTPFrame] `json:"-" yaml:"-"`
- SequenceHead []byte `json:"-" yaml:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
- SequenceHeadSeq int
- RTPDemuxer
- SpesificTrack `json:"-" yaml:"-"`
- deltaTs time.Duration //用于接续发布后时间戳连续
- iframeReceived bool
- 流速控制
- }
- func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
- if b.Reuse() {
- item = av.BytesPool.Get(b.Len())
- copy(item.Value, b.Bytes())
- } else {
- return av.BytesPool.GetShell(b.Bytes())
- }
- return
- }
- func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
- result = av.RtpPool.Get()
- if result.Value.Packet == nil {
- result.Value.Packet = &rtp.Packet{}
- result.Value.PayloadType = av.PayloadType
- result.Value.SSRC = av.SSRC
- result.Value.Version = 2
- result.Value.Raw = make([]byte, 1460)
- }
- result.Value.Raw = result.Value.Raw[:1460]
- result.Value.Payload = result.Value.Raw[:0]
- return
- }
- // 为json序列化而计算的数据
- func (av *Media) SnapForJson() {
- v := av.LastValue
- if av.RawPart != nil {
- av.RawPart = av.RawPart[:0]
- } else {
- av.RawPart = make([]int, 0, 10)
- }
- if av.RawSize = v.AUList.ByteLength; av.RawSize > 0 {
- r := v.AUList.NewReader()
- for b, err := r.ReadByte(); err == nil && len(av.RawPart) < 10; b, err = r.ReadByte() {
- av.RawPart = append(av.RawPart, int(b))
- }
- }
- }
- func (av *Media) SetSpeedLimit(value time.Duration) {
- av.等待上限 = value
- }
- func (av *Media) SetStuff(stuff ...any) {
- // 代表发布者已经离线,该Track成为遗留Track,等待下一任发布者接续发布
- for _, s := range stuff {
- switch v := s.(type) {
- case IPuber:
- pubConf := v.GetConfig()
- av.BufferTime = pubConf.BufferTime
- av.Base.SetStuff(v)
- av.Init(256, NewAVFrame)
- av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
- av.等待上限 = pubConf.SpeedLimit
- case uint32:
- av.SampleRate = v
- case byte:
- av.PayloadType = v
- case util.BytesPool:
- av.BytesPool = v
- case SpesificTrack:
- av.SpesificTrack = v
- case []any:
- av.SetStuff(v...)
- default:
- av.Base.SetStuff(v)
- }
- }
- }
- func (av *Media) LastWriteTime() time.Time {
- return av.LastValue.WriteTime
- }
- func (av *Media) CurrentFrame() *AVFrame {
- return av.Value
- }
- func (av *Media) PreFrame() *AVFrame {
- return av.LastValue
- }
- func (av *Media) generateTimestamp(ts uint32) {
- av.Value.PTS = time.Duration(ts)
- av.Value.DTS = time.Duration(ts)
- }
- func (av *Media) WriteSequenceHead(sh []byte) {
- av.SequenceHead = sh
- av.SequenceHeadSeq++
- }
- func (av *Media) AppendAuBytes(b ...[]byte) {
- var au util.BLL
- for _, bb := range b {
- au.Push(av.BytesPool.GetShell(bb))
- }
- av.Value.AUList.PushValue(&au)
- }
- func (av *Media) narrow(gop int) {
- if l := av.Size - gop; l > 12 {
- if log.Trace {
- av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
- }
- //缩小缓冲环节省内存
- av.Reduce(5)
- }
- }
- func (av *Media) AddIDR() {
- if av.BufferTime > 0 {
- av.IDRingList.AddIDR(av.Ring)
- if av.HistoryRing == nil {
- av.HistoryRing = av.IDRing
- }
- } else {
- av.IDRing = av.Ring
- }
- }
- func (av *Media) Flush() {
- curValue, preValue, nextValue := av.Value, av.LastValue, av.Next()
- useDts := curValue.Timestamp == 0
- originDTS := curValue.DTS
- if av.State == TrackStateOffline {
- av.State = TrackStateOnline
- if useDts {
- av.deltaTs = curValue.DTS - preValue.DTS
- } else {
- av.deltaTs = curValue.Timestamp - preValue.Timestamp
- }
- curValue.DTS = preValue.DTS + 900
- curValue.PTS = preValue.PTS + 900
- curValue.Timestamp = preValue.Timestamp + time.Millisecond
- av.Info("track back online", zap.Duration("delta", av.deltaTs))
- } else if av.deltaTs != 0 {
- if useDts {
- curValue.DTS -= av.deltaTs
- curValue.PTS -= av.deltaTs
- } else {
- rtpts := av.deltaTs * 90 / time.Millisecond
- curValue.DTS -= rtpts
- curValue.PTS -= rtpts
- curValue.Timestamp -= av.deltaTs
- }
- }
- if av.起始时间.IsZero() {
- curValue.DeltaTime = 0
- if useDts {
- curValue.Timestamp = time.Since(av.Publisher.GetStream().GetStartTime())
- }
- av.重置(curValue.Timestamp, curValue.DTS)
- } else {
- if useDts {
- deltaDts := curValue.DTS - preValue.DTS
- if deltaDts > deltaDTSRange || deltaDts < -deltaDTSRange {
- // 时间戳跳变,等同于离线重连
- av.deltaTs = originDTS - preValue.DTS
- curValue.DTS = preValue.DTS + 900
- curValue.PTS = preValue.PTS + 900
- av.Warn("track dts reset", zap.Int64("delta1", int64(deltaDts)), zap.Int64("delta2", int64(av.deltaTs)))
- }
- curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
- }
- curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
- }
- if log.Trace {
- av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Int64("dts0", int64(preValue.DTS)), zap.Int64("dts1", int64(originDTS)), zap.Uint64("dts2", uint64(curValue.DTS)), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS))
- }
- bufferTime := av.BufferTime
- if bufferTime > 0 && av.IDRingList.IDRList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.IDRList.Next.Next.Value.Value.Timestamp) > bufferTime {
- av.ShiftIDR()
- av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
- }
- // 下一帧为订阅起始帧,即将覆盖,需要扩环
- if nextValue == av.IDRing || nextValue == av.HistoryRing {
- // if av.AVRing.Size < 512 {
- if log.Trace {
- av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name))
- }
- av.Glow(5)
- // } else {
- // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
- // }
- }
- if curValue.AUList.Length > 0 {
- // 补完RTP
- if config.Global.EnableRTP && curValue.RTP.Length == 0 {
- av.CompleteRTP(curValue)
- }
- // 补完AVCC
- if config.Global.EnableAVCC && curValue.AVCC.ByteLength == 0 {
- av.CompleteAVCC(curValue)
- }
- }
- av.ComputeBPS(curValue.BytesIn)
- av.Step()
- if av.等待上限 > 0 {
- 等待了 := av.控制流速(curValue.Timestamp, curValue.DTS)
- if log.Trace && 等待了 > 0 {
- av.Trace("speed control", zap.Duration("sleep", 等待了))
- }
- }
- }
- func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration {
- if curTs < preTs {
- return curTs + (1<<32)*time.Millisecond - preTs
- }
- return curTs - preTs
- }
|