pull.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package hdl
  2. import (
  3. "io"
  4. "net/http"
  5. "net/url"
  6. "os"
  7. "strings"
  8. "go.uber.org/zap"
  9. . "m7s.live/engine/v4"
  10. "m7s.live/engine/v4/codec"
  11. "m7s.live/engine/v4/util"
  12. )
  13. type HDLPuller struct {
  14. Publisher
  15. Puller
  16. absTS uint32 //绝对时间戳
  17. buf util.Buffer
  18. pool util.BytesPool
  19. }
  20. func NewHDLPuller() *HDLPuller {
  21. return &HDLPuller{
  22. buf: util.Buffer(make([]byte, len(codec.FLVHeader))),
  23. pool: make(util.BytesPool, 17),
  24. }
  25. }
  26. func (puller *HDLPuller) Disconnect() {
  27. if puller.Closer != nil {
  28. puller.Closer.Close()
  29. }
  30. }
  31. func (puller *HDLPuller) Connect() (err error) {
  32. HDLPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
  33. if strings.HasPrefix(puller.RemoteURL, "http") {
  34. var res *http.Response
  35. client := http.DefaultClient
  36. if puller.Puller.Config.Proxy != "" {
  37. proxy, err := url.Parse(puller.Puller.Config.Proxy)
  38. if err != nil {
  39. return err
  40. }
  41. transport := &http.Transport{Proxy: http.ProxyURL(proxy)}
  42. client = &http.Client{Transport: transport}
  43. }
  44. if res, err = client.Get(puller.RemoteURL); err == nil {
  45. if res.StatusCode != http.StatusOK {
  46. return io.EOF
  47. }
  48. puller.SetIO(res.Body)
  49. }
  50. } else {
  51. var res *os.File
  52. if res, err = os.Open(puller.RemoteURL); err == nil {
  53. puller.SetIO(res)
  54. }
  55. }
  56. if err == nil {
  57. head := puller.buf.SubBuf(0, len(codec.FLVHeader))
  58. if _, err = io.ReadFull(puller, head); err == nil {
  59. if head[0] != 'F' || head[1] != 'L' || head[2] != 'V' {
  60. err = codec.ErrInvalidFLV
  61. } else {
  62. configCopy := hdlConfig.GetPublishConfig()
  63. if head[4]&0x04 == 0 {
  64. configCopy.PubAudio = false
  65. }
  66. if head[4]&0x01 == 0 {
  67. configCopy.PubVideo = false
  68. }
  69. puller.Config = &configCopy
  70. }
  71. }
  72. }
  73. if err != nil {
  74. HDLPlugin.Error("connect", zap.Error(err))
  75. }
  76. return
  77. }
  78. func (puller *HDLPuller) Pull() (err error) {
  79. puller.buf.Reset()
  80. var startTs uint32
  81. for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(puller, puller.buf[:4]) {
  82. tmp := puller.buf.SubBuf(0, 11)
  83. _, err = io.ReadFull(puller, tmp)
  84. if err != nil {
  85. return
  86. }
  87. t := tmp.ReadByte()
  88. dataSize := tmp.ReadUint24()
  89. timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24
  90. if startTs == 0 {
  91. startTs = timestamp
  92. }
  93. tmp.ReadUint24()
  94. var frame util.BLL
  95. mem := puller.pool.Get(int(dataSize))
  96. frame.Push(mem)
  97. _, err = io.ReadFull(puller, mem.Value)
  98. if err != nil {
  99. return
  100. }
  101. puller.absTS = offsetTs + (timestamp - startTs)
  102. // fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS)
  103. switch t {
  104. case codec.FLV_TAG_TYPE_AUDIO:
  105. if puller.Config.PubAudio {
  106. puller.WriteAVCCAudio(puller.absTS, &frame, puller.pool)
  107. }
  108. case codec.FLV_TAG_TYPE_VIDEO:
  109. if puller.Config.PubVideo {
  110. puller.WriteAVCCVideo(puller.absTS, &frame, puller.pool)
  111. }
  112. case codec.FLV_TAG_TYPE_SCRIPT:
  113. puller.Info("script", zap.ByteString("data", mem.Value))
  114. frame.Recycle()
  115. }
  116. }
  117. return
  118. }