buffer.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. /*
  2. Package gbytes provides a buffer that supports incrementally detecting input.
  3. You use gbytes.Buffer with the gbytes.Say matcher. When Say finds a match, it fastforwards the buffer's read cursor to the end of that match.
  4. Subsequent matches against the buffer will only operate against data that appears *after* the read cursor.
  5. The read cursor is an opaque implementation detail that you cannot access. You should use the Say matcher to sift through the buffer. You can always
  6. access the entire buffer's contents with Contents().
  7. */
  8. package gbytes
  9. import (
  10. "errors"
  11. "fmt"
  12. "io"
  13. "regexp"
  14. "sync"
  15. "time"
  16. )
  17. /*
  18. gbytes.Buffer implements an io.Writer and can be used with the gbytes.Say matcher.
  19. You should only use a gbytes.Buffer in test code. It stores all writes in an in-memory buffer - behavior that is inappropriate for production code!
  20. */
  21. type Buffer struct {
  22. contents []byte
  23. readCursor uint64
  24. lock *sync.Mutex
  25. detectCloser chan interface{}
  26. closed bool
  27. }
  28. /*
  29. NewBuffer returns a new gbytes.Buffer
  30. */
  31. func NewBuffer() *Buffer {
  32. return &Buffer{
  33. lock: &sync.Mutex{},
  34. }
  35. }
  36. /*
  37. BufferWithBytes returns a new gbytes.Buffer seeded with the passed in bytes
  38. */
  39. func BufferWithBytes(bytes []byte) *Buffer {
  40. return &Buffer{
  41. lock: &sync.Mutex{},
  42. contents: bytes,
  43. }
  44. }
  45. /*
  46. BufferReader returns a new gbytes.Buffer that wraps a reader. The reader's contents are read into
  47. the Buffer via io.Copy
  48. */
  49. func BufferReader(reader io.Reader) *Buffer {
  50. b := &Buffer{
  51. lock: &sync.Mutex{},
  52. }
  53. go func() {
  54. io.Copy(b, reader)
  55. b.Close()
  56. }()
  57. return b
  58. }
  59. /*
  60. Write implements the io.Writer interface
  61. */
  62. func (b *Buffer) Write(p []byte) (n int, err error) {
  63. b.lock.Lock()
  64. defer b.lock.Unlock()
  65. if b.closed {
  66. return 0, errors.New("attempt to write to closed buffer")
  67. }
  68. b.contents = append(b.contents, p...)
  69. return len(p), nil
  70. }
  71. /*
  72. Read implements the io.Reader interface. It advances the
  73. cursor as it reads.
  74. Returns an error if called after Close.
  75. */
  76. func (b *Buffer) Read(d []byte) (int, error) {
  77. b.lock.Lock()
  78. defer b.lock.Unlock()
  79. if b.closed {
  80. return 0, errors.New("attempt to read from closed buffer")
  81. }
  82. if uint64(len(b.contents)) <= b.readCursor {
  83. return 0, io.EOF
  84. }
  85. n := copy(d, b.contents[b.readCursor:])
  86. b.readCursor += uint64(n)
  87. return n, nil
  88. }
  89. /*
  90. Close signifies that the buffer will no longer be written to
  91. */
  92. func (b *Buffer) Close() error {
  93. b.lock.Lock()
  94. defer b.lock.Unlock()
  95. b.closed = true
  96. return nil
  97. }
  98. /*
  99. Closed returns true if the buffer has been closed
  100. */
  101. func (b *Buffer) Closed() bool {
  102. b.lock.Lock()
  103. defer b.lock.Unlock()
  104. return b.closed
  105. }
  106. /*
  107. Contents returns all data ever written to the buffer.
  108. */
  109. func (b *Buffer) Contents() []byte {
  110. b.lock.Lock()
  111. defer b.lock.Unlock()
  112. contents := make([]byte, len(b.contents))
  113. copy(contents, b.contents)
  114. return contents
  115. }
  116. /*
  117. Detect takes a regular expression and returns a channel.
  118. The channel will receive true the first time data matching the regular expression is written to the buffer.
  119. The channel is subsequently closed and the buffer's read-cursor is fast-forwarded to just after the matching region.
  120. You typically don't need to use Detect and should use the ghttp.Say matcher instead. Detect is useful, however, in cases where your code must
  121. be branch and handle different outputs written to the buffer.
  122. For example, consider a buffer hooked up to the stdout of a client library. You may (or may not, depending on state outside of your control) need to authenticate the client library.
  123. You could do something like:
  124. select {
  125. case <-buffer.Detect("You are not logged in"):
  126. //log in
  127. case <-buffer.Detect("Success"):
  128. //carry on
  129. case <-time.After(time.Second):
  130. //welp
  131. }
  132. buffer.CancelDetects()
  133. You should always call CancelDetects after using Detect. This will close any channels that have not detected and clean up the goroutines that were spawned to support them.
  134. Finally, you can pass detect a format string followed by variadic arguments. This will construct the regexp using fmt.Sprintf.
  135. */
  136. func (b *Buffer) Detect(desired string, args ...interface{}) chan bool {
  137. formattedRegexp := desired
  138. if len(args) > 0 {
  139. formattedRegexp = fmt.Sprintf(desired, args...)
  140. }
  141. re := regexp.MustCompile(formattedRegexp)
  142. b.lock.Lock()
  143. defer b.lock.Unlock()
  144. if b.detectCloser == nil {
  145. b.detectCloser = make(chan interface{})
  146. }
  147. closer := b.detectCloser
  148. response := make(chan bool)
  149. go func() {
  150. ticker := time.NewTicker(10 * time.Millisecond)
  151. defer ticker.Stop()
  152. defer close(response)
  153. for {
  154. select {
  155. case <-ticker.C:
  156. b.lock.Lock()
  157. data, cursor := b.contents[b.readCursor:], b.readCursor
  158. loc := re.FindIndex(data)
  159. b.lock.Unlock()
  160. if loc != nil {
  161. response <- true
  162. b.lock.Lock()
  163. newCursorPosition := cursor + uint64(loc[1])
  164. if newCursorPosition >= b.readCursor {
  165. b.readCursor = newCursorPosition
  166. }
  167. b.lock.Unlock()
  168. return
  169. }
  170. case <-closer:
  171. return
  172. }
  173. }
  174. }()
  175. return response
  176. }
  177. /*
  178. CancelDetects cancels any pending detects and cleans up their goroutines. You should always call this when you're done with a set of Detect channels.
  179. */
  180. func (b *Buffer) CancelDetects() {
  181. b.lock.Lock()
  182. defer b.lock.Unlock()
  183. close(b.detectCloser)
  184. b.detectCloser = nil
  185. }
  186. func (b *Buffer) didSay(re *regexp.Regexp) (bool, []byte) {
  187. b.lock.Lock()
  188. defer b.lock.Unlock()
  189. unreadBytes := b.contents[b.readCursor:]
  190. copyOfUnreadBytes := make([]byte, len(unreadBytes))
  191. copy(copyOfUnreadBytes, unreadBytes)
  192. loc := re.FindIndex(unreadBytes)
  193. if loc != nil {
  194. b.readCursor += uint64(loc[1])
  195. return true, copyOfUnreadBytes
  196. }
  197. return false, copyOfUnreadBytes
  198. }