io.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package engine
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/url"
  9. "reflect"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "go.uber.org/zap"
  14. "go.uber.org/zap/zapcore"
  15. "m7s.live/engine/v4/common"
  16. "m7s.live/engine/v4/config"
  17. "m7s.live/engine/v4/log"
  18. "m7s.live/engine/v4/util"
  19. )
// IOConfig is a type constraint satisfied by either config.Publish or
// config.Subscribe.
type IOConfig interface {
	config.Publish | config.Subscribe
}
// ClientConfig is a type constraint satisfied by either config.Pull or
// config.Push. It is the constraint on ClientIO's type parameter.
type ClientConfig interface {
	config.Pull | config.Push
}
// AuthSub can be implemented by a subscriber to provide its own
// authentication hook. When global auth is enabled, receive calls OnAuth
// instead of the package-level OnAuthSub and, if OnAuth returns nil, awaits
// the promise for the final result.
type AuthSub interface {
	OnAuth(*util.Promise[ISubscriber]) error
}
// AuthPub can be implemented by a publisher to provide its own
// authentication hook. When global auth is enabled, receive calls OnAuth
// instead of the package-level OnAuthPub and, if OnAuth returns nil, awaits
// the promise for the final result.
type AuthPub interface {
	OnAuth(*util.Promise[IPublisher]) error
}
// IO is the structure shared by publishers and subscribers.
type IO struct {
	ID         string
	Type       string
	RemoteAddr string
	// Do not set Context directly; pass the parent Context in through
	// SetParentCtx instead.
	context.Context `json:"-" yaml:"-"`
	// CancelCauseFunc cancels Context; it is used to close the publisher or
	// subscriber when the stream is closed.
	context.CancelCauseFunc `json:"-" yaml:"-"`
	*log.Logger             `json:"-" yaml:"-"`
	// StartTime is the creation time; receive sets it to time.Now().
	StartTime time.Time
	Stream    *Stream `json:"-" yaml:"-"`
	// Reader, Writer and Closer are optional and are filled in by SetIO.
	io.Reader `json:"-" yaml:"-"`
	io.Writer `json:"-" yaml:"-"`
	io.Closer `json:"-" yaml:"-"`
	// Args holds the query parameters parsed from the stream path by receive.
	Args url.Values
	// Spesific is the concrete publisher or subscriber passed to receive.
	Spesific common.IIO `json:"-" yaml:"-"`
}
  48. func (io *IO) GetStream() common.IStream {
  49. return io.Stream
  50. }
  51. func (io *IO) IsClosed() bool {
  52. return io.Err() != nil
  53. }
  54. // SetIO(可选) 设置Writer、Reader、Closer
  55. func (i *IO) SetIO(conn any) {
  56. if v, ok := conn.(io.Closer); ok {
  57. i.Closer = v
  58. }
  59. if v, ok := conn.(io.Reader); ok {
  60. i.Reader = v
  61. }
  62. if v, ok := conn.(io.Writer); ok {
  63. i.Writer = v
  64. }
  65. }
  66. // SetParentCtx(可选)
  67. func (i *IO) SetParentCtx(parent context.Context) {
  68. i.Context, i.CancelCauseFunc = context.WithCancelCause(parent)
  69. }
// SetLogger replaces the embedded Logger with logger. If a Logger has been
// set by the time receive first attaches the IO to a stream, receive extends
// it with the "type" (and "ID") fields instead of deriving one from the stream.
func (i *IO) SetLogger(logger *log.Logger) {
	i.Logger = logger
}
  73. func (i *IO) OnEvent(event any) {
  74. switch event.(type) {
  75. case SEclose:
  76. i.close(StopError{zap.String("event", "close")})
  77. case SEKick:
  78. i.close(StopError{zap.String("event", "kick")})
  79. }
  80. }
  81. func (io *IO) IsShutdown() bool {
  82. if io.Stream == nil {
  83. return false
  84. }
  85. return io.Stream.IsShutdown()
  86. }
  87. func (i *IO) close(err StopError) bool {
  88. if i.IsClosed() {
  89. i.Warn("already closed", err...)
  90. return false
  91. }
  92. i.Info("close", err...)
  93. if i.CancelCauseFunc != nil {
  94. i.CancelCauseFunc(err)
  95. }
  96. if i.Closer != nil {
  97. i.Closer.Close()
  98. }
  99. return true
  100. }
  101. // Stop 停止订阅或者发布,由订阅者或者发布者调用
  102. func (io *IO) Stop(reason ...zapcore.Field) {
  103. io.close(StopError(reason))
  104. }
