123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package rtsp
- import (
- "github.com/bluenviron/gortsplib/v4"
- "github.com/bluenviron/gortsplib/v4/pkg/base"
- "github.com/bluenviron/gortsplib/v4/pkg/description"
- "go.uber.org/zap"
- . "m7s.live/engine/v4"
- )
- type RTSPIO struct {
- server *gortsplib.Server
- session *description.Session
- tracks []*description.Media
- stream *gortsplib.ServerStream
- audioTrack *description.Media
- videoTrack *description.Media
- }
- func (conf *RTSPConfig) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
- RTSPPlugin.Debug("conn opened")
- }
- func (conf *RTSPConfig) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
- RTSPPlugin.Debug("conn closed")
- if p, ok := conf.LoadAndDelete(ctx.Conn); ok {
- p.(IIO).Stop(zap.String("conn", "closed"))
- }
- }
- func (conf *RTSPConfig) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
- RTSPPlugin.Debug("session opened")
- }
- func (conf *RTSPConfig) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
- RTSPPlugin.Debug("session closed")
- conf.Delete(ctx.Session)
- }
- // called after receiving a DESCRIBE request.
- func (conf *RTSPConfig) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
- RTSPPlugin.Debug("describe request", zap.String("sdp", string(ctx.Request.Body)))
- var suber RTSPSubscriber
- suber.server = conf.server
- suber.RemoteAddr = ctx.Conn.NetConn().RemoteAddr().String()
- suber.SetIO(ctx.Conn.NetConn())
- streamPath := ctx.Path
- if ctx.Query != "" {
- streamPath = streamPath + "?" + ctx.Query
- }
- if err := RTSPPlugin.Subscribe(streamPath, &suber); err == nil {
- RTSPPlugin.Debug("describe replay ok")
- conf.Store(ctx.Conn, &suber)
- return &base.Response{
- StatusCode: base.StatusOK,
- }, suber.stream, nil
- } else {
- return &base.Response{
- StatusCode: base.StatusNotFound,
- }, suber.stream, nil
- }
- }
- func (conf *RTSPConfig) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
- var resp base.Response
- resp.StatusCode = base.StatusOK
- if p, ok := conf.Load(ctx.Conn); ok {
- switch v := p.(type) {
- case *RTSPSubscriber:
- return &resp, v.stream, nil
- case *RTSPPublisher:
- return &resp, v.stream, nil
- }
- }
- resp.StatusCode = base.StatusNotFound
- return &resp, nil, nil
- }
- func (conf *RTSPConfig) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
- var resp base.Response
- resp.StatusCode = base.StatusNotFound
- if p, ok := conf.Load(ctx.Conn); ok {
- switch v := p.(type) {
- case *RTSPSubscriber:
- resp.StatusCode = base.StatusOK
- go func() {
- v.PlayRTP()
- ctx.Session.Close()
- }()
- }
- }
- return &resp, nil
- }
- func (conf *RTSPConfig) OnRecord(ctx *gortsplib.ServerHandlerOnRecordCtx) (*base.Response, error) {
- if p, ok := conf.Load(ctx.Session); ok {
- ctx.Session.OnPacketRTPAny(p.(*RTSPPublisher).OnPacket)
- }
- return &base.Response{
- StatusCode: base.StatusOK,
- }, nil
- }
- func (conf *RTSPConfig) OnAnnounce(ctx *gortsplib.ServerHandlerOnAnnounceCtx) (*base.Response, error) {
- RTSPPlugin.Debug("annouce request", zap.String("sdp", string(ctx.Request.Body)))
- p := &RTSPPublisher{}
- p.SetIO(ctx.Conn.NetConn())
- if err := RTSPPlugin.Publish(ctx.Path, p); err == nil {
- p.session = ctx.Description
- p.stream = gortsplib.NewServerStream(conf.server, ctx.Description)
- if err = p.SetTracks(); err != nil {
- return nil, err
- }
- conf.Store(ctx.Conn, p)
- conf.Store(ctx.Session, p)
- } else {
- return &base.Response{
- StatusCode: base.StatusBadRequest,
- }, nil
- }
- return &base.Response{
- StatusCode: base.StatusOK,
- }, nil
- }
|