main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. package hdl // import "m7s.live/plugin/hdl/v4"
  2. import (
  3. "net"
  4. "net/http"
  5. "strings"
  6. "time"
  7. "go.uber.org/zap"
  8. . "m7s.live/engine/v4"
  9. "m7s.live/engine/v4/codec"
  10. "m7s.live/engine/v4/config"
  11. "m7s.live/engine/v4/util"
  12. )
  13. type HDLConfig struct {
  14. config.HTTP
  15. config.Publish
  16. config.Subscribe
  17. config.Pull
  18. }
  19. func pull(streamPath, url string) {
  20. if err := HDLPlugin.Pull(streamPath, url, NewHDLPuller(), 0); err != nil {
  21. HDLPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
  22. }
  23. }
  24. func (c *HDLConfig) OnEvent(event any) {
  25. switch v := event.(type) {
  26. case FirstConfig:
  27. for streamPath, url := range c.PullOnStart {
  28. pull(streamPath, url)
  29. }
  30. case InvitePublish: //按需拉流
  31. if remoteURL := c.CheckPullOnSub(v.Target); remoteURL != "" {
  32. pull(v.Target, remoteURL)
  33. }
  34. }
  35. }
  36. func str2number(s string) int {
  37. switch s {
  38. case "1":
  39. return 1
  40. case "2":
  41. return 2
  42. default:
  43. return 0
  44. }
  45. }
  46. func (c *HDLConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
  47. err := HDLPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), NewHDLPuller(), str2number(r.URL.Query().Get("save")))
  48. if err != nil {
  49. util.ReturnError(util.APIErrorPublish, err.Error(), rw, r)
  50. } else {
  51. util.ReturnOK(rw, r)
  52. }
  53. }
  54. func (*HDLConfig) API_List(rw http.ResponseWriter, r *http.Request) {
  55. util.ReturnFetchValue(FilterStreams[*HDLPuller], rw, r)
  56. }
  57. // 确保HDLConfig实现了PullPlugin接口
  58. var hdlConfig = new(HDLConfig)
  59. var HDLPlugin = InstallPlugin(hdlConfig)
  60. type HDLSubscriber struct {
  61. Subscriber
  62. }
  63. func (sub *HDLSubscriber) OnEvent(event any) {
  64. switch v := event.(type) {
  65. case FLVFrame:
  66. // t := time.Now()
  67. // s := util.SizeOfBuffers(v)
  68. if hdlConfig.WriteTimeout > 0 {
  69. if conn, ok := sub.Writer.(net.Conn); ok {
  70. conn.SetWriteDeadline(time.Now().Add(hdlConfig.WriteTimeout))
  71. }
  72. }
  73. if _, err := v.WriteTo(sub); err != nil {
  74. sub.Stop(zap.Error(err))
  75. // } else {
  76. // println(time.Since(t)/time.Millisecond, s)
  77. }
  78. default:
  79. sub.Subscriber.OnEvent(event)
  80. }
  81. }
  82. func (sub *HDLSubscriber) WriteFlvHeader() {
  83. at, vt := sub.Audio, sub.Video
  84. hasAudio, hasVideo := at != nil, vt != nil
  85. var amf util.AMF
  86. amf.Marshal("onMetaData")
  87. metaData := util.EcmaArray{
  88. "MetaDataCreator": "m7s" + Engine.Version,
  89. "hasVideo": hasVideo,
  90. "hasAudio": hasAudio,
  91. "hasMatadata": true,
  92. "canSeekToEnd": false,
  93. "duration": 0,
  94. "hasKeyFrames": 0,
  95. "framerate": 0,
  96. "videodatarate": 0,
  97. "filesize": 0,
  98. }
  99. var flags byte
  100. if hasAudio {
  101. flags |= (1 << 2)
  102. metaData["audiocodecid"] = int(at.CodecID)
  103. metaData["audiosamplerate"] = at.SampleRate
  104. metaData["audiosamplesize"] = at.SampleSize
  105. metaData["stereo"] = at.Channels == 2
  106. }
  107. if hasVideo {
  108. flags |= 1
  109. metaData["videocodecid"] = int(vt.CodecID)
  110. metaData["width"] = vt.SPSInfo.Width
  111. metaData["height"] = vt.SPSInfo.Height
  112. }
  113. amf.Marshal(metaData)
  114. // 写入FLV头
  115. sub.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0})
  116. codec.WriteFLVTag(sub, codec.FLV_TAG_TYPE_SCRIPT, 0, amf.Buffer)
  117. }
  118. func (c *HDLConfig) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  119. streamPath := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), ".flv")
  120. if r.URL.RawQuery != "" {
  121. streamPath += "?" + r.URL.RawQuery
  122. }
  123. sub := &HDLSubscriber{}
  124. sub.ID = r.RemoteAddr
  125. sub.SetParentCtx(r.Context())
  126. sub.SetIO(w)
  127. if err := HDLPlugin.Subscribe(streamPath, sub); err != nil {
  128. http.Error(w, err.Error(), http.StatusBadRequest)
  129. } else {
  130. w.Header().Set("Content-Type", "video/x-flv")
  131. w.Header().Set("Transfer-Encoding", "identity")
  132. w.WriteHeader(http.StatusOK)
  133. if hijacker, ok := w.(http.Hijacker); ok && c.WriteTimeout > 0 {
  134. conn, _, _ := hijacker.Hijack()
  135. conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout))
  136. sub.SetIO(conn)
  137. } else {
  138. w.(http.Flusher).Flush()
  139. }
  140. sub.WriteFlvHeader()
  141. sub.PlayBlock(SUBTYPE_FLV)
  142. }
  143. }