memory-ts.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package engine
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "m7s.live/engine/v4/codec"
  8. "m7s.live/engine/v4/codec/mpegts"
  9. "m7s.live/engine/v4/util"
  10. )
  11. type MemoryTs struct {
  12. util.BytesPool
  13. PMT util.Buffer
  14. util.BLL
  15. }
  16. func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID) {
  17. ts.PMT.Reset()
  18. mpegts.WritePMTPacket(&ts.PMT, video, audio)
  19. }
  20. func (ts *MemoryTs) WriteTo(w io.Writer) (int64, error) {
  21. w.Write(mpegts.DefaultPATPacket)
  22. w.Write(ts.PMT)
  23. return ts.BLL.WriteTo(w)
  24. }
  25. func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error) {
  26. if packet.Header.PacketStartCodePrefix != 0x000001 {
  27. err = errors.New("packetStartCodePrefix != 0x000001")
  28. return
  29. }
  30. pesHeadItem := ts.Get(32)
  31. pesHeadItem.Value.Reset()
  32. _, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header)
  33. if err != nil {
  34. return
  35. }
  36. pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...)
  37. defer pesHeadItem.Recycle()
  38. pesPktLength := util.SizeOfBuffers(pesBuffers)
  39. buffer := ts.Get((pesPktLength/mpegts.TS_PACKET_SIZE+1)*6 + pesPktLength)
  40. bwTsHeader := &buffer.Value
  41. bigLen := bwTsHeader.Len()
  42. bwTsHeader.Reset()
  43. ts.BLL.Push(buffer)
  44. var tsHeaderLength int
  45. for i := 0; len(pesBuffers) > 0; i++ {
  46. if bigLen < mpegts.TS_PACKET_SIZE {
  47. if i == 0 {
  48. ts.Recycle()
  49. }
  50. headerItem := ts.Get(mpegts.TS_PACKET_SIZE)
  51. ts.BLL.Push(headerItem)
  52. bwTsHeader = &headerItem.Value
  53. bwTsHeader.Reset()
  54. }
  55. bigLen -= mpegts.TS_PACKET_SIZE
  56. pesPktLength = util.SizeOfBuffers(pesBuffers)
  57. tsHeader := mpegts.MpegTsHeader{
  58. SyncByte: 0x47,
  59. TransportErrorIndicator: 0,
  60. PayloadUnitStartIndicator: 0,
  61. TransportPriority: 0,
  62. Pid: frame.Pid,
  63. TransportScramblingControl: 0,
  64. AdaptionFieldControl: 1,
  65. ContinuityCounter: frame.ContinuityCounter,
  66. }
  67. frame.ContinuityCounter++
  68. frame.ContinuityCounter = frame.ContinuityCounter % 16
  69. // 每一帧的开头,当含有pcr的时候,包含调整字段
  70. if i == 0 {
  71. tsHeader.PayloadUnitStartIndicator = 1
  72. // 当PCRFlag为1的时候,包含调整字段
  73. if frame.IsKeyFrame {
  74. tsHeader.AdaptionFieldControl = 0x03
  75. tsHeader.AdaptationFieldLength = 7
  76. tsHeader.PCRFlag = 1
  77. tsHeader.RandomAccessIndicator = 1
  78. tsHeader.ProgramClockReferenceBase = frame.ProgramClockReferenceBase
  79. }
  80. }
  81. // 每一帧的结尾,当不满足188个字节的时候,包含调整字段
  82. if pesPktLength < mpegts.TS_PACKET_SIZE-4 {
  83. var tsStuffingLength uint8
  84. tsHeader.AdaptionFieldControl = 0x03
  85. tsHeader.AdaptationFieldLength = uint8(mpegts.TS_PACKET_SIZE - 4 - 1 - pesPktLength)
  86. // TODO:如果第一个TS包也是最后一个TS包,是不是需要考虑这个情况?
  87. // MpegTsHeader最少占6个字节.(前4个走字节 + AdaptationFieldLength(1 byte) + 3个指示符5个标志位(1 byte))
  88. if tsHeader.AdaptationFieldLength >= 1 {
  89. tsStuffingLength = tsHeader.AdaptationFieldLength - 1
  90. } else {
  91. tsStuffingLength = 0
  92. }
  93. // error
  94. tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
  95. if err != nil {
  96. return
  97. }
  98. if tsStuffingLength > 0 {
  99. if _, err = bwTsHeader.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil {
  100. return
  101. }
  102. }
  103. tsHeaderLength += int(tsStuffingLength)
  104. } else {
  105. tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
  106. if err != nil {
  107. return
  108. }
  109. }
  110. tsPayloadLength := mpegts.TS_PACKET_SIZE - tsHeaderLength
  111. //fmt.Println("tsPayloadLength :", tsPayloadLength)
  112. // 这里不断的减少PES包
  113. io.CopyN(bwTsHeader, &pesBuffers, int64(tsPayloadLength))
  114. // tmp := tsHeaderByte[3] << 2
  115. // tmp = tmp >> 6
  116. // if tmp == 2 {
  117. // fmt.Println("fuck you mother.")
  118. // }
  119. tsPktByteLen := bwTsHeader.Len()
  120. if tsPktByteLen != (i+1)*mpegts.TS_PACKET_SIZE && tsPktByteLen != mpegts.TS_PACKET_SIZE {
  121. err = errors.New(fmt.Sprintf("%s, packet size=%d", "TS_PACKET_SIZE != 188,", tsPktByteLen))
  122. return
  123. }
  124. }
  125. return nil
  126. }
  127. func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error) {
  128. // packetLength = 原始音频流长度 + adts(7) + MpegTsOptionalPESHeader长度(8 bytes, 因为只含有pts)
  129. var packet mpegts.MpegTsPESPacket
  130. if frame.CodecID == codec.CodecID_AAC {
  131. packet.Header.PesPacketLength = uint16(7 + frame.AUList.ByteLength + 8)
  132. packet.Buffers = frame.GetADTS()
  133. } else {
  134. packet.Header.PesPacketLength = uint16(frame.AUList.ByteLength + 8)
  135. packet.Buffers = frame.AUList.ToBuffers()
  136. }
  137. packet.Header.PacketStartCodePrefix = 0x000001
  138. packet.Header.ConstTen = 0x80
  139. packet.Header.StreamID = mpegts.STREAM_ID_AUDIO
  140. packet.Header.Pts = uint64(frame.PTS)
  141. pes.ProgramClockReferenceBase = packet.Header.Pts
  142. packet.Header.PtsDtsFlags = 0x80
  143. packet.Header.PesHeaderDataLength = 5
  144. return ts.WritePESPacket(pes, packet)
  145. }
  146. func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error) {
  147. var buffer net.Buffers
  148. //需要对原始数据(ES),进行一些预处理,视频需要分割nalu(H264编码),并且打上sps,pps,nalu_aud信息.
  149. if len(frame.ParamaterSets) == 2 {
  150. buffer = append(buffer, codec.NALU_AUD_BYTE)
  151. } else {
  152. buffer = append(buffer, codec.AudNalu)
  153. }
  154. buffer = append(buffer, frame.GetAnnexB()...)
  155. pktLength := util.SizeOfBuffers(buffer) + 10 + 3
  156. if pktLength > 0xffff {
  157. pktLength = 0
  158. }
  159. var packet mpegts.MpegTsPESPacket
  160. packet.Header.PacketStartCodePrefix = 0x000001
  161. packet.Header.ConstTen = 0x80
  162. packet.Header.StreamID = mpegts.STREAM_ID_VIDEO
  163. packet.Header.PesPacketLength = uint16(pktLength)
  164. packet.Header.Pts = uint64(frame.PTS)
  165. pes.ProgramClockReferenceBase = packet.Header.Pts
  166. packet.Header.Dts = uint64(frame.DTS)
  167. packet.Header.PtsDtsFlags = 0xC0
  168. packet.Header.PesHeaderDataLength = 10
  169. packet.Buffers = buffer
  170. return ts.WritePESPacket(pes, packet)
  171. }