io.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. package engine
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/url"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "go.uber.org/zap"
  14. "go.uber.org/zap/zapcore"
  15. "m7s.live/engine/v4/config"
  16. "m7s.live/engine/v4/log"
  17. "m7s.live/engine/v4/util"
  18. )
  19. type IOConfig interface {
  20. config.Publish | config.Subscribe
  21. }
  22. type ClientConfig interface {
  23. config.Pull | config.Push
  24. }
  25. type AuthSub interface {
  26. OnAuth(*util.Promise[ISubscriber]) error
  27. }
  28. type AuthPub interface {
  29. OnAuth(*util.Promise[IPublisher]) error
  30. }
  31. // 发布者或者订阅者的共用结构体
  32. type IO struct {
  33. ID string
  34. Type string
  35. RemoteAddr string
  36. context.Context `json:"-" yaml:"-"` //不要直接设置,应当通过SetParentCtx传入父级Context
  37. context.CancelCauseFunc `json:"-" yaml:"-"` //流关闭是关闭发布者或者订阅者
  38. *log.Logger `json:"-" yaml:"-"`
  39. StartTime time.Time //创建时间
  40. Stream *Stream `json:"-" yaml:"-"`
  41. io.Reader `json:"-" yaml:"-"`
  42. io.Writer `json:"-" yaml:"-"`
  43. io.Closer `json:"-" yaml:"-"`
  44. Args url.Values
  45. Spesific IIO `json:"-" yaml:"-"`
  46. }
  47. func (io *IO) IsClosed() bool {
  48. return io.Err() != nil
  49. }
  50. // SetIO(可选) 设置Writer、Reader、Closer
  51. func (i *IO) SetIO(conn any) {
  52. if v, ok := conn.(io.Closer); ok {
  53. i.Closer = v
  54. }
  55. if v, ok := conn.(io.Reader); ok {
  56. i.Reader = v
  57. }
  58. if v, ok := conn.(io.Writer); ok {
  59. i.Writer = v
  60. }
  61. }
  62. // SetParentCtx(可选)
  63. func (i *IO) SetParentCtx(parent context.Context) {
  64. i.Context, i.CancelCauseFunc = context.WithCancelCause(parent)
  65. }
  66. func (i *IO) SetLogger(logger *log.Logger) {
  67. i.Logger = logger
  68. }
  69. func (i *IO) OnEvent(event any) {
  70. switch event.(type) {
  71. case SEclose:
  72. i.close(StopError{zap.String("event", "close")})
  73. case SEKick:
  74. i.close(StopError{zap.String("event", "kick")})
  75. }
  76. }
  77. func (io *IO) IsShutdown() bool {
  78. if io.Stream == nil {
  79. return false
  80. }
  81. return io.Stream.IsShutdown()
  82. }
  83. type IIO interface {
  84. receive(string, IIO) error
  85. IsClosed() bool
  86. OnEvent(any)
  87. Stop(reason ...zapcore.Field)
  88. SetIO(any)
  89. SetParentCtx(context.Context)
  90. SetLogger(*log.Logger)
  91. IsShutdown() bool
  92. log.Zap
  93. }
  94. func (i *IO) close(err StopError) bool {
  95. if i.IsClosed() {
  96. i.Warn("already closed", err...)
  97. return false
  98. }
  99. i.Info("close", err...)
  100. if i.CancelCauseFunc != nil {
  101. i.CancelCauseFunc(err)
  102. }
  103. if i.Closer != nil {
  104. i.Closer.Close()
  105. }
  106. return true
  107. }
  108. // Stop 停止订阅或者发布,由订阅者或者发布者调用
  109. func (io *IO) Stop(reason ...zapcore.Field) {
  110. io.close(StopError(reason))
  111. }
  112. type StopError []zapcore.Field
  113. func (s StopError) Error() string {
  114. return "stop"
  115. }
  116. var (
  117. ErrDuplicatePublish = errors.New("Duplicate Publish")
  118. ErrBadStreamName = errors.New("StreamPath Illegal")
  119. ErrBadTrackName = errors.New("Track Already Exist")
  120. ErrTrackMute = errors.New("Track Mute")
  121. ErrStreamIsClosed = errors.New("Stream Is Closed")
  122. ErrPublisherLost = errors.New("Publisher Lost")
  123. ErrAuth = errors.New("Auth Failed")
  124. OnAuthSub func(p *util.Promise[ISubscriber]) error
  125. OnAuthPub func(p *util.Promise[IPublisher]) error
  126. )
  127. func (io *IO) auth(key string, secret string, expire string) bool {
  128. //fmt.Println("KEY 1111111111111111111111",key,secret,expire)
  129. if unixTime, err := strconv.ParseInt(expire, 16, 64); err != nil || time.Now().Unix() > unixTime {
  130. return false
  131. }
  132. trueSecret := md5.Sum([]byte(key + io.Stream.Path + expire))
  133. for i := 0; i < 16; i++ {
  134. hex, err := strconv.ParseInt(secret[i<<1:(i<<1)+2], 16, 16)
  135. if trueSecret[i] != byte(hex) || err != nil {
  136. return false
  137. }
  138. }
  139. return true
  140. }
  141. // receive 用于接收发布或者订阅
  142. func (io *IO) receive(streamPath string, specific IIO) error {
  143. streamPath = strings.Trim(streamPath, "/")
  144. u, err := url.Parse(streamPath)
  145. if err != nil {
  146. if EngineConfig.LogLang == "zh" {
  147. log.Error("接收流路径(流唯一标识)格式错误,必须形如 live/test ", zap.String("流路径", streamPath), zap.Error(err))
  148. } else {
  149. log.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err))
  150. }
  151. return err
  152. }
  153. io.Args = u.Query()
  154. wt := time.Second * 5
  155. var iSub ISubscriber
  156. var iPub IPublisher
  157. var isSubscribe bool
  158. if iSub, isSubscribe = specific.(ISubscriber); isSubscribe {
  159. wt = iSub.GetSubscriber().Config.WaitTimeout
  160. } else {
  161. iPub = specific.(IPublisher)
  162. }
  163. s, create := findOrCreateStream(u.Path, wt)
  164. //fmt.Println("start recive findOrCreateStream-------------------------------------------------------------",streamPath,create)
  165. if s == nil {
  166. return ErrBadStreamName
  167. }
  168. if io.Stream == nil { //初次
  169. if io.Type == "" {
  170. io.Type = reflect.TypeOf(specific).Elem().Name()
  171. }
  172. logFeilds := []zapcore.Field{zap.String("type", io.Type)}
  173. if io.ID != "" {
  174. logFeilds = append(logFeilds, zap.String("ID", io.ID))
  175. }
  176. if io.Logger == nil {
  177. io.Logger = s.With(logFeilds...)
  178. } else {
  179. io.Logger = io.Logger.With(logFeilds...)
  180. }
  181. }
  182. io.Stream = s
  183. io.Spesific = specific
  184. io.StartTime = time.Now()
  185. if io.Context == nil {
  186. io.Debug("create context")
  187. io.SetParentCtx(Engine.Context)
  188. } else if io.IsClosed() {
  189. io.Debug("recreate context")
  190. io.SetParentCtx(Engine.Context)
  191. } else {
  192. io.Debug("warp context")
  193. io.SetParentCtx(io.Context)
  194. }
  195. defer func() {
  196. if err == nil {
  197. specific.OnEvent(specific)
  198. }else{
  199. io.Info("io receive error",zap.String("streamPath",streamPath), zap.String("error",err.Error()))
  200. }
  201. }()
  202. if !isSubscribe {
  203. puber := iPub.GetPublisher()
  204. conf := puber.Config
  205. io.Info("publish", zap.String("ptr", fmt.Sprintf("%p", iPub)))
  206. s.pubLocker.Lock()
  207. defer s.pubLocker.Unlock()
  208. if config.Global.EnableAuth {
  209. onAuthPub := OnAuthPub
  210. if auth, ok := specific.(AuthPub); ok {
  211. onAuthPub = auth.OnAuth
  212. }
  213. if onAuthPub != nil {
  214. authPromise := util.NewPromise(iPub)
  215. if err = onAuthPub(authPromise); err == nil {
  216. err = authPromise.Await()
  217. }
  218. if err != nil {
  219. return err
  220. }
  221. } else if conf.Key != "" {
  222. if !io.auth(conf.Key, io.Args.Get(conf.SecretArgName), io.Args.Get(conf.ExpireArgName)) {
  223. err = ErrAuth
  224. return err
  225. }
  226. }
  227. }
  228. io.Info("Publish Promise Await start",zap.String("streamPath",streamPath), zap.String("time",time.Now().String()))
  229. if promise := util.NewPromise(iPub); s.Receive(promise) {
  230. err = promise.Await()
  231. io.Info("Publish Promise Await end",zap.String("streamPath",streamPath), zap.String("time",time.Now().String()))
  232. return err
  233. }
  234. } else {
  235. conf := iSub.GetSubscriber().Config
  236. io.Info("subscribe")
  237. if create {
  238. EventBus <- InvitePublish{CreateEvent(s.Path)} // 通知发布者按需拉流
  239. }
  240. if config.Global.EnableAuth && !conf.Internal {
  241. onAuthSub := OnAuthSub
  242. if auth, ok := specific.(AuthSub); ok {
  243. onAuthSub = auth.OnAuth
  244. }
  245. if onAuthSub != nil {
  246. authPromise := util.NewPromise(iSub)
  247. if err = onAuthSub(authPromise); err == nil {
  248. err = authPromise.Await()
  249. }
  250. if err != nil {
  251. return err
  252. }
  253. } else if conf.Key != "" {
  254. if !io.auth(conf.Key, io.Args.Get(conf.SecretArgName), io.Args.Get(conf.ExpireArgName)) {
  255. err = ErrAuth
  256. return err
  257. }
  258. }
  259. }
  260. if promise := util.NewPromise(iSub);s.Receive(promise) {
  261. io.Info("Subscribe Promise Await start",zap.String("streamPath",streamPath), zap.String("time",time.Now().String()))
  262. err = promise.Await()
  263. io.Info("SubscribePromise Await end",zap.String("streamPath",streamPath), zap.String("time",time.Now().String()))
  264. return err
  265. }
  266. }
  267. return ErrStreamIsClosed
  268. }
  269. // ClientIO 作为Client角色(Puller,Pusher)的公共结构体
  270. type ClientIO[C ClientConfig] struct {
  271. Config *C
  272. StreamPath string // 本地流标识
  273. RemoteURL string // 远程服务器地址(用于推拉)
  274. ReConnectCount int //重连次数
  275. }
  276. func (c *ClientIO[C]) init(streamPath string, url string, conf *C) {
  277. c.Config = conf
  278. c.StreamPath = streamPath
  279. c.RemoteURL = url
  280. }