subscriber.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429
  1. package engine
  2. import (
  3. "bufio"
  4. "context"
  5. "io"
  6. "net"
  7. "strconv"
  8. "time"
  9. //"fmt"
  10. "go.uber.org/zap"
  11. "go.uber.org/zap/zapcore"
  12. "m7s.live/engine/v4/codec"
  13. . "m7s.live/engine/v4/common"
  14. "m7s.live/engine/v4/config"
  15. "m7s.live/engine/v4/track"
  16. "m7s.live/engine/v4/util"
  17. )
// Output formats accepted by Subscriber.PlayBlock. The value selects which
// event types PlayBlock passes to the subscriber's OnEvent.
const (
	// SUBTYPE_RAW delivers VideoFrame and AudioFrame events.
	SUBTYPE_RAW = iota
	// SUBTYPE_RTP delivers VideoRTP and AudioRTP events.
	SUBTYPE_RTP
	// SUBTYPE_FLV delivers FLVFrame events.
	SUBTYPE_FLV
)
// Subscriber state values.
// NOTE(review): none of these are referenced in this part of the file; the
// names suggest successive playback phases — confirm against their users.
const (
	SUBSTATE_INIT = iota
	SUBSTATE_FIRST
	SUBSTATE_NORMAL
)
// VideoDeConf is a video sequence header in AVCC format. PlayBlock builds it
// from the video track's SequenceHead.
type VideoDeConf []byte

// AudioDeConf is an audio sequence header in AVCC format. PlayBlock builds it
// from the audio track's SequenceHead.
type AudioDeConf []byte
// AudioFrame is the event PlayBlock emits for each audio frame in SUBTYPE_RAW
// mode. PlayBlock fills it with a positional composite literal, so the field
// order must not change.
type AudioFrame struct {
	// The frame read from the audio ring reader.
	*AVFrame
	// The audio track being played.
	*track.Audio
	// AbsTime is the audio reader's AbsTime when the frame was sent.
	AbsTime uint32
	// PTS is the audio reader's GetPTS32() value when the frame was sent.
	PTS uint32
	// DTS is the audio reader's GetDTS32() value when the frame was sent.
	DTS uint32
}
// VideoFrame is the event PlayBlock emits for each video frame in SUBTYPE_RAW
// mode. PlayBlock fills it with a positional composite literal, so the field
// order must not change.
type VideoFrame struct {
	// The frame read from the video ring reader.
	*AVFrame
	// The video track being played.
	*track.Video
	// AbsTime is the video reader's AbsTime when the frame was sent.
	AbsTime uint32
	// PTS is the video reader's GetPTS32() value when the frame was sent.
	PTS uint32
	// DTS is the video reader's GetDTS32() value when the frame was sent.
	DTS uint32
}
// FLVFrame is one FLV tag held as a list of buffers. As built by PlayBlock it
// is an 11-byte tag header, the payload buffers, and a 4-byte trailer holding
// the payload size plus 11.
type FLVFrame net.Buffers

// AudioRTP is an RTP frame emitted by PlayBlock for audio in SUBTYPE_RTP mode.
type AudioRTP RTPFrame

// VideoRTP is an RTP frame emitted by PlayBlock for video in SUBTYPE_RTP mode.
type VideoRTP RTPFrame
// HasAnnexB is implemented by values that can render themselves as a list of
// Annex-B buffers; VideoFrame in this file is one such type.
type HasAnnexB interface {
	// GetAnnexB returns the Annex-B representation as a buffer list.
	GetAnnexB() (r net.Buffers)
}
  52. func (a AudioDeConf) WithOutRTMP() []byte {
  53. return a[2:]
  54. }
  55. func (v VideoDeConf) WithOutRTMP() []byte {
  56. return v[5:]
  57. }
  58. func (f FLVFrame) IsAudio() bool {
  59. return f[0][0] == codec.FLV_TAG_TYPE_AUDIO
  60. }
  61. func (f FLVFrame) IsVideo() bool {
  62. return f[0][0] == codec.FLV_TAG_TYPE_VIDEO
  63. }
  64. func (f FLVFrame) WriteTo(w io.Writer) (int64, error) {
  65. t := (net.Buffers)(f)
  66. return t.WriteTo(w)
  67. }
  68. func (a AudioFrame) GetADTS() (r net.Buffers) {
  69. r = append(append(r, a.ADTS.Value), a.AUList.ToBuffers()...)
  70. return
  71. }
  72. func (a AudioFrame) WriteRawTo(w io.Writer) (n int64, err error) {
  73. aulist := a.AUList.ToBuffers()
  74. return aulist.WriteTo(w)
  75. }
  76. func (v VideoFrame) GetAnnexB() (r net.Buffers) {
  77. if v.IFrame {
  78. r = v.ParamaterSets.GetAnnexB()
  79. }
  80. v.AUList.Range(func(au *util.BLL) bool {
  81. r = append(append(r, codec.NALU_Delimiter2), au.ToBuffers()...)
  82. return true
  83. })
  84. return
  85. }
  86. func (v VideoFrame) WriteAnnexBTo(w io.Writer) (n int64, err error) {
  87. annexB := v.GetAnnexB()
  88. return annexB.WriteTo(w)
  89. }
