batcher.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package webrtc
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. . "github.com/pion/webrtc/v4"
  6. "go.uber.org/zap"
  7. "m7s.live/engine/v4/codec"
  8. "m7s.live/engine/v4/util"
  9. )
  10. type Signal struct {
  11. Type string `json:"type"`
  12. StreamList []string `json:"streamList"`
  13. Offer string `json:"offer"`
  14. Answer string `json:"answer"`
  15. StreamPath string `json:"streamPath"`
  16. }
  17. type SignalStreamPath struct {
  18. Type string `json:"type"`
  19. StreamPath string `json:"streamPath"`
  20. }
  21. func NewRemoveSingal(streamPath string) string {
  22. s := SignalStreamPath{
  23. Type: "remove",
  24. StreamPath: streamPath,
  25. }
  26. b, _ := json.Marshal(s)
  27. return string(b)
  28. }
  29. type SignalSDP struct {
  30. Type string `json:"type"`
  31. SDP string `json:"sdp"`
  32. }
  33. func NewAnswerSingal(sdp string) string {
  34. s := SignalSDP{
  35. Type: "answer",
  36. SDP: sdp,
  37. }
  38. b, _ := json.Marshal(s)
  39. return string(b)
  40. }
  41. type WebRTCBatcher struct {
  42. PageSize int
  43. PageNum int
  44. subscribers util.Map[string,*WebRTCBatchSubscriber]
  45. signalChannel *DataChannel
  46. WebRTCPublisher
  47. }
  48. func (suber *WebRTCBatcher) Start() (err error) {
  49. suber.OnICECandidate(func(ice *ICECandidate) {
  50. if ice != nil {
  51. WebRTCPlugin.Info(ice.ToJSON().Candidate)
  52. }
  53. })
  54. suber.OnDataChannel(func(d *DataChannel) {
  55. WebRTCPlugin.Info("OnDataChannel:" + d.Label())
  56. suber.signalChannel = d
  57. suber.signalChannel.OnMessage(suber.Signal)
  58. })
  59. if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil {
  60. return
  61. }
  62. suber.OnConnectionStateChange(func(pcs PeerConnectionState) {
  63. WebRTCPlugin.Info("Connection State has changed:" + pcs.String())
  64. switch pcs {
  65. case PeerConnectionStateConnected:
  66. case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
  67. zr := zap.String("reason", pcs.String())
  68. suber.subscribers.Range(func(key string, value *WebRTCBatchSubscriber) {
  69. value.Stop(zr)
  70. })
  71. if suber.Publisher.Stream != nil {
  72. suber.Publisher.Stop(zr)
  73. }
  74. suber.PeerConnection.Close()
  75. }
  76. })
  77. return
  78. }
  79. func (suber *WebRTCBatcher) RemoveSubscribe(streamPath string) {
  80. suber.signalChannel.SendText(NewRemoveSingal(streamPath))
  81. }
  82. func (suber *WebRTCBatcher) Answer() (err error) {
  83. var answer string
  84. if answer, err = suber.GetAnswer(); err == nil {
  85. err = suber.signalChannel.SendText(NewAnswerSingal(answer))
  86. }
  87. if err != nil {
  88. WebRTCPlugin.Error("Signal GetAnswer", zap.Error(err))
  89. }
  90. return
  91. }
  92. func (suber *WebRTCBatcher) Signal(msg DataChannelMessage) {
  93. var s Signal
  94. // var offer SessionDescription
  95. if err := json.Unmarshal(msg.Data, &s); err != nil {
  96. WebRTCPlugin.Error("Signal", zap.Error(err))
  97. } else {
  98. switch s.Type {
  99. case "subscribe":
  100. if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil {
  101. WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
  102. return
  103. }
  104. for _, streamPath := range s.StreamList {
  105. if suber.subscribers.Has(streamPath) {
  106. continue
  107. }
  108. sub := &WebRTCBatchSubscriber{}
  109. sub.ID = fmt.Sprintf("%s_%s", suber.ID, streamPath)
  110. sub.WebRTCIO = suber.WebRTCIO
  111. if err = WebRTCPlugin.SubscribeExist(streamPath, sub); err == nil {
  112. suber.subscribers.Add(streamPath, sub)
  113. go func(streamPath string) {
  114. if sub.DC == nil {
  115. sub.PlayRTP()
  116. if sub.audio.RTPSender != nil {
  117. suber.RemoveTrack(sub.audio.RTPSender)
  118. }
  119. if sub.video.RTPSender != nil {
  120. suber.RemoveTrack(sub.video.RTPSender)
  121. }
  122. suber.RemoveSubscribe(streamPath)
  123. } else {
  124. sub.DC.OnOpen(func() {
  125. sub.DC.Send(codec.FLVHeader)
  126. go func() {
  127. sub.PlayFLV()
  128. sub.DC.Close()
  129. suber.RemoveSubscribe(streamPath)
  130. }()
  131. })
  132. }
  133. }(streamPath)
  134. } else {
  135. WebRTCPlugin.Error("subscribe", zap.String("streamPath", streamPath), zap.Error(err))
  136. suber.RemoveSubscribe(streamPath)
  137. }
  138. }
  139. err = suber.Answer()
  140. // if offer, err = suber.CreateOffer(nil); err == nil {
  141. // b, _ := json.Marshal(offer)
  142. // err = suber.signalChannel.SendText(string(b))
  143. // suber.SetLocalDescription(offer)
  144. // }
  145. case "publish", "unpublish":
  146. if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: s.Offer}); err != nil {
  147. WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
  148. return
  149. }
  150. if err = suber.Answer(); err == nil {
  151. switch s.Type {
  152. case "publish":
  153. WebRTCPlugin.Publish(s.StreamPath, suber)
  154. case "unpublish":
  155. suber.Stop()
  156. }
  157. }
  158. case "answer":
  159. if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeAnswer, SDP: s.Answer}); err != nil {
  160. WebRTCPlugin.Error("Signal SetRemoteDescription", zap.Error(err))
  161. return
  162. }
  163. }
  164. WebRTCPlugin.Info(s.Type)
  165. }
  166. }