media.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. package track
  2. import (
  3. "time"
  4. "unsafe"
  5. "github.com/pion/rtp"
  6. "go.uber.org/zap"
  7. . "m7s.live/engine/v4/common"
  8. "m7s.live/engine/v4/config"
  9. "m7s.live/engine/v4/log"
  10. "m7s.live/engine/v4/util"
  11. )
  12. var deltaDTSRange time.Duration = 90 * 10000 // 超过 10 秒
  13. type 流速控制 struct {
  14. 起始时间戳 time.Duration
  15. 起始dts time.Duration
  16. 等待上限 time.Duration
  17. 起始时间 time.Time
  18. }
  19. func (p *流速控制) 重置(绝对时间戳 time.Duration, dts time.Duration) {
  20. p.起始时间 = time.Now()
  21. p.起始时间戳 = 绝对时间戳
  22. p.起始dts = dts
  23. // println("重置", p.起始时间.Format("2006-01-02 15:04:05"), p.起始时间戳)
  24. }
  25. func (p *流速控制) 根据起始DTS计算绝对时间戳(dts time.Duration) time.Duration {
  26. if dts < p.起始dts {
  27. dts += (1 << 32)
  28. }
  29. return ((dts-p.起始dts)*time.Millisecond + p.起始时间戳*90) / 90
  30. }
  31. func (p *流速控制) 控制流速(绝对时间戳 time.Duration, dts time.Duration) (等待了 time.Duration) {
  32. 数据时间差, 实际时间差 := 绝对时间戳-p.起始时间戳, time.Since(p.起始时间)
  33. // println("数据时间差", 数据时间差, "实际时间差", 实际时间差, "绝对时间戳", 绝对时间戳, "起始时间戳", p.起始时间戳, "起始时间", p.起始时间.Format("2006-01-02 15:04:05"))
  34. // if 实际时间差 > 数据时间差 {
  35. // p.重置(绝对时间戳)
  36. // return
  37. // }
  38. // 如果收到的帧的时间戳超过实际消耗的时间100ms就休息一下,100ms作为一个弹性区间防止频繁调用sleep
  39. if 过快 := (数据时间差 - 实际时间差); 过快 > 100*time.Millisecond {
  40. // fmt.Println("过快毫秒", 过快.Milliseconds())
  41. // println("过快毫秒", p.name, 过快.Milliseconds())
  42. if 过快 > p.等待上限 {
  43. 等待了 = p.等待上限
  44. } else {
  45. 等待了 = 过快
  46. }
  47. time.Sleep(等待了)
  48. } else if 过快 < -100*time.Millisecond {
  49. // fmt.Println("过慢毫秒", 过快.Milliseconds())
  50. // p.重置(绝对时间戳, dts)
  51. // println("过慢毫秒", p.name, 过快.Milliseconds())
  52. }
  53. return
  54. }
  55. type SpesificTrack interface {
  56. CompleteRTP(*AVFrame)
  57. CompleteAVCC(*AVFrame)
  58. WriteSliceBytes([]byte)
  59. WriteRTPFrame(*util.ListItem[RTPFrame])
  60. generateTimestamp(uint32)
  61. WriteSequenceHead([]byte) error
  62. writeAVCCFrame(uint32, *util.BLLReader, *util.BLL) error
  63. GetNALU_SEI() *util.ListItem[util.Buffer]
  64. Flush()
  65. }
  66. type IDRingList struct {
  67. IDRList util.List[*util.Ring[*AVFrame]]
  68. IDRing *util.Ring[*AVFrame]
  69. HistoryRing *util.Ring[*AVFrame]
  70. }
  71. func (p *IDRingList) AddIDR(IDRing *util.Ring[*AVFrame]) {
  72. p.IDRList.PushValue(IDRing)
  73. p.IDRing = IDRing
  74. }
  75. func (p *IDRingList) ShiftIDR() {
  76. p.IDRList.Shift()
  77. p.HistoryRing = p.IDRList.Next.Value
  78. }
  79. // Media 基础媒体Track类
  80. type Media struct {
  81. Base[any, *AVFrame]
  82. BufferTime time.Duration //发布者配置中的缓冲时间(时光回溯)
  83. PayloadType byte
  84. IDRingList `json:"-" yaml:"-"` //最近的关键帧位置,首屏渲染
  85. SSRC uint32
  86. SampleRate uint32
  87. BytesPool util.BytesPool `json:"-" yaml:"-"`
  88. RtpPool util.Pool[RTPFrame] `json:"-" yaml:"-"`
  89. SequenceHead []byte `json:"-" yaml:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
  90. SequenceHeadSeq int
  91. RTPDemuxer
  92. SpesificTrack `json:"-" yaml:"-"`
  93. deltaTs time.Duration //用于接续发布后时间戳连续
  94. iframeReceived bool
  95. 流速控制
  96. }
  97. func (av *Media) GetFromPool(b util.IBytes) (item *util.ListItem[util.Buffer]) {
  98. if b.Reuse() {
  99. item = av.BytesPool.Get(b.Len())
  100. copy(item.Value, b.Bytes())
  101. } else {
  102. return av.BytesPool.GetShell(b.Bytes())
  103. }
  104. return
  105. }
  106. func (av *Media) GetRTPFromPool() (result *util.ListItem[RTPFrame]) {
  107. result = av.RtpPool.Get()
  108. if result.Value.Packet == nil {
  109. result.Value.Packet = &rtp.Packet{}
  110. result.Value.PayloadType = av.PayloadType
  111. result.Value.SSRC = av.SSRC
  112. result.Value.Version = 2
  113. result.Value.Raw = make([]byte, 1460)
  114. }
  115. result.Value.Raw = result.Value.Raw[:1460]
  116. result.Value.Payload = result.Value.Raw[:0]
  117. return
  118. }
  119. // 为json序列化而计算的数据
  120. func (av *Media) SnapForJson() {
  121. v := av.LastValue
  122. if av.RawPart != nil {
  123. av.RawPart = av.RawPart[:0]
  124. } else {
  125. av.RawPart = make([]int, 0, 10)
  126. }
  127. if av.RawSize = v.AUList.ByteLength; av.RawSize > 0 {
  128. r := v.AUList.NewReader()
  129. for b, err := r.ReadByte(); err == nil && len(av.RawPart) < 10; b, err = r.ReadByte() {
  130. av.RawPart = append(av.RawPart, int(b))
  131. }
  132. }
  133. }
  134. func (av *Media) SetSpeedLimit(value time.Duration) {
  135. av.等待上限 = value
  136. }
  137. func (av *Media) SetStuff(stuff ...any) {
  138. // 代表发布者已经离线,该Track成为遗留Track,等待下一任发布者接续发布
  139. for _, s := range stuff {
  140. switch v := s.(type) {
  141. case IPuber:
  142. pubConf := v.GetConfig()
  143. av.BufferTime = pubConf.BufferTime
  144. av.Base.SetStuff(v)
  145. av.Init(256, NewAVFrame)
  146. av.SSRC = uint32(uintptr(unsafe.Pointer(av)))
  147. av.等待上限 = pubConf.SpeedLimit
  148. case uint32:
  149. av.SampleRate = v
  150. case byte:
  151. av.PayloadType = v
  152. case util.BytesPool:
  153. av.BytesPool = v
  154. case SpesificTrack:
  155. av.SpesificTrack = v
  156. case []any:
  157. av.SetStuff(v...)
  158. default:
  159. av.Base.SetStuff(v)
  160. }
  161. }
  162. }
  163. func (av *Media) LastWriteTime() time.Time {
  164. return av.LastValue.WriteTime
  165. }
  166. func (av *Media) CurrentFrame() *AVFrame {
  167. return av.Value
  168. }
  169. func (av *Media) PreFrame() *AVFrame {
  170. return av.LastValue
  171. }
  172. func (av *Media) generateTimestamp(ts uint32) {
  173. av.Value.PTS = time.Duration(ts)
  174. av.Value.DTS = time.Duration(ts)
  175. }
  176. func (av *Media) WriteSequenceHead(sh []byte) {
  177. av.SequenceHead = sh
  178. av.SequenceHeadSeq++
  179. }
  180. func (av *Media) AppendAuBytes(b ...[]byte) {
  181. var au util.BLL
  182. for _, bb := range b {
  183. au.Push(av.BytesPool.GetShell(bb))
  184. }
  185. av.Value.AUList.PushValue(&au)
  186. }
  187. func (av *Media) narrow(gop int) {
  188. if l := av.Size - gop; l > 12 {
  189. if log.Trace {
  190. av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size-5))
  191. }
  192. //缩小缓冲环节省内存
  193. av.Reduce(5)
  194. }
  195. }
  196. func (av *Media) AddIDR() {
  197. if av.BufferTime > 0 {
  198. av.IDRingList.AddIDR(av.Ring)
  199. if av.HistoryRing == nil {
  200. av.HistoryRing = av.IDRing
  201. }
  202. } else {
  203. av.IDRing = av.Ring
  204. }
  205. }
  206. func (av *Media) Flush() {
  207. curValue, preValue, nextValue := av.Value, av.LastValue, av.Next()
  208. useDts := curValue.Timestamp == 0
  209. originDTS := curValue.DTS
  210. if av.State == TrackStateOffline {
  211. av.State = TrackStateOnline
  212. if useDts {
  213. av.deltaTs = curValue.DTS - preValue.DTS
  214. } else {
  215. av.deltaTs = curValue.Timestamp - preValue.Timestamp
  216. }
  217. curValue.DTS = preValue.DTS + 900
  218. curValue.PTS = preValue.PTS + 900
  219. curValue.Timestamp = preValue.Timestamp + time.Millisecond
  220. av.Info("track back online", zap.Duration("delta", av.deltaTs))
  221. } else if av.deltaTs != 0 {
  222. if useDts {
  223. curValue.DTS -= av.deltaTs
  224. curValue.PTS -= av.deltaTs
  225. } else {
  226. rtpts := av.deltaTs * 90 / time.Millisecond
  227. curValue.DTS -= rtpts
  228. curValue.PTS -= rtpts
  229. curValue.Timestamp -= av.deltaTs
  230. }
  231. }
  232. if av.起始时间.IsZero() {
  233. curValue.DeltaTime = 0
  234. if useDts {
  235. curValue.Timestamp = time.Since(av.Publisher.GetStream().GetStartTime())
  236. }
  237. av.重置(curValue.Timestamp, curValue.DTS)
  238. } else {
  239. if useDts {
  240. deltaDts := curValue.DTS - preValue.DTS
  241. if deltaDts > deltaDTSRange || deltaDts < -deltaDTSRange {
  242. // 时间戳跳变,等同于离线重连
  243. av.deltaTs = originDTS - preValue.DTS
  244. curValue.DTS = preValue.DTS + 900
  245. curValue.PTS = preValue.PTS + 900
  246. av.Warn("track dts reset", zap.Int64("delta1", int64(deltaDts)), zap.Int64("delta2", int64(av.deltaTs)))
  247. }
  248. curValue.Timestamp = av.根据起始DTS计算绝对时间戳(curValue.DTS)
  249. }
  250. curValue.DeltaTime = uint32(deltaTS(curValue.Timestamp, preValue.Timestamp) / time.Millisecond)
  251. }
  252. if log.Trace {
  253. av.Trace("write", zap.Uint32("seq", curValue.Sequence), zap.Int64("dts0", int64(preValue.DTS)), zap.Int64("dts1", int64(originDTS)), zap.Uint64("dts2", uint64(curValue.DTS)), zap.Uint32("delta", curValue.DeltaTime), zap.Duration("timestamp", curValue.Timestamp), zap.Int("au", curValue.AUList.Length), zap.Int("rtp", curValue.RTP.Length), zap.Int("avcc", curValue.AVCC.ByteLength), zap.Int("raw", curValue.AUList.ByteLength), zap.Int("bps", av.BPS))
  254. }
  255. bufferTime := av.BufferTime
  256. if bufferTime > 0 && av.IDRingList.IDRList.Length > 1 && deltaTS(curValue.Timestamp, av.IDRingList.IDRList.Next.Next.Value.Value.Timestamp) > bufferTime {
  257. av.ShiftIDR()
  258. av.narrow(int(curValue.Sequence - av.HistoryRing.Value.Sequence))
  259. }
  260. // 下一帧为订阅起始帧,即将覆盖,需要扩环
  261. if nextValue == av.IDRing || nextValue == av.HistoryRing {
  262. // if av.AVRing.Size < 512 {
  263. if log.Trace {
  264. av.Trace("resize", zap.Int("before", av.Size), zap.Int("after", av.Size+5), zap.String("name", av.Name))
  265. }
  266. av.Glow(5)
  267. // } else {
  268. // av.Stream.Error("sub ring overflow", zap.Int("size", av.AVRing.Size), zap.String("name", av.Name))
  269. // }
  270. }
  271. if curValue.AUList.Length > 0 {
  272. // 补完RTP
  273. if config.Global.EnableRTP && curValue.RTP.Length == 0 {
  274. av.CompleteRTP(curValue)
  275. }
  276. // 补完AVCC
  277. if config.Global.EnableAVCC && curValue.AVCC.ByteLength == 0 {
  278. av.CompleteAVCC(curValue)
  279. }
  280. }
  281. av.ComputeBPS(curValue.BytesIn)
  282. av.Step()
  283. if av.等待上限 > 0 {
  284. 等待了 := av.控制流速(curValue.Timestamp, curValue.DTS)
  285. if log.Trace && 等待了 > 0 {
  286. av.Trace("speed control", zap.Duration("sleep", 等待了))
  287. }
  288. }
  289. }
  290. func deltaTS(curTs time.Duration, preTs time.Duration) time.Duration {
  291. if curTs < preTs {
  292. return curTs + (1<<32)*time.Millisecond - preTs
  293. }
  294. return curTs - preTs
  295. }