package engine import ( "context" "encoding/json" "fmt" "runtime" "strings" "sync" "sync/atomic" "time" "unsafe" . "github.com/logrusorgru/aurora/v4" "go.uber.org/zap" "m7s.live/engine/v4/common" . "m7s.live/engine/v4/common" "m7s.live/engine/v4/config" "m7s.live/engine/v4/log" "m7s.live/engine/v4/track" "m7s.live/engine/v4/util" ) type StreamState byte type StreamAction byte func (s StreamState) String() string { return StateNames[s] } func (s StreamAction) String() string { return ActionNames[s] } // 四状态机 const ( STATE_WAITPUBLISH StreamState = iota // 等待发布者状态 STATE_WAITTRACK // 等待音视频轨道激活 STATE_PUBLISHING // 正在发布流状态 STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启) STATE_CLOSED // 流已关闭,不可使用 ) const ( ACTION_PUBLISH StreamAction = iota ACTION_TRACKAVAILABLE // 音视频轨道激活 ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到 ACTION_PUBLISHCLOSE // 发布者关闭 ACTION_CLOSE // 主动关闭流 ACTION_LASTLEAVE // 最后一个订阅者离开 ACTION_FIRSTENTER // 第一个订阅者进入 ACTION_NOTRACK // 没有音视频轨道 ) var StateNames = [...]string{"⌛0", "🟡1", "🟢2", "🟠3", "🔴4"} var ActionNames = [...]string{"publish", "track available", "timeout", "publish close", "close", "last leave", "first enter", "no tracks"} /* stateDiagram-v2 [*] --> ⌛等待发布者 : 创建 ⌛等待发布者 --> 🟡等待轨道 :发布 ⌛等待发布者 --> 🔴已关闭 :关闭 ⌛等待发布者 --> 🔴已关闭 :超时 ⌛等待发布者 --> 🔴已关闭 :最后订阅者离开 🟡等待轨道 --> 🟢正在发布 :轨道激活 🟡等待轨道 --> 🔴已关闭 :关闭 🟡等待轨道 --> 🔴已关闭 :超时 🟡等待轨道 --> 🔴已关闭 :最后订阅者离开 🟢正在发布 --> ⌛等待发布者: 发布者断开 🟢正在发布 --> 🟠等待关闭: 最后订阅者离开 🟢正在发布 --> 🔴已关闭 :关闭 🟠等待关闭 --> 🟢正在发布 :第一个订阅者进入 🟠等待关闭 --> 🔴已关闭 :关闭 🟠等待关闭 --> 🔴已关闭 :超时 🟠等待关闭 --> 🔴已关闭 :发布者断开 */ var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{ { ACTION_PUBLISH: STATE_WAITTRACK, ACTION_TIMEOUT: STATE_CLOSED, ACTION_LASTLEAVE: STATE_CLOSED, ACTION_CLOSE: STATE_CLOSED, }, { ACTION_TRACKAVAILABLE: STATE_PUBLISHING, ACTION_TIMEOUT: STATE_CLOSED, ACTION_LASTLEAVE: STATE_WAITCLOSE, ACTION_CLOSE: STATE_CLOSED, }, { // ACTION_PUBLISHCLOSE: STATE_WAITPUBLISH, ACTION_TIMEOUT: STATE_WAITPUBLISH, ACTION_LASTLEAVE: STATE_WAITCLOSE, ACTION_CLOSE: STATE_CLOSED, }, { // ACTION_PUBLISHCLOSE: STATE_CLOSED, ACTION_TIMEOUT: STATE_CLOSED, ACTION_FIRSTENTER: STATE_PUBLISHING, ACTION_CLOSE: STATE_CLOSED, }, {}, } // Streams 所有的流集合 var Streams util.Map[string, *Stream] func FilterStreams[T IPublisher]() (ss []*Stream) { Streams.Range(func(_ string, s *Stream) { if _, ok := s.Publisher.(T); ok { ss = append(ss, s) } }) return } type StreamTimeoutConfig struct { PublishTimeout time.Duration //发布者无数据后超时 DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活 IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活 PauseTimeout time.Duration //暂停后超时 NeverTimeout bool // 永不超时 } type Tracks struct { sync.Map Video []*track.Video Audio []*track.Audio Data []common.Track MainVideo *track.Video MainAudio *track.Audio SEI *track.Data[[]byte] marshalLock sync.Mutex } func (tracks *Tracks) Range(f func(name string, t Track)) { tracks.Map.Range(func(k, v any) bool { f(k.(string), v.(Track)) return true }) } func (tracks *Tracks) Add(name string, t Track) bool { //fmt.Println("ADD TRACK 1111111111111111111111111",name) switch v := t.(type) { case *track.Video: if tracks.MainVideo == nil { tracks.MainVideo = v tracks.SetIDR(v) } if tracks.SEI != nil { v.SEIReader = &track.DataReader[[]byte]{} v.SEIReader.Ring = tracks.SEI.Ring } case *track.Audio: if tracks.MainAudio == nil { tracks.MainAudio = v } if tracks.MainVideo != nil { v.Narrow() } } _, loaded := tracks.LoadOrStore(name, t) if !loaded { switch v := t.(type) { case *track.Video: tracks.Video = append(tracks.Video, v) case *track.Audio: tracks.Audio = append(tracks.Audio, v) default: tracks.Data = append(tracks.Data, v) } } return !loaded } func (tracks *Tracks) SetIDR(video Track) { if video == tracks.MainVideo { tracks.Range(func(_ string, t Track) { if v, ok := t.(*track.Audio); ok { v.Narrow() } }) } } func (tracks *Tracks) AddSEI(t byte, data []byte) bool { if tracks.SEI != nil { l := len(data) var buffer util.Buffer buffer.WriteByte(t) for l >= 255 { buffer.WriteByte(255) l -= 255 } buffer.WriteByte(byte(l)) buffer.Write(data) buffer.WriteByte(0x80) tracks.SEI.Push(buffer) return true } return false } func (tracks *Tracks) MarshalJSON() ([]byte, error) { var trackList []Track tracks.marshalLock.Lock() defer tracks.marshalLock.Unlock() tracks.Range(func(_ string, t Track) { t.SnapForJson() trackList = append(trackList, t) }) return json.Marshal(trackList) } var streamIdGen atomic.Uint32 // Stream 流定义 type Stream struct { timeout *time.Timer //当前状态的超时定时器 actionChan util.SafeChan[any] ID uint32 // 流ID *log.Logger StartTime time.Time //创建时间 StreamTimeoutConfig Path string Publisher IPublisher State StreamState SEHistory []StateEvent // 事件历史 Subscribers Subscribers // 订阅者 Tracks Tracks AppName string StreamName string IsPause bool // 是否处于暂停状态 pubLocker sync.Mutex } type StreamSummay struct { Path string State StreamState Subscribers int Tracks []string StartTime time.Time Type string BPS int } func (s *Stream) GetType() string { if s.Publisher == nil { return "" } return s.Publisher.GetPublisher().Type } func (s *Stream) GetStartTime() time.Time { return s.StartTime } func (s *Stream) GetPublisherConfig() *config.Publish { if s.Publisher == nil { s.Error("GetPublisherConfig: Publisher is nil") return nil } return s.Publisher.GetPublisher().Config } // Summary 返回流的简要信息 func (s *Stream) Summary() (r StreamSummay) { if s.Publisher != nil { r.Type = s.Publisher.GetPublisher().Type } s.Tracks.Range(func(name string, t Track) { r.BPS += t.GetBPS() r.Tracks = append(r.Tracks, name) }) r.Path = s.Path r.State = s.State r.Subscribers = s.Subscribers.Len() r.StartTime = s.StartTime return } func (s *Stream) SSRC() uint32 { return uint32(uintptr(unsafe.Pointer(s))) } func (s *Stream) SetIDR(video Track) { s.Tracks.SetIDR(video) } func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream, created bool) { p := strings.Split(streamPath, "/") if len(p) < 2 { log.Warn(Red("Stream Path Format Error:"), streamPath) return nil, false } actual, loaded := Streams.LoadOrStore(streamPath, &Stream{ Path: streamPath, AppName: p[0], StreamName: strings.Join(p[1:], "/"), StartTime: time.Now(), timeout: time.NewTimer(waitTimeout), }) if s := actual.(*Stream); loaded { for s.Logger == nil { runtime.Gosched() } s.Debug("found") return s, false } else { s.ID = streamIdGen.Add(1) s.Subscribers.Init() s.actionChan.Init(10) s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath), zap.Uint32("id", s.ID)) s.Debug("created") go s.run() return s, true } } func (r *Stream) action(action StreamAction) (ok bool) { //fmt.Println("ACTON 1111111111111111111111:",action.String(),r.State) var event StateEvent event.Target = r event.Action = action event.From = r.State event.Time = time.Now() var next StreamState if next, ok = event.Next(); ok { r.State = next //.Println("NEXT 1111111111111111111111:",next.String()) r.SEHistory = append(r.SEHistory, event) // 给Publisher状态变更的回调,方便进行远程拉流等操作 var stateEvent any r.Info(Sprintf("%s%s%s", event.From.String(), Yellow("->"), next.String()), zap.String("action", action.String())) switch next { case STATE_WAITPUBLISH: //fmt.Println("STATE_WAITPUBLISH 1111111111111111111111111") stateEvent = SEwaitPublish{event, r.Publisher} waitTime := time.Duration(0) if r.Publisher != nil { waitTime = r.Publisher.GetPublisher().Config.WaitCloseTimeout r.Tracks.Range(func(name string, t Track) { t.SetStuff(TrackStateOffline) }) } r.Subscribers.OnPublisherLost(event) if suber := r.Subscribers.Pick(); suber != nil { r.Subscribers.Broadcast(stateEvent) if waitTime == 0 { waitTime = suber.GetSubscriber().Config.WaitTimeout } } else if waitTime == 0 { waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流 } r.timeout.Reset(waitTime) r.Info("wait publisher", zap.Duration("wait timeout", waitTime)) case STATE_WAITTRACK: //fmt.Println("STATE_WAITTRACK 1111111111111111111111111") if len(r.SEHistory) > 1 { stateEvent = SErepublish{event} } else { stateEvent = SEpublish{event} } r.timeout.Reset(time.Second * 20) // 5秒心跳,检测track的存活度 case STATE_PUBLISHING: //fmt.Println("STATE_PUBLISHING 1111111111111111111111111") stateEvent = SEtrackAvaliable{event} r.Subscribers.SendInviteTrack(r) r.Subscribers.Broadcast(stateEvent) if puller, ok := r.Publisher.(IPuller); ok { puller.OnConnected() } r.timeout.Reset(time.Second * 30) //r.timeout.Reset(time.Second * 15) // 5秒心跳,检测track的存活度 case STATE_WAITCLOSE: //fmt.Println("STATE_WAITCLOSE 1111111111111111111111111") stateEvent = SEwaitClose{event} if r.IdleTimeout > 0 { r.timeout.Reset(r.IdleTimeout) } else { r.timeout.Reset(r.DelayCloseTimeout) } case STATE_CLOSED: //fmt.Println("STATE_CLOSED 1111111111111111111111111") Streams.Delete(r.Path) r.timeout.Stop() stateEvent = SEclose{event} r.Subscribers.Broadcast(stateEvent) r.Tracks.Range(func(_ string, t Track) { t.Dispose() }) r.Subscribers.Dispose() r.actionChan.Close() } if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond { r.Warn("action timeout", zap.String("action", action.String()), zap.Duration("cost", actionCoust)) } EventBus <- stateEvent if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond { r.Warn("action timeout after eventbus", zap.String("action", action.String()), zap.Duration("cost", actionCoust)) } if r.Publisher != nil { r.Publisher.OnEvent(stateEvent) if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond { r.Warn("action timeout after send to publisher", zap.String("action", action.String()), zap.Duration("cost", actionCoust)) } } } else { r.Debug("wrong action", zap.String("action", action.String())) } return } func (r *Stream) IsShutdown() bool { switch l := len(r.SEHistory); l { case 0: return false case 1: return r.SEHistory[0].Action == ACTION_CLOSE default: switch r.SEHistory[l-1].Action { case ACTION_CLOSE: return true case ACTION_TIMEOUT: return r.SEHistory[l-1].From == STATE_WAITCLOSE } } return false } func (r *Stream) IsClosed() bool { if r == nil { return true } return r.State == STATE_CLOSED } func (r *Stream) Close() { r.Receive(ACTION_CLOSE) } func (s *Stream) Receive(event any) bool { if s.IsClosed() { return false } return s.actionChan.Send(event) } func (s *Stream) onSuberClose(sub ISubscriber) { s.Subscribers.Delete(sub) if s.Publisher != nil { s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量 } if (s.DelayCloseTimeout > 0 || s.IdleTimeout > 0) && s.Subscribers.Len() == 0 { s.action(ACTION_LASTLEAVE) } } func (s *Stream) checkRunCost(timeStart time.Time, timeOutInfo zap.Field) { if cost := time.Since(timeStart); cost > 100*time.Millisecond { s.Warn("run timeout", timeOutInfo, zap.Duration("cost", cost)) } } // 流状态处理中枢,包括接收订阅发布指令等 func (s *Stream) run() { EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}} pulseTicker := time.NewTicker(EngineConfig.PulseInterval) defer pulseTicker.Stop() var timeOutInfo zap.Field var timeStart time.Time for pulseSuber := make(map[ISubscriber]struct{}); ; s.checkRunCost(timeStart, timeOutInfo) { select { case <-pulseTicker.C: //fmt.Println(" <-pulseTicker.C 1111111111111111111111111111111") timeStart = time.Now() timeOutInfo = zap.String("type", "pulse") for sub := range pulseSuber { sub.OnEvent(PulseEvent{CreateEvent(struct{}{})}) } case <-s.timeout.C: timeStart = time.Now() timeOutInfo = zap.String("state", s.State.String()) if s.State == STATE_PUBLISHING || s.State == STATE_WAITTRACK { for sub := range s.Subscribers.internal { if sub.IsClosed() { delete(s.Subscribers.internal, sub) s.Info("innersuber -1", zap.Int("remains", len(s.Subscribers.internal))) } } for sub := range s.Subscribers.public { if sub.IsClosed() { s.onSuberClose(sub) } } if !s.NeverTimeout { lost := false trackCount := 0 timeout := s.PublishTimeout if s.IsPause { timeout = s.PauseTimeout } s.Tracks.Range(func(name string, t Track) { trackCount++ switch t.(type) { case *track.Video, *track.Audio: // track 超过一定时间没有更新数据了 if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout { s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout)) lost = true } } }) if !lost { if trackCount == 0 { s.Warn("no tracks") lost = true s.action(ACTION_CLOSE) continue } else if s.Publisher != nil && s.Publisher.IsClosed() { s.Warn("publish is closed", zap.Error(context.Cause(s.Publisher.GetPublisher())), zap.String("ptr", fmt.Sprintf("%p", s.Publisher.GetPublisher().Context))) lost = true if len(s.Tracks.Audio)+len(s.Tracks.Video) == 0 { s.action(ACTION_CLOSE) continue } } } if lost { s.action(ACTION_TIMEOUT) continue } if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout { s.action(ACTION_LASTLEAVE) continue } } if s.State == STATE_WAITTRACK { s.action(ACTION_TRACKAVAILABLE) } s.Subscribers.AbortWait() s.timeout.Reset(time.Second * 15) } else { s.Debug("timeout", timeOutInfo) s.action(ACTION_TIMEOUT) } case action, ok := <-s.actionChan.C: //fmt.Println(" <-s.actionChan.C: 1111111111111111111111111111111") if !ok { return } timeStart = time.Now() switch v := action.(type) { case SubPulse: //fmt.Println(" <-s.actionChan.C SubPulse: 1111111111111111111111111111111") timeOutInfo = zap.String("action", "SubPulse") pulseSuber[v] = struct{}{} case *util.Promise[IPublisher]: //fmt.Println(" <-s.actionChan.C util.Promise[IPublisher]:: 1111111111111111111111111111111") timeOutInfo = zap.String("action", "Publish") if s.IsClosed() { v.Reject(ErrStreamIsClosed) break } puber := v.Value.GetPublisher() var oldPuber *Publisher if s.Publisher != nil { oldPuber = s.Publisher.GetPublisher() } conf := puber.Config republish := s.Publisher == v.Value // 重复发布 if republish { s.Info("republish") s.Tracks.Range(func(name string, t Track) { t.SetStuff(TrackStateOffline) }) } needKick := !republish && s.Publisher != nil && conf.KickExist // 需要踢掉老的发布者 if needKick { s.Warn("kick", zap.String("old type", oldPuber.Type)) s.Publisher.OnEvent(SEKick{CreateEvent[struct{}](util.Null)}) } s.Publisher = v.Value s.PublishTimeout = conf.PublishTimeout s.DelayCloseTimeout = conf.DelayCloseTimeout s.IdleTimeout = conf.IdleTimeout s.PauseTimeout = conf.PauseTimeout if s.action(ACTION_PUBLISH) || republish || needKick { if oldPuber != nil { // 接管老的发布者的音视频轨道 puber.AudioTrack = oldPuber.AudioTrack puber.VideoTrack = oldPuber.VideoTrack } if conf.InsertSEI { if s.Tracks.SEI == nil { s.Tracks.SEI = track.NewDataTrack[[]byte]("sei") s.Tracks.SEI.Locker = &sync.Mutex{} s.Tracks.SEI.SetStuff(s) if s.Tracks.Add("sei", s.Tracks.SEI) { s.Info("sei track added") } } } //fmt.Println(" <-s.actionChan.C util.Promise[IPublisher] Resolve:: 1111111111111111111111111111111",time.Now().String()) v.Resolve() } else { s.Warn("duplicate publish") v.Reject(ErrDuplicatePublish) } case *util.Promise[ISubscriber]: //fmt.Println(" util.Promise[ISubscriber] 1111111111111111111111111111111") timeOutInfo = zap.String("action", "Subscribe") if s.IsClosed() { v.Reject(ErrStreamIsClosed) break } suber := v.Value io := suber.GetSubscriber() sbConfig := io.Config waits := &waitTracks{ Promise: v, } if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" { waits.audio.Wait(strings.Split(ats, ",")...) } else if len(sbConfig.SubAudioTracks) > 0 { waits.audio.Wait(sbConfig.SubAudioTracks...) } else if sbConfig.SubAudio { waits.audio.Wait() } if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" { waits.video.Wait(strings.Split(vts, ",")...) } else if len(sbConfig.SubVideoTracks) > 0 { waits.video.Wait(sbConfig.SubVideoTracks...) } else if sbConfig.SubVideo { waits.video.Wait() } if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" { waits.data.Wait(strings.Split(dts, ",")...) } else { // waits.data.Wait() } if s.Publisher != nil { s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量 pubConfig := s.Publisher.GetPublisher().Config s.Tracks.Range(func(name string, t Track) { waits.Accept(t) }) if !pubConfig.PubAudio { waits.audio.StopWait() } else if s.State == STATE_PUBLISHING && len(waits.audio) > 0 { waits.audio.InviteTrack(suber) } else if s.Subscribers.waitAborted { waits.audio.StopWait() } if !pubConfig.PubVideo { waits.video.StopWait() } else if s.State == STATE_PUBLISHING && len(waits.video) > 0 { waits.video.InviteTrack(suber) } else if s.Subscribers.waitAborted { waits.video.StopWait() } } s.Subscribers.Add(suber, waits) if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE { s.action(ACTION_FIRSTENTER) } case Unsubscribe: //.Println("Unsubscribe 1111111111111111111111111111111") timeOutInfo = zap.String("action", "Unsubscribe") delete(pulseSuber, v) s.onSuberClose(v) case TrackRemoved: //fmt.Println("TrackRemoved 1111111111111111111111111111111") timeOutInfo = zap.String("action", "TrackRemoved") if s.IsClosed() { break } name := v.GetName() if t, ok := s.Tracks.LoadAndDelete(name); ok { s.Info("track -1", zap.String("name", name)) s.Subscribers.Broadcast(t) t.(common.Track).Dispose() } case *util.Promise[Track]: //fmt.Println("util.Promise[Track] 1111111111111111111111111111111") timeOutInfo = zap.String("action", "Track") if s.IsClosed() { v.Reject(ErrStreamIsClosed) break } if s.State == STATE_WAITPUBLISH { s.action(ACTION_PUBLISH) } pubConfig := s.GetPublisherConfig() name := v.Value.GetName() if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo { v.Reject(ErrTrackMute) continue } if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio { v.Reject(ErrTrackMute) continue } if s.Tracks.Add(name, v.Value) { v.Resolve() s.Subscribers.OnTrack(v.Value) if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio { s.Subscribers.AbortWait() } if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo { s.Subscribers.AbortWait() } if (s.Tracks.MainVideo != nil || !pubConfig.PubVideo) && (!pubConfig.PubAudio || s.Tracks.MainAudio != nil) { s.action(ACTION_TRACKAVAILABLE) } } else { v.Reject(ErrBadTrackName) } case NoMoreTrack: //fmt.Println("NoMoreTrack 1111111111111111111111111111111") s.Subscribers.AbortWait() case StreamAction: //fmt.Println("StreamAction 1111111111111111111111111111111") timeOutInfo = zap.String("action", "StreamAction"+v.String()) s.action(v) default: timeOutInfo = zap.String("action", "unknown") s.Error("unknown action", timeOutInfo) } if s.IsClosed() && s.actionChan.Close() { //再次尝试关闭 return } } } } func (s *Stream) AddTrack(t Track) (promise *util.Promise[Track]) { promise = util.NewPromise(t) if !s.Receive(promise) { promise.Reject(ErrStreamIsClosed) } return } func (s *Stream) RemoveTrack(t Track) { s.Receive(TrackRemoved{t}) } func (s *Stream) Pause() { s.IsPause = true } func (s *Stream) Resume() { s.IsPause = false } type TrackRemoved struct { Track } type SubPulse struct { ISubscriber } type Unsubscribe ISubscriber type NoMoreTrack struct{}