write.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. package hls
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "go.uber.org/zap"
  11. . "m7s.live/engine/v4"
  12. "m7s.live/engine/v4/codec"
  13. "m7s.live/engine/v4/codec/mpegts"
  14. "m7s.live/engine/v4/track"
  15. "m7s.live/engine/v4/util"
  16. )
  17. var memoryTs util.Map[string, interface {
  18. GetTs(key string) util.Recyclable
  19. }]
  20. var memoryM3u8 sync.Map
  21. var pools sync.Pool
  22. func init() {
  23. pools.New = func() any {
  24. return make(util.BytesPool, 20)
  25. }
  26. }
  27. type TrackReader struct {
  28. sync.RWMutex
  29. M3u8 util.Buffer
  30. pes *mpegts.MpegtsPESFrame
  31. ts *MemoryTs
  32. *track.AVRingReader
  33. write_time time.Duration
  34. m3u8Name string
  35. hls_segment_count uint32 // hls segment count
  36. playlist Playlist
  37. infoRing *ring.Ring
  38. hls_playlist_count uint32
  39. hls_segment_window uint32
  40. }
  41. func (tr *TrackReader) init(hls *HLSWriter, media *track.Media, pid uint16) {
  42. tr.ts = &MemoryTs{
  43. BytesPool: hls.pool,
  44. }
  45. tr.pes = &mpegts.MpegtsPESFrame{
  46. Pid: pid,
  47. }
  48. tr.hls_segment_window = uint32(hlsConfig.Window) + 1
  49. tr.infoRing = ring.New(int(tr.hls_segment_window))
  50. tr.m3u8Name = hls.Stream.Path + "/" + media.Name
  51. tr.AVRingReader = hls.CreateTrackReader(media)
  52. tr.playlist = Playlist{
  53. Writer: &tr.M3u8,
  54. Version: 3,
  55. Sequence: 0,
  56. Targetduration: int(hlsConfig.Fragment / time.Millisecond / 666), // hlsFragment * 1.5 / 1000
  57. }
  58. }
  59. type AudioTrackReader struct {
  60. TrackReader
  61. *track.Audio
  62. }
  63. type VideoTrackReader struct {
  64. TrackReader
  65. *track.Video
  66. }
  67. type HLSWriter struct {
  68. pool util.BytesPool
  69. audio_tracks []*AudioTrackReader
  70. video_tracks []*VideoTrackReader
  71. Subscriber
  72. memoryTs util.Map[string, util.Recyclable]
  73. lastReadTime time.Time
  74. }
  75. func (hls *HLSWriter) GetTs(key string) util.Recyclable {
  76. hls.lastReadTime = time.Now()
  77. return hls.memoryTs.Get(key)
  78. }
  79. func (hls *HLSWriter) Start(streamPath string) {
  80. hls.pool = pools.Get().(util.BytesPool)
  81. if err := HLSPlugin.Subscribe(streamPath, hls); err != nil {
  82. HLSPlugin.Error("HLS Subscribe", zap.Error(err))
  83. return
  84. }
  85. streamPath = strings.Split(streamPath, "?")[0]
  86. memoryTs.Add(streamPath, hls)
  87. hls.ReadTrack()
  88. memoryTs.Delete(streamPath)
  89. hls.memoryTs.Range(func(k string, v util.Recyclable) {
  90. v.Recycle()
  91. })
  92. pools.Put(hls.pool)
  93. memoryM3u8.Delete(streamPath)
  94. for _, t := range hls.video_tracks {
  95. memoryM3u8.Delete(t.m3u8Name)
  96. }
  97. for _, t := range hls.audio_tracks {
  98. memoryM3u8.Delete(t.m3u8Name)
  99. }
  100. if !hlsConfig.Preload {
  101. writingMap.Delete(streamPath)
  102. }
  103. }
  104. func (hls *HLSWriter) ReadTrack() {
  105. var defaultAudio *AudioTrackReader
  106. var defaultVideo *VideoTrackReader
  107. if len(hls.video_tracks) > 0 {
  108. defaultVideo = hls.video_tracks[0]
  109. }
  110. if len(hls.audio_tracks) > 0 {
  111. defaultAudio = hls.audio_tracks[0]
  112. if defaultVideo != nil {
  113. for defaultAudio.IDRing == nil && !hls.IsClosed() {
  114. time.Sleep(time.Millisecond * 10)
  115. }
  116. defaultAudio.Ring = defaultAudio.IDRing
  117. } else {
  118. defaultAudio.Ring = defaultAudio.Track.Ring
  119. }
  120. }
  121. var audioGroup string
  122. m3u8 := `#EXTM3U
  123. #EXT-X-VERSION:3`
  124. if defaultAudio != nil {
  125. audioGroup = `,AUDIO="audio"`
  126. m3u8 += fmt.Sprintf(`
  127. #EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="audio",NAME="%s",DEFAULT=YES,AUTOSELECT=YES,URI="%s/%s.m3u8?sub=1"`, defaultAudio.Track.Name, hls.Stream.StreamName, defaultAudio.Track.Name)
  128. }
  129. if defaultVideo != nil {
  130. m3u8 += fmt.Sprintf(`
  131. #EXT-X-STREAM-INF:BANDWIDTH=2962000,NAME="%s",RESOLUTION=%dx%d%s
  132. %s/%s.m3u8?sub=1`, defaultVideo.Track.Name, defaultVideo.Width, defaultVideo.Height, audioGroup, hls.Stream.StreamName, defaultVideo.Track.Name)
  133. }
  134. // 存一个默认的m3u8
  135. memoryM3u8.Store(hls.Stream.Path, m3u8)
  136. for hls.IO.Err() == nil {
  137. for _, t := range hls.video_tracks {
  138. for {
  139. frame, err := t.TryRead()
  140. if err != nil {
  141. return
  142. }
  143. if frame == nil {
  144. break
  145. }
  146. if frame.IFrame {
  147. t.TrackReader.frag(hls, frame.Timestamp)
  148. }
  149. t.pes.IsKeyFrame = frame.IFrame
  150. if err = t.ts.WriteVideoFrame(VideoFrame{frame, t.Video, t.AbsTime, uint32(frame.PTS), uint32(frame.DTS)}, t.pes); err != nil {
  151. return
  152. }
  153. }
  154. }
  155. for _, t := range hls.audio_tracks {
  156. for {
  157. frame, err := t.TryRead()
  158. if err != nil {
  159. return
  160. }
  161. if frame == nil {
  162. break
  163. }
  164. t.TrackReader.frag(hls, frame.Timestamp)
  165. t.pes.IsKeyFrame = false
  166. if err = t.ts.WriteAudioFrame(AudioFrame{frame, t.Audio, t.AbsTime, uint32(frame.PTS), uint32(frame.DTS)}, t.pes); err != nil {
  167. return
  168. }
  169. }
  170. }
  171. time.Sleep(time.Millisecond * 10)
  172. if !hlsConfig.Preload && !hls.lastReadTime.IsZero() && time.Since(hls.lastReadTime) > time.Second*15 {
  173. hls.Stop(zap.String("reason", "no reader after 15s"))
  174. }
  175. }
  176. }
  177. func (t *TrackReader) frag(hls *HLSWriter, ts time.Duration) (err error) {
  178. streamPath := hls.Stream.Path
  179. // 当前的时间戳减去上一个ts切片的时间戳
  180. if dur := ts - t.write_time; dur >= hlsConfig.Fragment {
  181. // fmt.Println("time :", video.Timestamp, tsSegmentTimestamp)
  182. if dur == ts && t.write_time == 0 { //时间戳不对的情况,首个默认为2s
  183. dur = time.Duration(2) * time.Second
  184. }
  185. num := uint32(t.hls_segment_count)
  186. tsFilename := t.Track.Name + strconv.FormatInt(time.Now().Unix(), 10) + "_" + strconv.FormatUint(uint64(num), 10) + ".ts"
  187. tsFilePath := streamPath + "/" + tsFilename
  188. // println(hls.currentTs.Length)
  189. t.ts = &MemoryTs{
  190. BytesPool: t.ts.BytesPool,
  191. PMT: t.ts.PMT,
  192. }
  193. HLSPlugin.Debug("write ts", zap.String("tsFilePath", tsFilePath))
  194. hls.memoryTs.Store(tsFilePath, t.ts)
  195. if t.playlist.Targetduration < int(dur.Seconds()) {
  196. t.playlist.Targetduration = int(math.Ceil(dur.Seconds()))
  197. }
  198. if t.M3u8.Len() == 0 {
  199. t.playlist.Init()
  200. }
  201. inf := PlaylistInf{
  202. //浮点计算精度
  203. Duration: dur.Seconds(),
  204. Title: tsFilename,
  205. FilePath: tsFilePath,
  206. }
  207. t.Lock()
  208. defer t.Unlock()
  209. if t.hls_segment_count > 0 {
  210. if t.hls_playlist_count >= uint32(hlsConfig.Window) {
  211. t.M3u8.Reset()
  212. if err = t.playlist.Init(); err != nil {
  213. return
  214. }
  215. //playlist起点是ring.next,长度是len(ring)-1
  216. for p := t.infoRing.Next(); p != t.infoRing; p = p.Next() {
  217. t.playlist.WriteInf(p.Value.(PlaylistInf))
  218. }
  219. } else {
  220. if err = t.playlist.WriteInf(t.infoRing.Prev().Value.(PlaylistInf)); err != nil {
  221. return
  222. }
  223. }
  224. memoryM3u8.LoadOrStore(t.m3u8Name, t)
  225. t.hls_playlist_count++
  226. }
  227. if t.hls_segment_count >= t.hls_segment_window {
  228. if mts, loaded := hls.memoryTs.Delete(t.infoRing.Value.(PlaylistInf).FilePath); loaded {
  229. mts.Recycle()
  230. }
  231. t.infoRing.Value = inf
  232. t.infoRing = t.infoRing.Next()
  233. } else {
  234. t.infoRing.Value = inf
  235. t.infoRing = t.infoRing.Next()
  236. }
  237. t.hls_segment_count++
  238. t.write_time = ts
  239. }
  240. return
  241. }
  242. func (hls *HLSWriter) OnEvent(event any) {
  243. switch v := event.(type) {
  244. case *track.Video:
  245. track := &VideoTrackReader{
  246. Video: v,
  247. }
  248. track.init(hls, &v.Media, mpegts.PID_VIDEO)
  249. track.ts.WritePMTPacket(0, v.CodecID)
  250. track.Ring = track.IDRing
  251. hls.video_tracks = append(hls.video_tracks, track)
  252. case *track.Audio:
  253. if v.CodecID != codec.CodecID_AAC {
  254. return
  255. }
  256. track := &AudioTrackReader{
  257. Audio: v,
  258. }
  259. track.init(hls, &v.Media, mpegts.PID_AUDIO)
  260. track.ts.WritePMTPacket(v.CodecID, 0)
  261. hls.audio_tracks = append(hls.audio_tracks, track)
  262. default:
  263. hls.Subscriber.OnEvent(event)
  264. }
  265. }