write.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. package hls
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "sync"
  8. "time"
  9. "go.uber.org/zap"
  10. . "m7s.live/engine/v4"
  11. "m7s.live/engine/v4/codec"
  12. "m7s.live/engine/v4/codec/mpegts"
  13. "m7s.live/engine/v4/track"
  14. "m7s.live/engine/v4/util"
  15. )
  16. var memoryTs util.Map[string, *util.Map[string, util.Recyclable]]
  17. var memoryM3u8 sync.Map
  18. var pools sync.Pool
  19. func init() {
  20. pools.New = func() any {
  21. return make(util.BytesPool, 20)
  22. }
  23. }
  24. type TrackReader struct {
  25. sync.RWMutex
  26. M3u8 util.Buffer
  27. pes *mpegts.MpegtsPESFrame
  28. ts *MemoryTs
  29. *track.AVRingReader
  30. write_time time.Duration
  31. m3u8Name string
  32. hls_segment_count uint32 // hls segment count
  33. playlist Playlist
  34. infoRing *ring.Ring
  35. }
  36. func (tr *TrackReader) init(hls *HLSWriter, media *track.Media, pid uint16) {
  37. tr.ts = &MemoryTs{
  38. BytesPool: hls.pool,
  39. }
  40. tr.pes = &mpegts.MpegtsPESFrame{
  41. Pid: pid,
  42. }
  43. tr.infoRing = ring.New(hlsConfig.Window)
  44. tr.m3u8Name = hls.Stream.Path + "/" + media.Name
  45. tr.AVRingReader = hls.CreateTrackReader(media)
  46. tr.playlist = Playlist{
  47. Writer: &tr.M3u8,
  48. Version: 3,
  49. Sequence: 0,
  50. Targetduration: int(hlsConfig.Fragment / time.Millisecond / 666), // hlsFragment * 1.5 / 1000
  51. }
  52. }
  53. type AudioTrackReader struct {
  54. TrackReader
  55. *track.Audio
  56. }
  57. type VideoTrackReader struct {
  58. TrackReader
  59. *track.Video
  60. }
  61. type HLSWriter struct {
  62. pool util.BytesPool
  63. audio_tracks []*AudioTrackReader
  64. video_tracks []*VideoTrackReader
  65. Subscriber
  66. memoryTs util.Map[string, util.Recyclable]
  67. }
  68. func (hls *HLSWriter) Start(r *Stream) {
  69. hls.pool = pools.Get().(util.BytesPool)
  70. memoryTs.Add(r.Path, &hls.memoryTs)
  71. if err := HLSPlugin.Subscribe(r.Path, hls); err != nil {
  72. HLSPlugin.Error("HLS Subscribe", zap.Error(err))
  73. return
  74. }
  75. hls.ReadTrack()
  76. memoryTs.Delete(r.Path)
  77. hls.memoryTs.Range(func(k string, v util.Recyclable) {
  78. v.Recycle()
  79. })
  80. pools.Put(hls.pool)
  81. memoryM3u8.Delete(r.Path)
  82. for _, t := range hls.video_tracks {
  83. memoryM3u8.Delete(t.m3u8Name)
  84. }
  85. for _, t := range hls.audio_tracks {
  86. memoryM3u8.Delete(t.m3u8Name)
  87. }
  88. }
  89. func (hls *HLSWriter) ReadTrack() {
  90. var defaultAudio *AudioTrackReader
  91. var defaultVideo *VideoTrackReader
  92. for _, t := range hls.video_tracks {
  93. if defaultVideo == nil {
  94. defaultVideo = t
  95. break
  96. }
  97. }
  98. for _, t := range hls.audio_tracks {
  99. if defaultAudio == nil {
  100. defaultAudio = t
  101. if defaultVideo != nil {
  102. for t.IDRing == nil && !hls.IsClosed() {
  103. time.Sleep(time.Millisecond * 10)
  104. }
  105. t.Ring = t.IDRing
  106. } else {
  107. t.Ring = t.Track.Ring
  108. }
  109. break
  110. }
  111. }
  112. var audioGroup string
  113. m3u8 := `#EXTM3U
  114. #EXT-X-VERSION:3`
  115. if defaultAudio != nil {
  116. audioGroup = `,AUDIO="audio"`
  117. m3u8 += fmt.Sprintf(`
  118. #EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="audio",NAME="%s",DEFAULT=YES,AUTOSELECT=YES,URI="%s/%s.m3u8"`, defaultAudio.Track.Name, hls.Stream.StreamName, defaultAudio.Track.Name)
  119. }
  120. if defaultVideo != nil {
  121. m3u8 += fmt.Sprintf(`
  122. #EXT-X-STREAM-INF:BANDWIDTH=2962000,NAME="%s",RESOLUTION=%dx%d%s
  123. %s/%s.m3u8`, defaultVideo.Track.Name, defaultVideo.Width, defaultVideo.Height, audioGroup, hls.Stream.StreamName, defaultVideo.Track.Name)
  124. }
  125. // 存一个默认的m3u8
  126. memoryM3u8.Store(hls.Stream.Path, m3u8)
  127. for hls.IO.Err() == nil {
  128. for _, t := range hls.video_tracks {
  129. for {
  130. frame, err := t.TryRead()
  131. if err != nil {
  132. return
  133. }
  134. if frame == nil {
  135. break
  136. }
  137. if frame.IFrame {
  138. t.TrackReader.frag(hls, frame.Timestamp)
  139. }
  140. t.pes.IsKeyFrame = frame.IFrame
  141. t.ts.WriteVideoFrame(VideoFrame{frame, t.Video, t.AbsTime, uint32(frame.PTS), uint32(frame.DTS)}, t.pes)
  142. }
  143. }
  144. for _, t := range hls.audio_tracks {
  145. for {
  146. frame, err := t.TryRead()
  147. if err != nil {
  148. return
  149. }
  150. if frame == nil {
  151. break
  152. }
  153. t.TrackReader.frag(hls, frame.Timestamp)
  154. t.pes.IsKeyFrame = false
  155. t.ts.WriteAudioFrame(AudioFrame{frame, t.Audio, t.AbsTime, uint32(frame.PTS), uint32(frame.DTS)}, t.pes)
  156. }
  157. }
  158. time.Sleep(time.Millisecond * 10)
  159. }
  160. }
  161. func (t *TrackReader) frag(hls *HLSWriter, ts time.Duration) (err error) {
  162. streamPath := hls.Stream.Path
  163. // 当前的时间戳减去上一个ts切片的时间戳
  164. if dur := ts - t.write_time; dur >= hlsConfig.Fragment {
  165. // fmt.Println("time :", video.Timestamp, tsSegmentTimestamp)
  166. tsFilename := t.Track.Name + strconv.FormatInt(time.Now().Unix(), 10) + ".ts"
  167. tsFilePath := streamPath + "/" + tsFilename
  168. hls.memoryTs.Store(tsFilePath, t.ts)
  169. // println(hls.currentTs.Length)
  170. t.ts = &MemoryTs{
  171. BytesPool: t.ts.BytesPool,
  172. PMT: t.ts.PMT,
  173. }
  174. if t.playlist.Targetduration < int(dur.Seconds()) {
  175. t.playlist.Targetduration = int(math.Ceil(dur.Seconds()))
  176. }
  177. if t.M3u8.Len() == 0 {
  178. t.playlist.Init()
  179. }
  180. inf := PlaylistInf{
  181. //浮点计算精度
  182. Duration: dur.Seconds(),
  183. Title: tsFilename,
  184. FilePath: tsFilePath,
  185. }
  186. t.Lock()
  187. defer t.Unlock()
  188. if t.hls_segment_count >= uint32(hlsConfig.Window) {
  189. t.M3u8.Reset()
  190. if err = t.playlist.Init(); err != nil {
  191. return
  192. }
  193. if mts, loaded := hls.memoryTs.Delete(t.infoRing.Value.(PlaylistInf).FilePath); loaded {
  194. mts.Recycle()
  195. }
  196. t.infoRing.Value = inf
  197. t.infoRing = t.infoRing.Next()
  198. t.infoRing.Do(func(i interface{}) {
  199. t.playlist.WriteInf(i.(PlaylistInf))
  200. })
  201. } else {
  202. t.infoRing.Value = inf
  203. t.infoRing = t.infoRing.Next()
  204. if err = t.playlist.WriteInf(inf); err != nil {
  205. return
  206. }
  207. }
  208. t.hls_segment_count++
  209. t.write_time = ts
  210. if t.playlist.tsCount > 0 {
  211. memoryM3u8.LoadOrStore(t.m3u8Name, t)
  212. }
  213. }
  214. return
  215. }
  216. func (hls *HLSWriter) OnEvent(event any) {
  217. var err error
  218. defer func() {
  219. if err != nil {
  220. hls.Stop(zap.Error(err))
  221. }
  222. }()
  223. switch v := event.(type) {
  224. case *track.Video:
  225. track := &VideoTrackReader{
  226. Video: v,
  227. }
  228. track.init(hls, &v.Media, mpegts.PID_VIDEO)
  229. track.ts.WritePMTPacket(0, v.CodecID)
  230. track.Ring = track.IDRing
  231. hls.video_tracks = append(hls.video_tracks, track)
  232. case *track.Audio:
  233. if v.CodecID != codec.CodecID_AAC {
  234. return
  235. }
  236. track := &AudioTrackReader{
  237. Audio: v,
  238. }
  239. track.init(hls, &v.Media, mpegts.PID_AUDIO)
  240. track.ts.WritePMTPacket(v.CodecID, 0)
  241. hls.audio_tracks = append(hls.audio_tracks, track)
  242. default:
  243. hls.Subscriber.OnEvent(event)
  244. }
  245. }