subscriber.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package record
  2. import (
  3. "bufio"
  4. "io"
  5. "path/filepath"
  6. "strconv"
  7. "time"
  8. //"fmt"
  9. "go.uber.org/zap"
  10. . "m7s.live/engine/v4"
  11. )
  12. type IRecorder interface {
  13. ISubscriber
  14. GetRecorder() *Recorder
  15. Start(streamPath string) error
  16. io.Closer
  17. CreateFile() (FileWr, error)
  18. }
  19. type Recorder struct {
  20. Subscriber
  21. SkipTS uint32
  22. Record `json:"-" yaml:"-"`
  23. File FileWr `json:"-" yaml:"-"`
  24. FileName string // 自定义文件名,分段录像无效
  25. filePath string // 文件路径
  26. append bool // 是否追加模式
  27. }
  28. func (r *Recorder) GetRecorder() *Recorder {
  29. return r
  30. }
  31. func (r *Recorder) CreateFile() (f FileWr, err error) {
  32. //fmt.Println("00000000000000000000 CREATE FILE:",r.Stream.Path)
  33. r.filePath = r.getFileName(r.Stream.Path) + r.Ext
  34. f, err = r.CreateFileFn(r.filePath, r.append)
  35. logFields := []zap.Field{zap.String("path", r.filePath)}
  36. if fw, ok := f.(*FileWriter); ok && r.Config != nil {
  37. if r.Config.WriteBufferSize > 0 {
  38. logFields = append(logFields, zap.Int("bufferSize", r.Config.WriteBufferSize))
  39. fw.bufw = bufio.NewWriterSize(fw.Writer, r.Config.WriteBufferSize)
  40. fw.Writer = fw.bufw
  41. }
  42. }
  43. if err == nil {
  44. r.Info("create file", logFields...)
  45. } else {
  46. logFields = append(logFields, zap.Error(err))
  47. r.Error("create file", logFields...)
  48. }
  49. return
  50. }
  51. func (r *Recorder) getFileName(streamPath string) (filename string) {
  52. filename = streamPath
  53. if r.Fragment == 0 {
  54. if r.FileName != "" {
  55. filename = filepath.Join(filename, r.FileName)
  56. }
  57. } else {
  58. filename = filepath.Join(filename, strconv.FormatInt(time.Now().Unix(), 10))
  59. }
  60. return
  61. }
  62. func (r *Recorder) start(re IRecorder, streamPath string, subType byte) (err error) {
  63. //fmt.Println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSS")
  64. err = plugin.Subscribe(streamPath, re)
  65. //fmt.Println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSS:",err)
  66. if err == nil {
  67. if _, loaded := RecordPluginConfig.recordings.LoadOrStore(r.ID, re); loaded {
  68. //fmt.Println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSS:",ErrRecordExist)
  69. return ErrRecordExist
  70. }
  71. r.Closer = re
  72. go func() {
  73. //fmt.Println("SSSSSSSSSSSSSSSSSSSSSSSSSSSSS play block:")
  74. r.PlayBlock(subType)
  75. RecordPluginConfig.recordings.Delete(r.ID)
  76. }()
  77. }
  78. return
  79. }
  80. func (r *Recorder) cut(absTime uint32) {
  81. if ts := absTime - r.SkipTS; time.Duration(ts)*time.Millisecond >= r.Fragment {
  82. //fmt.Println("111111111111111111 cut new file:",absTime,r.Fragment)
  83. r.SkipTS = absTime
  84. r.Close()
  85. if file, err := r.Spesific.(IRecorder).CreateFile(); err == nil {
  86. r.File = file
  87. r.Spesific.OnEvent(file)
  88. } else {
  89. r.Stop(zap.Error(err))
  90. }
  91. }
  92. }
  93. // func (r *Recorder) Stop(reason ...zap.Field) {
  94. // r.Close()
  95. // r.Subscriber.Stop(reason...)
  96. // }
  97. func (r *Recorder) OnEvent(event any) {
  98. switch v := event.(type) {
  99. case IRecorder:
  100. //fmt.Println("ONENVET IRecorder 111111111111111111111111111:", *r)
  101. if file, err := r.Spesific.(IRecorder).CreateFile(); err == nil {
  102. r.File = file
  103. r.Spesific.OnEvent(file)
  104. } else {
  105. r.Stop(zap.Error(err))
  106. }
  107. case AudioFrame:
  108. // 纯音频流的情况下需要切割文件
  109. if r.Fragment > 0 && r.VideoReader == nil {
  110. r.cut(v.AbsTime)
  111. }
  112. case VideoFrame:
  113. //.Println("record on video frame aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa:")
  114. if r.Fragment > 0 && v.IFrame {
  115. //fmt.Println("ONENVET CUT 111111111111111111111111111:",r.Fragment,v.IFrame,"rrrrrrrrrrr:",*r )
  116. r.cut(v.AbsTime)
  117. }
  118. default:
  119. r.Subscriber.OnEvent(event)
  120. }
  121. }