main.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package engine // import "m7s.live/engine/v4"
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "runtime"
  13. "strings"
  14. "time"
  15. "gitlab.sydip.com/repo/gopkgs/cache"
  16. "github.com/denisbrodbeck/machineid"
  17. "github.com/google/uuid"
  18. . "github.com/logrusorgru/aurora/v4"
  19. "go.uber.org/zap"
  20. "go.uber.org/zap/zapcore"
  21. "gopkg.in/yaml.v3"
  22. "m7s.live/engine/v4/lang"
  23. "m7s.live/engine/v4/db"
  24. "m7s.live/engine/v4/log"
  25. "m7s.live/engine/v4/util"
  26. )
  27. var (
  28. SysInfo struct {
  29. StartTime time.Time //启动时间
  30. LocalIP string
  31. Version string
  32. }
  33. ExecPath = os.Args[0]
  34. ExecDir = filepath.Dir(ExecPath)
  35. // ConfigRaw 配置信息的原始数据
  36. ConfigRaw []byte
  37. Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置
  38. plugins []*Plugin //插件列表
  39. EngineConfig = &GlobalConfig{}
  40. Engine = InstallPlugin(EngineConfig)
  41. SettingDir = filepath.Join(ExecDir, ".m7s") //配置缓存目录,该目录按照插件名称作为文件名存储修改过的配置
  42. MergeConfigs = []string{"Publish", "Subscribe", "HTTP"} //需要合并配置的属性项,插件若没有配置则使用全局配置
  43. EventBus chan any
  44. apiList []string //注册到引擎的API接口列表
  45. )
  46. func init() {
  47. if setting_dir := os.Getenv("M7S_SETTING_DIR"); setting_dir != "" {
  48. SettingDir = setting_dir
  49. }
  50. if conn, err := net.Dial("udp", "114.114.114.114:80"); err == nil {
  51. SysInfo.LocalIP, _, _ = strings.Cut(conn.LocalAddr().String(), ":")
  52. }
  53. }
  54. // Run 启动Monibuca引擎,传入总的Context,可用于关闭所有
  55. func Run(ctx context.Context, conf any) (err error) {
  56. id, _ := machineid.ProtectedID("monibuca")
  57. SysInfo.StartTime = time.Now()
  58. SysInfo.Version = Engine.Version
  59. Engine.Context = ctx
  60. var cg map[string]map[string]any
  61. switch v := conf.(type) {
  62. case string:
  63. if _, err = os.Stat(v); err != nil {
  64. v = filepath.Join(ExecDir, v)
  65. }
  66. if ConfigRaw, err = os.ReadFile(v); err != nil {
  67. log.Warn("read config file error:", err.Error())
  68. }
  69. case []byte:
  70. ConfigRaw = v
  71. case map[string]map[string]any:
  72. cg = v
  73. }
  74. if err = util.CreateShutdownScript(); err != nil {
  75. log.Error("create shutdown script error:", err)
  76. }
  77. if err = os.MkdirAll(SettingDir, 0766); err != nil {
  78. log.Error("create dir .m7s error:", err)
  79. return
  80. }
  81. log.Info("Ⓜ starting engine:", Blink(Engine.Version))
  82. if ConfigRaw != nil {
  83. if err = yaml.Unmarshal(ConfigRaw, &cg); err != nil {
  84. log.Error("parsing yml error:", err)
  85. }
  86. }
  87. Engine.RawConfig.Parse(&EngineConfig.Engine, "GLOBAL")
  88. if cg != nil {
  89. Engine.RawConfig.ParseUserFile(cg["global"])
  90. }
  91. var logger log.Logger
  92. log.LocaleLogger = logger.Lang(lang.Get(EngineConfig.LogLang))
  93. if EngineConfig.LogLevel == "trace" {
  94. log.Trace = true
  95. log.LogLevel.SetLevel(zap.DebugLevel)
  96. } else {
  97. loglevel, err := zapcore.ParseLevel(EngineConfig.LogLevel)
  98. if err != nil {
  99. logger.Error("parse log level error:", zap.Error(err))
  100. loglevel = zapcore.InfoLevel
  101. }
  102. log.LogLevel.SetLevel(loglevel)
  103. }
  104. Engine.Logger = log.LocaleLogger.Named("engine")
  105. // 初始化mysql
  106. db.Setup(EngineConfig.User,
  107. EngineConfig.Passwd,
  108. EngineConfig.Addr,
  109. EngineConfig.DB,
  110. EngineConfig.Charset,
  111. EngineConfig.MaxIdle,
  112. EngineConfig.MaxConn,
  113. EngineConfig.LogMode,
  114. Engine.Logger.Logger,
  115. )
  116. fmt.Println("EngineConfig REDIS:",EngineConfig.RedisAddrs,
  117. EngineConfig.RedisPasswd,
  118. EngineConfig.RedisDB,
  119. EngineConfig.RedisPoolSize,
  120. EngineConfig.RedisMinIdleConns,
  121. EngineConfig.RedisMaxRetries,
  122. EngineConfig.RedisCluster)
  123. cache.Setup(EngineConfig.RedisAddrs,
  124. EngineConfig.RedisPasswd,
  125. EngineConfig.RedisDB,
  126. EngineConfig.RedisPoolSize,
  127. EngineConfig.RedisMinIdleConns,
  128. EngineConfig.RedisMaxRetries,
  129. EngineConfig.RedisCluster)
  130. EngineConfig.AuthMiddleware()
  131. Engine.assign()
  132. Engine.Logger.Debug("", zap.Any("config", EngineConfig))
  133. util.PoolSize = EngineConfig.PoolSize
  134. EventBus = make(chan any, EngineConfig.EventBusSize)
  135. go EngineConfig.Listen(Engine)
  136. for _, plugin := range plugins {
  137. plugin.Logger = log.LocaleLogger.Named(plugin.Name)
  138. if os.Getenv(strings.ToUpper(plugin.Name)+"_ENABLE") == "false" {
  139. plugin.Disabled = true
  140. plugin.Warn("disabled by env")
  141. continue
  142. }
  143. plugin.Info("initialize", zap.String("version", plugin.Version))
  144. plugin.RawConfig.Parse(plugin.Config, strings.ToUpper(plugin.Name))
  145. for _, fname := range MergeConfigs {
  146. if name := strings.ToLower(fname); plugin.RawConfig.Has(name) {
  147. plugin.RawConfig.Get(name).ParseGlobal(Engine.RawConfig.Get(name))
  148. }
  149. }
  150. var userConfig map[string]any
  151. if plugin.defaultYaml != "" {
  152. if err := yaml.Unmarshal([]byte(plugin.defaultYaml), &userConfig); err != nil {
  153. log.Error("parsing default config error:", err)
  154. } else {
  155. plugin.RawConfig.ParseDefaultYaml(userConfig)
  156. }
  157. }
  158. userConfig = cg[strings.ToLower(plugin.Name)]
  159. plugin.RawConfig.ParseUserFile(userConfig)
  160. if EngineConfig.DisableAll {
  161. plugin.Disabled = true
  162. }
  163. if userConfig["enable"] == false {
  164. plugin.Disabled = true
  165. } else if userConfig["enable"] == true {
  166. plugin.Disabled = false
  167. }
  168. if plugin.Disabled {
  169. plugin.Warn("plugin disabled")
  170. } else {
  171. plugin.assign()
  172. }
  173. }
  174. UUID := uuid.NewString()
  175. contentBuf := bytes.NewBuffer(nil)
  176. req, _ := http.NewRequestWithContext(ctx, http.MethodPost, "https://console.monibuca.com/report", nil)
  177. req.Header.Set("Content-Type", "application/json")
  178. version := Engine.Version
  179. if ver, ok := ctx.Value("version").(string); ok && ver != "" && ver != "dev" {
  180. version = ver
  181. }
  182. if EngineConfig.LogLang == "zh" {
  183. log.Info("monibuca ", version, Green(" 启动成功"))
  184. } else {
  185. log.Info("monibuca ", version, Green(" start success"))
  186. }
  187. var enabledPlugins, disabledPlugins []*Plugin
  188. for _, plugin := range plugins {
  189. if plugin.Disabled {
  190. disabledPlugins = append(disabledPlugins, plugin)
  191. } else {
  192. enabledPlugins = append(enabledPlugins, plugin)
  193. }
  194. }
  195. if EngineConfig.LogLang == "zh" {
  196. fmt.Print("已运行的插件:")
  197. } else {
  198. fmt.Print("enabled plugins:")
  199. }
  200. for _, plugin := range enabledPlugins {
  201. fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|GreenBg|BoldFm), " ")
  202. }
  203. if EngineConfig.LogLang == "zh" {
  204. fmt.Print("已禁用的插件:")
  205. } else {
  206. fmt.Print("disabled plugins:")
  207. }
  208. for _, plugin := range disabledPlugins {
  209. fmt.Print(Colorize(" "+plugin.Name+" ", BlackFg|RedBg|CrossedOutFm), " ")
  210. }
  211. if EngineConfig.LogLang == "zh" {
  212. fmt.Println(Cyan("🌏 官网地址: ").Bold(), Yellow("https://monibuca.com"))
  213. fmt.Println(Cyan("🔥 启动工程: ").Bold(), Yellow("https://github.com/langhuihui/monibuca"))
  214. fmt.Println(Cyan("📄 文档地址: ").Bold(), Yellow("https://monibuca.com/docs/index.html"))
  215. fmt.Println(Cyan("🎞 视频教程: ").Bold(), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
  216. fmt.Println(Cyan("🖥 远程界面: ").Bold(), Yellow("https://console.monibuca.com"))
  217. fmt.Println(Yellow("关注公众号:不卡科技,获取更多信息"))
  218. } else {
  219. fmt.Println(Cyan("🌏 WebSite: ").Bold(), Yellow("https://m7s.live"))
  220. fmt.Println(Cyan("🔥 Github: ").Bold(), Yellow("https://github.com/langhuihui/monibuca"))
  221. fmt.Println(Cyan("📄 Docs: ").Bold(), Yellow("https://docs.m7s.live"))
  222. fmt.Println(Cyan("🎞 Videos: ").Bold(), Yellow("https://space.bilibili.com/328443019/channel/collectiondetail?sid=514619"))
  223. fmt.Println(Cyan("🖥 Console: ").Bold(), Yellow("https://console.monibuca.com"))
  224. }
  225. rp := struct {
  226. UUID string `json:"uuid"`
  227. Machine string `json:"machine"`
  228. Instance string `json:"instance"`
  229. Version string `json:"version"`
  230. OS string `json:"os"`
  231. Arch string `json:"arch"`
  232. }{UUID, id, EngineConfig.GetInstanceId(), version, runtime.GOOS, runtime.GOARCH}
  233. json.NewEncoder(contentBuf).Encode(&rp)
  234. req.Body = io.NopCloser(contentBuf)
  235. EngineConfig.OnEvent(ctx)
  236. go func() {
  237. var c http.Client
  238. reportTimer := time.NewTimer(time.Minute)
  239. c.Do(req)
  240. for {
  241. <-reportTimer.C
  242. contentBuf.Reset()
  243. contentBuf.WriteString(fmt.Sprintf(`{"uuid":"`+UUID+`","streams":%d}`, Streams.Len()))
  244. req.Body = io.NopCloser(contentBuf)
  245. c.Do(req)
  246. reportTimer.Reset(time.Minute)
  247. }
  248. }()
  249. for _, plugin := range enabledPlugins {
  250. plugin.Config.OnEvent(EngineConfig) //引擎初始化完成后,通知插件
  251. }
  252. for {
  253. select {
  254. case event := <-EventBus:
  255. //fmt.Println("ENVEN:",event)
  256. ts := time.Now()
  257. for _, plugin := range enabledPlugins {
  258. ts := time.Now()
  259. plugin.Config.OnEvent(event)
  260. if cost := time.Since(ts); cost > time.Millisecond*100 {
  261. plugin.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
  262. }
  263. }
  264. EngineConfig.OnEvent(event)
  265. if cost := time.Since(ts); cost > time.Millisecond*100 {
  266. log.Warn("event cost too much time", zap.String("event", fmt.Sprintf("%v", event)), zap.Duration("cost", cost))
  267. }
  268. case <-ctx.Done():
  269. return
  270. }
  271. }
  272. }