safe_chan.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package util
  2. import (
  3. "context"
  4. "errors"
  5. "math"
  6. "sync/atomic"
  7. "time"
  8. )
  9. // SafeChan安全的channel,可以防止close后被写入的问题
  10. type SafeChan[T any] struct {
  11. C chan T
  12. senders int32 //当前发送者数量
  13. }
  14. func (sc *SafeChan[T]) Init(n int) {
  15. sc.C = make(chan T, n)
  16. }
  17. // Close senders为0的时候可以安全关闭,否则不能关闭
  18. func (sc *SafeChan[T]) Close() bool {
  19. if atomic.CompareAndSwapInt32(&sc.senders, 0, math.MinInt32) {
  20. close(sc.C)
  21. return true
  22. }
  23. return false
  24. }
  25. func (sc *SafeChan[T]) Send(v T) bool {
  26. // senders增加后为正数说明没有被channel没有被关闭,可以发送数据
  27. if atomic.AddInt32(&sc.senders, 1) > 0 {
  28. sc.C <- v
  29. atomic.AddInt32(&sc.senders, -1)
  30. return true
  31. }
  32. return false
  33. }
  34. func (sc *SafeChan[T]) IsClosed() bool {
  35. return atomic.LoadInt32(&sc.senders) < 0
  36. }
  37. func (sc *SafeChan[T]) IsEmpty() bool {
  38. return atomic.LoadInt32(&sc.senders) == 0
  39. }
  40. func (sc *SafeChan[T]) IsFull() bool {
  41. return atomic.LoadInt32(&sc.senders) > 0
  42. }
  43. var errResolved = errors.New("resolved")
  44. type Promise[S any] struct {
  45. context.Context
  46. context.CancelCauseFunc
  47. context.CancelFunc
  48. Value S
  49. }
  50. func (r *Promise[S]) Resolve() {
  51. r.CancelCauseFunc(errResolved)
  52. }
  53. func (r *Promise[S]) Reject(err error) {
  54. r.CancelCauseFunc(err)
  55. }
  56. func (p *Promise[S]) Await() (err error) {
  57. <-p.Done()
  58. err = context.Cause(p.Context)
  59. if err == errResolved {
  60. err = nil
  61. }
  62. p.CancelFunc()
  63. return
  64. }
  65. func (p *Promise[S]) Then(resolved func(S), rejected func(error)) {
  66. go func() {
  67. if err := p.Await(); err == nil {
  68. resolved(p.Value)
  69. } else {
  70. rejected(err)
  71. }
  72. }()
  73. }
  74. func NewPromise[S any](value S) *Promise[S] {
  75. ctx0, cancel0 := context.WithTimeout(context.Background(), time.Second*10)
  76. ctx, cancel := context.WithCancelCause(ctx0)
  77. return &Promise[S]{
  78. Value: value,
  79. Context: ctx,
  80. CancelCauseFunc: cancel,
  81. CancelFunc: cancel0,
  82. }
  83. }