stream.go 22 KB


  1. package engine
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "runtime"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "unsafe"
  12. . "github.com/logrusorgru/aurora/v4"
  13. "go.uber.org/zap"
  14. "m7s.live/engine/v4/common"
  15. . "m7s.live/engine/v4/common"
  16. "m7s.live/engine/v4/config"
  17. "m7s.live/engine/v4/log"
  18. "m7s.live/engine/v4/track"
  19. "m7s.live/engine/v4/util"
  20. )
  21. type StreamState byte
  22. type StreamAction byte
  23. func (s StreamState) String() string {
  24. return StateNames[s]
  25. }
  26. func (s StreamAction) String() string {
  27. return ActionNames[s]
  28. }
  29. // 四状态机
  30. const (
  31. STATE_WAITPUBLISH StreamState = iota // 等待发布者状态
  32. STATE_WAITTRACK // 等待音视频轨道激活
  33. STATE_PUBLISHING // 正在发布流状态
  34. STATE_WAITCLOSE // 等待关闭状态(自动关闭延时开启)
  35. STATE_CLOSED // 流已关闭,不可使用
  36. )
  37. const (
  38. ACTION_PUBLISH StreamAction = iota
  39. ACTION_TRACKAVAILABLE // 音视频轨道激活
  40. ACTION_TIMEOUT // 发布流长时间没有数据/长时间没有发布者发布流/等待关闭时间到
  41. ACTION_PUBLISHCLOSE // 发布者关闭
  42. ACTION_CLOSE // 主动关闭流
  43. ACTION_LASTLEAVE // 最后一个订阅者离开
  44. ACTION_FIRSTENTER // 第一个订阅者进入
  45. ACTION_NOTRACK // 没有音视频轨道
  46. )
  47. var StateNames = [...]string{"⌛0", "🟡1", "🟢2", "🟠3", "🔴4"}
  48. var ActionNames = [...]string{"publish", "track available", "timeout", "publish close", "close", "last leave", "first enter", "no tracks"}
  49. /*
  50. stateDiagram-v2
  51. [*] --> ⌛等待发布者 : 创建
  52. ⌛等待发布者 --> 🟡等待轨道 :发布
  53. ⌛等待发布者 --> 🔴已关闭 :关闭
  54. ⌛等待发布者 --> 🔴已关闭 :超时
  55. ⌛等待发布者 --> 🔴已关闭 :最后订阅者离开
  56. 🟡等待轨道 --> 🟢正在发布 :轨道激活
  57. 🟡等待轨道 --> 🔴已关闭 :关闭
  58. 🟡等待轨道 --> 🔴已关闭 :超时
  59. 🟡等待轨道 --> 🔴已关闭 :最后订阅者离开
  60. 🟢正在发布 --> ⌛等待发布者: 发布者断开
  61. 🟢正在发布 --> 🟠等待关闭: 最后订阅者离开
  62. 🟢正在发布 --> 🔴已关闭 :关闭
  63. 🟠等待关闭 --> 🟢正在发布 :第一个订阅者进入
  64. 🟠等待关闭 --> 🔴已关闭 :关闭
  65. 🟠等待关闭 --> 🔴已关闭 :超时
  66. 🟠等待关闭 --> 🔴已关闭 :发布者断开
  67. */
  68. var StreamFSM = [len(StateNames)]map[StreamAction]StreamState{
  69. {
  70. ACTION_PUBLISH: STATE_WAITTRACK,
  71. ACTION_TIMEOUT: STATE_CLOSED,
  72. ACTION_LASTLEAVE: STATE_CLOSED,
  73. ACTION_CLOSE: STATE_CLOSED,
  74. },
  75. {
  76. ACTION_TRACKAVAILABLE: STATE_PUBLISHING,
  77. ACTION_TIMEOUT: STATE_CLOSED,
  78. ACTION_LASTLEAVE: STATE_WAITCLOSE,
  79. ACTION_CLOSE: STATE_CLOSED,
  80. },
  81. {
  82. // ACTION_PUBLISHCLOSE: STATE_WAITPUBLISH,
  83. ACTION_TIMEOUT: STATE_WAITPUBLISH,
  84. ACTION_LASTLEAVE: STATE_WAITCLOSE,
  85. ACTION_CLOSE: STATE_CLOSED,
  86. },
  87. {
  88. // ACTION_PUBLISHCLOSE: STATE_CLOSED,
  89. ACTION_TIMEOUT: STATE_CLOSED,
  90. ACTION_FIRSTENTER: STATE_PUBLISHING,
  91. ACTION_CLOSE: STATE_CLOSED,
  92. },
  93. {},
  94. }
  95. // Streams 所有的流集合
  96. var Streams util.Map[string, *Stream]
  97. func FilterStreams[T IPublisher]() (ss []*Stream) {
  98. Streams.Range(func(_ string, s *Stream) {
  99. if _, ok := s.Publisher.(T); ok {
  100. ss = append(ss, s)
  101. }
  102. })
  103. return
  104. }
  105. type StreamTimeoutConfig struct {
  106. PublishTimeout time.Duration //发布者无数据后超时
  107. DelayCloseTimeout time.Duration //无订阅者后超时,必须先有一次订阅才会激活
  108. IdleTimeout time.Duration //无订阅者后超时,不需要订阅即可激活
  109. PauseTimeout time.Duration //暂停后超时
  110. NeverTimeout bool // 永不超时
  111. }
  112. type Tracks struct {
  113. sync.Map
  114. Video []*track.Video
  115. Audio []*track.Audio
  116. Data []common.Track
  117. MainVideo *track.Video
  118. MainAudio *track.Audio
  119. SEI *track.Data[[]byte]
  120. marshalLock sync.Mutex
  121. }
  122. func (tracks *Tracks) Range(f func(name string, t Track)) {
  123. tracks.Map.Range(func(k, v any) bool {
  124. f(k.(string), v.(Track))
  125. return true
  126. })
  127. }
  128. func (tracks *Tracks) Add(name string, t Track) bool {
  129. //fmt.Println("ADD TRACK 1111111111111111111111111",name)
  130. switch v := t.(type) {
  131. case *track.Video:
  132. if tracks.MainVideo == nil {
  133. tracks.MainVideo = v
  134. tracks.SetIDR(v)
  135. }
  136. if tracks.SEI != nil {
  137. v.SEIReader = &track.DataReader[[]byte]{}
  138. v.SEIReader.Ring = tracks.SEI.Ring
  139. }
  140. case *track.Audio:
  141. if tracks.MainAudio == nil {
  142. tracks.MainAudio = v
  143. }
  144. if tracks.MainVideo != nil {
  145. v.Narrow()
  146. }
  147. }
  148. _, loaded := tracks.LoadOrStore(name, t)
  149. if !loaded {
  150. switch v := t.(type) {
  151. case *track.Video:
  152. tracks.Video = append(tracks.Video, v)
  153. case *track.Audio:
  154. tracks.Audio = append(tracks.Audio, v)
  155. default:
  156. tracks.Data = append(tracks.Data, v)
  157. }
  158. }
  159. return !loaded
  160. }
  161. func (tracks *Tracks) SetIDR(video Track) {
  162. if video == tracks.MainVideo {
  163. tracks.Range(func(_ string, t Track) {
  164. if v, ok := t.(*track.Audio); ok {
  165. v.Narrow()
  166. }
  167. })
  168. }
  169. }
  170. func (tracks *Tracks) AddSEI(t byte, data []byte) bool {
  171. if tracks.SEI != nil {
  172. l := len(data)
  173. var buffer util.Buffer
  174. buffer.WriteByte(t)
  175. for l >= 255 {
  176. buffer.WriteByte(255)
  177. l -= 255
  178. }
  179. buffer.WriteByte(byte(l))
  180. buffer.Write(data)
  181. buffer.WriteByte(0x80)
  182. tracks.SEI.Push(buffer)
  183. return true
  184. }
  185. return false
  186. }
  187. func (tracks *Tracks) MarshalJSON() ([]byte, error) {
  188. var trackList []Track
  189. tracks.marshalLock.Lock()
  190. defer tracks.marshalLock.Unlock()
  191. tracks.Range(func(_ string, t Track) {
  192. t.SnapForJson()
  193. trackList = append(trackList, t)
  194. })
  195. return json.Marshal(trackList)
  196. }
  197. var streamIdGen atomic.Uint32
  198. // Stream 流定义
  199. type Stream struct {
  200. timeout *time.Timer //当前状态的超时定时器
  201. actionChan util.SafeChan[any]
  202. ID uint32 // 流ID
  203. *log.Logger
  204. StartTime time.Time //创建时间
  205. StreamTimeoutConfig
  206. Path string
  207. Publisher IPublisher
  208. State StreamState
  209. SEHistory []StateEvent // 事件历史
  210. Subscribers Subscribers // 订阅者
  211. Tracks Tracks
  212. AppName string
  213. StreamName string
  214. IsPause bool // 是否处于暂停状态
  215. pubLocker sync.Mutex
  216. }
  217. type StreamSummay struct {
  218. Path string
  219. State StreamState
  220. Subscribers int
  221. Tracks []string
  222. StartTime time.Time
  223. Type string
  224. BPS int
  225. }
  226. func (s *Stream) GetType() string {
  227. if s.Publisher == nil {
  228. return ""
  229. }
  230. return s.Publisher.GetPublisher().Type
  231. }
  232. func (s *Stream) GetStartTime() time.Time {
  233. return s.StartTime
  234. }
  235. func (s *Stream) GetPublisherConfig() *config.Publish {
  236. if s.Publisher == nil {
  237. s.Error("GetPublisherConfig: Publisher is nil")
  238. return nil
  239. }
  240. return s.Publisher.GetPublisher().Config
  241. }
  242. // Summary 返回流的简要信息
  243. func (s *Stream) Summary() (r StreamSummay) {
  244. if s.Publisher != nil {
  245. r.Type = s.Publisher.GetPublisher().Type
  246. }
  247. s.Tracks.Range(func(name string, t Track) {
  248. r.BPS += t.GetBPS()
  249. r.Tracks = append(r.Tracks, name)
  250. })
  251. r.Path = s.Path
  252. r.State = s.State
  253. r.Subscribers = s.Subscribers.Len()
  254. r.StartTime = s.StartTime
  255. return
  256. }
  257. func (s *Stream) SSRC() uint32 {
  258. return uint32(uintptr(unsafe.Pointer(s)))
  259. }
  260. func (s *Stream) SetIDR(video Track) {
  261. s.Tracks.SetIDR(video)
  262. }
  263. func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream, created bool) {
  264. p := strings.Split(streamPath, "/")
  265. if len(p) < 2 {
  266. log.Warn(Red("Stream Path Format Error:"), streamPath)
  267. return nil, false
  268. }
  269. actual, loaded := Streams.LoadOrStore(streamPath, &Stream{
  270. Path: streamPath,
  271. AppName: p[0],
  272. StreamName: strings.Join(p[1:], "/"),
  273. StartTime: time.Now(),
  274. timeout: time.NewTimer(waitTimeout),
  275. })
  276. if s := actual.(*Stream); loaded {
  277. for s.Logger == nil {
  278. runtime.Gosched()
  279. }
  280. s.Debug("found")
  281. return s, false
  282. } else {
  283. s.ID = streamIdGen.Add(1)
  284. s.Subscribers.Init()
  285. s.actionChan.Init(10)
  286. s.Logger = log.LocaleLogger.With(zap.String("stream", streamPath), zap.Uint32("id", s.ID))
  287. s.Debug("created")
  288. go s.run()
  289. return s, true
  290. }
  291. }
  292. func (r *Stream) action(action StreamAction) (ok bool) {
  293. //fmt.Println("ACTON 1111111111111111111111:",action.String(),r.State)
  294. var event StateEvent
  295. event.Target = r
  296. event.Action = action
  297. event.From = r.State
  298. event.Time = time.Now()
  299. var next StreamState
  300. if next, ok = event.Next(); ok {
  301. r.State = next
  302. //.Println("NEXT 1111111111111111111111:",next.String())
  303. r.SEHistory = append(r.SEHistory, event)
  304. // 给Publisher状态变更的回调,方便进行远程拉流等操作
  305. var stateEvent any
  306. r.Info(Sprintf("%s%s%s", event.From.String(), Yellow("->"), next.String()), zap.String("action", action.String()))
  307. switch next {
  308. case STATE_WAITPUBLISH:
  309. //fmt.Println("STATE_WAITPUBLISH 1111111111111111111111111")
  310. stateEvent = SEwaitPublish{event, r.Publisher}
  311. waitTime := time.Duration(0)
  312. if r.Publisher != nil {
  313. waitTime = r.Publisher.GetPublisher().Config.WaitCloseTimeout
  314. r.Tracks.Range(func(name string, t Track) {
  315. t.SetStuff(TrackStateOffline)
  316. })
  317. }
  318. r.Subscribers.OnPublisherLost(event)
  319. if suber := r.Subscribers.Pick(); suber != nil {
  320. r.Subscribers.Broadcast(stateEvent)
  321. if waitTime == 0 {
  322. waitTime = suber.GetSubscriber().Config.WaitTimeout
  323. }
  324. } else if waitTime == 0 {
  325. waitTime = time.Millisecond * 10 //没有订阅者也没有配置发布者等待重连时间,默认10ms后关闭流
  326. }
  327. r.timeout.Reset(waitTime)
  328. r.Info("wait publisher", zap.Duration("wait timeout", waitTime))
  329. case STATE_WAITTRACK:
  330. //fmt.Println("STATE_WAITTRACK 1111111111111111111111111")
  331. if len(r.SEHistory) > 1 {
  332. stateEvent = SErepublish{event}
  333. } else {
  334. stateEvent = SEpublish{event}
  335. }
  336. r.timeout.Reset(time.Second * 20) // 5秒心跳,检测track的存活度
  337. case STATE_PUBLISHING:
  338. //fmt.Println("STATE_PUBLISHING 1111111111111111111111111")
  339. stateEvent = SEtrackAvaliable{event}
  340. r.Subscribers.SendInviteTrack(r)
  341. r.Subscribers.Broadcast(stateEvent)
  342. if puller, ok := r.Publisher.(IPuller); ok {
  343. puller.OnConnected()
  344. }
  345. r.timeout.Reset(time.Second * 30)
  346. //r.timeout.Reset(time.Second * 15) // 5秒心跳,检测track的存活度
  347. case STATE_WAITCLOSE:
  348. //fmt.Println("STATE_WAITCLOSE 1111111111111111111111111")
  349. stateEvent = SEwaitClose{event}
  350. if r.IdleTimeout > 0 {
  351. r.timeout.Reset(r.IdleTimeout)
  352. } else {
  353. r.timeout.Reset(r.DelayCloseTimeout)
  354. }
  355. case STATE_CLOSED:
  356. //fmt.Println("STATE_CLOSED 1111111111111111111111111")
  357. Streams.Delete(r.Path)
  358. r.timeout.Stop()
  359. stateEvent = SEclose{event}
  360. r.Subscribers.Broadcast(stateEvent)
  361. r.Tracks.Range(func(_ string, t Track) {
  362. t.Dispose()
  363. })
  364. r.Subscribers.Dispose()
  365. r.actionChan.Close()
  366. }
  367. if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond {
  368. r.Warn("action timeout", zap.String("action", action.String()), zap.Duration("cost", actionCoust))
  369. }
  370. EventBus <- stateEvent
  371. if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond {
  372. r.Warn("action timeout after eventbus", zap.String("action", action.String()), zap.Duration("cost", actionCoust))
  373. }
  374. if r.Publisher != nil {
  375. r.Publisher.OnEvent(stateEvent)
  376. if actionCoust := time.Since(event.Time); actionCoust > 100*time.Millisecond {
  377. r.Warn("action timeout after send to publisher", zap.String("action", action.String()), zap.Duration("cost", actionCoust))
  378. }
  379. }
  380. } else {
  381. r.Debug("wrong action", zap.String("action", action.String()))
  382. }
  383. return
  384. }
  385. func (r *Stream) IsShutdown() bool {
  386. switch l := len(r.SEHistory); l {
  387. case 0:
  388. return false
  389. case 1:
  390. return r.SEHistory[0].Action == ACTION_CLOSE
  391. default:
  392. switch r.SEHistory[l-1].Action {
  393. case ACTION_CLOSE:
  394. return true
  395. case ACTION_TIMEOUT:
  396. return r.SEHistory[l-1].From == STATE_WAITCLOSE
  397. }
  398. }
  399. return false
  400. }
  401. func (r *Stream) IsClosed() bool {
  402. if r == nil {
  403. return true
  404. }
  405. return r.State == STATE_CLOSED
  406. }
  407. func (r *Stream) Close() {
  408. r.Receive(ACTION_CLOSE)
  409. }
  410. func (s *Stream) Receive(event any) bool {
  411. if s.IsClosed() {
  412. return false
  413. }
  414. return s.actionChan.Send(event)
  415. }
  416. func (s *Stream) onSuberClose(sub ISubscriber) {
  417. s.Subscribers.Delete(sub)
  418. if s.Publisher != nil {
  419. s.Publisher.OnEvent(sub) // 通知Publisher有订阅者离开,在回调中可以去获取订阅者数量
  420. }
  421. if (s.DelayCloseTimeout > 0 || s.IdleTimeout > 0) && s.Subscribers.Len() == 0 {
  422. s.action(ACTION_LASTLEAVE)
  423. }
  424. }
  425. func (s *Stream) checkRunCost(timeStart time.Time, timeOutInfo zap.Field) {
  426. if cost := time.Since(timeStart); cost > 100*time.Millisecond {
  427. s.Warn("run timeout", timeOutInfo, zap.Duration("cost", cost))
  428. }
  429. }
  430. // 流状态处理中枢,包括接收订阅发布指令等
  431. func (s *Stream) run() {
  432. EventBus <- SEcreate{StreamEvent{Event[*Stream]{Target: s, Time: time.Now()}}}
  433. pulseTicker := time.NewTicker(EngineConfig.PulseInterval)
  434. defer pulseTicker.Stop()
  435. var timeOutInfo zap.Field
  436. var timeStart time.Time
  437. for pulseSuber := make(map[ISubscriber]struct{}); ; s.checkRunCost(timeStart, timeOutInfo) {
  438. select {
  439. case <-pulseTicker.C:
  440. //fmt.Println(" <-pulseTicker.C 1111111111111111111111111111111")
  441. timeStart = time.Now()
  442. timeOutInfo = zap.String("type", "pulse")
  443. for sub := range pulseSuber {
  444. sub.OnEvent(PulseEvent{CreateEvent(struct{}{})})
  445. }
  446. case <-s.timeout.C:
  447. timeStart = time.Now()
  448. timeOutInfo = zap.String("state", s.State.String())
  449. if s.State == STATE_PUBLISHING || s.State == STATE_WAITTRACK {
  450. for sub := range s.Subscribers.internal {
  451. if sub.IsClosed() {
  452. delete(s.Subscribers.internal, sub)
  453. s.Info("innersuber -1", zap.Int("remains", len(s.Subscribers.internal)))
  454. }
  455. }
  456. for sub := range s.Subscribers.public {
  457. if sub.IsClosed() {
  458. s.onSuberClose(sub)
  459. }
  460. }
  461. if !s.NeverTimeout {
  462. lost := false
  463. trackCount := 0
  464. timeout := s.PublishTimeout
  465. if s.IsPause {
  466. timeout = s.PauseTimeout
  467. }
  468. s.Tracks.Range(func(name string, t Track) {
  469. trackCount++
  470. switch t.(type) {
  471. case *track.Video, *track.Audio:
  472. // track 超过一定时间没有更新数据了
  473. if lastWriteTime := t.LastWriteTime(); !lastWriteTime.IsZero() && time.Since(lastWriteTime) > timeout {
  474. s.Warn("track timeout", zap.String("name", name), zap.Time("last writetime", lastWriteTime), zap.Duration("timeout", timeout))
  475. lost = true
  476. }
  477. }
  478. })
  479. if !lost {
  480. if trackCount == 0 {
  481. s.Warn("no tracks")
  482. lost = true
  483. s.action(ACTION_CLOSE)
  484. continue
  485. } else if s.Publisher != nil && s.Publisher.IsClosed() {
  486. s.Warn("publish is closed", zap.Error(context.Cause(s.Publisher.GetPublisher())), zap.String("ptr", fmt.Sprintf("%p", s.Publisher.GetPublisher().Context)))
  487. lost = true
  488. if len(s.Tracks.Audio)+len(s.Tracks.Video) == 0 {
  489. s.action(ACTION_CLOSE)
  490. continue
  491. }
  492. }
  493. }
  494. if lost {
  495. s.action(ACTION_TIMEOUT)
  496. continue
  497. }
  498. if s.IdleTimeout > 0 && s.Subscribers.Len() == 0 && time.Since(s.StartTime) > s.IdleTimeout {
  499. s.action(ACTION_LASTLEAVE)
  500. continue
  501. }
  502. }
  503. if s.State == STATE_WAITTRACK {
  504. s.action(ACTION_TRACKAVAILABLE)
  505. }
  506. s.Subscribers.AbortWait()
  507. s.timeout.Reset(time.Second * 15)
  508. } else {
  509. s.Debug("timeout", timeOutInfo)
  510. s.action(ACTION_TIMEOUT)
  511. }
  512. case action, ok := <-s.actionChan.C:
  513. //fmt.Println(" <-s.actionChan.C: 1111111111111111111111111111111")
  514. if !ok {
  515. return
  516. }
  517. timeStart = time.Now()
  518. switch v := action.(type) {
  519. case SubPulse:
  520. //fmt.Println(" <-s.actionChan.C SubPulse: 1111111111111111111111111111111")
  521. timeOutInfo = zap.String("action", "SubPulse")
  522. pulseSuber[v] = struct{}{}
  523. case *util.Promise[IPublisher]:
  524. //fmt.Println(" <-s.actionChan.C util.Promise[IPublisher]:: 1111111111111111111111111111111")
  525. timeOutInfo = zap.String("action", "Publish")
  526. if s.IsClosed() {
  527. v.Reject(ErrStreamIsClosed)
  528. break
  529. }
  530. puber := v.Value.GetPublisher()
  531. var oldPuber *Publisher
  532. if s.Publisher != nil {
  533. oldPuber = s.Publisher.GetPublisher()
  534. }
  535. conf := puber.Config
  536. republish := s.Publisher == v.Value // 重复发布
  537. if republish {
  538. s.Info("republish")
  539. s.Tracks.Range(func(name string, t Track) {
  540. t.SetStuff(TrackStateOffline)
  541. })
  542. }
  543. needKick := !republish && s.Publisher != nil && conf.KickExist // 需要踢掉老的发布者
  544. if needKick {
  545. s.Warn("kick", zap.String("old type", oldPuber.Type))
  546. s.Publisher.OnEvent(SEKick{CreateEvent[struct{}](util.Null)})
  547. }
  548. s.Publisher = v.Value
  549. s.PublishTimeout = conf.PublishTimeout
  550. s.DelayCloseTimeout = conf.DelayCloseTimeout
  551. s.IdleTimeout = conf.IdleTimeout
  552. s.PauseTimeout = conf.PauseTimeout
  553. if s.action(ACTION_PUBLISH) || republish || needKick {
  554. if oldPuber != nil {
  555. // 接管老的发布者的音视频轨道
  556. puber.AudioTrack = oldPuber.AudioTrack
  557. puber.VideoTrack = oldPuber.VideoTrack
  558. }
  559. if conf.InsertSEI {
  560. if s.Tracks.SEI == nil {
  561. s.Tracks.SEI = track.NewDataTrack[[]byte]("sei")
  562. s.Tracks.SEI.Locker = &sync.Mutex{}
  563. s.Tracks.SEI.SetStuff(s)
  564. if s.Tracks.Add("sei", s.Tracks.SEI) {
  565. s.Info("sei track added")
  566. }
  567. }
  568. }
  569. //fmt.Println(" <-s.actionChan.C util.Promise[IPublisher] Resolve:: 1111111111111111111111111111111",time.Now().String())
  570. v.Resolve()
  571. } else {
  572. s.Warn("duplicate publish")
  573. v.Reject(ErrDuplicatePublish)
  574. }
  575. case *util.Promise[ISubscriber]:
  576. //fmt.Println(" util.Promise[ISubscriber] 1111111111111111111111111111111")
  577. timeOutInfo = zap.String("action", "Subscribe")
  578. if s.IsClosed() {
  579. v.Reject(ErrStreamIsClosed)
  580. break
  581. }
  582. suber := v.Value
  583. io := suber.GetSubscriber()
  584. sbConfig := io.Config
  585. waits := &waitTracks{
  586. Promise: v,
  587. }
  588. if ats := io.Args.Get(sbConfig.SubAudioArgName); ats != "" {
  589. waits.audio.Wait(strings.Split(ats, ",")...)
  590. } else if len(sbConfig.SubAudioTracks) > 0 {
  591. waits.audio.Wait(sbConfig.SubAudioTracks...)
  592. } else if sbConfig.SubAudio {
  593. waits.audio.Wait()
  594. }
  595. if vts := io.Args.Get(sbConfig.SubVideoArgName); vts != "" {
  596. waits.video.Wait(strings.Split(vts, ",")...)
  597. } else if len(sbConfig.SubVideoTracks) > 0 {
  598. waits.video.Wait(sbConfig.SubVideoTracks...)
  599. } else if sbConfig.SubVideo {
  600. waits.video.Wait()
  601. }
  602. if dts := io.Args.Get(sbConfig.SubDataArgName); dts != "" {
  603. waits.data.Wait(strings.Split(dts, ",")...)
  604. } else {
  605. // waits.data.Wait()
  606. }
  607. if s.Publisher != nil {
  608. s.Publisher.OnEvent(v) // 通知Publisher有新的订阅者加入,在回调中可以去获取订阅者数量
  609. pubConfig := s.Publisher.GetPublisher().Config
  610. s.Tracks.Range(func(name string, t Track) {
  611. waits.Accept(t)
  612. })
  613. if !pubConfig.PubAudio {
  614. waits.audio.StopWait()
  615. } else if s.State == STATE_PUBLISHING && len(waits.audio) > 0 {
  616. waits.audio.InviteTrack(suber)
  617. } else if s.Subscribers.waitAborted {
  618. waits.audio.StopWait()
  619. }
  620. if !pubConfig.PubVideo {
  621. waits.video.StopWait()
  622. } else if s.State == STATE_PUBLISHING && len(waits.video) > 0 {
  623. waits.video.InviteTrack(suber)
  624. } else if s.Subscribers.waitAborted {
  625. waits.video.StopWait()
  626. }
  627. }
  628. s.Subscribers.Add(suber, waits)
  629. if s.Subscribers.Len() == 1 && s.State == STATE_WAITCLOSE {
  630. s.action(ACTION_FIRSTENTER)
  631. }
  632. case Unsubscribe:
  633. //.Println("Unsubscribe 1111111111111111111111111111111")
  634. timeOutInfo = zap.String("action", "Unsubscribe")
  635. delete(pulseSuber, v)
  636. s.onSuberClose(v)
  637. case TrackRemoved:
  638. //fmt.Println("TrackRemoved 1111111111111111111111111111111")
  639. timeOutInfo = zap.String("action", "TrackRemoved")
  640. if s.IsClosed() {
  641. break
  642. }
  643. name := v.GetName()
  644. if t, ok := s.Tracks.LoadAndDelete(name); ok {
  645. s.Info("track -1", zap.String("name", name))
  646. s.Subscribers.Broadcast(t)
  647. t.(common.Track).Dispose()
  648. }
  649. case *util.Promise[Track]:
  650. //fmt.Println("util.Promise[Track] 1111111111111111111111111111111")
  651. timeOutInfo = zap.String("action", "Track")
  652. if s.IsClosed() {
  653. v.Reject(ErrStreamIsClosed)
  654. break
  655. }
  656. if s.State == STATE_WAITPUBLISH {
  657. s.action(ACTION_PUBLISH)
  658. }
  659. pubConfig := s.GetPublisherConfig()
  660. name := v.Value.GetName()
  661. if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubVideo {
  662. v.Reject(ErrTrackMute)
  663. continue
  664. }
  665. if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubAudio {
  666. v.Reject(ErrTrackMute)
  667. continue
  668. }
  669. if s.Tracks.Add(name, v.Value) {
  670. v.Resolve()
  671. s.Subscribers.OnTrack(v.Value)
  672. if _, ok := v.Value.(*track.Video); ok && !pubConfig.PubAudio {
  673. s.Subscribers.AbortWait()
  674. }
  675. if _, ok := v.Value.(*track.Audio); ok && !pubConfig.PubVideo {
  676. s.Subscribers.AbortWait()
  677. }
  678. if (s.Tracks.MainVideo != nil || !pubConfig.PubVideo) && (!pubConfig.PubAudio || s.Tracks.MainAudio != nil) {
  679. s.action(ACTION_TRACKAVAILABLE)
  680. }
  681. } else {
  682. v.Reject(ErrBadTrackName)
  683. }
  684. case NoMoreTrack:
  685. //fmt.Println("NoMoreTrack 1111111111111111111111111111111")
  686. s.Subscribers.AbortWait()
  687. case StreamAction:
  688. //fmt.Println("StreamAction 1111111111111111111111111111111")
  689. timeOutInfo = zap.String("action", "StreamAction"+v.String())
  690. s.action(v)
  691. default:
  692. timeOutInfo = zap.String("action", "unknown")
  693. s.Error("unknown action", timeOutInfo)
  694. }
  695. if s.IsClosed() && s.actionChan.Close() { //再次尝试关闭
  696. return
  697. }
  698. }
  699. }
  700. }
  701. func (s *Stream) AddTrack(t Track) (promise *util.Promise[Track]) {
  702. promise = util.NewPromise(t)
  703. if !s.Receive(promise) {
  704. promise.Reject(ErrStreamIsClosed)
  705. }
  706. return
  707. }
  708. func (s *Stream) RemoveTrack(t Track) {
  709. s.Receive(TrackRemoved{t})
  710. }
  711. func (s *Stream) Pause() {
  712. s.IsPause = true
  713. }
  714. func (s *Stream) Resume() {
  715. s.IsPause = false
  716. }
  717. type TrackRemoved struct {
  718. Track
  719. }
  720. type SubPulse struct {
  721. ISubscriber
  722. }
  723. type Unsubscribe ISubscriber
  724. type NoMoreTrack struct{}