// StopError is the error used as the cancellation cause when an IO is
// stopped. It is a list of log fields describing why the IO was stopped.
type StopError []zapcore.Field

// Error implements the error interface. The message is always "stop"; the
// fields themselves are not included in it.
func (s StopError) Error() string {
	return "stop"
}
// Sentinel errors of the engine and the package-level authentication hooks.
var (
	ErrDuplicatePublish = errors.New("Duplicate Publish")
	// ErrBadStreamName is returned by receive when no stream could be found
	// or created for the given path.
	ErrBadStreamName = errors.New("StreamPath Illegal")
	ErrBadTrackName  = errors.New("Track Already Exist")
	ErrTrackMute     = errors.New("Track Mute")
	// ErrStreamIsClosed is returned by receive when the stream does not
	// accept the publish/subscribe request.
	ErrStreamIsClosed = errors.New("Stream Is Closed")
	ErrPublisherLost  = errors.New("Publisher Lost")
	// ErrAuth is returned by receive when key-based authentication fails.
	ErrAuth = errors.New("Auth Failed")
	// OnAuthSub is the default subscriber authentication hook. receive uses
	// it when global auth is enabled and the subscriber does not implement
	// AuthSub; nil means no hook is installed.
	OnAuthSub func(p *util.Promise[ISubscriber]) error
	// OnAuthPub is the default publisher authentication hook. receive uses
	// it when global auth is enabled and the publisher does not implement
	// AuthPub; nil means no hook is installed.
	OnAuthPub func(p *util.Promise[IPublisher]) error
)
  120. func (io *IO) auth(key string, secret string, expire string) bool {
  121. if unixTime, err := strconv.ParseInt(expire, 16, 64); err != nil || time.Now().Unix() > unixTime {
  122. return false
  123. }
  124. trueSecret := md5.Sum([]byte(key + io.Stream.Path + expire))
  125. for i := 0; i < 16; i++ {
  126. hex, err := strconv.ParseInt(secret[i<<1:(i<<1)+2], 16, 16)
  127. if trueSecret[i] != byte(hex) || err != nil {
  128. return false
  129. }
  130. }
  131. return true
  132. }
  133. // receive 用于接收发布或者订阅
  134. func (io *IO) receive(streamPath string, specific common.IIO) error {
  135. streamPath = strings.Trim(streamPath, "/")
  136. u, err := url.Parse(streamPath)
  137. if err != nil {
  138. if EngineConfig.LogLang == "zh" {
  139. log.Error("接收流路径(流唯一标识)格式错误,必须形如 live/test ", zap.String("流路径", streamPath), zap.Error(err))
  140. } else {
  141. log.Error("receive streamPath wrong format", zap.String("streamPath", streamPath), zap.Error(err))
  142. }
  143. return err
  144. }
  145. io.Args = u.Query()
  146. wt := time.Second * 5
  147. var iSub ISubscriber
  148. var iPub IPublisher
  149. var isSubscribe bool
  150. if iSub, isSubscribe = specific.(ISubscriber); isSubscribe {
  151. wt = iSub.GetSubscriber().Config.WaitTimeout
  152. } else {
  153. iPub = specific.(IPublisher)
  154. }
  155. s, create := findOrCreateStream(u.Path, wt)
  156. if s == nil {
  157. return ErrBadStreamName
  158. }
  159. if io.Stream == nil { //初次
  160. if io.Type == "" {
  161. io.Type = reflect.TypeOf(specific).Elem().Name()
  162. }
  163. logFeilds := []zapcore.Field{zap.String("type", io.Type)}
  164. if io.ID != "" {
  165. logFeilds = append(logFeilds, zap.String("ID", io.ID))
  166. }
  167. if io.Logger == nil {
  168. io.Logger = s.With(logFeilds...)
  169. } else {
  170. io.Logger = io.Logger.With(logFeilds...)
  171. }
  172. }
  173. io.Stream = s
  174. io.Spesific = specific
  175. io.StartTime = time.Now()
  176. if io.Context == nil {
  177. io.Debug("create context")
  178. io.SetParentCtx(Engine.Context)
  179. } else if io.IsClosed() {
  180. io.Debug("recreate context")
  181. io.SetParentCtx(Engine.Context)
  182. } else {
  183. io.Debug("warp context")
  184. io.SetParentCtx(io.Context)
  185. }
  186. defer func() {
  187. if err == nil {
  188. specific.OnEvent(specific)
  189. }
  190. }()
  191. if !isSubscribe {
  192. puber := iPub.GetPublisher()
  193. conf := puber.Config
  194. io.Info("publish", zap.String("ptr", fmt.Sprintf("%p", iPub)))
  195. s.pubLocker.Lock()
  196. defer s.pubLocker.Unlock()
  197. if config.Global.EnableAuth {
  198. onAuthPub := OnAuthPub
  199. if auth, ok := specific.(AuthPub); ok {
  200. onAuthPub = auth.OnAuth
  201. }
  202. if onAuthPub != nil {
  203. authPromise := util.NewPromise(iPub)
  204. if err = onAuthPub(authPromise); err == nil {
  205. err = authPromise.Await()
  206. }
  207. if err != nil {
  208. return err
  209. }
  210. } else if conf.Key != "" {
  211. if !io.auth(conf.Key, io.Args.Get(conf.SecretArgName), io.Args.Get(conf.ExpireArgName)) {
  212. return ErrAuth
  213. }
  214. }
  215. }
  216. if promise := util.NewPromise(iPub); s.Receive(promise) {
  217. err = promise.Await()
  218. return err
  219. }
  220. } else {
  221. conf := iSub.GetSubscriber().Config
  222. io.Info("subscribe")
  223. if create {
  224. EventBus <- InvitePublish{CreateEvent(s.Path)} // 通知发布者按需拉流
  225. }
  226. if config.Global.EnableAuth && !conf.Internal {
  227. onAuthSub := OnAuthSub
  228. if auth, ok := specific.(AuthSub); ok {
  229. onAuthSub = auth.OnAuth
  230. }
  231. if onAuthSub != nil {
  232. authPromise := util.NewPromise(iSub)
  233. if err = onAuthSub(authPromise); err == nil {
  234. err = authPromise.Await()
  235. }
  236. if err != nil {
  237. return err
  238. }
  239. } else if conf.Key != "" {
  240. if !io.auth(conf.Key, io.Args.Get(conf.SecretArgName), io.Args.Get(conf.ExpireArgName)) {
  241. return ErrAuth
  242. }
  243. }
  244. }
  245. if promise := util.NewPromise(iSub); s.Receive(promise) {
  246. err = promise.Await()
  247. return err
  248. }
  249. }
  250. return ErrStreamIsClosed
  251. }
// ClientIO is the common structure for the client roles (Puller, Pusher).
type ClientIO[C ClientConfig] struct {
	Config         *C
	StreamPath     string // local stream identifier
	RemoteURL      string // remote server address (used for pushing/pulling)
	ReConnectCount int    // number of reconnections
}
// init stores the configuration, the local stream path and the remote URL on
// the ClientIO. ReConnectCount is left untouched.
func (c *ClientIO[C]) init(streamPath string, url string, conf *C) {
	c.Config = conf
	c.StreamPath = streamPath
	c.RemoteURL = url
}