subscriber.go

package webrtc

import (
	"fmt"
	"strings"

	"github.com/pion/rtcp"
	. "github.com/pion/webrtc/v4"
	"go.uber.org/zap"
	. "m7s.live/engine/v4"
	"m7s.live/engine/v4/codec"
	"m7s.live/engine/v4/track"
	"m7s.live/engine/v4/util"
)
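
// trackSender pairs the local static RTP track that media is written to with
// the RTPSender returned by AddTrack, which delivers it to the remote peer.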
type trackSender struct {
	*TrackLocalStaticRTP
	*RTPSender
	// seq uint32
}
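
// WebRTCSubscriber plays a published stream to a WebRTC peer, sending media
// either as RTP tracks or as FLV over a DataChannel, depending on the codecs
// the stream carries.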
type WebRTCSubscriber struct {
	Subscriber
	WebRTCIO
	audio       trackSender
	video       trackSender
	DC          *DataChannel
	videoTracks []*track.Video
	audioTracks []*track.Audio
	// flvHeadCache []byte
}
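
// queueDCData writes each buffer to the DataChannel, returning on the first send error.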
func (suber *WebRTCSubscriber) queueDCData(data ...[]byte) (err error) {
	for _, d := range data {
		if err = suber.DC.Send(d); err != nil {
			return
		}
	}
	return
}
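
// createDataChannel lazily creates a DataChannel named after the stream path;
// it is a no-op if one already exists.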
func (suber *WebRTCSubscriber) createDataChannel() {
	if suber.DC != nil {
		return
	}
	suber.DC, _ = suber.PeerConnection.CreateDataChannel(suber.Subscriber.Stream.Path, nil)
	// suber.flvHeadCache = make([]byte, 15)
}

// func (suber *WebRTCSubscriber) sendAvByDatachannel(t byte, reader *track.AVRingReader) {
// 	suber.flvHeadCache[0] = t
// 	frame := reader.Frame
// 	dataSize := uint32(frame.AVCC.ByteLength)
// 	result := net.Buffers{suber.flvHeadCache[:11]}
// 	result = append(result, frame.AVCC.ToBuffers()...)
// 	ts := reader.AbsTime
// 	util.PutBE(suber.flvHeadCache[1:4], dataSize)
// 	util.PutBE(suber.flvHeadCache[4:7], ts)
// 	suber.flvHeadCache[7] = byte(ts >> 24)
// 	result = append(result, util.PutBE(suber.flvHeadCache[11:15], dataSize+11))
// 	for _, data := range util.SplitBuffers(result, 65535) {
// 		for _, d := range data {
// 			suber.queueDCData(d)
// 		}
// 	}
// }
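
// OnSubscribe decides how to deliver the stream once the publisher's tracks are
// known: H.264/AV1 video and PCMA/PCMU/Opus audio are negotiated as RTP tracks;
// if the only video is H.265, or AAC audio has no G.711 alternative, the stream
// falls back to FLV over a DataChannel.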
func (suber *WebRTCSubscriber) OnSubscribe() {
	vm := make(map[codec.VideoCodecID]*track.Video)
	am := make(map[codec.AudioCodecID]*track.Audio)
	for _, track := range suber.videoTracks {
		vm[track.CodecID] = track
	}
	for _, track := range suber.audioTracks {
		am[track.CodecID] = track
	}
	if (vm[codec.CodecID_H264] != nil || vm[codec.CodecID_AV1] != nil || vm[codec.CodecID_H265] == nil) && (am[codec.CodecID_PCMA] != nil || am[codec.CodecID_PCMU] != nil || am[codec.CodecID_AAC] == nil) {
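		// Prefer H.264. The profile-level-id comes from the stream's SPS; if the
		// offer SDP does not already contain that value, fall back to the level
		// extracted from the offer via reg_level.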
		video := vm[codec.CodecID_H264]
		if video != nil {
			suber.Subscriber.AddTrack(video)
			pli := fmt.Sprintf("%x", video.SPS[1:4])
			// pli := "42001f"
			if !strings.Contains(suber.SDP, pli) {
				list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
				if len(list) > 0 {
					pli = list[0][1]
				}
			}
			suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, video.Name, suber.Subscriber.Stream.Path)
		} else if video = vm[codec.CodecID_AV1]; video != nil {
			suber.Subscriber.AddTrack(video)
			suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeAV1, SDPFmtpLine: fmt.Sprintf("profile=%d;level-idx=%d;tier=%d", video.ParamaterSets[1][1], video.ParamaterSets[1][0], video.ParamaterSets[1][2])}, video.Name, suber.Subscriber.Stream.Path)
		}
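		// Once the video track is attached, drain RTCP from the sender so pion's
		// interceptors keep running; PLI feedback is currently ignored.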
		if suber.video.TrackLocalStaticRTP != nil {
			suber.video.RTPSender, _ = suber.PeerConnection.AddTrack(suber.video.TrackLocalStaticRTP)
			go func() {
				rtcpBuf := make([]byte, 1500)
				for {
					if n, _, rtcpErr := suber.video.Read(rtcpBuf); rtcpErr != nil {
						suber.Warn("rtcp read error", zap.Error(rtcpErr))
						return
					} else {
						if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
							for _, pp := range p {
								switch pp.(type) {
								case *rtcp.PictureLossIndication:
									// fmt.Println("PictureLossIndication")
								}
							}
						}
					}
				}
			}()
		}
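		// Choose the audio codec to send over RTP, preferring PCMA, then PCMU, then Opus.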
		var audio *track.Audio
		audioMimeType := MimeTypePCMA
		if am[codec.CodecID_PCMA] != nil {
			audio = am[codec.CodecID_PCMA]
		} else if am[codec.CodecID_PCMU] != nil {
			audioMimeType = MimeTypePCMU
			audio = am[codec.CodecID_PCMU]
		} else if am[codec.CodecID_OPUS] != nil {
			audioMimeType = MimeTypeOpus
			audio = am[codec.CodecID_OPUS]
		}
		if audio != nil {
			suber.Subscriber.AddTrack(audio)
			suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, audio.Name, suber.Subscriber.Stream.Path)
			if suber.audio.TrackLocalStaticRTP != nil {
				suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP)
			}
		}
	} else {
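		// No RTP-compatible codec combination: fall back to FLV over a
		// DataChannel, subscribing to the first published video and audio tracks.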
		suber.createDataChannel()
		if len(suber.videoTracks) > 0 {
			suber.Subscriber.AddTrack(suber.videoTracks[0])
		}
		if len(suber.audioTracks) > 0 {
			suber.Subscriber.AddTrack(suber.audioTracks[0])
		}
	}
}
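
// OnEvent receives engine callbacks: it records the publisher's tracks, relays
// media (RTP packets to the local tracks, FLV frames to the DataChannel), and
// starts playback when the ISubscriber event arrives.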
func (suber *WebRTCSubscriber) OnEvent(event any) {
	var err error
	switch v := event.(type) {
	case *track.Video:
		suber.videoTracks = append(suber.videoTracks, v)
		// switch v.CodecID {
		// case codec.CodecID_H264:
		// 	pli := fmt.Sprintf("%x", v.SPS[1:4])
		// 	// pli := "42001f"
		// 	if !strings.Contains(suber.SDP, pli) {
		// 		list := reg_level.FindAllStringSubmatch(suber.SDP, -1)
		// 		if len(list) > 0 {
		// 			pli = list[0][1]
		// 		}
		// 	}
		// 	suber.video.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH264, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, v.Name, suber.Subscriber.Stream.Path)
		// case codec.CodecID_H265:
		// 	suber.createDataChannel()
		// 	// suber.videoTrack, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: MimeTypeH265, SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=" + pli}, "video", suber.Subscriber.Stream.Path)
		// default:
		// 	return
		// }
		// suber.Subscriber.AddTrack(v) // accept this track
	case *track.Audio:
		// audioMimeType := MimeTypePCMA
		// if v.CodecID == codec.CodecID_PCMU {
		// 	audioMimeType = MimeTypePCMU
		// }
		// switch v.CodecID {
		// case codec.CodecID_AAC:
		// 	suber.createDataChannel()
		// case codec.CodecID_PCMA, codec.CodecID_PCMU:
		// 	suber.audio.TrackLocalStaticRTP, _ = NewTrackLocalStaticRTP(RTPCodecCapability{MimeType: audioMimeType}, v.Name, suber.Subscriber.Stream.Path)
		// 	// suber.audio.RTPSender, _ = suber.PeerConnection.AddTrack(suber.audio.TrackLocalStaticRTP)
		// }
		// suber.Subscriber.AddTrack(v) // accept this track
		suber.audioTracks = append(suber.audioTracks, v)
	// case VideoDeConf:
	// 	if suber.DC != nil {
	// 		suber.queueDCData(codec.VideoAVCC2FLV(0, v)...)
	// 	}
	// case AudioDeConf:
	// 	if suber.DC != nil {
	// 		suber.queueDCData(codec.AudioAVCC2FLV(0, v)...)
	// 	}
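	// Media packets are forwarded to the corresponding local track; a failed
	// write stops the subscriber.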
	case VideoRTP:
		// if suber.video.TrackLocalStaticRTP != nil {
		if err = suber.video.WriteRTP(v.Packet); err != nil {
			suber.Stop(zap.Error(err))
			return
		}
		// } else if suber.DC != nil && suber.VideoReader.Frame.Sequence != suber.video.seq {
		// 	suber.video.seq = suber.VideoReader.Frame.Sequence
		// 	suber.sendAvByDatachannel(9, suber.VideoReader)
		// }
	case AudioRTP:
		// if suber.audio.TrackLocalStaticRTP != nil {
		if err = suber.audio.WriteRTP(v.Packet); err != nil {
			suber.Stop(zap.Error(err))
			return
		}
		// } else if suber.DC != nil && suber.AudioReader.Frame.Sequence != suber.audio.seq {
		// 	suber.audio.seq = suber.AudioReader.Frame.Sequence
		// 	suber.sendAvByDatachannel(8, suber.AudioReader)
		// }
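	// FLV frames are split into buffers of at most 65535 bytes before being
	// queued, keeping each DataChannel message within commonly supported limits.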
	case FLVFrame:
		for _, data := range util.SplitBuffers(v, 65535) {
			if err = suber.queueDCData(data...); err != nil {
				suber.Stop(zap.Error(err))
				return
			}
		}
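	// Subscription accepted: pick the delivery mode, then start playback. With a
	// DataChannel, send the FLV header once it opens and stream FLV; otherwise
	// start the RTP loop once the connection reaches Connected. Disconnect,
	// failure, or close stops the subscriber.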
	case ISubscriber:
		suber.OnSubscribe()
		if suber.DC != nil {
			suber.DC.OnOpen(func() {
				suber.DC.Send(codec.FLVHeader)
				go func() {
					suber.PlayFLV()
					suber.DC.Close()
					suber.PeerConnection.Close()
				}()
			})
		}
		suber.OnConnectionStateChange(func(pcs PeerConnectionState) {
			suber.Info("Connection State has changed: " + pcs.String())
			switch pcs {
			case PeerConnectionStateConnected:
				if suber.DC == nil {
					go func() {
						suber.PlayRTP()
						suber.PeerConnection.Close()
					}()
				}
			case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed:
				suber.Stop(zap.String("reason", pcs.String()))
			}
		})
	default:
		suber.Subscriber.OnEvent(event)
	}
}
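
// WebRTCBatchSubscriber adapts WebRTCSubscriber for batch playback; OnPlayDone
// is a caller-supplied callback.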
type WebRTCBatchSubscriber struct {
	WebRTCSubscriber
	OnPlayDone func()
}
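
// OnEvent only negotiates tracks on the ISubscriber event, skipping the
// DataChannel and connection-state wiring; everything else is delegated to the
// embedded WebRTCSubscriber.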
func (suber *WebRTCBatchSubscriber) OnEvent(event any) {
	switch event.(type) {
	case ISubscriber:
		suber.OnSubscribe()
	default:
		suber.WebRTCSubscriber.OnEvent(event)
	}
}