123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- package hls
- import (
- "compress/gzip"
- "context"
- "fmt"
- "io"
- "net/http"
- "net/url"
- "os"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "github.com/quangngotan95/go-m3u8/m3u8"
- "go.uber.org/zap"
- . "m7s.live/engine/v4"
- "m7s.live/engine/v4/util"
- )
- // HLSPuller HLS拉流者
- type HLSPuller struct {
- TSPublisher
- Puller
- Video M3u8Info
- Audio M3u8Info
- TsHead http.Header `json:"-" yaml:"-"` //用于提供cookie等特殊身份的http头
- SaveContext context.Context `json:"-" yaml:"-"` //用来保存ts文件到服务器
- memoryTs util.Map[string, util.Recyclable]
- }
- // M3u8Info m3u8文件的信息,用于拉取m3u8文件,和提供查询
- type M3u8Info struct {
- Req *http.Request `json:"-" yaml:"-"`
- M3U8Count int //一共拉取的m3u8文件数量
- TSCount int //一共拉取的ts文件数量
- LastM3u8 string //最后一个m3u8文件内容
- }
- type TSDownloader struct {
- client *http.Client
- url *url.URL
- req *http.Request
- res *http.Response
- wg sync.WaitGroup
- err error
- dur float64
- }
- func (p *TSDownloader) Start() {
- p.wg.Add(1)
- go func() {
- defer p.wg.Done()
- if tsRes, err := p.client.Do(p.req); err == nil {
- p.res = tsRes
- } else {
- p.err = err
- }
- }()
- }
- func (p *HLSPuller) GetTs(key string) util.Recyclable {
- return p.memoryTs.Get(key)
- }
- func readM3U8(res *http.Response) (playlist *m3u8.Playlist, err error) {
- var reader io.Reader = res.Body
- if res.Header.Get("Content-Encoding") == "gzip" {
- reader, err = gzip.NewReader(reader)
- }
- if err == nil {
- playlist, err = m3u8.Read(reader)
- }
- if err != nil {
- HLSPlugin.Error("readM3U8", zap.Error(err))
- }
- return
- }
- func (p *HLSPuller) OnEvent(event any) {
- switch event.(type) {
- case IPublisher:
- p.TSPublisher.OnEvent(event)
- if hlsConfig.RelayMode != 0 {
- p.Stream.NeverTimeout = true
- memoryTs.Add(p.StreamPath, p)
- }
- case SEKick, SEclose:
- if hlsConfig.RelayMode == 1 {
- memoryTs.Delete(p.StreamPath)
- }
- p.Publisher.OnEvent(event)
- default:
- p.Publisher.OnEvent(event)
- }
- }
- func (p *HLSPuller) pull(info *M3u8Info) (err error) {
- //请求失败自动退出
- req := info.Req.WithContext(p.Context)
- client := http.DefaultClient
- if p.Puller.Config.Proxy != "" {
- URL, err := url.Parse(p.Puller.Config.Proxy)
- if err != nil {
- return err
- }
- client = &http.Client{
- Transport: &http.Transport{
- Proxy: http.ProxyURL(URL),
- },
- }
- }
- sequence := -1
- lastTs := make(map[string]bool)
- tsbuffer := make(chan io.ReadCloser)
- bytesPool := make(util.BytesPool, 30)
- tsRing := util.NewRing[string](6)
- var tsReader *TSReader
- if hlsConfig.RelayMode != 1 {
- tsReader = NewTSReader(&p.TSPublisher)
- defer tsReader.Close()
- }
- defer func() {
- HLSPlugin.Info("hls exit", zap.String("streamPath", p.Stream.Path), zap.Error(err))
- close(tsbuffer)
- p.Stop()
- }()
- var maxResolution *m3u8.PlaylistItem
- for errcount := 0; err == nil; err = p.Err() {
- resp, err1 := client.Do(req)
- if err1 != nil {
- return err1
- }
- req = resp.Request
- if playlist, err2 := readM3U8(resp); err2 == nil {
- errcount = 0
- info.LastM3u8 = playlist.String()
- //if !playlist.Live {
- // log.Println(p.LastM3u8)
- // return
- //}
- if playlist.Sequence <= sequence {
- HLSPlugin.Warn("same sequence", zap.Int("sequence", playlist.Sequence), zap.Int("max", sequence))
- time.Sleep(time.Second)
- continue
- }
- info.M3U8Count++
- sequence = playlist.Sequence
- thisTs := make(map[string]bool)
- tsItems := make([]*m3u8.SegmentItem, 0)
- discontinuity := false
- for _, item := range playlist.Items {
- switch v := item.(type) {
- case *m3u8.PlaylistItem:
- if (maxResolution == nil || maxResolution.Resolution != nil && (maxResolution.Resolution.Width < v.Resolution.Width || maxResolution.Resolution.Height < v.Resolution.Height)) || maxResolution.Bandwidth < v.Bandwidth {
- maxResolution = v
- }
- case *m3u8.DiscontinuityItem:
- discontinuity = true
- case *m3u8.SegmentItem:
- thisTs[v.Segment] = true
- if _, ok := lastTs[v.Segment]; ok && !discontinuity {
- continue
- }
- tsItems = append(tsItems, v)
- case *m3u8.MediaItem:
- if p.Audio.Req == nil {
- if url, err := req.URL.Parse(*v.URI); err == nil {
- newReq, _ := http.NewRequest("GET", url.String(), nil)
- newReq.Header = req.Header
- p.Audio.Req = newReq
- go p.pull(&p.Audio)
- }
- }
- }
- }
- if maxResolution != nil && len(tsItems) == 0 {
- if url, err := req.URL.Parse(maxResolution.URI); err == nil {
- if strings.HasSuffix(url.Path, ".m3u8") {
- p.Video.Req, _ = http.NewRequest("GET", url.String(), nil)
- p.Video.Req.Header = req.Header
- req = p.Video.Req
- continue
- }
- }
- }
- tsCount := len(tsItems)
- HLSPlugin.Debug("readM3U8", zap.Int("sequence", sequence), zap.Int("tscount", tsCount))
- lastTs = thisTs
- if tsCount > 3 {
- tsItems = tsItems[tsCount-3:]
- }
- var plBuffer util.Buffer
- relayPlayList := Playlist{
- Writer: &plBuffer,
- Targetduration: playlist.Target,
- Sequence: playlist.Sequence,
- }
- if hlsConfig.RelayMode != 0 {
- relayPlayList.Init()
- }
- var tsDownloaders = make([]*TSDownloader, len(tsItems))
- for i, v := range tsItems {
- if p.Err() != nil {
- return p.Err()
- }
- tsUrl, _ := info.Req.URL.Parse(v.Segment)
- tsReq, _ := http.NewRequestWithContext(p.Context, "GET", tsUrl.String(), nil)
- tsReq.Header = p.TsHead
- // t1 := time.Now()
- tsDownloaders[i] = &TSDownloader{
- client: client,
- req: tsReq,
- url: tsUrl,
- dur: v.Duration,
- }
- tsDownloaders[i].Start()
- }
- ts := time.Now().UnixMilli()
- for i, v := range tsDownloaders {
- HLSPlugin.Debug("start download ts", zap.String("tsUrl", v.url.String()))
- v.wg.Wait()
- if v.res != nil {
- info.TSCount++
- p.SetIO(v.res.Body)
- if p.SaveContext != nil && p.SaveContext.Err() == nil {
- os.MkdirAll(filepath.Join(hlsConfig.Path, p.Stream.Path), 0766)
- if f, err := os.OpenFile(filepath.Join(hlsConfig.Path, p.Stream.Path, filepath.Base(v.url.Path)), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666); err == nil {
- p.SetIO(io.TeeReader(v.res.Body, f))
- p.Closer = f
- }
- }
- var tsBytes *util.Buffer
- var item *util.ListItem[util.Buffer]
- // 包含转发
- if hlsConfig.RelayMode != 0 {
- if v.res.ContentLength < 0 {
- item = bytesPool.GetShell(make([]byte, 0))
- } else {
- item = bytesPool.Get(int(v.res.ContentLength))
- item.Value = item.Value[:0]
- }
- tsBytes = &item.Value
- }
- switch hlsConfig.RelayMode {
- case 1:
- io.Copy(tsBytes, p.Reader)
- case 2:
- p.SetIO(io.TeeReader(p.Reader, tsBytes))
- fallthrough
- case 0:
- tsReader.Feed(p)
- }
- if hlsConfig.RelayMode != 0 {
- tsFilename := fmt.Sprintf("%d_%d.ts", ts, i)
- tsFilePath := p.StreamPath + "/" + tsFilename
- var plInfo = PlaylistInf{
- Title: p.Stream.StreamName + "/" + tsFilename,
- Duration: v.dur,
- FilePath: tsFilePath,
- }
- relayPlayList.WriteInf(plInfo)
- p.memoryTs.Add(tsFilePath, item)
- next := tsRing.Next()
- if next.Value != "" {
- item, _ := p.memoryTs.Delete(next.Value)
- if item == nil {
- p.Warn("memoryTs delete nil", zap.String("tsFilePath", next.Value))
- } else {
- item.Recycle()
- }
- }
- next.Value = tsFilePath
- tsRing = next
- }
- p.Close()
- } else if v.err != nil {
- HLSPlugin.Error("reqTs", zap.String("streamPath", p.Stream.Path), zap.Error(v.err))
- } else {
- HLSPlugin.Error("reqTs", zap.String("streamPath", p.Stream.Path))
- }
- HLSPlugin.Debug("finish download ts", zap.String("tsUrl", v.url.String()))
- }
- if hlsConfig.RelayMode != 0 {
- m3u8 := string(plBuffer)
- HLSPlugin.Debug("write m3u8", zap.String("streamPath", p.Stream.Path), zap.String("m3u8", m3u8))
- memoryM3u8.Store(p.Stream.Path, m3u8)
- }
- } else {
- HLSPlugin.Error("readM3u8", zap.String("streamPath", p.Stream.Path), zap.Error(err2))
- errcount++
- if errcount > 10 {
- return err2
- }
- }
- }
- return
- }
- func (p *HLSPuller) Connect() (err error) {
- p.Video.Req, err = http.NewRequest("GET", p.RemoteURL, nil)
- return
- }
- func (p *HLSPuller) Disconnect() {
- p.Stop(zap.String("reason", "disconnect"))
- }
- func (p *HLSPuller) Pull() error {
- return p.pull(&p.Video)
- }
|