// ISubscriber is the interface the engine uses to drive a subscriber.
// *Subscriber in this file provides GetSubscriber, IsPlaying, PlayRaw,
// PlayBlock, PlayFLV and Subscribe; IIO and Stop are declared outside this
// part of the file.
type ISubscriber interface {
	IIO
	// GetSubscriber returns the underlying *Subscriber.
	GetSubscriber() *Subscriber
	// IsPlaying reports whether playback is currently active.
	IsPlaying() bool
	// PlayRaw plays in SUBTYPE_RAW mode.
	PlayRaw()
	// PlayBlock plays in the given sub type and blocks until playback ends.
	PlayBlock(byte)
	// PlayFLV plays in SUBTYPE_FLV mode.
	PlayFLV()
	Stop(reason ...zapcore.Field)
	// Subscribe attaches sub to the stream identified by streamPath.
	Subscribe(streamPath string, sub ISubscriber) error
}
// TrackPlayer holds the playback state of a subscriber.
type TrackPlayer struct {
	// Context and CancelFunc are created by PlayBlock with context.WithCancel
	// and describe the lifetime of the current playback.
	context.Context
	context.CancelFunc
	// AudioReader and VideoReader are the ring readers created by AddTrack
	// for the accepted audio and video tracks.
	AudioReader, VideoReader *track.AVRingReader
	// Audio is the accepted audio track, set by AddTrack.
	Audio *track.Audio
	// Video is the accepted video track, set by AddTrack.
	Video *track.Video
}
// Subscriber is the subscriber entity.
type Subscriber struct {
	IO
	// Config holds the subscribe settings read by SetIO, AddTrack, PlayBlock
	// and onStop.
	Config *config.Subscribe
	// readers collects every ring reader created by CreateTrackReader.
	readers []*track.AVRingReader
	// TrackPlayer carries the playback state; the tags exclude it from JSON
	// and YAML output.
	TrackPlayer `json:"-" yaml:"-"`
}
  114. func (s *Subscriber) Subscribe(streamPath string, sub ISubscriber) error {
  115. return s.receive(streamPath, sub)
  116. }
// GetSubscriber returns the receiver itself, giving types that embed
// Subscriber a way to expose the underlying *Subscriber through ISubscriber.
func (s *Subscriber) GetSubscriber() *Subscriber {
	return s
}
  120. func (s *Subscriber) SetIO(i any) {
  121. s.IO.SetIO(i)
  122. if s.Writer != nil && s.Config != nil && s.Config.WriteBufferSize > 0 {
  123. s.Writer = bufio.NewWriterSize(s.Writer, s.Config.WriteBufferSize)
  124. }
  125. }
  126. func (s *Subscriber) OnEvent(event any) {
  127. switch v := event.(type) {
  128. case Track: //默认接受所有track
  129. s.AddTrack(v)
  130. default:
  131. s.IO.OnEvent(event)
  132. }
  133. }
  134. func (s *Subscriber) CreateTrackReader(t *track.Media) (result *track.AVRingReader) {
  135. result = track.NewAVRingReader(t)
  136. s.readers = append(s.readers, result)
  137. result.Logger = s.With(zap.String("track", t.Name))
  138. return
  139. }
  140. func (s *Subscriber) AddTrack(t Track) bool {
  141. switch v := t.(type) {
  142. case *track.Video:
  143. if s.VideoReader != nil || !s.Config.SubVideo {
  144. return false
  145. }
  146. s.VideoReader = s.CreateTrackReader(&v.Media)
  147. s.Video = v
  148. case *track.Audio:
  149. if s.AudioReader != nil || !s.Config.SubAudio {
  150. return false
  151. }
  152. s.AudioReader = s.CreateTrackReader(&v.Media)
  153. s.Audio = v
  154. default:
  155. return false
  156. }
  157. s.Info("track+1", zap.String("name", t.GetName()))
  158. return true
  159. }
  160. func (s *Subscriber) IsPlaying() bool {
  161. return s.TrackPlayer.Context != nil && s.TrackPlayer.Err() == nil
  162. }
  163. func (s *Subscriber) SubPulse() {
  164. s.Stream.Receive(SubPulse{s.Spesific.(ISubscriber)})
  165. }
