publisher.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package engine
  2. import (
  3. "go.uber.org/zap"
  4. "m7s.live/engine/v4/codec"
  5. "m7s.live/engine/v4/common"
  6. "m7s.live/engine/v4/config"
  7. "m7s.live/engine/v4/track"
  8. "m7s.live/engine/v4/util"
  9. )
  10. type IPublisher interface {
  11. common.IPuber
  12. GetPublisher() *Publisher
  13. }
  14. var _ IPublisher = (*Publisher)(nil)
  15. type Publisher struct {
  16. IO
  17. Config *config.Publish
  18. common.AudioTrack `json:"-" yaml:"-"`
  19. common.VideoTrack `json:"-" yaml:"-"`
  20. }
  21. func (p *Publisher) Publish(streamPath string, pub common.IPuber) error {
  22. return p.receive(streamPath, pub)
  23. }
  24. func (p *Publisher) GetPublisher() *Publisher {
  25. return p
  26. }
  27. // func (p *Publisher) Stop(reason ...zapcore.Field) {
  28. // p.IO.Stop(reason...)
  29. // p.Stream.Receive(ACTION_PUBLISHCLOSE)
  30. // }
  31. func (p *Publisher) GetAudioTrack() common.AudioTrack {
  32. return p.AudioTrack
  33. }
  34. func (p *Publisher) GetVideoTrack() common.VideoTrack {
  35. return p.VideoTrack
  36. }
  37. func (p *Publisher) GetConfig() *config.Publish {
  38. return p.Config
  39. }
  40. // func (p *Publisher) OnEvent(event any) {
  41. // p.IO.OnEvent(event)
  42. // switch event.(type) {
  43. // case SEclose, SEKick:
  44. // p.AudioTrack = nil
  45. // p.VideoTrack = nil
  46. // }
  47. // }
  48. func (p *Publisher) CreateAudioTrack(codecID codec.AudioCodecID, stuff ...any) common.AudioTrack {
  49. switch codecID {
  50. case codec.CodecID_AAC:
  51. p.AudioTrack = track.NewAAC(p, stuff...)
  52. case codec.CodecID_PCMA:
  53. p.AudioTrack = track.NewG711(p, true, stuff...)
  54. case codec.CodecID_PCMU:
  55. p.AudioTrack = track.NewG711(p, false, stuff...)
  56. case codec.CodecID_OPUS:
  57. p.AudioTrack = track.NewOpus(p, stuff...)
  58. }
  59. return p.AudioTrack
  60. }
  61. func (p *Publisher) CreateVideoTrack(codecID codec.VideoCodecID, stuff ...any) common.VideoTrack {
  62. switch codecID {
  63. case codec.CodecID_H264:
  64. p.VideoTrack = track.NewH264(p, stuff...)
  65. case codec.CodecID_H265:
  66. p.VideoTrack = track.NewH265(p, stuff...)
  67. case codec.CodecID_AV1:
  68. p.VideoTrack = track.NewAV1(p, stuff...)
  69. }
  70. return p.VideoTrack
  71. }
  72. func (p *Publisher) WriteAVCCVideo(ts uint32, frame *util.BLL, pool util.BytesPool) {
  73. if frame.ByteLength < 6 {
  74. return
  75. }
  76. if p.VideoTrack == nil {
  77. b0 := frame.GetByte(0)
  78. // https://github.com/veovera/enhanced-rtmp/blob/main/enhanced-rtmp-v1.pdf
  79. if isExtHeader := b0 & 0b1000_0000; isExtHeader != 0 {
  80. fourCC := frame.GetUintN(1, 4)
  81. switch fourCC {
  82. case codec.FourCC_H265_32:
  83. p.VideoTrack = track.NewH265(p, pool)
  84. p.VideoTrack.WriteAVCC(ts, frame)
  85. case codec.FourCC_AV1_32:
  86. p.VideoTrack = track.NewAV1(p, pool)
  87. p.VideoTrack.WriteAVCC(ts, frame)
  88. }
  89. } else {
  90. if frame.GetByte(1) == 0 {
  91. ts = 0
  92. p.CreateVideoTrack(codec.VideoCodecID(b0&0x0F), pool)
  93. if p.VideoTrack == nil {
  94. p.Stream.Error("video codecID not support", zap.Uint8("codeId", uint8(codec.VideoCodecID(b0&0x0F))))
  95. return
  96. }
  97. p.VideoTrack.WriteAVCC(ts, frame)
  98. } else {
  99. p.Stream.Warn("need sequence frame")
  100. }
  101. }
  102. } else {
  103. p.VideoTrack.WriteAVCC(ts, frame)
  104. }
  105. }
  106. func (p *Publisher) WriteAVCCAudio(ts uint32, frame *util.BLL, pool util.BytesPool) {
  107. if frame.ByteLength < 4 {
  108. return
  109. }
  110. if p.AudioTrack == nil {
  111. b0 := frame.GetByte(0)
  112. t := p.CreateAudioTrack(codec.AudioCodecID(b0>>4), pool)
  113. switch a := t.(type) {
  114. case *track.AAC:
  115. if frame.GetByte(1) != 0 {
  116. return
  117. }
  118. a.AVCCHead = []byte{frame.GetByte(0), 1}
  119. a.WriteAVCC(0, frame)
  120. case *track.G711:
  121. a.Audio.SampleRate = uint32(codec.SoundRate[(b0&0x0c)>>2])
  122. if b0&0x02 == 0 {
  123. a.Audio.SampleSize = 8
  124. }
  125. a.Channels = b0&0x01 + 1
  126. a.AVCCHead = []byte{b0}
  127. a.WriteAVCC(ts, frame)
  128. default:
  129. p.Stream.Error("audio codec not support yet", zap.Uint8("codecId", uint8(codec.AudioCodecID(b0>>4))))
  130. }
  131. } else {
  132. p.AudioTrack.WriteAVCC(ts, frame)
  133. }
  134. }