publisher-ts.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package engine
  2. import (
  3. "go.uber.org/zap"
  4. "m7s.live/engine/v4/codec/mpegts"
  5. "m7s.live/engine/v4/track"
  6. "m7s.live/engine/v4/util"
  7. )
  8. type TSReader struct {
  9. *TSPublisher
  10. mpegts.MpegTsStream
  11. }
  12. func NewTSReader(pub *TSPublisher) (r *TSReader) {
  13. r = &TSReader{
  14. TSPublisher: pub,
  15. }
  16. r.PESChan = make(chan *mpegts.MpegTsPESPacket, 50)
  17. r.PESBuffer = make(map[uint16]*mpegts.MpegTsPESPacket)
  18. go r.ReadPES()
  19. return
  20. }
  21. type TSPublisher struct {
  22. Publisher
  23. pool util.BytesPool
  24. }
  25. func (t *TSPublisher) OnEvent(event any) {
  26. switch v := event.(type) {
  27. case IPublisher:
  28. t.pool = make(util.BytesPool, 17)
  29. if v.GetPublisher() != &t.Publisher {
  30. t.AudioTrack = v.GetAudioTrack()
  31. t.VideoTrack = v.GetVideoTrack()
  32. }
  33. case SEKick, SEclose:
  34. // close(t.PESChan)
  35. t.Publisher.OnEvent(event)
  36. default:
  37. t.Publisher.OnEvent(event)
  38. }
  39. }
  40. func (t *TSPublisher) OnPmtStream(s mpegts.MpegTsPmtStream) {
  41. switch s.StreamType {
  42. case mpegts.STREAM_TYPE_H264:
  43. if t.VideoTrack == nil {
  44. t.VideoTrack = track.NewH264(t, t.pool)
  45. }
  46. case mpegts.STREAM_TYPE_H265:
  47. if t.VideoTrack == nil {
  48. t.VideoTrack = track.NewH265(t, t.pool)
  49. }
  50. case mpegts.STREAM_TYPE_AAC:
  51. if t.AudioTrack == nil {
  52. t.AudioTrack = track.NewAAC(t, t.pool)
  53. }
  54. case mpegts.STREAM_TYPE_G711A:
  55. if t.AudioTrack == nil {
  56. t.AudioTrack = track.NewG711(t, true, t.pool)
  57. }
  58. case mpegts.STREAM_TYPE_G711U:
  59. if t.AudioTrack == nil {
  60. t.AudioTrack = track.NewG711(t, false, t.pool)
  61. }
  62. default:
  63. t.Warn("unsupport stream type:", zap.Uint8("type", s.StreamType))
  64. }
  65. }
  66. func (t *TSReader) Close() {
  67. close(t.PESChan)
  68. }
  69. func (t *TSReader) ReadPES() {
  70. for pes := range t.PESChan {
  71. if t.Err() != nil {
  72. continue
  73. }
  74. if pes.Header.Dts == 0 {
  75. pes.Header.Dts = pes.Header.Pts
  76. }
  77. switch pes.Header.StreamID & 0xF0 {
  78. case mpegts.STREAM_ID_VIDEO:
  79. if t.VideoTrack == nil {
  80. for _, s := range t.PMT.Stream {
  81. t.OnPmtStream(s)
  82. }
  83. }
  84. if t.VideoTrack != nil {
  85. t.WriteAnnexB(uint32(pes.Header.Pts), uint32(pes.Header.Dts), pes.Payload)
  86. }
  87. default:
  88. if t.AudioTrack == nil {
  89. for _, s := range t.PMT.Stream {
  90. t.OnPmtStream(s)
  91. }
  92. }
  93. if t.AudioTrack != nil {
  94. switch t.AudioTrack.(type) {
  95. case *track.AAC:
  96. t.AudioTrack.WriteADTS(uint32(pes.Header.Pts), pes.Payload)
  97. case *track.G711:
  98. t.AudioTrack.WriteRawBytes(uint32(pes.Header.Pts), pes.Payload)
  99. }
  100. }
  101. }
  102. }
  103. }