123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625 |
- package quic
- import (
- "errors"
- "io"
- "runtime"
- "time"
- "github.com/golang/mock/gomock"
- "github.com/lucas-clemente/quic-go/internal/mocks"
- "github.com/lucas-clemente/quic-go/internal/protocol"
- "github.com/lucas-clemente/quic-go/internal/wire"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "github.com/onsi/gomega/gbytes"
- )
- var _ = Describe("Receive Stream", func() {
- const streamID protocol.StreamID = 1337
- var (
- str *receiveStream
- strWithTimeout io.Reader // str wrapped with gbytes.TimeoutReader
- mockFC *mocks.MockStreamFlowController
- mockSender *MockStreamSender
- )
- BeforeEach(func() {
- mockSender = NewMockStreamSender(mockCtrl)
- mockFC = mocks.NewMockStreamFlowController(mockCtrl)
- str = newReceiveStream(streamID, mockSender, mockFC, protocol.VersionWhatever)
- timeout := scaleDuration(250 * time.Millisecond)
- strWithTimeout = gbytes.TimeoutReader(str, timeout)
- })
- It("gets stream id", func() {
- Expect(str.StreamID()).To(Equal(protocol.StreamID(1337)))
- })
- Context("reading", func() {
- It("reads a single STREAM frame", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
- frame := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
- }
- err := str.handleStreamFrame(&frame)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
- })
- It("reads a single STREAM frame in multiple goes", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
- frame := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
- }
- err := str.handleStreamFrame(&frame)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 2)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(2))
- Expect(b).To(Equal([]byte{0xDE, 0xAD}))
- n, err = strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(2))
- Expect(b).To(Equal([]byte{0xBE, 0xEF}))
- })
- It("reads all data available", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
- frame1 := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD},
- }
- frame2 := wire.StreamFrame{
- Offset: 2,
- Data: []byte{0xBE, 0xEF},
- }
- err := str.handleStreamFrame(&frame1)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame2)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 6)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0x00}))
- })
- It("assembles multiple STREAM frames", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
- frame1 := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD},
- }
- frame2 := wire.StreamFrame{
- Offset: 2,
- Data: []byte{0xBE, 0xEF},
- }
- err := str.handleStreamFrame(&frame1)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame2)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
- })
- It("waits until data is available", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
- go func() {
- defer GinkgoRecover()
- frame := wire.StreamFrame{Data: []byte{0xDE, 0xAD}}
- time.Sleep(10 * time.Millisecond)
- err := str.handleStreamFrame(&frame)
- Expect(err).ToNot(HaveOccurred())
- }()
- b := make([]byte, 2)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(2))
- })
- It("handles STREAM frames in wrong order", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
- frame1 := wire.StreamFrame{
- Offset: 2,
- Data: []byte{0xBE, 0xEF},
- }
- frame2 := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD},
- }
- err := str.handleStreamFrame(&frame1)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame2)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
- })
- It("ignores duplicate STREAM frames", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
- frame1 := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD},
- }
- frame2 := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0x13, 0x37},
- }
- frame3 := wire.StreamFrame{
- Offset: 2,
- Data: []byte{0xBE, 0xEF},
- }
- err := str.handleStreamFrame(&frame1)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame2)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame3)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
- })
- It("doesn't rejects a STREAM frames with an overlapping data range", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
- frame1 := wire.StreamFrame{
- Offset: 0,
- Data: []byte("foob"),
- }
- frame2 := wire.StreamFrame{
- Offset: 2,
- Data: []byte("obar"),
- }
- err := str.handleStreamFrame(&frame1)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame2)
- Expect(err).ToNot(HaveOccurred())
- b := make([]byte, 6)
- n, err := strWithTimeout.Read(b)
- Expect(err).ToNot(HaveOccurred())
- Expect(n).To(Equal(6))
- Expect(b).To(Equal([]byte("foobar")))
- })
- Context("deadlines", func() {
- It("the deadline error has the right net.Error properties", func() {
- Expect(errDeadline.Temporary()).To(BeTrue())
- Expect(errDeadline.Timeout()).To(BeTrue())
- Expect(errDeadline).To(MatchError("deadline exceeded"))
- })
- It("returns an error when Read is called after the deadline", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), false).AnyTimes()
- f := &wire.StreamFrame{Data: []byte("foobar")}
- err := str.handleStreamFrame(f)
- Expect(err).ToNot(HaveOccurred())
- str.SetReadDeadline(time.Now().Add(-time.Second))
- b := make([]byte, 6)
- n, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(errDeadline))
- Expect(n).To(BeZero())
- })
- It("unblocks when the deadline is changed to the past", func() {
- str.SetReadDeadline(time.Now().Add(time.Hour))
- done := make(chan struct{})
- go func() {
- defer GinkgoRecover()
- _, err := str.Read(make([]byte, 6))
- Expect(err).To(MatchError(errDeadline))
- close(done)
- }()
- Consistently(done).ShouldNot(BeClosed())
- str.SetReadDeadline(time.Now().Add(-time.Hour))
- Eventually(done).Should(BeClosed())
- })
- It("unblocks after the deadline", func() {
- deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
- str.SetReadDeadline(deadline)
- b := make([]byte, 6)
- n, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(errDeadline))
- Expect(n).To(BeZero())
- Expect(time.Now()).To(BeTemporally("~", deadline, scaleDuration(10*time.Millisecond)))
- })
- It("doesn't unblock if the deadline is changed before the first one expires", func() {
- deadline1 := time.Now().Add(scaleDuration(50 * time.Millisecond))
- deadline2 := time.Now().Add(scaleDuration(100 * time.Millisecond))
- str.SetReadDeadline(deadline1)
- go func() {
- defer GinkgoRecover()
- time.Sleep(scaleDuration(20 * time.Millisecond))
- str.SetReadDeadline(deadline2)
- // make sure that this was actually execute before the deadline expires
- Expect(time.Now()).To(BeTemporally("<", deadline1))
- }()
- runtime.Gosched()
- b := make([]byte, 10)
- n, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(errDeadline))
- Expect(n).To(BeZero())
- Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(20*time.Millisecond)))
- })
- It("unblocks earlier, when a new deadline is set", func() {
- deadline1 := time.Now().Add(scaleDuration(200 * time.Millisecond))
- deadline2 := time.Now().Add(scaleDuration(50 * time.Millisecond))
- go func() {
- defer GinkgoRecover()
- time.Sleep(scaleDuration(10 * time.Millisecond))
- str.SetReadDeadline(deadline2)
- // make sure that this was actually execute before the deadline expires
- Expect(time.Now()).To(BeTemporally("<", deadline2))
- }()
- str.SetReadDeadline(deadline1)
- runtime.Gosched()
- b := make([]byte, 10)
- _, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(errDeadline))
- Expect(time.Now()).To(BeTemporally("~", deadline2, scaleDuration(25*time.Millisecond)))
- })
- It("doesn't unblock if the deadline is removed", func() {
- deadline := time.Now().Add(scaleDuration(50 * time.Millisecond))
- str.SetReadDeadline(deadline)
- deadlineUnset := make(chan struct{})
- go func() {
- defer GinkgoRecover()
- time.Sleep(scaleDuration(20 * time.Millisecond))
- str.SetReadDeadline(time.Time{})
- // make sure that this was actually execute before the deadline expires
- Expect(time.Now()).To(BeTemporally("<", deadline))
- close(deadlineUnset)
- }()
- done := make(chan struct{})
- go func() {
- defer GinkgoRecover()
- _, err := strWithTimeout.Read(make([]byte, 1))
- Expect(err).To(MatchError("test done"))
- close(done)
- }()
- runtime.Gosched()
- Eventually(deadlineUnset).Should(BeClosed())
- Consistently(done, scaleDuration(100*time.Millisecond)).ShouldNot(BeClosed())
- // make the go routine return
- str.closeForShutdown(errors.New("test done"))
- Eventually(done).Should(BeClosed())
- })
- })
- Context("closing", func() {
- Context("with FIN bit", func() {
- It("returns EOFs", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(4))
- str.handleStreamFrame(&wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD, 0xBE, 0xEF},
- FinBit: true,
- })
- mockSender.EXPECT().onStreamCompleted(streamID)
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(io.EOF))
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
- n, err = strWithTimeout.Read(b)
- Expect(n).To(BeZero())
- Expect(err).To(MatchError(io.EOF))
- })
- It("handles out-of-order frames", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), false)
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(4), true)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2)).Times(2)
- frame1 := wire.StreamFrame{
- Offset: 2,
- Data: []byte{0xBE, 0xEF},
- FinBit: true,
- }
- frame2 := wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xDE, 0xAD},
- }
- err := str.handleStreamFrame(&frame1)
- Expect(err).ToNot(HaveOccurred())
- err = str.handleStreamFrame(&frame2)
- Expect(err).ToNot(HaveOccurred())
- mockSender.EXPECT().onStreamCompleted(streamID)
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(io.EOF))
- Expect(n).To(Equal(4))
- Expect(b).To(Equal([]byte{0xDE, 0xAD, 0xBE, 0xEF}))
- n, err = strWithTimeout.Read(b)
- Expect(n).To(BeZero())
- Expect(err).To(MatchError(io.EOF))
- })
- It("returns EOFs with partial read", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(2), true)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(2))
- err := str.handleStreamFrame(&wire.StreamFrame{
- Offset: 0,
- Data: []byte{0xde, 0xad},
- FinBit: true,
- })
- Expect(err).ToNot(HaveOccurred())
- mockSender.EXPECT().onStreamCompleted(streamID)
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(err).To(MatchError(io.EOF))
- Expect(n).To(Equal(2))
- Expect(b[:n]).To(Equal([]byte{0xde, 0xad}))
- })
- It("handles immediate FINs", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
- err := str.handleStreamFrame(&wire.StreamFrame{
- Offset: 0,
- FinBit: true,
- })
- Expect(err).ToNot(HaveOccurred())
- mockSender.EXPECT().onStreamCompleted(streamID)
- b := make([]byte, 4)
- n, err := strWithTimeout.Read(b)
- Expect(n).To(BeZero())
- Expect(err).To(MatchError(io.EOF))
- })
- })
- It("closes when CloseRemote is called", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(0), true)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(0))
- str.CloseRemote(0)
- mockSender.EXPECT().onStreamCompleted(streamID)
- b := make([]byte, 8)
- n, err := strWithTimeout.Read(b)
- Expect(n).To(BeZero())
- Expect(err).To(MatchError(io.EOF))
- })
- })
- Context("closing for shutdown", func() {
- testErr := errors.New("test error")
- It("immediately returns all reads", func() {
- done := make(chan struct{})
- b := make([]byte, 4)
- go func() {
- defer GinkgoRecover()
- n, err := strWithTimeout.Read(b)
- Expect(n).To(BeZero())
- Expect(err).To(MatchError(testErr))
- close(done)
- }()
- Consistently(done).ShouldNot(BeClosed())
- str.closeForShutdown(testErr)
- Eventually(done).Should(BeClosed())
- })
- It("errors for all following reads", func() {
- str.closeForShutdown(testErr)
- b := make([]byte, 1)
- n, err := strWithTimeout.Read(b)
- Expect(n).To(BeZero())
- Expect(err).To(MatchError(testErr))
- })
- })
- })
- Context("stream cancelations", func() {
- Context("canceling read", func() {
- It("unblocks Read", func() {
- mockSender.EXPECT().queueControlFrame(gomock.Any())
- done := make(chan struct{})
- go func() {
- defer GinkgoRecover()
- _, err := strWithTimeout.Read([]byte{0})
- Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
- close(done)
- }()
- Consistently(done).ShouldNot(BeClosed())
- str.CancelRead(1234)
- Eventually(done).Should(BeClosed())
- })
- It("doesn't allow further calls to Read", func() {
- mockSender.EXPECT().queueControlFrame(gomock.Any())
- str.CancelRead(1234)
- _, err := strWithTimeout.Read([]byte{0})
- Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
- })
- It("does nothing when CancelRead is called twice", func() {
- mockSender.EXPECT().queueControlFrame(gomock.Any())
- str.CancelRead(1234)
- str.CancelRead(1234)
- _, err := strWithTimeout.Read([]byte{0})
- Expect(err).To(MatchError("Read on stream 1337 canceled with error code 1234"))
- })
- It("queues a STOP_SENDING frame", func() {
- mockSender.EXPECT().queueControlFrame(&wire.StopSendingFrame{
- StreamID: streamID,
- ErrorCode: 1234,
- })
- str.CancelRead(1234)
- })
- It("doesn't send a STOP_SENDING frame, if the FIN was already read", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(6), true)
- mockFC.EXPECT().AddBytesRead(protocol.ByteCount(6))
- // no calls to mockSender.queueControlFrame
- Expect(str.handleStreamFrame(&wire.StreamFrame{
- StreamID: streamID,
- Data: []byte("foobar"),
- FinBit: true,
- })).To(Succeed())
- mockSender.EXPECT().onStreamCompleted(streamID)
- _, err := strWithTimeout.Read(make([]byte, 100))
- Expect(err).To(MatchError(io.EOF))
- str.CancelRead(1234)
- })
- It("doesn't send a STOP_SENDING frame, if the stream was already reset", func() {
- gomock.InOrder(
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
- mockFC.EXPECT().Abandon(),
- )
- mockSender.EXPECT().onStreamCompleted(streamID)
- Expect(str.handleResetStreamFrame(&wire.ResetStreamFrame{
- StreamID: streamID,
- ByteOffset: 42,
- })).To(Succeed())
- str.CancelRead(1234)
- })
- It("sends a STOP_SENDING and completes the stream after receiving the final offset", func() {
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true)
- Expect(str.handleStreamFrame(&wire.StreamFrame{
- Offset: 1000,
- FinBit: true,
- })).To(Succeed())
- mockFC.EXPECT().Abandon()
- mockSender.EXPECT().queueControlFrame(gomock.Any())
- mockSender.EXPECT().onStreamCompleted(streamID)
- str.CancelRead(1234)
- })
- It("completes the stream when receiving the FinBit after the stream was canceled", func() {
- mockSender.EXPECT().queueControlFrame(gomock.Any())
- str.CancelRead(1234)
- gomock.InOrder(
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(1000), true),
- mockFC.EXPECT().Abandon(),
- )
- mockSender.EXPECT().onStreamCompleted(streamID)
- Expect(str.handleStreamFrame(&wire.StreamFrame{
- Offset: 1000,
- FinBit: true,
- })).To(Succeed())
- })
- })
- Context("receiving RESET_STREAM frames", func() {
- rst := &wire.ResetStreamFrame{
- StreamID: streamID,
- ByteOffset: 42,
- ErrorCode: 1234,
- }
- It("unblocks Read", func() {
- done := make(chan struct{})
- go func() {
- defer GinkgoRecover()
- _, err := strWithTimeout.Read([]byte{0})
- Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
- Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
- Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
- Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
- close(done)
- }()
- Consistently(done).ShouldNot(BeClosed())
- mockSender.EXPECT().onStreamCompleted(streamID)
- gomock.InOrder(
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
- mockFC.EXPECT().Abandon(),
- )
- str.handleResetStreamFrame(rst)
- Eventually(done).Should(BeClosed())
- })
- It("doesn't allow further calls to Read", func() {
- mockSender.EXPECT().onStreamCompleted(streamID)
- gomock.InOrder(
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true),
- mockFC.EXPECT().Abandon(),
- )
- Expect(str.handleResetStreamFrame(rst)).To(Succeed())
- _, err := strWithTimeout.Read([]byte{0})
- Expect(err).To(MatchError("Stream 1337 was reset with error code 1234"))
- Expect(err).To(BeAssignableToTypeOf(streamCanceledError{}))
- Expect(err.(streamCanceledError).Canceled()).To(BeTrue())
- Expect(err.(streamCanceledError).ErrorCode()).To(Equal(protocol.ApplicationErrorCode(1234)))
- })
- It("errors when receiving a RESET_STREAM with an inconsistent offset", func() {
- testErr := errors.New("already received a different final offset before")
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Return(testErr)
- err := str.handleResetStreamFrame(rst)
- Expect(err).To(MatchError(testErr))
- })
- It("ignores duplicate RESET_STREAM frames", func() {
- mockSender.EXPECT().onStreamCompleted(streamID)
- mockFC.EXPECT().Abandon()
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(42), true).Times(2)
- Expect(str.handleResetStreamFrame(rst)).To(Succeed())
- Expect(str.handleResetStreamFrame(rst)).To(Succeed())
- })
- It("doesn't do anyting when it was closed for shutdown", func() {
- str.closeForShutdown(nil)
- err := str.handleResetStreamFrame(rst)
- Expect(err).ToNot(HaveOccurred())
- })
- })
- })
- Context("flow control", func() {
- It("errors when a STREAM frame causes a flow control violation", func() {
- testErr := errors.New("flow control violation")
- mockFC.EXPECT().UpdateHighestReceived(protocol.ByteCount(8), false).Return(testErr)
- frame := wire.StreamFrame{
- Offset: 2,
- Data: []byte("foobar"),
- }
- err := str.handleStreamFrame(&frame)
- Expect(err).To(MatchError(testErr))
- })
- It("gets a window update", func() {
- mockFC.EXPECT().GetWindowUpdate().Return(protocol.ByteCount(0x100))
- Expect(str.getWindowUpdate()).To(Equal(protocol.ByteCount(0x100)))
- })
- })
- })
|