main.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package webrtc
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "net/http"
  7. "regexp"
  8. "strings"
  9. "time"
  10. "go.uber.org/zap"
  11. "m7s.live/engine/v4"
  12. _ "embed"
  13. "github.com/pion/interceptor"
  14. . "github.com/pion/webrtc/v4"
  15. "m7s.live/engine/v4/config"
  16. "m7s.live/engine/v4/util"
  17. "m7s.live/plugin/webrtc/v4/webrtc"
  18. )
  19. // }{[]string{
  20. // "stun:stun.ekiga.net",
  21. // "stun:stun.ideasip.com",
  22. // "stun:stun.schlund.de",
  23. // "stun:stun.stunprotocol.org:3478",
  24. // "stun:stun.voiparound.com",
  25. // "stun:stun.voipbuster.com",
  26. // "stun:stun.voipstunt.com",
  27. // "stun:stun.voxgratia.org",
  28. // "stun:stun.services.mozilla.com",
  29. // "stun:stun.xten.com",
  30. // "stun:stun.softjoys.com",
  31. // "stun:stunserver.org",
  32. // "stun:stun.schlund.de",
  33. // "stun:stun.rixtelecom.se",
  34. // "stun:stun.iptel.org",
  35. // "stun:stun.ideasip.com",
  36. // "stun:stun.fwdnet.net",
  37. // "stun:stun.ekiga.net",
  38. // "stun:stun01.sipphone.com",
  39. // }}
  40. // type udpConn struct {
  41. // conn *net.UDPConn
  42. // port int
  43. // }
  44. var (
  45. //go:embed publish.html
  46. publishHTML []byte
  47. //go:embed subscribe.html
  48. subscribeHTML []byte
  49. webrtcConfig WebRTCConfig
  50. reg_level = regexp.MustCompile("profile-level-id=(4.+f)")
  51. WebRTCPlugin = engine.InstallPlugin(&webrtcConfig)
  52. )
  53. type WebRTCConfig struct {
  54. config.Publish
  55. config.Subscribe
  56. ICEServers []ICEServer `desc:"ice服务器配置"`
  57. PublicIP string `desc:"公网IP"`
  58. PublicIPv6 string `desc:"公网IPv6"`
  59. Port string `default:"tcp:9000" desc:"监听端口"`
  60. PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后,发送PLI请求
  61. EnableOpus bool `default:"true" desc:"是否启用opus编码"` // 是否启用opus编码
  62. EnableAv1 bool `default:"true" desc:"是否启用av1编码"` // 是否启用av1编码
  63. m MediaEngine
  64. s SettingEngine
  65. api *API
  66. }
  67. func (conf *WebRTCConfig) OnEvent(event any) {
  68. switch event.(type) {
  69. case engine.FirstConfig:
  70. if len(conf.ICEServers) > 0 {
  71. for i := range conf.ICEServers {
  72. b, _ := conf.ICEServers[i].MarshalJSON()
  73. conf.ICEServers[i].UnmarshalJSON(b)
  74. }
  75. }
  76. webrtc.RegisterCodecs(&conf.m)
  77. if conf.EnableOpus {
  78. conf.m.RegisterCodec(RTPCodecParameters{
  79. RTPCodecCapability: RTPCodecCapability{MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil},
  80. PayloadType: 111,
  81. }, RTPCodecTypeAudio)
  82. }
  83. if conf.EnableAv1 {
  84. conf.m.RegisterCodec(RTPCodecParameters{
  85. RTPCodecCapability: RTPCodecCapability{MimeTypeAV1, 90000, 0, "profile=2;level-idx=8;tier=1", nil},
  86. PayloadType: 45,
  87. }, RTPCodecTypeVideo)
  88. }
  89. i := &interceptor.Registry{}
  90. if conf.PublicIP != "" {
  91. ips := []string{conf.PublicIP}
  92. if conf.PublicIPv6 != "" {
  93. ips = append(ips, conf.PublicIPv6)
  94. }
  95. conf.s.SetNAT1To1IPs(ips, ICECandidateTypeHost)
  96. }
  97. protocol, ports := util.Conf2Listener(conf.Port)
  98. if len(ports) == 0 {
  99. WebRTCPlugin.Fatal("webrtc port config error")
  100. }
  101. if protocol == "tcp" {
  102. tcpport := int(ports[0])
  103. tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
  104. IP: net.IP{0, 0, 0, 0},
  105. Port: tcpport,
  106. })
  107. if err != nil {
  108. WebRTCPlugin.Fatal("webrtc listener tcp", zap.Error(err))
  109. }
  110. WebRTCPlugin.Info("webrtc start listen", zap.Int("port", tcpport))
  111. conf.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
  112. conf.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
  113. } else if len(ports) == 2 {
  114. conf.s.SetEphemeralUDPPortRange(ports[0], ports[1])
  115. } else {
  116. // 创建共享WEBRTC端口 默认9000
  117. udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
  118. IP: net.IP{0, 0, 0, 0},
  119. Port: int(ports[0]),
  120. })
  121. if err != nil {
  122. WebRTCPlugin.Fatal("webrtc listener udp", zap.Error(err))
  123. }
  124. WebRTCPlugin.Info("webrtc start listen", zap.Uint16("port", ports[0]))
  125. conf.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
  126. conf.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
  127. }
  128. if err := RegisterDefaultInterceptors(&conf.m, i); err != nil {
  129. panic(err)
  130. }
  131. conf.api = NewAPI(WithMediaEngine(&conf.m),
  132. WithInterceptorRegistry(i), WithSettingEngine(conf.s))
  133. }
  134. }
  135. func (conf *WebRTCConfig) Play_(w http.ResponseWriter, r *http.Request) {
  136. w.Header().Set("Content-Type", "application/sdp")
  137. streamPath := r.URL.Path[len("/play/"):]
  138. rawQuery := r.URL.RawQuery
  139. bytes, err := io.ReadAll(r.Body)
  140. var suber WebRTCSubscriber
  141. suber.SDP = string(bytes)
  142. suber.RemoteAddr = r.RemoteAddr
  143. if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
  144. ICEServers: conf.ICEServers,
  145. }); err != nil {
  146. http.Error(w, err.Error(), http.StatusInternalServerError)
  147. return
  148. }
  149. suber.OnICECandidate(func(ice *ICECandidate) {
  150. if ice != nil {
  151. suber.Info(ice.ToJSON().Candidate)
  152. }
  153. })
  154. if err = suber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: suber.SDP}); err != nil {
  155. http.Error(w, err.Error(), http.StatusInternalServerError)
  156. return
  157. }
  158. if rawQuery != "" {
  159. streamPath += "?" + rawQuery
  160. }
  161. if err = WebRTCPlugin.Subscribe(streamPath, &suber); err != nil {
  162. http.Error(w, err.Error(), http.StatusBadRequest)
  163. return
  164. }
  165. if sdp, err := suber.GetAnswer(); err == nil {
  166. w.Write([]byte(sdp))
  167. } else {
  168. http.Error(w, err.Error(), http.StatusBadRequest)
  169. }
  170. }
  171. // https://datatracker.ietf.org/doc/html/draft-ietf-wish-whip
  172. func (conf *WebRTCConfig) Push_(w http.ResponseWriter, r *http.Request) {
  173. streamPath := r.URL.Path[len("/push/"):]
  174. rawQuery := r.URL.RawQuery
  175. auth := r.Header.Get("Authorization")
  176. if strings.HasPrefix(auth, "Bearer ") {
  177. auth = auth[len("Bearer "):]
  178. if rawQuery != "" {
  179. rawQuery += "&bearer=" + auth
  180. } else {
  181. rawQuery = "bearer=" + auth
  182. }
  183. WebRTCPlugin.Info("push", zap.String("stream", streamPath), zap.String("bearer", auth))
  184. }
  185. w.Header().Set("Content-Type", "application/sdp")
  186. w.Header().Set("Location", "/webrtc/api/stop/push/"+streamPath)
  187. if rawQuery != "" {
  188. streamPath += "?" + rawQuery
  189. }
  190. bytes, err := io.ReadAll(r.Body)
  191. var puber WebRTCPublisher
  192. puber.SDP = string(bytes)
  193. if puber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
  194. ICEServers: conf.ICEServers,
  195. }); err != nil {
  196. http.Error(w, err.Error(), http.StatusInternalServerError)
  197. return
  198. }
  199. puber.SetIO(puber.PeerConnection) //TODO: 单PC需要注释掉
  200. puber.OnICECandidate(func(ice *ICECandidate) {
  201. if ice != nil {
  202. puber.Info(ice.ToJSON().Candidate)
  203. }
  204. })
  205. puber.OnDataChannel(func(d *DataChannel) {
  206. puber.Info("OnDataChannel", zap.String("label", d.Label()))
  207. d.OnMessage(func(msg DataChannelMessage) {
  208. puber.SDP = string(msg.Data[1:])
  209. puber.Debug("dc message", zap.String("sdp", puber.SDP))
  210. if err := puber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: puber.SDP}); err != nil {
  211. return
  212. }
  213. if answer, err := puber.GetAnswer(); err == nil {
  214. d.SendText(answer)
  215. } else {
  216. return
  217. }
  218. switch msg.Data[0] {
  219. case '0':
  220. puber.Stop()
  221. case '1':
  222. }
  223. })
  224. })
  225. // if _, err = puber.AddTransceiverFromKind(RTPCodecTypeVideo); err != nil {
  226. // http.Error(w, err.Error(), http.StatusInternalServerError)
  227. // return
  228. // }
  229. // if _, err = puber.AddTransceiverFromKind(RTPCodecTypeAudio); err != nil {
  230. // http.Error(w, err.Error(), http.StatusInternalServerError)
  231. // return
  232. // }
  233. if err = WebRTCPlugin.Publish(streamPath, &puber); err != nil {
  234. http.Error(w, err.Error(), http.StatusBadRequest)
  235. return
  236. }
  237. puber.OnConnectionStateChange(func(state PeerConnectionState) {
  238. puber.Info("Connection State has changed:" + state.String())
  239. switch state {
  240. case PeerConnectionStateConnected:
  241. case PeerConnectionStateDisconnected, PeerConnectionStateFailed:
  242. puber.Stop()
  243. }
  244. })
  245. if err := puber.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: puber.SDP}); err != nil {
  246. http.Error(w, err.Error(), http.StatusBadRequest)
  247. return
  248. }
  249. if answer, err := puber.GetAnswer(); err == nil {
  250. w.WriteHeader(http.StatusCreated)
  251. fmt.Fprint(w, answer)
  252. } else {
  253. http.Error(w, err.Error(), http.StatusBadRequest)
  254. return
  255. }
  256. }
  257. func (conf *WebRTCConfig) Test_Publish(w http.ResponseWriter, r *http.Request) {
  258. w.Write(publishHTML)
  259. }
  260. func (conf *WebRTCConfig) Test_ScreenShare(w http.ResponseWriter, r *http.Request) {
  261. w.Write(publishHTML)
  262. }
  263. func (conf *WebRTCConfig) Test_Subscribe(w http.ResponseWriter, r *http.Request) {
  264. w.Write(subscribeHTML)
  265. }
  266. func (conf *WebRTCConfig) Batch(w http.ResponseWriter, r *http.Request) {
  267. bytes, err := io.ReadAll(r.Body)
  268. var suber WebRTCBatcher
  269. suber.RemoteAddr = r.RemoteAddr
  270. suber.SDP = string(bytes)
  271. if suber.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
  272. ICEServers: conf.ICEServers,
  273. }); err != nil {
  274. http.Error(w, err.Error(), http.StatusInternalServerError)
  275. return
  276. }
  277. if err = suber.Start(); err != nil {
  278. http.Error(w, err.Error(), http.StatusInternalServerError)
  279. return
  280. }
  281. if sdp, err := suber.GetAnswer(); err == nil {
  282. w.Header().Set("Content-Type", "application/sdp")
  283. fmt.Fprintf(w, "%s", sdp)
  284. } else {
  285. http.Error(w, err.Error(), http.StatusBadRequest)
  286. }
  287. }