mpegts.go 21 KB


  1. package mpegts
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. "io/ioutil"
  7. "m7s.live/engine/v4/util"
  8. //"sync"
  9. )
  10. // NALU AUD 00 00 00 01 09 F0
  11. const (
  12. TS_PACKET_SIZE = 188
  13. TS_DVHS_PACKET_SIZE = 192
  14. TS_FEC_PACKET_SIZE = 204
  15. TS_MAX_PACKET_SIZE = 204
  16. PID_PAT = 0x0000
  17. PID_CAT = 0x0001
  18. PID_TSDT = 0x0002
  19. PID_RESERVED1 = 0x0003
  20. PID_RESERVED2 = 0x000F
  21. PID_NIT_ST = 0x0010
  22. PID_SDT_BAT_ST = 0x0011
  23. PID_EIT_ST = 0x0012
  24. PID_RST_ST = 0x0013
  25. PID_TDT_TOT_ST = 0x0014
  26. PID_NET_SYNC = 0x0015
  27. PID_RESERVED3 = 0x0016
  28. PID_RESERVED4 = 0x001B
  29. PID_SIGNALLING = 0x001C
  30. PID_MEASURE = 0x001D
  31. PID_DIT = 0x001E
  32. PID_SIT = 0x001F
  33. PID_PMT = 0x0100
  34. PID_VIDEO = 0x0101
  35. PID_AUDIO = 0x0102
  36. // 0x0003 - 0x000F Reserved
  37. // 0x0010 - 0x1FFE May be assigned as network_PID, Program_map_PID, elementary_PID, or for other purposes
  38. // 0x1FFF Null Packet
  39. // program_association_section
  40. // conditional_access_section
  41. // TS_program_map_section
  42. // TS_description_section
  43. // ISO_IEC_14496_scene_description_section
  44. // ISO_IEC_14496_object_descriptor_section
  45. // Metadata_section
  46. // IPMP_Control_Information_section (defined in ISO/IEC 13818-11)
  47. TABLE_PAS = 0x00
  48. TABLE_CAS = 0x01
  49. TABLE_TSPMS = 0x02
  50. TABLE_TSDS = 0x03
  51. TABLE_ISO_IEC_14496_SDC = 0x04
  52. TABLE_ISO_IEC_14496_ODC = 0x05
  53. TABLE_MS = 0x06
  54. TABLE_IPMP_CIS = 0x07
  55. // 0x06 - 0x37 ITU-T Rec. H.222.0 | ISO/IEC 13818-1 reserved
  56. // 0x38 - 0x3F Defined in ISO/IEC 13818-6
  57. // 0x40 - 0xFE User private
  58. // 0xFF Forbidden
  59. STREAM_TYPE_VIDEO_MPEG1 = 0x01
  60. STREAM_TYPE_VIDEO_MPEG2 = 0x02
  61. STREAM_TYPE_AUDIO_MPEG1 = 0x03
  62. STREAM_TYPE_AUDIO_MPEG2 = 0x04
  63. STREAM_TYPE_PRIVATE_SECTIONS = 0x05
  64. STREAM_TYPE_PRIVATE_DATA = 0x06
  65. STREAM_TYPE_MHEG = 0x07
  66. STREAM_TYPE_H264 = 0x1B
  67. STREAM_TYPE_H265 = 0x24
  68. STREAM_TYPE_AAC = 0x0F
  69. STREAM_TYPE_G711A = 0x90
  70. STREAM_TYPE_G711U = 0x91
  71. STREAM_TYPE_G722_1 = 0x92
  72. STREAM_TYPE_G723_1 = 0x93
  73. STREAM_TYPE_G726 = 0x94
  74. STREAM_TYPE_G729 = 0x99
  75. STREAM_TYPE_ADPCM = 0x11
  76. STREAM_TYPE_PCM = 0x0A
  77. STREAM_TYPE_AC3 = 0x81
  78. STREAM_TYPE_DTS = 0x8A
  79. STREAM_TYPE_LPCM = 0x8B
  80. // 1110 xxxx
  81. // 110x xxxx
  82. STREAM_ID_VIDEO = 0xE0 // ITU-T Rec. H.262 | ISO/IEC 13818-2 or ISO/IEC 11172-2 or ISO/IEC14496-2 video stream number xxxx
  83. STREAM_ID_AUDIO = 0xC0 // ISO/IEC 13818-3 or ISO/IEC 11172-3 or ISO/IEC 13818-7 or ISO/IEC14496-3 audio stream number x xxxx
  84. PAT_PKT_TYPE = 0
  85. PMT_PKT_TYPE = 1
  86. PES_PKT_TYPE = 2
  87. )
  88. //
  89. // MPEGTS -> PAT + PMT + PES
  90. // ES -> PES -> TS
  91. //
  92. type MpegTsStream struct {
  93. PAT MpegTsPAT // PAT表信息
  94. PMT MpegTsPMT // PMT表信息
  95. PESBuffer map[uint16]*MpegTsPESPacket
  96. PESChan chan *MpegTsPESPacket
  97. }
  98. // ios13818-1-CN.pdf 33/165
  99. //
  100. // TS
  101. //
  102. // Packet == Header + Payload == 188 bytes
  103. type MpegTsPacket struct {
  104. Header MpegTsHeader
  105. Payload []byte
  106. }
  107. // 前面32bit的数据即TS分组首部,它指出了这个分组的属性
  108. type MpegTsHeader struct {
  109. SyncByte byte // 8 bits 同步字节,固定为0x47,表示后面是一个TS分组
  110. TransportErrorIndicator byte // 1 bit 传输错误标志位
  111. PayloadUnitStartIndicator byte // 1 bit 负载单元开始标志(packet不满188字节时需填充).为1时,表示在4个字节后,有一个调整字节
  112. TransportPriority byte // 1 bit 传输优先级
  113. Pid uint16 // 13 bits Packet ID号码,唯一的号码对应不同的包.为0表示携带的是PAT表
  114. TransportScramblingControl byte // 2 bits 加密标志位(00:未加密;其他表示已加密)
  115. AdaptionFieldControl byte // 2 bits 附加区域控制.表示TS分组首部后面是否跟随有调整字段和有效负载.01仅含有效负载(没有adaptation_field),10仅含调整字段(没有Payload),11含有调整字段和有效负载(有adaptation_field,adaptation_field之后是Payload).为00的话解码器不进行处理.空分组没有调整字段
  116. ContinuityCounter byte // 4 bits 包递增计数器.范围0-15,具有相同的PID的TS分组传输时每次加1,到15后清0.不过,有些情况下是不计数的.
  117. MpegTsHeaderAdaptationField
  118. }
  119. // 调整字段,只可能出现在每一帧的开头(当含有pcr的时候),或者结尾(当不满足188个字节的时候)
  120. // adaptionFieldControl 00 -> 高字节代表调整字段, 低字节代表负载字段 0x20 0x10
  121. // PCR字段编码在MPEG-2 TS包的自适应字段(Adaptation field)的6个Byte中,其中6 bits为预留位,42 bits为有效位()
  122. // MpegTsHeaderAdaptationField + stuffing bytes
  123. type MpegTsHeaderAdaptationField struct {
  124. AdaptationFieldLength byte // 8bits 本区域除了本字节剩下的长度(不包含本字节!!!切记), if adaptationFieldLength > 0, 那么就有下面8个字段. adaptation_field_length 值必须在 0 到 182 的区间内.当 adaptation_field_control 值为'10'时,adaptation_field_length 值必须为 183
  125. DiscontinuityIndicator byte // 1bit 置于"1"时,指示当前传输流包的不连续性状态为真.当 discontinuity_indicator 设置为"0"或不存在时,不连续性状态为假.不连续性指示符用于指示两种类型的不连续性,系统时间基不连续性和 continuity_counter 不连续性.
  126. RandomAccessIndicator byte // 1bit 指示当前的传输流包以及可能的具有相同 PID 的后续传输流包,在此点包含有助于随机接入的某些信息.特别的,该比特置于"1"时,在具有当前 PID 的传输流包的有效载荷中起始的下一个 PES 包必须包含一个 discontinuity_indicator 字段中规定的基本流接入点.此外,在视频情况中,显示时间标记必须在跟随基本流接入点的第一图像中存在
  127. ElementaryStreamPriorityIndicator byte // 1bit 在具有相同 PID 的包之间,它指示此传输流包有效载荷内承载的基本流数据的优先级.1->指示该有效载荷具有比其他传输流包有效载荷更高的优先级
  128. PCRFlag byte // 1bit 1->指示 adaptation_field 包含以两部分编码的 PCR 字段.0->指示自适应字段不包含任何 PCR 字段
  129. OPCRFlag byte // 1bit 1->指示 adaptation_field 包含以两部分编码的 OPCR字段.0->指示自适应字段不包含任何 OPCR 字段
  130. SplicingPointFlag byte // 1bit 1->指示 splice_countdown 字段必须在相关自适应字段中存在,指定拼接点的出现.0->指示自适应字段中 splice_countdown 字段不存在
  131. TrasportPrivateDataFlag byte // 1bit 1->指示自适应字段包含一个或多个 private_data 字节.0->指示自适应字段不包含任何 private_data 字节
  132. AdaptationFieldExtensionFlag byte // 1bit 1->指示自适应字段扩展的存在.0->指示自适应字段中自适应字段扩展不存在
  133. // Optional Fields
  134. ProgramClockReferenceBase uint64 // 33 bits pcr
  135. Reserved1 byte // 6 bits
  136. ProgramClockReferenceExtension uint16 // 9 bits
  137. OriginalProgramClockReferenceBase uint64 // 33 bits opcr
  138. Reserved2 byte // 6 bits
  139. OriginalProgramClockReferenceExtension uint16 // 9 bits
  140. SpliceCountdown byte // 8 bits
  141. TransportPrivateDataLength byte // 8 bits 指定紧随传输private_data_length 字段的 private_data 字节数. private_data 字节数不能使专用数据扩展超出自适应字段的范围
  142. PrivateDataByte byte // 8 bits 不通过 ITU-T|ISO/IEC 指定
  143. AdaptationFieldExtensionLength byte // 8 bits 指定紧随此字段的扩展的自适应字段数据的字节数,包括要保留的字节(如果存在)
  144. LtwFlag byte // 1 bit 1->指示 ltw_offset 字段存在
  145. PiecewiseRateFlag byte // 1 bit 1->指示 piecewise_rate 字段存在
  146. SeamlessSpliceFlag byte // 1 bit 1->指示 splice_type 以及 DTS_next_AU 字段存在. 0->指示无论是 splice_type 字段还是 DTS_next_AU 字段均不存在
  147. // Optional Fields
  148. LtwValidFlag byte // 1 bit 1->指示 ltw_offset 的值必将生效.0->指示 ltw_offset 字段中该值未定义
  149. LtwOffset uint16 // 15 bits 其值仅当 ltw_valid 标志字段具有'1'值时才定义.定义时,法定时间窗补偿以(300/fs)秒为度量单位,其中 fs 为此 PID 所归属的节目的系统时钟频率
  150. Reserved3 byte // 2 bits 保留
  151. PiecewiseRate uint32 // 22 bits 只要当 ltw_flag 和 ltw_valid_flag 均置于‘1’时,此 22 比特字段的含义才确定
  152. SpliceType byte // 4 bits
  153. DtsNextAU uint64 // 33 bits (解码时间标记下一个存取单元)
  154. // stuffing bytes
  155. // 此为固定的 8 比特值等于'1111 1111',能够通过编码器插入.它亦能被解码器丢弃
  156. }
  157. // ios13818-1-CN.pdf 77
  158. //
  159. // Descriptor
  160. //
  161. type MpegTsDescriptor struct {
  162. Tag byte // 8 bits 标识每一个描述符
  163. Length byte // 8 bits 指定紧随 descriptor_length 字段的描述符的字节数
  164. Data []byte
  165. }
  166. func ReadTsPacket(r io.Reader) (packet MpegTsPacket, err error) {
  167. lr := &io.LimitedReader{R: r, N: TS_PACKET_SIZE}
  168. // header
  169. packet.Header, err = ReadTsHeader(lr)
  170. if err != nil {
  171. return
  172. }
  173. // payload
  174. packet.Payload = make([]byte, lr.N)
  175. _, err = lr.Read(packet.Payload)
  176. if err != nil {
  177. return
  178. }
  179. return
  180. }
  181. func ReadTsHeader(r io.Reader) (header MpegTsHeader, err error) {
  182. var h uint32
  183. // MPEGTS Header 4个字节
  184. h, err = util.ReadByteToUint32(r, true)
  185. if err != nil {
  186. return
  187. }
  188. // payloadUnitStartIndicator
  189. // 为1时,表示在4个字节后,有一个调整字节.包头后需要除去一个字节才是有效数据(payload_unit_start_indicator="1")
  190. // header.payloadUnitStartIndicator = uint8(h & 0x400000)
  191. // | 1111 1111 | 0000 0000 | 0000 0000 | 0000 0000 |
  192. // | 1111 1111 | 0000 0000 | 0000 0000 | 0000 0000 |
  193. header.SyncByte = byte((h & 0xff000000) >> 24)
  194. if header.SyncByte != 0x47 {
  195. err = errors.New("mpegts header sync error!")
  196. return
  197. }
  198. // | 0000 0000 | 1000 0000 | 0000 0000 | 0000 0000 |
  199. header.TransportErrorIndicator = byte((h & 0x800000) >> 23)
  200. // | 0000 0000 | 0100 0000 | 0000 0000 | 0000 0000 |
  201. header.PayloadUnitStartIndicator = byte((h & 0x400000) >> 22)
  202. // | 0000 0000 | 0010 0000 | 0000 0000 | 0000 0000 |
  203. header.TransportPriority = byte((h & 0x200000) >> 21)
  204. // | 0000 0000 | 0001 1111 | 1111 1111 | 0000 0000 |
  205. header.Pid = uint16((h & 0x1fff00) >> 8)
  206. // | 0000 0000 | 0000 0000 | 0000 0000 | 1100 0000 |
  207. header.TransportScramblingControl = byte((h & 0xc0) >> 6)
  208. // | 0000 0000 | 0000 0000 | 0000 0000 | 0011 0000 |
  209. // 0x30 , 0x20 -> adaptation_field, 0x10 -> Payload
  210. header.AdaptionFieldControl = byte((h & 0x30) >> 4)
  211. // | 0000 0000 | 0000 0000 | 0000 0000 | 0000 1111 |
  212. header.ContinuityCounter = byte(h & 0xf)
  213. // | 0010 0000 |
  214. // adaptionFieldControl
  215. // 表示TS分组首部后面是否跟随有调整字段和有效负载.
  216. // 01仅含有效负载(没有adaptation_field)
  217. // 10仅含调整字段(没有Payload)
  218. // 11含有调整字段和有效负载(有adaptation_field,adaptation_field之后是Payload).
  219. // 为00的话解码器不进行处理.空分组没有调整字段
  220. // 当值为'11时,adaptation_field_length 值必须在0 到182 的区间内.
  221. // 当值为'10'时,adaptation_field_length 值必须为183.
  222. // 对于承载PES 包的传输流包,只要存在欠充足的PES 包数据就需要通过填充来完全填满传输流包的有效载荷字节.
  223. // 填充通过规定自适应字段长度比自适应字段中数据元的长度总和还要长来实现,以致于自适应字段在完全容纳有效的PES 包数据后,有效载荷字节仍有剩余.自适应字段中额外空间采用填充字节填满.
  224. if header.AdaptionFieldControl >= 2 {
  225. // adaptationFieldLength
  226. header.AdaptationFieldLength, err = util.ReadByteToUint8(r)
  227. if err != nil {
  228. return
  229. }
  230. if header.AdaptationFieldLength > 0 {
  231. lr := &io.LimitedReader{R: r, N: int64(header.AdaptationFieldLength)}
  232. // discontinuityIndicator(1)
  233. // randomAccessIndicator(1)
  234. // elementaryStreamPriorityIndicator
  235. // PCRFlag
  236. // OPCRFlag
  237. // splicingPointFlag
  238. // trasportPrivateDataFlag
  239. // adaptationFieldExtensionFlag
  240. var flags uint8
  241. flags, err = util.ReadByteToUint8(lr)
  242. if err != nil {
  243. return
  244. }
  245. header.DiscontinuityIndicator = flags & 0x80
  246. header.RandomAccessIndicator = flags & 0x40
  247. header.ElementaryStreamPriorityIndicator = flags & 0x20
  248. header.PCRFlag = flags & 0x10
  249. header.OPCRFlag = flags & 0x08
  250. header.SplicingPointFlag = flags & 0x04
  251. header.TrasportPrivateDataFlag = flags & 0x02
  252. header.AdaptationFieldExtensionFlag = flags & 0x01
  253. // randomAccessIndicator
  254. // 在此点包含有助于随机接入的某些信息.
  255. // 特别的,该比特置于"1"时,在具有当前 PID 的传输流包的有效载荷中起始的下一个 PES 包必须包含一个 discontinuity_indicator 字段中规定的基本流接入点.
  256. // 此外,在视频情况中,显示时间标记必须在跟随基本流接入点的第一图像中存在
  257. if header.RandomAccessIndicator != 0 {
  258. }
  259. // PCRFlag
  260. // 1->指示 adaptation_field 包含以两部分编码的 PCR 字段.
  261. // 0->指示自适应字段不包含任何 PCR 字段
  262. if header.PCRFlag != 0 {
  263. var pcr uint64
  264. pcr, err = util.ReadByteToUint48(lr, true)
  265. if err != nil {
  266. return
  267. }
  268. // PCR(i) = PCR_base(i)*300 + PCR_ext(i)
  269. // afd.programClockReferenceBase * 300 + afd.programClockReferenceExtension
  270. header.ProgramClockReferenceBase = pcr >> 15 // 9 bits + 6 bits
  271. header.ProgramClockReferenceExtension = uint16(pcr & 0x1ff) // 9 bits -> | 0000 0001 | 1111 1111 |
  272. }
  273. // OPCRFlag
  274. if header.OPCRFlag != 0 {
  275. var opcr uint64
  276. opcr, err = util.ReadByteToUint48(lr, true)
  277. if err != nil {
  278. return
  279. }
  280. // OPCR(i) = OPCR_base(i)*300 + OPCR_ext(i)
  281. // afd.originalProgramClockReferenceBase * 300 + afd.originalProgramClockReferenceExtension
  282. header.OriginalProgramClockReferenceBase = opcr >> 15 // 9 bits + 6 bits
  283. header.OriginalProgramClockReferenceExtension = uint16(opcr & 0x1ff) // 9 bits -> | 0000 0001 | 1111 1111 |
  284. }
  285. // splicingPointFlag
  286. // 1->指示 splice_countdown 字段必须在相关自适应字段中存在,指定拼接点的出现.
  287. // 0->指示自适应字段中 splice_countdown 字段不存在
  288. if header.SplicingPointFlag != 0 {
  289. header.SpliceCountdown, err = util.ReadByteToUint8(lr)
  290. if err != nil {
  291. return
  292. }
  293. }
  294. // trasportPrivateDataFlag
  295. // 1->指示自适应字段包含一个或多个 private_data 字节.
  296. // 0->指示自适应字段不包含任何 private_data 字节
  297. if header.TrasportPrivateDataFlag != 0 {
  298. header.TransportPrivateDataLength, err = util.ReadByteToUint8(lr)
  299. if err != nil {
  300. return
  301. }
  302. // privateDataByte
  303. b := make([]byte, header.TransportPrivateDataLength)
  304. if _, err = lr.Read(b); err != nil {
  305. return
  306. }
  307. }
  308. // adaptationFieldExtensionFlag
  309. if header.AdaptationFieldExtensionFlag != 0 {
  310. }
  311. // 消耗掉剩下的数据,我们不关心
  312. if lr.N > 0 {
  313. // Discard 是一个 io.Writer,对它进行的任何 Write 调用都将无条件成功
  314. // 但是ioutil.Discard不记录copy得到的数值
  315. // 用于发送需要读取但不想存储的数据,目的是耗尽读取端的数据
  316. if _, err = io.CopyN(ioutil.Discard, lr, int64(lr.N)); err != nil {
  317. return
  318. }
  319. }
  320. }
  321. }
  322. return
  323. }
  324. func WriteTsHeader(w io.Writer, header MpegTsHeader) (written int, err error) {
  325. if header.SyncByte != 0x47 {
  326. err = errors.New("mpegts header sync error!")
  327. return
  328. }
  329. h := uint32(header.SyncByte)<<24 + uint32(header.TransportErrorIndicator)<<23 + uint32(header.PayloadUnitStartIndicator)<<22 + uint32(header.TransportPriority)<<21 + uint32(header.Pid)<<8 + uint32(header.TransportScramblingControl)<<6 + uint32(header.AdaptionFieldControl)<<4 + uint32(header.ContinuityCounter)
  330. if err = util.WriteUint32ToByte(w, h, true); err != nil {
  331. return
  332. }
  333. written += 4
  334. if header.AdaptionFieldControl >= 2 {
  335. // adaptationFieldLength(8)
  336. if err = util.WriteUint8ToByte(w, header.AdaptationFieldLength); err != nil {
  337. return
  338. }
  339. written += 1
  340. if header.AdaptationFieldLength > 0 {
  341. // discontinuityIndicator(1)
  342. // randomAccessIndicator(1)
  343. // elementaryStreamPriorityIndicator(1)
  344. // PCRFlag(1)
  345. // OPCRFlag(1)
  346. // splicingPointFlag(1)
  347. // trasportPrivateDataFlag(1)
  348. // adaptationFieldExtensionFlag(1)
  349. threeIndicatorFiveFlags := uint8(header.DiscontinuityIndicator<<7) + uint8(header.RandomAccessIndicator<<6) + uint8(header.ElementaryStreamPriorityIndicator<<5) + uint8(header.PCRFlag<<4) + uint8(header.OPCRFlag<<3) + uint8(header.SplicingPointFlag<<2) + uint8(header.TrasportPrivateDataFlag<<1) + uint8(header.AdaptationFieldExtensionFlag)
  350. if err = util.WriteUint8ToByte(w, threeIndicatorFiveFlags); err != nil {
  351. return
  352. }
  353. written += 1
  354. // PCR(i) = PCR_base(i)*300 + PCR_ext(i)
  355. if header.PCRFlag != 0 {
  356. pcr := header.ProgramClockReferenceBase<<15 | 0x3f<<9 | uint64(header.ProgramClockReferenceExtension)
  357. if err = util.WriteUint48ToByte(w, pcr, true); err != nil {
  358. return
  359. }
  360. written += 6
  361. }
  362. // OPCRFlag
  363. if header.OPCRFlag != 0 {
  364. opcr := header.OriginalProgramClockReferenceBase<<15 | 0x3f<<9 | uint64(header.OriginalProgramClockReferenceExtension)
  365. if err = util.WriteUint48ToByte(w, opcr, true); err != nil {
  366. return
  367. }
  368. written += 6
  369. }
  370. }
  371. }
  372. return
  373. }
  374. //
  375. //func (s *MpegTsStream) TestWrite(fileName string) error {
  376. //
  377. // if fileName != "" {
  378. // file, err := os.Create(fileName)
  379. // if err != nil {
  380. // panic(err)
  381. // }
  382. // defer file.Close()
  383. //
  384. // patTsHeader := []byte{0x47, 0x40, 0x00, 0x10}
  385. //
  386. // if err := WritePATPacket(file, patTsHeader, *s.pat); err != nil {
  387. // panic(err)
  388. // }
  389. //
  390. // // TODO:这里的pid应该是由PAT给的
  391. // pmtTsHeader := []byte{0x47, 0x41, 0x00, 0x10}
  392. //
  393. // if err := WritePMTPacket(file, pmtTsHeader, *s.pmt); err != nil {
  394. // panic(err)
  395. // }
  396. // }
  397. //
  398. // var videoFrame int
  399. // var audioFrame int
  400. // for {
  401. // tsPesPkt, ok := <-s.TsPesPktChan
  402. // if !ok {
  403. // fmt.Println("frame index, video , audio :", videoFrame, audioFrame)
  404. // break
  405. // }
  406. //
  407. // if tsPesPkt.PesPkt.Header.StreamID == STREAM_ID_AUDIO {
  408. // audioFrame++
  409. // }
  410. //
  411. // if tsPesPkt.PesPkt.Header.StreamID == STREAM_ID_VIDEO {
  412. // println(tsPesPkt.PesPkt.Header.Pts)
  413. // videoFrame++
  414. // }
  415. //
  416. // fmt.Sprintf("%s", tsPesPkt)
  417. //
  418. // // if err := WritePESPacket(file, tsPesPkt.TsPkt.Header, tsPesPkt.PesPkt); err != nil {
  419. // // return err
  420. // // }
  421. //
  422. // }
  423. //
  424. // return nil
  425. //}
  426. func (s *MpegTsStream) ReadPAT(packet *MpegTsPacket, pr io.Reader) (err error) {
  427. // 首先找到PID==0x00的TS包(PAT)
  428. if PID_PAT == packet.Header.Pid {
  429. if len(packet.Payload) == 188 {
  430. pr = &util.Crc32Reader{R: pr, Crc32: 0xffffffff}
  431. }
  432. // Header + PSI + Paylod
  433. s.PAT, err = ReadPAT(pr)
  434. }
  435. return
  436. }
  437. func (s *MpegTsStream) ReadPMT(packet *MpegTsPacket, pr io.Reader) (err error) {
  438. // 在读取PAT中已经将所有频道节目信息(PMT_PID)保存了起来
  439. // 接着读取所有TS包里面的PID,找出PID==PMT_PID的TS包,就是PMT表
  440. for _, v := range s.PAT.Program {
  441. if v.ProgramMapPID == packet.Header.Pid {
  442. if len(packet.Payload) == 188 {
  443. pr = &util.Crc32Reader{R: pr, Crc32: 0xffffffff}
  444. }
  445. // Header + PSI + Paylod
  446. s.PMT, err = ReadPMT(pr)
  447. }
  448. }
  449. return
  450. }
  451. func (s *MpegTsStream) Feed(ts io.Reader) (err error) {
  452. var reader bytes.Reader
  453. var lr io.LimitedReader
  454. lr.R = &reader
  455. var tsHeader MpegTsHeader
  456. tsData := make([]byte, TS_PACKET_SIZE)
  457. for {
  458. _, err = io.ReadFull(ts, tsData)
  459. if err == io.EOF {
  460. // 文件结尾 把最后面的数据发出去
  461. for _, pesPkt := range s.PESBuffer {
  462. if pesPkt != nil {
  463. s.PESChan <- pesPkt
  464. }
  465. }
  466. return nil
  467. } else if err != nil {
  468. return
  469. }
  470. reader.Reset(tsData)
  471. lr.N = TS_PACKET_SIZE
  472. if tsHeader, err = ReadTsHeader(&lr); err != nil {
  473. return
  474. }
  475. if tsHeader.Pid == PID_PAT {
  476. if s.PAT, err = ReadPAT(&lr); err != nil {
  477. return
  478. }
  479. continue
  480. }
  481. if len(s.PMT.Stream) == 0 {
  482. for _, v := range s.PAT.Program {
  483. if v.ProgramMapPID == tsHeader.Pid {
  484. if s.PMT, err = ReadPMT(&lr); err != nil {
  485. return
  486. }
  487. for _, v := range s.PMT.Stream {
  488. s.PESBuffer[v.ElementaryPID] = nil
  489. }
  490. }
  491. continue
  492. }
  493. } else if pesPkt, ok := s.PESBuffer[tsHeader.Pid]; ok {
  494. if tsHeader.PayloadUnitStartIndicator == 1 {
  495. if pesPkt != nil {
  496. s.PESChan <- pesPkt
  497. }
  498. pesPkt = &MpegTsPESPacket{}
  499. s.PESBuffer[tsHeader.Pid] = pesPkt
  500. if pesPkt.Header, err = ReadPESHeader(&lr); err != nil {
  501. return
  502. }
  503. }
  504. io.Copy(&pesPkt.Payload, &lr)
  505. }
  506. }
  507. }