receive_stream.go 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package quic
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "time"
  7. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  8. "github.com/lucas-clemente/quic-go/internal/protocol"
  9. "github.com/lucas-clemente/quic-go/internal/utils"
  10. "github.com/lucas-clemente/quic-go/internal/wire"
  11. )
  12. type receiveStreamI interface {
  13. ReceiveStream
  14. handleStreamFrame(*wire.StreamFrame) error
  15. handleResetStreamFrame(*wire.ResetStreamFrame) error
  16. closeForShutdown(error)
  17. getWindowUpdate() protocol.ByteCount
  18. }
  19. type receiveStream struct {
  20. mutex sync.Mutex
  21. streamID protocol.StreamID
  22. sender streamSender
  23. frameQueue *frameSorter
  24. readOffset protocol.ByteCount
  25. finalOffset protocol.ByteCount
  26. currentFrame []byte
  27. currentFrameIsLast bool // is the currentFrame the last frame on this stream
  28. readPosInFrame int
  29. closeForShutdownErr error
  30. cancelReadErr error
  31. resetRemotelyErr StreamError
  32. closedForShutdown bool // set when CloseForShutdown() is called
  33. finRead bool // set once we read a frame with a FinBit
  34. canceledRead bool // set when CancelRead() is called
  35. resetRemotely bool // set when HandleResetStreamFrame() is called
  36. readChan chan struct{}
  37. deadline time.Time
  38. flowController flowcontrol.StreamFlowController
  39. version protocol.VersionNumber
  40. }
  41. var _ ReceiveStream = &receiveStream{}
  42. var _ receiveStreamI = &receiveStream{}
  43. func newReceiveStream(
  44. streamID protocol.StreamID,
  45. sender streamSender,
  46. flowController flowcontrol.StreamFlowController,
  47. version protocol.VersionNumber,
  48. ) *receiveStream {
  49. return &receiveStream{
  50. streamID: streamID,
  51. sender: sender,
  52. flowController: flowController,
  53. frameQueue: newFrameSorter(),
  54. readChan: make(chan struct{}, 1),
  55. finalOffset: protocol.MaxByteCount,
  56. version: version,
  57. }
  58. }
  59. func (s *receiveStream) StreamID() protocol.StreamID {
  60. return s.streamID
  61. }
  62. // Read implements io.Reader. It is not thread safe!
  63. func (s *receiveStream) Read(p []byte) (int, error) {
  64. s.mutex.Lock()
  65. completed, n, err := s.readImpl(p)
  66. s.mutex.Unlock()
  67. if completed {
  68. s.streamCompleted()
  69. }
  70. return n, err
  71. }
  72. func (s *receiveStream) readImpl(p []byte) (bool /*stream completed */, int, error) {
  73. if s.finRead {
  74. return false, 0, io.EOF
  75. }
  76. if s.canceledRead {
  77. return false, 0, s.cancelReadErr
  78. }
  79. if s.resetRemotely {
  80. return false, 0, s.resetRemotelyErr
  81. }
  82. if s.closedForShutdown {
  83. return false, 0, s.closeForShutdownErr
  84. }
  85. bytesRead := 0
  86. for bytesRead < len(p) {
  87. if s.currentFrame == nil || s.readPosInFrame >= len(s.currentFrame) {
  88. s.dequeueNextFrame()
  89. }
  90. if s.currentFrame == nil && bytesRead > 0 {
  91. return false, bytesRead, s.closeForShutdownErr
  92. }
  93. var deadlineTimer *utils.Timer
  94. for {
  95. // Stop waiting on errors
  96. if s.closedForShutdown {
  97. return false, bytesRead, s.closeForShutdownErr
  98. }
  99. if s.canceledRead {
  100. return false, bytesRead, s.cancelReadErr
  101. }
  102. if s.resetRemotely {
  103. return false, bytesRead, s.resetRemotelyErr
  104. }
  105. deadline := s.deadline
  106. if !deadline.IsZero() {
  107. if !time.Now().Before(deadline) {
  108. return false, bytesRead, errDeadline
  109. }
  110. if deadlineTimer == nil {
  111. deadlineTimer = utils.NewTimer()
  112. }
  113. deadlineTimer.Reset(deadline)
  114. }
  115. if s.currentFrame != nil || s.currentFrameIsLast {
  116. break
  117. }
  118. s.mutex.Unlock()
  119. if deadline.IsZero() {
  120. <-s.readChan
  121. } else {
  122. select {
  123. case <-s.readChan:
  124. case <-deadlineTimer.Chan():
  125. deadlineTimer.SetRead()
  126. }
  127. }
  128. s.mutex.Lock()
  129. if s.currentFrame == nil {
  130. s.dequeueNextFrame()
  131. }
  132. }
  133. if bytesRead > len(p) {
  134. return false, bytesRead, fmt.Errorf("BUG: bytesRead (%d) > len(p) (%d) in stream.Read", bytesRead, len(p))
  135. }
  136. if s.readPosInFrame > len(s.currentFrame) {
  137. return false, bytesRead, fmt.Errorf("BUG: readPosInFrame (%d) > frame.DataLen (%d) in stream.Read", s.readPosInFrame, len(s.currentFrame))
  138. }
  139. s.mutex.Unlock()
  140. m := copy(p[bytesRead:], s.currentFrame[s.readPosInFrame:])
  141. s.readPosInFrame += m
  142. bytesRead += m
  143. s.readOffset += protocol.ByteCount(m)
  144. s.mutex.Lock()
  145. // when a RESET_STREAM was received, the was already informed about the final byteOffset for this stream
  146. if !s.resetRemotely {
  147. s.flowController.AddBytesRead(protocol.ByteCount(m))
  148. }
  149. if s.readPosInFrame >= len(s.currentFrame) && s.currentFrameIsLast {
  150. s.finRead = true
  151. return true, bytesRead, io.EOF
  152. }
  153. }
  154. return false, bytesRead, nil
  155. }
  156. func (s *receiveStream) dequeueNextFrame() {
  157. var offset protocol.ByteCount
  158. offset, s.currentFrame = s.frameQueue.Pop()
  159. s.currentFrameIsLast = offset+protocol.ByteCount(len(s.currentFrame)) >= s.finalOffset
  160. s.readPosInFrame = 0
  161. }
  162. func (s *receiveStream) CancelRead(errorCode protocol.ApplicationErrorCode) {
  163. s.mutex.Lock()
  164. completed := s.cancelReadImpl(errorCode)
  165. s.mutex.Unlock()
  166. if completed {
  167. s.streamCompleted()
  168. }
  169. }
  170. func (s *receiveStream) cancelReadImpl(errorCode protocol.ApplicationErrorCode) bool /* completed */ {
  171. if s.finRead || s.canceledRead || s.resetRemotely {
  172. return false
  173. }
  174. s.canceledRead = true
  175. s.cancelReadErr = fmt.Errorf("Read on stream %d canceled with error code %d", s.streamID, errorCode)
  176. s.signalRead()
  177. s.sender.queueControlFrame(&wire.StopSendingFrame{
  178. StreamID: s.streamID,
  179. ErrorCode: errorCode,
  180. })
  181. // We're done with this stream if the final offset was already received.
  182. return s.finalOffset != protocol.MaxByteCount
  183. }
  184. func (s *receiveStream) handleStreamFrame(frame *wire.StreamFrame) error {
  185. s.mutex.Lock()
  186. completed, err := s.handleStreamFrameImpl(frame)
  187. s.mutex.Unlock()
  188. if completed {
  189. s.streamCompleted()
  190. }
  191. return err
  192. }
  193. func (s *receiveStream) handleStreamFrameImpl(frame *wire.StreamFrame) (bool /* completed */, error) {
  194. maxOffset := frame.Offset + frame.DataLen()
  195. if err := s.flowController.UpdateHighestReceived(maxOffset, frame.FinBit); err != nil {
  196. return false, err
  197. }
  198. if frame.FinBit {
  199. s.finalOffset = maxOffset
  200. }
  201. if s.canceledRead {
  202. return frame.FinBit, nil
  203. }
  204. if err := s.frameQueue.Push(frame.Data, frame.Offset); err != nil {
  205. return false, err
  206. }
  207. s.signalRead()
  208. return false, nil
  209. }
  210. func (s *receiveStream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  211. s.mutex.Lock()
  212. completed, err := s.handleResetStreamFrameImpl(frame)
  213. s.mutex.Unlock()
  214. if completed {
  215. s.streamCompleted()
  216. }
  217. return err
  218. }
  219. func (s *receiveStream) handleResetStreamFrameImpl(frame *wire.ResetStreamFrame) (bool /*completed */, error) {
  220. if s.closedForShutdown {
  221. return false, nil
  222. }
  223. if err := s.flowController.UpdateHighestReceived(frame.ByteOffset, true); err != nil {
  224. return false, err
  225. }
  226. s.finalOffset = frame.ByteOffset
  227. // ignore duplicate RESET_STREAM frames for this stream (after checking their final offset)
  228. if s.resetRemotely {
  229. return false, nil
  230. }
  231. s.resetRemotely = true
  232. s.resetRemotelyErr = streamCanceledError{
  233. errorCode: frame.ErrorCode,
  234. error: fmt.Errorf("Stream %d was reset with error code %d", s.streamID, frame.ErrorCode),
  235. }
  236. s.signalRead()
  237. return true, nil
  238. }
  239. func (s *receiveStream) CloseRemote(offset protocol.ByteCount) {
  240. s.handleStreamFrame(&wire.StreamFrame{FinBit: true, Offset: offset})
  241. }
  242. func (s *receiveStream) SetReadDeadline(t time.Time) error {
  243. s.mutex.Lock()
  244. s.deadline = t
  245. s.mutex.Unlock()
  246. s.signalRead()
  247. return nil
  248. }
  249. // CloseForShutdown closes a stream abruptly.
  250. // It makes Read unblock (and return the error) immediately.
  251. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RESET.
  252. func (s *receiveStream) closeForShutdown(err error) {
  253. s.mutex.Lock()
  254. s.closedForShutdown = true
  255. s.closeForShutdownErr = err
  256. s.mutex.Unlock()
  257. s.signalRead()
  258. }
  259. func (s *receiveStream) getWindowUpdate() protocol.ByteCount {
  260. return s.flowController.GetWindowUpdate()
  261. }
  262. func (s *receiveStream) streamCompleted() {
  263. s.mutex.Lock()
  264. finRead := s.finRead
  265. s.mutex.Unlock()
  266. if !finRead {
  267. s.flowController.Abandon()
  268. }
  269. s.sender.onStreamCompleted(s.streamID)
  270. }
  271. // signalRead performs a non-blocking send on the readChan
  272. func (s *receiveStream) signalRead() {
  273. select {
  274. case s.readChan <- struct{}{}:
  275. default:
  276. }
  277. }