puller.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package engine
  2. import (
  3. "io"
  4. "strings"
  5. "time"
  6. "go.uber.org/zap"
  7. "m7s.live/engine/v4/config"
  8. )
  9. var zshutdown = zap.String("reason", "shutdown")
  10. var znomorereconnect = zap.String("reason", "no more reconnect")
  11. type IPuller interface {
  12. IPublisher
  13. Connect() error
  14. OnConnected()
  15. Disconnect()
  16. Pull() error
  17. Reconnect() bool
  18. init(streamPath string, url string, conf *config.Pull)
  19. startPull(IPuller)
  20. }
  21. // 用于远程拉流的发布者
  22. type Puller struct {
  23. ClientIO[config.Pull]
  24. }
  25. func (pub *Puller) OnConnected() {
  26. pub.ReConnectCount = 0 // 重置重连次数
  27. }
  28. // 是否需要重连
  29. func (pub *Puller) Reconnect() (ok bool) {
  30. ok = pub.Config.RePull == -1 || pub.ReConnectCount <= pub.Config.RePull
  31. pub.ReConnectCount++
  32. return
  33. }
  34. func (pub *Puller) startPull(puller IPuller) {
  35. badPuller := true
  36. var stream *Stream
  37. var err error
  38. streamPath := pub.StreamPath
  39. if i := strings.Index(streamPath, "?"); i >= 0 {
  40. streamPath = streamPath[:i]
  41. }
  42. if oldPuller, loaded := Pullers.LoadOrStore(streamPath, puller); loaded {
  43. pub := oldPuller.(IPuller).GetPublisher()
  44. stream = pub.Stream
  45. if stream != nil {
  46. puller.Error("puller already exists", zap.Int8("streamState", int8(stream.State)))
  47. if stream.State == STATE_CLOSED {
  48. oldPuller.(IPuller).Stop(zap.String("reason", "dead puller"))
  49. }
  50. } else {
  51. puller.Error("puller already exists", zap.Time("createAt", pub.StartTime))
  52. }
  53. return
  54. }
  55. defer func() {
  56. Pullers.Delete(streamPath)
  57. puller.Disconnect()
  58. if stream != nil {
  59. stream.Close()
  60. }
  61. }()
  62. puber := puller.GetPublisher()
  63. var startTime time.Time
  64. for puller.Info("start pull"); puller.Reconnect(); puller.Warn("restart pull") {
  65. if time.Since(startTime) < 5*time.Second {
  66. time.Sleep(5 * time.Second)
  67. }
  68. startTime = time.Now()
  69. if err = puller.Connect(); err != nil {
  70. if err == io.EOF {
  71. puller.Info("pull complete")
  72. return
  73. }
  74. puller.Error("pull connect", zap.Error(err))
  75. if badPuller {
  76. return
  77. }
  78. } else {
  79. if err = puller.Publish(pub.StreamPath, puller); err != nil {
  80. puller.Error("pull publish", zap.Error(err))
  81. return
  82. }
  83. if stream != puber.Stream {
  84. // 老流中的音视频轨道不可再使用
  85. puber.AudioTrack = nil
  86. puber.VideoTrack = nil
  87. }
  88. stream = puber.Stream
  89. badPuller = false
  90. if err = puller.Pull(); err != nil && !puller.IsShutdown() {
  91. puller.Error("pull interrupt", zap.Error(err))
  92. }
  93. }
  94. if puller.IsShutdown() {
  95. puller.Info("stop pull", zshutdown)
  96. return
  97. }
  98. puller.Disconnect()
  99. }
  100. puller.Warn("stop pull", znomorereconnect)
  101. }