subscriber.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package jessica
  2. import (
  3. "encoding/binary"
  4. "net"
  5. "net/http"
  6. "path"
  7. "strings"
  8. "time"
  9. "github.com/gobwas/ws"
  10. "github.com/gobwas/ws/wsutil"
  11. "github.com/pion/rtp"
  12. "go.uber.org/zap"
  13. . "m7s.live/engine/v4"
  14. "m7s.live/engine/v4/track"
  15. "m7s.live/engine/v4/util"
  16. )
  17. type JessicaBase struct {
  18. Subscriber
  19. IsWebSocket bool
  20. }
  21. type JessicaSubscriber struct {
  22. JessicaBase
  23. head []byte
  24. }
  25. func (j *JessicaSubscriber) WriteAVCC(typ byte, ts uint32, avcc ...[]byte) {
  26. j.head[0] = typ
  27. binary.BigEndian.PutUint32(j.head[1:], ts)
  28. err := ws.WriteHeader(j, ws.Header{
  29. Fin: true,
  30. OpCode: ws.OpBinary,
  31. Length: int64(util.SizeOfBuffers(avcc) + 5),
  32. })
  33. defer func() {
  34. if err != nil {
  35. j.Stop(zap.Error(err))
  36. }
  37. }()
  38. if err != nil {
  39. return
  40. }
  41. var clone net.Buffers
  42. clone = append(append(clone, j.head), avcc...)
  43. if jessicaConfig.WriteTimeout > 0 {
  44. j.Writer.(net.Conn).SetWriteDeadline(time.Now().Add(jessicaConfig.WriteTimeout))
  45. }
  46. _, err = clone.WriteTo(j.Writer)
  47. }
  48. func (j *JessicaSubscriber) OnEvent(event any) {
  49. switch v := event.(type) {
  50. case AudioDeConf:
  51. j.WriteAVCC(1, 0, v)
  52. case VideoDeConf:
  53. j.WriteAVCC(2, 0, v)
  54. case AudioFrame:
  55. j.WriteAVCC(1, v.AbsTime, v.AVCC.ToBuffers()...)
  56. case VideoFrame:
  57. j.WriteAVCC(2, v.AbsTime, v.AVCC.ToBuffers()...)
  58. default:
  59. j.Subscriber.OnEvent(event)
  60. }
  61. }
  62. func (j *JessicaConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  63. ext := path.Ext(r.URL.Path)
  64. streamPath := strings.TrimPrefix(r.URL.Path, "/")
  65. streamPath = strings.TrimSuffix(streamPath, ext)
  66. if r.URL.RawQuery != "" {
  67. streamPath += "?" + r.URL.RawQuery
  68. }
  69. baseStream := JessicaBase{}
  70. var conn net.Conn
  71. var err error
  72. if r.Header.Get("Upgrade") == "websocket" {
  73. baseStream.IsWebSocket = true
  74. conn, _, _, err = ws.UpgradeHTTP(r, w)
  75. if err != nil {
  76. return
  77. }
  78. } else {
  79. if ext == ".flv" {
  80. w.Header().Set("Content-Type", "video/x-flv")
  81. } else {
  82. w.Header().Set("Content-Type", "application/octet-stream")
  83. }
  84. w.Header().Set("Transfer-Encoding", "identity")
  85. w.WriteHeader(http.StatusOK)
  86. if hijacker, ok := w.(http.Hijacker); ok && j.WriteTimeout > 0 {
  87. conn, _, _ = hijacker.Hijack()
  88. conn.SetWriteDeadline(time.Now().Add(j.WriteTimeout))
  89. } else {
  90. w.(http.Flusher).Flush()
  91. }
  92. }
  93. if conn == nil { //注入writer
  94. baseStream.SetIO(w)
  95. } else {
  96. baseStream.SetIO(conn)
  97. }
  98. baseStream.SetParentCtx(r.Context()) //注入context
  99. baseStream.ID = r.RemoteAddr
  100. var specific ISubscriber
  101. copyConfig := *&j.Subscribe
  102. switch ext {
  103. case ".flv":
  104. specific = &JessicaFLV{baseStream}
  105. case ".h264", ".h265":
  106. copyConfig.SubVideoTracks = strings.Split(ext, ".")[1:]
  107. copyConfig.SubAudio = false
  108. baseStream.Config = &copyConfig
  109. specific = &JessicaH26x{baseStream}
  110. default:
  111. specific = &JessicaSubscriber{baseStream, make([]byte, 5)}
  112. }
  113. if err = JessicaPlugin.Subscribe(streamPath, specific); err != nil {
  114. if baseStream.IsWebSocket {
  115. wsutil.WriteServerText(conn, []byte(err.Error()))
  116. } else {
  117. http.Error(w, err.Error(), http.StatusBadRequest)
  118. }
  119. return
  120. }
  121. play := specific.PlayRaw
  122. if ext == ".flv" {
  123. play = specific.PlayFLV
  124. }
  125. defer specific.Stop()
  126. if baseStream.IsWebSocket {
  127. go play()
  128. b, err := wsutil.ReadClientBinary(conn)
  129. var rtpPacket rtp.Packet
  130. if err == nil {
  131. dc := track.NewDataTrack[[]byte]("voice")
  132. dc.Attach(specific.GetSubscriber().Stream)
  133. for err == nil {
  134. err = rtpPacket.Unmarshal(b)
  135. if err == nil {
  136. dc.Push(rtpPacket.Payload)
  137. }
  138. b, err = wsutil.ReadClientBinary(conn)
  139. }
  140. } else {
  141. // baseStream.Error("receive", zap.Error(err))
  142. }
  143. } else {
  144. play()
  145. }
  146. }