rtp.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package track
  2. import (
  3. "time"
  4. "github.com/pion/rtp"
  5. "go.uber.org/zap"
  6. . "m7s.live/engine/v4/common"
  7. "m7s.live/engine/v4/util"
  8. )
  9. const RTPMTU = 1400
  10. // WriteRTPPack 写入已反序列化的RTP包,已经排序过了的
  11. func (av *Media) WriteRTPPack(p *rtp.Packet) {
  12. var frame RTPFrame
  13. p.SSRC = av.SSRC
  14. p.Padding = false
  15. p.PaddingSize = 0
  16. frame.Packet = p
  17. av.Value.BytesIn += len(frame.Payload) + 12
  18. av.lastSeq2 = av.lastSeq
  19. av.lastSeq = frame.SequenceNumber
  20. av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
  21. if len(p.Payload) > 0 {
  22. av.WriteRTPFrame(util.NewListItem(frame))
  23. }
  24. }
  25. // WriteRTPFrame 写入未反序列化的RTP包, 未排序的
  26. func (av *Media) WriteRTP(raw *util.ListItem[RTPFrame]) {
  27. for frame := av.recorderRTP(raw); frame != nil; frame = av.nextRTPFrame() {
  28. frame.Value.SSRC = av.SSRC
  29. av.Value.BytesIn += len(frame.Value.Payload) + 12
  30. av.DropCount += int(av.lastSeq - av.lastSeq2 - 1)
  31. if len(frame.Value.Payload) > 0 {
  32. av.WriteRTPFrame(frame)
  33. // av.Info("rtp", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Int("len", len(frame.Value.Payload)), zap.Bool("marker", frame.Value.Marker), zap.Uint16("seq", frame.Value.SequenceNumber))
  34. } else {
  35. av.Debug("rtp payload is empty", zap.Uint32("ts", (frame.Value.Timestamp)), zap.Any("ext", frame.Value.GetExtensionIDs()), zap.Uint16("seq", frame.Value.SequenceNumber))
  36. frame.Recycle()
  37. }
  38. }
  39. }
  40. // https://www.cnblogs.com/moonwalk/p/15903760.html
  41. // Packetize packetizes the payload of an RTP packet and returns one or more RTP packets
  42. func (av *Media) PacketizeRTP(payloads ...[][]byte) {
  43. var rtpItem *util.ListItem[RTPFrame]
  44. for _, pp := range payloads {
  45. rtpItem = av.GetRTPFromPool()
  46. packet := &rtpItem.Value
  47. br := util.LimitBuffer{Buffer: packet.Payload}
  48. if av.SampleRate != 90000 {
  49. packet.Timestamp = uint32(time.Duration(av.SampleRate) * av.Value.PTS / 90000)
  50. } else {
  51. packet.Timestamp = uint32(av.Value.PTS)
  52. }
  53. packet.Marker = false
  54. for _, p := range pp {
  55. if _, err := br.Write(p); err != nil {
  56. av.Error("rtp payload write error", zap.Error(err))
  57. for i, pp := range payloads {
  58. for j, p := range pp {
  59. av.Error("rtp payload", zap.Int("i", i), zap.Int("j", j), zap.Int("len", len(p)))
  60. }
  61. }
  62. return
  63. }
  64. }
  65. packet.Payload = br.Bytes()
  66. av.Value.RTP.Push(rtpItem)
  67. }
  68. // 最后一个rtp包标记为true
  69. rtpItem.Value.Marker = true
  70. }
  71. type RTPDemuxer struct {
  72. lastSeq uint16 //上一个rtp包的序号
  73. lastSeq2 uint16 //上上一个rtp包的序号
  74. 乱序重排 util.RTPReorder[*util.ListItem[RTPFrame]]
  75. }
  76. // 获取缓存中下一个rtpFrame
  77. func (av *RTPDemuxer) nextRTPFrame() (frame *util.ListItem[RTPFrame]) {
  78. frame = av.乱序重排.Pop()
  79. if frame == nil {
  80. return
  81. }
  82. av.lastSeq2 = av.lastSeq
  83. av.lastSeq = frame.Value.SequenceNumber
  84. return
  85. }
  86. // 对RTP包乱序重排
  87. func (av *RTPDemuxer) recorderRTP(item *util.ListItem[RTPFrame]) (frame *util.ListItem[RTPFrame]) {
  88. frame = av.乱序重排.Push(item.Value.SequenceNumber, item)
  89. if frame == nil {
  90. return
  91. }
  92. av.lastSeq2 = av.lastSeq
  93. av.lastSeq = frame.Value.SequenceNumber
  94. return
  95. }