123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504 |
- // mgo - MongoDB driver for Go
- //
- // Copyright (c) 2010-2015 - Gustavo Niemeyer <gustavo@niemeyer.net>
- //
- // All rights reserved.
- //
- // Redistribution and use in source and binary forms, with or without
- // modification, are permitted provided that the following conditions are met:
- //
- // 1. Redistributions of source code must retain the above copyright notice, this
- // list of conditions and the following disclaimer.
- // 2. Redistributions in binary form must reproduce the above copyright notice,
- // this list of conditions and the following disclaimer in the documentation
- // and/or other materials provided with the distribution.
- //
- // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
- // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- package mgo_test
- import (
- . "gopkg.in/check.v1"
- "gopkg.in/mgo.v2"
- )
- func (s *S) TestBulkInsert(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- bulk.Insert(M{"n": 1})
- bulk.Insert(M{"n": 2}, M{"n": 3})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}})
- }
- func (s *S) TestBulkInsertError(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3})
- _, err = bulk.Run()
- c.Assert(err, ErrorMatches, ".*duplicate key.*")
- c.Assert(mgo.IsDup(err), Equals, true)
- type doc struct {
- N int `_id`
- }
- var res []doc
- err = coll.Find(nil).Sort("_id").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{1}, {2}})
- }
- func (s *S) TestBulkInsertErrorUnordered(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- bulk.Unordered()
- bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2}, M{"_id": 3})
- _, err = bulk.Run()
- c.Assert(err, ErrorMatches, ".*duplicate key.*")
- type doc struct {
- N int `_id`
- }
- var res []doc
- err = coll.Find(nil).Sort("_id").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{1}, {2}, {3}})
- }
- func (s *S) TestBulkInsertErrorUnorderedSplitBatch(c *C) {
- // The server has a batch limit of 1000 documents when using write commands.
- // This artificial limit did not exist with the old wire protocol, so to
- // avoid compatibility issues the implementation internally split batches
- // into the proper size and delivers them one by one. This test ensures that
- // the behavior of unordered (that is, continue on error) remains correct
- // when errors happen and there are batches left.
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- bulk.Unordered()
- const total = 4096
- type doc struct {
- Id int `_id`
- }
- docs := make([]interface{}, total)
- for i := 0; i < total; i++ {
- docs[i] = doc{i}
- }
- docs[1] = doc{0}
- bulk.Insert(docs...)
- _, err = bulk.Run()
- c.Assert(err, ErrorMatches, ".*duplicate key.*")
- n, err := coll.Count()
- c.Assert(err, IsNil)
- c.Assert(n, Equals, total-1)
- var res doc
- err = coll.FindId(1500).One(&res)
- c.Assert(err, IsNil)
- c.Assert(res.Id, Equals, 1500)
- }
- func (s *S) TestBulkErrorString(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- // If it's just the same string multiple times, join it into a single message.
- bulk := coll.Bulk()
- bulk.Unordered()
- bulk.Insert(M{"_id": 1}, M{"_id": 2}, M{"_id": 2})
- _, err = bulk.Run()
- c.Assert(err, ErrorMatches, ".*duplicate key.*")
- c.Assert(err, Not(ErrorMatches), ".*duplicate key.*duplicate key")
- c.Assert(mgo.IsDup(err), Equals, true)
- // With matching errors but different messages, present them all.
- bulk = coll.Bulk()
- bulk.Unordered()
- bulk.Insert(M{"_id": "dupone"}, M{"_id": "dupone"}, M{"_id": "duptwo"}, M{"_id": "duptwo"})
- _, err = bulk.Run()
- if s.versionAtLeast(2, 6) {
- c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n( - .*duplicate.*\n){2}$")
- c.Assert(err, ErrorMatches, "(?s).*dupone.*")
- c.Assert(err, ErrorMatches, "(?s).*duptwo.*")
- } else {
- // Wire protocol query doesn't return all errors.
- c.Assert(err, ErrorMatches, ".*duplicate.*")
- }
- c.Assert(mgo.IsDup(err), Equals, true)
- // With mixed errors, present them all.
- bulk = coll.Bulk()
- bulk.Unordered()
- bulk.Insert(M{"_id": 1}, M{"_id": []int{2}})
- _, err = bulk.Run()
- if s.versionAtLeast(2, 6) {
- c.Assert(err, ErrorMatches, "multiple errors in bulk operation:\n - .*duplicate.*\n - .*array.*\n$")
- } else {
- // Wire protocol query doesn't return all errors.
- c.Assert(err, ErrorMatches, ".*array.*")
- }
- c.Assert(mgo.IsDup(err), Equals, false)
- }
- func (s *S) TestBulkErrorCases_2_6(c *C) {
- if !s.versionAtLeast(2, 6) {
- c.Skip("2.4- has poor bulk reporting")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- bulk.Unordered()
- // There's a limit of 1000 operations per command, so
- // this forces the more complex indexing logic to act.
- for i := 0; i < 1010; i++ {
- switch i {
- case 3, 14:
- bulk.Insert(M{"_id": "dupone"})
- case 5, 106:
- bulk.Update(M{"_id": i - 1}, M{"$set": M{"_id": 4}})
- case 7, 1008:
- bulk.Insert(M{"_id": "duptwo"})
- default:
- bulk.Insert(M{"_id": i})
- }
- }
- _, err = bulk.Run()
- ecases := err.(*mgo.BulkError).Cases()
- c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*")
- c.Check(ecases[0].Index, Equals, 14)
- c.Check(ecases[1].Err, ErrorMatches, ".*update.*_id.*")
- c.Check(ecases[1].Index, Equals, 106)
- c.Check(ecases[2].Err, ErrorMatches, ".*duplicate.*duptwo.*")
- c.Check(ecases[2].Index, Equals, 1008)
- }
- func (s *S) TestBulkErrorCases_2_4(c *C) {
- if s.versionAtLeast(2, 6) {
- c.Skip("2.6+ has better reporting")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- bulk.Unordered()
- // There's a limit of 1000 operations per command, so
- // this forces the more complex indexing logic to act.
- for i := 0; i < 1010; i++ {
- switch i {
- case 3, 14:
- bulk.Insert(M{"_id": "dupone"})
- case 5:
- bulk.Update(M{"_id": i - 1}, M{"$set": M{"n": 4}})
- case 106:
- bulk.Update(M{"_id": i - 1}, M{"$bogus": M{"n": 4}})
- case 7, 1008:
- bulk.Insert(M{"_id": "duptwo"})
- default:
- bulk.Insert(M{"_id": i})
- }
- }
- _, err = bulk.Run()
- ecases := err.(*mgo.BulkError).Cases()
- c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*duptwo.*")
- c.Check(ecases[0].Index, Equals, -1)
- c.Check(ecases[1].Err, ErrorMatches, `.*\$bogus.*`)
- c.Check(ecases[1].Index, Equals, 106)
- }
- func (s *S) TestBulkErrorCasesOrdered(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- bulk := coll.Bulk()
- // There's a limit of 1000 operations per command, so
- // this forces the more complex indexing logic to act.
- for i := 0; i < 20; i++ {
- switch i {
- case 3, 14:
- bulk.Insert(M{"_id": "dupone"})
- case 7, 17:
- bulk.Insert(M{"_id": "duptwo"})
- default:
- bulk.Insert(M{"_id": i})
- }
- }
- _, err = bulk.Run()
- ecases := err.(*mgo.BulkError).Cases()
- c.Check(ecases[0].Err, ErrorMatches, ".*duplicate.*dupone.*")
- if s.versionAtLeast(2, 6) {
- c.Check(ecases[0].Index, Equals, 14)
- } else {
- c.Check(ecases[0].Index, Equals, -1)
- }
- c.Check(ecases, HasLen, 1)
- }
- func (s *S) TestBulkUpdate(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.Update(M{"n": 1}, M{"$set": M{"n": 1}})
- bulk.Update(M{"n": 2}, M{"$set": M{"n": 20}})
- bulk.Update(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match.
- bulk.Update(M{"n": 1}, M{"$set": M{"n": 10}}, M{"n": 3}, M{"$set": M{"n": 30}})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r.Matched, Equals, 4)
- if s.versionAtLeast(2, 6) {
- c.Assert(r.Modified, Equals, 3)
- }
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{10}, {20}, {30}})
- }
- func (s *S) TestBulkUpdateError(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.Update(
- M{"n": 1}, M{"$set": M{"n": 10}},
- M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}},
- M{"n": 3}, M{"$set": M{"n": 30}},
- )
- r, err := bulk.Run()
- c.Assert(err, ErrorMatches, ".*_id.*")
- c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{2}, {3}, {10}})
- }
- func (s *S) TestBulkUpdateErrorUnordered(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.Unordered()
- bulk.Update(
- M{"n": 1}, M{"$set": M{"n": 10}},
- M{"n": 2}, M{"$set": M{"n": 20, "_id": 20}},
- M{"n": 3}, M{"$set": M{"n": 30}},
- )
- r, err := bulk.Run()
- c.Assert(err, ErrorMatches, ".*_id.*")
- c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{2}, {10}, {30}})
- }
- func (s *S) TestBulkUpdateAll(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.UpdateAll(M{"n": 1}, M{"$set": M{"n": 10}})
- bulk.UpdateAll(M{"n": 2}, M{"$set": M{"n": 2}}) // Won't change.
- bulk.UpdateAll(M{"n": 5}, M{"$set": M{"n": 50}}) // Won't match.
- bulk.UpdateAll(M{}, M{"$inc": M{"n": 1}}, M{"n": 11}, M{"$set": M{"n": 5}})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r.Matched, Equals, 6)
- if s.versionAtLeast(2, 6) {
- c.Assert(r.Modified, Equals, 5)
- }
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{3}, {4}, {5}})
- }
- func (s *S) TestBulkMixedUnordered(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- // Abuse undefined behavior to ensure the desired implementation is in place.
- bulk := coll.Bulk()
- bulk.Unordered()
- bulk.Insert(M{"n": 1})
- bulk.Update(M{"n": 2}, M{"$inc": M{"n": 1}})
- bulk.Insert(M{"n": 2})
- bulk.Update(M{"n": 3}, M{"$inc": M{"n": 1}})
- bulk.Update(M{"n": 1}, M{"$inc": M{"n": 1}})
- bulk.Insert(M{"n": 3})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r.Matched, Equals, 3)
- if s.versionAtLeast(2, 6) {
- c.Assert(r.Modified, Equals, 3)
- }
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{2}, {3}, {4}})
- }
- func (s *S) TestBulkUpsert(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.Upsert(M{"n": 2}, M{"$set": M{"n": 20}})
- bulk.Upsert(M{"n": 4}, M{"$set": M{"n": 40}}, M{"n": 3}, M{"$set": M{"n": 30}})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r, FitsTypeOf, &mgo.BulkResult{})
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{1}, {20}, {30}, {40}})
- }
- func (s *S) TestBulkRemove(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.Remove(M{"n": 1})
- bulk.Remove(M{"n": 2}, M{"n": 4})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r.Matched, Equals, 3)
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{3}, {4}})
- }
- func (s *S) TestBulkRemoveAll(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"n": 1}, M{"n": 2}, M{"n": 3}, M{"n": 4}, M{"n": 4})
- c.Assert(err, IsNil)
- bulk := coll.Bulk()
- bulk.RemoveAll(M{"n": 1})
- bulk.RemoveAll(M{"n": 2}, M{"n": 4})
- r, err := bulk.Run()
- c.Assert(err, IsNil)
- c.Assert(r.Matched, Equals, 4)
- type doc struct{ N int }
- var res []doc
- err = coll.Find(nil).Sort("n").All(&res)
- c.Assert(err, IsNil)
- c.Assert(res, DeepEquals, []doc{{3}})
- }
|