plugin.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. package engine
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "path/filepath"
  9. "reflect"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. "github.com/mcuadros/go-defaults"
  16. "go.uber.org/zap"
  17. "gopkg.in/yaml.v3"
  18. "m7s.live/engine/v4/config"
  19. "m7s.live/engine/v4/log"
  20. "m7s.live/engine/v4/util"
  21. )
  22. // InstallPlugin 安装插件,传入插件配置生成插件信息对象
  23. func InstallPlugin(config config.Plugin, options ...any) *Plugin {
  24. defaults.SetDefaults(config)
  25. t := reflect.TypeOf(config).Elem()
  26. name := strings.TrimSuffix(t.Name(), "Config")
  27. plugin := &Plugin{
  28. Name: name,
  29. Config: config,
  30. }
  31. for _, v := range options {
  32. switch v := v.(type) {
  33. case DefaultYaml:
  34. plugin.defaultYaml = v
  35. case string:
  36. name = v
  37. plugin.Name = name
  38. }
  39. }
  40. _, pluginFilePath, _, _ := runtime.Caller(1)
  41. configDir := filepath.Dir(pluginFilePath)
  42. if parts := strings.Split(configDir, "@"); len(parts) > 1 {
  43. plugin.Version = util.LastElement(parts)
  44. } else {
  45. plugin.Version = pluginFilePath
  46. }
  47. if _, ok := Plugins[name]; ok {
  48. return nil
  49. }
  50. switch v := config.(type) {
  51. case *GlobalConfig:
  52. v.InitDefaultHttp()
  53. default:
  54. Plugins[name] = plugin
  55. plugins = append(plugins, plugin)
  56. }
  57. return plugin
  58. }
  59. type FirstConfig *config.Config
  60. type UpdateConfig *config.Config
  61. type DefaultYaml string
  62. // Plugin 插件信息
  63. type Plugin struct {
  64. context.Context `json:"-" yaml:"-"`
  65. context.CancelFunc `json:"-" yaml:"-"`
  66. Name string //插件名称
  67. Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置
  68. Version string //插件版本
  69. RawConfig config.Config //最终合并后的配置的map形式方便查询
  70. defaultYaml DefaultYaml //默认配置
  71. *log.Logger `json:"-" yaml:"-"`
  72. saveTimer *time.Timer //用于保存的时候的延迟,防抖
  73. Disabled bool
  74. }
  75. func (opt *Plugin) logHandler(pattern string, handler http.Handler) http.Handler {
  76. return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  77. opt.Debug("visit", zap.String("path", r.URL.String()), zap.String("remote", r.RemoteAddr))
  78. name := strings.ToLower(opt.Name)
  79. r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name)
  80. handler.ServeHTTP(rw, r)
  81. })
  82. }
  83. func (opt *Plugin) handle(pattern string, handler http.Handler) {
  84. if opt == nil {
  85. return
  86. }
  87. conf, ok := opt.Config.(config.HTTPConfig)
  88. if !strings.HasPrefix(pattern, "/") {
  89. pattern = "/" + pattern
  90. }
  91. if ok {
  92. opt.Debug("http handle added", zap.String("pattern", pattern))
  93. conf.Handle(pattern, opt.logHandler(pattern, handler))
  94. }
  95. if opt != Engine {
  96. pattern = "/" + strings.ToLower(opt.Name) + pattern
  97. opt.Debug("http handle added to engine", zap.String("pattern", pattern))
  98. EngineConfig.Handle(pattern, opt.logHandler(pattern, handler))
  99. }
  100. apiList = append(apiList, pattern)
  101. }
  102. // 读取独立配置合并入总配置中
  103. func (opt *Plugin) assign() {
  104. f, err := os.Open(opt.settingPath())
  105. defer f.Close()
  106. if err == nil {
  107. var modifyConfig map[string]any
  108. err = yaml.NewDecoder(f).Decode(&modifyConfig)
  109. if err != nil {
  110. panic(err)
  111. }
  112. opt.RawConfig.ParseModifyFile(modifyConfig)
  113. }
  114. opt.registerHandler()
  115. if opt != Engine {
  116. opt.run()
  117. }
  118. }
  119. func (opt *Plugin) run() {
  120. opt.Context, opt.CancelFunc = context.WithCancel(Engine)
  121. opt.Config.OnEvent(FirstConfig(&opt.RawConfig))
  122. opt.Debug("config", zap.Any("config", opt.Config))
  123. if conf, ok := opt.Config.(config.HTTPConfig); ok {
  124. go conf.Listen(opt)
  125. }
  126. if conf, ok := opt.Config.(config.TCPConfig); ok {
  127. go conf.ListenTCP(opt, opt.Config.(config.TCPPlugin))
  128. }
  129. }
  130. // Update 热更新配置
  131. func (opt *Plugin) Update(conf *config.Config) {
  132. opt.Config.OnEvent(UpdateConfig(conf))
  133. }
  134. func (opt *Plugin) registerHandler() {
  135. t := reflect.TypeOf(opt.Config)
  136. v := reflect.ValueOf(opt.Config)
  137. // 注册http响应
  138. for i, j := 0, t.NumMethod(); i < j; i++ {
  139. name := t.Method(i).Name
  140. if name == "ServeHTTP" {
  141. continue
  142. }
  143. switch handler := v.Method(i).Interface().(type) {
  144. case func(http.ResponseWriter, *http.Request):
  145. patten := strings.ToLower(strings.ReplaceAll(name, "_", "/"))
  146. opt.handle(patten, http.HandlerFunc(handler))
  147. }
  148. }
  149. if rootHandler, ok := opt.Config.(http.Handler); ok {
  150. opt.handle("/", rootHandler)
  151. }
  152. }
  153. func (opt *Plugin) settingPath() string {
  154. return filepath.Join(SettingDir, strings.ToLower(opt.Name)+".yaml")
  155. }
  156. func (opt *Plugin) Save() error {
  157. if opt.saveTimer == nil {
  158. var lock sync.Mutex
  159. opt.saveTimer = time.AfterFunc(time.Second, func() {
  160. lock.Lock()
  161. defer lock.Unlock()
  162. if opt.RawConfig.Modify == nil {
  163. os.Remove(opt.settingPath())
  164. return
  165. }
  166. file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
  167. if err == nil {
  168. defer file.Close()
  169. err = yaml.NewEncoder(file).Encode(opt.RawConfig.Modify)
  170. }
  171. if err == nil {
  172. opt.Info("config saved")
  173. }
  174. })
  175. } else {
  176. opt.saveTimer.Reset(time.Second)
  177. }
  178. return nil
  179. }
  180. func (opt *Plugin) AssignPubConfig(puber *Publisher) {
  181. if puber.Config == nil {
  182. conf, ok := opt.Config.(config.PublishConfig)
  183. if !ok {
  184. conf = EngineConfig
  185. }
  186. copyConfig := conf.GetPublishConfig()
  187. puber.Config = &copyConfig
  188. }
  189. }
  190. func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
  191. puber := pub.GetPublisher()
  192. if puber == nil {
  193. if EngineConfig.LogLang == "zh" {
  194. return errors.New("不是发布者")
  195. } else {
  196. return errors.New("not publisher")
  197. }
  198. }
  199. opt.AssignPubConfig(puber)
  200. return pub.Publish(streamPath, pub)
  201. }
  202. var ErrStreamNotExist = errors.New("stream not exist")
  203. // SubscribeExist 订阅已经存在的流
  204. func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error {
  205. opt.Info("subscribe exsit", zap.String("path", streamPath))
  206. path, _, _ := strings.Cut(streamPath, "?")
  207. if !Streams.Has(path) {
  208. opt.Warn("stream not exist", zap.String("path", streamPath))
  209. return ErrStreamNotExist
  210. }
  211. return opt.Subscribe(streamPath, sub)
  212. }
  213. func (opt *Plugin) AssignSubConfig(suber *Subscriber) {
  214. if suber.Config == nil {
  215. conf, ok := opt.Config.(config.SubscribeConfig)
  216. if !ok {
  217. conf = EngineConfig
  218. }
  219. copyConfig := *conf.GetSubscribeConfig()
  220. suber.Config = &copyConfig
  221. }
  222. if suber.ID == "" {
  223. suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber)))
  224. }
  225. }
  226. // Subscribe 订阅一个流,如果流不存在则创建一个等待流
  227. func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
  228. suber := sub.GetSubscriber()
  229. if suber == nil {
  230. if EngineConfig.LogLang == "zh" {
  231. return errors.New("不是订阅者")
  232. } else {
  233. return errors.New("not subscriber")
  234. }
  235. }
  236. opt.AssignSubConfig(suber)
  237. return sub.Subscribe(streamPath, sub)
  238. }
  239. // SubscribeBlock 阻塞订阅一个流,直到订阅结束
  240. func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error) {
  241. if err = opt.Subscribe(streamPath, sub); err == nil {
  242. sub.PlayBlock(t)
  243. }
  244. return
  245. }
  246. var ErrNoPullConfig = errors.New("no pull config")
  247. var Pullers sync.Map
  248. func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) (err error) {
  249. conf, ok := opt.Config.(config.PullConfig)
  250. if !ok {
  251. return ErrNoPullConfig
  252. }
  253. pullConf := conf.GetPullConfig()
  254. if save < 2 {
  255. zurl := zap.String("url", url)
  256. zpath := zap.String("stream", streamPath)
  257. opt.Info("pull", zpath, zurl)
  258. puller.init(streamPath, url, pullConf)
  259. opt.AssignPubConfig(puller.GetPublisher())
  260. puller.SetLogger(opt.Logger.With(zpath, zurl))
  261. go puller.startPull(puller)
  262. }
  263. switch save {
  264. case 1:
  265. pullConf.PullOnStartLocker.Lock()
  266. defer pullConf.PullOnStartLocker.Unlock()
  267. m := map[string]string{streamPath: url}
  268. opt.RawConfig.ParseModifyFile(map[string]any{
  269. "pull": map[string]any{
  270. "pullonstart": m,
  271. },
  272. })
  273. case 2:
  274. pullConf.PullOnSubLocker.Lock()
  275. defer pullConf.PullOnSubLocker.Unlock()
  276. m := map[string]string{streamPath: url}
  277. for id := range pullConf.PullOnSub {
  278. m[id] = pullConf.PullOnSub[id]
  279. }
  280. opt.RawConfig.ParseModifyFile(map[string]any{
  281. "pull": map[string]any{
  282. "pullonsub": m,
  283. },
  284. })
  285. }
  286. if save > 0 {
  287. if err = opt.Save(); err != nil {
  288. opt.Error("save faild", zap.Error(err))
  289. }
  290. }
  291. return
  292. }
  293. var ErrNoPushConfig = errors.New("no push config")
  294. var Pushers sync.Map
  295. func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error) {
  296. zp, zu := zap.String("stream", streamPath), zap.String("url", url)
  297. opt.Info("push", zp, zu)
  298. defer func() {
  299. if err != nil {
  300. opt.Error("push faild", zap.Error(err))
  301. }
  302. }()
  303. conf, ok := opt.Config.(config.PushConfig)
  304. if !ok {
  305. return ErrNoPushConfig
  306. }
  307. pushConfig := conf.GetPushConfig()
  308. pusher.init(streamPath, url, pushConfig)
  309. pusher.SetLogger(opt.Logger.With(zp, zu))
  310. opt.AssignSubConfig(pusher.GetSubscriber())
  311. go pusher.startPush(pusher)
  312. if save {
  313. pushConfig.AddPush(url, streamPath)
  314. opt.RawConfig.Get("push").Get("pushlist").Modify = pushConfig.PushList
  315. if err = opt.Save(); err != nil {
  316. opt.Error("save faild", zap.Error(err))
  317. }
  318. }
  319. return
  320. }