media.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package rtmp
  2. import (
  3. "errors"
  4. "net"
  5. "runtime"
  6. "go.uber.org/zap"
  7. . "m7s.live/engine/v4"
  8. "m7s.live/engine/v4/common"
  9. )
  10. type AVSender struct {
  11. *RTMPSender
  12. ChunkHeader
  13. firstSent bool
  14. }
  15. func (av *AVSender) sendSequenceHead(seqHead []byte) {
  16. av.SetTimestamp(0)
  17. av.MessageLength = uint32(len(seqHead))
  18. for !av.writing.CompareAndSwap(false, true) {
  19. runtime.Gosched()
  20. }
  21. defer av.writing.Store(false)
  22. if av.firstSent {
  23. av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
  24. } else {
  25. av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
  26. }
  27. av.sendChunk(seqHead)
  28. }
  29. func (av *AVSender) sendFrame(frame *common.AVFrame, absTime uint32) (err error) {
  30. seq := frame.Sequence
  31. payloadLen := frame.AVCC.ByteLength
  32. if payloadLen == 0 {
  33. err := errors.New("payload is empty")
  34. av.Error("payload is empty", zap.Error(err))
  35. return err
  36. }
  37. if av.writeSeqNum > av.bandwidth {
  38. av.totalWrite += av.writeSeqNum
  39. av.writeSeqNum = 0
  40. av.SendMessage(RTMP_MSG_ACK, Uint32Message(av.totalWrite))
  41. av.SendStreamID(RTMP_USER_PING_REQUEST, 0)
  42. }
  43. av.MessageLength = uint32(payloadLen)
  44. for !av.writing.CompareAndSwap(false, true) {
  45. runtime.Gosched()
  46. }
  47. defer av.writing.Store(false)
  48. // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
  49. // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
  50. // 当Chunk Type为0时(即Chunk12),
  51. if !av.firstSent {
  52. av.firstSent = true
  53. av.SetTimestamp(absTime)
  54. av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
  55. } else {
  56. av.SetTimestamp(frame.DeltaTime)
  57. av.WriteTo(RTMP_CHUNK_HEAD_8, &av.chunkHeader)
  58. }
  59. //数据被覆盖导致序号变了
  60. if seq != frame.Sequence {
  61. return errors.New("sequence is not equal")
  62. }
  63. r := frame.AVCC.NewReader()
  64. chunk := net.Buffers{av.chunkHeader}
  65. av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
  66. for r.CanRead() {
  67. item := av.bytePool.Get(16)
  68. defer item.Recycle()
  69. av.WriteTo(RTMP_CHUNK_HEAD_1, &item.Value)
  70. // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
  71. chunk = append(chunk, item.Value)
  72. av.writeSeqNum += uint32(item.Value.Len() + r.WriteNTo(av.writeChunkSize, &chunk))
  73. }
  74. _, err = chunk.WriteTo(av.Conn)
  75. return nil
  76. }
  77. type RTMPSender struct {
  78. Subscriber
  79. NetStream
  80. audio, video AVSender
  81. }
  82. func (rtmp *RTMPSender) OnEvent(event any) {
  83. switch v := event.(type) {
  84. case SEwaitPublish:
  85. rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
  86. case SEpublish:
  87. rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
  88. case ISubscriber:
  89. rtmp.audio.RTMPSender = rtmp
  90. rtmp.video.RTMPSender = rtmp
  91. rtmp.audio.ChunkStreamID = RTMP_CSID_AUDIO
  92. rtmp.video.ChunkStreamID = RTMP_CSID_VIDEO
  93. rtmp.audio.MessageTypeID = RTMP_MSG_AUDIO
  94. rtmp.video.MessageTypeID = RTMP_MSG_VIDEO
  95. rtmp.audio.MessageStreamID = rtmp.StreamID
  96. rtmp.video.MessageStreamID = rtmp.StreamID
  97. case AudioDeConf:
  98. rtmp.audio.sendSequenceHead(v)
  99. case VideoDeConf:
  100. rtmp.video.sendSequenceHead(v)
  101. case AudioFrame:
  102. if err := rtmp.audio.sendFrame(v.AVFrame, v.AbsTime); err != nil {
  103. rtmp.Stop(zap.Error(err))
  104. }
  105. case VideoFrame:
  106. if err := rtmp.video.sendFrame(v.AVFrame, v.AbsTime); err != nil {
  107. rtmp.Stop(zap.Error(err))
  108. }
  109. default:
  110. rtmp.Subscriber.OnEvent(event)
  111. }
  112. }
  113. func (r *RTMPSender) Response(tid uint64, code, level string) error {
  114. m := new(ResponsePlayMessage)
  115. m.CommandName = Response_OnStatus
  116. m.TransactionId = tid
  117. m.Infomation = map[string]any{
  118. "code": code,
  119. "level": level,
  120. "description": "",
  121. }
  122. m.StreamID = r.StreamID
  123. return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
  124. }
  125. type RTMPReceiver struct {
  126. Publisher
  127. NetStream
  128. }
  129. func (r *RTMPReceiver) OnEvent(event any) {
  130. switch event.(type) {
  131. case IPublisher:
  132. if r.AudioTrack != nil {
  133. r.AudioTrack.SetStuff(r.bytePool)
  134. }
  135. if r.VideoTrack != nil {
  136. r.VideoTrack.SetStuff(r.bytePool)
  137. }
  138. }
  139. r.Publisher.OnEvent(event)
  140. }
  141. func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
  142. m := new(ResponsePublishMessage)
  143. m.CommandName = Response_OnStatus
  144. m.TransactionId = tid
  145. m.Infomation = map[string]any{
  146. "code": code,
  147. "level": level,
  148. "description": "",
  149. }
  150. m.StreamID = r.StreamID
  151. return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
  152. }
  153. func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
  154. if r.AudioTrack == nil {
  155. r.WriteAVCCAudio(0, &msg.AVData, r.bytePool)
  156. return
  157. }
  158. r.AudioTrack.WriteAVCC(msg.ExtendTimestamp, &msg.AVData)
  159. }
  160. func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
  161. if r.VideoTrack == nil {
  162. r.WriteAVCCVideo(0, &msg.AVData, r.bytePool)
  163. return
  164. }
  165. r.VideoTrack.WriteAVCC(msg.ExtendTimestamp, &msg.AVData)
  166. }