pull.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package hls
  2. import (
  3. "compress/gzip"
  4. "context"
  5. "fmt"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "path/filepath"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/quangngotan95/go-m3u8/m3u8"
  15. "go.uber.org/zap"
  16. . "m7s.live/engine/v4"
  17. "m7s.live/engine/v4/util"
  18. )
  19. // HLSPuller HLS拉流者
  20. type HLSPuller struct {
  21. TSPublisher
  22. Puller
  23. Video M3u8Info
  24. Audio M3u8Info
  25. TsHead http.Header `json:"-" yaml:"-"` //用于提供cookie等特殊身份的http头
  26. SaveContext context.Context `json:"-" yaml:"-"` //用来保存ts文件到服务器
  27. memoryTs util.Map[string, util.Recyclable]
  28. }
  29. // M3u8Info m3u8文件的信息,用于拉取m3u8文件,和提供查询
  30. type M3u8Info struct {
  31. Req *http.Request `json:"-" yaml:"-"`
  32. M3U8Count int //一共拉取的m3u8文件数量
  33. TSCount int //一共拉取的ts文件数量
  34. LastM3u8 string //最后一个m3u8文件内容
  35. }
  36. type TSDownloader struct {
  37. client *http.Client
  38. url *url.URL
  39. req *http.Request
  40. res *http.Response
  41. wg sync.WaitGroup
  42. err error
  43. dur float64
  44. }
  45. func (p *TSDownloader) Start() {
  46. p.wg.Add(1)
  47. go func() {
  48. defer p.wg.Done()
  49. if tsRes, err := p.client.Do(p.req); err == nil {
  50. p.res = tsRes
  51. } else {
  52. p.err = err
  53. }
  54. }()
  55. }
  56. func (p *HLSPuller) GetTs(key string) util.Recyclable {
  57. return p.memoryTs.Get(key)
  58. }
  59. func readM3U8(res *http.Response) (playlist *m3u8.Playlist, err error) {
  60. var reader io.Reader = res.Body
  61. if res.Header.Get("Content-Encoding") == "gzip" {
  62. reader, err = gzip.NewReader(reader)
  63. }
  64. if err == nil {
  65. playlist, err = m3u8.Read(reader)
  66. }
  67. if err != nil {
  68. HLSPlugin.Error("readM3U8", zap.Error(err))
  69. }
  70. return
  71. }
  72. func (p *HLSPuller) OnEvent(event any) {
  73. switch event.(type) {
  74. case IPublisher:
  75. p.TSPublisher.OnEvent(event)
  76. if hlsConfig.RelayMode != 0 {
  77. p.Stream.NeverTimeout = true
  78. memoryTs.Add(p.StreamPath, p)
  79. }
  80. case SEKick, SEclose:
  81. if hlsConfig.RelayMode == 1 {
  82. memoryTs.Delete(p.StreamPath)
  83. }
  84. p.Publisher.OnEvent(event)
  85. default:
  86. p.Publisher.OnEvent(event)
  87. }
  88. }
  89. func (p *HLSPuller) pull(info *M3u8Info) (err error) {
  90. //请求失败自动退出
  91. req := info.Req.WithContext(p.Context)
  92. client := http.DefaultClient
  93. if p.Puller.Config.Proxy != "" {
  94. URL, err := url.Parse(p.Puller.Config.Proxy)
  95. if err != nil {
  96. return err
  97. }
  98. client = &http.Client{
  99. Transport: &http.Transport{
  100. Proxy: http.ProxyURL(URL),
  101. },
  102. }
  103. }
  104. sequence := -1
  105. lastTs := make(map[string]bool)
  106. tsbuffer := make(chan io.ReadCloser)
  107. bytesPool := make(util.BytesPool, 30)
  108. tsRing := util.NewRing[string](6)
  109. var tsReader *TSReader
  110. if hlsConfig.RelayMode != 1 {
  111. tsReader = NewTSReader(&p.TSPublisher)
  112. defer tsReader.Close()
  113. }
  114. defer func() {
  115. HLSPlugin.Info("hls exit", zap.String("streamPath", p.Stream.Path), zap.Error(err))
  116. close(tsbuffer)
  117. p.Stop()
  118. }()
  119. var maxResolution *m3u8.PlaylistItem
  120. for errcount := 0; err == nil; err = p.Err() {
  121. resp, err1 := client.Do(req)
  122. if err1 != nil {
  123. return err1
  124. }
  125. req = resp.Request
  126. if playlist, err2 := readM3U8(resp); err2 == nil {
  127. errcount = 0
  128. info.LastM3u8 = playlist.String()
  129. //if !playlist.Live {
  130. // log.Println(p.LastM3u8)
  131. // return
  132. //}
  133. //fmt.Println("ppppppppppppppppppppppp:",playlist.Sequence,sequence,playlist)
  134. if playlist.Sequence <= sequence {
  135. HLSPlugin.Warn("same sequence", zap.Int("sequence", playlist.Sequence), zap.Int("max", sequence))
  136. time.Sleep(time.Second)
  137. continue
  138. }
  139. info.M3U8Count++
  140. sequence = playlist.Sequence
  141. thisTs := make(map[string]bool)
  142. tsItems := make([]*m3u8.SegmentItem, 0)
  143. discontinuity := false
  144. for _, item := range playlist.Items {
  145. switch v := item.(type) {
  146. case *m3u8.PlaylistItem:
  147. if (maxResolution == nil || maxResolution.Resolution != nil && (maxResolution.Resolution.Width < v.Resolution.Width || maxResolution.Resolution.Height < v.Resolution.Height)) || maxResolution.Bandwidth < v.Bandwidth {
  148. maxResolution = v
  149. }
  150. case *m3u8.DiscontinuityItem:
  151. discontinuity = true
  152. case *m3u8.SegmentItem:
  153. thisTs[v.Segment] = true
  154. if _, ok := lastTs[v.Segment]; ok && !discontinuity {
  155. continue
  156. }
  157. tsItems = append(tsItems, v)
  158. case *m3u8.MediaItem:
  159. if p.Audio.Req == nil {
  160. if url, err := req.URL.Parse(*v.URI); err == nil {
  161. newReq, _ := http.NewRequest("GET", url.String(), nil)
  162. newReq.Header = req.Header
  163. p.Audio.Req = newReq
  164. go p.pull(&p.Audio)
  165. }
  166. }
  167. }
  168. }
  169. if maxResolution != nil && len(tsItems) == 0 {
  170. if url, err := req.URL.Parse(maxResolution.URI); err == nil {
  171. if strings.HasSuffix(url.Path, ".m3u8") {
  172. p.Video.Req, _ = http.NewRequest("GET", url.String(), nil)
  173. p.Video.Req.Header = req.Header
  174. req = p.Video.Req
  175. continue
  176. }
  177. }
  178. }
  179. tsCount := len(tsItems)
  180. HLSPlugin.Debug("readM3U8", zap.Int("sequence", sequence), zap.Int("tscount", tsCount))
  181. lastTs = thisTs
  182. if tsCount > 3 {
  183. tsItems = tsItems[tsCount-3:]
  184. }
  185. var plBuffer util.Buffer
  186. relayPlayList := Playlist{
  187. Writer: &plBuffer,
  188. Targetduration: playlist.Target,
  189. Sequence: playlist.Sequence,
  190. }
  191. if hlsConfig.RelayMode != 0 {
  192. relayPlayList.Init()
  193. }
  194. var tsDownloaders = make([]*TSDownloader, len(tsItems))
  195. for i, v := range tsItems {
  196. if p.Err() != nil {
  197. return p.Err()
  198. }
  199. tsUrl, _ := info.Req.URL.Parse(v.Segment)
  200. tsReq, _ := http.NewRequestWithContext(p.Context, "GET", tsUrl.String(), nil)
  201. tsReq.Header = p.TsHead
  202. // t1 := time.Now()
  203. tsDownloaders[i] = &TSDownloader{
  204. client: client,
  205. req: tsReq,
  206. url: tsUrl,
  207. dur: v.Duration,
  208. }
  209. tsDownloaders[i].Start()
  210. }
  211. ts := time.Now().UnixMilli()
  212. for i, v := range tsDownloaders {
  213. HLSPlugin.Debug("start download ts", zap.String("tsUrl", v.url.String()))
  214. v.wg.Wait()
  215. if v.res != nil {
  216. info.TSCount++
  217. p.SetIO(v.res.Body)
  218. if p.SaveContext != nil && p.SaveContext.Err() == nil {
  219. os.MkdirAll(filepath.Join(hlsConfig.Path, p.Stream.Path), 0766)
  220. if f, err := os.OpenFile(filepath.Join(hlsConfig.Path, p.Stream.Path, filepath.Base(v.url.Path)), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666); err == nil {
  221. p.SetIO(io.TeeReader(v.res.Body, f))
  222. p.Closer = f
  223. }
  224. }
  225. var tsBytes *util.Buffer
  226. var item *util.ListItem[util.Buffer]
  227. // 包含转发
  228. if hlsConfig.RelayMode != 0 {
  229. if v.res.ContentLength < 0 {
  230. item = bytesPool.GetShell(make([]byte, 0))
  231. } else {
  232. item = bytesPool.Get(int(v.res.ContentLength))
  233. item.Value = item.Value[:0]
  234. }
  235. tsBytes = &item.Value
  236. }
  237. switch hlsConfig.RelayMode {
  238. case 1:
  239. io.Copy(tsBytes, p.Reader)
  240. case 2:
  241. p.SetIO(io.TeeReader(p.Reader, tsBytes))
  242. fallthrough
  243. case 0:
  244. tsReader.Feed(p)
  245. }
  246. if hlsConfig.RelayMode != 0 {
  247. tsFilename := fmt.Sprintf("%d_%d.ts", ts, i)
  248. tsFilePath := p.StreamPath + "/" + tsFilename
  249. var plInfo = PlaylistInf{
  250. Title: p.Stream.StreamName + "/" + tsFilename,
  251. Duration: v.dur,
  252. FilePath: tsFilePath,
  253. }
  254. relayPlayList.WriteInf(plInfo)
  255. p.memoryTs.Add(tsFilePath, item)
  256. next := tsRing.Next()
  257. if next.Value != "" {
  258. item, _ := p.memoryTs.Delete(next.Value)
  259. if item == nil {
  260. p.Warn("memoryTs delete nil", zap.String("tsFilePath", next.Value))
  261. } else {
  262. item.Recycle()
  263. }
  264. }
  265. next.Value = tsFilePath
  266. tsRing = next
  267. }
  268. p.Close()
  269. } else if v.err != nil {
  270. HLSPlugin.Error("reqTs", zap.String("streamPath", p.Stream.Path), zap.Error(v.err))
  271. } else {
  272. HLSPlugin.Error("reqTs", zap.String("streamPath", p.Stream.Path))
  273. }
  274. HLSPlugin.Debug("finish download ts", zap.String("tsUrl", v.url.String()))
  275. }
  276. if hlsConfig.RelayMode != 0 {
  277. m3u8 := string(plBuffer)
  278. HLSPlugin.Debug("write m3u8", zap.String("streamPath", p.Stream.Path), zap.String("m3u8", m3u8))
  279. memoryM3u8.Store(p.Stream.Path, m3u8)
  280. }
  281. } else {
  282. HLSPlugin.Error("readM3u8", zap.String("streamPath", p.Stream.Path), zap.Error(err2))
  283. errcount++
  284. if errcount > 10 {
  285. return err2
  286. }
  287. }
  288. }
  289. return
  290. }
  291. func (p *HLSPuller) Connect() (err error) {
  292. p.Video.Req, err = http.NewRequest("GET", p.RemoteURL, nil)
  293. return
  294. }
  295. func (p *HLSPuller) Disconnect() {
  296. p.Stop(zap.String("reason", "disconnect"))
  297. }
  298. func (p *HLSPuller) Pull() error {
  299. return p.pull(&p.Video)
  300. }