socket.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707
  1. // mgo - MongoDB driver for Go
  2. //
  3. // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
  4. //
  5. // All rights reserved.
  6. //
  7. // Redistribution and use in source and binary forms, with or without
  8. // modification, are permitted provided that the following conditions are met:
  9. //
  10. // 1. Redistributions of source code must retain the above copyright notice, this
  11. // list of conditions and the following disclaimer.
  12. // 2. Redistributions in binary form must reproduce the above copyright notice,
  13. // this list of conditions and the following disclaimer in the documentation
  14. // and/or other materials provided with the distribution.
  15. //
  16. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  20. // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. package mgo
  27. import (
  28. "errors"
  29. "fmt"
  30. "net"
  31. "sync"
  32. "time"
  33. "gopkg.in/mgo.v2/bson"
  34. )
  35. type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)
  36. type mongoSocket struct {
  37. sync.Mutex
  38. server *mongoServer // nil when cached
  39. conn net.Conn
  40. timeout time.Duration
  41. addr string // For debugging only.
  42. nextRequestId uint32
  43. replyFuncs map[uint32]replyFunc
  44. references int
  45. creds []Credential
  46. logout []Credential
  47. cachedNonce string
  48. gotNonce sync.Cond
  49. dead error
  50. serverInfo *mongoServerInfo
  51. }
  52. type queryOpFlags uint32
  53. const (
  54. _ queryOpFlags = 1 << iota
  55. flagTailable
  56. flagSlaveOk
  57. flagLogReplay
  58. flagNoCursorTimeout
  59. flagAwaitData
  60. )
  61. type queryOp struct {
  62. collection string
  63. query interface{}
  64. skip int32
  65. limit int32
  66. selector interface{}
  67. flags queryOpFlags
  68. replyFunc replyFunc
  69. mode Mode
  70. options queryWrapper
  71. hasOptions bool
  72. serverTags []bson.D
  73. }
  74. type queryWrapper struct {
  75. Query interface{} "$query"
  76. OrderBy interface{} "$orderby,omitempty"
  77. Hint interface{} "$hint,omitempty"
  78. Explain bool "$explain,omitempty"
  79. Snapshot bool "$snapshot,omitempty"
  80. ReadPreference bson.D "$readPreference,omitempty"
  81. MaxScan int "$maxScan,omitempty"
  82. MaxTimeMS int "$maxTimeMS,omitempty"
  83. Comment string "$comment,omitempty"
  84. }
  85. func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
  86. if op.flags&flagSlaveOk != 0 && socket.ServerInfo().Mongos {
  87. var modeName string
  88. switch op.mode {
  89. case Strong:
  90. modeName = "primary"
  91. case Monotonic, Eventual:
  92. modeName = "secondaryPreferred"
  93. case PrimaryPreferred:
  94. modeName = "primaryPreferred"
  95. case Secondary:
  96. modeName = "secondary"
  97. case SecondaryPreferred:
  98. modeName = "secondaryPreferred"
  99. case Nearest:
  100. modeName = "nearest"
  101. default:
  102. panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
  103. }
  104. op.hasOptions = true
  105. op.options.ReadPreference = make(bson.D, 0, 2)
  106. op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{"mode", modeName})
  107. if len(op.serverTags) > 0 {
  108. op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{"tags", op.serverTags})
  109. }
  110. }
  111. if op.hasOptions {
  112. if op.query == nil {
  113. var empty bson.D
  114. op.options.Query = empty
  115. } else {
  116. op.options.Query = op.query
  117. }
  118. debugf("final query is %#v\n", &op.options)
  119. return &op.options
  120. }
  121. return op.query
  122. }
  123. type getMoreOp struct {
  124. collection string
  125. limit int32
  126. cursorId int64
  127. replyFunc replyFunc
  128. }
  129. type replyOp struct {
  130. flags uint32
  131. cursorId int64
  132. firstDoc int32
  133. replyDocs int32
  134. }
  135. type insertOp struct {
  136. collection string // "database.collection"
  137. documents []interface{} // One or more documents to insert
  138. flags uint32
  139. }
  140. type updateOp struct {
  141. Collection string `bson:"-"` // "database.collection"
  142. Selector interface{} `bson:"q"`
  143. Update interface{} `bson:"u"`
  144. Flags uint32 `bson:"-"`
  145. Multi bool `bson:"multi,omitempty"`
  146. Upsert bool `bson:"upsert,omitempty"`
  147. }
  148. type deleteOp struct {
  149. Collection string `bson:"-"` // "database.collection"
  150. Selector interface{} `bson:"q"`
  151. Flags uint32 `bson:"-"`
  152. Limit int `bson:"limit"`
  153. }
  154. type killCursorsOp struct {
  155. cursorIds []int64
  156. }
  157. type requestInfo struct {
  158. bufferPos int
  159. replyFunc replyFunc
  160. }
  161. func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket {
  162. socket := &mongoSocket{
  163. conn: conn,
  164. addr: server.Addr,
  165. server: server,
  166. replyFuncs: make(map[uint32]replyFunc),
  167. }
  168. socket.gotNonce.L = &socket.Mutex
  169. if err := socket.InitialAcquire(server.Info(), timeout); err != nil {
  170. panic("newSocket: InitialAcquire returned error: " + err.Error())
  171. }
  172. stats.socketsAlive(+1)
  173. debugf("Socket %p to %s: initialized", socket, socket.addr)
  174. socket.resetNonce()
  175. go socket.readLoop()
  176. return socket
  177. }
  178. // Server returns the server that the socket is associated with.
  179. // It returns nil while the socket is cached in its respective server.
  180. func (socket *mongoSocket) Server() *mongoServer {
  181. socket.Lock()
  182. server := socket.server
  183. socket.Unlock()
  184. return server
  185. }
  186. // ServerInfo returns details for the server at the time the socket
  187. // was initially acquired.
  188. func (socket *mongoSocket) ServerInfo() *mongoServerInfo {
  189. socket.Lock()
  190. serverInfo := socket.serverInfo
  191. socket.Unlock()
  192. return serverInfo
  193. }
  194. // InitialAcquire obtains the first reference to the socket, either
  195. // right after the connection is made or once a recycled socket is
  196. // being put back in use.
  197. func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error {
  198. socket.Lock()
  199. if socket.references > 0 {
  200. panic("Socket acquired out of cache with references")
  201. }
  202. if socket.dead != nil {
  203. dead := socket.dead
  204. socket.Unlock()
  205. return dead
  206. }
  207. socket.references++
  208. socket.serverInfo = serverInfo
  209. socket.timeout = timeout
  210. stats.socketsInUse(+1)
  211. stats.socketRefs(+1)
  212. socket.Unlock()
  213. return nil
  214. }
  215. // Acquire obtains an additional reference to the socket.
  216. // The socket will only be recycled when it's released as many
  217. // times as it's been acquired.
  218. func (socket *mongoSocket) Acquire() (info *mongoServerInfo) {
  219. socket.Lock()
  220. if socket.references == 0 {
  221. panic("Socket got non-initial acquire with references == 0")
  222. }
  223. // We'll track references to dead sockets as well.
  224. // Caller is still supposed to release the socket.
  225. socket.references++
  226. stats.socketRefs(+1)
  227. serverInfo := socket.serverInfo
  228. socket.Unlock()
  229. return serverInfo
  230. }
  231. // Release decrements a socket reference. The socket will be
  232. // recycled once its released as many times as it's been acquired.
  233. func (socket *mongoSocket) Release() {
  234. socket.Lock()
  235. if socket.references == 0 {
  236. panic("socket.Release() with references == 0")
  237. }
  238. socket.references--
  239. stats.socketRefs(-1)
  240. if socket.references == 0 {
  241. stats.socketsInUse(-1)
  242. server := socket.server
  243. socket.Unlock()
  244. socket.LogoutAll()
  245. // If the socket is dead server is nil.
  246. if server != nil {
  247. server.RecycleSocket(socket)
  248. }
  249. } else {
  250. socket.Unlock()
  251. }
  252. }
  253. // SetTimeout changes the timeout used on socket operations.
  254. func (socket *mongoSocket) SetTimeout(d time.Duration) {
  255. socket.Lock()
  256. socket.timeout = d
  257. socket.Unlock()
  258. }
  259. type deadlineType int
  260. const (
  261. readDeadline deadlineType = 1
  262. writeDeadline deadlineType = 2
  263. )
  264. func (socket *mongoSocket) updateDeadline(which deadlineType) {
  265. var when time.Time
  266. if socket.timeout > 0 {
  267. when = time.Now().Add(socket.timeout)
  268. }
  269. whichstr := ""
  270. switch which {
  271. case readDeadline | writeDeadline:
  272. whichstr = "read/write"
  273. socket.conn.SetDeadline(when)
  274. case readDeadline:
  275. whichstr = "read"
  276. socket.conn.SetReadDeadline(when)
  277. case writeDeadline:
  278. whichstr = "write"
  279. socket.conn.SetWriteDeadline(when)
  280. default:
  281. panic("invalid parameter to updateDeadline")
  282. }
  283. debugf("Socket %p to %s: updated %s deadline to %s ahead (%s)", socket, socket.addr, whichstr, socket.timeout, when)
  284. }
  285. // Close terminates the socket use.
  286. func (socket *mongoSocket) Close() {
  287. socket.kill(errors.New("Closed explicitly"), false)
  288. }
  289. func (socket *mongoSocket) kill(err error, abend bool) {
  290. socket.Lock()
  291. if socket.dead != nil {
  292. debugf("Socket %p to %s: killed again: %s (previously: %s)", socket, socket.addr, err.Error(), socket.dead.Error())
  293. socket.Unlock()
  294. return
  295. }
  296. logf("Socket %p to %s: closing: %s (abend=%v)", socket, socket.addr, err.Error(), abend)
  297. socket.dead = err
  298. socket.conn.Close()
  299. stats.socketsAlive(-1)
  300. replyFuncs := socket.replyFuncs
  301. socket.replyFuncs = make(map[uint32]replyFunc)
  302. server := socket.server
  303. socket.server = nil
  304. socket.gotNonce.Broadcast()
  305. socket.Unlock()
  306. for _, replyFunc := range replyFuncs {
  307. logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error())
  308. replyFunc(err, nil, -1, nil)
  309. }
  310. if abend {
  311. server.AbendSocket(socket)
  312. }
  313. }
  314. func (socket *mongoSocket) SimpleQuery(op *queryOp) (data []byte, err error) {
  315. var wait, change sync.Mutex
  316. var replyDone bool
  317. var replyData []byte
  318. var replyErr error
  319. wait.Lock()
  320. op.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
  321. change.Lock()
  322. if !replyDone {
  323. replyDone = true
  324. replyErr = err
  325. if err == nil {
  326. replyData = docData
  327. }
  328. }
  329. change.Unlock()
  330. wait.Unlock()
  331. }
  332. err = socket.Query(op)
  333. if err != nil {
  334. return nil, err
  335. }
  336. wait.Lock()
  337. change.Lock()
  338. data = replyData
  339. err = replyErr
  340. change.Unlock()
  341. return data, err
  342. }
  343. func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
  344. if lops := socket.flushLogout(); len(lops) > 0 {
  345. ops = append(lops, ops...)
  346. }
  347. buf := make([]byte, 0, 256)
  348. // Serialize operations synchronously to avoid interrupting
  349. // other goroutines while we can't really be sending data.
  350. // Also, record id positions so that we can compute request
  351. // ids at once later with the lock already held.
  352. requests := make([]requestInfo, len(ops))
  353. requestCount := 0
  354. for _, op := range ops {
  355. debugf("Socket %p to %s: serializing op: %#v", socket, socket.addr, op)
  356. if qop, ok := op.(*queryOp); ok {
  357. if cmd, ok := qop.query.(*findCmd); ok {
  358. debugf("Socket %p to %s: find command: %#v", socket, socket.addr, cmd)
  359. }
  360. }
  361. start := len(buf)
  362. var replyFunc replyFunc
  363. switch op := op.(type) {
  364. case *updateOp:
  365. buf = addHeader(buf, 2001)
  366. buf = addInt32(buf, 0) // Reserved
  367. buf = addCString(buf, op.Collection)
  368. buf = addInt32(buf, int32(op.Flags))
  369. debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
  370. buf, err = addBSON(buf, op.Selector)
  371. if err != nil {
  372. return err
  373. }
  374. debugf("Socket %p to %s: serializing update document: %#v", socket, socket.addr, op.Update)
  375. buf, err = addBSON(buf, op.Update)
  376. if err != nil {
  377. return err
  378. }
  379. case *insertOp:
  380. buf = addHeader(buf, 2002)
  381. buf = addInt32(buf, int32(op.flags))
  382. buf = addCString(buf, op.collection)
  383. for _, doc := range op.documents {
  384. debugf("Socket %p to %s: serializing document for insertion: %#v", socket, socket.addr, doc)
  385. buf, err = addBSON(buf, doc)
  386. if err != nil {
  387. return err
  388. }
  389. }
  390. case *queryOp:
  391. buf = addHeader(buf, 2004)
  392. buf = addInt32(buf, int32(op.flags))
  393. buf = addCString(buf, op.collection)
  394. buf = addInt32(buf, op.skip)
  395. buf = addInt32(buf, op.limit)
  396. buf, err = addBSON(buf, op.finalQuery(socket))
  397. if err != nil {
  398. return err
  399. }
  400. if op.selector != nil {
  401. buf, err = addBSON(buf, op.selector)
  402. if err != nil {
  403. return err
  404. }
  405. }
  406. replyFunc = op.replyFunc
  407. case *getMoreOp:
  408. buf = addHeader(buf, 2005)
  409. buf = addInt32(buf, 0) // Reserved
  410. buf = addCString(buf, op.collection)
  411. buf = addInt32(buf, op.limit)
  412. buf = addInt64(buf, op.cursorId)
  413. replyFunc = op.replyFunc
  414. case *deleteOp:
  415. buf = addHeader(buf, 2006)
  416. buf = addInt32(buf, 0) // Reserved
  417. buf = addCString(buf, op.Collection)
  418. buf = addInt32(buf, int32(op.Flags))
  419. debugf("Socket %p to %s: serializing selector document: %#v", socket, socket.addr, op.Selector)
  420. buf, err = addBSON(buf, op.Selector)
  421. if err != nil {
  422. return err
  423. }
  424. case *killCursorsOp:
  425. buf = addHeader(buf, 2007)
  426. buf = addInt32(buf, 0) // Reserved
  427. buf = addInt32(buf, int32(len(op.cursorIds)))
  428. for _, cursorId := range op.cursorIds {
  429. buf = addInt64(buf, cursorId)
  430. }
  431. default:
  432. panic("internal error: unknown operation type")
  433. }
  434. setInt32(buf, start, int32(len(buf)-start))
  435. if replyFunc != nil {
  436. request := &requests[requestCount]
  437. request.replyFunc = replyFunc
  438. request.bufferPos = start
  439. requestCount++
  440. }
  441. }
  442. // Buffer is ready for the pipe. Lock, allocate ids, and enqueue.
  443. socket.Lock()
  444. if socket.dead != nil {
  445. dead := socket.dead
  446. socket.Unlock()
  447. debugf("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error())
  448. // XXX This seems necessary in case the session is closed concurrently
  449. // with a query being performed, but it's not yet tested:
  450. for i := 0; i != requestCount; i++ {
  451. request := &requests[i]
  452. if request.replyFunc != nil {
  453. request.replyFunc(dead, nil, -1, nil)
  454. }
  455. }
  456. return dead
  457. }
  458. wasWaiting := len(socket.replyFuncs) > 0
  459. // Reserve id 0 for requests which should have no responses.
  460. requestId := socket.nextRequestId + 1
  461. if requestId == 0 {
  462. requestId++
  463. }
  464. socket.nextRequestId = requestId + uint32(requestCount)
  465. for i := 0; i != requestCount; i++ {
  466. request := &requests[i]
  467. setInt32(buf, request.bufferPos+4, int32(requestId))
  468. socket.replyFuncs[requestId] = request.replyFunc
  469. requestId++
  470. }
  471. debugf("Socket %p to %s: sending %d op(s) (%d bytes)", socket, socket.addr, len(ops), len(buf))
  472. stats.sentOps(len(ops))
  473. socket.updateDeadline(writeDeadline)
  474. _, err = socket.conn.Write(buf)
  475. if !wasWaiting && requestCount > 0 {
  476. socket.updateDeadline(readDeadline)
  477. }
  478. socket.Unlock()
  479. return err
  480. }
  481. func fill(r net.Conn, b []byte) error {
  482. l := len(b)
  483. n, err := r.Read(b)
  484. for n != l && err == nil {
  485. var ni int
  486. ni, err = r.Read(b[n:])
  487. n += ni
  488. }
  489. return err
  490. }
  491. // Estimated minimum cost per socket: 1 goroutine + memory for the largest
  492. // document ever seen.
  493. func (socket *mongoSocket) readLoop() {
  494. p := make([]byte, 36) // 16 from header + 20 from OP_REPLY fixed fields
  495. s := make([]byte, 4)
  496. conn := socket.conn // No locking, conn never changes.
  497. for {
  498. err := fill(conn, p)
  499. if err != nil {
  500. socket.kill(err, true)
  501. return
  502. }
  503. totalLen := getInt32(p, 0)
  504. responseTo := getInt32(p, 8)
  505. opCode := getInt32(p, 12)
  506. // Don't use socket.server.Addr here. socket is not
  507. // locked and socket.server may go away.
  508. debugf("Socket %p to %s: got reply (%d bytes)", socket, socket.addr, totalLen)
  509. _ = totalLen
  510. if opCode != 1 {
  511. socket.kill(errors.New("opcode != 1, corrupted data?"), true)
  512. return
  513. }
  514. reply := replyOp{
  515. flags: uint32(getInt32(p, 16)),
  516. cursorId: getInt64(p, 20),
  517. firstDoc: getInt32(p, 28),
  518. replyDocs: getInt32(p, 32),
  519. }
  520. stats.receivedOps(+1)
  521. stats.receivedDocs(int(reply.replyDocs))
  522. socket.Lock()
  523. replyFunc, ok := socket.replyFuncs[uint32(responseTo)]
  524. if ok {
  525. delete(socket.replyFuncs, uint32(responseTo))
  526. }
  527. socket.Unlock()
  528. if replyFunc != nil && reply.replyDocs == 0 {
  529. replyFunc(nil, &reply, -1, nil)
  530. } else {
  531. for i := 0; i != int(reply.replyDocs); i++ {
  532. err := fill(conn, s)
  533. if err != nil {
  534. if replyFunc != nil {
  535. replyFunc(err, nil, -1, nil)
  536. }
  537. socket.kill(err, true)
  538. return
  539. }
  540. b := make([]byte, int(getInt32(s, 0)))
  541. // copy(b, s) in an efficient way.
  542. b[0] = s[0]
  543. b[1] = s[1]
  544. b[2] = s[2]
  545. b[3] = s[3]
  546. err = fill(conn, b[4:])
  547. if err != nil {
  548. if replyFunc != nil {
  549. replyFunc(err, nil, -1, nil)
  550. }
  551. socket.kill(err, true)
  552. return
  553. }
  554. if globalDebug && globalLogger != nil {
  555. m := bson.M{}
  556. if err := bson.Unmarshal(b, m); err == nil {
  557. debugf("Socket %p to %s: received document: %#v", socket, socket.addr, m)
  558. }
  559. }
  560. if replyFunc != nil {
  561. replyFunc(nil, &reply, i, b)
  562. }
  563. // XXX Do bound checking against totalLen.
  564. }
  565. }
  566. socket.Lock()
  567. if len(socket.replyFuncs) == 0 {
  568. // Nothing else to read for now. Disable deadline.
  569. socket.conn.SetReadDeadline(time.Time{})
  570. } else {
  571. socket.updateDeadline(readDeadline)
  572. }
  573. socket.Unlock()
  574. // XXX Do bound checking against totalLen.
  575. }
  576. }
  577. var emptyHeader = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
  578. func addHeader(b []byte, opcode int) []byte {
  579. i := len(b)
  580. b = append(b, emptyHeader...)
  581. // Enough for current opcodes.
  582. b[i+12] = byte(opcode)
  583. b[i+13] = byte(opcode >> 8)
  584. return b
  585. }
  586. func addInt32(b []byte, i int32) []byte {
  587. return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24))
  588. }
  589. func addInt64(b []byte, i int64) []byte {
  590. return append(b, byte(i), byte(i>>8), byte(i>>16), byte(i>>24),
  591. byte(i>>32), byte(i>>40), byte(i>>48), byte(i>>56))
  592. }
  593. func addCString(b []byte, s string) []byte {
  594. b = append(b, []byte(s)...)
  595. b = append(b, 0)
  596. return b
  597. }
  598. func addBSON(b []byte, doc interface{}) ([]byte, error) {
  599. if doc == nil {
  600. return append(b, 5, 0, 0, 0, 0), nil
  601. }
  602. data, err := bson.Marshal(doc)
  603. if err != nil {
  604. return b, err
  605. }
  606. return append(b, data...), nil
  607. }
  608. func setInt32(b []byte, pos int, i int32) {
  609. b[pos] = byte(i)
  610. b[pos+1] = byte(i >> 8)
  611. b[pos+2] = byte(i >> 16)
  612. b[pos+3] = byte(i >> 24)
  613. }
  614. func getInt32(b []byte, pos int) int32 {
  615. return (int32(b[pos+0])) |
  616. (int32(b[pos+1]) << 8) |
  617. (int32(b[pos+2]) << 16) |
  618. (int32(b[pos+3]) << 24)
  619. }
  620. func getInt64(b []byte, pos int) int64 {
  621. return (int64(b[pos+0])) |
  622. (int64(b[pos+1]) << 8) |
  623. (int64(b[pos+2]) << 16) |
  624. (int64(b[pos+3]) << 24) |
  625. (int64(b[pos+4]) << 32) |
  626. (int64(b[pos+5]) << 40) |
  627. (int64(b[pos+6]) << 48) |
  628. (int64(b[pos+7]) << 56)
  629. }