main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package hdl // import "m7s.live/plugin/hdl/v4"
  2. import (
  3. "net"
  4. "net/http"
  5. "strings"
  6. "time"
  7. //"fmt"
  8. "go.uber.org/zap"
  9. . "m7s.live/engine/v4"
  10. "m7s.live/engine/v4/codec"
  11. "m7s.live/engine/v4/config"
  12. "m7s.live/engine/v4/util"
  13. )
  14. type HDLConfig struct {
  15. config.HTTP
  16. config.Publish
  17. config.Subscribe
  18. config.Pull
  19. }
  20. func pull(streamPath, url string) {
  21. if err := HDLPlugin.Pull(streamPath, url, NewHDLPuller(), 0); err != nil {
  22. HDLPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
  23. }
  24. }
  25. func (c *HDLConfig) OnEvent(event any) {
  26. switch v := event.(type) {
  27. case FirstConfig:
  28. for streamPath, url := range c.PullOnStart {
  29. pull(streamPath, url)
  30. }
  31. case InvitePublish: //按需拉流
  32. if remoteURL := c.CheckPullOnSub(v.Target); remoteURL != "" {
  33. pull(v.Target, remoteURL)
  34. }
  35. }
  36. }
  // str2number maps the exact strings "1" and "2" to the integers 1 and 2.
  // Every other input, including the empty string, yields 0.
  func str2number(s string) int {
  	if s == "1" {
  		return 1
  	}
  	if s == "2" {
  		return 2
  	}
  	return 0
  }
  47. func (c *HDLConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
  48. err := HDLPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), NewHDLPuller(), str2number(r.URL.Query().Get("save")))
  49. if err != nil {
  50. util.ReturnError(util.APIErrorPublish, err.Error(), rw, r)
  51. } else {
  52. util.ReturnOK(rw, r)
  53. }
  54. }
  55. func (*HDLConfig) API_List(rw http.ResponseWriter, r *http.Request) {
  56. util.ReturnFetchValue(FilterStreams[*HDLPuller], rw, r)
  57. }
  58. // 确保HDLConfig实现了PullPlugin接口
  59. var hdlConfig = new(HDLConfig)
  60. var HDLPlugin = InstallPlugin(hdlConfig)
  61. type HDLSubscriber struct {
  62. Subscriber
  63. }
  64. func (sub *HDLSubscriber) OnEvent(event any) {
  65. switch v := event.(type) {
  66. case FLVFrame:
  67. // t := time.Now()
  68. // s := util.SizeOfBuffers(v)
  69. if hdlConfig.WriteTimeout > 0 {
  70. if conn, ok := sub.Writer.(net.Conn); ok {
  71. conn.SetWriteDeadline(time.Now().Add(hdlConfig.WriteTimeout))
  72. }
  73. }
  74. if _, err := v.WriteTo(sub); err != nil {
  75. sub.Stop(zap.Error(err))
  76. // } else {
  77. // println(time.Since(t)/time.Millisecond, s)
  78. }
  79. default:
  80. sub.Subscriber.OnEvent(event)
  81. }
  82. }
  83. func (sub *HDLSubscriber) WriteFlvHeader() {
  84. at, vt := sub.Audio, sub.Video
  85. hasAudio, hasVideo := at != nil, vt != nil
  86. var amf util.AMF
  87. amf.Marshal("onMetaData")
  88. metaData := util.EcmaArray{
  89. "MetaDataCreator": "m7s" + Engine.Version,
  90. "hasVideo": hasVideo,
  91. "hasAudio": hasAudio,
  92. "hasMatadata": true,
  93. "canSeekToEnd": false,
  94. "duration": 0,
  95. "hasKeyFrames": 0,
  96. "framerate": 0,
  97. "videodatarate": 0,
  98. "filesize": 0,
  99. }
  100. var flags byte
  101. if hasAudio {
  102. flags |= (1 << 2)
  103. metaData["audiocodecid"] = int(at.CodecID)
  104. metaData["audiosamplerate"] = at.SampleRate
  105. metaData["audiosamplesize"] = at.SampleSize
  106. metaData["stereo"] = at.Channels == 2
  107. }
  108. if hasVideo {
  109. flags |= 1
  110. metaData["videocodecid"] = int(vt.CodecID)
  111. metaData["width"] = vt.SPSInfo.Width
  112. metaData["height"] = vt.SPSInfo.Height
  113. }
  114. amf.Marshal(metaData)
  115. // 写入FLV头
  116. sub.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0})
  117. codec.WriteFLVTag(sub, codec.FLV_TAG_TYPE_SCRIPT, 0, amf.Buffer)
  118. }
  119. func (c *HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  120. streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv")
  121. if r.URL.RawQuery != "" {
  122. streamPath += "?" + r.URL.RawQuery
  123. }
  124. sub := &HDLSubscriber{}
  125. sub.ID = r.RemoteAddr
  126. sub.SetParentCtx(r.Context())
  127. sub.SetIO(w)
  128. if err := HDLPlugin.Subscribe(streamPath, sub); err != nil {
  129. http.Error(w, err.Error(), http.StatusBadRequest)
  130. } else {
  131. w.Header().Set("Content-Type", "video/x-flv")
  132. w.Header().Set("Transfer-Encoding", "identity")
  133. w.WriteHeader(http.StatusOK)
  134. if hijacker, ok := w.(http.Hijacker); ok && c.WriteTimeout > 0 {
  135. conn, _, _ := hijacker.Hijack()
  136. conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
  137. sub.SetIO(conn)
  138. } else {
  139. w.(http.Flusher).Flush()
  140. }
  141. sub.WriteFlvHeader()
  142. sub.PlayBlock(SUBTYPE_FLV)
  143. }
  144. }