subscriber.go

package engine

import (
	"bufio"
	"context"
	"io"
	"net"
	"strconv"
	"time"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"m7s.live/engine/v4/codec"
	. "m7s.live/engine/v4/common"
	"m7s.live/engine/v4/config"
	"m7s.live/engine/v4/track"
	"m7s.live/engine/v4/util"
)
const (
	SUBTYPE_RAW = iota
	SUBTYPE_RTP
	SUBTYPE_FLV
)

const (
	SUBSTATE_INIT = iota
	SUBSTATE_FIRST
	SUBSTATE_NORMAL
)
// VideoDeConf is the video sequence header (decoder configuration) in AVCC format.
type VideoDeConf []byte

// AudioDeConf is the audio sequence header (decoder configuration) in AVCC format.
type AudioDeConf []byte

type AudioFrame struct {
	*AVFrame
	*track.Audio
	AbsTime uint32
	PTS     uint32
	DTS     uint32
}

type VideoFrame struct {
	*AVFrame
	*track.Video
	AbsTime uint32
	PTS     uint32
	DTS     uint32
}

type FLVFrame net.Buffers
type AudioRTP RTPFrame
type VideoRTP RTPFrame

type HasAnnexB interface {
	GetAnnexB() (r net.Buffers)
}
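// WithOutRTMP strips the RTMP/FLV tag-data prefix from a sequence header so that
// only the raw decoder configuration remains. The offsets below follow the
// standard FLV tag layout: AAC audio tags carry a 2-byte prefix (sound format
// byte + AACPacketType), video tags a 5-byte prefix (frame type/codec ID,
// AVCPacketType and a 3-byte composition time).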
func (a AudioDeConf) WithOutRTMP() []byte {
	return a[2:]
}

func (v VideoDeConf) WithOutRTMP() []byte {
	return v[5:]
}

func (f FLVFrame) IsAudio() bool {
	return f[0][0] == codec.FLV_TAG_TYPE_AUDIO
}

func (f FLVFrame) IsVideo() bool {
	return f[0][0] == codec.FLV_TAG_TYPE_VIDEO
}

func (f FLVFrame) WriteTo(w io.Writer) (int64, error) {
	t := (net.Buffers)(f)
	return t.WriteTo(w)
}

// GetADTS prepends the track's ADTS header to the access units of the frame.
func (a AudioFrame) GetADTS() (r net.Buffers) {
	r = append(append(r, a.ADTS.Value), a.AUList.ToBuffers()...)
	return
}

func (a AudioFrame) WriteRawTo(w io.Writer) (n int64, err error) {
	aulist := a.AUList.ToBuffers()
	return aulist.WriteTo(w)
}

// GetAnnexB converts the frame to Annex-B format: parameter sets are emitted
// before key frames and every access unit is prefixed with a start code.
func (v VideoFrame) GetAnnexB() (r net.Buffers) {
	if v.IFrame {
		r = v.ParamaterSets.GetAnnexB()
	}
	v.AUList.Range(func(au *util.BLL) bool {
		r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...)
		return true
	})
	return
}

func (v VideoFrame) WriteAnnexBTo(w io.Writer) (n int64, err error) {
	annexB := v.GetAnnexB()
	return annexB.WriteTo(w)
}
type ISubscriber interface {
	IIO
	GetSubscriber() *Subscriber
	IsPlaying() bool
	PlayRaw()
	PlayBlock(byte)
	PlayFLV()
	Stop(reason ...zapcore.Field)
	Subscribe(streamPath string, sub ISubscriber) error
}

// TrackPlayer holds the playback state of one subscription: the cancelable
// context of the current play loop plus the audio/video tracks and their readers.
type TrackPlayer struct {
	context.Context
	context.CancelFunc
	AudioReader, VideoReader *track.AVRingReader
	Audio                    *track.Audio
	Video                    *track.Video
}
// Subscriber is the subscriber entity definition.
type Subscriber struct {
	IO
	Config      *config.Subscribe
	readers     []*track.AVRingReader
	TrackPlayer `json:"-" yaml:"-"`
}

func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error {
	return s.receive(streamPath, sub)
}

func (s *Subscriber) GetSubscriber() *Subscriber {
	return s
}

// SetIO wraps the writer with a buffered writer when WriteBufferSize is configured.
func (s *Subscriber) SetIO(i any) {
	s.IO.SetIO(i)
	if s.Writer != nil && s.Config != nil && s.Config.WriteBufferSize > 0 {
		s.Writer = bufio.NewWriterSize(s.Writer, s.Config.WriteBufferSize)
	}
}
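// OnEvent is the default event handler. In this base implementation every Track
// published to the stream is accepted via AddTrack; concrete subscribers normally
// override OnEvent to consume the frame events emitted by the play loop
// (AudioFrame, VideoFrame, FLVFrame, AudioRTP/VideoRTP, ...) and fall through to
// Subscriber.OnEvent for everything else.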
func (s *Subscriber) OnEvent(event any) {
	switch v := event.(type) {
	case Track: // accept all tracks by default
		s.AddTrack(v)
	default:
		s.IO.OnEvent(event)
	}
}

func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
	result = track.NewAVRingReader(t)
	s.readers = append(s.readers, result)
	result.Logger = s.With(zap.String("track", t.Name))
	return
}
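// AddTrack attaches a published track to this subscriber. At most one audio and
// one video track are accepted, and only if the corresponding SubAudio/SubVideo
// option is enabled; any other track type is ignored.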
func (s *Subscriber) AddTrack(t Track) bool {
	switch v := t.(type) {
	case *track.Video:
		if s.VideoReader != nil || !s.Config.SubVideo {
			return false
		}
		s.VideoReader = s.CreateTrackReader(&v.Media)
		s.Video = v
	case *track.Audio:
		if s.AudioReader != nil || !s.Config.SubAudio {
			return false
		}
		s.AudioReader = s.CreateTrackReader(&v.Media)
		s.Audio = v
	default:
		return false
	}
	s.Info("track+1", zap.String("name", t.GetName()))
	return true
}

func (s *Subscriber) IsPlaying() bool {
	return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
}

func (s *Subscriber) SubPulse() {
	s.Stream.Receive(SubPulse{s.Spesific.(ISubscriber)})
}

func (s *Subscriber) PlayRaw() {
	s.PlayBlock(SUBTYPE_RAW)
}

func (s *Subscriber) PlayFLV() {
	s.PlayBlock(SUBTYPE_FLV)
}

func (s *Subscriber) PlayRTP() {
	s.PlayBlock(SUBTYPE_RTP)
}
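// A minimal usage sketch (MySubscriber and the write targets are hypothetical,
// not part of this package): a concrete subscriber embeds Subscriber, overrides
// OnEvent to receive the frame events emitted by the play loop, and then drives
// that loop with one of PlayRaw/PlayFLV/PlayRTP:
//
//	type MySubscriber struct {
//		Subscriber
//	}
//
//	func (ms *MySubscriber) OnEvent(event any) {
//		switch v := event.(type) {
//		case AudioFrame:
//			_ = v // write raw audio access units somewhere
//		case VideoFrame:
//			_ = v // write raw video access units somewhere
//		default:
//			ms.Subscriber.OnEvent(event)
//		}
//	}
//
// After Subscribe(streamPath, ms) succeeds, calling ms.PlayRaw() blocks until
// the stream closes or the subscriber is stopped.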
// PlayBlock reads frames from the subscribed tracks and blocks until the
// subscription ends.
func (s *Subscriber) PlayBlock(subType byte) {
	spesic := s.Spesific
	if spesic == nil {
		s.Error("play before subscribe")
		return
	}
	if s.IO.Err() != nil {
		s.Error("play", zap.Error(s.IO.Err()))
		return
	}
	s.Info("playblock", zap.Uint8("subType", subType))
	s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
	defer s.TrackPlayer.CancelFunc()
	ctx := s.TrackPlayer.Context
	conf := s.Config
	hasVideo, hasAudio := s.Video != nil && conf.SubVideo, s.Audio != nil && conf.SubAudio
	stopReason := zap.String("reason", "stop")
	defer s.onStop(&stopReason)
	if !hasAudio && !hasVideo {
		stopReason = zap.String("reason", "play neither video nor audio")
		return
	}
	sendVideoDecConf := func() {
		// s.Debug("sendVideoDecConf")
		spesic.OnEvent(s.Video.ParamaterSets)
		spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead))
	}
	sendAudioDecConf := func() {
		// s.Debug("sendAudioDecConf")
		spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead))
	}
	var sendAudioFrame, sendVideoFrame func(*AVFrame)
	switch subType {
	case SUBTYPE_RAW:
		sendVideoFrame = func(frame *AVFrame) {
			if frame.AUList.ByteLength == 0 {
				return
			}
			spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
		}
		sendAudioFrame = func(frame *AVFrame) {
			if frame.AUList.ByteLength == 0 {
				return
			}
			// fmt.Println("a", s.AudioReader.Delay)
			// fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime)
			spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()})
		}
	case SUBTYPE_RTP:
		var videoSeq, audioSeq uint16
		sendVideoFrame = func(frame *AVFrame) {
			// fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
			// skipped duration converted to 90 kHz RTP ticks
			delta := uint32(s.VideoReader.SkipTs * 90 / time.Millisecond)
			frame.RTP.Range(func(vp RTPFrame) bool {
				videoSeq++
				pkt := *vp.Packet // copy the packet so the frame cached in the ring stays untouched
				vp.Packet = &pkt
				vp.Header.Timestamp = vp.Header.Timestamp - delta
				vp.Header.SequenceNumber = videoSeq
				spesic.OnEvent((VideoRTP)(vp))
				return true
			})
		}
		sendAudioFrame = func(frame *AVFrame) {
			// fmt.Println("a", frame.Sequence, frame.Timestamp, s.AudioReader.AbsTime)
			// skipped duration converted to audio sample-rate ticks
			delta := uint32(s.AudioReader.SkipTs / time.Millisecond * time.Duration(s.AudioReader.Track.SampleRate) / 1000)
			frame.RTP.Range(func(ap RTPFrame) bool {
				audioSeq++
				pkt := *ap.Packet // copy the packet so the frame cached in the ring stays untouched
				ap.Packet = &pkt
				ap.Header.SequenceNumber = audioSeq
				ap.Header.Timestamp = ap.Header.Timestamp - delta
				spesic.OnEvent((AudioRTP)(ap))
				return true
			})
		}
	case SUBTYPE_FLV:
		flvHeadCache := make([]byte, 15) // reused for every FLV tag header plus the previous-tag-size field
		sendFlvFrame := func(t byte, ts uint32, avcc ...[]byte) {
			// println(t, ts)
			// fmt.Printf("%d %X %X %d\n", t, avcc[0][0], avcc[0][1], ts)
			flvHeadCache[0] = t // tag type
			result := append(FLVFrame{flvHeadCache[:11]}, avcc...)
			dataSize := uint32(util.SizeOfBuffers(avcc))
			if dataSize == 0 {
				return
			}
			util.PutBE(flvHeadCache[1:4], dataSize) // 24-bit data size
			util.PutBE(flvHeadCache[4:7], ts)       // lower 24 bits of the timestamp
			flvHeadCache[7] = byte(ts >> 24)        // extended timestamp byte
			spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11))) // previous tag size
		}
		sendVideoDecConf = func() {
			sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, s.VideoReader.Track.SequenceHead)
		}
		sendAudioDecConf = func() {
			sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead)
		}
		sendVideoFrame = func(frame *AVFrame) {
			// fmt.Println(frame.Sequence, s.VideoReader.AbsTime, s.VideoReader.Delay, frame.IFrame)
			// b := util.Buffer(frame.AVCC.ToBytes()[5:])
			// for b.CanRead() {
			// 	nalulen := int(b.ReadUint32())
			// 	if b.CanReadN(nalulen) {
			// 		bb := b.ReadN(int(nalulen))
			// 		println(nalulen, codec.ParseH264NALUType(bb[0]))
			// 	} else {
			// 		println("error")
			// 	}
			// }
			sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
		}
		sendAudioFrame = func(frame *AVFrame) {
			// fmt.Println(frame.Sequence, s.AudioReader.AbsTime, s.AudioReader.Delay)
			sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
		}
	}
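	// The loop below interleaves the two tracks: video frames are sent until the
	// current video frame is ahead of the pending audio frame, then one audio
	// frame is flushed, and vice versa. With SyncMode 0 "ahead" is decided by the
	// stream timestamps; otherwise by the wall-clock write times of the frames.
	// Decoder configuration is re-sent whenever a track's sequence head changes
	// (for video, at the next key frame).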
	var subMode = conf.SubMode // subscription mode
	if s.Args.Has(conf.SubModeArgName) {
		subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName))
	}
	var initState = 0
	var videoFrame, audioFrame *AVFrame
	for ctx.Err() == nil {
		if hasVideo {
			for ctx.Err() == nil {
				err := s.VideoReader.ReadFrame(subMode)
				if err == nil {
					err = ctx.Err()
				}
				if err != nil {
					stopReason = zap.Error(err)
					return
				}
				videoFrame = s.VideoReader.Value
				// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
				if videoFrame.IFrame && s.VideoReader.DecConfChanged() {
					s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
					sendVideoDecConf()
				}
				if hasAudio {
					if audioFrame != nil {
						if util.Conditoinal(conf.SyncMode == 0, videoFrame.Timestamp > audioFrame.Timestamp, videoFrame.WriteTime.After(audioFrame.WriteTime)) {
							// fmt.Println("switch audio", audioFrame.CanRead)
							sendAudioFrame(audioFrame)
							audioFrame = nil
							break
						}
					} else if initState++; initState >= 2 {
						break
					}
				}
				if !conf.IFrameOnly || videoFrame.IFrame {
					sendVideoFrame(videoFrame)
				} else {
					// fmt.Println("skip video", frame.Sequence)
				}
			}
		}
		// In normal mode, or in audio-only mode, audio playback starts here.
		if hasAudio {
			for ctx.Err() == nil {
				switch s.AudioReader.State {
				case track.READSTATE_INIT:
					if s.Video != nil {
						s.AudioReader.FirstTs = s.VideoReader.FirstTs
					}
				case track.READSTATE_NORMAL:
					if s.Video != nil {
						s.AudioReader.SkipTs = s.VideoReader.SkipTs
					}
				}
				err := s.AudioReader.ReadFrame(subMode)
				if err == nil {
					err = ctx.Err()
				}
				if err != nil {
					stopReason = zap.Error(err)
					return
				}
				audioFrame = s.AudioReader.Value
				// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
				if s.AudioReader.DecConfChanged() {
					s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
					sendAudioDecConf()
				}
				if hasVideo && videoFrame != nil {
					if util.Conditoinal(conf.SyncMode == 0, audioFrame.Timestamp > videoFrame.Timestamp, audioFrame.WriteTime.After(videoFrame.WriteTime)) {
						sendVideoFrame(videoFrame)
						videoFrame = nil
						break
					}
				}
				if audioFrame.Timestamp >= s.AudioReader.SkipTs {
					sendAudioFrame(audioFrame)
				} else {
					// fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs)
				}
			}
		}
	}
	if videoFrame != nil {
		videoFrame.ReaderLeave()
	}
	if audioFrame != nil {
		audioFrame.ReaderLeave()
	}
	stopReason = zap.Error(ctx.Err())
}
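// onStop logs the final stop reason once the play loop exits. Unless the
// subscriber is marked Internal, the specific subscriber instance is handed
// back to the stream via Stream.Receive (presumably so the stream can drop it
// from its subscriber list).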
func (s *Subscriber) onStop(reason *zapcore.Field) {
	if !s.Stream.IsClosed() {
		s.Info("play stop", *reason)
		if !s.Config.Internal {
			s.Stream.Receive(s.Spesific)
		}
	}
}