streams_map_test.go 15 KB


  1. package quic
  2. import (
  3. "errors"
  4. "fmt"
  5. "math"
  6. "net"
  7. "github.com/golang/mock/gomock"
  8. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  9. "github.com/lucas-clemente/quic-go/internal/handshake"
  10. "github.com/lucas-clemente/quic-go/internal/mocks"
  11. "github.com/lucas-clemente/quic-go/internal/protocol"
  12. "github.com/lucas-clemente/quic-go/internal/qerr"
  13. "github.com/lucas-clemente/quic-go/internal/wire"
  14. . "github.com/onsi/ginkgo"
  15. . "github.com/onsi/gomega"
  16. )
  17. type streamMapping struct {
  18. firstIncomingBidiStream protocol.StreamID
  19. firstIncomingUniStream protocol.StreamID
  20. firstOutgoingBidiStream protocol.StreamID
  21. firstOutgoingUniStream protocol.StreamID
  22. }
  23. func expectTooManyStreamsError(err error) {
  24. ExpectWithOffset(1, err).To(HaveOccurred())
  25. ExpectWithOffset(1, err.Error()).To(Equal(errTooManyOpenStreams.Error()))
  26. nerr, ok := err.(net.Error)
  27. ExpectWithOffset(1, ok).To(BeTrue())
  28. ExpectWithOffset(1, nerr.Temporary()).To(BeTrue())
  29. ExpectWithOffset(1, nerr.Timeout()).To(BeFalse())
  30. }
  31. var _ = Describe("Streams Map", func() {
  32. newFlowController := func(protocol.StreamID) flowcontrol.StreamFlowController {
  33. return mocks.NewMockStreamFlowController(mockCtrl)
  34. }
  35. serverStreamMapping := streamMapping{
  36. firstIncomingBidiStream: 0,
  37. firstOutgoingBidiStream: 1,
  38. firstIncomingUniStream: 2,
  39. firstOutgoingUniStream: 3,
  40. }
  41. clientStreamMapping := streamMapping{
  42. firstIncomingBidiStream: 1,
  43. firstOutgoingBidiStream: 0,
  44. firstIncomingUniStream: 3,
  45. firstOutgoingUniStream: 2,
  46. }
  47. for _, p := range []protocol.Perspective{protocol.PerspectiveServer, protocol.PerspectiveClient} {
  48. perspective := p
  49. var ids streamMapping
  50. if perspective == protocol.PerspectiveClient {
  51. ids = clientStreamMapping
  52. } else {
  53. ids = serverStreamMapping
  54. }
  55. Context(perspective.String(), func() {
  56. var (
  57. m *streamsMap
  58. mockSender *MockStreamSender
  59. )
  60. const (
  61. maxBidiStreams = 111
  62. maxUniStreams = 222
  63. )
  64. allowUnlimitedStreams := func() {
  65. m.UpdateLimits(&handshake.TransportParameters{
  66. MaxBidiStreams: math.MaxUint16,
  67. MaxUniStreams: math.MaxUint16,
  68. })
  69. }
  70. BeforeEach(func() {
  71. mockSender = NewMockStreamSender(mockCtrl)
  72. m = newStreamsMap(mockSender, newFlowController, maxBidiStreams, maxUniStreams, perspective, protocol.VersionWhatever).(*streamsMap)
  73. })
  74. Context("opening", func() {
  75. It("opens bidirectional streams", func() {
  76. allowUnlimitedStreams()
  77. str, err := m.OpenStream()
  78. Expect(err).ToNot(HaveOccurred())
  79. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  80. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  81. str, err = m.OpenStream()
  82. Expect(err).ToNot(HaveOccurred())
  83. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  84. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream + 4))
  85. })
  86. It("opens unidirectional streams", func() {
  87. allowUnlimitedStreams()
  88. str, err := m.OpenUniStream()
  89. Expect(err).ToNot(HaveOccurred())
  90. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  91. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  92. str, err = m.OpenUniStream()
  93. Expect(err).ToNot(HaveOccurred())
  94. Expect(str).To(BeAssignableToTypeOf(&sendStream{}))
  95. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream + 4))
  96. })
  97. })
  98. Context("accepting", func() {
  99. It("accepts bidirectional streams", func() {
  100. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  101. Expect(err).ToNot(HaveOccurred())
  102. str, err := m.AcceptStream()
  103. Expect(err).ToNot(HaveOccurred())
  104. Expect(str).To(BeAssignableToTypeOf(&stream{}))
  105. Expect(str.StreamID()).To(Equal(ids.firstIncomingBidiStream))
  106. })
  107. It("accepts unidirectional streams", func() {
  108. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  109. Expect(err).ToNot(HaveOccurred())
  110. str, err := m.AcceptUniStream()
  111. Expect(err).ToNot(HaveOccurred())
  112. Expect(str).To(BeAssignableToTypeOf(&receiveStream{}))
  113. Expect(str.StreamID()).To(Equal(ids.firstIncomingUniStream))
  114. })
  115. })
  116. Context("deleting", func() {
  117. BeforeEach(func() {
  118. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  119. allowUnlimitedStreams()
  120. })
  121. It("deletes outgoing bidirectional streams", func() {
  122. id := ids.firstOutgoingBidiStream
  123. str, err := m.OpenStream()
  124. Expect(err).ToNot(HaveOccurred())
  125. Expect(str.StreamID()).To(Equal(id))
  126. Expect(m.DeleteStream(id)).To(Succeed())
  127. dstr, err := m.GetOrOpenSendStream(id)
  128. Expect(err).ToNot(HaveOccurred())
  129. Expect(dstr).To(BeNil())
  130. })
  131. It("deletes incoming bidirectional streams", func() {
  132. id := ids.firstIncomingBidiStream
  133. str, err := m.GetOrOpenReceiveStream(id)
  134. Expect(err).ToNot(HaveOccurred())
  135. Expect(str.StreamID()).To(Equal(id))
  136. Expect(m.DeleteStream(id)).To(Succeed())
  137. dstr, err := m.GetOrOpenReceiveStream(id)
  138. Expect(err).ToNot(HaveOccurred())
  139. Expect(dstr).To(BeNil())
  140. })
  141. It("accepts bidirectional streams after they have been deleted", func() {
  142. id := ids.firstIncomingBidiStream
  143. _, err := m.GetOrOpenReceiveStream(id)
  144. Expect(err).ToNot(HaveOccurred())
  145. Expect(m.DeleteStream(id)).To(Succeed())
  146. str, err := m.AcceptStream()
  147. Expect(err).ToNot(HaveOccurred())
  148. Expect(str).ToNot(BeNil())
  149. Expect(str.StreamID()).To(Equal(id))
  150. })
  151. It("deletes outgoing unidirectional streams", func() {
  152. id := ids.firstOutgoingUniStream
  153. str, err := m.OpenUniStream()
  154. Expect(err).ToNot(HaveOccurred())
  155. Expect(str.StreamID()).To(Equal(id))
  156. Expect(m.DeleteStream(id)).To(Succeed())
  157. dstr, err := m.GetOrOpenSendStream(id)
  158. Expect(err).ToNot(HaveOccurred())
  159. Expect(dstr).To(BeNil())
  160. })
  161. It("deletes incoming unidirectional streams", func() {
  162. id := ids.firstIncomingUniStream
  163. str, err := m.GetOrOpenReceiveStream(id)
  164. Expect(err).ToNot(HaveOccurred())
  165. Expect(str.StreamID()).To(Equal(id))
  166. Expect(m.DeleteStream(id)).To(Succeed())
  167. dstr, err := m.GetOrOpenReceiveStream(id)
  168. Expect(err).ToNot(HaveOccurred())
  169. Expect(dstr).To(BeNil())
  170. })
  171. It("accepts unirectional streams after they have been deleted", func() {
  172. id := ids.firstIncomingUniStream
  173. _, err := m.GetOrOpenReceiveStream(id)
  174. Expect(err).ToNot(HaveOccurred())
  175. Expect(m.DeleteStream(id)).To(Succeed())
  176. str, err := m.AcceptUniStream()
  177. Expect(err).ToNot(HaveOccurred())
  178. Expect(str).ToNot(BeNil())
  179. Expect(str.StreamID()).To(Equal(id))
  180. })
  181. })
  182. Context("getting streams", func() {
  183. BeforeEach(func() {
  184. allowUnlimitedStreams()
  185. })
  186. Context("send streams", func() {
  187. It("gets an outgoing bidirectional stream", func() {
  188. // need to open the stream ourselves first
  189. // the peer is not allowed to create a stream initiated by us
  190. _, err := m.OpenStream()
  191. Expect(err).ToNot(HaveOccurred())
  192. str, err := m.GetOrOpenSendStream(ids.firstOutgoingBidiStream)
  193. Expect(err).ToNot(HaveOccurred())
  194. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  195. })
  196. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  197. id := ids.firstOutgoingBidiStream + 5*4
  198. _, err := m.GetOrOpenSendStream(id)
  199. Expect(err).To(MatchError(qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id))))
  200. })
  201. It("gets an outgoing unidirectional stream", func() {
  202. // need to open the stream ourselves first
  203. // the peer is not allowed to create a stream initiated by us
  204. _, err := m.OpenUniStream()
  205. Expect(err).ToNot(HaveOccurred())
  206. str, err := m.GetOrOpenSendStream(ids.firstOutgoingUniStream)
  207. Expect(err).ToNot(HaveOccurred())
  208. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  209. })
  210. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  211. id := ids.firstOutgoingUniStream + 5*4
  212. _, err := m.GetOrOpenSendStream(id)
  213. Expect(err).To(MatchError(qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id))))
  214. })
  215. It("gets an incoming bidirectional stream", func() {
  216. id := ids.firstIncomingBidiStream + 4*7
  217. str, err := m.GetOrOpenSendStream(id)
  218. Expect(err).ToNot(HaveOccurred())
  219. Expect(str.StreamID()).To(Equal(id))
  220. })
  221. It("errors when trying to get an incoming unidirectional stream", func() {
  222. id := ids.firstIncomingUniStream
  223. _, err := m.GetOrOpenSendStream(id)
  224. Expect(err).To(MatchError(fmt.Errorf("peer attempted to open send stream %d", id)))
  225. })
  226. })
  227. Context("receive streams", func() {
  228. It("gets an outgoing bidirectional stream", func() {
  229. // need to open the stream ourselves first
  230. // the peer is not allowed to create a stream initiated by us
  231. _, err := m.OpenStream()
  232. Expect(err).ToNot(HaveOccurred())
  233. str, err := m.GetOrOpenReceiveStream(ids.firstOutgoingBidiStream)
  234. Expect(err).ToNot(HaveOccurred())
  235. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  236. })
  237. It("errors when the peer tries to open a higher outgoing bidirectional stream", func() {
  238. id := ids.firstOutgoingBidiStream + 5*4
  239. _, err := m.GetOrOpenReceiveStream(id)
  240. Expect(err).To(MatchError(qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id))))
  241. })
  242. It("gets an incoming bidirectional stream", func() {
  243. id := ids.firstIncomingBidiStream + 4*7
  244. str, err := m.GetOrOpenReceiveStream(id)
  245. Expect(err).ToNot(HaveOccurred())
  246. Expect(str.StreamID()).To(Equal(id))
  247. })
  248. It("gets an incoming unidirectional stream", func() {
  249. id := ids.firstIncomingUniStream + 4*10
  250. str, err := m.GetOrOpenReceiveStream(id)
  251. Expect(err).ToNot(HaveOccurred())
  252. Expect(str.StreamID()).To(Equal(id))
  253. })
  254. It("errors when trying to get an outgoing unidirectional stream", func() {
  255. id := ids.firstOutgoingUniStream
  256. _, err := m.GetOrOpenReceiveStream(id)
  257. Expect(err).To(MatchError(fmt.Errorf("peer attempted to open receive stream %d", id)))
  258. })
  259. })
  260. })
  261. Context("updating stream ID limits", func() {
  262. It("processes the parameter for outgoing streams, as a server", func() {
  263. mockSender.EXPECT().queueControlFrame(gomock.Any())
  264. m.perspective = protocol.PerspectiveServer
  265. _, err := m.OpenStream()
  266. expectTooManyStreamsError(err)
  267. Expect(m.UpdateLimits(&handshake.TransportParameters{
  268. MaxBidiStreams: 5,
  269. MaxUniStreams: 5,
  270. })).To(Succeed())
  271. Expect(m.outgoingBidiStreams.maxStream).To(Equal(protocol.StreamID(17)))
  272. Expect(m.outgoingUniStreams.maxStream).To(Equal(protocol.StreamID(19)))
  273. })
  274. It("processes the parameter for outgoing streams, as a client", func() {
  275. mockSender.EXPECT().queueControlFrame(gomock.Any())
  276. m.perspective = protocol.PerspectiveClient
  277. _, err := m.OpenUniStream()
  278. expectTooManyStreamsError(err)
  279. Expect(m.UpdateLimits(&handshake.TransportParameters{
  280. MaxBidiStreams: 5,
  281. MaxUniStreams: 5,
  282. })).To(Succeed())
  283. Expect(m.outgoingBidiStreams.maxStream).To(Equal(protocol.StreamID(16)))
  284. Expect(m.outgoingUniStreams.maxStream).To(Equal(protocol.StreamID(18)))
  285. })
  286. It("rejects parameters with too large unidirectional stream counts", func() {
  287. Expect(m.UpdateLimits(&handshake.TransportParameters{
  288. MaxUniStreams: protocol.MaxStreamCount + 1,
  289. })).To(MatchError(qerr.StreamLimitError))
  290. })
  291. It("rejects parameters with too large unidirectional stream counts", func() {
  292. Expect(m.UpdateLimits(&handshake.TransportParameters{
  293. MaxBidiStreams: protocol.MaxStreamCount + 1,
  294. })).To(MatchError(qerr.StreamLimitError))
  295. })
  296. })
  297. Context("handling MAX_STREAMS frames", func() {
  298. BeforeEach(func() {
  299. mockSender.EXPECT().queueControlFrame(gomock.Any()).AnyTimes()
  300. })
  301. It("processes IDs for outgoing bidirectional streams", func() {
  302. _, err := m.OpenStream()
  303. expectTooManyStreamsError(err)
  304. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  305. Type: protocol.StreamTypeBidi,
  306. MaxStreams: 1,
  307. })).To(Succeed())
  308. str, err := m.OpenStream()
  309. Expect(err).ToNot(HaveOccurred())
  310. Expect(str.StreamID()).To(Equal(ids.firstOutgoingBidiStream))
  311. _, err = m.OpenStream()
  312. expectTooManyStreamsError(err)
  313. })
  314. It("processes IDs for outgoing unidirectional streams", func() {
  315. _, err := m.OpenUniStream()
  316. expectTooManyStreamsError(err)
  317. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  318. Type: protocol.StreamTypeUni,
  319. MaxStreams: 1,
  320. })).To(Succeed())
  321. str, err := m.OpenUniStream()
  322. Expect(err).ToNot(HaveOccurred())
  323. Expect(str.StreamID()).To(Equal(ids.firstOutgoingUniStream))
  324. _, err = m.OpenUniStream()
  325. expectTooManyStreamsError(err)
  326. })
  327. It("rejects MAX_STREAMS frames with too large values", func() {
  328. Expect(m.HandleMaxStreamsFrame(&wire.MaxStreamsFrame{
  329. Type: protocol.StreamTypeBidi,
  330. MaxStreams: protocol.MaxStreamCount + 1,
  331. })).To(MatchError(qerr.StreamLimitError))
  332. })
  333. })
  334. Context("sending MAX_STREAMS frames", func() {
  335. It("sends a MAX_STREAMS frame for bidirectional streams", func() {
  336. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingBidiStream)
  337. Expect(err).ToNot(HaveOccurred())
  338. _, err = m.AcceptStream()
  339. Expect(err).ToNot(HaveOccurred())
  340. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  341. Type: protocol.StreamTypeBidi,
  342. MaxStreams: maxBidiStreams + 1,
  343. })
  344. Expect(m.DeleteStream(ids.firstIncomingBidiStream)).To(Succeed())
  345. })
  346. It("sends a MAX_STREAMS frame for unidirectional streams", func() {
  347. _, err := m.GetOrOpenReceiveStream(ids.firstIncomingUniStream)
  348. Expect(err).ToNot(HaveOccurred())
  349. _, err = m.AcceptUniStream()
  350. Expect(err).ToNot(HaveOccurred())
  351. mockSender.EXPECT().queueControlFrame(&wire.MaxStreamsFrame{
  352. Type: protocol.StreamTypeUni,
  353. MaxStreams: maxUniStreams + 1,
  354. })
  355. Expect(m.DeleteStream(ids.firstIncomingUniStream)).To(Succeed())
  356. })
  357. })
  358. It("closes", func() {
  359. testErr := errors.New("test error")
  360. m.CloseWithError(testErr)
  361. _, err := m.OpenStream()
  362. Expect(err).To(HaveOccurred())
  363. Expect(err.Error()).To(Equal(testErr.Error()))
  364. _, err = m.OpenUniStream()
  365. Expect(err).To(HaveOccurred())
  366. Expect(err.Error()).To(Equal(testErr.Error()))
  367. _, err = m.AcceptStream()
  368. Expect(err).To(HaveOccurred())
  369. Expect(err.Error()).To(Equal(testErr.Error()))
  370. _, err = m.AcceptUniStream()
  371. Expect(err).To(HaveOccurred())
  372. Expect(err.Error()).To(Equal(testErr.Error()))
  373. })
  374. })
  375. }
  376. })