// PlayRaw plays in SUBTYPE_RAW mode; it blocks for as long as PlayBlock does.
func (s *Subscriber) PlayRaw() {
	s.PlayBlock(SUBTYPE_RAW)
}
// PlayFLV plays in SUBTYPE_FLV mode; it blocks for as long as PlayBlock does.
func (s *Subscriber) PlayFLV() {
	s.PlayBlock(SUBTYPE_FLV)
}
// PlayRTP plays in SUBTYPE_RTP mode; it blocks for as long as PlayBlock does.
func (s *Subscriber) PlayRTP() {
	s.PlayBlock(SUBTYPE_RTP)
}
  175. // PlayBlock 阻塞式读取数据
  176. func (s *Subscriber) PlayBlock(subType byte) {
  177. spesic := s.Spesific
  178. if spesic == nil {
  179. s.Error("play before subscribe")
  180. return
  181. }
  182. if s.IO.Err() != nil {
  183. s.Error("play", zap.Error(s.IO.Err()))
  184. return
  185. }
  186. s.Info("playblock", zap.Uint8("subType", subType))
  187. s.TrackPlayer.Context, s.TrackPlayer.CancelFunc = context.WithCancel(s.IO)
  188. defer s.TrackPlayer.CancelFunc()
  189. ctx := s.TrackPlayer.Context
  190. conf := s.Config
  191. hasVideo, hasAudio := s.Video != nil && conf.SubVideo, s.Audio != nil && conf.SubAudio
  192. stopReason := zap.String("reason", "stop")
  193. defer s.onStop(&stopReason)
  194. if !hasAudio && !hasVideo {
  195. stopReason = zap.String("reason", "play neither video nor audio")
  196. return
  197. }
  198. sendVideoDecConf := func() {
  199. // s.Debug("sendVideoDecConf")
  200. spesic.OnEvent(s.Video.ParamaterSets)
  201. spesic.OnEvent(VideoDeConf(s.VideoReader.Track.SequenceHead))
  202. }
  203. sendAudioDecConf := func() {
  204. // s.Debug("sendAudioDecConf")
  205. spesic.OnEvent(AudioDeConf(s.AudioReader.Track.SequenceHead))
  206. }
  207. var sendAudioFrame, sendVideoFrame func(*AVFrame)
  208. switch subType {
  209. case SUBTYPE_RAW:
  210. sendVideoFrame = func(frame *AVFrame) {
  211. if frame.AUList.ByteLength == 0 {
  212. return
  213. }
  214. //fmt.Println("on video frame 11111111111111111")
  215. spesic.OnEvent(VideoFrame{frame, s.Video, s.VideoReader.AbsTime, s.VideoReader.GetPTS32(), s.VideoReader.GetDTS32()})
  216. }
  217. sendAudioFrame = func(frame *AVFrame) {
  218. if frame.AUList.ByteLength == 0 {
  219. return
  220. }
  221. // fmt.Println("a", s.AudioReader.Delay)
  222. // fmt.Println("a", frame.Sequence, s.AudioReader.AbsTime)
  223. spesic.OnEvent(AudioFrame{frame, s.Audio, s.AudioReader.AbsTime, s.AudioReader.GetPTS32(), s.AudioReader.GetDTS32()})
  224. }
  225. case SUBTYPE_RTP:
  226. var videoSeq, audioSeq uint16
  227. sendVideoFrame = func(frame *AVFrame) {
  228. // fmt.Println("v", frame.Sequence, frame.AbsTime, s.VideoReader.AbsTime, frame.IFrame)
  229. delta := uint32(s.VideoReader.SkipTs * 90 / time.Millisecond)
  230. frame.RTP.Range(func(vp RTPFrame) bool {
  231. videoSeq++
  232. copy := *vp.Packet
  233. vp.Packet = &copy
  234. vp.Header.Timestamp = vp.Header.Timestamp - delta
  235. vp.Header.SequenceNumber = videoSeq
  236. spesic.OnEvent((VideoRTP)(vp))
  237. return true
  238. })
  239. }
  240. sendAudioFrame = func(frame *AVFrame) {
  241. // fmt.Println("a", frame.Sequence, frame.Timestamp, s.AudioReader.AbsTime)
  242. delta := uint32(s.AudioReader.SkipTs / time.Millisecond * time.Duration(s.AudioReader.Track.SampleRate) / 1000)
  243. frame.RTP.Range(func(ap RTPFrame) bool {
  244. audioSeq++
  245. copy := *ap.Packet
  246. ap.Packet = &copy
  247. ap.Header.SequenceNumber = audioSeq
  248. ap.Header.Timestamp = ap.Header.Timestamp - delta
  249. spesic.OnEvent((AudioRTP)(ap))
  250. return true
  251. })
  252. }
  253. case SUBTYPE_FLV:
  254. flvHeadCache := make([]byte, 15) //内存复用
  255. sendFlvFrame := func(t byte, ts uint32, avcc ...[]byte) {
  256. // println(t, ts)
  257. // fmt.Printf("%d %X %X %d\n", t, avcc[0][0], avcc[0][1], ts)
  258. flvHeadCache[0] = t
  259. result := append(FLVFrame{flvHeadCache[:11]}, avcc...)
  260. dataSize := uint32(util.SizeOfBuffers(avcc))
  261. if dataSize == 0 {
  262. return
  263. }
  264. util.PutBE(flvHeadCache[1:4], dataSize)
  265. util.PutBE(flvHeadCache[4:7], ts)
  266. flvHeadCache[7] = byte(ts >> 24)
  267. spesic.OnEvent(append(result, util.PutBE(flvHeadCache[11:15], dataSize+11)))
  268. }
  269. sendVideoDecConf = func() {
  270. sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, s.VideoReader.Track.SequenceHead)
  271. }
  272. sendAudioDecConf = func() {
  273. sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, s.AudioReader.Track.SequenceHead)
  274. }
  275. sendVideoFrame = func(frame *AVFrame) {
  276. // fmt.Println(frame.Sequence, s.VideoReader.AbsTime, s.VideoReader.Delay, frame.IFrame)
  277. // b := util.Buffer(frame.AVCC.ToBytes()[5:])
  278. // for b.CanRead() {
  279. // nalulen := int(b.ReadUint32())
  280. // if b.CanReadN(nalulen) {
  281. // bb := b.ReadN(int(nalulen))
  282. // println(nalulen, codec.ParseH264NALUType(bb[0]))
  283. // } else {
  284. // println("error")
  285. // }
  286. // }
  287. sendFlvFrame(codec.FLV_TAG_TYPE_VIDEO, s.VideoReader.AbsTime, frame.AVCC.ToBuffers()...)
  288. }
  289. sendAudioFrame = func(frame *AVFrame) {
  290. // fmt.Println(frame.Sequence, s.AudioReader.AbsTime, s.AudioReader.Delay)
  291. sendFlvFrame(codec.FLV_TAG_TYPE_AUDIO, s.AudioReader.AbsTime, frame.AVCC.ToBuffers()...)
  292. }
  293. }
  294. var subMode = conf.SubMode //订阅模式
  295. if s.Args.Has(conf.SubModeArgName) {
  296. subMode, _ = strconv.Atoi(s.Args.Get(conf.SubModeArgName))
  297. }
  298. var initState = 0
  299. var videoFrame, audioFrame *AVFrame
  300. for ctx.Err() == nil {
  301. if hasVideo {
  302. //fmt.Println("has video 111111111111111111111111111111111111111111111111111111111",ctx.Err())
  303. for ctx.Err() == nil {
  304. //fmt.Println(" video loap 111111111111111111111111111111111111111111111111111111111")
  305. err := s.VideoReader.ReadFrame(subMode)
  306. if err == nil {
  307. err = ctx.Err()
  308. }
  309. if err != nil {
  310. stopReason = zap.Error(err)
  311. return
  312. }
  313. videoFrame = s.VideoReader.Value
  314. if videoFrame.IFrame && s.VideoReader.DecConfChanged() {
  315. s.VideoReader.ConfSeq = s.VideoReader.Track.SequenceHeadSeq
  316. sendVideoDecConf()
  317. }
  318. if hasAudio {
  319. if audioFrame != nil {
  320. if util.Conditoinal(conf.SyncMode == 0, videoFrame.Timestamp > audioFrame.Timestamp, videoFrame.WriteTime.After(audioFrame.WriteTime)) {
  321. //fmt.Println("audioFrame IS NOT null 11111111111111")
  322. sendAudioFrame(audioFrame)
  323. audioFrame = nil
  324. break
  325. }
  326. } else if initState++; initState >= 2 {
  327. //fmt.Println("audioFrame IS null BREAK",initState)
  328. break
  329. }
  330. }
  331. if !conf.IFrameOnly || videoFrame.IFrame {
  332. //fmt.Println("SEND sendVideoFrame")
  333. sendVideoFrame(videoFrame)
  334. } else {
  335. }
  336. }
  337. }
  338. // 正常模式下或者纯音频模式下,音频开始播放
  339. if hasAudio {
  340. //fmt.Println("has audio 111111111111111111111111111111111111111111111111111111111",ctx.Err())
  341. for ctx.Err() == nil {
  342. //fmt.Println(" audio loap 111111111111111111111111111111111111111111111111111111111")
  343. switch s.AudioReader.State {
  344. case track.READSTATE_INIT:
  345. if s.Video != nil {
  346. s.AudioReader.FirstTs = s.VideoReader.FirstTs
  347. }
  348. case track.READSTATE_NORMAL:
  349. if s.Video != nil {
  350. s.AudioReader.SkipTs = s.VideoReader.SkipTs
  351. }
  352. }
  353. err := s.AudioReader.ReadFrame(subMode)
  354. if err == nil {
  355. err = ctx.Err()
  356. }
  357. if err != nil {
  358. stopReason = zap.Error(err)
  359. return
  360. }
  361. audioFrame = s.AudioReader.Value
  362. // fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
  363. if s.AudioReader.DecConfChanged() {
  364. s.AudioReader.ConfSeq = s.AudioReader.Track.SequenceHeadSeq
  365. sendAudioDecConf()
  366. }
  367. if hasVideo && videoFrame != nil {
  368. if util.Conditoinal(conf.SyncMode == 0, audioFrame.Timestamp > videoFrame.Timestamp, audioFrame.WriteTime.After(videoFrame.WriteTime)) {
  369. //fmt.Println("SEND VIDEO FRAM 111111111111111111")
  370. sendVideoFrame(videoFrame)
  371. videoFrame = nil
  372. break
  373. }
  374. }
  375. if audioFrame.Timestamp >= s.AudioReader.SkipTs {
  376. //fmt.Println("SEND sendAudioFrame")
  377. sendAudioFrame(audioFrame)
  378. } else {
  379. // fmt.Println("skip audio", frame.AbsTime, s.AudioReader.SkipTs)
  380. }
  381. }
  382. }
  383. }
  384. if videoFrame != nil {
  385. videoFrame.ReaderLeave()
  386. }
  387. if audioFrame != nil {
  388. audioFrame.ReaderLeave()
  389. }
  390. stopReason = zap.Error(ctx.Err())
  391. return
  392. }
  393. func (s *Subscriber) onStop(reason *zapcore.Field) {
  394. if !s.Stream.IsClosed() {
  395. s.Info("play stop", *reason)
  396. if !s.Config.Internal {
  397. s.Stream.Receive(s.Spesific)
  398. }
  399. }
  400. }