write.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. package hls
  2. import (
  3. "container/ring"
  4. "fmt"
  5. "math"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "go.uber.org/zap"
  11. . "m7s.live/engine/v4"
  12. "m7s.live/engine/v4/codec"
  13. "m7s.live/engine/v4/codec/mpegts"
  14. "m7s.live/engine/v4/track"
  15. "m7s.live/engine/v4/util"
  16. )
  17. var memoryTs util.Map[string, interface {
  18. GetTs(key string) util.Recyclable
  19. }]
  20. var memoryM3u8 sync.Map
  21. var pools sync.Pool
  22. func init() {
  23. pools.New = func() any {
  24. return make(util.BytesPool, 20)
  25. }
  26. }
  27. type TrackReader struct {
  28. sync.RWMutex
  29. M3u8 util.Buffer
  30. pes *mpegts.MpegtsPESFrame
  31. ts *MemoryTs
  32. *track.AVRingReader
  33. write_time time.Duration
  34. m3u8Name string
  35. hls_segment_count uint32 // hls segment count
  36. playlist Playlist
  37. infoRing *ring.Ring
  38. hls_playlist_count uint32
  39. hls_segment_window uint32
  40. }
  41. func (tr *TrackReader) init(hls *HLSWriter, media *track.Media, pid uint16) {
  42. tr.ts = &MemoryTs{
  43. BytesPool: hls.pool,
  44. }
  45. tr.pes = &mpegts.MpegtsPESFrame{
  46. Pid: pid,
  47. }
  48. tr.hls_segment_window = uint32(hlsConfig.Window) + 1
  49. tr.infoRing = ring.New(int(tr.hls_segment_window))
  50. tr.m3u8Name = hls.Stream.Path + "/" + media.Name
  51. tr.AVRingReader = hls.CreateTrackReader(media)
  52. tr.playlist = Playlist{
  53. Writer: &tr.M3u8,
  54. Version: 3,
  55. Sequence: 0,
  56. Targetduration: int(hlsConfig.Fragment / time.Millisecond / 666), // hlsFragment * 1.5 / 1000
  57. }
  58. }
  59. type AudioTrackReader struct {
  60. TrackReader
  61. *track.Audio
  62. }
  63. type VideoTrackReader struct {
  64. TrackReader
  65. *track.Video
  66. }
  67. type HLSWriter struct {
  68. pool util.BytesPool
  69. audio_tracks []*AudioTrackReader
  70. video_tracks []*VideoTrackReader
  71. Subscriber
  72. memoryTs util.Map[string, util.Recyclable]
  73. lastReadTime time.Time
  74. }
  75. func (hls *HLSWriter) GetTs(key string) util.Recyclable {
  76. hls.lastReadTime = time.Now()
  77. return hls.memoryTs.Get(key)
  78. }
  79. func (hls *HLSWriter) Start(streamPath string) {
  80. hls.pool = pools.Get().(util.BytesPool)
  81. if err := HLSPlugin.Subscribe(streamPath, hls); err != nil {
  82. HLSPlugin.Error("HLS Subscribe", zap.Error(err))
  83. return
  84. }
  85. streamPath = strings.Split(streamPath, "?")[0]
  86. memoryTs.Add(streamPath, hls)
  87. hls.ReadTrack()
  88. memoryTs.Delete(streamPath)
  89. hls.memoryTs.Range(func(k string, v util.Recyclable) {
  90. v.Recycle()
  91. })
  92. pools.Put(hls.pool)
  93. memoryM3u8.Delete(streamPath)
  94. for _, t := range hls.video_tracks {
  95. memoryM3u8.Delete(t.m3u8Name)
  96. }
  97. for _, t := range hls.audio_tracks {
  98. memoryM3u8.Delete(t.m3u8Name)
  99. }
  100. if !hlsConfig.Preload {
  101. writingMap.Delete(streamPath)
  102. }
  103. }
  104. func (hls *HLSWriter) ReadTrack() {
  105. var defaultAudio *AudioTrackReader
  106. var defaultVideo *VideoTrackReader
  107. for _, t := range hls.video_tracks {
  108. if defaultVideo == nil {
  109. defaultVideo = t
  110. break
  111. }
  112. }
  113. for _, t := range hls.audio_tracks {
  114. if defaultAudio == nil {
  115. defaultAudio = t
  116. if defaultVideo != nil {
  117. for t.IDRing == nil && !hls.IsClosed() {
  118. time.Sleep(time.Millisecond * 10)
  119. }
  120. t.Ring = t.IDRing
  121. } else {
  122. t.Ring = t.Track.Ring
  123. }
  124. break
  125. }
  126. }
  127. var audioGroup string
  128. m3u8 := `#EXTM3U
  129. #EXT-X-VERSION:3`
  130. if defaultAudio != nil {
  131. audioGroup = `,AUDIO="audio"`
  132. m3u8 += fmt.Sprintf(`
  133. #EXT-X-MEDIA:TYPE=AUDIO,GROUP-ID="audio",NAME="%s",DEFAULT=YES,AUTOSELECT=YES,URI="%s/%s.m3u8?sub=1"`, defaultAudio.Track.Name, hls.Stream.StreamName, defaultAudio.Track.Name)
  134. }
  135. if defaultVideo != nil {
  136. m3u8 += fmt.Sprintf(`
  137. #EXT-X-STREAM-INF:BANDWIDTH=2962000,NAME="%s",RESOLUTION=%dx%d%s
  138. %s/%s.m3u8?sub=1`, defaultVideo.Track.Name, defaultVideo.Width, defaultVideo.Height, audioGroup, hls.Stream.StreamName, defaultVideo.Track.Name)
  139. }
  140. // 存一个默认的m3u8
  141. memoryM3u8.Store(hls.Stream.Path, m3u8)
  142. for hls.IO.Err() == nil {
  143. for _, t := range hls.video_tracks {
  144. for {
  145. frame, err := t.TryRead()
  146. if err != nil {
  147. return
  148. }
  149. if frame == nil {
  150. break
  151. }
  152. if frame.IFrame {
  153. t.TrackReader.frag(hls, frame.Timestamp)
  154. }
  155. t.pes.IsKeyFrame = frame.IFrame
  156. if err = t.ts.WriteVideoFrame(VideoFrame{frame, t.Video, t.AbsTime, uint32(frame.PTS), uint32(frame.DTS)}, t.pes); err != nil {
  157. return
  158. }
  159. }
  160. }
  161. for _, t := range hls.audio_tracks {
  162. for {
  163. frame, err := t.TryRead()
  164. if err != nil {
  165. return
  166. }
  167. if frame == nil {
  168. break
  169. }
  170. t.TrackReader.frag(hls, frame.Timestamp)
  171. t.pes.IsKeyFrame = false
  172. if err = t.ts.WriteAudioFrame(AudioFrame{frame, t.Audio, t.AbsTime, uint32(frame.PTS), uint32(frame.DTS)}, t.pes); err != nil {
  173. return
  174. }
  175. }
  176. }
  177. time.Sleep(time.Millisecond * 10)
  178. if !hlsConfig.Preload && !hls.lastReadTime.IsZero() && time.Since(hls.lastReadTime) > time.Second*15 {
  179. hls.Stop(zap.String("reason", "no reader after 15s"))
  180. }
  181. }
  182. }
  183. func (t *TrackReader) frag(hls *HLSWriter, ts time.Duration) (err error) {
  184. streamPath := hls.Stream.Path
  185. // 当前的时间戳减去上一个ts切片的时间戳
  186. if dur := ts - t.write_time; dur >= hlsConfig.Fragment {
  187. // fmt.Println("time :", video.Timestamp, tsSegmentTimestamp)
  188. if dur == ts && t.write_time == 0 {//时间戳不对的情况,首个默认为2s
  189. dur = time.Duration(2) * time.Second
  190. }
  191. num := uint32(t.hls_segment_count)
  192. tsFilename := t.Track.Name + strconv.FormatInt(time.Now().Unix(), 10) + "_" + strconv.FormatUint(uint64(num), 10) + ".ts"
  193. tsFilePath := streamPath + "/" + tsFilename
  194. // println(hls.currentTs.Length)
  195. t.ts = &MemoryTs{
  196. BytesPool: t.ts.BytesPool,
  197. PMT: t.ts.PMT,
  198. }
  199. hls.memoryTs.Store(tsFilePath, t.ts)
  200. if t.playlist.Targetduration < int(dur.Seconds()) {
  201. t.playlist.Targetduration = int(math.Ceil(dur.Seconds()))
  202. }
  203. if t.M3u8.Len() == 0 {
  204. t.playlist.Init()
  205. }
  206. inf := PlaylistInf{
  207. //浮点计算精度
  208. Duration: dur.Seconds(),
  209. Title: tsFilename,
  210. FilePath: tsFilePath,
  211. }
  212. t.Lock()
  213. defer t.Unlock()
  214. if t.hls_segment_count > 0 {
  215. if t.hls_playlist_count >= uint32(hlsConfig.Window) {
  216. t.M3u8.Reset()
  217. if err = t.playlist.Init(); err != nil {
  218. return
  219. }
  220. //playlist起点是ring.next,长度是len(ring)-1
  221. for p := t.infoRing.Next(); p != t.infoRing; p = p.Next() {
  222. t.playlist.WriteInf(p.Value.(PlaylistInf))
  223. }
  224. } else {
  225. if err = t.playlist.WriteInf(t.infoRing.Prev().Value.(PlaylistInf)); err != nil {
  226. return
  227. }
  228. }
  229. memoryM3u8.LoadOrStore(t.m3u8Name, t)
  230. t.hls_playlist_count++
  231. }
  232. if t.hls_segment_count >= t.hls_segment_window {
  233. if mts, loaded := hls.memoryTs.Delete(t.infoRing.Value.(PlaylistInf).FilePath); loaded {
  234. mts.Recycle()
  235. }
  236. t.infoRing.Value = inf
  237. t.infoRing = t.infoRing.Next()
  238. } else {
  239. t.infoRing.Value = inf
  240. t.infoRing = t.infoRing.Next()
  241. }
  242. t.hls_segment_count++
  243. t.write_time = ts
  244. }
  245. return
  246. }
  247. func (hls *HLSWriter) OnEvent(event any) {
  248. switch v := event.(type) {
  249. case *track.Video:
  250. track := &VideoTrackReader{
  251. Video: v,
  252. }
  253. track.init(hls, &v.Media, mpegts.PID_VIDEO)
  254. track.ts.WritePMTPacket(0, v.CodecID)
  255. track.Ring = track.IDRing
  256. hls.video_tracks = append(hls.video_tracks, track)
  257. case *track.Audio:
  258. if v.CodecID != codec.CodecID_AAC {
  259. return
  260. }
  261. track := &AudioTrackReader{
  262. Audio: v,
  263. }
  264. track.init(hls, &v.Media, mpegts.PID_AUDIO)
  265. track.ts.WritePMTPacket(v.CodecID, 0)
  266. hls.audio_tracks = append(hls.audio_tracks, track)
  267. default:
  268. hls.Subscriber.OnEvent(event)
  269. }
  270. }