123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- package webrtc
- import (
- "fmt"
- "strings"
- "github.com/pion/rtcp"
- . "github.com/pion/webrtc/v4"
- "go.uber.org/zap"
- . "m7s.live/engine/v4"
- "m7s.live/engine/v4/codec"
- "m7s.live/engine/v4/track"
- "m7s.live/engine/v4/util"
- )
- type trackSender struct {
- *TrackLocalStaticRTP
- *RTPSender
- // seq uint32
- }
- type WebRTCSubscriber struct {
- Subscriber
- WebRTCIO
- audio trackSender
- video trackSender
- DC *DataChannel
- videoTracks []*track.Video
- audioTracks []*track.Audio
- // flvHeadCache []byte
- }
- func (suber *WebRTCSubscriber) queueDCData(data ...[]byte) (err error) {
- for _, d := range data {
- if err = suber.DC.Send(d); err != nil {
- return
- }
- }
- return
- }
- func (suber *WebRTCSubscriber) createDataChannel() {
- if suber.DC != nil {
- return
- }
- suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil)
- // suber.flvHeadCache = make([]byte, 15)
- }
- // func (suber *WebRTCSubscriber) sendAvByDatachannel(t byte, reader *track.AVRingReader) {
- // suber.flvHeadCache[0] = t
- // frame := reader.Frame
- // dataSize := uint32(frame.AVCC.ByteLength)
- // result := net.Buffers{suber.flvHeadCache[:11]}
- // result = append(result, frame.AVCC.ToBuffers()...)
- // ts := reader.AbsTime
- // util.PutBE(suber.flvHeadCache[1:4], dataSize)
- // util.PutBE(suber.flvHeadCache[4:7], ts)
- // suber.flvHeadCache[7] = byte(ts >> 24)
- // result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11))
- // for _, data := range util.SplitBuffers(result, 65535) {
- // for _, d := range data {
- // suber.queueDCData(d)
- // }
- // }
- // }
- func (suber *WebRTCSubscriber) OnSubscribe() {
- vm := make(map[codec.VideoCodecID]*track.Video)
- am := make(map[codec.AudioCodecID]*track.Audio)
- for _, track := range suber.videoTracks {
- vm[track.CodecID] = track
- }
- for _, track := range suber.audioTracks {
- am[track.CodecID] = track
- }
- if (vm[codec.CodecID_H264] != nil || vm[codec.CodecID_AV1] != nil || vm[codec.CodecID_H265] == nil) && (am[codec.CodecID_PCMA] != nil || am[codec.CodecID_PCMU] != nil || am[codec.CodecID_AAC] == nil) {
- video := vm[codec.CodecID_H264]
- if video != nil {
- suber.Subscriber.AddTrack(video)
- pli := fmt.Sprintf("%x", video.SPS[1:4])
- // pli := "42001f"
- if !strings.Contains(suber.SDP, pli) {
- list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
- if len(list) > 0 {
- pli = list[0][1]
- }
- }
- suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, video.Name, suber.Subscriber.Stream.Path)
- } else if video = vm[codec.CodecID_AV1]; video != nil {
- suber.Subscriber.AddTrack(video)
- suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeAV1, SDPFmtpLine: fmt.Sprintf("profile=%d;level-idx=%d;tier=%d", video.ParamaterSets[1][1], video.ParamaterSets[1][0], video.ParamaterSets[1][2])}, video.Name, suber.Subscriber.Stream.Path)
- }
- if suber.video.TrackLocalStaticRTP != nil {
- suber.video.RTPSender, _ = suber.PeerConnection.AddTrack(suber.video.TrackLocalStaticRTP)
- go func() {
- rtcpBuf := make([]byte, 1500)
- for {
- if n, _, rtcpErr := suber.video.Read(rtcpBuf); rtcpErr != nil {
- suber.Warn("rtcp read error", zap.Error(rtcpErr))
- return
- } else {
- if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
- for _, pp := range p {
- switch pp.(type) {
- case *rtcp.PictureLossIndication:
- // fmt.Println("PictureLossIndication")
- }
- }
- }
- }
- }
- }()
- }
- var audio *track.Audio
- audioMimeType := MimeTypePCMA
- if am[codec.CodecID_PCMA] != nil {
- audio = am[codec.CodecID_PCMA]
- } else if am[codec.CodecID_PCMU] != nil {
- audioMimeType = MimeTypePCMU
- audio = am[codec.CodecID_PCMU]
- } else if am[codec.CodecID_OPUS] != nil {
- audioMimeType = MimeTypeOpus
- audio = am[codec.CodecID_OPUS]
- }
- if audio != nil {
- suber.Subscriber.AddTrack(audio)
- suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, audio.Name, suber.Subscriber.Stream.Path)
- if suber.audio.TrackLocalStaticRTP != nil {
- suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP)
- }
- }
- } else {
- suber.createDataChannel()
- if len(suber.videoTracks) > 0 {
- suber.Subscriber.AddTrack(suber.videoTracks[0])
- }
- if len(suber.audioTracks) > 0 {
- suber.Subscriber.AddTrack(suber.audioTracks[0])
- }
- }
- }
- func (suber *WebRTCSubscriber) OnEvent(event any) {
- var err error
- switch v := event.(type) {
- case *track.Video:
- suber.videoTracks = append(suber.videoTracks, v)
- // switch v.CodecID {
- // case codec.CodecID_H264:
- // pli := fmt.Sprintf("%x", v.SPS[1:4])
- // // pli := "42001f"
- // if !strings.Contains(suber.SDP, pli) {
- // list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
- // if len(list) > 0 {
- // pli = list[0][1]
- // }
- // }
- // suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, v.Name, suber.Subscriber.Stream.Path)
- // case codec.CodecID_H265:
- // suber.createDataChannel()
- // // suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path)
- // default:
- // return
- // }
- // suber.Subscriber.AddTrack(v) //接受这个track
- case *track.Audio:
- // audioMimeType := MimeTypePCMA
- // if v.CodecID == codec.CodecID_PCMU {
- // audioMimeType = MimeTypePCMU
- // }
- // switch v.CodecID {
- // case codec.CodecID_AAC:
- // suber.createDataChannel()
- // case codec.CodecID_PCMA, codec.CodecID_PCMU:
- // suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, v.Name, suber.Subscriber.Stream.Path)
- // //suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP)
- // }
- // suber.Subscriber.AddTrack(v) //接受这个track
- suber.audioTracks = append(suber.audioTracks, v)
- // case VideoDeConf:
- // if suber.DC != nil {
- // suber.queueDCData(codec.VideoAVCC2FLV(0, v)...)
- // }
- // case AudioDeConf:
- // if suber.DC != nil {
- // suber.queueDCData(codec.AudioAVCC2FLV(0, v)...)
- // }
- case VideoRTP:
- // if suber.video.TrackLocalStaticRTP != nil {
- if err = suber.video.WriteRTP(v.Packet); err != nil {
- suber.Stop(zap.Error(err))
- return
- }
- // } else if suber.DC != nil && suber.VideoReader.Frame.Sequence != suber.video.seq {
- // suber.video.seq = suber.VideoReader.Frame.Sequence
- // suber.sendAvByDatachannel(9, suber.VideoReader)
- // }
- case AudioRTP:
- // if suber.audio.TrackLocalStaticRTP != nil {
- if err = suber.audio.WriteRTP(v.Packet); err != nil {
- suber.Stop(zap.Error(err))
- return
- }
- // } else if suber.DC != nil && suber.AudioReader.Frame.Sequence != suber.audio.seq {
- // suber.audio.seq = suber.AudioReader.Frame.Sequence
- // suber.sendAvByDatachannel(8, suber.AudioReader)
- // }
- case FLVFrame:
- for _, data := range util.SplitBuffers(v, 65535) {
- if err = suber.queueDCData(data...); err != nil {
- suber.Stop(zap.Error(err))
- return
- }
- }
- case ISubscriber:
- suber.OnSubscribe()
- if suber.DC != nil {
- suber.DC.OnOpen(func() {
- suber.DC.Send(codec.FLVHeader)
- go func() {
- suber.PlayFLV()
- suber.DC.Close()
- suber.PeerConnection.Close()
- }()
- })
- }
- suber.OnConnectionStateChange(func(pcs PeerConnectionState) {
- suber.Info("Connection State has changed:" + pcs.String())
- switch pcs {
- case PeerConnectionStateConnected:
- if suber.DC == nil {
- go func() {
- suber.PlayRTP()
- suber.PeerConnection.Close()
- }()
- }
- case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed:
- suber.Stop(zap.String("reason", pcs.String()))
- }
- })
- default:
- suber.Subscriber.OnEvent(event)
- }
- }
- type WebRTCBatchSubscriber struct {
- WebRTCSubscriber
- OnPlayDone func()
- }
- func (suber *WebRTCBatchSubscriber) OnEvent(event any) {
- switch event.(type) {
- case ISubscriber:
- suber.OnSubscribe()
- default:
- suber.WebRTCSubscriber.OnEvent(event)
- }
- }
|