12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package engine
- import (
- "io"
- "time"
- "go.uber.org/zap"
- "m7s.live/engine/v4/config"
- )
- type IPusher interface {
- ISubscriber
- Push() error
- Connect() error
- Disconnect()
- init(string, string, *config.Push)
- Reconnect() bool
- startPush(IPusher)
- }
- type Pusher struct {
- ClientIO[config.Push]
- }
- // 是否需要重连
- func (pub *Pusher) Reconnect() (result bool) {
- result = pub.Config.RePush == -1 || pub.ReConnectCount <= pub.Config.RePush
- pub.ReConnectCount++
- return
- }
- func (pub *Pusher) startPush(pusher IPusher) {
- badPusher := true
- var err error
- key := pub.RemoteURL
- if oldPusher, loaded := Pushers.LoadOrStore(key, pusher); loaded {
- sub := oldPusher.(IPusher).GetSubscriber()
- pusher.Error("pusher already exists", zap.Time("createAt", sub.StartTime))
- return
- }
- defer Pushers.Delete(key)
- defer pusher.Disconnect()
- var startTime time.Time
- for pusher.Info("start push"); pusher.Reconnect(); pusher.Warn("restart push") {
- if time.Since(startTime) < 5*time.Second {
- time.Sleep(5 * time.Second)
- }
- startTime = time.Now()
- if err = pusher.Subscribe(pub.StreamPath, pusher); err != nil {
- pusher.Error("push subscribe", zap.Error(err))
- } else {
- stream := pusher.GetSubscriber().Stream
- if err = pusher.Connect(); err != nil {
- if err == io.EOF {
- pusher.Info("push complete")
- return
- }
- pusher.Error("push connect", zap.Error(err))
- time.Sleep(time.Second * 5)
- stream.Receive(Unsubscribe(pusher)) // 通知stream移除订阅者
- if badPusher {
- return
- }
- } else if err = pusher.Push(); err != nil && !stream.IsClosed() {
- pusher.Error("push", zap.Error(err))
- pusher.Stop()
- }
- badPusher = false
- if stream.IsClosed() {
- pusher.Info("stop push closed")
- return
- }
- }
- pusher.Disconnect()
- }
- pusher.Warn("stop push stop reconnect")
- }
|