http.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package engine
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "go.uber.org/zap"
  10. "gopkg.in/yaml.v3"
  11. "m7s.live/engine/v4/codec"
  12. "m7s.live/engine/v4/config"
  13. "m7s.live/engine/v4/util"
  14. )
  15. const (
  16. NO_SUCH_CONIFG = "no such config"
  17. NO_SUCH_STREAM = "no such stream"
  18. )
  19. type GlobalConfig struct {
  20. config.Engine
  21. }
  22. func (conf *GlobalConfig) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
  23. if r.URL.Path == "/favicon.ico" {
  24. http.ServeFile(rw, r, "favicon.ico")
  25. return
  26. }
  27. fmt.Fprintf(rw, "Monibuca Engine %s StartTime:%s\n", SysInfo.Version, SysInfo.StartTime)
  28. for _, plugin := range Plugins {
  29. fmt.Fprintf(rw, "Plugin %s Version:%s\n", plugin.Name, plugin.Version)
  30. }
  31. for _, api := range apiList {
  32. fmt.Fprintf(rw, "%s\n", api)
  33. }
  34. }
  35. func (conf *GlobalConfig) API_summary(rw http.ResponseWriter, r *http.Request) {
  36. util.ReturnValue(&summary, rw, r)
  37. }
  38. func (conf *GlobalConfig) API_plugins(rw http.ResponseWriter, r *http.Request) {
  39. util.ReturnValue(Plugins, rw, r)
  40. }
  41. func (conf *GlobalConfig) API_stream(rw http.ResponseWriter, r *http.Request) {
  42. if streamPath := r.URL.Query().Get("streamPath"); streamPath != "" {
  43. if s := Streams.Get(streamPath); s != nil {
  44. util.ReturnValue(s, rw, r)
  45. } else {
  46. util.ReturnError(util.APIErrorNoStream, NO_SUCH_STREAM, rw, r)
  47. }
  48. } else {
  49. util.ReturnError(util.APIErrorNoStream, "no streamPath", rw, r)
  50. }
  51. }
  52. func (conf *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request) {
  53. util.ReturnValue(&SysInfo, rw, r)
  54. }
  55. func (conf *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request) {
  56. if streamPath := r.URL.Query().Get("streamPath"); streamPath != "" {
  57. if s := Streams.Get(streamPath); s != nil {
  58. s.Close()
  59. util.ReturnOK(w, r)
  60. } else {
  61. util.ReturnError(util.APIErrorNoStream, NO_SUCH_STREAM, w, r)
  62. }
  63. } else {
  64. util.ReturnError(util.APIErrorNoStream, "no streamPath", w, r)
  65. }
  66. }
  67. // API_getConfig 获取指定的配置信息
  68. func (conf *GlobalConfig) API_getConfig(w http.ResponseWriter, r *http.Request) {
  69. var p *Plugin
  70. var q = r.URL.Query()
  71. if configName := q.Get("name"); configName != "" {
  72. if c, ok := Plugins[configName]; ok {
  73. p = c
  74. } else {
  75. util.ReturnError(util.APIErrorNoConfig, NO_SUCH_CONIFG, w, r)
  76. return
  77. }
  78. } else {
  79. p = Engine
  80. }
  81. var data any
  82. if q.Get("yaml") != "" {
  83. var tmp struct {
  84. File string
  85. Modified string
  86. Merged string
  87. }
  88. mm, err := yaml.Marshal(p.RawConfig.File)
  89. if err == nil {
  90. tmp.File = string(mm)
  91. }
  92. mm, err = yaml.Marshal(p.RawConfig.Modify)
  93. if err == nil {
  94. tmp.Modified = string(mm)
  95. }
  96. mm, err = yaml.Marshal(p.RawConfig.GetMap())
  97. if err == nil {
  98. tmp.Merged = string(mm)
  99. }
  100. data = &tmp
  101. } else if q.Get("formily") != "" {
  102. data = p.RawConfig.GetFormily()
  103. } else {
  104. data = &p.RawConfig
  105. }
  106. util.ReturnValue(data, w, r)
  107. }
  108. // API_modifyConfig 修改并保存配置
  109. func (conf *GlobalConfig) API_modifyConfig(w http.ResponseWriter, r *http.Request) {
  110. var p *Plugin
  111. var q = r.URL.Query()
  112. var err error
  113. if configName := q.Get("name"); configName != "" {
  114. if c, ok := Plugins[configName]; ok {
  115. p = c
  116. } else {
  117. util.ReturnError(util.APIErrorNoConfig, NO_SUCH_CONIFG, w, r)
  118. return
  119. }
  120. } else {
  121. p = Engine
  122. }
  123. var modified map[string]any
  124. if q.Get("yaml") != "" {
  125. err = yaml.NewDecoder(r.Body).Decode(&modified)
  126. } else {
  127. err = json.NewDecoder(r.Body).Decode(&modified)
  128. }
  129. if err != nil {
  130. util.ReturnError(util.APIErrorDecode, err.Error(), w, r)
  131. return
  132. }
  133. p.RawConfig.ParseModifyFile(modified)
  134. if err = p.Save(); err != nil {
  135. util.ReturnError(util.APIErrorSave, err.Error(), w, r)
  136. return
  137. }
  138. util.ReturnOK(w, r)
  139. }
  140. // API_updateConfig 热更新配置
  141. func (conf *GlobalConfig) API_updateConfig(w http.ResponseWriter, r *http.Request) {
  142. var p *Plugin
  143. var q = r.URL.Query()
  144. if configName := q.Get("name"); configName != "" {
  145. if c, ok := Plugins[configName]; ok {
  146. p = c
  147. } else {
  148. util.ReturnError(util.APIErrorNoConfig, NO_SUCH_CONIFG, w, r)
  149. return
  150. }
  151. } else {
  152. p = Engine
  153. }
  154. var err error
  155. var modified map[string]any
  156. if q.Get("yaml") != "" {
  157. err = yaml.NewDecoder(r.Body).Decode(&modified)
  158. } else {
  159. err = json.NewDecoder(r.Body).Decode(&modified)
  160. }
  161. if err != nil {
  162. util.ReturnError(util.APIErrorDecode, err.Error(), w, r)
  163. return
  164. }
  165. p.RawConfig.ParseModifyFile(modified)
  166. if err = p.Save(); err != nil {
  167. util.ReturnError(util.APIErrorSave, err.Error(), w, r)
  168. return
  169. }
  170. p.Update(&p.RawConfig)
  171. util.ReturnOK(w, r)
  172. }
  173. func (conf *GlobalConfig) API_list_pull(w http.ResponseWriter, r *http.Request) {
  174. util.ReturnFetchValue(func() (result []any) {
  175. Pullers.Range(func(key, value any) bool {
  176. result = append(result, value)
  177. return true
  178. })
  179. return
  180. }, w, r)
  181. }
  182. func (conf *GlobalConfig) API_list_push(w http.ResponseWriter, r *http.Request) {
  183. util.ReturnFetchValue(func() (result []any) {
  184. Pushers.Range(func(key, value any) bool {
  185. result = append(result, value)
  186. return true
  187. })
  188. return
  189. }, w, r)
  190. }
  191. func (conf *GlobalConfig) API_stop_push(w http.ResponseWriter, r *http.Request) {
  192. q := r.URL.Query()
  193. pusher, ok := Pushers.Load(q.Get("url"))
  194. if ok {
  195. pusher.(IPusher).Stop()
  196. util.ReturnOK(w, r)
  197. } else {
  198. util.ReturnError(util.APIErrorNoPusher, "no such pusher", w, r)
  199. }
  200. }
  201. func (conf *GlobalConfig) API_stop_subscribe(w http.ResponseWriter, r *http.Request) {
  202. q := r.URL.Query()
  203. streamPath := q.Get("streamPath")
  204. id := q.Get("id")
  205. s := Streams.Get(streamPath)
  206. if s == nil {
  207. util.ReturnError(util.APIErrorNoStream, NO_SUCH_STREAM, w, r)
  208. return
  209. }
  210. suber := s.Subscribers.Find(id)
  211. if suber == nil {
  212. util.ReturnError(util.APIErrorNoSubscriber, "no such subscriber", w, r)
  213. return
  214. }
  215. suber.Stop(zap.String("reason", "stop by api"))
  216. util.ReturnOK(w, r)
  217. }
  218. func (conf *GlobalConfig) API_replay_rtpdump(w http.ResponseWriter, r *http.Request) {
  219. q := r.URL.Query()
  220. streamPath := q.Get("streamPath")
  221. if streamPath == "" {
  222. streamPath = "dump/rtsp"
  223. }
  224. dumpFile := q.Get("dump")
  225. if dumpFile == "" {
  226. dumpFile = streamPath + ".rtpdump"
  227. }
  228. cv := q.Get("vcodec")
  229. ca := q.Get("acodec")
  230. cvp := q.Get("vpayload")
  231. cap := q.Get("apayload")
  232. var pub RTPDumpPublisher
  233. i, _ := strconv.ParseInt(cvp, 10, 64)
  234. pub.VPayloadType = byte(i)
  235. i, _ = strconv.ParseInt(cap, 10, 64)
  236. pub.APayloadType = byte(i)
  237. switch cv {
  238. case "h264":
  239. pub.VCodec = codec.CodecID_H264
  240. case "h265":
  241. pub.VCodec = codec.CodecID_H265
  242. }
  243. switch ca {
  244. case "aac":
  245. pub.ACodec = codec.CodecID_AAC
  246. case "pcma":
  247. pub.ACodec = codec.CodecID_PCMA
  248. case "pcmu":
  249. pub.ACodec = codec.CodecID_PCMU
  250. }
  251. ss := strings.Split(dumpFile, ",")
  252. if len(ss) > 1 {
  253. if err := Engine.Publish(streamPath, &pub); err != nil {
  254. util.ReturnError(util.APIErrorPublish, err.Error(), w, r)
  255. } else {
  256. for _, s := range ss {
  257. f, err := os.Open(s)
  258. if err != nil {
  259. util.ReturnError(util.APIErrorOpen, err.Error(), w, r)
  260. return
  261. }
  262. go pub.Feed(f)
  263. }
  264. util.ReturnOK(w, r)
  265. }
  266. } else {
  267. f, err := os.Open(dumpFile)
  268. if err != nil {
  269. util.ReturnError(util.APIErrorOpen, err.Error(), w, r)
  270. return
  271. }
  272. if err := Engine.Publish(streamPath, &pub); err != nil {
  273. util.ReturnError(util.APIErrorPublish, err.Error(), w, r)
  274. } else {
  275. pub.SetIO(f)
  276. util.ReturnOK(w, r)
  277. go pub.Feed(f)
  278. }
  279. }
  280. }
  281. func (conf *GlobalConfig) API_replay_ts(w http.ResponseWriter, r *http.Request) {
  282. q := r.URL.Query()
  283. streamPath := q.Get("streamPath")
  284. if streamPath == "" {
  285. streamPath = "dump/ts"
  286. }
  287. dumpFile := q.Get("dump")
  288. if dumpFile == "" {
  289. dumpFile = streamPath + ".ts"
  290. }
  291. f, err := os.Open(dumpFile)
  292. if err != nil {
  293. util.ReturnError(util.APIErrorOpen, err.Error(), w, r)
  294. return
  295. }
  296. var pub TSPublisher
  297. if err := Engine.Publish(streamPath, &pub); err != nil {
  298. util.ReturnError(util.APIErrorPublish, err.Error(), w, r)
  299. } else {
  300. tsReader := NewTSReader(&pub)
  301. pub.SetIO(f)
  302. go func() {
  303. tsReader.Feed(f)
  304. tsReader.Close()
  305. }()
  306. util.ReturnOK(w, r)
  307. }
  308. }
  309. func (conf *GlobalConfig) API_replay_mp4(w http.ResponseWriter, r *http.Request) {
  310. q := r.URL.Query()
  311. streamPath := q.Get("streamPath")
  312. if streamPath == "" {
  313. streamPath = "dump/mp4"
  314. }
  315. dumpFile := q.Get("dump")
  316. if dumpFile == "" {
  317. dumpFile = streamPath + ".mp4"
  318. }
  319. var pub MP4Publisher
  320. f, err := os.Open(dumpFile)
  321. if err != nil {
  322. util.ReturnError(util.APIErrorOpen, err.Error(), w, r)
  323. return
  324. }
  325. if err := Engine.Publish(streamPath, &pub); err != nil {
  326. util.ReturnError(util.APIErrorPublish, err.Error(), w, r)
  327. } else {
  328. pub.SetIO(f)
  329. util.ReturnOK(w, r)
  330. go pub.ReadMP4Data(f)
  331. }
  332. }