123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package engine
- import (
- "errors"
- "fmt"
- "io"
- "net"
- "m7s.live/engine/v4/codec"
- "m7s.live/engine/v4/codec/mpegts"
- "m7s.live/engine/v4/util"
- )
- type MemoryTs struct {
- util.BytesPool
- PMT util.Buffer
- util.BLL
- }
- func (ts *MemoryTs) WritePMTPacket(audio codec.AudioCodecID, video codec.VideoCodecID) {
- ts.PMT.Reset()
- mpegts.WritePMTPacket(&ts.PMT, video, audio)
- }
- func (ts *MemoryTs) WriteTo(w io.Writer) (int64, error) {
- w.Write(mpegts.DefaultPATPacket)
- w.Write(ts.PMT)
- return ts.BLL.WriteTo(w)
- }
- func (ts *MemoryTs) WritePESPacket(frame *mpegts.MpegtsPESFrame, packet mpegts.MpegTsPESPacket) (err error) {
- if packet.Header.PacketStartCodePrefix != 0x000001 {
- err = errors.New("packetStartCodePrefix != 0x000001")
- return
- }
- pesHeadItem := ts.Get(32)
- pesHeadItem.Value.Reset()
- _, err = mpegts.WritePESHeader(&pesHeadItem.Value, packet.Header)
- if err != nil {
- return
- }
- pesBuffers := append(net.Buffers{pesHeadItem.Value}, packet.Buffers...)
- defer pesHeadItem.Recycle()
- pesPktLength := util.SizeOfBuffers(pesBuffers)
- buffer := ts.Get((pesPktLength/mpegts.TS_PACKET_SIZE+1)*6 + pesPktLength)
- bwTsHeader := &buffer.Value
- bigLen := bwTsHeader.Len()
- bwTsHeader.Reset()
- ts.BLL.Push(buffer)
- var tsHeaderLength int
- for i := 0; len(pesBuffers) > 0; i++ {
- if bigLen < mpegts.TS_PACKET_SIZE {
- if i == 0 {
- ts.Recycle()
- }
- headerItem := ts.Get(mpegts.TS_PACKET_SIZE)
- ts.BLL.Push(headerItem)
- bwTsHeader = &headerItem.Value
- bwTsHeader.Reset()
- }
- bigLen -= mpegts.TS_PACKET_SIZE
- pesPktLength = util.SizeOfBuffers(pesBuffers)
- tsHeader := mpegts.MpegTsHeader{
- SyncByte: 0x47,
- TransportErrorIndicator: 0,
- PayloadUnitStartIndicator: 0,
- TransportPriority: 0,
- Pid: frame.Pid,
- TransportScramblingControl: 0,
- AdaptionFieldControl: 1,
- ContinuityCounter: frame.ContinuityCounter,
- }
- frame.ContinuityCounter++
- frame.ContinuityCounter = frame.ContinuityCounter % 16
- // 每一帧的开头,当含有pcr的时候,包含调整字段
- if i == 0 {
- tsHeader.PayloadUnitStartIndicator = 1
- // 当PCRFlag为1的时候,包含调整字段
- if frame.IsKeyFrame {
- tsHeader.AdaptionFieldControl = 0x03
- tsHeader.AdaptationFieldLength = 7
- tsHeader.PCRFlag = 1
- tsHeader.RandomAccessIndicator = 1
- tsHeader.ProgramClockReferenceBase = frame.ProgramClockReferenceBase
- }
- }
- // 每一帧的结尾,当不满足188个字节的时候,包含调整字段
- if pesPktLength < mpegts.TS_PACKET_SIZE-4 {
- var tsStuffingLength uint8
- tsHeader.AdaptionFieldControl = 0x03
- tsHeader.AdaptationFieldLength = uint8(mpegts.TS_PACKET_SIZE - 4 - 1 - pesPktLength)
- // TODO:如果第一个TS包也是最后一个TS包,是不是需要考虑这个情况?
- // MpegTsHeader最少占6个字节.(前4个走字节 + AdaptationFieldLength(1 byte) + 3个指示符5个标志位(1 byte))
- if tsHeader.AdaptationFieldLength >= 1 {
- tsStuffingLength = tsHeader.AdaptationFieldLength - 1
- } else {
- tsStuffingLength = 0
- }
- // error
- tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
- if err != nil {
- return
- }
- if tsStuffingLength > 0 {
- if _, err = bwTsHeader.Write(mpegts.Stuffing[:tsStuffingLength]); err != nil {
- return
- }
- }
- tsHeaderLength += int(tsStuffingLength)
- } else {
- tsHeaderLength, err = mpegts.WriteTsHeader(bwTsHeader, tsHeader)
- if err != nil {
- return
- }
- }
- tsPayloadLength := mpegts.TS_PACKET_SIZE - tsHeaderLength
- //fmt.Println("tsPayloadLength :", tsPayloadLength)
- // 这里不断的减少PES包
- io.CopyN(bwTsHeader, &pesBuffers, int64(tsPayloadLength))
- // tmp := tsHeaderByte[3] << 2
- // tmp = tmp >> 6
- // if tmp == 2 {
- // fmt.Println("fuck you mother.")
- // }
- tsPktByteLen := bwTsHeader.Len()
- if tsPktByteLen != (i+1)*mpegts.TS_PACKET_SIZE && tsPktByteLen != mpegts.TS_PACKET_SIZE {
- err = errors.New(fmt.Sprintf("%s, packet size=%d", "TS_PACKET_SIZE != 188,", tsPktByteLen))
- return
- }
- }
- return nil
- }
- func (ts *MemoryTs) WriteAudioFrame(frame AudioFrame, pes *mpegts.MpegtsPESFrame) (err error) {
- // packetLength = 原始音频流长度 + adts(7) + MpegTsOptionalPESHeader长度(8 bytes, 因为只含有pts)
- var packet mpegts.MpegTsPESPacket
- if frame.CodecID == codec.CodecID_AAC {
- packet.Header.PesPacketLength = uint16(7 + frame.AUList.ByteLength + 8)
- packet.Buffers = frame.GetADTS()
- } else {
- packet.Header.PesPacketLength = uint16(frame.AUList.ByteLength + 8)
- packet.Buffers = frame.AUList.ToBuffers()
- }
- packet.Header.PacketStartCodePrefix = 0x000001
- packet.Header.ConstTen = 0x80
- packet.Header.StreamID = mpegts.STREAM_ID_AUDIO
- packet.Header.Pts = uint64(frame.PTS)
- pes.ProgramClockReferenceBase = packet.Header.Pts
- packet.Header.PtsDtsFlags = 0x80
- packet.Header.PesHeaderDataLength = 5
- return ts.WritePESPacket(pes, packet)
- }
- func (ts *MemoryTs) WriteVideoFrame(frame VideoFrame, pes *mpegts.MpegtsPESFrame) (err error) {
- var buffer net.Buffers
- //需要对原始数据(ES),进行一些预处理,视频需要分割nalu(H264编码),并且打上sps,pps,nalu_aud信息.
- if len(frame.ParamaterSets) == 2 {
- buffer = append(buffer, codec.NALU_AUD_BYTE)
- } else {
- buffer = append(buffer, codec.AudNalu)
- }
- buffer = append(buffer, frame.GetAnnexB()...)
- pktLength := util.SizeOfBuffers(buffer) + 10 + 3
- if pktLength > 0xffff {
- pktLength = 0
- }
- var packet mpegts.MpegTsPESPacket
- packet.Header.PacketStartCodePrefix = 0x000001
- packet.Header.ConstTen = 0x80
- packet.Header.StreamID = mpegts.STREAM_ID_VIDEO
- packet.Header.PesPacketLength = uint16(pktLength)
- packet.Header.Pts = uint64(frame.PTS)
- pes.ProgramClockReferenceBase = packet.Header.Pts
- packet.Header.Dts = uint64(frame.DTS)
- packet.Header.PtsDtsFlags = 0xC0
- packet.Header.PesHeaderDataLength = 10
- packet.Buffers = buffer
- return ts.WritePESPacket(pes, packet)
- }
|