// This file was automatically generated by genny. // Any changes will be lost if this file is regenerated. // see https://github.com/cheekybits/genny package quic import ( "fmt" "sync" "github.com/lucas-clemente/quic-go/internal/protocol" "github.com/lucas-clemente/quic-go/internal/qerr" "github.com/lucas-clemente/quic-go/internal/wire" ) type outgoingBidiStreamsMap struct { mutex sync.RWMutex cond sync.Cond streams map[protocol.StreamID]streamI nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync) maxStream protocol.StreamID // the maximum stream ID we're allowed to open maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0) blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream newStream func(protocol.StreamID) streamI queueStreamIDBlocked func(*wire.StreamsBlockedFrame) closeErr error } func newOutgoingBidiStreamsMap( nextStream protocol.StreamID, newStream func(protocol.StreamID) streamI, queueControlFrame func(wire.Frame), ) *outgoingBidiStreamsMap { m := &outgoingBidiStreamsMap{ streams: make(map[protocol.StreamID]streamI), nextStream: nextStream, newStream: newStream, queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) }, } m.cond.L = &m.mutex return m } func (m *outgoingBidiStreamsMap) OpenStream() (streamI, error) { m.mutex.Lock() defer m.mutex.Unlock() if m.closeErr != nil { return nil, m.closeErr } str, err := m.openStreamImpl() if err != nil { return nil, streamOpenErr{err} } return str, nil } func (m *outgoingBidiStreamsMap) OpenStreamSync() (streamI, error) { m.mutex.Lock() defer m.mutex.Unlock() for { if m.closeErr != nil { return nil, m.closeErr } str, err := m.openStreamImpl() if err == nil { return str, nil } if err != nil && err != errTooManyOpenStreams { return nil, streamOpenErr{err} } m.cond.Wait() } } func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) { if !m.maxStreamSet || m.nextStream > m.maxStream { if !m.blockedSent { if m.maxStreamSet { m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ Type: protocol.StreamTypeBidi, StreamLimit: m.maxStream.StreamNum(), }) } else { m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{ Type: protocol.StreamTypeBidi, StreamLimit: 0, }) } m.blockedSent = true } return nil, errTooManyOpenStreams } s := m.newStream(m.nextStream) m.streams[m.nextStream] = s m.nextStream += 4 return s, nil } func (m *outgoingBidiStreamsMap) GetStream(id protocol.StreamID) (streamI, error) { m.mutex.RLock() if id >= m.nextStream { m.mutex.RUnlock() return nil, qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id)) } s := m.streams[id] m.mutex.RUnlock() return s, nil } func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error { m.mutex.Lock() defer m.mutex.Unlock() if _, ok := m.streams[id]; !ok { return fmt.Errorf("Tried to delete unknown stream %d", id) } delete(m.streams, id) return nil } func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) { m.mutex.Lock() if !m.maxStreamSet || id > m.maxStream { m.maxStream = id m.maxStreamSet = true m.blockedSent = false m.cond.Broadcast() } m.mutex.Unlock() } func (m *outgoingBidiStreamsMap) CloseWithError(err error) { m.mutex.Lock() m.closeErr = err for _, str := range m.streams { str.closeForShutdown(err) } m.cond.Broadcast() m.mutex.Unlock() }