base.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package track
  2. import (
  3. "time"
  4. "unsafe"
  5. "github.com/pion/rtp"
  6. "go.uber.org/zap"
  7. . "m7s.live/engine/v4/common"
  8. "m7s.live/engine/v4/config"
  9. "m7s.live/engine/v4/log"
  10. "m7s.live/engine/v4/util"
  11. )
// deltaDTSRange is the largest DTS step between two consecutive frames that
// Flush still treats as continuous. Although typed as time.Duration, Flush
// compares it against DTS differences, which this file converts at 90 units
// per millisecond, so 90 * 10000 corresponds to 10 seconds.
var deltaDTSRange time.Duration = 90 * 10000 // more than 10 seconds
  13. type 流速控制 struct {
  14. 起始时间戳 time.Duration
  15. 起始dts time.Duration
  16. 等待上限 time.Duration
  17. 起始时间 time.Time
  18. }
  19. func (p *流速控制) 重置(绝对时间戳 time.Duration, dts time.Duration) {
  20. p.起始时间 = time.Now()
  21. p.起始时间戳 = 绝对时间戳
  22. p.起始dts = dts
  23. // println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳)
  24. }
  25. func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) time.Duration {
  26. if dts < p.起始dts {
  27. dts += (1 << 32)
  28. }
  29. return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90
  30. }
  31. func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) (等待了 time.Duration) {
  32. 数据时间差, 实际时间差 := 绝对时间戳-p.起始时间戳, time.Since(p.起始时间)
  33. // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05"))
  34. // if 实际时间差 > 数据时间差 {
  35. // p.重置(绝对时间戳)
  36. // return
  37. // }
  38. // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep
  39. if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond {
  40. // fmt.Println("过快毫秒", 过快.Milliseconds())
  41. // println("过快毫秒", p.name, 过快.Milliseconds())
  42. if 过快 > p.等待上限 {
  43. 等待了 = p.等待上限
  44. } else {
  45. 等待了 = 过快
  46. }
  47. time.Sleep(等待了)
  48. } else if 过快 < -100*time.Millisecond {
  49. // fmt.Println("过慢毫秒", 过快.Milliseconds())
  50. // p.重置(绝对时间戳, dts)
  51. // println("过慢毫秒", p.name, 过快.Milliseconds())
  52. }
  53. return
  54. }
// SpesificTrack (sic) is the set of operations supplied by a concrete track
// type. Media embeds this interface, and Media.Flush calls CompleteRTP and
// CompleteAVCC through it. Because it contains unexported methods, only types
// in this package can implement it.
type SpesificTrack interface {
	// CompleteRTP is called by Media.Flush for a frame that has access-unit
	// data but no RTP packets, when config.Global.EnableRTP is set.
	CompleteRTP(*AVFrame)
	// CompleteAVCC is called by Media.Flush for a frame that has access-unit
	// data but no AVCC bytes, when config.Global.EnableAVCC is set.
	CompleteAVCC(*AVFrame)
	// NOTE(review): the remaining methods are not called in this part of the
	// file; their contracts are defined by the implementing track types.
	WriteSliceBytes([]byte)
	WriteRTPFrame(*util.ListItem[RTPFrame])
	generateTimestamp(uint32)
	WriteSequenceHead([]byte) error
	writeAVCCFrame(uint32, *util.BLLReader, *util.BLL) error
	GetNALU_SEI() *util.ListItem[util.Buffer]
	Flush()
}
// IDRingList tracks ring positions registered through AddIDR.
type IDRingList struct {
	// IDRList holds every ring position passed to AddIDR that has not yet
	// been removed by ShiftIDR.
	IDRList util.List[*util.Ring[*AVFrame]]
	// IDRing is the ring position most recently passed to AddIDR.
	IDRing *util.Ring[*AVFrame]
	// HistoryRing is set by ShiftIDR to the value at the front of IDRList
	// after the shift.
	HistoryRing *util.Ring[*AVFrame]
}
  71. func (p *IDRingList) AddIDR(IDRing *util.Ring[*AVFrame]) {
  72. p.IDRList.PushValue(IDRing)
  73. p.IDRing = IDRing
  74. }
  75. func (p *IDRingList) ShiftIDR() {
  76. p.IDRList.Shift()
  77. p.HistoryRing = p.IDRList.Next.Value
  78. }
