client.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package rtsp
  2. import (
  3. "context"
  4. "net"
  5. "github.com/bluenviron/gortsplib/v4"
  6. "github.com/bluenviron/gortsplib/v4/pkg/base"
  7. "github.com/bluenviron/gortsplib/v4/pkg/url"
  8. "go.uber.org/zap"
  9. "m7s.live/engine/v4"
  10. )
  11. type RTSPClient struct {
  12. *gortsplib.Client `json:"-" yaml:"-"`
  13. gortsplib.Transport
  14. DialContext func(ctx context.Context, network, address string) (net.Conn, error) `json:"-" yaml:"-"`
  15. }
  16. type RTSPPuller struct {
  17. RTSPPublisher
  18. engine.Puller
  19. RTSPClient
  20. }
  21. func (p *RTSPClient) Close() error {
  22. if p.Client != nil {
  23. p.Client.Close()
  24. }
  25. return nil
  26. }
  27. func (p *RTSPClient) Disconnect() {
  28. if p.Client != nil {
  29. p.Client.Close()
  30. }
  31. }
  32. func (p *RTSPPuller) Connect() error {
  33. client := &gortsplib.Client{
  34. DialContext: p.DialContext,
  35. AnyPortEnable: true,
  36. }
  37. p.Transport = gortsplib.TransportTCP
  38. client.Transport = &p.Transport
  39. // parse URL
  40. u, err := url.Parse(p.RemoteURL)
  41. if err != nil {
  42. return err
  43. }
  44. // connect to the server
  45. if err = client.Start(u.Scheme, u.Host); err != nil {
  46. return err
  47. }
  48. p.Client = client
  49. p.SetIO(p)
  50. return nil
  51. }
  52. func (p *RTSPPuller) Pull() (err error) {
  53. u, _ := url.Parse(p.RemoteURL)
  54. var res *base.Response
  55. if rtspConfig.SendOptions {
  56. if res, err = p.Options(u); err != nil {
  57. p.Error("Options", zap.Error(err))
  58. return
  59. }
  60. }
  61. p.Debug("Options", zap.Any("res", res))
  62. // find published tracks
  63. session, res, err := p.Describe(u)
  64. if err != nil {
  65. p.Error("Describe", zap.Error(err))
  66. return err
  67. }
  68. p.Debug("Describe", zap.Any("res", res))
  69. p.session = session
  70. err = p.SetTracks()
  71. if err != nil {
  72. p.Error("SetTracks", zap.Error(err))
  73. return err
  74. }
  75. if err = p.SetupAll(session.BaseURL, session.Medias); err != nil {
  76. p.Error("SetupAndPlay", zap.Error(err))
  77. return err
  78. }
  79. p.OnPacketRTPAny(p.OnPacket)
  80. res, err = p.Play(nil)
  81. p.Debug("Play", zap.Any("res", res))
  82. if err != nil {
  83. p.Error("Play", zap.Error(err))
  84. return err
  85. }
  86. return p.Wait()
  87. }
  88. type RTSPPusher struct {
  89. RTSPSubscriber
  90. engine.Pusher
  91. RTSPClient
  92. }
  93. func (p *RTSPPusher) OnEvent(event any) {
  94. switch v := event.(type) {
  95. case engine.VideoRTP:
  96. p.Client.WritePacketRTP(p.videoTrack, v.Packet)
  97. case engine.AudioRTP:
  98. p.Client.WritePacketRTP(p.audioTrack, v.Packet)
  99. default:
  100. p.RTSPSubscriber.OnEvent(event)
  101. }
  102. }
  103. func (p *RTSPPusher) Connect() error {
  104. p.Client = &gortsplib.Client{
  105. DialContext: p.DialContext,
  106. WriteQueueSize: rtspConfig.WriteBufferCount,
  107. }
  108. p.Transport = gortsplib.TransportTCP
  109. p.Client.Transport = &p.Transport
  110. // parse URL
  111. u, err := url.Parse(p.RemoteURL)
  112. if err != nil {
  113. p.Error("url.Parse", zap.Error(err))
  114. return err
  115. }
  116. // connect to the server
  117. if err = p.Client.Start(u.Scheme, u.Host); err != nil {
  118. p.Error("Client.Start", zap.Error(err))
  119. return err
  120. }
  121. p.SetIO(p)
  122. if rtspConfig.SendOptions {
  123. _, err = p.Client.Options(u)
  124. }
  125. return err
  126. }
  127. func (p *RTSPPusher) Push() (err error) {
  128. var u *url.URL
  129. u, err = url.Parse(p.RemoteURL)
  130. // startTime := time.Now()
  131. // for len(p.tracks) < 2 {
  132. // if time.Sleep(time.Second); time.Since(startTime) > time.Second*10 {
  133. // return fmt.Errorf("timeout")
  134. // }
  135. // }
  136. var res *base.Response
  137. if res, err = p.Announce(u, p.session); err != nil {
  138. p.Error("Announce", zap.Error(err))
  139. return
  140. } else {
  141. p.Debug("Announce", zap.Any("res", res))
  142. }
  143. err = p.SetupAll(u, p.session.Medias)
  144. if err != nil {
  145. p.Error("Setup", zap.Error(err))
  146. return
  147. }
  148. if res, err = p.Record(); err != nil {
  149. p.Error("Record", zap.Error(err))
  150. return
  151. } else {
  152. p.Debug("Record", zap.Any("res", res))
  153. }
  154. p.PlayRTP()
  155. return
  156. }