bulk_test.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504
  1. // mgo - MongoDB driver for Go
  2. //
  3. // Copyright (c) 2010-2015 - Gustavo Niemeyer <gustavo@niemeyer.net>
  4. //
  5. // All rights reserved.
  6. //
  7. // Redistribution and use in source and binary forms, with or without
  8. // modification, are permitted provided that the following conditions are met:
  9. //
  10. // 1. Redistributions of source code must retain the above copyright notice, this
  11. // list of conditions and the following disclaimer.
  12. // 2. Redistributions in binary form must reproduce the above copyright notice,
  13. // this list of conditions and the following disclaimer in the documentation
  14. // and/or other materials provided with the distribution.
  15. //
  16. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  20. // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. package mgo_test
  27. import (
  28. . "gopkg.in/check.v1"
  29. "gopkg.in/mgo.v2"
  30. )
  31. func (s *S) TestBulkInsert(c *C) {
  32. session, err := mgo.Dial("localhost:40001")
  33. c.Assert(err, IsNil)
  34. defer session.Close()
  35. coll := session.DB("mydb").C("mycoll")
  36. bulk := coll.Bulk()
  37. bulk.Insert(M{"n": 1})
  38. bulk.Insert(M{"n": 2}, M{"n": 3})
  39. r, err := bulk.Run()
  40. c.Assert(err, IsNil)
  41. c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
  42. type doc struct{ N int }
  43. var res []doc
  44. err = coll.Find(nil).Sort("n").All(&res)
  45. c.Assert(err, IsNil)
  46. c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}})
  47. }
  48. func (s *S) TestBulkInsertError(c *C) {
  49. session, err := mgo.Dial("localhost:40001")
  50. c.Assert(err, IsNil)
  51. defer session.Close()
  52. coll := session.DB("mydb").C("mycoll")
  53. bulk := coll.Bulk()
  54. bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3})
  55. _, err = bulk.Run()
  56. c.Assert(err, ErrorMatches, ".*duplicate key.*")
  57. c.Assert(mgo.IsDup(err), Equals, true)
  58. type doc struct {
  59. N int `_id`
  60. }
  61. var res []doc
  62. err = coll.Find(nil).Sort("_id").All(&res)
  63. c.Assert(err, IsNil)
  64. c.Assert(res, DeepEquals, []doc{{1}, {2}})
  65. }
  66. func (s *S) TestBulkInsertErrorUnordered(c *C) {
  67. session, err := mgo.Dial("localhost:40001")
  68. c.Assert(err, IsNil)
  69. defer session.Close()
  70. coll := session.DB("mydb").C("mycoll")
  71. bulk := coll.Bulk()
  72. bulk.Unordered()
  73. bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3})
  74. _, err = bulk.Run()
  75. c.Assert(err, ErrorMatches, ".*duplicate key.*")
  76. type doc struct {
  77. N int `_id`
  78. }
  79. var res []doc
  80. err = coll.Find(nil).Sort("_id").All(&res)
  81. c.Assert(err, IsNil)
  82. c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}})
  83. }
  84. func (s *S) TestBulkInsertErrorUnorderedSplitBatch(c *C) {
  85. // The server has a batch limit of 1000 documents when using write commands.
  86. // This artificial limit did not exist with the old wire protocol, so to
  87. // avoid compatibility issues the implementation internally split batches
  88. // into the proper size and delivers them one by one. This test ensures that
  89. // the behavior of unordered (that is, continue on error) remains correct
  90. // when errors happen and there are batches left.
  91. session, err := mgo.Dial("localhost:40001")
  92. c.Assert(err, IsNil)
  93. defer session.Close()
  94. coll := session.DB("mydb").C("mycoll")
  95. bulk := coll.Bulk()
  96. bulk.Unordered()
  97. const total = 4096
  98. type doc struct {
  99. Id int `_id`
  100. }
  101. docs := make([]interface{}, total)
  102. for i := 0; i < total; i++ {
  103. docs[i] = doc{i}
  104. }
  105. docs[1] = doc{0}
  106. bulk.Insert(docs...)
  107. _, err = bulk.Run()
  108. c.Assert(err, ErrorMatches, ".*duplicate key.*")
  109. n, err := coll.Count()
  110. c.Assert(err, IsNil)
  111. c.Assert(n, Equals, total-1)
  112. var res doc
  113. err = coll.FindId(1500).One(&res)
  114. c.Assert(err, IsNil)
  115. c.Assert(res.Id, Equals, 1500)
  116. }
  117. func (s *S) TestBulkErrorString(c *C) {
  118. session, err := mgo.Dial("localhost:40001")
  119. c.Assert(err, IsNil)
  120. defer session.Close()
  121. coll := session.DB("mydb").C("mycoll")
  122. // If it's just the same string multiple times, join it into a single message.
  123. bulk := coll.Bulk()
  124. bulk.Unordered()
  125. bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2})
  126. _, err = bulk.Run()
  127. c.Assert(err, ErrorMatches, ".*duplicate key.*")
  128. c.Assert(err, Not(ErrorMatches), ".*duplicate key.*duplicate key")
  129. c.Assert(mgo.IsDup(err), Equals, true)
  130. // With matching errors but different messages, present them all.
  131. bulk = coll.Bulk()
  132. bulk.Unordered()
  133. bulk.Insert(M{"_id": "dupone"}, M{"_id": "dupone"}, M{"_id": "duptwo"}, M{"_id": "duptwo"})
  134. _, err = bulk.Run()
  135. if s.versionAtLeast(2, 6) {
  136. c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n( - .*duplicate.*\n){2}$")
  137. c.Assert(err, ErrorMatches, "(?s).*dupone.*")
  138. c.Assert(err, ErrorMatches, "(?s).*duptwo.*")
  139. } else {
  140. // Wire protocol query doesn't return all errors.
  141. c.Assert(err, ErrorMatches, ".*duplicate.*")
  142. }
  143. c.Assert(mgo.IsDup(err), Equals, true)
  144. // With mixed errors, present them all.
  145. bulk = coll.Bulk()
  146. bulk.Unordered()
  147. bulk.Insert(M{"_id": 1}, M{"_id": []int{2}})
  148. _, err = bulk.Run()
  149. if s.versionAtLeast(2, 6) {
  150. c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n - .*duplicate.*\n - .*array.*\n$")
  151. } else {
  152. // Wire protocol query doesn't return all errors.
  153. c.Assert(err, ErrorMatches, ".*array.*")
  154. }
  155. c.Assert(mgo.IsDup(err), Equals, false)
  156. }
  157. func (s *S) TestBulkErrorCases_2_6(c *C) {
  158. if !s.versionAtLeast(2, 6) {
  159. c.Skip("2.4- has poor bulk reporting")
  160. }
  161. session, err := mgo.Dial("localhost:40001")
  162. c.Assert(err, IsNil)
  163. defer session.Close()
  164. coll := session.DB("mydb").C("mycoll")
  165. bulk := coll.Bulk()
  166. bulk.Unordered()
  167. // There's a limit of 1000 operations per command, so
  168. // this forces the more complex indexing logic to act.
  169. for i := 0; i < 1010; i++ {
  170. switch i {
  171. case 3, 14:
  172. bulk.Insert(M{"_id": "dupone"})
  173. case 5, 106:
  174. bulk.Update(M{"_id": i - 1}, M{"$set": M{"_id": 4}})
  175. case 7, 1008:
  176. bulk.Insert(M{"_id": "duptwo"})
  177. default:
  178. bulk.Insert(M{"_id": i})
  179. }
  180. }
  181. _, err = bulk.Run()
  182. ecases := err.(*mgo.BulkError).Cases()
  183. c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*")
  184. c.Check(ecases[0].Index, Equals, 14)
  185. c.Check(ecases[1].Err, ErrorMatches, ".*update.*_id.*")
  186. c.Check(ecases[1].Index, Equals, 106)
  187. c.Check(ecases[2].Err, ErrorMatches, ".*duplicate.*duptwo.*")
  188. c.Check(ecases[2].Index, Equals, 1008)
  189. }
  190. func (s *S) TestBulkErrorCases_2_4(c *C) {
  191. if s.versionAtLeast(2, 6) {
  192. c.Skip("2.6+ has better reporting")
  193. }
  194. session, err := mgo.Dial("localhost:40001")
  195. c.Assert(err, IsNil)
  196. defer session.Close()
  197. coll := session.DB("mydb").C("mycoll")
  198. bulk := coll.Bulk()
  199. bulk.Unordered()
  200. // There's a limit of 1000 operations per command, so
  201. // this forces the more complex indexing logic to act.
  202. for i := 0; i < 1010; i++ {
  203. switch i {
  204. case 3, 14:
  205. bulk.Insert(M{"_id": "dupone"})
  206. case 5:
  207. bulk.Update(M{"_id": i - 1}, M{"$set": M{"n": 4}})
  208. case 106:
  209. bulk.Update(M{"_id": i - 1}, M{"$bogus": M{"n": 4}})
  210. case 7, 1008:
  211. bulk.Insert(M{"_id": "duptwo"})
  212. default:
  213. bulk.Insert(M{"_id": i})
  214. }
  215. }
  216. _, err = bulk.Run()
  217. ecases := err.(*mgo.BulkError).Cases()
  218. c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*duptwo.*")
  219. c.Check(ecases[0].Index, Equals, -1)
  220. c.Check(ecases[1].Err, ErrorMatches, `.*\$bogus.*`)
  221. c.Check(ecases[1].Index, Equals, 106)
  222. }
  223. func (s *S) TestBulkErrorCasesOrdered(c *C) {
  224. session, err := mgo.Dial("localhost:40001")
  225. c.Assert(err, IsNil)
  226. defer session.Close()
  227. coll := session.DB("mydb").C("mycoll")
  228. bulk := coll.Bulk()
  229. // There's a limit of 1000 operations per command, so
  230. // this forces the more complex indexing logic to act.
  231. for i := 0; i < 20; i++ {
  232. switch i {
  233. case 3, 14:
  234. bulk.Insert(M{"_id": "dupone"})
  235. case 7, 17:
  236. bulk.Insert(M{"_id": "duptwo"})
  237. default:
  238. bulk.Insert(M{"_id": i})
  239. }
  240. }
  241. _, err = bulk.Run()
  242. ecases := err.(*mgo.BulkError).Cases()
  243. c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*")
  244. if s.versionAtLeast(2, 6) {
  245. c.Check(ecases[0].Index, Equals, 14)
  246. } else {
  247. c.Check(ecases[0].Index, Equals, -1)
  248. }
  249. c.Check(ecases, HasLen, 1)
  250. }
  251. func (s *S) TestBulkUpdate(c *C) {
  252. session, err := mgo.Dial("localhost:40001")
  253. c.Assert(err, IsNil)
  254. defer session.Close()
  255. coll := session.DB("mydb").C("mycoll")
  256. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
  257. c.Assert(err, IsNil)
  258. bulk := coll.Bulk()
  259. bulk.Update(M{"n": 1}, M{"$set": M{"n": 1}})
  260. bulk.Update(M{"n": 2}, M{"$set": M{"n": 20}})
  261. bulk.Update(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match.
  262. bulk.Update(M{"n": 1}, M{"$set": M{"n": 10}}, M{"n": 3}, M{"$set": M{"n": 30}})
  263. r, err := bulk.Run()
  264. c.Assert(err, IsNil)
  265. c.Assert(r.Matched, Equals, 4)
  266. if s.versionAtLeast(2, 6) {
  267. c.Assert(r.Modified, Equals, 3)
  268. }
  269. type doc struct{ N int }
  270. var res []doc
  271. err = coll.Find(nil).Sort("n").All(&res)
  272. c.Assert(err, IsNil)
  273. c.Assert(res, DeepEquals, []doc{{10}, {20}, {30}})
  274. }
  275. func (s *S) TestBulkUpdateError(c *C) {
  276. session, err := mgo.Dial("localhost:40001")
  277. c.Assert(err, IsNil)
  278. defer session.Close()
  279. coll := session.DB("mydb").C("mycoll")
  280. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
  281. c.Assert(err, IsNil)
  282. bulk := coll.Bulk()
  283. bulk.Update(
  284. M{"n": 1}, M{"$set": M{"n": 10}},
  285. M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}},
  286. M{"n": 3}, M{"$set": M{"n": 30}},
  287. )
  288. r, err := bulk.Run()
  289. c.Assert(err, ErrorMatches, ".*_id.*")
  290. c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
  291. type doc struct{ N int }
  292. var res []doc
  293. err = coll.Find(nil).Sort("n").All(&res)
  294. c.Assert(err, IsNil)
  295. c.Assert(res, DeepEquals, []doc{{2}, {3}, {10}})
  296. }
  297. func (s *S) TestBulkUpdateErrorUnordered(c *C) {
  298. session, err := mgo.Dial("localhost:40001")
  299. c.Assert(err, IsNil)
  300. defer session.Close()
  301. coll := session.DB("mydb").C("mycoll")
  302. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
  303. c.Assert(err, IsNil)
  304. bulk := coll.Bulk()
  305. bulk.Unordered()
  306. bulk.Update(
  307. M{"n": 1}, M{"$set": M{"n": 10}},
  308. M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}},
  309. M{"n": 3}, M{"$set": M{"n": 30}},
  310. )
  311. r, err := bulk.Run()
  312. c.Assert(err, ErrorMatches, ".*_id.*")
  313. c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
  314. type doc struct{ N int }
  315. var res []doc
  316. err = coll.Find(nil).Sort("n").All(&res)
  317. c.Assert(err, IsNil)
  318. c.Assert(res, DeepEquals, []doc{{2}, {10}, {30}})
  319. }
  320. func (s *S) TestBulkUpdateAll(c *C) {
  321. session, err := mgo.Dial("localhost:40001")
  322. c.Assert(err, IsNil)
  323. defer session.Close()
  324. coll := session.DB("mydb").C("mycoll")
  325. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
  326. c.Assert(err, IsNil)
  327. bulk := coll.Bulk()
  328. bulk.UpdateAll(M{"n": 1}, M{"$set": M{"n": 10}})
  329. bulk.UpdateAll(M{"n": 2}, M{"$set": M{"n": 2}}) // Won't change.
  330. bulk.UpdateAll(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match.
  331. bulk.UpdateAll(M{}, M{"$inc": M{"n": 1}}, M{"n": 11}, M{"$set": M{"n": 5}})
  332. r, err := bulk.Run()
  333. c.Assert(err, IsNil)
  334. c.Assert(r.Matched, Equals, 6)
  335. if s.versionAtLeast(2, 6) {
  336. c.Assert(r.Modified, Equals, 5)
  337. }
  338. type doc struct{ N int }
  339. var res []doc
  340. err = coll.Find(nil).Sort("n").All(&res)
  341. c.Assert(err, IsNil)
  342. c.Assert(res, DeepEquals, []doc{{3}, {4}, {5}})
  343. }
  344. func (s *S) TestBulkMixedUnordered(c *C) {
  345. session, err := mgo.Dial("localhost:40001")
  346. c.Assert(err, IsNil)
  347. defer session.Close()
  348. coll := session.DB("mydb").C("mycoll")
  349. // Abuse undefined behavior to ensure the desired implementation is in place.
  350. bulk := coll.Bulk()
  351. bulk.Unordered()
  352. bulk.Insert(M{"n": 1})
  353. bulk.Update(M{"n": 2}, M{"$inc": M{"n": 1}})
  354. bulk.Insert(M{"n": 2})
  355. bulk.Update(M{"n": 3}, M{"$inc": M{"n": 1}})
  356. bulk.Update(M{"n": 1}, M{"$inc": M{"n": 1}})
  357. bulk.Insert(M{"n": 3})
  358. r, err := bulk.Run()
  359. c.Assert(err, IsNil)
  360. c.Assert(r.Matched, Equals, 3)
  361. if s.versionAtLeast(2, 6) {
  362. c.Assert(r.Modified, Equals, 3)
  363. }
  364. type doc struct{ N int }
  365. var res []doc
  366. err = coll.Find(nil).Sort("n").All(&res)
  367. c.Assert(err, IsNil)
  368. c.Assert(res, DeepEquals, []doc{{2}, {3}, {4}})
  369. }
  370. func (s *S) TestBulkUpsert(c *C) {
  371. session, err := mgo.Dial("localhost:40001")
  372. c.Assert(err, IsNil)
  373. defer session.Close()
  374. coll := session.DB("mydb").C("mycoll")
  375. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
  376. c.Assert(err, IsNil)
  377. bulk := coll.Bulk()
  378. bulk.Upsert(M{"n": 2}, M{"$set": M{"n": 20}})
  379. bulk.Upsert(M{"n": 4}, M{"$set": M{"n": 40}}, M{"n": 3}, M{"$set": M{"n": 30}})
  380. r, err := bulk.Run()
  381. c.Assert(err, IsNil)
  382. c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
  383. type doc struct{ N int }
  384. var res []doc
  385. err = coll.Find(nil).Sort("n").All(&res)
  386. c.Assert(err, IsNil)
  387. c.Assert(res, DeepEquals, []doc{{1}, {20}, {30}, {40}})
  388. }
  389. func (s *S) TestBulkRemove(c *C) {
  390. session, err := mgo.Dial("localhost:40001")
  391. c.Assert(err, IsNil)
  392. defer session.Close()
  393. coll := session.DB("mydb").C("mycoll")
  394. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4})
  395. c.Assert(err, IsNil)
  396. bulk := coll.Bulk()
  397. bulk.Remove(M{"n": 1})
  398. bulk.Remove(M{"n": 2}, M{"n": 4})
  399. r, err := bulk.Run()
  400. c.Assert(err, IsNil)
  401. c.Assert(r.Matched, Equals, 3)
  402. type doc struct{ N int }
  403. var res []doc
  404. err = coll.Find(nil).Sort("n").All(&res)
  405. c.Assert(err, IsNil)
  406. c.Assert(res, DeepEquals, []doc{{3}, {4}})
  407. }
  408. func (s *S) TestBulkRemoveAll(c *C) {
  409. session, err := mgo.Dial("localhost:40001")
  410. c.Assert(err, IsNil)
  411. defer session.Close()
  412. coll := session.DB("mydb").C("mycoll")
  413. err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4})
  414. c.Assert(err, IsNil)
  415. bulk := coll.Bulk()
  416. bulk.RemoveAll(M{"n": 1})
  417. bulk.RemoveAll(M{"n": 2}, M{"n": 4})
  418. r, err := bulk.Run()
  419. c.Assert(err, IsNil)
  420. c.Assert(r.Matched, Equals, 4)
  421. type doc struct{ N int }
  422. var res []doc
  423. err = coll.Find(nil).Sort("n").All(&res)
  424. c.Assert(err, IsNil)
  425. c.Assert(res, DeepEquals, []doc{{3}})
  426. }