events.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package engine
  2. import (
  3. "reflect"
  4. "time"
  5. "m7s.live/engine/v4/common"
  6. )
  // Event is the generic base embedded, directly or indirectly, by the event
  // types in this file. It pairs a creation timestamp with the object the event
  // is about.
  type Event[T any] struct {
  	// Time is the moment the event was created (set by CreateEvent).
  	Time time.Time
  	// Target is the subject of the event. It is excluded from JSON and YAML
  	// serialization by its struct tags.
  	Target T `json:"-" yaml:"-"`
  }

  // CreateEvent returns an Event whose Target is target and whose Time is the
  // current time.
  func CreateEvent[T any](target T) (event Event[T]) {
  	return Event[T]{Time: time.Now(), Target: target}
  }
  16. // PulseEvent 心跳事件
  17. type PulseEvent struct {
  18. Event[struct{}]
  19. }
  20. type StreamEvent struct {
  21. Event[*Stream]
  22. }
  23. // StateEvent 状态机事件
  24. type StateEvent struct {
  25. StreamEvent
  26. Action StreamAction
  27. From StreamState
  28. }
  29. // ErrorEvent 错误事件
  30. type ErrorEvent struct {
  31. Event[any]
  32. Error error
  33. }
  34. func (se StateEvent) Next() (next StreamState, ok bool) {
  35. next, ok = StreamFSM[se.From][se.Action]
  36. return
  37. }
  38. type SEwaitPublish struct {
  39. StateEvent
  40. Publisher IPublisher
  41. }
  42. type SEpublish struct {
  43. StateEvent
  44. }
  45. type SEtrackAvaliable struct {
  46. StateEvent
  47. }
  48. type SErepublish struct {
  49. StateEvent
  50. }
  51. type SEwaitClose struct {
  52. StateEvent
  53. }
  54. type SEclose struct {
  55. StateEvent
  56. }
  57. type SEcreate struct {
  58. StreamEvent
  59. }
  60. type SEKick struct {
  61. Event[struct{}]
  62. }
  63. type UnsubscribeEvent struct {
  64. Event[ISubscriber]
  65. }
  66. type AddTrackEvent struct {
  67. Event[common.Track]
  68. }
  69. // InvitePublishEvent 邀请推流事件(按需拉流)
  70. type InvitePublish struct {
  71. Event[string]
  72. }
  73. func TryInvitePublish(streamPath string) {
  74. s := Streams.Get(streamPath)
  75. if s == nil || s.Publisher == nil {
  76. EventBus <- InvitePublish{Event: CreateEvent(streamPath)}
  77. }
  78. }
  79. // InviteTrackEvent 邀请推送指定 Track 事件(转码需要)
  80. type InviteTrackEvent struct {
  81. Event[string]
  82. ISubscriber
  83. }
  84. func InviteTrack(name string, suber ISubscriber) {
  85. EventBus <- InviteTrackEvent{Event: CreateEvent(name), ISubscriber: suber}
  86. }
  // handlers maps an event type to the callbacks registered for it through
  // ListenEvent. Every value stored under key T is a func(T).
  // NOTE(review): the map is not guarded by a lock — confirm that registration
  // and emission never run concurrently.
  var handlers = make(map[reflect.Type][]any)

  // ListenEvent registers handler to be called for every event emitted with
  // static type T. Handlers for one type are kept in registration order.
  func ListenEvent[T any](handler func(event T)) {
  	// (*T)(nil) + Elem yields the static type T, including when T is an
  	// interface type.
  	key := reflect.TypeOf((*T)(nil)).Elem()
  	handlers[key] = append(handlers[key], handler)
  }

  // EmitEvent synchronously calls, in registration order, every handler that
  // was registered for type T, passing it event.
  //
  // Handlers are looked up by the static type T, the same key ListenEvent
  // uses, not by the dynamic type of event. Looking up by dynamic type breaks
  // when T is an interface: handlers registered for the interface are never
  // found, and handlers found for the concrete type fail the func(T) type
  // assertion and panic. For non-interface T both keys are the same type.
  func EmitEvent[T any](event T) {
  	key := reflect.TypeOf((*T)(nil)).Elem()
  	for _, h := range handlers[key] {
  		h.(func(event T))(event)
  	}
  }