// Media is the base media track type.
type Media struct {
	Base[any, *AVFrame]
	PayloadType byte
	// Position of the most recent key frame, used for first-screen rendering.
	IDRingList `json:"-" yaml:"-"`
	SSRC       uint32
	SampleRate uint32
	BytesPool  util.BytesPool      `json:"-" yaml:"-"`
	RtpPool    util.Pool[RTPFrame] `json:"-" yaml:"-"`
	// H264 (SPS, PPS), H265 (VPS, SPS, PPS), AAC (config).
	SequenceHead []byte `json:"-" yaml:"-"`
	// SequenceHeadSeq is incremented every time WriteSequenceHead stores a
	// new SequenceHead.
	SequenceHeadSeq int
	RTPDemuxer
	SpesificTrack `json:"-" yaml:"-"`
	// Keeps timestamps continuous after publishing resumes.
	deltaTs time.Duration
	流速控制
}
  95. func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
  96. if b.Reuse() {
  97. item = av.BytesPool.Get(b.Len())
  98. copy(item.Value, b.Bytes())
  99. } else {
  100. return av.BytesPool.GetShell(b.Bytes())
  101. }
  102. return
  103. }
  104. func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
  105. result = av.RtpPool.Get()
  106. if result.Value.Packet == nil {
  107. result.Value.Packet = &rtp.Packet{}
  108. result.Value.PayloadType = av.PayloadType
  109. result.Value.SSRC = av.SSRC
  110. result.Value.Version = 2
  111. result.Value.Raw = make([]byte, 1460)
  112. }
  113. result.Value.Raw = result.Value.Raw[:1460]
  114. result.Value.Payload = result.Value.Raw[:0]
  115. return
  116. }
  117. // 为json序列化而计算的数据
  118. func (av *Media) SnapForJson() {
  119. v := av.LastValue
  120. if av.RawPart != nil {
  121. av.RawPart = av.RawPart[:0]
  122. } else {
  123. av.RawPart = make([]int, 0, 10)
  124. }
  125. if av.RawSize = v.AUList.ByteLength; av.RawSize > 0 {
  126. r := v.AUList.NewReader()
  127. for b, err := r.ReadByte(); err == nil && len(av.RawPart) < 10; b, err = r.ReadByte() {
  128. av.RawPart = append(av.RawPart, int(b))
  129. }
  130. }
  131. }
  132. func (av *Media) SetSpeedLimit(value time.Duration) {
  133. av.等待上限 = value
  134. }
  135. func (av *Media) SetStuff(stuff ...any) {
  136. // 代表发布者已经离线,该Track成为遗留Track,等待下一任发布者接续发布
  137. for _, s := range stuff {
  138. switch v := s.(type) {
  139. case IStream:
  140. pubConf := v.GetPublisherConfig()
  141. av.Base.SetStuff(v)
  142. av.Init(256, NewAVFrame)
  143. av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
  144. av.等待上限 = pubConf.SpeedLimit
  145. case uint32:
  146. av.SampleRate = v
  147. case byte:
  148. av.PayloadType = v
  149. case util.BytesPool:
  150. av.BytesPool = v
  151. case SpesificTrack:
  152. av.SpesificTrack = v
  153. case []any:
  154. av.SetStuff(v...)
  155. default:
  156. av.Base.SetStuff(v)
  157. }
  158. }
  159. }
