main.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. package hls // import "m7s.live/plugin/hls/v4"
  2. import (
  3. "embed"
  4. "fmt"
  5. "log"
  6. "math"
  7. "net/http"
  8. "os"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "go.uber.org/zap"
  15. . "m7s.live/engine/v4"
  16. "m7s.live/engine/v4/config"
  17. "m7s.live/engine/v4/util"
  18. )
  19. //go:embed hls.js
  20. var hls_js embed.FS
  21. //go:embed default.ts
  22. var defaultTS []byte
  23. //go:embed default.yaml
  24. var defaultYaml DefaultYaml
  25. var defaultSeq = 0 // 默认片头的全局序号
  26. var writing = make(map[string]*HLSWriter) // preload 使用
  27. var writingMap sync.Map // 非preload使用
  28. var hlsConfig = &HLSConfig{}
  29. var HLSPlugin = InstallPlugin(hlsConfig, defaultYaml)
  30. type HLSConfig struct {
  31. config.HTTP
  32. config.Publish
  33. config.Pull
  34. config.Subscribe
  35. Fragment time.Duration `default:"2s" desc:"ts分片大小"`
  36. Window int `default:"3" desc:"m3u8窗口大小(包含ts的数量)"`
  37. Filter config.Regexp `desc:"用于过滤的正则表达式"` // 过滤,正则表达式
  38. Path string `desc:"保存 ts 文件的路径"`
  39. DefaultTS string `desc:"默认的ts文件"` // 默认的ts文件
  40. DefaultTSDuration time.Duration `desc:"默认的ts文件时长"` // 默认的ts文件时长
  41. RelayMode int `desc:"转发模式(转协议会消耗资源)" enum:"0:只转协议,1:纯转发,2:转协议+转发"` // 转发模式,0:转协议+不转发,1:不转协议+转发,2:转协议+转发
  42. Preload bool `desc:"是否预加载,提高响应速度"` // 是否预加载,提高响应速度
  43. }
  44. func (c *HLSConfig) OnEvent(event any) {
  45. switch v := event.(type) {
  46. case FirstConfig:
  47. if !c.Preload {
  48. c.Internal = false // 如何不预加载,则为非内部订阅
  49. }
  50. for streamPath, url := range c.PullOnStart {
  51. if err := HLSPlugin.Pull(streamPath, url, new(HLSPuller), 0); err != nil {
  52. HLSPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
  53. }
  54. }
  55. if c.DefaultTS != "" {
  56. ts, err := os.ReadFile(c.DefaultTS)
  57. if err == nil {
  58. defaultTS = ts
  59. } else {
  60. log.Panic("read default ts error")
  61. }
  62. } else {
  63. c.DefaultTSDuration = time.Second * 388 / 100
  64. }
  65. if c.DefaultTSDuration == 0 {
  66. log.Panic("default ts duration error")
  67. } else {
  68. go func() {
  69. ticker := time.NewTicker(c.DefaultTSDuration)
  70. for range ticker.C {
  71. defaultSeq++
  72. }
  73. }()
  74. }
  75. case SEclose:
  76. if c.Preload {
  77. delete(writing, v.Target.Path)
  78. } else {
  79. writingMap.Delete(v.Target.Path)
  80. }
  81. case SEpublish:
  82. if c.Preload {
  83. if writing[v.Target.Path] == nil && (!c.Filter.Valid() || c.Filter.MatchString(v.Target.Path)) {
  84. if _, ok := v.Target.Publisher.(*HLSPuller); !ok || c.RelayMode == 0 {
  85. var outStream HLSWriter
  86. writing[v.Target.Path] = &outStream
  87. go outStream.Start(v.Target.Path)
  88. }
  89. }
  90. }
  91. case InvitePublish: //按需拉流
  92. if remoteURL := c.CheckPullOnSub(v.Target); remoteURL != "" {
  93. if err := HLSPlugin.Pull(v.Target, remoteURL, new(HLSPuller), 0); err != nil {
  94. HLSPlugin.Error("pull", zap.String("streamPath", v.Target), zap.String("url", remoteURL), zap.Error(err))
  95. }
  96. }
  97. }
  98. }
  99. func (config *HLSConfig) API_List(w http.ResponseWriter, r *http.Request) {
  100. util.ReturnFetchValue(FilterStreams[*HLSPuller], w, r)
  101. }
  102. // 处于拉流时,可以调用这个API将拉流的TS文件保存下来,这个http如果断开,则停止保存
  103. func (config *HLSConfig) API_Save(w http.ResponseWriter, r *http.Request) {
  104. streamPath := r.URL.Query().Get("streamPath")
  105. if s := Streams.Get(streamPath); s != nil {
  106. if hls, ok := s.Publisher.(*HLSPuller); ok {
  107. hls.SaveContext = r.Context()
  108. <-hls.SaveContext.Done()
  109. }
  110. }
  111. }
  112. func (config *HLSConfig) API_Pull(w http.ResponseWriter, r *http.Request) {
  113. targetURL := r.URL.Query().Get("target")
  114. streamPath := r.URL.Query().Get("streamPath")
  115. save, _ := strconv.Atoi(r.URL.Query().Get("save"))
  116. if err := HLSPlugin.Pull(streamPath, targetURL, new(HLSPuller), save); err != nil {
  117. util.ReturnError(util.APIErrorQueryParse, err.Error(), w, r)
  118. } else {
  119. util.ReturnOK(w, r)
  120. }
  121. }
  122. func (config *HLSConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  123. fileName := strings.TrimPrefix(r.URL.Path, "/")
  124. query := r.URL.Query()
  125. waitTimeout, err := time.ParseDuration(query.Get("timeout"))
  126. if err == nil {
  127. HLSPlugin.Debug("request", zap.String("fileName", fileName), zap.Duration("timeout", waitTimeout))
  128. } else if !config.Preload {
  129. waitTimeout = time.Second * 10
  130. }
  131. waitStart := time.Now()
  132. if strings.HasSuffix(r.URL.Path, ".m3u8") {
  133. w.Header().Add("Content-Type", "application/vnd.apple.mpegurl")
  134. for {
  135. if v, ok := memoryM3u8.Load(strings.TrimSuffix(fileName, ".m3u8")); ok {
  136. switch hls := v.(type) {
  137. case *TrackReader:
  138. hls.RLock()
  139. w.Write(hls.M3u8)
  140. hls.RUnlock()
  141. return
  142. case string:
  143. fmt.Fprint(w, strings.Replace(hls, "?sub=1", util.Conditoinal(waitTimeout > 0, fmt.Sprintf("?sub=1&timeout=%s", waitTimeout), ""), -1))
  144. return
  145. // if _, ok := memoryM3u8.Load(hls); ok {
  146. // ss := strings.Split(hls, "/")
  147. // m3u8 := fmt.Sprintf(`#EXTM3U
  148. // #EXT-X-VERSION:3
  149. // #EXT-X-STREAM-INF:BANDWIDTH=2560000
  150. // %s/%s.m3u8
  151. // `, ss[len(ss)-2], ss[len(ss)-1])
  152. // w.Write([]byte(m3u8))
  153. // return
  154. // }
  155. }
  156. }
  157. if waitTimeout > 0 && time.Since(waitStart) < waitTimeout {
  158. if query.Get("sub") == "" {
  159. streamPath := strings.TrimSuffix(fileName, ".m3u8")
  160. if !config.Preload {
  161. writer, loaded := writingMap.LoadOrStore(streamPath, new(HLSWriter))
  162. if !loaded {
  163. outStream := writer.(*HLSWriter)
  164. go outStream.Start(streamPath + "?" + r.URL.RawQuery)
  165. }
  166. } else {
  167. TryInvitePublish(streamPath)
  168. }
  169. }
  170. time.Sleep(time.Second)
  171. continue
  172. } else {
  173. break
  174. }
  175. }
  176. w.Write([]byte(fmt.Sprintf(`#EXTM3U
  177. #EXT-X-VERSION:3
  178. #EXT-X-MEDIA-SEQUENCE:%d
  179. #EXT-X-TARGETDURATION:%d
  180. #EXT-X-DISCONTINUITY-SEQUENCE:%d
  181. #EXT-X-DISCONTINUITY
  182. #EXTINF:%.3f,
  183. default.ts`, defaultSeq, int(math.Ceil(config.DefaultTSDuration.Seconds())), defaultSeq, config.DefaultTSDuration.Seconds())))
  184. } else if strings.HasSuffix(r.URL.Path, ".ts") {
  185. w.Header().Add("Content-Type", "video/mp2t") //video/mp2t
  186. streamPath := path.Dir(fileName)
  187. for {
  188. tsData := memoryTs.Get(streamPath)
  189. if tsData == nil {
  190. tsData = memoryTs.Get(path.Dir(streamPath))
  191. if tsData == nil {
  192. if waitTimeout > 0 && time.Since(waitStart) < waitTimeout {
  193. time.Sleep(time.Second)
  194. continue
  195. } else {
  196. w.Write(defaultTS)
  197. return
  198. }
  199. }
  200. }
  201. for {
  202. if tsData := tsData.GetTs(fileName); tsData != nil {
  203. switch v := tsData.(type) {
  204. case *MemoryTs:
  205. v.WriteTo(w)
  206. case *util.ListItem[util.Buffer]:
  207. w.Write(v.Value)
  208. }
  209. return
  210. } else {
  211. if waitTimeout > 0 && time.Since(waitStart) < waitTimeout {
  212. time.Sleep(time.Second)
  213. continue
  214. } else {
  215. w.Write(defaultTS)
  216. return
  217. }
  218. }
  219. }
  220. }
  221. } else {
  222. f, err := hls_js.ReadFile("hls.js/" + fileName)
  223. if err != nil {
  224. http.Error(w, err.Error(), http.StatusNotFound)
  225. } else {
  226. w.Write(f)
  227. }
  228. // if file, err := hls_js.Open(fileName); err == nil {
  229. // defer file.Close()
  230. // if info, err := file.Stat(); err == nil {
  231. // http.ServeContent(w, r, fileName, info.ModTime(), file)
  232. // }
  233. // } else {
  234. // http.NotFound(w, r)
  235. // }
  236. }
  237. }