publisher.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package webrtc
  2. import (
  3. "sync/atomic"
  4. "time"
  5. "github.com/pion/rtcp"
  6. . "github.com/pion/webrtc/v4"
  7. "go.uber.org/zap"
  8. . "m7s.live/engine/v4"
  9. . "m7s.live/engine/v4/track"
  10. )
  11. type WebRTCPublisher struct {
  12. Publisher
  13. WebRTCIO
  14. audioTrack atomic.Pointer[TrackRemote]
  15. videoTrack atomic.Pointer[TrackRemote]
  16. }
  17. func (puber *WebRTCPublisher) OnEvent(event any) {
  18. switch event.(type) {
  19. case IPublisher:
  20. puber.OnTrack(puber.onTrack)
  21. }
  22. puber.Publisher.OnEvent(event)
  23. }
  24. func (puber *WebRTCPublisher) onTrack(track *TrackRemote, receiver *RTPReceiver) {
  25. puber.Info("onTrack", zap.String("kind", track.Kind().String()), zap.Uint8("payloadType", uint8(track.Codec().PayloadType)))
  26. if codec := track.Codec(); track.Kind() == RTPCodecTypeAudio {
  27. puber.audioTrack.Store(track)
  28. if puber.AudioTrack == nil {
  29. switch codec.PayloadType {
  30. case 111:
  31. puber.AudioTrack = NewOpus(puber.Stream)
  32. case 8:
  33. puber.AudioTrack = NewG711(puber.Stream, true)
  34. case 0:
  35. puber.AudioTrack = NewG711(puber.Stream, false)
  36. default:
  37. puber.AudioTrack = nil
  38. puber.Config.PubAudio = false
  39. return
  40. }
  41. }
  42. for {
  43. if puber.audioTrack.Load() != track {
  44. return
  45. }
  46. rtpItem := puber.AudioTrack.GetRTPFromPool()
  47. if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
  48. rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
  49. puber.AudioTrack.WriteRTP(rtpItem)
  50. } else {
  51. puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
  52. rtpItem.Recycle()
  53. return
  54. }
  55. }
  56. } else {
  57. puber.videoTrack.Store(track)
  58. if puber.VideoTrack == nil {
  59. switch codec.PayloadType {
  60. case 45:
  61. puber.VideoTrack = NewAV1(puber.Stream, byte(codec.PayloadType))
  62. default:
  63. puber.VideoTrack = NewH264(puber.Stream, byte(codec.PayloadType))
  64. }
  65. }
  66. go puber.writeRTCP(track)
  67. for {
  68. if puber.videoTrack.Load() != track {
  69. return
  70. }
  71. rtpItem := puber.VideoTrack.GetRTPFromPool()
  72. if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
  73. rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
  74. if rtpItem.Value.Extension {
  75. for _, id := range rtpItem.Value.GetExtensionIDs() {
  76. puber.Debug("extension", zap.Uint8("id", id), zap.Binary("value", rtpItem.Value.GetExtension(id)))
  77. }
  78. }
  79. puber.VideoTrack.WriteRTP(rtpItem)
  80. } else {
  81. puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
  82. rtpItem.Recycle()
  83. return
  84. }
  85. }
  86. }
  87. }
  88. func (puber *WebRTCPublisher) writeRTCP(track *TrackRemote) {
  89. ticker := time.NewTicker(webrtcConfig.PLI)
  90. defer ticker.Stop()
  91. for {
  92. select {
  93. case <-ticker.C:
  94. if puber.videoTrack.Load() != track {
  95. return
  96. }
  97. if rtcpErr := puber.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
  98. puber.Error("writeRTCP", zap.Error(rtcpErr))
  99. return
  100. }
  101. case <-puber.Done():
  102. return
  103. }
  104. }
  105. }