main.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package rtsp
  2. import (
  3. "net/http"
  4. "strconv"
  5. "sync"
  6. "github.com/bluenviron/gortsplib/v4"
  7. "go.uber.org/zap"
  8. . "m7s.live/engine/v4"
  9. "m7s.live/engine/v4/config"
  10. "m7s.live/engine/v4/util"
  11. )
  12. type RTSPConfig struct {
  13. config.Publish
  14. config.Subscribe
  15. config.Pull
  16. config.Push
  17. ListenAddr string `default:":554" desc:"rtsp监听地址"`
  18. UDPAddr string `default:":8000" desc:"udp rtp监听地址"`
  19. RTCPAddr string `default:":8001" desc:"udp rtcp监听地址"`
  20. WriteBufferCount int `default:"2048" desc:"rtsp写缓冲区大小"`
  21. SendOptions bool `default:"true" desc:"是否发送options请求"`
  22. sync.Map
  23. server *gortsplib.Server
  24. }
  25. func (conf *RTSPConfig) OnEvent(event any) {
  26. switch v := event.(type) {
  27. case FirstConfig:
  28. conf.server = &gortsplib.Server{
  29. Handler: conf,
  30. RTSPAddress: conf.ListenAddr,
  31. UDPRTPAddress: conf.UDPAddr,
  32. UDPRTCPAddress: conf.RTCPAddr,
  33. MulticastIPRange: "224.1.0.0/16",
  34. MulticastRTPPort: 8002,
  35. MulticastRTCPPort: 8003,
  36. WriteQueueSize: conf.WriteBufferCount,
  37. }
  38. if err := conf.server.Start(); err != nil {
  39. RTSPPlugin.Error("server start", zap.Error(err))
  40. RTSPPlugin.Disabled = true
  41. }
  42. for streamPath, url := range conf.PullOnStart {
  43. if err := RTSPPlugin.Pull(streamPath, url, new(RTSPPuller), 0); err != nil {
  44. RTSPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
  45. }
  46. }
  47. case SEpublish:
  48. if remoteURL := conf.CheckPush(v.Target.Path); remoteURL != "" {
  49. if err := RTSPPlugin.Push(v.Target.Path, remoteURL, new(RTSPPusher), false); err != nil {
  50. RTSPPlugin.Error("push", zap.String("streamPath", v.Target.Path), zap.String("url", remoteURL), zap.Error(err))
  51. }
  52. }
  53. case InvitePublish: //按需拉流
  54. if remoteURL := conf.CheckPullOnSub(v.Target); remoteURL != "" {
  55. if err := RTSPPlugin.Pull(v.Target, remoteURL, new(RTSPPuller), 0); err != nil {
  56. RTSPPlugin.Error("pull", zap.String("streamPath", v.Target), zap.String("url", remoteURL), zap.Error(err))
  57. }
  58. }
  59. }
  60. }
  61. var rtspConfig = &RTSPConfig{}
  62. var RTSPPlugin = InstallPlugin(rtspConfig)
  63. func filterStreams() (ss []*Stream) {
  64. Streams.Range(func(key string, s *Stream) {
  65. switch s.Publisher.(type) {
  66. case *RTSPPublisher, *RTSPPuller:
  67. ss = append(ss, s)
  68. }
  69. })
  70. return
  71. }
  72. func (*RTSPConfig) API_list(w http.ResponseWriter, r *http.Request) {
  73. util.ReturnFetchValue(filterStreams, w, r)
  74. }
  75. func (*RTSPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
  76. query := r.URL.Query()
  77. save, _ := strconv.Atoi(query.Get("save"))
  78. err := RTSPPlugin.Pull(query.Get("streamPath"), query.Get("target"), new(RTSPPuller), save)
  79. if err != nil {
  80. util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
  81. } else {
  82. util.ReturnOK(rw, r)
  83. }
  84. }
  85. func (*RTSPConfig) API_Push(rw http.ResponseWriter, r *http.Request) {
  86. query := r.URL.Query()
  87. err := RTSPPlugin.Push(query.Get("streamPath"), query.Get("target"), new(RTSPPusher), query.Has("save"))
  88. if err != nil {
  89. util.ReturnError(util.APIErrorQueryParse, err.Error(), rw, r)
  90. } else {
  91. util.ReturnOK(rw, r)
  92. }
  93. }