flv.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package record
  2. import (
  3. "io"
  4. "net"
  5. "os"
  6. "path/filepath"
  7. "strconv"
  8. "time"
  9. "go.uber.org/zap"
  10. . "m7s.live/engine/v4"
  11. "m7s.live/engine/v4/codec"
  12. "m7s.live/engine/v4/util"
  13. )
  14. type FLVRecorder struct {
  15. Recorder
  16. filepositions []uint64
  17. times []float64
  18. Offset int64
  19. duration int64
  20. }
  21. func (r *FLVRecorder) Start(streamPath string) (err error) {
  22. r.Record = &RecordPluginConfig.Flv
  23. r.ID = streamPath + "/flv"
  24. if _, ok := RecordPluginConfig.recordings.Load(r.ID); ok {
  25. return ErrRecordExist
  26. }
  27. return plugin.Subscribe(streamPath, r)
  28. }
  29. func (r *FLVRecorder) start() {
  30. RecordPluginConfig.recordings.Store(r.ID, r)
  31. r.PlayFLV()
  32. RecordPluginConfig.recordings.Delete(r.ID)
  33. }
  34. func (r *FLVRecorder) writeMetaData(file *os.File, duration int64) {
  35. defer file.Close()
  36. at, vt := r.Audio, r.Video
  37. hasAudio, hasVideo := at != nil, vt != nil
  38. var amf util.AMF
  39. metaData := util.EcmaArray{
  40. "MetaDataCreator": "m7s " + Engine.Version,
  41. "hasVideo": hasVideo,
  42. "hasAudio": hasAudio,
  43. "hasMatadata": true,
  44. "canSeekToEnd": false,
  45. "duration": float64(duration) / 1000,
  46. "hasKeyFrames": len(r.filepositions) > 0,
  47. "filesize": 0,
  48. }
  49. var flags byte
  50. if hasAudio {
  51. flags |= (1 << 2)
  52. metaData["audiocodecid"] = int(at.CodecID)
  53. metaData["audiosamplerate"] = at.SampleRate
  54. metaData["audiosamplesize"] = at.SampleSize
  55. metaData["stereo"] = at.Channels == 2
  56. }
  57. if hasVideo {
  58. flags |= 1
  59. metaData["videocodecid"] = int(vt.CodecID)
  60. metaData["width"] = vt.SPSInfo.Width
  61. metaData["height"] = vt.SPSInfo.Height
  62. metaData["framerate"] = vt.FPS
  63. metaData["videodatarate"] = vt.BPS
  64. metaData["keyframes"] = map[string]any{
  65. "filepositions": r.filepositions,
  66. "times": r.times,
  67. }
  68. defer func() {
  69. r.filepositions = []uint64{0}
  70. r.times = []float64{0}
  71. }()
  72. }
  73. amf.Marshals("onMetaData", metaData)
  74. offset := amf.Len() + len(codec.FLVHeader) + 15
  75. if keyframesCount := len(r.filepositions); keyframesCount > 0 {
  76. metaData["filesize"] = uint64(offset) + r.filepositions[keyframesCount-1]
  77. for i := range r.filepositions {
  78. r.filepositions[i] += uint64(offset)
  79. }
  80. metaData["keyframes"] = map[string]any{
  81. "filepositions": r.filepositions,
  82. "times": r.times,
  83. }
  84. }
  85. if tempFile, err := os.CreateTemp("", "*.flv"); err != nil {
  86. r.Error("create temp file failed: ", zap.Error(err))
  87. return
  88. } else {
  89. defer func() {
  90. tempFile.Close()
  91. os.Remove(tempFile.Name())
  92. r.Info("writeMetaData success")
  93. }()
  94. _, err := tempFile.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0})
  95. if err != nil {
  96. r.Error("", zap.Error(err))
  97. return
  98. }
  99. amf.Reset()
  100. marshals := amf.Marshals("onMetaData", metaData)
  101. codec.WriteFLVTag(tempFile, codec.FLV_TAG_TYPE_SCRIPT, 0, marshals)
  102. _, err = file.Seek(int64(len(codec.FLVHeader)), io.SeekStart)
  103. if err != nil {
  104. r.Error("writeMetaData Seek failed: ", zap.Error(err))
  105. return
  106. }
  107. _, err = io.Copy(tempFile, file)
  108. if err != nil {
  109. r.Error("writeMetaData Copy failed: ", zap.Error(err))
  110. return
  111. }
  112. tempFile.Seek(0, io.SeekStart)
  113. file.Seek(0, io.SeekStart)
  114. _, err = io.Copy(file, tempFile)
  115. if err != nil {
  116. r.Error("writeMetaData Copy failed: ", zap.Error(err))
  117. return
  118. }
  119. }
  120. }
  121. func (r *FLVRecorder) OnEvent(event any) {
  122. switch v := event.(type) {
  123. case ISubscriber:
  124. filename := strconv.FormatInt(time.Now().Unix(), 10) + r.Ext
  125. if r.Fragment == 0 {
  126. filename = r.Stream.Path + r.Ext
  127. } else {
  128. filename = filepath.Join(r.Stream.Path, filename)
  129. }
  130. if file, err := r.CreateFileFn(filename, r.append); err == nil {
  131. r.SetIO(file)
  132. // 写入文件头
  133. if !r.append {
  134. r.Write(codec.FLVHeader)
  135. } else {
  136. if _, err = file.Seek(-4, io.SeekEnd); err != nil {
  137. r.Error("seek file failed", zap.Error(err))
  138. r.Write(codec.FLVHeader)
  139. } else {
  140. tmp := make(util.Buffer, 4)
  141. tmp2 := tmp
  142. file.Read(tmp)
  143. tagSize := tmp.ReadUint32()
  144. tmp = tmp2
  145. file.Seek(int64(tagSize), io.SeekEnd)
  146. file.Read(tmp2)
  147. ts := tmp2.ReadUint24() | (uint32(tmp[3]) << 24)
  148. r.Info("append flv", zap.String("filename", filename), zap.Uint32("last tagSize", tagSize), zap.Uint32("last ts", ts))
  149. if r.VideoReader != nil {
  150. r.VideoReader.StartTs = time.Duration(ts) * time.Millisecond
  151. }
  152. if r.AudioReader != nil {
  153. r.AudioReader.StartTs = time.Duration(ts) * time.Millisecond
  154. }
  155. file.Seek(0, io.SeekEnd)
  156. }
  157. }
  158. go r.start()
  159. } else {
  160. r.Error("create file failed", zap.Error(err))
  161. r.Stop()
  162. }
  163. case FLVFrame:
  164. check := false
  165. var absTime uint32
  166. if r.VideoReader == nil {
  167. check = true
  168. absTime = r.AudioReader.AbsTime
  169. } else if v.IsVideo() {
  170. check = r.VideoReader.Value.IFrame
  171. absTime = r.VideoReader.AbsTime
  172. if check {
  173. r.filepositions = append(r.filepositions, uint64(r.Offset))
  174. r.times = append(r.times, float64(absTime)/1000)
  175. }
  176. }
  177. if r.duration = int64(absTime); r.Fragment > 0 && check && time.Duration(r.duration)*time.Millisecond >= r.Fragment {
  178. r.Close()
  179. r.Offset = 0
  180. if file, err := r.CreateFileFn(filepath.Join(r.Stream.Path, strconv.FormatInt(time.Now().Unix(), 10)+r.Ext), false); err == nil {
  181. r.SetIO(file)
  182. r.Write(codec.FLVHeader)
  183. var dcflv net.Buffers
  184. if r.VideoReader != nil {
  185. r.VideoReader.ResetAbsTime()
  186. dcflv = codec.VideoAVCC2FLV(0, r.VideoReader.Track.SequenceHead)
  187. flv := append(dcflv, codec.VideoAVCC2FLV(0, r.VideoReader.Value.AVCC.ToBuffers()...)...)
  188. flv.WriteTo(r)
  189. }
  190. if r.AudioReader != nil {
  191. r.AudioReader.ResetAbsTime()
  192. if r.Audio.CodecID == codec.CodecID_AAC {
  193. dcflv = codec.AudioAVCC2FLV(0, r.AudioReader.Track.SequenceHead)
  194. }
  195. flv := append(dcflv, codec.AudioAVCC2FLV(0, r.AudioReader.Value.AVCC.ToBuffers()...)...)
  196. flv.WriteTo(r)
  197. }
  198. return
  199. }
  200. }
  201. if n, err := v.WriteTo(r); err != nil {
  202. r.Error("write file failed", zap.Error(err))
  203. r.Stop()
  204. } else {
  205. r.Offset += n
  206. }
  207. default:
  208. r.Subscriber.OnEvent(event)
  209. }
  210. }
  211. func (r *FLVRecorder) SetIO(file any) {
  212. r.Subscriber.SetIO(file)
  213. r.Closer = r
  214. }
  215. func (r *FLVRecorder) Close() error {
  216. if file, ok := r.Writer.(*os.File); ok && !r.append {
  217. go r.writeMetaData(file, r.duration)
  218. } else if closer, ok := r.Writer.(io.Closer); ok {
  219. return closer.Close()
  220. }
  221. return nil
  222. }