llhls.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package hls
  2. import (
  3. "net"
  4. "net/http"
  5. "path"
  6. "strings"
  7. "time"
  8. "github.com/bluenviron/gohlslib"
  9. "github.com/bluenviron/gohlslib/pkg/codecs"
  10. "github.com/bluenviron/mediacommon/pkg/codecs/mpeg4audio"
  11. "go.uber.org/zap"
  12. . "m7s.live/engine/v4"
  13. "m7s.live/engine/v4/codec"
  14. "m7s.live/engine/v4/config"
  15. "m7s.live/engine/v4/track"
  16. "m7s.live/engine/v4/util"
  17. )
  18. var llhlsConfig = &LLHLSConfig{
  19. DefaultYaml: defaultYaml,
  20. }
  21. var LLHLSPlugin = InstallPlugin(llhlsConfig)
  22. var llwriting util.Map[string, *LLMuxer]
  23. type LLHLSConfig struct {
  24. DefaultYaml
  25. config.HTTP
  26. config.Publish
  27. // config.Pull
  28. config.Subscribe
  29. Filter string // 过滤,正则表达式
  30. Path string
  31. }
  32. func (c *LLHLSConfig) OnEvent(event any) {
  33. switch v := event.(type) {
  34. case SEpublish:
  35. if !llwriting.Has(v.Target.Path) {
  36. var outStream LLMuxer
  37. llwriting.Set(v.Target.Path, &outStream)
  38. go outStream.Start(v.Target)
  39. }
  40. }
  41. }
  42. func (c *LLHLSConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  43. streamPath := strings.TrimPrefix(r.URL.Path, "/")
  44. streamPath = path.Dir(streamPath)
  45. if llwriting.Has(streamPath) {
  46. r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+streamPath)
  47. llwriting.Get(streamPath).Handle(w, r)
  48. return
  49. } else {
  50. w.Write([]byte(`<html><body><video src="/llhls/live/test/index.m3u8"></video></body></html>`))
  51. }
  52. }
  53. type LLVideoTrack struct {
  54. *track.AVRingReader
  55. *track.Video
  56. }
  57. type LLAudioTrack struct {
  58. *track.AVRingReader
  59. *track.Audio
  60. }
  61. type LLMuxer struct {
  62. *gohlslib.Muxer
  63. Subscriber
  64. audio_tracks []*LLAudioTrack
  65. video_tracks []*LLVideoTrack
  66. }
  67. func (ll *LLMuxer) OnEvent(event any) {
  68. var err error
  69. defer func() {
  70. if err != nil {
  71. ll.Stop(zap.Error(err))
  72. }
  73. }()
  74. switch v := event.(type) {
  75. case *track.Video:
  76. // track := ll.CreateTrackReader(&v.Media)
  77. ll.video_tracks = append(ll.video_tracks, &LLVideoTrack{
  78. Video: v,
  79. })
  80. case *track.Audio:
  81. if v.CodecID != codec.CodecID_AAC {
  82. return
  83. }
  84. ll.audio_tracks = append(ll.audio_tracks, &LLAudioTrack{
  85. Audio: v,
  86. })
  87. default:
  88. ll.Subscriber.OnEvent(event)
  89. }
  90. }
  91. func (ll *LLMuxer) Start(s *Stream) {
  92. if err := LLHLSPlugin.Subscribe(s.Path, ll); err != nil {
  93. LLHLSPlugin.Error("LL-HLS Subscribe", zap.Error(err))
  94. return
  95. }
  96. ll.Muxer = &gohlslib.Muxer{
  97. Variant: gohlslib.MuxerVariantLowLatency,
  98. SegmentCount: func() int {
  99. return 7
  100. }(),
  101. SegmentDuration: 1 * time.Second,
  102. }
  103. var defaultAudio *LLAudioTrack
  104. var defaultVideo *LLVideoTrack
  105. for _, t := range ll.video_tracks {
  106. if defaultVideo == nil {
  107. defaultVideo = t
  108. t.AVRingReader = ll.CreateTrackReader(&t.Video.Media)
  109. t.Ring = t.IDRing
  110. ll.Muxer.VideoTrack = &gohlslib.Track{}
  111. switch t.Video.CodecID {
  112. case codec.CodecID_H264:
  113. ll.Muxer.VideoTrack.Codec = &codecs.H264{
  114. SPS: t.Video.SPS,
  115. PPS: t.Video.PPS,
  116. }
  117. case codec.CodecID_H265:
  118. ll.Muxer.VideoTrack.Codec = &codecs.H265{
  119. SPS: t.Video.SPS,
  120. PPS: t.Video.PPS,
  121. VPS: t.Video.ParamaterSets[2],
  122. }
  123. }
  124. }
  125. }
  126. for _, t := range ll.audio_tracks {
  127. if defaultAudio == nil {
  128. defaultAudio = t
  129. t.AVRingReader = ll.CreateTrackReader(&t.Audio.Media)
  130. if defaultVideo != nil {
  131. for t.IDRing == nil && !ll.IsClosed() {
  132. time.Sleep(time.Millisecond * 10)
  133. }
  134. t.Ring = t.IDRing
  135. } else {
  136. t.Ring = t.Audio.Ring
  137. }
  138. ll.Muxer.AudioTrack = &gohlslib.Track{
  139. Codec: &codecs.MPEG4Audio{
  140. Config: mpeg4audio.Config{
  141. Type: 2,
  142. SampleRate: 44100,
  143. ChannelCount: 2,
  144. },
  145. },
  146. }
  147. }
  148. }
  149. ll.Muxer.Start()
  150. defer ll.Muxer.Close()
  151. now := time.Now()
  152. for ll.IO.Err() == nil {
  153. for defaultAudio != nil {
  154. frame, err := defaultAudio.TryRead()
  155. if err != nil {
  156. return
  157. }
  158. if frame == nil {
  159. break
  160. }
  161. audioFrame := AudioFrame{
  162. AVFrame: frame,
  163. }
  164. ll.Muxer.WriteMPEG4Audio(now.Add(frame.Timestamp-time.Second), frame.Timestamp, audioFrame.GetADTS())
  165. }
  166. for defaultVideo != nil {
  167. frame, err := defaultVideo.TryRead()
  168. if err != nil {
  169. return
  170. }
  171. if frame == nil {
  172. break
  173. }
  174. var aus net.Buffers
  175. if frame.IFrame {
  176. if len(defaultVideo.ParamaterSets[0]) == 0 {
  177. continue
  178. }
  179. aus = append(aus, defaultVideo.ParamaterSets...)
  180. }
  181. frame.AUList.Range(func(au *util.BLL) bool {
  182. auBytes := util.ConcatBuffers(au.ToBuffers())
  183. if len(auBytes) > 0 {
  184. aus = append(aus, auBytes)
  185. }
  186. return true
  187. })
  188. if len(aus) > 0 {
  189. ll.Muxer.WriteH26x(now.Add(frame.Timestamp-time.Second), frame.Timestamp, aus)
  190. }
  191. }
  192. time.Sleep(time.Millisecond * 10)
  193. }
  194. }