plugin.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package engine
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "os"
  8. "path/filepath"
  9. "reflect"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. "github.com/mcuadros/go-defaults"
  16. "go.uber.org/zap"
  17. "gopkg.in/yaml.v3"
  18. "m7s.live/engine/v4/config"
  19. "m7s.live/engine/v4/log"
  20. "m7s.live/engine/v4/util"
  21. )
  22. // InstallPlugin 安装插件,传入插件配置生成插件信息对象
  23. func InstallPlugin(config config.Plugin, options ...any) *Plugin {
  24. defaults.SetDefaults(config)
  25. //fmt.Println("00000xxxxxxxxxxxxxxx11111111:",config)
  26. t := reflect.TypeOf(config).Elem()
  27. name := strings.TrimSuffix(t.Name(), "Config")
  28. plugin := &Plugin{
  29. Name: name,
  30. Config: config,
  31. }
  32. //fmt.Println("InstallPlugin xxxxxxx xxxxxxxxxxxxxxx:",plugin.Name)
  33. //fmt.Println("11111xxxxxxxxxxxxxxx11111111:",config)
  34. for _, v := range options {
  35. switch v := v.(type) {
  36. case DefaultYaml:
  37. plugin.defaultYaml = v
  38. case string:
  39. name = v
  40. plugin.Name = name
  41. }
  42. }
  43. //fmt.Println("222222xxxxxxxxxxxxxxx11111111:",config)
  44. _, pluginFilePath, _, _ := runtime.Caller(1)
  45. configDir := filepath.Dir(pluginFilePath)
  46. if parts := strings.Split(configDir, "@"); len(parts) > 1 {
  47. plugin.Version = util.LastElement(parts)
  48. } else {
  49. plugin.Version = pluginFilePath
  50. }
  51. //fmt.Println("33333xxxxxxxxxxxxxxx11111111:",config)
  52. if _, ok := Plugins[name]; ok {
  53. //fmt.Println("xxxxxxxxxxxxxxx0000:",config)
  54. return nil
  55. }
  56. //fmt.Println("444444xxxxxxxxxxxxxxx11111111:",config)
  57. switch v := config.(type) {
  58. case *GlobalConfig:
  59. v.InitDefaultHttp()
  60. default:
  61. Plugins[name] = plugin
  62. plugins = append(plugins, plugin)
  63. }
  64. //fmt.Println("xxxxxxxxxxxxxxx11111111:",config)
  65. return plugin
  66. }
  67. type FirstConfig *config.Config
  68. type UpdateConfig *config.Config
  69. type DefaultYaml string
  70. // Plugin 插件信息
  71. type Plugin struct {
  72. context.Context `json:"-" yaml:"-"`
  73. context.CancelFunc `json:"-" yaml:"-"`
  74. Name string //插件名称
  75. Config config.Plugin `json:"-" yaml:"-"` //类型化的插件配置
  76. Version string //插件版本
  77. RawConfig config.Config //最终合并后的配置的map形式方便查询
  78. defaultYaml DefaultYaml //默认配置
  79. *log.Logger `json:"-" yaml:"-"`
  80. saveTimer *time.Timer //用于保存的时候的延迟,防抖
  81. Disabled bool
  82. }
  83. func (opt *Plugin) logHandler(pattern string, handler http.Handler) http.Handler {
  84. return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
  85. opt.Debug("visit", zap.String("path", r.URL.String()), zap.String("remote", r.RemoteAddr))
  86. name := strings.ToLower(opt.Name)
  87. r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name)
  88. //fmt.Println("vist 11111111111111111",name,r.URL.Path)
  89. handler.ServeHTTP(rw, r)
  90. })
  91. }
  92. func (opt *Plugin) handle(pattern string, handler http.Handler) {
  93. if opt == nil {
  94. return
  95. }
  96. conf, ok := opt.Config.(config.HTTPConfig)
  97. if !strings.HasPrefix(pattern, "/") {
  98. pattern = "/" + pattern
  99. }
  100. if ok {
  101. opt.Debug("http handle added", zap.String("pattern", pattern))
  102. //fmt.Println("HANDLE 33333311111122222222222222222222222222222222222222",pattern)
  103. conf.Handle(pattern, opt.logHandler(pattern, handler))
  104. }
  105. if opt != Engine {
  106. //handlerName := reflect.TypeOf((&handler)(nil)).Elem().Name()
  107. pattern = "/" + strings.ToLower(opt.Name) + pattern
  108. //fmt.Println("HANDLE 11111122222222222222222222222222222222222222",pattern)
  109. opt.Debug("http handle added to engine", zap.String("pattern", pattern))
  110. EngineConfig.Handle(pattern, opt.logHandler(pattern, handler))
  111. }
  112. apiList = append(apiList, pattern)
  113. }
  114. // 读取独立配置合并入总配置中
  115. func (opt *Plugin) assign() {
  116. //fmt.Println("assign 111111111111111111111111111111111111111111111111111 ",opt.settingPath())
  117. f, err := os.Open(opt.settingPath())
  118. defer f.Close()
  119. if err == nil {
  120. var modifyConfig map[string]any
  121. err = yaml.NewDecoder(f).Decode(&modifyConfig)
  122. if err != nil {
  123. panic(err)
  124. }
  125. opt.RawConfig.ParseModifyFile(modifyConfig)
  126. }
  127. opt.registerHandler()
  128. if opt != Engine {
  129. opt.run()
  130. }
  131. }
  132. func (opt *Plugin) run() {
  133. opt.Context, opt.CancelFunc = context.WithCancel(Engine)
  134. opt.Config.OnEvent(FirstConfig(&opt.RawConfig))
  135. opt.Debug("config", zap.Any("config", opt.Config))
  136. if conf, ok := opt.Config.(config.HTTPConfig); ok {
  137. //opt.Info("config111111111111111111111111HTTPConfig", zap.Any("config", opt.Config))
  138. go conf.Listen(opt)
  139. }
  140. if conf, ok := opt.Config.(config.TCPConfig); ok {
  141. go conf.ListenTCP(opt, opt.Config.(config.TCPPlugin))
  142. }
  143. }
  144. // Update 热更新配置
  145. func (opt *Plugin) Update(conf *config.Config) {
  146. opt.Config.OnEvent(UpdateConfig(conf))
  147. }
  148. func (opt *Plugin) registerHandler() {
  149. //fmt.Println("registerHandler 111111111111111111111111111111111111111111111111111",opt.Name)
  150. t := reflect.TypeOf(opt.Config)
  151. v := reflect.ValueOf(opt.Config)
  152. // 注册http响应
  153. for i, j := 0, t.NumMethod(); i < j; i++ {
  154. name := t.Method(i).Name
  155. if name == "ServeHTTP" {
  156. continue
  157. }
  158. //fmt.Println("registerHandler name111111111111111111111111111111111111111111111111111",name)
  159. switch handler := v.Method(i).Interface().(type) {
  160. case func(http.ResponseWriter, *http.Request):
  161. patten := strings.ToLower(strings.ReplaceAll(name, "_", "/"))
  162. //fmt.Println("registerHandler patten xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",patten)
  163. opt.handle(patten, http.HandlerFunc(handler))
  164. }
  165. }
  166. if rootHandler, ok := opt.Config.(http.Handler); ok {
  167. //fmt.Println("registerHandler root xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",opt.Config)
  168. opt.handle("/", rootHandler)
  169. }
  170. }
  171. func (opt *Plugin) settingPath() string {
  172. return filepath.Join(SettingDir, strings.ToLower(opt.Name)+".yaml")
  173. }
  174. func (opt *Plugin) Save() error {
  175. if opt.saveTimer == nil {
  176. var lock sync.Mutex
  177. opt.saveTimer = time.AfterFunc(time.Second, func() {
  178. lock.Lock()
  179. defer lock.Unlock()
  180. if opt.RawConfig.Modify == nil {
  181. os.Remove(opt.settingPath())
  182. return
  183. }
  184. file, err := os.OpenFile(opt.settingPath(), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
  185. if err == nil {
  186. defer file.Close()
  187. err = yaml.NewEncoder(file).Encode(opt.RawConfig.Modify)
  188. }
  189. if err == nil {
  190. opt.Info("config saved")
  191. }
  192. })
  193. } else {
  194. opt.saveTimer.Reset(time.Second)
  195. }
  196. return nil
  197. }
  198. func (opt *Plugin) AssignPubConfig(puber *Publisher) {
  199. if puber.Config == nil {
  200. conf, ok := opt.Config.(config.PublishConfig)
  201. if !ok {
  202. conf = EngineConfig
  203. }
  204. copyConfig := conf.GetPublishConfig()
  205. puber.Config = &copyConfig
  206. }
  207. }
  208. func (opt *Plugin) Publish(streamPath string, pub IPublisher) error {
  209. puber := pub.GetPublisher()
  210. if puber == nil {
  211. if EngineConfig.LogLang == "zh" {
  212. return errors.New("不是发布者")
  213. } else {
  214. return errors.New("not publisher")
  215. }
  216. }
  217. opt.AssignPubConfig(puber)
  218. return pub.Publish(streamPath, pub)
  219. }
  220. var ErrStreamNotExist = errors.New("stream not exist")
  221. // SubscribeExist 订阅已经存在的流
  222. func (opt *Plugin) SubscribeExist(streamPath string, sub ISubscriber) error {
  223. opt.Info("subscribe exsit", zap.String("path", streamPath))
  224. path, _, _ := strings.Cut(streamPath, "?")
  225. if !Streams.Has(path) {
  226. opt.Warn("stream not exist", zap.String("path", streamPath))
  227. return ErrStreamNotExist
  228. }
  229. return opt.Subscribe(streamPath, sub)
  230. }
  231. func (opt *Plugin) AssignSubConfig(suber *Subscriber) {
  232. if suber.Config == nil {
  233. conf, ok := opt.Config.(config.SubscribeConfig)
  234. if !ok {
  235. conf = EngineConfig
  236. }
  237. copyConfig := *conf.GetSubscribeConfig()
  238. suber.Config = &copyConfig
  239. }
  240. if suber.ID == "" {
  241. suber.ID = fmt.Sprintf("%d", uintptr(unsafe.Pointer(suber)))
  242. }
  243. }
  244. // Subscribe 订阅一个流,如果流不存在则创建一个等待流
  245. func (opt *Plugin) Subscribe(streamPath string, sub ISubscriber) error {
  246. suber := sub.GetSubscriber()
  247. if suber == nil {
  248. if EngineConfig.LogLang == "zh" {
  249. return errors.New("不是订阅者")
  250. } else {
  251. return errors.New("not subscriber")
  252. }
  253. }
  254. opt.AssignSubConfig(suber)
  255. return sub.Subscribe(streamPath, sub)
  256. }
  257. // SubscribeBlock 阻塞订阅一个流,直到订阅结束
  258. func (opt *Plugin) SubscribeBlock(streamPath string, sub ISubscriber, t byte) (err error) {
  259. if err = opt.Subscribe(streamPath, sub); err == nil {
  260. sub.PlayBlock(t)
  261. }
  262. return
  263. }
  264. var ErrNoPullConfig = errors.New("no pull config")
  265. var Pullers sync.Map
  266. func (opt *Plugin) Pull(streamPath string, url string, puller IPuller, save int) (err error) {
  267. conf, ok := opt.Config.(config.PullConfig)
  268. if !ok {
  269. return ErrNoPullConfig
  270. }
  271. pullConf := conf.GetPullConfig()
  272. if save < 2 {
  273. zurl := zap.String("url", url)
  274. zpath := zap.String("stream", streamPath)
  275. opt.Info("pull", zpath, zurl)
  276. puller.init(streamPath, url, pullConf)
  277. opt.AssignPubConfig(puller.GetPublisher())
  278. puller.SetLogger(opt.Logger.With(zpath, zurl))
  279. go puller.startPull(puller)
  280. }
  281. switch save {
  282. case 1:
  283. pullConf.PullOnStartLocker.Lock()
  284. defer pullConf.PullOnStartLocker.Unlock()
  285. m := map[string]string{streamPath: url}
  286. opt.RawConfig.ParseModifyFile(map[string]any{
  287. "pull": map[string]any{
  288. "pullonstart": m,
  289. },
  290. })
  291. case 2:
  292. pullConf.PullOnSubLocker.Lock()
  293. defer pullConf.PullOnSubLocker.Unlock()
  294. m := map[string]string{streamPath: url}
  295. for id := range pullConf.PullOnSub {
  296. m[id] = pullConf.PullOnSub[id]
  297. }
  298. opt.RawConfig.ParseModifyFile(map[string]any{
  299. "pull": map[string]any{
  300. "pullonsub": m,
  301. },
  302. })
  303. }
  304. if save > 0 {
  305. if err = opt.Save(); err != nil {
  306. opt.Error("save faild", zap.Error(err))
  307. }
  308. }
  309. return
  310. }
  311. var ErrNoPushConfig = errors.New("no push config")
  312. var Pushers sync.Map
  313. func (opt *Plugin) Push(streamPath string, url string, pusher IPusher, save bool) (err error) {
  314. zp, zu := zap.String("stream", streamPath), zap.String("url", url)
  315. opt.Info("push", zp, zu)
  316. defer func() {
  317. if err != nil {
  318. opt.Error("push faild", zap.Error(err))
  319. }
  320. }()
  321. conf, ok := opt.Config.(config.PushConfig)
  322. if !ok {
  323. return ErrNoPushConfig
  324. }
  325. pushConfig := conf.GetPushConfig()
  326. pusher.init(streamPath, url, pushConfig)
  327. pusher.SetLogger(opt.Logger.With(zp, zu))
  328. opt.AssignSubConfig(pusher.GetSubscriber())
  329. go pusher.startPush(pusher)
  330. if save {
  331. pushConfig.AddPush(url, streamPath)
  332. opt.RawConfig.Get("push").Get("pushlist").Modify = pushConfig.PushList
  333. if err = opt.Save(); err != nil {
  334. opt.Error("save faild", zap.Error(err))
  335. }
  336. }
  337. return
  338. }