123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- package webrtc
- import (
- "sync/atomic"
- "time"
- "github.com/pion/rtcp"
- . "github.com/pion/webrtc/v4"
- "go.uber.org/zap"
- . "m7s.live/engine/v4"
- . "m7s.live/engine/v4/track"
- )
- type WebRTCPublisher struct {
- Publisher
- WebRTCIO
- audioTrack atomic.Pointer[TrackRemote]
- videoTrack atomic.Pointer[TrackRemote]
- }
- func (puber *WebRTCPublisher) OnEvent(event any) {
- switch event.(type) {
- case IPublisher:
- puber.OnTrack(puber.onTrack)
- }
- puber.Publisher.OnEvent(event)
- }
- func (puber *WebRTCPublisher) onTrack(track *TrackRemote, receiver *RTPReceiver) {
- puber.Info("onTrack", zap.String("kind", track.Kind().String()), zap.Uint8("payloadType", uint8(track.Codec().PayloadType)))
- if codec := track.Codec(); track.Kind() == RTPCodecTypeAudio {
- puber.audioTrack.Store(track)
- if puber.AudioTrack == nil {
- switch codec.PayloadType {
- case 111:
- puber.AudioTrack = NewOpus(puber.Stream)
- case 8:
- puber.AudioTrack = NewG711(puber.Stream, true)
- case 0:
- puber.AudioTrack = NewG711(puber.Stream, false)
- default:
- puber.AudioTrack = nil
- puber.Config.PubAudio = false
- return
- }
- }
- for {
- if puber.audioTrack.Load() != track {
- return
- }
- rtpItem := puber.AudioTrack.GetRTPFromPool()
- if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
- rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
- puber.AudioTrack.WriteRTP(rtpItem)
- } else {
- puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
- rtpItem.Recycle()
- return
- }
- }
- } else {
- puber.videoTrack.Store(track)
- if puber.VideoTrack == nil {
- switch codec.PayloadType {
- case 45:
- puber.VideoTrack = NewAV1(puber.Stream, byte(codec.PayloadType))
- default:
- puber.VideoTrack = NewH264(puber.Stream, byte(codec.PayloadType))
- }
- }
- go puber.writeRTCP(track)
- for {
- if puber.videoTrack.Load() != track {
- return
- }
- rtpItem := puber.VideoTrack.GetRTPFromPool()
- if i, _, err := track.Read(rtpItem.Value.Raw); err == nil {
- rtpItem.Value.Unmarshal(rtpItem.Value.Raw[:i])
- if rtpItem.Value.Extension {
- for _, id := range rtpItem.Value.GetExtensionIDs() {
- puber.Debug("extension", zap.Uint8("id", id), zap.Binary("value", rtpItem.Value.GetExtension(id)))
- }
- }
- puber.VideoTrack.WriteRTP(rtpItem)
- } else {
- puber.Info("track stop", zap.String("kind", track.Kind().String()), zap.Error(err))
- rtpItem.Recycle()
- return
- }
- }
- }
- }
- func (puber *WebRTCPublisher) writeRTCP(track *TrackRemote) {
- ticker := time.NewTicker(webrtcConfig.PLI)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- if puber.videoTrack.Load() != track {
- return
- }
- if rtcpErr := puber.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
- puber.Error("writeRTCP", zap.Error(rtcpErr))
- return
- }
- case <-puber.Done():
- return
- }
- }
- }
|