server.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package rtsp
  2. import (
  3. "github.com/bluenviron/gortsplib/v4"
  4. "github.com/bluenviron/gortsplib/v4/pkg/base"
  5. "github.com/bluenviron/gortsplib/v4/pkg/description"
  6. "go.uber.org/zap"
  7. . "m7s.live/engine/v4"
  8. )
  9. type RTSPIO struct {
  10. server *gortsplib.Server
  11. session *description.Session
  12. tracks []*description.Media
  13. stream *gortsplib.ServerStream
  14. audioTrack *description.Media
  15. videoTrack *description.Media
  16. }
  17. func (conf *RTSPConfig) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
  18. RTSPPlugin.Debug("conn opened")
  19. }
  20. func (conf *RTSPConfig) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
  21. RTSPPlugin.Debug("conn closed")
  22. if p, ok := conf.LoadAndDelete(ctx.Conn); ok {
  23. p.(IIO).Stop(zap.String("conn", "closed"))
  24. }
  25. }
  26. func (conf *RTSPConfig) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
  27. RTSPPlugin.Debug("session opened")
  28. }
  29. func (conf *RTSPConfig) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
  30. RTSPPlugin.Debug("session closed")
  31. conf.Delete(ctx.Session)
  32. }
  33. // called after receiving a DESCRIBE request.
  34. func (conf *RTSPConfig) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
  35. RTSPPlugin.Debug("describe request", zap.String("sdp", string(ctx.Request.Body)))
  36. var suber RTSPSubscriber
  37. suber.server = conf.server
  38. suber.RemoteAddr = ctx.Conn.NetConn().RemoteAddr().String()
  39. suber.SetIO(ctx.Conn.NetConn())
  40. streamPath := ctx.Path
  41. if ctx.Query != "" {
  42. streamPath = streamPath + "?" + ctx.Query
  43. }
  44. if err := RTSPPlugin.Subscribe(streamPath, &suber); err == nil {
  45. RTSPPlugin.Debug("describe replay ok")
  46. conf.Store(ctx.Conn, &suber)
  47. return &base.Response{
  48. StatusCode: base.StatusOK,
  49. }, suber.stream, nil
  50. } else {
  51. return &base.Response{
  52. StatusCode: base.StatusNotFound,
  53. }, suber.stream, nil
  54. }
  55. }
  56. func (conf *RTSPConfig) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
  57. var resp base.Response
  58. resp.StatusCode = base.StatusOK
  59. if p, ok := conf.Load(ctx.Conn); ok {
  60. switch v := p.(type) {
  61. case *RTSPSubscriber:
  62. return &resp, v.stream, nil
  63. case *RTSPPublisher:
  64. return &resp, v.stream, nil
  65. }
  66. }
  67. resp.StatusCode = base.StatusNotFound
  68. return &resp, nil, nil
  69. }
  70. func (conf *RTSPConfig) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
  71. var resp base.Response
  72. resp.StatusCode = base.StatusNotFound
  73. if p, ok := conf.Load(ctx.Conn); ok {
  74. switch v := p.(type) {
  75. case *RTSPSubscriber:
  76. resp.StatusCode = base.StatusOK
  77. go func() {
  78. v.PlayRTP()
  79. ctx.Session.Close()
  80. }()
  81. }
  82. }
  83. return &resp, nil
  84. }
  85. func (conf *RTSPConfig) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
  86. if p, ok := conf.Load(ctx.Session); ok {
  87. ctx.Session.OnPacketRTPAny(p.(*RTSPPublisher).OnPacket)
  88. }
  89. return &base.Response{
  90. StatusCode: base.StatusOK,
  91. }, nil
  92. }
  93. func (conf *RTSPConfig) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
  94. RTSPPlugin.Debug("annouce request", zap.String("sdp", string(ctx.Request.Body)))
  95. p := &RTSPPublisher{}
  96. p.SetIO(ctx.Conn.NetConn())
  97. if err := RTSPPlugin.Publish(ctx.Path, p); err == nil {
  98. p.session = ctx.Description
  99. p.stream = gortsplib.NewServerStream(conf.server, ctx.Description)
  100. if err = p.SetTracks(); err != nil {
  101. return nil, err
  102. }
  103. conf.Store(ctx.Conn, p)
  104. conf.Store(ctx.Session, p)
  105. } else {
  106. return &base.Response{
  107. StatusCode: base.StatusBadRequest,
  108. }, nil
  109. }
  110. return &base.Response{
  111. StatusCode: base.StatusOK,
  112. }, nil
  113. }