streams_map_outgoing_bidi.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. // This file was automatically generated by genny.
  2. // Any changes will be lost if this file is regenerated.
  3. // see https://github.com/cheekybits/genny
  4. package quic
  5. import (
  6. "fmt"
  7. "sync"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/qerr"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type outgoingBidiStreamsMap struct {
  13. mutex sync.RWMutex
  14. cond sync.Cond
  15. streams map[protocol.StreamID]streamI
  16. nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
  17. maxStream protocol.StreamID // the maximum stream ID we're allowed to open
  18. maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0)
  19. blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream
  20. newStream func(protocol.StreamID) streamI
  21. queueStreamIDBlocked func(*wire.StreamsBlockedFrame)
  22. closeErr error
  23. }
  24. func newOutgoingBidiStreamsMap(
  25. nextStream protocol.StreamID,
  26. newStream func(protocol.StreamID) streamI,
  27. queueControlFrame func(wire.Frame),
  28. ) *outgoingBidiStreamsMap {
  29. m := &outgoingBidiStreamsMap{
  30. streams: make(map[protocol.StreamID]streamI),
  31. nextStream: nextStream,
  32. newStream: newStream,
  33. queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) },
  34. }
  35. m.cond.L = &m.mutex
  36. return m
  37. }
  38. func (m *outgoingBidiStreamsMap) OpenStream() (streamI, error) {
  39. m.mutex.Lock()
  40. defer m.mutex.Unlock()
  41. if m.closeErr != nil {
  42. return nil, m.closeErr
  43. }
  44. str, err := m.openStreamImpl()
  45. if err != nil {
  46. return nil, streamOpenErr{err}
  47. }
  48. return str, nil
  49. }
  50. func (m *outgoingBidiStreamsMap) OpenStreamSync() (streamI, error) {
  51. m.mutex.Lock()
  52. defer m.mutex.Unlock()
  53. for {
  54. if m.closeErr != nil {
  55. return nil, m.closeErr
  56. }
  57. str, err := m.openStreamImpl()
  58. if err == nil {
  59. return str, nil
  60. }
  61. if err != nil && err != errTooManyOpenStreams {
  62. return nil, streamOpenErr{err}
  63. }
  64. m.cond.Wait()
  65. }
  66. }
  67. func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) {
  68. if !m.maxStreamSet || m.nextStream > m.maxStream {
  69. if !m.blockedSent {
  70. if m.maxStreamSet {
  71. m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
  72. Type: protocol.StreamTypeBidi,
  73. StreamLimit: m.maxStream.StreamNum(),
  74. })
  75. } else {
  76. m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
  77. Type: protocol.StreamTypeBidi,
  78. StreamLimit: 0,
  79. })
  80. }
  81. m.blockedSent = true
  82. }
  83. return nil, errTooManyOpenStreams
  84. }
  85. s := m.newStream(m.nextStream)
  86. m.streams[m.nextStream] = s
  87. m.nextStream += 4
  88. return s, nil
  89. }
  90. func (m *outgoingBidiStreamsMap) GetStream(id protocol.StreamID) (streamI, error) {
  91. m.mutex.RLock()
  92. if id >= m.nextStream {
  93. m.mutex.RUnlock()
  94. return nil, qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id))
  95. }
  96. s := m.streams[id]
  97. m.mutex.RUnlock()
  98. return s, nil
  99. }
  100. func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error {
  101. m.mutex.Lock()
  102. defer m.mutex.Unlock()
  103. if _, ok := m.streams[id]; !ok {
  104. return fmt.Errorf("Tried to delete unknown stream %d", id)
  105. }
  106. delete(m.streams, id)
  107. return nil
  108. }
  109. func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) {
  110. m.mutex.Lock()
  111. if !m.maxStreamSet || id > m.maxStream {
  112. m.maxStream = id
  113. m.maxStreamSet = true
  114. m.blockedSent = false
  115. m.cond.Broadcast()
  116. }
  117. m.mutex.Unlock()
  118. }
  119. func (m *outgoingBidiStreamsMap) CloseWithError(err error) {
  120. m.mutex.Lock()
  121. m.closeErr = err
  122. for _, str := range m.streams {
  123. str.closeForShutdown(err)
  124. }
  125. m.cond.Broadcast()
  126. m.mutex.Unlock()
  127. }