pusher.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package engine
  2. import (
  3. "io"
  4. "time"
  5. "go.uber.org/zap"
  6. "m7s.live/engine/v4/config"
  7. )
  8. type IPusher interface {
  9. ISubscriber
  10. Push() error
  11. Connect() error
  12. Disconnect()
  13. init(string, string, *config.Push)
  14. Reconnect() bool
  15. startPush(IPusher)
  16. }
  17. type Pusher struct {
  18. ClientIO[config.Push]
  19. }
  20. // 是否需要重连
  21. func (pub *Pusher) Reconnect() (result bool) {
  22. result = pub.Config.RePush == -1 || pub.ReConnectCount <= pub.Config.RePush
  23. pub.ReConnectCount++
  24. return
  25. }
  26. func (pub *Pusher) startPush(pusher IPusher) {
  27. badPusher := true
  28. var err error
  29. key := pub.RemoteURL
  30. if oldPusher, loaded := Pushers.LoadOrStore(key, pusher); loaded {
  31. sub := oldPusher.(IPusher).GetSubscriber()
  32. pusher.Error("pusher already exists", zap.Time("createAt", sub.StartTime))
  33. return
  34. }
  35. defer Pushers.Delete(key)
  36. defer pusher.Disconnect()
  37. var startTime time.Time
  38. for pusher.Info("start push"); pusher.Reconnect(); pusher.Warn("restart push") {
  39. if time.Since(startTime) < 5*time.Second {
  40. time.Sleep(5 * time.Second)
  41. }
  42. startTime = time.Now()
  43. if err = pusher.Subscribe(pub.StreamPath, pusher); err != nil {
  44. pusher.Error("push subscribe", zap.Error(err))
  45. } else {
  46. stream := pusher.GetSubscriber().Stream
  47. if err = pusher.Connect(); err != nil {
  48. if err == io.EOF {
  49. pusher.Info("push complete")
  50. return
  51. }
  52. pusher.Error("push connect", zap.Error(err))
  53. time.Sleep(time.Second * 5)
  54. stream.Receive(Unsubscribe(pusher)) // 通知stream移除订阅者
  55. if badPusher {
  56. return
  57. }
  58. } else if err = pusher.Push(); err != nil && !stream.IsClosed() {
  59. pusher.Error("push", zap.Error(err))
  60. pusher.Stop()
  61. }
  62. badPusher = false
  63. if stream.IsClosed() {
  64. pusher.Info("stop push closed")
  65. return
  66. }
  67. }
  68. pusher.Disconnect()
  69. }
  70. pusher.Warn("stop push stop reconnect")
  71. }