stream.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package quic
  2. import (
  3. "net"
  4. "sync"
  5. "time"
  6. "github.com/lucas-clemente/quic-go/internal/flowcontrol"
  7. "github.com/lucas-clemente/quic-go/internal/protocol"
  8. "github.com/lucas-clemente/quic-go/internal/wire"
  9. )
  10. const errorCodeStopping protocol.ApplicationErrorCode = 0
  11. // The streamSender is notified by the stream about various events.
  12. type streamSender interface {
  13. queueControlFrame(wire.Frame)
  14. onHasStreamData(protocol.StreamID)
  15. // must be called without holding the mutex that is acquired by closeForShutdown
  16. onStreamCompleted(protocol.StreamID)
  17. }
  18. // Each of the both stream halves gets its own uniStreamSender.
  19. // This is necessary in order to keep track when both halves have been completed.
  20. type uniStreamSender struct {
  21. streamSender
  22. onStreamCompletedImpl func()
  23. }
  24. func (s *uniStreamSender) queueControlFrame(f wire.Frame) {
  25. s.streamSender.queueControlFrame(f)
  26. }
  27. func (s *uniStreamSender) onHasStreamData(id protocol.StreamID) {
  28. s.streamSender.onHasStreamData(id)
  29. }
  30. func (s *uniStreamSender) onStreamCompleted(protocol.StreamID) {
  31. s.onStreamCompletedImpl()
  32. }
  33. var _ streamSender = &uniStreamSender{}
  34. type streamI interface {
  35. Stream
  36. closeForShutdown(error)
  37. // for receiving
  38. handleStreamFrame(*wire.StreamFrame) error
  39. handleResetStreamFrame(*wire.ResetStreamFrame) error
  40. getWindowUpdate() protocol.ByteCount
  41. // for sending
  42. hasData() bool
  43. handleStopSendingFrame(*wire.StopSendingFrame)
  44. popStreamFrame(maxBytes protocol.ByteCount) (*wire.StreamFrame, bool)
  45. handleMaxStreamDataFrame(*wire.MaxStreamDataFrame)
  46. }
  47. var _ receiveStreamI = (streamI)(nil)
  48. var _ sendStreamI = (streamI)(nil)
  49. // A Stream assembles the data from StreamFrames and provides a super-convenient Read-Interface
  50. //
  51. // Read() and Write() may be called concurrently, but multiple calls to Read() or Write() individually must be synchronized manually.
  52. type stream struct {
  53. receiveStream
  54. sendStream
  55. completedMutex sync.Mutex
  56. sender streamSender
  57. receiveStreamCompleted bool
  58. sendStreamCompleted bool
  59. version protocol.VersionNumber
  60. }
  61. var _ Stream = &stream{}
  62. type deadlineError struct{}
  63. func (deadlineError) Error() string { return "deadline exceeded" }
  64. func (deadlineError) Temporary() bool { return true }
  65. func (deadlineError) Timeout() bool { return true }
  66. var errDeadline net.Error = &deadlineError{}
  67. type streamCanceledError struct {
  68. error
  69. errorCode protocol.ApplicationErrorCode
  70. }
  71. func (streamCanceledError) Canceled() bool { return true }
  72. func (e streamCanceledError) ErrorCode() protocol.ApplicationErrorCode { return e.errorCode }
  73. var _ StreamError = &streamCanceledError{}
  74. // newStream creates a new Stream
  75. func newStream(streamID protocol.StreamID,
  76. sender streamSender,
  77. flowController flowcontrol.StreamFlowController,
  78. version protocol.VersionNumber,
  79. ) *stream {
  80. s := &stream{sender: sender, version: version}
  81. senderForSendStream := &uniStreamSender{
  82. streamSender: sender,
  83. onStreamCompletedImpl: func() {
  84. s.completedMutex.Lock()
  85. s.sendStreamCompleted = true
  86. s.checkIfCompleted()
  87. s.completedMutex.Unlock()
  88. },
  89. }
  90. s.sendStream = *newSendStream(streamID, senderForSendStream, flowController, version)
  91. senderForReceiveStream := &uniStreamSender{
  92. streamSender: sender,
  93. onStreamCompletedImpl: func() {
  94. s.completedMutex.Lock()
  95. s.receiveStreamCompleted = true
  96. s.checkIfCompleted()
  97. s.completedMutex.Unlock()
  98. },
  99. }
  100. s.receiveStream = *newReceiveStream(streamID, senderForReceiveStream, flowController, version)
  101. return s
  102. }
  103. // need to define StreamID() here, since both receiveStream and readStream have a StreamID()
  104. func (s *stream) StreamID() protocol.StreamID {
  105. // the result is same for receiveStream and sendStream
  106. return s.sendStream.StreamID()
  107. }
  108. func (s *stream) Close() error {
  109. if err := s.sendStream.Close(); err != nil {
  110. return err
  111. }
  112. return nil
  113. }
  114. func (s *stream) SetDeadline(t time.Time) error {
  115. _ = s.SetReadDeadline(t) // SetReadDeadline never errors
  116. _ = s.SetWriteDeadline(t) // SetWriteDeadline never errors
  117. return nil
  118. }
  119. // CloseForShutdown closes a stream abruptly.
  120. // It makes Read and Write unblock (and return the error) immediately.
  121. // The peer will NOT be informed about this: the stream is closed without sending a FIN or RST.
  122. func (s *stream) closeForShutdown(err error) {
  123. s.sendStream.closeForShutdown(err)
  124. s.receiveStream.closeForShutdown(err)
  125. }
  126. func (s *stream) handleResetStreamFrame(frame *wire.ResetStreamFrame) error {
  127. return s.receiveStream.handleResetStreamFrame(frame)
  128. }
  129. // checkIfCompleted is called from the uniStreamSender, when one of the stream halves is completed.
  130. // It makes sure that the onStreamCompleted callback is only called if both receive and send side have completed.
  131. func (s *stream) checkIfCompleted() {
  132. if s.sendStreamCompleted && s.receiveStreamCompleted {
  133. s.sender.onStreamCompleted(s.StreamID())
  134. }
  135. }