123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578 |
- package mpegts
- import (
- "bytes"
- "errors"
- "io"
- "io/ioutil"
- "m7s.live/engine/v4/util"
- //"sync"
- )
- // NALU AUD 00 00 00 01 09 F0
- const (
- TS_PACKET_SIZE = 188
- TS_DVHS_PACKET_SIZE = 192
- TS_FEC_PACKET_SIZE = 204
- TS_MAX_PACKET_SIZE = 204
- PID_PAT = 0x0000
- PID_CAT = 0x0001
- PID_TSDT = 0x0002
- PID_RESERVED1 = 0x0003
- PID_RESERVED2 = 0x000F
- PID_NIT_ST = 0x0010
- PID_SDT_BAT_ST = 0x0011
- PID_EIT_ST = 0x0012
- PID_RST_ST = 0x0013
- PID_TDT_TOT_ST = 0x0014
- PID_NET_SYNC = 0x0015
- PID_RESERVED3 = 0x0016
- PID_RESERVED4 = 0x001B
- PID_SIGNALLING = 0x001C
- PID_MEASURE = 0x001D
- PID_DIT = 0x001E
- PID_SIT = 0x001F
- PID_PMT = 0x0100
- PID_VIDEO = 0x0101
- PID_AUDIO = 0x0102
- // 0x0003 - 0x000F Reserved
- // 0x0010 - 0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes
- // 0x1FFF Null Packet
- // program_association_section
- // conditional_access_section
- // TS_program_map_section
- // TS_description_section
- // ISO_IEC_14496_scene_description_section
- // ISO_IEC_14496_object_descriptor_section
- // Metadata_section
- // IPMP_Control_Information_section (defined in ISO/IEC 13818-11)
- TABLE_PAS = 0x00
- TABLE_CAS = 0x01
- TABLE_TSPMS = 0x02
- TABLE_TSDS = 0x03
- TABLE_ISO_IEC_14496_SDC = 0x04
- TABLE_ISO_IEC_14496_ODC = 0x05
- TABLE_MS = 0x06
- TABLE_IPMP_CIS = 0x07
- // 0x06 - 0x37 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 reserved
- // 0x38 - 0x3F Defined in ISO/IEC 13818-6
- // 0x40 - 0xFE User private
- // 0xFF Forbidden
- STREAM_TYPE_VIDEO_MPEG1 = 0x01
- STREAM_TYPE_VIDEO_MPEG2 = 0x02
- STREAM_TYPE_AUDIO_MPEG1 = 0x03
- STREAM_TYPE_AUDIO_MPEG2 = 0x04
- STREAM_TYPE_PRIVATE_SECTIONS = 0x05
- STREAM_TYPE_PRIVATE_DATA = 0x06
- STREAM_TYPE_MHEG = 0x07
- STREAM_TYPE_H264 = 0x1B
- STREAM_TYPE_H265 = 0x24
- STREAM_TYPE_AAC = 0x0F
- STREAM_TYPE_G711A = 0x90
- STREAM_TYPE_G711U = 0x91
- STREAM_TYPE_G722_1 = 0x92
- STREAM_TYPE_G723_1 = 0x93
- STREAM_TYPE_G726 = 0x94
- STREAM_TYPE_G729 = 0x99
- STREAM_TYPE_ADPCM = 0x11
- STREAM_TYPE_PCM = 0x0A
- STREAM_TYPE_AC3 = 0x81
- STREAM_TYPE_DTS = 0x8A
- STREAM_TYPE_LPCM = 0x8B
- // 1110 xxxx
- // 110x xxxx
- STREAM_ID_VIDEO = 0xE0 // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC14496-2 video stream number xxxx
- STREAM_ID_AUDIO = 0xC0 // ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC14496-3 audio stream number x xxxx
- PAT_PKT_TYPE = 0
- PMT_PKT_TYPE = 1
- PES_PKT_TYPE = 2
- )
- //
- // MPEGTS -> PAT + PMT + PES
- // ES -> PES -> TS
- //
- type MpegTsStream struct {
- PAT MpegTsPAT // PAT表信息
- PMT MpegTsPMT // PMT表信息
- PESBuffer map[uint16]*MpegTsPESPacket
- PESChan chan *MpegTsPESPacket
- }
- // ios13818-1-CN.pdf 33/165
- //
- // TS
- //
- // Packet == Header + Payload == 188 bytes
- type MpegTsPacket struct {
- Header MpegTsHeader
- Payload []byte
- }
- // 前面32bit的数据即TS分组首部,它指出了这个分组的属性
- type MpegTsHeader struct {
- SyncByte byte // 8 bits 同步字节,固定为0x47,表示后面是一个TS分组
- TransportErrorIndicator byte // 1 bit 传输错误标志位
- PayloadUnitStartIndicator byte // 1 bit 负载单元开始标志(packet不满188字节时需填充).为1时,表示在4个字节后,有一个调整字节
- TransportPriority byte // 1 bit 传输优先级
- Pid uint16 // 13 bits Packet ID号码,唯一的号码对应不同的包.为0表示携带的是PAT表
- TransportScramblingControl byte // 2 bits 加密标志位(00:未加密;其他表示已加密)
- AdaptionFieldControl byte // 2 bits 附加区域控制.表示TS分组首部后面是否跟随有调整字段和有效负载.01仅含有效负载(没有adaptation_field),10仅含调整字段(没有Payload),11含有调整字段和有效负载(有adaptation_field,adaptation_field之后是Payload).为00的话解码器不进行处理.空分组没有调整字段
- ContinuityCounter byte // 4 bits 包递增计数器.范围0-15,具有相同的PID的TS分组传输时每次加1,到15后清0.不过,有些情况下是不计数的.
- MpegTsHeaderAdaptationField
- }
- // 调整字段,只可能出现在每一帧的开头(当含有pcr的时候),或者结尾(当不满足188个字节的时候)
- // adaptionFieldControl 00 -> 高字节代表调整字段, 低字节代表负载字段 0x20 0x10
- // PCR字段编码在MPEG-2 TS包的自适应字段(Adaptation field)的6个Byte中,其中6 bits为预留位,42 bits为有效位()
- // MpegTsHeaderAdaptationField + stuffing bytes
- type MpegTsHeaderAdaptationField struct {
- AdaptationFieldLength byte // 8bits 本区域除了本字节剩下的长度(不包含本字节!!!切记), if adaptationFieldLength > 0, 那么就有下面8个字段. adaptation_field_length 值必须在 0 到 182 的区间内.当 adaptation_field_control 值为'10'时,adaptation_field_length 值必须为 183
- DiscontinuityIndicator byte // 1bit 置于"1"时,指示当前传输流包的不连续性状态为真.当 discontinuity_indicator 设置为"0"或不存在时,不连续性状态为假.不连续性指示符用于指示两种类型的不连续性,系统时间基不连续性和 continuity_counter 不连续性.
- RandomAccessIndicator byte // 1bit 指示当前的传输流包以及可能的具有相同 PID 的后续传输流包,在此点包含有助于随机接入的某些信息.特别的,该比特置于"1"时,在具有当前 PID 的传输流包的有效载荷中起始的下一个 PES 包必须包含一个 discontinuity_indicator 字段中规定的基本流接入点.此外,在视频情况中,显示时间标记必须在跟随基本流接入点的第一图像中存在
- ElementaryStreamPriorityIndicator byte // 1bit 在具有相同 PID 的包之间,它指示此传输流包有效载荷内承载的基本流数据的优先级.1->指示该有效载荷具有比其他传输流包有效载荷更高的优先级
- PCRFlag byte // 1bit 1->指示 adaptation_field 包含以两部分编码的 PCR 字段.0->指示自适应字段不包含任何 PCR 字段
- OPCRFlag byte // 1bit 1->指示 adaptation_field 包含以两部分编码的 OPCR字段.0->指示自适应字段不包含任何 OPCR 字段
- SplicingPointFlag byte // 1bit 1->指示 splice_countdown 字段必须在相关自适应字段中存在,指定拼接点的出现.0->指示自适应字段中 splice_countdown 字段不存在
- TrasportPrivateDataFlag byte // 1bit 1->指示自适应字段包含一个或多个 private_data 字节.0->指示自适应字段不包含任何 private_data 字节
- AdaptationFieldExtensionFlag byte // 1bit 1->指示自适应字段扩展的存在.0->指示自适应字段中自适应字段扩展不存在
- // Optional Fields
- ProgramClockReferenceBase uint64 // 33 bits pcr
- Reserved1 byte // 6 bits
- ProgramClockReferenceExtension uint16 // 9 bits
- OriginalProgramClockReferenceBase uint64 // 33 bits opcr
- Reserved2 byte // 6 bits
- OriginalProgramClockReferenceExtension uint16 // 9 bits
- SpliceCountdown byte // 8 bits
- TransportPrivateDataLength byte // 8 bits 指定紧随传输private_data_length 字段的 private_data 字节数. private_data 字节数不能使专用数据扩展超出自适应字段的范围
- PrivateDataByte byte // 8 bits 不通过 ITU-T|ISO/IEC 指定
- AdaptationFieldExtensionLength byte // 8 bits 指定紧随此字段的扩展的自适应字段数据的字节数,包括要保留的字节(如果存在)
- LtwFlag byte // 1 bit 1->指示 ltw_offset 字段存在
- PiecewiseRateFlag byte // 1 bit 1->指示 piecewise_rate 字段存在
- SeamlessSpliceFlag byte // 1 bit 1->指示 splice_type 以及 DTS_next_AU 字段存在. 0->指示无论是 splice_type 字段还是 DTS_next_AU 字段均不存在
- // Optional Fields
- LtwValidFlag byte // 1 bit 1->指示 ltw_offset 的值必将生效.0->指示 ltw_offset 字段中该值未定义
- LtwOffset uint16 // 15 bits 其值仅当 ltw_valid 标志字段具有'1'值时才定义.定义时,法定时间窗补偿以(300/fs)秒为度量单位,其中 fs 为此 PID 所归属的节目的系统时钟频率
- Reserved3 byte // 2 bits 保留
- PiecewiseRate uint32 // 22 bits 只要当 ltw_flag 和 ltw_valid_flag 均置于‘1’时,此 22 比特字段的含义才确定
- SpliceType byte // 4 bits
- DtsNextAU uint64 // 33 bits (解码时间标记下一个存取单元)
- // stuffing bytes
- // 此为固定的 8 比特值等于'1111 1111',能够通过编码器插入.它亦能被解码器丢弃
- }
- // ios13818-1-CN.pdf 77
- //
- // Descriptor
- //
- type MpegTsDescriptor struct {
- Tag byte // 8 bits 标识每一个描述符
- Length byte // 8 bits 指定紧随 descriptor_length 字段的描述符的字节数
- Data []byte
- }
- func ReadTsPacket(r io.Reader) (packet MpegTsPacket, err error) {
- lr := &io.LimitedReader{R: r, N: TS_PACKET_SIZE}
- // header
- packet.Header, err = ReadTsHeader(lr)
- if err != nil {
- return
- }
- // payload
- packet.Payload = make([]byte, lr.N)
- _, err = lr.Read(packet.Payload)
- if err != nil {
- return
- }
- return
- }
- func ReadTsHeader(r io.Reader) (header MpegTsHeader, err error) {
- var h uint32
- // MPEGTS Header 4个字节
- h, err = util.ReadByteToUint32(r, true)
- if err != nil {
- return
- }
- // payloadUnitStartIndicator
- // 为1时,表示在4个字节后,有一个调整字节.包头后需要除去一个字节才是有效数据(payload_unit_start_indicator="1")
- // header.payloadUnitStartIndicator = uint8(h & 0x400000)
- // | 1111 1111 | 0000 0000 | 0000 0000 | 0000 0000 |
- // | 1111 1111 | 0000 0000 | 0000 0000 | 0000 0000 |
- header.SyncByte = byte((h & 0xff000000) >> 24)
- if header.SyncByte != 0x47 {
- err = errors.New("mpegts header sync error!")
- return
- }
- // | 0000 0000 | 1000 0000 | 0000 0000 | 0000 0000 |
- header.TransportErrorIndicator = byte((h & 0x800000) >> 23)
- // | 0000 0000 | 0100 0000 | 0000 0000 | 0000 0000 |
- header.PayloadUnitStartIndicator = byte((h & 0x400000) >> 22)
- // | 0000 0000 | 0010 0000 | 0000 0000 | 0000 0000 |
- header.TransportPriority = byte((h & 0x200000) >> 21)
- // | 0000 0000 | 0001 1111 | 1111 1111 | 0000 0000 |
- header.Pid = uint16((h & 0x1fff00) >> 8)
- // | 0000 0000 | 0000 0000 | 0000 0000 | 1100 0000 |
- header.TransportScramblingControl = byte((h & 0xc0) >> 6)
- // | 0000 0000 | 0000 0000 | 0000 0000 | 0011 0000 |
- // 0x30 , 0x20 -> adaptation_field, 0x10 -> Payload
- header.AdaptionFieldControl = byte((h & 0x30) >> 4)
- // | 0000 0000 | 0000 0000 | 0000 0000 | 0000 1111 |
- header.ContinuityCounter = byte(h & 0xf)
- // | 0010 0000 |
- // adaptionFieldControl
- // 表示TS分组首部后面是否跟随有调整字段和有效负载.
- // 01仅含有效负载(没有adaptation_field)
- // 10仅含调整字段(没有Payload)
- // 11含有调整字段和有效负载(有adaptation_field,adaptation_field之后是Payload).
- // 为00的话解码器不进行处理.空分组没有调整字段
- // 当值为'11时,adaptation_field_length 值必须在0 到182 的区间内.
- // 当值为'10'时,adaptation_field_length 值必须为183.
- // 对于承载PES 包的传输流包,只要存在欠充足的PES 包数据就需要通过填充来完全填满传输流包的有效载荷字节.
- // 填充通过规定自适应字段长度比自适应字段中数据元的长度总和还要长来实现,以致于自适应字段在完全容纳有效的PES 包数据后,有效载荷字节仍有剩余.自适应字段中额外空间采用填充字节填满.
- if header.AdaptionFieldControl >= 2 {
- // adaptationFieldLength
- header.AdaptationFieldLength, err = util.ReadByteToUint8(r)
- if err != nil {
- return
- }
- if header.AdaptationFieldLength > 0 {
- lr := &io.LimitedReader{R: r, N: int64(header.AdaptationFieldLength)}
- // discontinuityIndicator(1)
- // randomAccessIndicator(1)
- // elementaryStreamPriorityIndicator
- // PCRFlag
- // OPCRFlag
- // splicingPointFlag
- // trasportPrivateDataFlag
- // adaptationFieldExtensionFlag
- var flags uint8
- flags, err = util.ReadByteToUint8(lr)
- if err != nil {
- return
- }
- header.DiscontinuityIndicator = flags & 0x80
- header.RandomAccessIndicator = flags & 0x40
- header.ElementaryStreamPriorityIndicator = flags & 0x20
- header.PCRFlag = flags & 0x10
- header.OPCRFlag = flags & 0x08
- header.SplicingPointFlag = flags & 0x04
- header.TrasportPrivateDataFlag = flags & 0x02
- header.AdaptationFieldExtensionFlag = flags & 0x01
- // randomAccessIndicator
- // 在此点包含有助于随机接入的某些信息.
- // 特别的,该比特置于"1"时,在具有当前 PID 的传输流包的有效载荷中起始的下一个 PES 包必须包含一个 discontinuity_indicator 字段中规定的基本流接入点.
- // 此外,在视频情况中,显示时间标记必须在跟随基本流接入点的第一图像中存在
- if header.RandomAccessIndicator != 0 {
- }
- // PCRFlag
- // 1->指示 adaptation_field 包含以两部分编码的 PCR 字段.
- // 0->指示自适应字段不包含任何 PCR 字段
- if header.PCRFlag != 0 {
- var pcr uint64
- pcr, err = util.ReadByteToUint48(lr, true)
- if err != nil {
- return
- }
- // PCR(i) = PCR_base(i)*300 + PCR_ext(i)
- // afd.programClockReferenceBase * 300 + afd.programClockReferenceExtension
- header.ProgramClockReferenceBase = pcr >> 15 // 9 bits + 6 bits
- header.ProgramClockReferenceExtension = uint16(pcr & 0x1ff) // 9 bits -> | 0000 0001 | 1111 1111 |
- }
- // OPCRFlag
- if header.OPCRFlag != 0 {
- var opcr uint64
- opcr, err = util.ReadByteToUint48(lr, true)
- if err != nil {
- return
- }
- // OPCR(i) = OPCR_base(i)*300 + OPCR_ext(i)
- // afd.originalProgramClockReferenceBase * 300 + afd.originalProgramClockReferenceExtension
- header.OriginalProgramClockReferenceBase = opcr >> 15 // 9 bits + 6 bits
- header.OriginalProgramClockReferenceExtension = uint16(opcr & 0x1ff) // 9 bits -> | 0000 0001 | 1111 1111 |
- }
- // splicingPointFlag
- // 1->指示 splice_countdown 字段必须在相关自适应字段中存在,指定拼接点的出现.
- // 0->指示自适应字段中 splice_countdown 字段不存在
- if header.SplicingPointFlag != 0 {
- header.SpliceCountdown, err = util.ReadByteToUint8(lr)
- if err != nil {
- return
- }
- }
- // trasportPrivateDataFlag
- // 1->指示自适应字段包含一个或多个 private_data 字节.
- // 0->指示自适应字段不包含任何 private_data 字节
- if header.TrasportPrivateDataFlag != 0 {
- header.TransportPrivateDataLength, err = util.ReadByteToUint8(lr)
- if err != nil {
- return
- }
- // privateDataByte
- b := make([]byte, header.TransportPrivateDataLength)
- if _, err = lr.Read(b); err != nil {
- return
- }
- }
- // adaptationFieldExtensionFlag
- if header.AdaptationFieldExtensionFlag != 0 {
- }
- // 消耗掉剩下的数据,我们不关心
- if lr.N > 0 {
- // Discard 是一个 io.Writer,对它进行的任何 Write 调用都将无条件成功
- // 但是ioutil.Discard不记录copy得到的数值
- // 用于发送需要读取但不想存储的数据,目的是耗尽读取端的数据
- if _, err = io.CopyN(ioutil.Discard, lr, int64(lr.N)); err != nil {
- return
- }
- }
- }
- }
- return
- }
- func WriteTsHeader(w io.Writer, header MpegTsHeader) (written int, err error) {
- if header.SyncByte != 0x47 {
- err = errors.New("mpegts header sync error!")
- return
- }
- h := uint32(header.SyncByte)<<24 + uint32(header.TransportErrorIndicator)<<23 + uint32(header.PayloadUnitStartIndicator)<<22 + uint32(header.TransportPriority)<<21 + uint32(header.Pid)<<8 + uint32(header.TransportScramblingControl)<<6 + uint32(header.AdaptionFieldControl)<<4 + uint32(header.ContinuityCounter)
- if err = util.WriteUint32ToByte(w, h, true); err != nil {
- return
- }
- written += 4
- if header.AdaptionFieldControl >= 2 {
- // adaptationFieldLength(8)
- if err = util.WriteUint8ToByte(w, header.AdaptationFieldLength); err != nil {
- return
- }
- written += 1
- if header.AdaptationFieldLength > 0 {
- // discontinuityIndicator(1)
- // randomAccessIndicator(1)
- // elementaryStreamPriorityIndicator(1)
- // PCRFlag(1)
- // OPCRFlag(1)
- // splicingPointFlag(1)
- // trasportPrivateDataFlag(1)
- // adaptationFieldExtensionFlag(1)
- threeIndicatorFiveFlags := uint8(header.DiscontinuityIndicator<<7) + uint8(header.RandomAccessIndicator<<6) + uint8(header.ElementaryStreamPriorityIndicator<<5) + uint8(header.PCRFlag<<4) + uint8(header.OPCRFlag<<3) + uint8(header.SplicingPointFlag<<2) + uint8(header.TrasportPrivateDataFlag<<1) + uint8(header.AdaptationFieldExtensionFlag)
- if err = util.WriteUint8ToByte(w, threeIndicatorFiveFlags); err != nil {
- return
- }
- written += 1
- // PCR(i) = PCR_base(i)*300 + PCR_ext(i)
- if header.PCRFlag != 0 {
- pcr := header.ProgramClockReferenceBase<<15 | 0x3f<<9 | uint64(header.ProgramClockReferenceExtension)
- if err = util.WriteUint48ToByte(w, pcr, true); err != nil {
- return
- }
- written += 6
- }
- // OPCRFlag
- if header.OPCRFlag != 0 {
- opcr := header.OriginalProgramClockReferenceBase<<15 | 0x3f<<9 | uint64(header.OriginalProgramClockReferenceExtension)
- if err = util.WriteUint48ToByte(w, opcr, true); err != nil {
- return
- }
- written += 6
- }
- }
- }
- return
- }
- //
- //func (s *MpegTsStream) TestWrite(fileName string) error {
- //
- // if fileName != "" {
- // file, err := os.Create(fileName)
- // if err != nil {
- // panic(err)
- // }
- // defer file.Close()
- //
- // patTsHeader := []byte{0x47, 0x40, 0x00, 0x10}
- //
- // if err := WritePATPacket(file, patTsHeader, *s.pat); err != nil {
- // panic(err)
- // }
- //
- // // TODO:这里的pid应该是由PAT给的
- // pmtTsHeader := []byte{0x47, 0x41, 0x00, 0x10}
- //
- // if err := WritePMTPacket(file, pmtTsHeader, *s.pmt); err != nil {
- // panic(err)
- // }
- // }
- //
- // var videoFrame int
- // var audioFrame int
- // for {
- // tsPesPkt, ok := <-s.TsPesPktChan
- // if !ok {
- // fmt.Println("frame index, video , audio :", videoFrame, audioFrame)
- // break
- // }
- //
- // if tsPesPkt.PesPkt.Header.StreamID == STREAM_ID_AUDIO {
- // audioFrame++
- // }
- //
- // if tsPesPkt.PesPkt.Header.StreamID == STREAM_ID_VIDEO {
- // println(tsPesPkt.PesPkt.Header.Pts)
- // videoFrame++
- // }
- //
- // fmt.Sprintf("%s", tsPesPkt)
- //
- // // if err := WritePESPacket(file, tsPesPkt.TsPkt.Header, tsPesPkt.PesPkt); err != nil {
- // // return err
- // // }
- //
- // }
- //
- // return nil
- //}
- func (s *MpegTsStream) ReadPAT(packet *MpegTsPacket, pr io.Reader) (err error) {
- // 首先找到PID==0x00的TS包(PAT)
- if PID_PAT == packet.Header.Pid {
- if len(packet.Payload) == 188 {
- pr = &util.Crc32Reader{R: pr, Crc32: 0xffffffff}
- }
- // Header + PSI + Paylod
- s.PAT, err = ReadPAT(pr)
- }
- return
- }
- func (s *MpegTsStream) ReadPMT(packet *MpegTsPacket, pr io.Reader) (err error) {
- // 在读取PAT中已经将所有频道节目信息(PMT_PID)保存了起来
- // 接着读取所有TS包里面的PID,找出PID==PMT_PID的TS包,就是PMT表
- for _, v := range s.PAT.Program {
- if v.ProgramMapPID == packet.Header.Pid {
- if len(packet.Payload) == 188 {
- pr = &util.Crc32Reader{R: pr, Crc32: 0xffffffff}
- }
- // Header + PSI + Paylod
- s.PMT, err = ReadPMT(pr)
- }
- }
- return
- }
- func (s *MpegTsStream) Feed(ts io.Reader) (err error) {
- var reader bytes.Reader
- var lr io.LimitedReader
- lr.R = &reader
- var tsHeader MpegTsHeader
- tsData := make([]byte, TS_PACKET_SIZE)
- for {
- _, err = io.ReadFull(ts, tsData)
- if err == io.EOF {
- // 文件结尾 把最后面的数据发出去
- for _, pesPkt := range s.PESBuffer {
- if pesPkt != nil {
- s.PESChan <- pesPkt
- }
- }
- return nil
- } else if err != nil {
- return
- }
- reader.Reset(tsData)
- lr.N = TS_PACKET_SIZE
- if tsHeader, err = ReadTsHeader(&lr); err != nil {
- return
- }
- if tsHeader.Pid == PID_PAT {
- if s.PAT, err = ReadPAT(&lr); err != nil {
- return
- }
- continue
- }
- if len(s.PMT.Stream) == 0 {
- for _, v := range s.PAT.Program {
- if v.ProgramMapPID == tsHeader.Pid {
- if s.PMT, err = ReadPMT(&lr); err != nil {
- return
- }
- for _, v := range s.PMT.Stream {
- s.PESBuffer[v.ElementaryPID] = nil
- }
- }
- continue
- }
- } else if pesPkt, ok := s.PESBuffer[tsHeader.Pid]; ok {
- if tsHeader.PayloadUnitStartIndicator == 1 {
- if pesPkt != nil {
- s.PESChan <- pesPkt
- }
- pesPkt = &MpegTsPESPacket{}
- s.PESBuffer[tsHeader.Pid] = pesPkt
- if pesPkt.Header, err = ReadPESHeader(&lr); err != nil {
- return
- }
- }
- io.Copy(&pesPkt.Payload, &lr)
- }
- }
- }
|