main.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package engine // import "m7s.live/engine/v4"
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "runtime"
  13. "strings"
  14. "time"
  15. "github.com/denisbrodbeck/machineid"
  16. "github.com/google/uuid"
  17. . "github.com/logrusorgru/aurora/v4"
  18. "go.uber.org/zap"
  19. "go.uber.org/zap/zapcore"
  20. "gopkg.in/yaml.v3"
  21. "m7s.live/engine/v4/lang"
  22. "m7s.live/engine/v4/log"
  23. "m7s.live/engine/v4/util"
  24. )
  25. var (
  26. SysInfo struct {
  27. StartTime time.Time //启动时间
  28. LocalIP string
  29. Version string
  30. }
  31. ExecPath = os.Args[0]
  32. ExecDir = filepath.Dir(ExecPath)
  33. // ConfigRaw 配置信息的原始数据
  34. ConfigRaw []byte
  35. Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置
  36. plugins []*Plugin //插件列表
  37. EngineConfig = &GlobalConfig{}
  38. Engine = InstallPlugin(EngineConfig)
  39. SettingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置
  40. MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
  41. EventBus chan any
  42. apiList []string //注册到引擎的API接口列表
  43. )
  44. func init() {
  45. if setting_dir := os.Getenv("M7S_SETTING_DIR"); setting_dir != "" {
  46. SettingDir = setting_dir
  47. }
  48. if conn, err := net.Dial("udp", "114.114.114.114:80"); err == nil {
  49. SysInfo.LocalIP, _, _ = strings.Cut(conn.LocalAddr().String(), ":")
  50. }
  51. }
  52. // Run 启动Monibuca引擎,传入总的Context,可用于关闭所有
  53. func Run(ctx context.Context, conf any) (err error) {
  54. id, _ := machineid.ProtectedID("monibuca")
  55. SysInfo.StartTime = time.Now()
  56. SysInfo.Version = Engine.Version
  57. Engine.Context = ctx
  58. var cg map[string]map[string]any
  59. switch v := conf.(type) {
  60. case string:
  61. if _, err = os.Stat(v); err != nil {
  62. v = filepath.Join(ExecDir, v)
  63. }
  64. if ConfigRaw, err = os.ReadFile(v); err != nil {
  65. log.Warn("read config file error:", err.Error())
  66. }
  67. case []byte:
  68. ConfigRaw = v
  69. case map[string]map[string]any:
  70. cg = v
  71. }
  72. if err = util.CreateShutdownScript(); err != nil {
  73. log.Error("create shutdown script error:", err)
  74. }
  75. if err = os.MkdirAll(SettingDir, 0766); err != nil {
  76. log.Error("create dir .m7s error:", err)
  77. return
  78. }
  79. log.Info("Ⓜ starting engine:", Blink(Engine.Version))
  80. if ConfigRaw != nil {
  81. if err = yaml.Unmarshal(ConfigRaw, &cg); err != nil {
  82. log.Error("parsing yml error:", err)
  83. }
  84. }
  85. Engine.RawConfig.Parse(&EngineConfig.Engine, "GLOBAL")
  86. if cg != nil {
  87. Engine.RawConfig.ParseUserFile(cg["global"])
  88. }
  89. var logger log.Logger
  90. log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang))
  91. if EngineConfig.LogLevel == "trace" {
  92. log.Trace = true
  93. log.LogLevel.SetLevel(zap.DebugLevel)
  94. } else {
  95. loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel)
  96. if err != nil {
  97. logger.Error("parse log level error:", zap.Error(err))
  98. loglevel = zapcore.InfoLevel
  99. }
  100. log.LogLevel.SetLevel(loglevel)
  101. }
  102. Engine.Logger = log.LocaleLogger.Named("engine")
  103. Engine.assign()
  104. Engine.Logger.Debug("", zap.Any("config", EngineConfig))
  105. util.PoolSize = EngineConfig.PoolSize
  106. EventBus = make(chan any, EngineConfig.EventBusSize)
  107. go EngineConfig.Listen(Engine)
  108. for _, plugin := range plugins {
  109. plugin.Logger = log.LocaleLogger.Named(plugin.Name)
  110. if os.Getenv(strings.ToUpper(plugin.Name)+"_ENABLE") == "false" {
  111. plugin.Disabled = true
  112. plugin.Warn("disabled by env")
  113. continue
  114. }
  115. plugin.Info("initialize", zap.String("version", plugin.Version))
  116. plugin.RawConfig.Parse(plugin.Config, strings.ToUpper(plugin.Name))
  117. for _, fname := range MergeConfigs {
  118. if name := strings.ToLower(fname); plugin.RawConfig.Has(name) {
  119. plugin.RawConfig.Get(name).ParseGlobal(Engine.RawConfig.Get(name))
  120. }
  121. }
  122. var userConfig map[string]any
  123. if plugin.defaultYaml != "" {
  124. if err := yaml.Unmarshal([]byte(plugin.defaultYaml), &userConfig); err != nil {
  125. log.Error("parsing default config error:", err)
  126. } else {
  127. plugin.RawConfig.ParseDefaultYaml(userConfig)
  128. }
  129. }
  130. userConfig = cg[strings.ToLower(plugin.Name)]
  131. plugin.RawConfig.ParseUserFile(userConfig)
  132. if EngineConfig.DisableAll {
  133. plugin.Disabled = true
  134. }
  135. if userConfig["enable"] == false {
  136. plugin.Disabled = true
  137. } else if userConfig["enable"] == true {
  138. plugin.Disabled = false
  139. }
  140. if plugin.Disabled {
  141. plugin.Warn("plugin disabled")
  142. } else {
  143. plugin.assign()
  144. }
  145. }
  146. UUID := uuid.NewString()
  147. contentBuf := bytes.NewBuffer(nil)
  148. req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://console.monibuca.com/report", nil)
  149. req.Header.Set("Content-Type", "application/json")
  150. version := Engine.Version
  151. if ver, ok := ctx.Value("version").(string); ok && ver != "" && ver != "dev" {
  152. version = ver
  153. }
  154. if EngineConfig.LogLang == "zh" {
  155. log.Info("monibuca ", version, Green(" 启动成功"))
  156. } else {
  157. log.Info("monibuca ", version, Green(" start success"))
  158. }
  159. var enabledPlugins, disabledPlugins []*Plugin
  160. for _, plugin := range plugins {
  161. if plugin.Disabled {
  162. disabledPlugins = append(disabledPlugins, plugin)
  163. } else {
  164. enabledPlugins = append(enabledPlugins, plugin)
  165. }
  166. }
  167. if EngineConfig.LogLang == "zh" {
  168. fmt.Print("已运行的插件:")
  169. } else {
  170. fmt.Print("enabled plugins:")
  171. }
  172. for _, plugin := range enabledPlugins {
  173. fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|GreenBg|BoldFm), " ")
  174. }
  175. fmt.Println()
  176. if EngineConfig.LogLang == "zh" {
  177. fmt.Print("已禁用的插件:")
  178. } else {
  179. fmt.Print("disabled plugins:")
  180. }
  181. for _, plugin := range disabledPlugins {
  182. fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|RedBg|CrossedOutFm), " ")
  183. }
  184. fmt.Println()
  185. if EngineConfig.LogLang == "zh" {
  186. fmt.Println(Cyan("🌏 官网地址: ").Bold(), Yellow("https://monibuca.com"))
  187. fmt.Println(Cyan("🔥 启动工程: ").Bold(), Yellow("https://github.com/langhuihui/monibuca"))
  188. fmt.Println(Cyan("📄 文档地址: ").Bold(), Yellow("https://monibuca.com/docs/index.html"))
  189. fmt.Println(Cyan("🎞 视频教程: ").Bold(), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
  190. fmt.Println(Cyan("🖥 远程界面: ").Bold(), Yellow("https://console.monibuca.com"))
  191. fmt.Println(Yellow("关注公众号:不卡科技,获取更多信息"))
  192. } else {
  193. fmt.Println(Cyan("🌏 WebSite: ").Bold(), Yellow("https://m7s.live"))
  194. fmt.Println(Cyan("🔥 Github: ").Bold(), Yellow("https://github.com/langhuihui/monibuca"))
  195. fmt.Println(Cyan("📄 Docs: ").Bold(), Yellow("https://docs.m7s.live"))
  196. fmt.Println(Cyan("🎞 Videos: ").Bold(), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
  197. fmt.Println(Cyan("🖥 Console: ").Bold(), Yellow("https://console.monibuca.com"))
  198. }
  199. rp := struct {
  200. UUID string `json:"uuid"`
  201. Machine string `json:"machine"`
  202. Instance string `json:"instance"`
  203. Version string `json:"version"`
  204. OS string `json:"os"`
  205. Arch string `json:"arch"`
  206. }{UUID, id, EngineConfig.GetInstanceId(), version, runtime.GOOS, runtime.GOARCH}
  207. json.NewEncoder(contentBuf).Encode(&rp)
  208. req.Body = io.NopCloser(contentBuf)
  209. EngineConfig.OnEvent(ctx)
  210. go func() {
  211. var c http.Client
  212. reportTimer := time.NewTimer(time.Minute)
  213. c.Do(req)
  214. for {
  215. <-reportTimer.C
  216. contentBuf.Reset()
  217. contentBuf.WriteString(fmt.Sprintf(`{"uuid":"`+UUID+`","streams":%d}`, Streams.Len()))
  218. req.Body = io.NopCloser(contentBuf)
  219. c.Do(req)
  220. reportTimer.Reset(time.Minute)
  221. }
  222. }()
  223. for _, plugin := range enabledPlugins {
  224. plugin.Config.OnEvent(EngineConfig) //引擎初始化完成后,通知插件
  225. }
  226. for {
  227. select {
  228. case event := <-EventBus:
  229. ts := time.Now()
  230. for _, plugin := range enabledPlugins {
  231. ts := time.Now()
  232. plugin.Config.OnEvent(event)
  233. if cost := time.Since(ts); cost > time.Millisecond*100 {
  234. plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
  235. }
  236. }
  237. EngineConfig.OnEvent(event)
  238. if cost := time.Since(ts); cost > time.Millisecond*100 {
  239. log.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
  240. }
  241. case <-ctx.Done():
  242. return
  243. }
  244. }
  245. }