12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090 |
- // mgo - MongoDB driver for Go
- //
- // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
- //
- // All rights reserved.
- //
- // Redistribution and use in source and binary forms, with or without
- // modification, are permitted provided that the following conditions are met:
- //
- // 1. Redistributions of source code must retain the above copyright notice, this
- // list of conditions and the following disclaimer.
- // 2. Redistributions in binary form must reproduce the above copyright notice,
- // this list of conditions and the following disclaimer in the documentation
- // and/or other materials provided with the distribution.
- //
- // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
- // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
- // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
- // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
- // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
- // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
- // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
- // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
- // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- package mgo_test
- import (
- "fmt"
- "io"
- "net"
- "strings"
- "sync"
- "time"
- . "gopkg.in/check.v1"
- "gopkg.in/mgo.v2"
- "gopkg.in/mgo.v2/bson"
- )
- func (s *S) TestNewSession(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- // Do a dummy operation to wait for connection.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"_id": 1})
- c.Assert(err, IsNil)
- // Tweak safety and query settings to ensure other has copied those.
- session.SetSafe(nil)
- session.SetBatch(-1)
- other := session.New()
- defer other.Close()
- session.SetSafe(&mgo.Safe{})
- // Clone was copied while session was unsafe, so no errors.
- otherColl := other.DB("mydb").C("mycoll")
- err = otherColl.Insert(M{"_id": 1})
- c.Assert(err, IsNil)
- // Original session was made safe again.
- err = coll.Insert(M{"_id": 1})
- c.Assert(err, NotNil)
- // With New(), each session has its own socket now.
- stats := mgo.GetStats()
- c.Assert(stats.MasterConns, Equals, 2)
- c.Assert(stats.SocketsInUse, Equals, 2)
- // Ensure query parameters were cloned.
- err = otherColl.Insert(M{"_id": 2})
- c.Assert(err, IsNil)
- // Ping the database to ensure the nonce has been received already.
- c.Assert(other.Ping(), IsNil)
- mgo.ResetStats()
- iter := otherColl.Find(M{}).Iter()
- c.Assert(err, IsNil)
- m := M{}
- ok := iter.Next(m)
- c.Assert(ok, Equals, true)
- err = iter.Close()
- c.Assert(err, IsNil)
- // If Batch(-1) is in effect, a single document must have been received.
- stats = mgo.GetStats()
- c.Assert(stats.ReceivedDocs, Equals, 1)
- }
- func (s *S) TestCloneSession(c *C) {
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- // Do a dummy operation to wait for connection.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"_id": 1})
- c.Assert(err, IsNil)
- // Tweak safety and query settings to ensure clone is copying those.
- session.SetSafe(nil)
- session.SetBatch(-1)
- clone := session.Clone()
- defer clone.Close()
- session.SetSafe(&mgo.Safe{})
- // Clone was copied while session was unsafe, so no errors.
- cloneColl := clone.DB("mydb").C("mycoll")
- err = cloneColl.Insert(M{"_id": 1})
- c.Assert(err, IsNil)
- // Original session was made safe again.
- err = coll.Insert(M{"_id": 1})
- c.Assert(err, NotNil)
- // With Clone(), same socket is shared between sessions now.
- stats := mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 1)
- c.Assert(stats.SocketRefs, Equals, 2)
- // Refreshing one of them should let the original socket go,
- // while preserving the safety settings.
- clone.Refresh()
- err = cloneColl.Insert(M{"_id": 1})
- c.Assert(err, IsNil)
- // Must have used another connection now.
- stats = mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 2)
- c.Assert(stats.SocketRefs, Equals, 2)
- // Ensure query parameters were cloned.
- err = cloneColl.Insert(M{"_id": 2})
- c.Assert(err, IsNil)
- // Ping the database to ensure the nonce has been received already.
- c.Assert(clone.Ping(), IsNil)
- mgo.ResetStats()
- iter := cloneColl.Find(M{}).Iter()
- c.Assert(err, IsNil)
- m := M{}
- ok := iter.Next(m)
- c.Assert(ok, Equals, true)
- err = iter.Close()
- c.Assert(err, IsNil)
- // If Batch(-1) is in effect, a single document must have been received.
- stats = mgo.GetStats()
- c.Assert(stats.ReceivedDocs, Equals, 1)
- }
- func (s *S) TestModeStrong(c *C) {
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, false)
- session.SetMode(mgo.Strong, false)
- c.Assert(session.Mode(), Equals, mgo.Strong)
- result := M{}
- cmd := session.DB("admin").C("$cmd")
- err = cmd.Find(M{"ismaster": 1}).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, true)
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- // Wait since the sync also uses sockets.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- stats := mgo.GetStats()
- c.Assert(stats.MasterConns, Equals, 1)
- c.Assert(stats.SlaveConns, Equals, 2)
- c.Assert(stats.SocketsInUse, Equals, 1)
- session.SetMode(mgo.Strong, true)
- stats = mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 0)
- }
- func (s *S) TestModeMonotonic(c *C) {
- // Must necessarily connect to a slave, otherwise the
- // master connection will be available first.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, false)
- c.Assert(session.Mode(), Equals, mgo.Monotonic)
- var result struct{ IsMaster bool }
- cmd := session.DB("admin").C("$cmd")
- err = cmd.Find(M{"ismaster": 1}).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result.IsMaster, Equals, false)
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- err = cmd.Find(M{"ismaster": 1}).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result.IsMaster, Equals, true)
- // Wait since the sync also uses sockets.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- stats := mgo.GetStats()
- c.Assert(stats.MasterConns, Equals, 1)
- c.Assert(stats.SlaveConns, Equals, 2)
- c.Assert(stats.SocketsInUse, Equals, 2)
- session.SetMode(mgo.Monotonic, true)
- stats = mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 0)
- }
- func (s *S) TestModeMonotonicAfterStrong(c *C) {
- // Test that a strong session shifting to a monotonic
- // one preserves the socket untouched.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- // Insert something to force a connection to the master.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- session.SetMode(mgo.Monotonic, false)
- // Wait since the sync also uses sockets.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- // Master socket should still be reserved.
- stats := mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 1)
- // Confirm it's the master even though it's Monotonic by now.
- result := M{}
- cmd := session.DB("admin").C("$cmd")
- err = cmd.Find(M{"ismaster": 1}).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, true)
- }
- func (s *S) TestModeStrongAfterMonotonic(c *C) {
- // Test that shifting from Monotonic to Strong while
- // using a slave socket will keep the socket reserved
- // until the master socket is necessary, so that no
- // switch over occurs unless it's actually necessary.
- // Must necessarily connect to a slave, otherwise the
- // master connection will be available first.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, false)
- // Ensure we're talking to a slave, and reserve the socket.
- result := M{}
- err = session.Run("ismaster", &result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, false)
- // Switch to a Strong session.
- session.SetMode(mgo.Strong, false)
- // Wait since the sync also uses sockets.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- // Slave socket should still be reserved.
- stats := mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 1)
- // But any operation will switch it to the master.
- result = M{}
- err = session.Run("ismaster", &result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, true)
- }
- func (s *S) TestModeMonotonicWriteOnIteration(c *C) {
- // Must necessarily connect to a slave, otherwise the
- // master connection will be available first.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, false)
- c.Assert(session.Mode(), Equals, mgo.Monotonic)
- coll1 := session.DB("mydb").C("mycoll1")
- coll2 := session.DB("mydb").C("mycoll2")
- ns := []int{40, 41, 42, 43, 44, 45, 46}
- for _, n := range ns {
- err := coll1.Insert(M{"n": n})
- c.Assert(err, IsNil)
- }
- // Release master so we can grab a slave again.
- session.Refresh()
- // Wait until synchronization is done.
- for {
- n, err := coll1.Count()
- c.Assert(err, IsNil)
- if n == len(ns) {
- break
- }
- }
- iter := coll1.Find(nil).Batch(2).Iter()
- i := 0
- m := M{}
- for iter.Next(&m) {
- i++
- if i > 3 {
- err := coll2.Insert(M{"n": 47 + i})
- c.Assert(err, IsNil)
- }
- }
- c.Assert(i, Equals, len(ns))
- }
- func (s *S) TestModeEventual(c *C) {
- // Must necessarily connect to a slave, otherwise the
- // master connection will be available first.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Eventual, false)
- c.Assert(session.Mode(), Equals, mgo.Eventual)
- result := M{}
- err = session.Run("ismaster", &result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, false)
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- result = M{}
- err = session.Run("ismaster", &result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, false)
- // Wait since the sync also uses sockets.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- stats := mgo.GetStats()
- c.Assert(stats.MasterConns, Equals, 1)
- c.Assert(stats.SlaveConns, Equals, 2)
- c.Assert(stats.SocketsInUse, Equals, 0)
- }
- func (s *S) TestModeEventualAfterStrong(c *C) {
- // Test that a strong session shifting to an eventual
- // one preserves the socket untouched.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- // Insert something to force a connection to the master.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- session.SetMode(mgo.Eventual, false)
- // Wait since the sync also uses sockets.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- // Master socket should still be reserved.
- stats := mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 1)
- // Confirm it's the master even though it's Eventual by now.
- result := M{}
- cmd := session.DB("admin").C("$cmd")
- err = cmd.Find(M{"ismaster": 1}).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result["ismaster"], Equals, true)
- session.SetMode(mgo.Eventual, true)
- stats = mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 0)
- }
- func (s *S) TestModeStrongFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- // With strong consistency, this will open a socket to the master.
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- // Kill the master.
- host := result.Host
- s.Stop(host)
- // This must fail, since the connection was broken.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- // With strong consistency, it fails again until reset.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- session.Refresh()
- // Now we should be able to talk to the new master.
- // Increase the timeout since this may take quite a while.
- session.SetSyncTimeout(3 * time.Minute)
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(result.Host, Not(Equals), host)
- // Insert some data to confirm it's indeed a master.
- err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
- c.Assert(err, IsNil)
- }
- func (s *S) TestModePrimaryHiccup(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- // With strong consistency, this will open a socket to the master.
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- // Establish a few extra sessions to create spare sockets to
- // the master. This increases a bit the chances of getting an
- // incorrect cached socket.
- var sessions []*mgo.Session
- for i := 0; i < 20; i++ {
- sessions = append(sessions, session.Copy())
- err = sessions[len(sessions)-1].Run("serverStatus", result)
- c.Assert(err, IsNil)
- }
- for i := range sessions {
- sessions[i].Close()
- }
- // Kill the master, but bring it back immediatelly.
- host := result.Host
- s.Stop(host)
- s.StartAll()
- // This must fail, since the connection was broken.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- // With strong consistency, it fails again until reset.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- session.Refresh()
- // Now we should be able to talk to the new master.
- // Increase the timeout since this may take quite a while.
- session.SetSyncTimeout(3 * time.Minute)
- // Insert some data to confirm it's indeed a master.
- err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
- c.Assert(err, IsNil)
- }
- func (s *S) TestModeMonotonicFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- // Insert something to force a switch to the master.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- // Wait a bit for this to be synchronized to slaves.
- time.Sleep(3 * time.Second)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- // Kill the master.
- host := result.Host
- s.Stop(host)
- // This must fail, since the connection was broken.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- // With monotonic consistency, it fails again until reset.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- session.Refresh()
- // Now we should be able to talk to the new master.
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(result.Host, Not(Equals), host)
- }
- func (s *S) TestModeMonotonicWithSlaveFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- ssresult := &struct{ Host string }{}
- imresult := &struct{ IsMaster bool }{}
- // Figure the master while still using the strong session.
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- err = session.Run("isMaster", imresult)
- c.Assert(err, IsNil)
- master := ssresult.Host
- c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
- // Create new monotonic session with an explicit address to ensure
- // a slave is synchronized before the master, otherwise a connection
- // with the master may be used below for lack of other options.
- var addr string
- switch {
- case strings.HasSuffix(ssresult.Host, ":40021"):
- addr = "localhost:40022"
- case strings.HasSuffix(ssresult.Host, ":40022"):
- addr = "localhost:40021"
- case strings.HasSuffix(ssresult.Host, ":40023"):
- addr = "localhost:40021"
- default:
- c.Fatal("Unknown host: ", ssresult.Host)
- }
- session, err = mgo.Dial(addr)
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- // Check the address of the socket associated with the monotonic session.
- c.Log("Running serverStatus and isMaster with monotonic session")
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- err = session.Run("isMaster", imresult)
- c.Assert(err, IsNil)
- slave := ssresult.Host
- c.Assert(imresult.IsMaster, Equals, false, Commentf("%s is not a slave", slave))
- c.Assert(master, Not(Equals), slave)
- // Kill the master.
- s.Stop(master)
- // Session must still be good, since we were talking to a slave.
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- c.Assert(ssresult.Host, Equals, slave,
- Commentf("Monotonic session moved from %s to %s", slave, ssresult.Host))
- // If we try to insert something, it'll have to hold until the new
- // master is available to move the connection, and work correctly.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- // Must now be talking to the new master.
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- err = session.Run("isMaster", imresult)
- c.Assert(err, IsNil)
- c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
- // ... which is not the old one, since it's still dead.
- c.Assert(ssresult.Host, Not(Equals), master)
- }
- func (s *S) TestModeEventualFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- master := result.Host
- session.SetMode(mgo.Eventual, true)
- // Should connect to the master when needed.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- // Wait a bit for this to be synchronized to slaves.
- time.Sleep(3 * time.Second)
- // Kill the master.
- s.Stop(master)
- // Should still work, with the new master now.
- coll = session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(result.Host, Not(Equals), master)
- }
- func (s *S) TestModeSecondaryJustPrimary(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Secondary, true)
- err = session.Ping()
- c.Assert(err, ErrorMatches, "no reachable servers")
- }
- func (s *S) TestModeSecondaryPreferredJustPrimary(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.SecondaryPreferred, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- }
- func (s *S) TestModeSecondaryPreferredFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- // Ensure secondaries are available for being picked up.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- session.SetMode(mgo.SecondaryPreferred, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(supvName(result.Host), Not(Equals), "rs1a")
- secondary := result.Host
- // Should connect to the primary when needed.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- // Wait a bit for this to be synchronized to slaves.
- time.Sleep(3 * time.Second)
- // Kill the primary.
- s.Stop("localhost:40011")
- // It can still talk to the selected secondary.
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(result.Host, Equals, secondary)
- // But cannot speak to the primary until reset.
- coll = session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, Equals, io.EOF)
- session.Refresh()
- // Can still talk to a secondary.
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(supvName(result.Host), Not(Equals), "rs1a")
- s.StartAll()
- // Should now be able to talk to the primary again.
- coll = session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- }
- func (s *S) TestModePrimaryPreferredFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.PrimaryPreferred, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(supvName(result.Host), Equals, "rs1a")
- // Kill the primary.
- s.Stop("localhost:40011")
- // Should now fail as there was a primary socket in use already.
- err = session.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- // Refresh so the reserved primary socket goes away.
- session.Refresh()
- // Should be able to talk to the secondary.
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- s.StartAll()
- // Should wait for the new primary to become available.
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- // And should use the new primary in general, as it is preferred.
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(supvName(result.Host), Equals, "rs1a")
- }
- func (s *S) TestModePrimaryFallover(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetSyncTimeout(3 * time.Second)
- session.SetMode(mgo.Primary, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(supvName(result.Host), Equals, "rs1a")
- // Kill the primary.
- s.Stop("localhost:40011")
- session.Refresh()
- err = session.Ping()
- c.Assert(err, ErrorMatches, "no reachable servers")
- }
- func (s *S) TestModeSecondary(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Secondary, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(supvName(result.Host), Not(Equals), "rs1a")
- secondary := result.Host
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(result.Host, Equals, secondary)
- }
- func (s *S) TestPreserveSocketCountOnSync(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- stats := mgo.GetStats()
- for stats.SocketsAlive != 3 {
- c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
- stats = mgo.GetStats()
- time.Sleep(5e8)
- }
- c.Assert(stats.SocketsAlive, Equals, 3)
- // Kill the master (with rs1, 'a' is always the master).
- s.Stop("localhost:40011")
- // Wait for the logic to run for a bit and bring it back.
- startedAll := make(chan bool)
- go func() {
- time.Sleep(5e9)
- s.StartAll()
- startedAll <- true
- }()
- // Do not allow the test to return before the goroutine above is done.
- defer func() {
- <-startedAll
- }()
- // Do an action to kick the resync logic in, and also to
- // wait until the cluster recognizes the server is back.
- result := struct{ Ok bool }{}
- err = session.Run("getLastError", &result)
- c.Assert(err, IsNil)
- c.Assert(result.Ok, Equals, true)
- for i := 0; i != 20; i++ {
- stats = mgo.GetStats()
- if stats.SocketsAlive == 3 {
- break
- }
- c.Logf("Waiting for 3 sockets alive, have %d", stats.SocketsAlive)
- time.Sleep(5e8)
- }
- // Ensure the number of sockets is preserved after syncing.
- stats = mgo.GetStats()
- c.Assert(stats.SocketsAlive, Equals, 3)
- c.Assert(stats.SocketsInUse, Equals, 1)
- c.Assert(stats.SocketRefs, Equals, 1)
- }
- // Connect to the master of a deployment with a single server,
- // run an insert, and then ensure the insert worked and that a
- // single connection was established.
- func (s *S) TestTopologySyncWithSingleMaster(c *C) {
- // Use hostname here rather than IP, to make things trickier.
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1, "b": 2})
- c.Assert(err, IsNil)
- // One connection used for discovery. Master socket recycled for
- // insert. Socket is reserved after insert.
- stats := mgo.GetStats()
- c.Assert(stats.MasterConns, Equals, 1)
- c.Assert(stats.SlaveConns, Equals, 0)
- c.Assert(stats.SocketsInUse, Equals, 1)
- // Refresh session and socket must be released.
- session.Refresh()
- stats = mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 0)
- }
- func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
- // That's supposed to be a slave. Must run discovery
- // and find out master to insert successfully.
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- coll := session.DB("mydb").C("mycoll")
- coll.Insert(M{"a": 1, "b": 2})
- result := struct{ Ok bool }{}
- err = session.Run("getLastError", &result)
- c.Assert(err, IsNil)
- c.Assert(result.Ok, Equals, true)
- // One connection to each during discovery. Master
- // socket recycled for insert.
- stats := mgo.GetStats()
- c.Assert(stats.MasterConns, Equals, 1)
- c.Assert(stats.SlaveConns, Equals, 2)
- // Only one socket reference alive, in the master socket owned
- // by the above session.
- c.Assert(stats.SocketsInUse, Equals, 1)
- // Refresh it, and it must be gone.
- session.Refresh()
- stats = mgo.GetStats()
- c.Assert(stats.SocketsInUse, Equals, 0)
- }
- func (s *S) TestSyncTimeout(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- s.Stop("localhost:40001")
- timeout := 3 * time.Second
- session.SetSyncTimeout(timeout)
- started := time.Now()
- // Do something.
- result := struct{ Ok bool }{}
- err = session.Run("getLastError", &result)
- c.Assert(err, ErrorMatches, "no reachable servers")
- c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
- c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
- }
- func (s *S) TestDialWithTimeout(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- timeout := 2 * time.Second
- started := time.Now()
- // 40009 isn't used by the test servers.
- session, err := mgo.DialWithTimeout("localhost:40009", timeout)
- if session != nil {
- session.Close()
- }
- c.Assert(err, ErrorMatches, "no reachable servers")
- c.Assert(session, IsNil)
- c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
- c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
- }
- func (s *S) TestSocketTimeout(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- s.Freeze("localhost:40001")
- timeout := 3 * time.Second
- session.SetSocketTimeout(timeout)
- started := time.Now()
- // Do something.
- result := struct{ Ok bool }{}
- err = session.Run("getLastError", &result)
- c.Assert(err, ErrorMatches, ".*: i/o timeout")
- c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
- c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
- }
- func (s *S) TestSocketTimeoutOnDial(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- timeout := 1 * time.Second
- defer mgo.HackSyncSocketTimeout(timeout)()
- s.Freeze("localhost:40001")
- started := time.Now()
- session, err := mgo.DialWithTimeout("localhost:40001", timeout)
- c.Assert(err, ErrorMatches, "no reachable servers")
- c.Assert(session, IsNil)
- c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
- c.Assert(started.After(time.Now().Add(-20*time.Second)), Equals, true)
- }
- func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- defer session.Close()
- timeout := 2 * time.Second
- session.SetSocketTimeout(timeout)
- // Do something that relies on the timeout and works.
- c.Assert(session.Ping(), IsNil)
- // Freeze and wait for the timeout to go by.
- s.Freeze("localhost:40001")
- time.Sleep(timeout + 500*time.Millisecond)
- s.Thaw("localhost:40001")
- // Do something again. The timeout above should not have killed
- // the socket as there was nothing to be done.
- c.Assert(session.Ping(), IsNil)
- }
- func (s *S) TestDialWithReplicaSetName(c *C) {
- seedLists := [][]string{
- // rs1 primary and rs2 primary
- []string{"localhost:40011", "localhost:40021"},
- // rs1 primary and rs2 secondary
- []string{"localhost:40011", "localhost:40022"},
- // rs1 secondary and rs2 primary
- []string{"localhost:40012", "localhost:40021"},
- // rs1 secondary and rs2 secondary
- []string{"localhost:40012", "localhost:40022"},
- }
- rs2Members := []string{":40021", ":40022", ":40023"}
- verifySyncedServers := func(session *mgo.Session, numServers int) {
- // wait for the server(s) to be synced
- for len(session.LiveServers()) != numServers {
- c.Log("Waiting for cluster sync to finish...")
- time.Sleep(5e8)
- }
- // ensure none of the rs2 set members are communicated with
- for _, addr := range session.LiveServers() {
- for _, rs2Member := range rs2Members {
- c.Assert(strings.HasSuffix(addr, rs2Member), Equals, false)
- }
- }
- }
- // only communication with rs1 members is expected
- for _, seedList := range seedLists {
- info := mgo.DialInfo{
- Addrs: seedList,
- Timeout: 5 * time.Second,
- ReplicaSetName: "rs1",
- }
- session, err := mgo.DialWithInfo(&info)
- c.Assert(err, IsNil)
- verifySyncedServers(session, 3)
- session.Close()
- info.Direct = true
- session, err = mgo.DialWithInfo(&info)
- c.Assert(err, IsNil)
- verifySyncedServers(session, 1)
- session.Close()
- connectionUrl := fmt.Sprintf("mongodb://%v/?replicaSet=rs1", strings.Join(seedList, ","))
- session, err = mgo.Dial(connectionUrl)
- c.Assert(err, IsNil)
- verifySyncedServers(session, 3)
- session.Close()
- connectionUrl += "&connect=direct"
- session, err = mgo.Dial(connectionUrl)
- c.Assert(err, IsNil)
- verifySyncedServers(session, 1)
- session.Close()
- }
- }
- func (s *S) TestDirect(c *C) {
- session, err := mgo.Dial("localhost:40012?connect=direct")
- c.Assert(err, IsNil)
- defer session.Close()
- // We know that server is a slave.
- session.SetMode(mgo.Monotonic, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
- stats := mgo.GetStats()
- c.Assert(stats.SocketsAlive, Equals, 1)
- c.Assert(stats.SocketsInUse, Equals, 1)
- c.Assert(stats.SocketRefs, Equals, 1)
- // We've got no master, so it'll timeout.
- session.SetSyncTimeout(5e8 * time.Nanosecond)
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"test": 1})
- c.Assert(err, ErrorMatches, "no reachable servers")
- // Writing to the local database is okay.
- coll = session.DB("local").C("mycoll")
- defer coll.RemoveAll(nil)
- id := bson.NewObjectId()
- err = coll.Insert(M{"_id": id})
- c.Assert(err, IsNil)
- // Data was stored in the right server.
- n, err := coll.Find(M{"_id": id}).Count()
- c.Assert(err, IsNil)
- c.Assert(n, Equals, 1)
- // Server hasn't changed.
- result.Host = ""
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
- }
- func (s *S) TestDirectToUnknownStateMember(c *C) {
- session, err := mgo.Dial("localhost:40041?connect=direct")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- result := &struct{ Host string }{}
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
- // We've got no master, so it'll timeout.
- session.SetSyncTimeout(5e8 * time.Nanosecond)
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"test": 1})
- c.Assert(err, ErrorMatches, "no reachable servers")
- // Slave is still reachable.
- result.Host = ""
- err = session.Run("serverStatus", result)
- c.Assert(err, IsNil)
- c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
- }
- func (s *S) TestFailFast(c *C) {
- info := mgo.DialInfo{
- Addrs: []string{"localhost:99999"},
- Timeout: 5 * time.Second,
- FailFast: true,
- }
- started := time.Now()
- _, err := mgo.DialWithInfo(&info)
- c.Assert(err, ErrorMatches, "no reachable servers")
- c.Assert(started.After(time.Now().Add(-time.Second)), Equals, true)
- }
- func (s *S) countQueries(c *C, server string) (n int) {
- defer func() { c.Logf("Queries for %q: %d", server, n) }()
- session, err := mgo.Dial(server + "?connect=direct")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- var result struct {
- OpCounters struct {
- Query int
- }
- Metrics struct {
- Commands struct{ Find struct{ Total int } }
- }
- }
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- if s.versionAtLeast(3, 2) {
- return result.Metrics.Commands.Find.Total
- }
- return result.OpCounters.Query
- }
- func (s *S) countCommands(c *C, server, commandName string) (n int) {
- defer func() { c.Logf("Queries for %q: %d", server, n) }()
- session, err := mgo.Dial(server + "?connect=direct")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- var result struct {
- Metrics struct {
- Commands map[string]struct{ Total int }
- }
- }
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- return result.Metrics.Commands[commandName].Total
- }
- func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) {
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- ssresult := &struct{ Host string }{}
- imresult := &struct{ IsMaster bool }{}
- // Figure the master while still using the strong session.
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- err = session.Run("isMaster", imresult)
- c.Assert(err, IsNil)
- master := ssresult.Host
- c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
- // Ensure mongos is aware about the current topology.
- s.Stop(":40201")
- s.StartAll()
- mongos, err := mgo.Dial("localhost:40202")
- c.Assert(err, IsNil)
- defer mongos.Close()
- // Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
- err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
- c.Assert(err, IsNil)
- // Wait until all servers see the data.
- for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
- session, err := mgo.Dial(addr + "?connect=direct")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- for i := 300; i >= 0; i-- {
- n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
- c.Assert(err, IsNil)
- if n == 1 {
- break
- }
- if i == 0 {
- c.Fatalf("Inserted data never reached " + addr)
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
- // Collect op counters for everyone.
- q21a := s.countQueries(c, "localhost:40021")
- q22a := s.countQueries(c, "localhost:40022")
- q23a := s.countQueries(c, "localhost:40023")
- // Do a SlaveOk query through MongoS
- mongos.SetMode(mgo.Monotonic, true)
- coll := mongos.DB("mydb").C("mycoll")
- var result struct{ N int }
- for i := 0; i != 5; i++ {
- err = coll.Find(nil).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result.N, Equals, 1)
- }
- // Collect op counters for everyone again.
- q21b := s.countQueries(c, "localhost:40021")
- q22b := s.countQueries(c, "localhost:40022")
- q23b := s.countQueries(c, "localhost:40023")
- var masterDelta, slaveDelta int
- switch hostPort(master) {
- case "40021":
- masterDelta = q21b - q21a
- slaveDelta = (q22b - q22a) + (q23b - q23a)
- case "40022":
- masterDelta = q22b - q22a
- slaveDelta = (q21b - q21a) + (q23b - q23a)
- case "40023":
- masterDelta = q23b - q23a
- slaveDelta = (q21b - q21a) + (q22b - q22a)
- default:
- c.Fatal("Uh?")
- }
- c.Check(masterDelta, Equals, 0) // Just the counting itself.
- c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
- }
- func (s *S) TestSecondaryModeWithMongos(c *C) {
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- ssresult := &struct{ Host string }{}
- imresult := &struct{ IsMaster bool }{}
- // Figure the master while still using the strong session.
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- err = session.Run("isMaster", imresult)
- c.Assert(err, IsNil)
- master := ssresult.Host
- c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
- // Ensure mongos is aware about the current topology.
- s.Stop(":40201")
- s.StartAll()
- mongos, err := mgo.Dial("localhost:40202")
- c.Assert(err, IsNil)
- defer mongos.Close()
- mongos.SetSyncTimeout(5 * time.Second)
- // Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
- err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
- c.Assert(err, IsNil)
- // Wait until all servers see the data.
- for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
- session, err := mgo.Dial(addr + "?connect=direct")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Monotonic, true)
- for i := 300; i >= 0; i-- {
- n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
- c.Assert(err, IsNil)
- if n == 1 {
- break
- }
- if i == 0 {
- c.Fatalf("Inserted data never reached " + addr)
- }
- time.Sleep(100 * time.Millisecond)
- }
- }
- // Collect op counters for everyone.
- q21a := s.countQueries(c, "localhost:40021")
- q22a := s.countQueries(c, "localhost:40022")
- q23a := s.countQueries(c, "localhost:40023")
- // Do a Secondary query through MongoS
- mongos.SetMode(mgo.Secondary, true)
- coll := mongos.DB("mydb").C("mycoll")
- var result struct{ N int }
- for i := 0; i != 5; i++ {
- err = coll.Find(nil).One(&result)
- c.Assert(err, IsNil)
- c.Assert(result.N, Equals, 1)
- }
- // Collect op counters for everyone again.
- q21b := s.countQueries(c, "localhost:40021")
- q22b := s.countQueries(c, "localhost:40022")
- q23b := s.countQueries(c, "localhost:40023")
- var masterDelta, slaveDelta int
- switch hostPort(master) {
- case "40021":
- masterDelta = q21b - q21a
- slaveDelta = (q22b - q22a) + (q23b - q23a)
- case "40022":
- masterDelta = q22b - q22a
- slaveDelta = (q21b - q21a) + (q23b - q23a)
- case "40023":
- masterDelta = q23b - q23a
- slaveDelta = (q21b - q21a) + (q22b - q22a)
- default:
- c.Fatal("Uh?")
- }
- c.Check(masterDelta, Equals, 0) // Just the counting itself.
- c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
- }
- func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40202")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Secondary, true)
- session.SetSyncTimeout(4 * time.Second)
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(M{"a": 1})
- c.Assert(err, IsNil)
- var result struct{ A int }
- coll.Find(nil).One(&result)
- c.Assert(result.A, Equals, 1)
- }
- func (s *S) TestRemovalOfClusterMember(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- master, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer master.Close()
- // Wait for cluster to fully sync up.
- for i := 0; i < 10; i++ {
- if len(master.LiveServers()) == 3 {
- break
- }
- time.Sleep(5e8)
- }
- if len(master.LiveServers()) != 3 {
- c.Fatalf("Test started with bad cluster state: %v", master.LiveServers())
- }
- result := &struct {
- IsMaster bool
- Me string
- }{}
- slave := master.Copy()
- slave.SetMode(mgo.Monotonic, true) // Monotonic can hold a non-master socket persistently.
- err = slave.Run("isMaster", result)
- c.Assert(err, IsNil)
- c.Assert(result.IsMaster, Equals, false)
- slaveAddr := result.Me
- defer func() {
- config := map[string]string{
- "40021": `{_id: 1, host: "127.0.0.1:40021", priority: 1, tags: {rs2: "a"}}`,
- "40022": `{_id: 2, host: "127.0.0.1:40022", priority: 0, tags: {rs2: "b"}}`,
- "40023": `{_id: 3, host: "127.0.0.1:40023", priority: 0, tags: {rs2: "c"}}`,
- }
- master.Refresh()
- master.Run(bson.D{{"$eval", `rs.add(` + config[hostPort(slaveAddr)] + `)`}}, nil)
- master.Close()
- slave.Close()
- // Ensure suite syncs up with the changes before next test.
- s.Stop(":40201")
- s.StartAll()
- time.Sleep(8 * time.Second)
- // TODO Find a better way to find out when mongos is fully aware that all
- // servers are up. Without that follow up tests that depend on mongos will
- // break due to their expectation of things being in a working state.
- }()
- c.Logf("========== Removing slave: %s ==========", slaveAddr)
- master.Run(bson.D{{"$eval", `rs.remove("` + slaveAddr + `")`}}, nil)
- master.Refresh()
- // Give the cluster a moment to catch up by doing a roundtrip to the master.
- err = master.Ping()
- c.Assert(err, IsNil)
- time.Sleep(3e9)
- // This must fail since the slave has been taken off the cluster.
- err = slave.Ping()
- c.Assert(err, NotNil)
- for i := 0; i < 15; i++ {
- if len(master.LiveServers()) == 2 {
- break
- }
- time.Sleep(time.Second)
- }
- live := master.LiveServers()
- if len(live) != 2 {
- c.Errorf("Removed server still considered live: %#s", live)
- }
- c.Log("========== Test succeeded. ==========")
- }
- func (s *S) TestPoolLimitSimple(c *C) {
- for test := 0; test < 2; test++ {
- var session *mgo.Session
- var err error
- if test == 0 {
- session, err = mgo.Dial("localhost:40001")
- c.Assert(err, IsNil)
- session.SetPoolLimit(1)
- } else {
- session, err = mgo.Dial("localhost:40001?maxPoolSize=1")
- c.Assert(err, IsNil)
- }
- defer session.Close()
- // Put one socket in use.
- c.Assert(session.Ping(), IsNil)
- done := make(chan time.Duration)
- // Now block trying to get another one due to the pool limit.
- go func() {
- copy := session.Copy()
- defer copy.Close()
- started := time.Now()
- c.Check(copy.Ping(), IsNil)
- done <- time.Now().Sub(started)
- }()
- time.Sleep(300 * time.Millisecond)
- // Put the one socket back in the pool, freeing it for the copy.
- session.Refresh()
- delay := <-done
- c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
- }
- }
- func (s *S) TestPoolLimitMany(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- stats := mgo.GetStats()
- for stats.SocketsAlive != 3 {
- c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
- stats = mgo.GetStats()
- time.Sleep(5e8)
- }
- const poolLimit = 64
- session.SetPoolLimit(poolLimit)
- // Consume the whole limit for the master.
- var master []*mgo.Session
- for i := 0; i < poolLimit; i++ {
- s := session.Copy()
- defer s.Close()
- c.Assert(s.Ping(), IsNil)
- master = append(master, s)
- }
- before := time.Now()
- go func() {
- time.Sleep(3e9)
- master[0].Refresh()
- }()
- // Then, a single ping must block, since it would need another
- // connection to the master, over the limit. Once the goroutine
- // above releases its socket, it should move on.
- session.Ping()
- delay := time.Now().Sub(before)
- c.Assert(delay > 3e9, Equals, true)
- c.Assert(delay < 6e9, Equals, true)
- }
- func (s *S) TestSetModeEventualIterBug(c *C) {
- session1, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session1.Close()
- session1.SetMode(mgo.Eventual, false)
- coll1 := session1.DB("mydb").C("mycoll")
- const N = 100
- for i := 0; i < N; i++ {
- err = coll1.Insert(M{"_id": i})
- c.Assert(err, IsNil)
- }
- c.Logf("Waiting until secondary syncs")
- for {
- n, err := coll1.Count()
- c.Assert(err, IsNil)
- if n == N {
- c.Logf("Found all")
- break
- }
- }
- session2, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session2.Close()
- session2.SetMode(mgo.Eventual, false)
- coll2 := session2.DB("mydb").C("mycoll")
- i := 0
- iter := coll2.Find(nil).Batch(10).Iter()
- var result struct{}
- for iter.Next(&result) {
- i++
- }
- c.Assert(iter.Close(), Equals, nil)
- c.Assert(i, Equals, N)
- }
- func (s *S) TestCustomDialOld(c *C) {
- dials := make(chan bool, 16)
- dial := func(addr net.Addr) (net.Conn, error) {
- tcpaddr, ok := addr.(*net.TCPAddr)
- if !ok {
- return nil, fmt.Errorf("unexpected address type: %T", addr)
- }
- dials <- true
- return net.DialTCP("tcp", nil, tcpaddr)
- }
- info := mgo.DialInfo{
- Addrs: []string{"localhost:40012"},
- Dial: dial,
- }
- // Use hostname here rather than IP, to make things trickier.
- session, err := mgo.DialWithInfo(&info)
- c.Assert(err, IsNil)
- defer session.Close()
- const N = 3
- for i := 0; i < N; i++ {
- select {
- case <-dials:
- case <-time.After(5 * time.Second):
- c.Fatalf("expected %d dials, got %d", N, i)
- }
- }
- select {
- case <-dials:
- c.Fatalf("got more dials than expected")
- case <-time.After(100 * time.Millisecond):
- }
- }
- func (s *S) TestCustomDialNew(c *C) {
- dials := make(chan bool, 16)
- dial := func(addr *mgo.ServerAddr) (net.Conn, error) {
- dials <- true
- if addr.TCPAddr().Port == 40012 {
- c.Check(addr.String(), Equals, "localhost:40012")
- }
- return net.DialTCP("tcp", nil, addr.TCPAddr())
- }
- info := mgo.DialInfo{
- Addrs: []string{"localhost:40012"},
- DialServer: dial,
- }
- // Use hostname here rather than IP, to make things trickier.
- session, err := mgo.DialWithInfo(&info)
- c.Assert(err, IsNil)
- defer session.Close()
- const N = 3
- for i := 0; i < N; i++ {
- select {
- case <-dials:
- case <-time.After(5 * time.Second):
- c.Fatalf("expected %d dials, got %d", N, i)
- }
- }
- select {
- case <-dials:
- c.Fatalf("got more dials than expected")
- case <-time.After(100 * time.Millisecond):
- }
- }
- func (s *S) TestPrimaryShutdownOnAuthShard(c *C) {
- if *fast {
- c.Skip("-fast")
- }
- // Dial the shard.
- session, err := mgo.Dial("localhost:40203")
- c.Assert(err, IsNil)
- defer session.Close()
- // Login and insert something to make it more realistic.
- session.DB("admin").Login("root", "rapadura")
- coll := session.DB("mydb").C("mycoll")
- err = coll.Insert(bson.M{"n": 1})
- c.Assert(err, IsNil)
- // Dial the replica set to figure the master out.
- rs, err := mgo.Dial("root:rapadura@localhost:40031")
- c.Assert(err, IsNil)
- defer rs.Close()
- // With strong consistency, this will open a socket to the master.
- result := &struct{ Host string }{}
- err = rs.Run("serverStatus", result)
- c.Assert(err, IsNil)
- // Kill the master.
- host := result.Host
- s.Stop(host)
- // This must fail, since the connection was broken.
- err = rs.Run("serverStatus", result)
- c.Assert(err, Equals, io.EOF)
- // This won't work because the master just died.
- err = coll.Insert(bson.M{"n": 2})
- c.Assert(err, NotNil)
- // Refresh session and wait for re-election.
- session.Refresh()
- for i := 0; i < 60; i++ {
- err = coll.Insert(bson.M{"n": 3})
- if err == nil {
- break
- }
- c.Logf("Waiting for replica set to elect a new master. Last error: %v", err)
- time.Sleep(500 * time.Millisecond)
- }
- c.Assert(err, IsNil)
- count, err := coll.Count()
- c.Assert(count > 1, Equals, true)
- }
- func (s *S) TestNearestSecondary(c *C) {
- defer mgo.HackPingDelay(300 * time.Millisecond)()
- rs1a := "127.0.0.1:40011"
- rs1b := "127.0.0.1:40012"
- rs1c := "127.0.0.1:40013"
- s.Freeze(rs1b)
- session, err := mgo.Dial(rs1a)
- c.Assert(err, IsNil)
- defer session.Close()
- // Wait for the sync up to run through the first couple of servers.
- for len(session.LiveServers()) != 2 {
- c.Log("Waiting for two servers to be alive...")
- time.Sleep(100 * time.Millisecond)
- }
- // Extra delay to ensure the third server gets penalized.
- time.Sleep(500 * time.Millisecond)
- // Release third server.
- s.Thaw(rs1b)
- // Wait for it to come up.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for all servers to be alive...")
- time.Sleep(100 * time.Millisecond)
- }
- session.SetMode(mgo.Monotonic, true)
- var result struct{ Host string }
- // See which slave picks the line, several times to avoid chance.
- for i := 0; i < 10; i++ {
- session.Refresh()
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
- }
- if *fast {
- // Don't hold back for several seconds.
- return
- }
- // Now hold the other server for long enough to penalize it.
- s.Freeze(rs1c)
- time.Sleep(5 * time.Second)
- s.Thaw(rs1c)
- // Wait for the ping to be processed.
- time.Sleep(500 * time.Millisecond)
- // Repeating the test should now pick the former server consistently.
- for i := 0; i < 10; i++ {
- session.Refresh()
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- c.Assert(hostPort(result.Host), Equals, hostPort(rs1b))
- }
- }
- func (s *S) TestNearestServer(c *C) {
- defer mgo.HackPingDelay(300 * time.Millisecond)()
- rs1a := "127.0.0.1:40011"
- rs1b := "127.0.0.1:40012"
- rs1c := "127.0.0.1:40013"
- session, err := mgo.Dial(rs1a)
- c.Assert(err, IsNil)
- defer session.Close()
- s.Freeze(rs1a)
- s.Freeze(rs1b)
- // Extra delay to ensure the first two servers get penalized.
- time.Sleep(500 * time.Millisecond)
- // Release them.
- s.Thaw(rs1a)
- s.Thaw(rs1b)
- // Wait for everyone to come up.
- for len(session.LiveServers()) != 3 {
- c.Log("Waiting for all servers to be alive...")
- time.Sleep(100 * time.Millisecond)
- }
- session.SetMode(mgo.Nearest, true)
- var result struct{ Host string }
- // See which server picks the line, several times to avoid chance.
- for i := 0; i < 10; i++ {
- session.Refresh()
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
- }
- if *fast {
- // Don't hold back for several seconds.
- return
- }
- // Now hold the two secondaries for long enough to penalize them.
- s.Freeze(rs1b)
- s.Freeze(rs1c)
- time.Sleep(5 * time.Second)
- s.Thaw(rs1b)
- s.Thaw(rs1c)
- // Wait for the ping to be processed.
- time.Sleep(500 * time.Millisecond)
- // Repeating the test should now pick the primary server consistently.
- for i := 0; i < 10; i++ {
- session.Refresh()
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- c.Assert(hostPort(result.Host), Equals, hostPort(rs1a))
- }
- }
- func (s *S) TestConnectCloseConcurrency(c *C) {
- restore := mgo.HackPingDelay(500 * time.Millisecond)
- defer restore()
- var wg sync.WaitGroup
- const n = 500
- wg.Add(n)
- for i := 0; i < n; i++ {
- go func() {
- defer wg.Done()
- session, err := mgo.Dial("localhost:40001")
- if err != nil {
- c.Fatal(err)
- }
- time.Sleep(1)
- session.Close()
- }()
- }
- wg.Wait()
- }
- func (s *S) TestSelectServers(c *C) {
- if !s.versionAtLeast(2, 2) {
- c.Skip("read preferences introduced in 2.2")
- }
- session, err := mgo.Dial("localhost:40011")
- c.Assert(err, IsNil)
- defer session.Close()
- session.SetMode(mgo.Eventual, true)
- var result struct{ Host string }
- session.Refresh()
- session.SelectServers(bson.D{{"rs1", "b"}})
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- c.Assert(hostPort(result.Host), Equals, "40012")
- session.Refresh()
- session.SelectServers(bson.D{{"rs1", "c"}})
- err = session.Run("serverStatus", &result)
- c.Assert(err, IsNil)
- c.Assert(hostPort(result.Host), Equals, "40013")
- }
- func (s *S) TestSelectServersWithMongos(c *C) {
- if !s.versionAtLeast(2, 2) {
- c.Skip("read preferences introduced in 2.2")
- }
- session, err := mgo.Dial("localhost:40021")
- c.Assert(err, IsNil)
- defer session.Close()
- ssresult := &struct{ Host string }{}
- imresult := &struct{ IsMaster bool }{}
- // Figure the master while still using the strong session.
- err = session.Run("serverStatus", ssresult)
- c.Assert(err, IsNil)
- err = session.Run("isMaster", imresult)
- c.Assert(err, IsNil)
- master := ssresult.Host
- c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
- var slave1, slave2 string
- switch hostPort(master) {
- case "40021":
- slave1, slave2 = "b", "c"
- case "40022":
- slave1, slave2 = "a", "c"
- case "40023":
- slave1, slave2 = "a", "b"
- }
- // Collect op counters for everyone.
- q21a := s.countQueries(c, "localhost:40021")
- q22a := s.countQueries(c, "localhost:40022")
- q23a := s.countQueries(c, "localhost:40023")
- // Do a SlaveOk query through MongoS
- mongos, err := mgo.Dial("localhost:40202")
- c.Assert(err, IsNil)
- defer mongos.Close()
- mongos.SetMode(mgo.Monotonic, true)
- mongos.Refresh()
- mongos.SelectServers(bson.D{{"rs2", slave1}})
- coll := mongos.DB("mydb").C("mycoll")
- result := &struct{}{}
- for i := 0; i != 5; i++ {
- err := coll.Find(nil).One(result)
- c.Assert(err, Equals, mgo.ErrNotFound)
- }
- mongos.Refresh()
- mongos.SelectServers(bson.D{{"rs2", slave2}})
- coll = mongos.DB("mydb").C("mycoll")
- for i := 0; i != 7; i++ {
- err := coll.Find(nil).One(result)
- c.Assert(err, Equals, mgo.ErrNotFound)
- }
- // Collect op counters for everyone again.
- q21b := s.countQueries(c, "localhost:40021")
- q22b := s.countQueries(c, "localhost:40022")
- q23b := s.countQueries(c, "localhost:40023")
- switch hostPort(master) {
- case "40021":
- c.Check(q21b-q21a, Equals, 0)
- c.Check(q22b-q22a, Equals, 5)
- c.Check(q23b-q23a, Equals, 7)
- case "40022":
- c.Check(q21b-q21a, Equals, 5)
- c.Check(q22b-q22a, Equals, 0)
- c.Check(q23b-q23a, Equals, 7)
- case "40023":
- c.Check(q21b-q21a, Equals, 5)
- c.Check(q22b-q22a, Equals, 7)
- c.Check(q23b-q23a, Equals, 0)
- default:
- c.Fatal("Uh?")
- }
- }
- func (s *S) TestDoNotFallbackToMonotonic(c *C) {
- // There was a bug at some point that some functions were
- // falling back to Monotonic mode. This test ensures all listIndexes
- // commands go to the primary, as should happen since the session is
- // in Strong mode.
- if !s.versionAtLeast(3, 0) {
- c.Skip("command-counting logic depends on 3.0+")
- }
- session, err := mgo.Dial("localhost:40012")
- c.Assert(err, IsNil)
- defer session.Close()
- for i := 0; i < 15; i++ {
- q11a := s.countCommands(c, "localhost:40011", "listIndexes")
- q12a := s.countCommands(c, "localhost:40012", "listIndexes")
- q13a := s.countCommands(c, "localhost:40013", "listIndexes")
- _, err := session.DB("local").C("system.indexes").Indexes()
- c.Assert(err, IsNil)
- q11b := s.countCommands(c, "localhost:40011", "listIndexes")
- q12b := s.countCommands(c, "localhost:40012", "listIndexes")
- q13b := s.countCommands(c, "localhost:40013", "listIndexes")
- c.Assert(q11b, Equals, q11a+1)
- c.Assert(q12b, Equals, q12a)
- c.Assert(q13b, Equals, q13a)
- }
- }
|