123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- package rtsp
- import (
- "context"
- "net"
- "github.com/bluenviron/gortsplib/v4"
- "github.com/bluenviron/gortsplib/v4/pkg/base"
- "github.com/bluenviron/gortsplib/v4/pkg/url"
- "go.uber.org/zap"
- "m7s.live/engine/v4"
- )
- type RTSPClient struct {
- *gortsplib.Client `json:"-" yaml:"-"`
- gortsplib.Transport
- DialContext func(ctx context.Context, network, address string) (net.Conn, error) `json:"-" yaml:"-"`
- }
- type RTSPPuller struct {
- RTSPPublisher
- engine.Puller
- RTSPClient
- }
- func (p *RTSPClient) Close() error {
- if p.Client != nil {
- p.Client.Close()
- }
- return nil
- }
- func (p *RTSPClient) Disconnect() {
- if p.Client != nil {
- p.Client.Close()
- }
- }
- func (p *RTSPPuller) Connect() error {
- client := &gortsplib.Client{
- DialContext: p.DialContext,
- AnyPortEnable: true,
- }
- p.Transport = gortsplib.TransportTCP
- client.Transport = &p.Transport
- // parse URL
- u, err := url.Parse(p.RemoteURL)
- if err != nil {
- return err
- }
- // connect to the server
- if err = client.Start(u.Scheme, u.Host); err != nil {
- return err
- }
- p.Client = client
- p.SetIO(p)
- return nil
- }
- func (p *RTSPPuller) Pull() (err error) {
- u, _ := url.Parse(p.RemoteURL)
- var res *base.Response
- if rtspConfig.SendOptions {
- if res, err = p.Options(u); err != nil {
- p.Error("Options", zap.Error(err))
- return
- }
- }
- p.Debug("Options", zap.Any("res", res))
- // find published tracks
- session, res, err := p.Describe(u)
- if err != nil {
- p.Error("Describe", zap.Error(err))
- return err
- }
- p.Debug("Describe", zap.Any("res", res))
- p.session = session
- err = p.SetTracks()
- if err != nil {
- p.Error("SetTracks", zap.Error(err))
- return err
- }
- if err = p.SetupAll(session.BaseURL, session.Medias); err != nil {
- p.Error("SetupAndPlay", zap.Error(err))
- return err
- }
- p.OnPacketRTPAny(p.OnPacket)
- res, err = p.Play(nil)
- p.Debug("Play", zap.Any("res", res))
- if err != nil {
- p.Error("Play", zap.Error(err))
- return err
- }
- return p.Wait()
- }
- type RTSPPusher struct {
- RTSPSubscriber
- engine.Pusher
- RTSPClient
- }
- func (p *RTSPPusher) OnEvent(event any) {
- switch v := event.(type) {
- case engine.VideoRTP:
- p.Client.WritePacketRTP(p.videoTrack, v.Packet)
- case engine.AudioRTP:
- p.Client.WritePacketRTP(p.audioTrack, v.Packet)
- default:
- p.RTSPSubscriber.OnEvent(event)
- }
- }
- func (p *RTSPPusher) Connect() error {
- p.Client = &gortsplib.Client{
- DialContext: p.DialContext,
- WriteQueueSize: rtspConfig.WriteBufferCount,
- }
- p.Transport = gortsplib.TransportTCP
- p.Client.Transport = &p.Transport
- // parse URL
- u, err := url.Parse(p.RemoteURL)
- if err != nil {
- p.Error("url.Parse", zap.Error(err))
- return err
- }
- // connect to the server
- if err = p.Client.Start(u.Scheme, u.Host); err != nil {
- p.Error("Client.Start", zap.Error(err))
- return err
- }
- p.SetIO(p)
- if rtspConfig.SendOptions {
- _, err = p.Client.Options(u)
- }
- return err
- }
- func (p *RTSPPusher) Push() (err error) {
- var u *url.URL
- u, err = url.Parse(p.RemoteURL)
- // startTime := time.Now()
- // for len(p.tracks) < 2 {
- // if time.Sleep(time.Second); time.Since(startTime) > time.Second*10 {
- // return fmt.Errorf("timeout")
- // }
- // }
- var res *base.Response
- if res, err = p.Announce(u, p.session); err != nil {
- p.Error("Announce", zap.Error(err))
- return
- } else {
- p.Debug("Announce", zap.Any("res", res))
- }
- err = p.SetupAll(u, p.session.Medias)
- if err != nil {
- p.Error("Setup", zap.Error(err))
- return
- }
- if res, err = p.Record(); err != nil {
- p.Error("Record", zap.Error(err))
- return
- } else {
- p.Debug("Record", zap.Any("res", res))
- }
- p.PlayRTP()
- return
- }
|