mpegts_psi.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package mpegts
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "m7s.live/engine/v4/util"
  7. )
  8. //
  9. // PSI
  10. //
  11. const (
  12. PSI_TYPE_PAT = 1
  13. PSI_TYPE_PMT = 2
  14. PSI_TYPE_NIT = 3
  15. PSI_TYPE_CAT = 4
  16. PSI_TYPE_TST = 5
  17. PSI_TYPE_IPMP_CIT = 6
  18. )
// MpegTsPSI groups the PSI tables that ReadPSI can fill in and WritePSI can
// serialize. Only the PAT and PMT have fields here; the CAT and NIT named in
// the list below have no corresponding field in this struct.
type MpegTsPSI struct {
	// PAT
	// PMT
	// CAT
	// NIT
	Pat MpegTsPAT // used when the table type is PSI_TYPE_PAT
	Pmt MpegTsPMT // used when the table type is PSI_TYPE_PMT
}
  27. // 当传输流包有效载荷包含 PSI 数据时,payload_unit_start_indicator 具有以下意义:
  28. // 若传输流包承载 PSI分段的首字节,则 payload_unit_start_indicator 值必为 1,指示此传输流包的有效载荷的首字节承载pointer_field.
  29. // 若传输流包不承载 PSI 分段的首字节,则 payload_unit_start_indicator 值必为 0,指示在此有效载荷中不存在 pointer_field
  30. // 只要是PSI就一定会有pointer_field
  31. func ReadPSI(r io.Reader, pt uint32) (lr *io.LimitedReader, psi MpegTsPSI, err error) {
  32. // pointer field(8)
  33. cr, ok := r.(*util.Crc32Reader)
  34. if ok {
  35. r = cr.R
  36. }
  37. pointer_field, err := util.ReadByteToUint8(r)
  38. if err != nil {
  39. return
  40. }
  41. if pointer_field != 0 {
  42. // 无论如何都应该确保能将pointer_field读取到,并且io.Reader指针向下移动
  43. // ioutil.Discard常用在,http中,如果Get请求,获取到了很大的Body,要丢弃Body,就用这个方法.
  44. // 因为http默认重链接的时候,必须等body读取完成.
  45. // 用于发送需要读取但不想存储的数据,目的是耗尽读取端的数据
  46. if _, err = io.CopyN(io.Discard, r, int64(pointer_field)); err != nil {
  47. return
  48. }
  49. }
  50. if ok {
  51. r = cr
  52. }
  53. // table id(8)
  54. tableId, err := util.ReadByteToUint8(r)
  55. if err != nil {
  56. return
  57. }
  58. // sectionSyntaxIndicator(1) + zero(1) + reserved1(2) + sectionLength(12)
  59. // sectionLength 前两个字节固定为00
  60. sectionSyntaxIndicatorAndSectionLength, err := util.ReadByteToUint16(r, true)
  61. if err != nil {
  62. return
  63. }
  64. // 指定该分段的字节数,紧随 section_length 字段开始,并包括 CRC
  65. // 因此剩下最多只能在读sectionLength长度的字节
  66. lr = &io.LimitedReader{R: r, N: int64(sectionSyntaxIndicatorAndSectionLength & 0x3FF)}
  67. // PAT TransportStreamID(16) or PMT ProgramNumber(16)
  68. transportStreamIdOrProgramNumber, err := util.ReadByteToUint16(lr, true)
  69. if err != nil {
  70. return
  71. }
  72. // reserved2(2) + versionNumber(5) + currentNextIndicator(1)
  73. versionNumberAndCurrentNextIndicator, err := util.ReadByteToUint8(lr)
  74. if err != nil {
  75. return
  76. }
  77. // sectionNumber(8)
  78. sectionNumber, err := util.ReadByteToUint8(lr)
  79. if err != nil {
  80. return
  81. }
  82. // lastSectionNumber(8)
  83. lastSectionNumber, err := util.ReadByteToUint8(lr)
  84. if err != nil {
  85. return
  86. }
  87. // 因为lr.N是从sectionLength开始计算,所以要减去 pointer_field(8) + table id(8) + sectionSyntaxIndicator(1) + zero(1) + reserved1(2) + sectionLength(12)
  88. lr.N -= 4
  89. switch pt {
  90. case PSI_TYPE_PAT:
  91. {
  92. if tableId != TABLE_PAS {
  93. err = errors.New(fmt.Sprintf("%s, id=%d", "read pmt table id != 2", tableId))
  94. return
  95. }
  96. psi.Pat.TableID = tableId
  97. psi.Pat.SectionSyntaxIndicator = uint8((sectionSyntaxIndicatorAndSectionLength & 0x8000) >> 15)
  98. psi.Pat.SectionLength = sectionSyntaxIndicatorAndSectionLength & 0x3FF
  99. psi.Pat.TransportStreamID = transportStreamIdOrProgramNumber
  100. psi.Pat.VersionNumber = versionNumberAndCurrentNextIndicator & 0x3e
  101. psi.Pat.CurrentNextIndicator = versionNumberAndCurrentNextIndicator & 0x01
  102. psi.Pat.SectionNumber = sectionNumber
  103. psi.Pat.LastSectionNumber = lastSectionNumber
  104. }
  105. case PSI_TYPE_PMT:
  106. {
  107. if tableId != TABLE_TSPMS {
  108. err = errors.New(fmt.Sprintf("%s, id=%d", "read pmt table id != 2", tableId))
  109. return
  110. }
  111. psi.Pmt.TableID = tableId
  112. psi.Pmt.SectionSyntaxIndicator = uint8((sectionSyntaxIndicatorAndSectionLength & 0x8000) >> 15)
  113. psi.Pmt.SectionLength = sectionSyntaxIndicatorAndSectionLength & 0x3FF
  114. psi.Pmt.ProgramNumber = transportStreamIdOrProgramNumber
  115. psi.Pmt.VersionNumber = versionNumberAndCurrentNextIndicator & 0x3e
  116. psi.Pmt.CurrentNextIndicator = versionNumberAndCurrentNextIndicator & 0x01
  117. psi.Pmt.SectionNumber = sectionNumber
  118. psi.Pmt.LastSectionNumber = lastSectionNumber
  119. }
  120. }
  121. return
  122. }
  123. func WritePSI(w io.Writer, pt uint32, psi MpegTsPSI, data []byte) (err error) {
  124. var tableId, versionNumberAndCurrentNextIndicator, sectionNumber, lastSectionNumber uint8
  125. var sectionSyntaxIndicatorAndSectionLength, transportStreamIdOrProgramNumber uint16
  126. switch pt {
  127. case PSI_TYPE_PAT:
  128. {
  129. if psi.Pat.TableID != TABLE_PAS {
  130. err = errors.New(fmt.Sprintf("%s, id=%d", "write pmt table id != 0", tableId))
  131. return
  132. }
  133. tableId = psi.Pat.TableID
  134. sectionSyntaxIndicatorAndSectionLength = uint16(psi.Pat.SectionSyntaxIndicator)<<15 | 3<<12 | psi.Pat.SectionLength
  135. transportStreamIdOrProgramNumber = psi.Pat.TransportStreamID
  136. versionNumberAndCurrentNextIndicator = psi.Pat.VersionNumber<<1 | psi.Pat.CurrentNextIndicator
  137. sectionNumber = psi.Pat.SectionNumber
  138. lastSectionNumber = psi.Pat.LastSectionNumber
  139. }
  140. case PSI_TYPE_PMT:
  141. {
  142. if psi.Pmt.TableID != TABLE_TSPMS {
  143. err = errors.New(fmt.Sprintf("%s, id=%d", "write pmt table id != 2", tableId))
  144. return
  145. }
  146. tableId = psi.Pmt.TableID
  147. sectionSyntaxIndicatorAndSectionLength = uint16(psi.Pmt.SectionSyntaxIndicator)<<15 | 3<<12 | psi.Pmt.SectionLength
  148. transportStreamIdOrProgramNumber = psi.Pmt.ProgramNumber
  149. versionNumberAndCurrentNextIndicator = psi.Pmt.VersionNumber<<1 | psi.Pmt.CurrentNextIndicator
  150. sectionNumber = psi.Pmt.SectionNumber
  151. lastSectionNumber = psi.Pmt.LastSectionNumber
  152. }
  153. }
  154. // pointer field(8)
  155. if err = util.WriteUint8ToByte(w, 0); err != nil {
  156. return
  157. }
  158. cw := &util.Crc32Writer{W: w, Crc32: 0xffffffff}
  159. // table id(8)
  160. if err = util.WriteUint8ToByte(cw, tableId); err != nil {
  161. return
  162. }
  163. // sectionSyntaxIndicator(1) + zero(1) + reserved1(2) + sectionLength(12)
  164. // sectionLength 前两个字节固定为00
  165. // 1 0 11 sectionLength
  166. if err = util.WriteUint16ToByte(cw, sectionSyntaxIndicatorAndSectionLength, true); err != nil {
  167. return
  168. }
  169. // PAT TransportStreamID(16) or PMT ProgramNumber(16)
  170. if err = util.WriteUint16ToByte(cw, transportStreamIdOrProgramNumber, true); err != nil {
  171. return
  172. }
  173. // reserved2(2) + versionNumber(5) + currentNextIndicator(1)
  174. // 0x3 << 6 -> 1100 0000
  175. // 0x3 << 6 | 1 -> 1100 0001
  176. if err = util.WriteUint8ToByte(cw, versionNumberAndCurrentNextIndicator); err != nil {
  177. return
  178. }
  179. // sectionNumber(8)
  180. if err = util.WriteUint8ToByte(cw, sectionNumber); err != nil {
  181. return
  182. }
  183. // lastSectionNumber(8)
  184. if err = util.WriteUint8ToByte(cw, lastSectionNumber); err != nil {
  185. return
  186. }
  187. // data
  188. if _, err = cw.Write(data); err != nil {
  189. return
  190. }
  191. // crc32
  192. crc32 := util.BigLittleSwap(uint(cw.Crc32))
  193. if err = util.WriteUint32ToByte(cw, uint32(crc32), true); err != nil {
  194. return
  195. }
  196. return
  197. }