pull.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package hdl
  2. import (
  3. "io"
  4. "net/http"
  5. "net/url"
  6. "os"
  7. "strings"
  8. "go.uber.org/zap"
  9. . "m7s.live/engine/v4"
  10. "m7s.live/engine/v4/codec"
  11. "m7s.live/engine/v4/util"
  12. )
  13. type HDLPuller struct {
  14. Publisher
  15. Puller
  16. absTS uint32 //绝对时间戳
  17. buf util.Buffer
  18. pool util.BytesPool
  19. }
  20. func NewHDLPuller() *HDLPuller {
  21. return &HDLPuller{
  22. buf: util.Buffer(make([]byte, len(codec.FLVHeader))),
  23. pool: make(util.BytesPool, 17),
  24. }
  25. }
  26. func (puller *HDLPuller) Disconnect() {
  27. if puller.Closer != nil {
  28. puller.Closer.Close()
  29. }
  30. }
  31. func (puller *HDLPuller) Connect() (err error) {
  32. HDLPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
  33. if strings.HasPrefix(puller.RemoteURL, "http") {
  34. var res *http.Response
  35. client := http.DefaultClient
  36. if puller.Puller.Config.Proxy != "" {
  37. proxy, err := url.Parse(puller.Puller.Config.Proxy)
  38. if err != nil {
  39. return err
  40. }
  41. transport := &http.Transport{Proxy: http.ProxyURL(proxy)}
  42. client = &http.Client{Transport: transport}
  43. }
  44. if res, err = client.Get(puller.RemoteURL); err == nil {
  45. if res.StatusCode != http.StatusOK {
  46. return io.EOF
  47. }
  48. puller.SetIO(res.Body)
  49. }
  50. } else {
  51. var res *os.File
  52. if res, err = os.Open(puller.RemoteURL); err == nil {
  53. puller.SetIO(res)
  54. }
  55. }
  56. if err == nil {
  57. head := puller.buf.SubBuf(0, len(codec.FLVHeader))
  58. if _, err = io.ReadFull(puller, head); err == nil {
  59. if head[0] != 'F' || head[1] != 'L' || head[2] != 'V' {
  60. err = codec.ErrInvalidFLV
  61. } else {
  62. configCopy := hdlConfig.GetPublishConfig()
  63. configCopy.PubAudio = head[4]&0x04 != 0
  64. configCopy.PubVideo = head[4]&0x01 != 0
  65. puller.Config = &configCopy
  66. }
  67. }
  68. }
  69. if err != nil {
  70. HDLPlugin.Error("connect", zap.Error(err))
  71. }
  72. return
  73. }
  74. func (puller *HDLPuller) Pull() (err error) {
  75. puller.buf.Reset()
  76. var startTs uint32
  77. for offsetTs := puller.absTS; err == nil && puller.Err() == nil; _, err = io.ReadFull(puller, puller.buf[:4]) {
  78. tmp := puller.buf.SubBuf(0, 11)
  79. _, err = io.ReadFull(puller, tmp)
  80. if err != nil {
  81. return
  82. }
  83. t := tmp.ReadByte()
  84. dataSize := tmp.ReadUint24()
  85. timestamp := tmp.ReadUint24() | uint32(tmp.ReadByte())<<24
  86. if startTs == 0 {
  87. startTs = timestamp
  88. }
  89. tmp.ReadUint24()
  90. var frame util.BLL
  91. mem := puller.pool.Get(int(dataSize))
  92. frame.Push(mem)
  93. _, err = io.ReadFull(puller, mem.Value)
  94. if err != nil {
  95. return
  96. }
  97. puller.absTS = offsetTs + (timestamp - startTs)
  98. // fmt.Println(t, offsetTs, timestamp, startTs, puller.absTS)
  99. switch t {
  100. case codec.FLV_TAG_TYPE_AUDIO:
  101. puller.WriteAVCCAudio(puller.absTS, &frame, puller.pool)
  102. case codec.FLV_TAG_TYPE_VIDEO:
  103. puller.WriteAVCCVideo(puller.absTS, &frame, puller.pool)
  104. case codec.FLV_TAG_TYPE_SCRIPT:
  105. puller.Info("script", zap.ByteString("data", mem.Value))
  106. frame.Recycle()
  107. }
  108. }
  109. return
  110. }