stream.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717
  1. package engine
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "runtime"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "unsafe"
  12. . "github.com/logrusorgru/aurora/v4"
  13. "go.uber.org/zap"
  14. "m7s.live/engine/v4/common"
  15. "m7s.live/engine/v4/config"
  16. "m7s.live/engine/v4/log"
  17. "m7s.live/engine/v4/track"
  18. "m7s.live/engine/v4/util"
  19. )
  20. type StreamState byte
  21. type StreamAction byte
  22. func (s StreamState) String() string {
  23. return StateNames[s]
  24. }
  25. func (s StreamAction) String() string {
  26. return ActionNames[s]
  27. }
  28. // 四状态机
  29. const (
  30. STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
  31. STATE_WAITTRACK // 等待音视频轨道激活
  32. STATE_PUBLISHING // 正在发布流状态
  33. STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启)
  34. STATE_CLOSED // 流已关闭,不可使用
  35. )
  36. const (
  37. ACTION_PUBLISH StreamAction = iota
  38. ACTION_TRACKAVAILABLE // 音视频轨道激活
  39. ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
  40. ACTION_PUBLISHCLOSE // 发布者关闭
  41. ACTION_CLOSE // 主动关闭流
  42. ACTION_LASTLEAVE // 最后一个订阅者离开
  43. ACTION_FIRSTENTER // 第一个订阅者进入
  44. ACTION_NOTRACK // 没有音视频轨道
  45. )
  46. var StateNames = [...]string{"⌛", "🟡", "🟢", "🟠", "🔴"}
  47. var ActionNames = [...]string{"publish", "track available", "timeout", "publish close", "close", "last leave", "first enter", "no tracks"}
  48. /*
  49. stateDiagram-v2
  50. [*] --> ⌛等待发布者 : 创建
  51. ⌛等待发布者 --> 🟡等待轨道 :发布
  52. ⌛等待发布者 --> 🔴已关闭 :关闭
  53. ⌛等待发布者 --> 🔴已关闭 :超时
  54. ⌛等待发布者 --> 🔴已关闭 :最后订阅者离开
  55. 🟡等待轨道 --> 🟢正在发布 :轨道激活
  56. 🟡等待轨道 --> 🔴已关闭 :关闭
  57. 🟡等待轨道 --> 🔴已关闭 :超时
  58. 🟡等待轨道 --> 🔴已关闭 :最后订阅者离开
  59. 🟢正在发布 --> ⌛等待发布者: 发布者断开
  60. 🟢正在发布 --> 🟠等待关闭: 最后订阅者离开
  61. 🟢正在发布 --> 🔴已关闭 :关闭
  62. 🟠等待关闭 --> 🟢正在发布 :第一个订阅者进入
  63. 🟠等待关闭 --> 🔴已关闭 :关闭
  64. 🟠等待关闭 --> 🔴已关闭 :超时
  65. 🟠等待关闭 --> 🔴已关闭 :发布者断开
  66. */
  67. var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{
  68. {
  69. ACTION_PUBLISH: STATE_WAITTRACK,
  70. ACTION_TIMEOUT: STATE_CLOSED,
  71. ACTION_LASTLEAVE: STATE_CLOSED,
  72. ACTION_CLOSE: STATE_CLOSED,
  73. },
  74. {
  75. ACTION_TRACKAVAILABLE: STATE_PUBLISHING,
  76. ACTION_TIMEOUT: STATE_CLOSED,
  77. ACTION_LASTLEAVE: STATE_WAITCLOSE,
  78. ACTION_CLOSE: STATE_CLOSED,
  79. },
  80. {
  81. // ACTION_PUBLISHCLOSE: STATE_WAITPUBLISH,
  82. ACTION_TIMEOUT: STATE_WAITPUBLISH,
  83. ACTION_LASTLEAVE: STATE_WAITCLOSE,
  84. ACTION_CLOSE: STATE_CLOSED,
  85. },
  86. {
  87. // ACTION_PUBLISHCLOSE: STATE_CLOSED,
  88. ACTION_TIMEOUT: STATE_CLOSED,
  89. ACTION_FIRSTENTER: STATE_PUBLISHING,
  90. ACTION_CLOSE: STATE_CLOSED,
  91. },
  92. {},
  93. }
  94. // Streams 所有的流集合
  95. var Streams util.Map[string, *Stream]
  96. func FilterStreams[T IPublisher]() (ss []*Stream) {
  97. Streams.Range(func(_ string, s *Stream) {
  98. if _, ok := s.Publisher.(T); ok {
  99. ss = append(ss, s)
  100. }
  101. })
  102. return
  103. }
  104. type StreamTimeoutConfig struct {
  105. PublishTimeout time.Duration //发布者无数据后超时
  106. DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活
  107. IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活
  108. PauseTimeout time.Duration //暂停后超时
  109. NeverTimeout bool // 永不超时
  110. }
  111. type Tracks struct {
  112. sync.Map
  113. Video []*track.Video
  114. Audio []*track.Audio
  115. Data []common.Track
  116. MainVideo *track.Video
  117. MainAudio *track.Audio
  118. marshalLock sync.Mutex
  119. }
  120. func (tracks *Tracks) Range(f func(name string, t common.Track)) {
  121. tracks.Map.Range(func(k, v any) bool {
  122. f(k.(string), v.(common.Track))
  123. return true
  124. })
  125. }
  126. func (tracks *Tracks) Add(name string, t common.Track) bool {
  127. switch v := t.(type) {
  128. case *track.Video:
  129. if tracks.MainVideo == nil {
  130. tracks.MainVideo = v
  131. tracks.SetIDR(v)
  132. }
  133. case *track.Audio:
  134. if tracks.MainAudio == nil {
  135. tracks.MainAudio = v
  136. }
  137. if tracks.MainVideo != nil {
  138. v.Narrow()
  139. }
  140. }
  141. _, loaded := tracks.LoadOrStore(name, t)
  142. if !loaded {
  143. switch v := t.(type) {
  144. case *track.Video:
  145. tracks.Video = append(tracks.Video, v)
  146. case *track.Audio:
  147. tracks.Audio = append(tracks.Audio, v)
  148. default:
  149. tracks.Data = append(tracks.Data, v)
  150. }
  151. }
  152. return !loaded
  153. }
  154. func (tracks *Tracks) SetIDR(video common.Track) {
  155. if video == tracks.MainVideo {
  156. tracks.Range(func(_ string, t common.Track) {
  157. if v, ok := t.(*track.Audio); ok {
  158. v.Narrow()
  159. }
  160. })
  161. }
  162. }
  163. func (tracks *Tracks) MarshalJSON() ([]byte, error) {
  164. var trackList []common.Track
  165. tracks.marshalLock.Lock()
  166. defer tracks.marshalLock.Unlock()
  167. tracks.Range(func(_ string, t common.Track) {
  168. t.SnapForJson()
  169. trackList = append(trackList, t)
  170. })
  171. return json.Marshal(trackList)
  172. }
  173. var streamIdGen atomic.Uint32
  174. // Stream 流定义
  175. type Stream struct {
  176. timeout *time.Timer //当前状态的超时定时器
  177. actionChan util.SafeChan[any]
  178. ID uint32 // 流ID
  179. *log.Logger
  180. StartTime time.Time //创建时间
  181. StreamTimeoutConfig
  182. Path string
  183. Publisher IPublisher
  184. publisher *Publisher
  185. State StreamState
  186. SEHistory []StateEvent // 事件历史
  187. Subscribers Subscribers // 订阅者
  188. Tracks Tracks
  189. AppName string
  190. StreamName string
  191. IsPause bool // 是否处于暂停状态
  192. pubLocker sync.Mutex
  193. }
  194. type StreamSummay struct {
  195. Path string
  196. State StreamState
  197. Subscribers int
  198. Tracks []string
  199. StartTime time.Time
  200. Type string
  201. BPS int
  202. }
  203. func (s *Stream) GetType() string {
  204. if s.Publisher == nil {
  205. return ""
  206. }
  207. return s.publisher.Type
  208. }
  209. func (s *Stream) GetPath() string {
  210. return s.Path
  211. }
  212. func (s *Stream) GetStartTime() time.Time {
  213. return s.StartTime
  214. }
  215. func (s *Stream) GetPublisherConfig() *config.Publish {
  216. if s.Publisher == nil {
  217. s.Error("GetPublisherConfig: Publisher is nil")
  218. return nil
  219. }
  220. return s.Publisher.GetConfig()
  221. }
  222. // Summary 返回流的简要信息
  223. func (s *Stream) Summary() (r StreamSummay) {
  224. if s.publisher != nil {
  225. r.Type = s.publisher.Type
  226. }
  227. s.Tracks.Range(func(name string, t common.Track) {
  228. r.BPS += t.GetBPS()
  229. r.Tracks = append(r.Tracks, name)
  230. })
  231. r.Path = s.Path
  232. r.State = s.State
  233. r.Subscribers = s.Subscribers.Len()
  234. r.StartTime = s.StartTime
  235. return
  236. }
  237. func (s *Stream) SSRC() uint32 {
  238. return uint32(uintptr(unsafe.Pointer(s)))
  239. }
  240. func (s *Stream) SetIDR(video common.Track) {
  241. s.Tracks.SetIDR(video)
  242. }
  243. func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream, created bool) {
  244. p := strings.Split(streamPath, "/")
  245. pl := len(p)
  246. if pl < 2 {
  247. log.Warn(Red("Stream Path Format Error:"), streamPath)
  248. return nil, false
  249. }
  250. actual, loaded := Streams.LoadOrStore(streamPath, &Stream{
  251. Path: streamPath,
  252. AppName: strings.Join(p[1:pl-1], "/"),
  253. StreamName: p[pl-1],
  254. StartTime: time.Now(),
  255. timeout: time.NewTimer(waitTimeout),
  256. })
  257. if s := actual.(*Stream); loaded {
  258. for s.Logger == nil {
  259. runtime.Gosched()
  260. }
  261. s.Debug("found")
  262. return s, false
  263. } else {
  264. s.ID = streamIdGen.Add(1)
  265. s.Subscribers.Init()
  266. s.actionChan.Init(10)
  267. s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath), zap.Uint32("id", s.ID))
  268. s.Info("created")
  269. go s.run()
  270. return s, true
  271. }
  272. }
  273. func (r *Stream) action(action StreamAction) (ok bool) {
  274. var event StateEvent
  275. event.Target = r
  276. event.Action = action
  277. event.From = r.State
  278. event.Time = time.Now()
  279. var next StreamState
  280. if next, ok = event.Next(); ok {
  281. r.State = next
  282. r.SEHistory = append(r.SEHistory, event)
  283. // 给Publisher状态变更的回调,方便进行远程拉流等操作
  284. var stateEvent any
  285. r.Info(Sprintf("%s%s%s", event.From.String(), Yellow("->"), next.String()), zap.String("action", action.String()))
  286. switch next {
  287. case STATE_WAITPUBLISH:
  288. stateEvent = SEwaitPublish{event, r.Publisher}
  289. waitTime := time.Duration(0)
  290. if r.Publisher != nil {
  291. waitTime = r.Publisher.GetConfig().WaitCloseTimeout
  292. r.Tracks.Range(func(name string, t common.Track) {
  293. t.SetStuff(common.TrackStateOffline)
  294. })
  295. }
  296. r.Subscribers.OnPublisherLost(event)
  297. if suber := r.Subscribers.Pick(); suber != nil {
  298. r.Subscribers.Broadcast(stateEvent)
  299. if waitTime == 0 {
  300. waitTime = suber.GetSubscriber().Config.WaitTimeout
  301. }
  302. } else if waitTime == 0 {
  303. waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流
  304. }
  305. r.timeout.Reset(waitTime)
  306. r.Debug("wait publisher", zap.Duration("wait timeout", waitTime))
  307. case STATE_WAITTRACK:
  308. if len(r.SEHistory) > 1 {
  309. stateEvent = SErepublish{event}
  310. } else {
  311. stateEvent = SEpublish{event}
  312. }
  313. r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度
  314. case STATE_PUBLISHING:
  315. stateEvent = SEtrackAvaliable{event}
  316. r.Subscribers.SendInviteTrack(r)
  317. r.Subscribers.Broadcast(stateEvent)
  318. if puller, ok := r.Publisher.(IPuller); ok {
  319. puller.OnConnected()
  320. }
  321. r.timeout.Reset(time.Second * 5) // 5秒心跳,检测track的存活度
  322. case STATE_WAITCLOSE:
  323. stateEvent = SEwaitClose{event}
  324. if r.IdleTimeout > 0 {
  325. r.timeout.Reset(r.IdleTimeout)
  326. } else {
  327. r.timeout.Reset(r.DelayCloseTimeout)
  328. }
  329. case STATE_CLOSED:
  330. Streams.Delete(r.Path)
  331. r.timeout.Stop()
  332. stateEvent = SEclose{event}
  333. r.Subscribers.Broadcast(stateEvent)
  334. r.Tracks.Range(func(_ string, t common.Track) {
  335. if t.GetPublisher().GetStream() == r {
  336. t.Dispose()
  337. }
  338. })
  339. r.Subscribers.Dispose()
  340. r.actionChan.Close()
  341. }
  342. if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond {
  343. r.Warn("action timeout", zap.String("action", action.String()), zap.Duration("cost", actionCoust))
  344. }
  345. EventBus <- stateEvent
  346. if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond {
  347. r.Warn("action timeout after eventbus", zap.String("action", action.String()), zap.Duration("cost", actionCoust))
  348. }
  349. if r.Publisher != nil {
  350. r.Publisher.OnEvent(stateEvent)
  351. if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond {
  352. r.Warn("action timeout after send to publisher", zap.String("action", action.String()), zap.Duration("cost", actionCoust))
  353. }
  354. }
  355. } else {
  356. r.Debug("wrong action", zap.String("action", action.String()))
  357. }
  358. return
  359. }
  360. func (r *Stream) IsShutdown() bool {
  361. switch l := len(r.SEHistory); l {
  362. case 0:
  363. return false
  364. case 1:
  365. return r.SEHistory[0].Action == ACTION_CLOSE
  366. default:
  367. switch r.SEHistory[l-1].Action {
  368. case ACTION_CLOSE:
  369. return true
  370. case ACTION_TIMEOUT:
  371. return r.SEHistory[l-1].From == STATE_WAITCLOSE
  372. }
  373. }
  374. return false
  375. }
  376. func (r *Stream) IsClosed() bool {
  377. if r == nil {
  378. return true
  379. }
  380. return r.State == STATE_CLOSED
  381. }
  382. func (r *Stream) Close() {
  383. r.Receive(ACTION_CLOSE)
  384. }
  385. func (s *Stream) Receive(event any) bool {
  386. if s.IsClosed() {
  387. return false
  388. }
  389. return s.actionChan.Send(event)
  390. }
  391. func (s *Stream) onSuberClose(sub ISubscriber) {
  392. s.Subscribers.Delete(sub)
  393. if s.Publisher != nil {
  394. s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量
  395. }
  396. if (s.DelayCloseTimeout > 0 || s.IdleTimeout > 0) && s.Subscribers.Len() == 0 {
  397. s.action(ACTION_LASTLEAVE)
  398. }
  399. }
  400. func (s *Stream) checkRunCost(timeStart time.Time, timeOutInfo zap.Field) {
  401. if cost := time.Since(timeStart); cost > 100*time.Millisecond {
  402. s.Warn("run timeout", timeOutInfo, zap.Duration("cost", cost))
  403. }
  404. }
  405. // 流状态处理中枢,包括接收订阅发布指令等
  406. func (s *Stream) run() {
  407. EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}}
  408. pulseTicker := time.NewTicker(EngineConfig.PulseInterval)
  409. defer pulseTicker.Stop()
  410. var timeOutInfo zap.Field
  411. var timeStart time.Time
  412. for pulseSuber := make(map[ISubscriber]struct{}); ; s.checkRunCost(timeStart, timeOutInfo) {
  413. select {
  414. case <-pulseTicker.C:
  415. timeStart = time.Now()
  416. timeOutInfo = zap.String("type", "pulse")
  417. for sub := range pulseSuber {
  418. sub.OnEvent(PulseEvent{CreateEvent(struct{}{})})
  419. }
  420. case <-s.timeout.C:
  421. timeStart = time.Now()
  422. timeOutInfo = zap.String("state", s.State.String())
  423. if s.State == STATE_PUBLISHING || s.State == STATE_WAITTRACK {
  424. for sub := range s.Subscribers.internal {
  425. if sub.IsClosed() {
  426. delete(s.Subscribers.internal, sub)
  427. s.Info("innersuber -1", zap.Int("remains", len(s.Subscribers.internal)))
  428. }
  429. }
  430. for sub := range s.Subscribers.public {
  431. if sub.IsClosed() {
  432. s.onSuberClose(sub)
  433. }
  434. }
  435. if !s.NeverTimeout {
  436. lost := false
  437. trackCount := 0
  438. timeout := s.PublishTimeout
  439. if s.IsPause {
  440. timeout = s.PauseTimeout
  441. }
  442. s.Tracks.Range(func(name string, t common.Track) {
  443. trackCount++
  444. switch t.(type) {
  445. case *track.Video, *track.Audio:
  446. // track 超过一定时间没有更新数据了
  447. if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout {
  448. s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout))
  449. lost = true
  450. }
  451. }
  452. })
  453. if !lost {
  454. if trackCount == 0 {
  455. s.Warn("no tracks")
  456. if time.Since(s.StartTime) > timeout {
  457. lost = true
  458. s.action(ACTION_CLOSE)
  459. }
  460. continue
  461. } else if s.Publisher != nil && s.Publisher.IsClosed() {
  462. s.Warn("publish is closed", zap.Error(context.Cause(s.publisher)), zap.String("ptr", fmt.Sprintf("%p", s.publisher.Context)))
  463. lost = true
  464. if len(s.Tracks.Audio)+len(s.Tracks.Video) == 0 {
  465. s.action(ACTION_CLOSE)
  466. continue
  467. }
  468. }
  469. }
  470. if lost {
  471. s.action(ACTION_TIMEOUT)
  472. continue
  473. }
  474. if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout {
  475. s.action(ACTION_LASTLEAVE)
  476. continue
  477. }
  478. }
  479. if s.State == STATE_WAITTRACK {
  480. s.action(ACTION_TRACKAVAILABLE)
  481. }
  482. s.Subscribers.AbortWait()
  483. s.timeout.Reset(time.Second * 5)
  484. } else {
  485. s.Debug("timeout", timeOutInfo)
  486. s.action(ACTION_TIMEOUT)
  487. }
  488. case action, ok := <-s.actionChan.C:
  489. if !ok {
  490. return
  491. }
  492. timeStart = time.Now()
  493. switch v := action.(type) {
  494. case SubPulse:
  495. timeOutInfo = zap.String("action", "SubPulse")
  496. pulseSuber[v] = struct{}{}
  497. case *util.Promise[IPublisher]:
  498. timeOutInfo = zap.String("action", "Publish")
  499. if s.IsClosed() {
  500. v.Reject(ErrStreamIsClosed)
  501. break
  502. }
  503. puber := v.Value.GetPublisher()
  504. oldPuber := s.publisher
  505. s.publisher = puber
  506. conf := puber.Config
  507. republish := s.Publisher == v.Value // 重复发布
  508. if republish {
  509. s.Info("republish")
  510. s.Tracks.Range(func(name string, t common.Track) {
  511. t.SetStuff(common.TrackStateOffline)
  512. })
  513. }
  514. needKick := !republish && oldPuber != nil && conf.KickExist // 需要踢掉老的发布者
  515. if needKick {
  516. s.Warn("kick", zap.String("old type", oldPuber.Type))
  517. s.Publisher.OnEvent(SEKick{CreateEvent(util.Null)})
  518. }
  519. s.Publisher = v.Value
  520. s.PublishTimeout = conf.PublishTimeout
  521. s.DelayCloseTimeout = conf.DelayCloseTimeout
  522. s.IdleTimeout = conf.IdleTimeout
  523. s.PauseTimeout = conf.PauseTimeout
  524. if s.action(ACTION_PUBLISH) || republish || needKick {
  525. if oldPuber != nil {
  526. // 接管老的发布者的音视频轨道
  527. puber.AudioTrack = oldPuber.AudioTrack
  528. puber.VideoTrack = oldPuber.VideoTrack
  529. }
  530. v.Resolve()
  531. } else {
  532. s.Warn("duplicate publish")
  533. v.Reject(ErrDuplicatePublish)
  534. }
  535. case *util.Promise[ISubscriber]:
  536. timeOutInfo = zap.String("action", "Subscribe")
  537. if s.IsClosed() {
  538. v.Reject(ErrStreamIsClosed)
  539. break
  540. }
  541. suber := v.Value
  542. io := suber.GetSubscriber()
  543. sbConfig := io.Config
  544. waits := &waitTracks{
  545. Promise: v,
  546. }
  547. if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" {
  548. waits.audio.Wait(strings.Split(ats, ",")...)
  549. } else if len(sbConfig.SubAudioTracks) > 0 {
  550. waits.audio.Wait(sbConfig.SubAudioTracks...)
  551. } else if sbConfig.SubAudio {
  552. waits.audio.Wait()
  553. }
  554. if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" {
  555. waits.video.Wait(strings.Split(vts, ",")...)
  556. } else if len(sbConfig.SubVideoTracks) > 0 {
  557. waits.video.Wait(sbConfig.SubVideoTracks...)
  558. } else if sbConfig.SubVideo {
  559. waits.video.Wait()
  560. }
  561. if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" {
  562. waits.data.Wait(strings.Split(dts, ",")...)
  563. } else {
  564. // waits.data.Wait()
  565. }
  566. if s.Publisher != nil {
  567. s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量
  568. pubConfig := s.Publisher.GetConfig()
  569. s.Tracks.Range(func(name string, t common.Track) {
  570. waits.Accept(t)
  571. })
  572. if !pubConfig.PubAudio {
  573. waits.audio.StopWait()
  574. } else if s.State == STATE_PUBLISHING && len(waits.audio) > 0 {
  575. waits.audio.InviteTrack(suber)
  576. } else if s.Subscribers.waitAborted {
  577. waits.audio.StopWait()
  578. }
  579. if !pubConfig.PubVideo {
  580. waits.video.StopWait()
  581. } else if s.State == STATE_PUBLISHING && len(waits.video) > 0 {
  582. waits.video.InviteTrack(suber)
  583. } else if s.Subscribers.waitAborted {
  584. waits.video.StopWait()
  585. }
  586. }
  587. s.Subscribers.Add(suber, waits)
  588. if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE {
  589. s.action(ACTION_FIRSTENTER)
  590. }
  591. case Unsubscribe:
  592. timeOutInfo = zap.String("action", "Unsubscribe")
  593. delete(pulseSuber, v)
  594. s.onSuberClose(v)
  595. case TrackRemoved:
  596. timeOutInfo = zap.String("action", "TrackRemoved")
  597. if s.IsClosed() {
  598. break
  599. }
  600. name := v.GetName()
  601. if t, ok := s.Tracks.LoadAndDelete(name); ok {
  602. s.Info("track -1", zap.String("name", name))
  603. s.Subscribers.Broadcast(t)
  604. t.(common.Track).Dispose()
  605. }
  606. case *util.Promise[common.Track]:
  607. timeOutInfo = zap.String("action", "Track")
  608. if s.IsClosed() {
  609. v.Reject(ErrStreamIsClosed)
  610. break
  611. }
  612. if s.State == STATE_WAITPUBLISH {
  613. s.action(ACTION_PUBLISH)
  614. }
  615. pubConfig := s.GetPublisherConfig()
  616. name := v.Value.GetName()
  617. if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
  618. v.Reject(ErrTrackMute)
  619. continue
  620. }
  621. if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio {
  622. v.Reject(ErrTrackMute)
  623. continue
  624. }
  625. if s.Tracks.Add(name, v.Value) {
  626. v.Resolve()
  627. s.Subscribers.OnTrack(v.Value)
  628. if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio {
  629. s.Subscribers.AbortWait()
  630. }
  631. if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo {
  632. s.Subscribers.AbortWait()
  633. }
  634. if (s.Tracks.MainVideo != nil || !pubConfig.PubVideo) && (!pubConfig.PubAudio || s.Tracks.MainAudio != nil) {
  635. s.action(ACTION_TRACKAVAILABLE)
  636. }
  637. } else {
  638. v.Reject(ErrBadTrackName)
  639. }
  640. case NoMoreTrack:
  641. s.Subscribers.AbortWait()
  642. case StreamAction:
  643. timeOutInfo = zap.String("action", "StreamAction"+v.String())
  644. s.action(v)
  645. default:
  646. timeOutInfo = zap.String("action", "unknown")
  647. s.Error("unknown action", timeOutInfo)
  648. }
  649. if s.IsClosed() && s.actionChan.Close() { //再次尝试关闭
  650. return
  651. }
  652. }
  653. }
  654. }
  655. func (s *Stream) AddTrack(t common.Track) (promise *util.Promise[common.Track]) {
  656. promise = util.NewPromise(t)
  657. if !s.Receive(promise) {
  658. promise.Reject(ErrStreamIsClosed)
  659. }
  660. return
  661. }
  662. func (s *Stream) RemoveTrack(t common.Track) {
  663. s.Receive(TrackRemoved{t})
  664. }
  665. func (s *Stream) Pause() {
  666. s.IsPause = true
  667. }
  668. func (s *Stream) Resume() {
  669. s.IsPause = false
  670. }
  671. type TrackRemoved struct {
  672. common.Track
  673. }
  674. type SubPulse struct {
  675. ISubscriber
  676. }
  677. type Unsubscribe ISubscriber
  678. type NoMoreTrack struct{}