// LastWriteTime returns the WriteTime of the previous frame (LastValue).
func (av *Media) LastWriteTime() time.Time {
	return av.LastValue.WriteTime
}
// CurrentFrame returns the frame at the current ring position (Value).
func (av *Media) CurrentFrame() *AVFrame {
	return av.Value
}
// PreFrame returns the previous frame (LastValue).
func (av *Media) PreFrame() *AVFrame {
	return av.LastValue
}
  169. func (av *Media) generateTimestamp(ts uint32) {
  170. av.Value.PTS = time.Duration(ts)
  171. av.Value.DTS = time.Duration(ts)
  172. }
  173. func (av *Media) WriteSequenceHead(sh []byte) {
  174. av.SequenceHead = sh
  175. av.SequenceHeadSeq++
  176. }
  177. func (av *Media) AppendAuBytes(b ...[]byte) {
  178. var au util.BLL
  179. for _, bb := range b {
  180. au.Push(av.BytesPool.GetShell(bb))
  181. }
  182. av.Value.AUList.PushValue(&au)
  183. }
  184. func (av *Media) narrow(gop int) {
  185. if l := av.Size - gop; l > 12 {
  186. if log.Trace {
  187. av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
  188. }
  189. //缩小缓冲环节省内存
  190. av.Reduce(5)
  191. }
  192. }
  193. func (av *Media) AddIDR() {
  194. if av.Stream.GetPublisherConfig().BufferTime > 0 {
  195. av.IDRingList.AddIDR(av.Ring)
  196. if av.HistoryRing == nil {
  197. av.HistoryRing = av.IDRing
  198. }
  199. } else {
  200. av.IDRing = av.Ring
  201. }
  202. }
  203. func (av *Media) Flush() {
  204. curValue, preValue, nextValue := av.Value, av.LastValue, av.Next()
  205. useDts := curValue.Timestamp == 0
  206. originDTS := curValue.DTS
  207. if av.State == TrackStateOffline {
  208. av.State = TrackStateOnline
  209. if useDts {
  210. av.deltaTs = curValue.DTS - preValue.DTS
  211. } else {
  212. av.deltaTs = curValue.Timestamp - preValue.Timestamp
  213. }
  214. curValue.DTS = preValue.DTS + 900
  215. curValue.PTS = preValue.PTS + 900
  216. curValue.Timestamp = preValue.Timestamp + time.Millisecond
  217. av.Info("track back online", zap.Duration("delta", av.deltaTs))
  218. } else if av.deltaTs != 0 {
  219. if useDts {
  220. curValue.DTS -= av.deltaTs
  221. curValue.PTS -= av.deltaTs
  222. } else {
  223. rtpts := av.deltaTs * 90 / time.Millisecond
  224. curValue.DTS -= rtpts
  225. curValue.PTS -= rtpts
  226. curValue.Timestamp -= av.deltaTs
  227. }
  228. }
  229. if av.起始时间.IsZero() {
  230. curValue.DeltaTime = 0
  231. if useDts {
  232. curValue.Timestamp = time.Since(av.Stream.GetStartTime())
  233. }
  234. av.重置(curValue.Timestamp, curValue.DTS)
  235. } else {
  236. if useDts {
  237. deltaDts := curValue.DTS - preValue.DTS
  238. if deltaDts > deltaDTSRange || deltaDts < -deltaDTSRange {
  239. // 时间戳跳变,等同于离线重连
  240. av.deltaTs = originDTS - preValue.DTS
  241. curValue.DTS = preValue.DTS + 900
  242. curValue.PTS = preValue.PTS + 900
  243. av.Warn("track dts reset", zap.Int64("delta1", int64(deltaDts)), zap.Int64("delta2", int64(av.deltaTs)))
  244. }
  245. curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
  246. }
  247. curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
  248. }
  249. if log.Trace {
  250. av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Int64("dts0", int64(preValue.DTS)), zap.Int64("dts1", int64(originDTS)), zap.Uint64("dts2", uint64(curValue.DTS)), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS))
  251. }
  252. bufferTime := av.Stream.GetPublisherConfig().BufferTime
  253. if bufferTime > 0 && av.IDRingList.IDRList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.IDRList.Next.Next.Value.Value.Timestamp) > bufferTime {
  254. av.ShiftIDR()
  255. av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
  256. }
  257. // 下一帧为订阅起始帧,即将覆盖,需要扩环
  258. if nextValue == av.IDRing || nextValue == av.HistoryRing {
  259. // if av.AVRing.Size < 512 {
  260. if log.Trace {
  261. av.Stream.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name))
  262. }
  263. av.Glow(5)
  264. // } else {
  265. // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
  266. // }
  267. }
  268. if curValue.AUList.Length > 0 {
  269. // 补完RTP
  270. if config.Global.EnableRTP && curValue.RTP.Length == 0 {
  271. av.CompleteRTP(curValue)
  272. }
  273. // 补完AVCC
  274. if config.Global.EnableAVCC && curValue.AVCC.ByteLength == 0 {
  275. av.CompleteAVCC(curValue)
  276. }
  277. }
  278. av.ComputeBPS(curValue.BytesIn)
  279. av.Step()
  280. if av.等待上限 > 0 {
  281. 等待了 := av.控制流速(curValue.Timestamp, curValue.DTS)
  282. if log.Trace && 等待了 > 0 {
  283. av.Trace("speed control", zap.Duration("sleep", 等待了))
  284. }
  285. }
  286. }
  287. func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration {
  288. if curTs < preTs {
  289. return curTs + (1<<32)*time.Millisecond - preTs
  290. }
  291. return curTs - preTs
  292. }