receive_stream_test.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625
  1. package quic
  2. import (
  3. "errors"
  4. "io"
  5. "runtime"
  6. "time"
  7. "github.com/golang/mock/gomock"
  8. "github.com/lucas-clemente/quic-go/internal/mocks"
  9. "github.com/lucas-clemente/quic-go/internal/protocol"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. . "github.com/onsi/ginkgo"
  12. . "github.com/onsi/gomega"
  13. "github.com/onsi/gomega/gbytes"
  14. )
  15. var _ = Describe("Receive Stream", func() {
  16. const streamID protocol.StreamID = 1337
  17. var (
  18. str *receiveStream
  19. strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader
  20. mockFC *mocks.MockStreamFlowController
  21. mockSender *MockStreamSender
  22. )
  23. BeforeEach(func() {
  24. mockSender = NewMockStreamSender(mockCtrl)
  25. mockFC = mocks.NewMockStreamFlowController(mockCtrl)
  26. str = newReceiveStream(streamID, mockSender, mockFC, protocol.VersionWhatever)
  27. timeout := scaleDuration(250 * time.Millisecond)
  28. strWithTimeout = gbytes.TimeoutReader(str, timeout)
  29. })
  30. It("gets stream id", func() {
  31. Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
  32. })
  33. Context("reading", func() {
  34. It("reads a single STREAM frame", func() {
  35. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  36. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
  37. frame := wire.StreamFrame{
  38. Offset: 0,
  39. Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
  40. }
  41. err := str.handleStreamFrame(&frame)
  42. Expect(err).ToNot(HaveOccurred())
  43. b := make([]byte, 4)
  44. n, err := strWithTimeout.Read(b)
  45. Expect(err).ToNot(HaveOccurred())
  46. Expect(n).To(Equal(4))
  47. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  48. })
  49. It("reads a single STREAM frame in multiple goes", func() {
  50. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  51. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  52. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  53. frame := wire.StreamFrame{
  54. Offset: 0,
  55. Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
  56. }
  57. err := str.handleStreamFrame(&frame)
  58. Expect(err).ToNot(HaveOccurred())
  59. b := make([]byte, 2)
  60. n, err := strWithTimeout.Read(b)
  61. Expect(err).ToNot(HaveOccurred())
  62. Expect(n).To(Equal(2))
  63. Expect(b).To(Equal([]byte{0xDE, 0xAD}))
  64. n, err = strWithTimeout.Read(b)
  65. Expect(err).ToNot(HaveOccurred())
  66. Expect(n).To(Equal(2))
  67. Expect(b).To(Equal([]byte{0xBE, 0xEF}))
  68. })
  69. It("reads all data available", func() {
  70. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  71. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  72. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  73. frame1 := wire.StreamFrame{
  74. Offset: 0,
  75. Data: []byte{0xDE, 0xAD},
  76. }
  77. frame2 := wire.StreamFrame{
  78. Offset: 2,
  79. Data: []byte{0xBE, 0xEF},
  80. }
  81. err := str.handleStreamFrame(&frame1)
  82. Expect(err).ToNot(HaveOccurred())
  83. err = str.handleStreamFrame(&frame2)
  84. Expect(err).ToNot(HaveOccurred())
  85. b := make([]byte, 6)
  86. n, err := strWithTimeout.Read(b)
  87. Expect(err).ToNot(HaveOccurred())
  88. Expect(n).To(Equal(4))
  89. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00}))
  90. })
  91. It("assembles multiple STREAM frames", func() {
  92. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  93. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  94. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  95. frame1 := wire.StreamFrame{
  96. Offset: 0,
  97. Data: []byte{0xDE, 0xAD},
  98. }
  99. frame2 := wire.StreamFrame{
  100. Offset: 2,
  101. Data: []byte{0xBE, 0xEF},
  102. }
  103. err := str.handleStreamFrame(&frame1)
  104. Expect(err).ToNot(HaveOccurred())
  105. err = str.handleStreamFrame(&frame2)
  106. Expect(err).ToNot(HaveOccurred())
  107. b := make([]byte, 4)
  108. n, err := strWithTimeout.Read(b)
  109. Expect(err).ToNot(HaveOccurred())
  110. Expect(n).To(Equal(4))
  111. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  112. })
  113. It("waits until data is available", func() {
  114. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  115. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  116. go func() {
  117. defer GinkgoRecover()
  118. frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}}
  119. time.Sleep(10 * time.Millisecond)
  120. err := str.handleStreamFrame(&frame)
  121. Expect(err).ToNot(HaveOccurred())
  122. }()
  123. b := make([]byte, 2)
  124. n, err := strWithTimeout.Read(b)
  125. Expect(err).ToNot(HaveOccurred())
  126. Expect(n).To(Equal(2))
  127. })
  128. It("handles STREAM frames in wrong order", func() {
  129. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  130. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  131. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  132. frame1 := wire.StreamFrame{
  133. Offset: 2,
  134. Data: []byte{0xBE, 0xEF},
  135. }
  136. frame2 := wire.StreamFrame{
  137. Offset: 0,
  138. Data: []byte{0xDE, 0xAD},
  139. }
  140. err := str.handleStreamFrame(&frame1)
  141. Expect(err).ToNot(HaveOccurred())
  142. err = str.handleStreamFrame(&frame2)
  143. Expect(err).ToNot(HaveOccurred())
  144. b := make([]byte, 4)
  145. n, err := strWithTimeout.Read(b)
  146. Expect(err).ToNot(HaveOccurred())
  147. Expect(n).To(Equal(4))
  148. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  149. })
  150. It("ignores duplicate STREAM frames", func() {
  151. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  152. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  153. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  154. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  155. frame1 := wire.StreamFrame{
  156. Offset: 0,
  157. Data: []byte{0xDE, 0xAD},
  158. }
  159. frame2 := wire.StreamFrame{
  160. Offset: 0,
  161. Data: []byte{0x13, 0x37},
  162. }
  163. frame3 := wire.StreamFrame{
  164. Offset: 2,
  165. Data: []byte{0xBE, 0xEF},
  166. }
  167. err := str.handleStreamFrame(&frame1)
  168. Expect(err).ToNot(HaveOccurred())
  169. err = str.handleStreamFrame(&frame2)
  170. Expect(err).ToNot(HaveOccurred())
  171. err = str.handleStreamFrame(&frame3)
  172. Expect(err).ToNot(HaveOccurred())
  173. b := make([]byte, 4)
  174. n, err := strWithTimeout.Read(b)
  175. Expect(err).ToNot(HaveOccurred())
  176. Expect(n).To(Equal(4))
  177. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  178. })
  179. It("doesn't rejects a STREAM frames with an overlapping data range", func() {
  180. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
  181. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false)
  182. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  183. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
  184. frame1 := wire.StreamFrame{
  185. Offset: 0,
  186. Data: []byte("foob"),
  187. }
  188. frame2 := wire.StreamFrame{
  189. Offset: 2,
  190. Data: []byte("obar"),
  191. }
  192. err := str.handleStreamFrame(&frame1)
  193. Expect(err).ToNot(HaveOccurred())
  194. err = str.handleStreamFrame(&frame2)
  195. Expect(err).ToNot(HaveOccurred())
  196. b := make([]byte, 6)
  197. n, err := strWithTimeout.Read(b)
  198. Expect(err).ToNot(HaveOccurred())
  199. Expect(n).To(Equal(6))
  200. Expect(b).To(Equal([]byte("foobar")))
  201. })
  202. Context("deadlines", func() {
  203. It("the deadline error has the right net.Error properties", func() {
  204. Expect(errDeadline.Temporary()).To(BeTrue())
  205. Expect(errDeadline.Timeout()).To(BeTrue())
  206. Expect(errDeadline).To(MatchError("deadline exceeded"))
  207. })
  208. It("returns an error when Read is called after the deadline", func() {
  209. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false).AnyTimes()
  210. f := &wire.StreamFrame{Data: []byte("foobar")}
  211. err := str.handleStreamFrame(f)
  212. Expect(err).ToNot(HaveOccurred())
  213. str.SetReadDeadline(time.Now().Add(-time.Second))
  214. b := make([]byte, 6)
  215. n, err := strWithTimeout.Read(b)
  216. Expect(err).To(MatchError(errDeadline))
  217. Expect(n).To(BeZero())
  218. })
  219. It("unblocks when the deadline is changed to the past", func() {
  220. str.SetReadDeadline(time.Now().Add(time.Hour))
  221. done := make(chan struct{})
  222. go func() {
  223. defer GinkgoRecover()
  224. _, err := str.Read(make([]byte, 6))
  225. Expect(err).To(MatchError(errDeadline))
  226. close(done)
  227. }()
  228. Consistently(done).ShouldNot(BeClosed())
  229. str.SetReadDeadline(time.Now().Add(-time.Hour))
  230. Eventually(done).Should(BeClosed())
  231. })
  232. It("unblocks after the deadline", func() {
  233. deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
  234. str.SetReadDeadline(deadline)
  235. b := make([]byte, 6)
  236. n, err := strWithTimeout.Read(b)
  237. Expect(err).To(MatchError(errDeadline))
  238. Expect(n).To(BeZero())
  239. Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(10*time.Millisecond)))
  240. })
  241. It("doesn't unblock if the deadline is changed before the first one expires", func() {
  242. deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond))
  243. deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond))
  244. str.SetReadDeadline(deadline1)
  245. go func() {
  246. defer GinkgoRecover()
  247. time.Sleep(scaleDuration(20 * time.Millisecond))
  248. str.SetReadDeadline(deadline2)
  249. // make sure that this was actually execute before the deadline expires
  250. Expect(time.Now()).To(BeTemporally("<", deadline1))
  251. }()
  252. runtime.Gosched()
  253. b := make([]byte, 10)
  254. n, err := strWithTimeout.Read(b)
  255. Expect(err).To(MatchError(errDeadline))
  256. Expect(n).To(BeZero())
  257. Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
  258. })
  259. It("unblocks earlier, when a new deadline is set", func() {
  260. deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond))
  261. deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond))
  262. go func() {
  263. defer GinkgoRecover()
  264. time.Sleep(scaleDuration(10 * time.Millisecond))
  265. str.SetReadDeadline(deadline2)
  266. // make sure that this was actually execute before the deadline expires
  267. Expect(time.Now()).To(BeTemporally("<", deadline2))
  268. }()
  269. str.SetReadDeadline(deadline1)
  270. runtime.Gosched()
  271. b := make([]byte, 10)
  272. _, err := strWithTimeout.Read(b)
  273. Expect(err).To(MatchError(errDeadline))
  274. Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond)))
  275. })
  276. It("doesn't unblock if the deadline is removed", func() {
  277. deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
  278. str.SetReadDeadline(deadline)
  279. deadlineUnset := make(chan struct{})
  280. go func() {
  281. defer GinkgoRecover()
  282. time.Sleep(scaleDuration(20 * time.Millisecond))
  283. str.SetReadDeadline(time.Time{})
  284. // make sure that this was actually execute before the deadline expires
  285. Expect(time.Now()).To(BeTemporally("<", deadline))
  286. close(deadlineUnset)
  287. }()
  288. done := make(chan struct{})
  289. go func() {
  290. defer GinkgoRecover()
  291. _, err := strWithTimeout.Read(make([]byte, 1))
  292. Expect(err).To(MatchError("test done"))
  293. close(done)
  294. }()
  295. runtime.Gosched()
  296. Eventually(deadlineUnset).Should(BeClosed())
  297. Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed())
  298. // make the go routine return
  299. str.closeForShutdown(errors.New("test done"))
  300. Eventually(done).Should(BeClosed())
  301. })
  302. })
  303. Context("closing", func() {
  304. Context("with FIN bit", func() {
  305. It("returns EOFs", func() {
  306. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
  307. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
  308. str.handleStreamFrame(&wire.StreamFrame{
  309. Offset: 0,
  310. Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
  311. FinBit: true,
  312. })
  313. mockSender.EXPECT().onStreamCompleted(streamID)
  314. b := make([]byte, 4)
  315. n, err := strWithTimeout.Read(b)
  316. Expect(err).To(MatchError(io.EOF))
  317. Expect(n).To(Equal(4))
  318. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  319. n, err = strWithTimeout.Read(b)
  320. Expect(n).To(BeZero())
  321. Expect(err).To(MatchError(io.EOF))
  322. })
  323. It("handles out-of-order frames", func() {
  324. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
  325. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
  326. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
  327. frame1 := wire.StreamFrame{
  328. Offset: 2,
  329. Data: []byte{0xBE, 0xEF},
  330. FinBit: true,
  331. }
  332. frame2 := wire.StreamFrame{
  333. Offset: 0,
  334. Data: []byte{0xDE, 0xAD},
  335. }
  336. err := str.handleStreamFrame(&frame1)
  337. Expect(err).ToNot(HaveOccurred())
  338. err = str.handleStreamFrame(&frame2)
  339. Expect(err).ToNot(HaveOccurred())
  340. mockSender.EXPECT().onStreamCompleted(streamID)
  341. b := make([]byte, 4)
  342. n, err := strWithTimeout.Read(b)
  343. Expect(err).To(MatchError(io.EOF))
  344. Expect(n).To(Equal(4))
  345. Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
  346. n, err = strWithTimeout.Read(b)
  347. Expect(n).To(BeZero())
  348. Expect(err).To(MatchError(io.EOF))
  349. })
  350. It("returns EOFs with partial read", func() {
  351. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true)
  352. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
  353. err := str.handleStreamFrame(&wire.StreamFrame{
  354. Offset: 0,
  355. Data: []byte{0xde, 0xad},
  356. FinBit: true,
  357. })
  358. Expect(err).ToNot(HaveOccurred())
  359. mockSender.EXPECT().onStreamCompleted(streamID)
  360. b := make([]byte, 4)
  361. n, err := strWithTimeout.Read(b)
  362. Expect(err).To(MatchError(io.EOF))
  363. Expect(n).To(Equal(2))
  364. Expect(b[:n]).To(Equal([]byte{0xde, 0xad}))
  365. })
  366. It("handles immediate FINs", func() {
  367. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
  368. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
  369. err := str.handleStreamFrame(&wire.StreamFrame{
  370. Offset: 0,
  371. FinBit: true,
  372. })
  373. Expect(err).ToNot(HaveOccurred())
  374. mockSender.EXPECT().onStreamCompleted(streamID)
  375. b := make([]byte, 4)
  376. n, err := strWithTimeout.Read(b)
  377. Expect(n).To(BeZero())
  378. Expect(err).To(MatchError(io.EOF))
  379. })
  380. })
  381. It("closes when CloseRemote is called", func() {
  382. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
  383. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
  384. str.CloseRemote(0)
  385. mockSender.EXPECT().onStreamCompleted(streamID)
  386. b := make([]byte, 8)
  387. n, err := strWithTimeout.Read(b)
  388. Expect(n).To(BeZero())
  389. Expect(err).To(MatchError(io.EOF))
  390. })
  391. })
  392. Context("closing for shutdown", func() {
  393. testErr := errors.New("test error")
  394. It("immediately returns all reads", func() {
  395. done := make(chan struct{})
  396. b := make([]byte, 4)
  397. go func() {
  398. defer GinkgoRecover()
  399. n, err := strWithTimeout.Read(b)
  400. Expect(n).To(BeZero())
  401. Expect(err).To(MatchError(testErr))
  402. close(done)
  403. }()
  404. Consistently(done).ShouldNot(BeClosed())
  405. str.closeForShutdown(testErr)
  406. Eventually(done).Should(BeClosed())
  407. })
  408. It("errors for all following reads", func() {
  409. str.closeForShutdown(testErr)
  410. b := make([]byte, 1)
  411. n, err := strWithTimeout.Read(b)
  412. Expect(n).To(BeZero())
  413. Expect(err).To(MatchError(testErr))
  414. })
  415. })
  416. })
  417. Context("stream cancelations", func() {
  418. Context("canceling read", func() {
  419. It("unblocks Read", func() {
  420. mockSender.EXPECT().queueControlFrame(gomock.Any())
  421. done := make(chan struct{})
  422. go func() {
  423. defer GinkgoRecover()
  424. _, err := strWithTimeout.Read([]byte{0})
  425. Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
  426. close(done)
  427. }()
  428. Consistently(done).ShouldNot(BeClosed())
  429. str.CancelRead(1234)
  430. Eventually(done).Should(BeClosed())
  431. })
  432. It("doesn't allow further calls to Read", func() {
  433. mockSender.EXPECT().queueControlFrame(gomock.Any())
  434. str.CancelRead(1234)
  435. _, err := strWithTimeout.Read([]byte{0})
  436. Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
  437. })
  438. It("does nothing when CancelRead is called twice", func() {
  439. mockSender.EXPECT().queueControlFrame(gomock.Any())
  440. str.CancelRead(1234)
  441. str.CancelRead(1234)
  442. _, err := strWithTimeout.Read([]byte{0})
  443. Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
  444. })
  445. It("queues a STOP_SENDING frame", func() {
  446. mockSender.EXPECT().queueControlFrame(&wire.StopSendingFrame{
  447. StreamID: streamID,
  448. ErrorCode: 1234,
  449. })
  450. str.CancelRead(1234)
  451. })
  452. It("doesn't send a STOP_SENDING frame, if the FIN was already read", func() {
  453. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true)
  454. mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6))
  455. // no calls to mockSender.queueControlFrame
  456. Expect(str.handleStreamFrame(&wire.StreamFrame{
  457. StreamID: streamID,
  458. Data: []byte("foobar"),
  459. FinBit: true,
  460. })).To(Succeed())
  461. mockSender.EXPECT().onStreamCompleted(streamID)
  462. _, err := strWithTimeout.Read(make([]byte, 100))
  463. Expect(err).To(MatchError(io.EOF))
  464. str.CancelRead(1234)
  465. })
  466. It("doesn't send a STOP_SENDING frame, if the stream was already reset", func() {
  467. gomock.InOrder(
  468. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
  469. mockFC.EXPECT().Abandon(),
  470. )
  471. mockSender.EXPECT().onStreamCompleted(streamID)
  472. Expect(str.handleResetStreamFrame(&wire.ResetStreamFrame{
  473. StreamID: streamID,
  474. ByteOffset: 42,
  475. })).To(Succeed())
  476. str.CancelRead(1234)
  477. })
  478. It("sends a STOP_SENDING and completes the stream after receiving the final offset", func() {
  479. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true)
  480. Expect(str.handleStreamFrame(&wire.StreamFrame{
  481. Offset: 1000,
  482. FinBit: true,
  483. })).To(Succeed())
  484. mockFC.EXPECT().Abandon()
  485. mockSender.EXPECT().queueControlFrame(gomock.Any())
  486. mockSender.EXPECT().onStreamCompleted(streamID)
  487. str.CancelRead(1234)
  488. })
  489. It("completes the stream when receiving the FinBit after the stream was canceled", func() {
  490. mockSender.EXPECT().queueControlFrame(gomock.Any())
  491. str.CancelRead(1234)
  492. gomock.InOrder(
  493. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
  494. mockFC.EXPECT().Abandon(),
  495. )
  496. mockSender.EXPECT().onStreamCompleted(streamID)
  497. Expect(str.handleStreamFrame(&wire.StreamFrame{
  498. Offset: 1000,
  499. FinBit: true,
  500. })).To(Succeed())
  501. })
  502. })
  503. Context("receiving RESET_STREAM frames", func() {
  504. rst := &wire.ResetStreamFrame{
  505. StreamID: streamID,
  506. ByteOffset: 42,
  507. ErrorCode: 1234,
  508. }
  509. It("unblocks Read", func() {
  510. done := make(chan struct{})
  511. go func() {
  512. defer GinkgoRecover()
  513. _, err := strWithTimeout.Read([]byte{0})
  514. Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
  515. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  516. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  517. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
  518. close(done)
  519. }()
  520. Consistently(done).ShouldNot(BeClosed())
  521. mockSender.EXPECT().onStreamCompleted(streamID)
  522. gomock.InOrder(
  523. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
  524. mockFC.EXPECT().Abandon(),
  525. )
  526. str.handleResetStreamFrame(rst)
  527. Eventually(done).Should(BeClosed())
  528. })
  529. It("doesn't allow further calls to Read", func() {
  530. mockSender.EXPECT().onStreamCompleted(streamID)
  531. gomock.InOrder(
  532. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
  533. mockFC.EXPECT().Abandon(),
  534. )
  535. Expect(str.handleResetStreamFrame(rst)).To(Succeed())
  536. _, err := strWithTimeout.Read([]byte{0})
  537. Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
  538. Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
  539. Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
  540. Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
  541. })
  542. It("errors when receiving a RESET_STREAM with an inconsistent offset", func() {
  543. testErr := errors.New("already received a different final offset before")
  544. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Return(testErr)
  545. err := str.handleResetStreamFrame(rst)
  546. Expect(err).To(MatchError(testErr))
  547. })
  548. It("ignores duplicate RESET_STREAM frames", func() {
  549. mockSender.EXPECT().onStreamCompleted(streamID)
  550. mockFC.EXPECT().Abandon()
  551. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
  552. Expect(str.handleResetStreamFrame(rst)).To(Succeed())
  553. Expect(str.handleResetStreamFrame(rst)).To(Succeed())
  554. })
  555. It("doesn't do anyting when it was closed for shutdown", func() {
  556. str.closeForShutdown(nil)
  557. err := str.handleResetStreamFrame(rst)
  558. Expect(err).ToNot(HaveOccurred())
  559. })
  560. })
  561. })
  562. Context("flow control", func() {
  563. It("errors when a STREAM frame causes a flow control violation", func() {
  564. testErr := errors.New("flow control violation")
  565. mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), false).Return(testErr)
  566. frame := wire.StreamFrame{
  567. Offset: 2,
  568. Data: []byte("foobar"),
  569. }
  570. err := str.handleStreamFrame(&frame)
  571. Expect(err).To(MatchError(testErr))
  572. })
  573. It("gets a window update", func() {
  574. mockFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x100))
  575. Expect(str.getWindowUpdate()).To(Equal(protocol.ByteCount(0x100)))
  576. })
  577. })
  578. })