subscriber.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package jessica
  2. import (
  3. "encoding/binary"
  4. "net"
  5. "net/http"
  6. "path"
  7. "strings"
  8. "time"
  9. "github.com/gobwas/ws"
  10. "github.com/gobwas/ws/wsutil"
  11. "github.com/pion/rtp"
  12. . "m7s.live/engine/v4"
  13. "m7s.live/engine/v4/track"
  14. "m7s.live/engine/v4/util"
  15. )
  16. type JessicaBase struct {
  17. Subscriber
  18. IsWebSocket bool
  19. }
  20. type JessicaSubscriber struct {
  21. JessicaBase
  22. head []byte
  23. }
  24. func (j *JessicaSubscriber) WriteAVCC(typ byte, ts uint32, avcc ...[]byte) {
  25. j.head[0] = typ
  26. binary.BigEndian.PutUint32(j.head[1:], ts)
  27. err := ws.WriteHeader(j, ws.Header{
  28. Fin: true,
  29. OpCode: ws.OpBinary,
  30. Length: int64(util.SizeOfBuffers(avcc) + 5),
  31. })
  32. defer func() {
  33. if err != nil {
  34. j.Stop()
  35. }
  36. }()
  37. if err != nil {
  38. return
  39. }
  40. var clone net.Buffers
  41. clone = append(append(clone, j.head), avcc...)
  42. if jessicaConfig.WriteTimeout > 0 {
  43. j.Writer.(net.Conn).SetWriteDeadline(time.Now().Add(jessicaConfig.WriteTimeout))
  44. }
  45. _, err = clone.WriteTo(j.Writer)
  46. }
  47. func (j *JessicaSubscriber) OnEvent(event any) {
  48. switch v := event.(type) {
  49. case AudioDeConf:
  50. j.WriteAVCC(1, 0, v)
  51. case VideoDeConf:
  52. j.WriteAVCC(2, 0, v)
  53. case AudioFrame:
  54. j.WriteAVCC(1, v.AbsTime, v.AVCC.ToBuffers()...)
  55. case VideoFrame:
  56. j.WriteAVCC(2, v.AbsTime, v.AVCC.ToBuffers()...)
  57. default:
  58. j.Subscriber.OnEvent(event)
  59. }
  60. }
  61. func (j *JessicaConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  62. ext := path.Ext(r.URL.Path)
  63. streamPath := strings.TrimPrefix(r.URL.Path, "/")
  64. streamPath = strings.TrimSuffix(streamPath, ext)
  65. if r.URL.RawQuery != "" {
  66. streamPath += "?" + r.URL.RawQuery
  67. }
  68. baseStream := JessicaBase{}
  69. var conn net.Conn
  70. var err error
  71. if r.Header.Get("Upgrade") == "websocket" {
  72. baseStream.IsWebSocket = true
  73. conn, _, _, err = ws.UpgradeHTTP(r, w)
  74. if err != nil {
  75. return
  76. }
  77. } else {
  78. if ext == ".flv" {
  79. w.Header().Set("Content-Type", "video/x-flv")
  80. } else {
  81. w.Header().Set("Content-Type", "application/octet-stream")
  82. }
  83. w.Header().Set("Transfer-Encoding", "identity")
  84. w.WriteHeader(http.StatusOK)
  85. if hijacker, ok := w.(http.Hijacker); ok && j.WriteTimeout > 0 {
  86. conn, _, _ = hijacker.Hijack()
  87. conn.SetWriteDeadline(time.Now().Add(j.WriteTimeout))
  88. } else {
  89. w.(http.Flusher).Flush()
  90. }
  91. }
  92. if conn == nil { //注入writer
  93. baseStream.SetIO(w)
  94. } else {
  95. baseStream.SetIO(conn)
  96. }
  97. baseStream.SetParentCtx(r.Context()) //注入context
  98. baseStream.ID = r.RemoteAddr
  99. var specific ISubscriber
  100. copyConfig := *&j.Subscribe
  101. switch ext {
  102. case ".flv":
  103. specific = &JessicaFLV{baseStream}
  104. case ".h264", ".h265":
  105. copyConfig.SubVideoTracks = strings.Split(ext, ".")[1:]
  106. copyConfig.SubAudio = false
  107. baseStream.Config = &copyConfig
  108. specific = &JessicaH26x{baseStream}
  109. default:
  110. specific = &JessicaSubscriber{baseStream, make([]byte, 5)}
  111. }
  112. if err = JessicaPlugin.Subscribe(streamPath, specific); err != nil {
  113. http.Error(w, err.Error(), http.StatusBadRequest)
  114. return
  115. }
  116. play := specific.PlayRaw
  117. if ext == ".flv" {
  118. play = specific.PlayFLV
  119. }
  120. defer specific.Stop()
  121. if baseStream.IsWebSocket {
  122. go play()
  123. b, err := wsutil.ReadClientBinary(conn)
  124. var rtpPacket rtp.Packet
  125. if err == nil {
  126. dc := track.NewDataTrack[[]byte]("voice")
  127. dc.Attach(specific.GetSubscriber().Stream)
  128. for err == nil {
  129. err = rtpPacket.Unmarshal(b)
  130. if err == nil {
  131. dc.Push(rtpPacket.Payload)
  132. }
  133. b, err = wsutil.ReadClientBinary(conn)
  134. }
  135. } else {
  136. // baseStream.Error("receive", zap.Error(err))
  137. }
  138. } else {
  139. play()
  140. }
  141. }