publisher-rtpdump.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package engine
  2. import (
  3. "os"
  4. "sync"
  5. "time"
  6. "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
  7. "github.com/pion/webrtc/v3/pkg/media/rtpdump"
  8. "go.uber.org/zap"
  9. "m7s.live/engine/v4/codec"
  10. "m7s.live/engine/v4/common"
  11. "m7s.live/engine/v4/track"
  12. "m7s.live/engine/v4/util"
  13. )
  14. type RTPDumpPublisher struct {
  15. Publisher
  16. VCodec codec.VideoCodecID
  17. ACodec codec.AudioCodecID
  18. VPayloadType uint8
  19. APayloadType uint8
  20. other rtpdump.Packet
  21. sync.Mutex
  22. }
  23. func (t *RTPDumpPublisher) Feed(file *os.File) {
  24. r, h, err := rtpdump.NewReader(file)
  25. if err != nil {
  26. t.Stream.Error("RTPDumpPublisher open file error", zap.Error(err))
  27. return
  28. }
  29. t.Lock()
  30. t.Stream.Info("RTPDumpPublisher open file success", zap.String("file", file.Name()), zap.String("start", h.Start.String()), zap.String("source", h.Source.String()), zap.Uint16("port", h.Port))
  31. if t.VideoTrack == nil {
  32. switch t.VCodec {
  33. case codec.CodecID_H264:
  34. t.VideoTrack = track.NewH264(t.Publisher.Stream, t.VPayloadType)
  35. case codec.CodecID_H265:
  36. t.VideoTrack = track.NewH265(t.Publisher.Stream, t.VPayloadType)
  37. }
  38. if t.VideoTrack != nil {
  39. t.VideoTrack.SetSpeedLimit(500 * time.Millisecond)
  40. }
  41. }
  42. if t.AudioTrack == nil {
  43. switch t.ACodec {
  44. case codec.CodecID_AAC:
  45. at := track.NewAAC(t.Publisher.Stream, t.APayloadType)
  46. t.AudioTrack = at
  47. var c mpeg4audio.Config
  48. c.ChannelCount = 2
  49. c.SampleRate = 48000
  50. asc, _ := c.Marshal()
  51. at.WriteSequenceHead(append([]byte{0xAF, 0x00}, asc...))
  52. case codec.CodecID_PCMA:
  53. t.AudioTrack = track.NewG711(t.Publisher.Stream, true, t.APayloadType)
  54. case codec.CodecID_PCMU:
  55. t.AudioTrack = track.NewG711(t.Publisher.Stream, false, t.APayloadType)
  56. }
  57. if t.AudioTrack != nil {
  58. t.AudioTrack.SetSpeedLimit(500 * time.Millisecond)
  59. }
  60. }
  61. t.Unlock()
  62. needLock := true
  63. for {
  64. packet, err := r.Next()
  65. if err != nil {
  66. t.Stream.Error("RTPDumpPublisher read file error", zap.Error(err))
  67. return
  68. }
  69. if packet.IsRTCP {
  70. continue
  71. }
  72. if needLock {
  73. t.Lock()
  74. }
  75. if t.other.Payload == nil {
  76. t.other = packet
  77. t.Unlock()
  78. needLock = true
  79. continue
  80. }
  81. if packet.Offset >= t.other.Offset {
  82. t.WriteRTP(t.other.Payload)
  83. t.other = packet
  84. t.Unlock()
  85. needLock = true
  86. continue
  87. }
  88. needLock = false
  89. t.WriteRTP(packet.Payload)
  90. }
  91. }
  92. func (t *RTPDumpPublisher) WriteRTP(raw []byte) {
  93. var frame common.RTPFrame
  94. frame.Unmarshal(raw)
  95. switch frame.PayloadType {
  96. case t.VPayloadType:
  97. t.VideoTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame})
  98. case t.APayloadType:
  99. t.AudioTrack.WriteRTP(&util.ListItem[common.RTPFrame]{Value: frame})
  100. default:
  101. t.Stream.Warn("RTPDumpPublisher unknown payload type", zap.Uint8("payloadType", frame.PayloadType))
  102. }
  103. }