cluster_test.go 54 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090
  1. // mgo - MongoDB driver for Go
  2. //
  3. // Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
  4. //
  5. // All rights reserved.
  6. //
  7. // Redistribution and use in source and binary forms, with or without
  8. // modification, are permitted provided that the following conditions are met:
  9. //
  10. // 1. Redistributions of source code must retain the above copyright notice, this
  11. // list of conditions and the following disclaimer.
  12. // 2. Redistributions in binary form must reproduce the above copyright notice,
  13. // this list of conditions and the following disclaimer in the documentation
  14. // and/or other materials provided with the distribution.
  15. //
  16. // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  17. // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  18. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  19. // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
  20. // ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  21. // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  22. // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  23. // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  24. // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  25. // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  26. package mgo_test
  27. import (
  28. "fmt"
  29. "io"
  30. "net"
  31. "strings"
  32. "sync"
  33. "time"
  34. . "gopkg.in/check.v1"
  35. "gopkg.in/mgo.v2"
  36. "gopkg.in/mgo.v2/bson"
  37. )
  38. func (s *S) TestNewSession(c *C) {
  39. session, err := mgo.Dial("localhost:40001")
  40. c.Assert(err, IsNil)
  41. defer session.Close()
  42. // Do a dummy operation to wait for connection.
  43. coll := session.DB("mydb").C("mycoll")
  44. err = coll.Insert(M{"_id": 1})
  45. c.Assert(err, IsNil)
  46. // Tweak safety and query settings to ensure other has copied those.
  47. session.SetSafe(nil)
  48. session.SetBatch(-1)
  49. other := session.New()
  50. defer other.Close()
  51. session.SetSafe(&mgo.Safe{})
  52. // Clone was copied while session was unsafe, so no errors.
  53. otherColl := other.DB("mydb").C("mycoll")
  54. err = otherColl.Insert(M{"_id": 1})
  55. c.Assert(err, IsNil)
  56. // Original session was made safe again.
  57. err = coll.Insert(M{"_id": 1})
  58. c.Assert(err, NotNil)
  59. // With New(), each session has its own socket now.
  60. stats := mgo.GetStats()
  61. c.Assert(stats.MasterConns, Equals, 2)
  62. c.Assert(stats.SocketsInUse, Equals, 2)
  63. // Ensure query parameters were cloned.
  64. err = otherColl.Insert(M{"_id": 2})
  65. c.Assert(err, IsNil)
  66. // Ping the database to ensure the nonce has been received already.
  67. c.Assert(other.Ping(), IsNil)
  68. mgo.ResetStats()
  69. iter := otherColl.Find(M{}).Iter()
  70. c.Assert(err, IsNil)
  71. m := M{}
  72. ok := iter.Next(m)
  73. c.Assert(ok, Equals, true)
  74. err = iter.Close()
  75. c.Assert(err, IsNil)
  76. // If Batch(-1) is in effect, a single document must have been received.
  77. stats = mgo.GetStats()
  78. c.Assert(stats.ReceivedDocs, Equals, 1)
  79. }
  80. func (s *S) TestCloneSession(c *C) {
  81. session, err := mgo.Dial("localhost:40001")
  82. c.Assert(err, IsNil)
  83. defer session.Close()
  84. // Do a dummy operation to wait for connection.
  85. coll := session.DB("mydb").C("mycoll")
  86. err = coll.Insert(M{"_id": 1})
  87. c.Assert(err, IsNil)
  88. // Tweak safety and query settings to ensure clone is copying those.
  89. session.SetSafe(nil)
  90. session.SetBatch(-1)
  91. clone := session.Clone()
  92. defer clone.Close()
  93. session.SetSafe(&mgo.Safe{})
  94. // Clone was copied while session was unsafe, so no errors.
  95. cloneColl := clone.DB("mydb").C("mycoll")
  96. err = cloneColl.Insert(M{"_id": 1})
  97. c.Assert(err, IsNil)
  98. // Original session was made safe again.
  99. err = coll.Insert(M{"_id": 1})
  100. c.Assert(err, NotNil)
  101. // With Clone(), same socket is shared between sessions now.
  102. stats := mgo.GetStats()
  103. c.Assert(stats.SocketsInUse, Equals, 1)
  104. c.Assert(stats.SocketRefs, Equals, 2)
  105. // Refreshing one of them should let the original socket go,
  106. // while preserving the safety settings.
  107. clone.Refresh()
  108. err = cloneColl.Insert(M{"_id": 1})
  109. c.Assert(err, IsNil)
  110. // Must have used another connection now.
  111. stats = mgo.GetStats()
  112. c.Assert(stats.SocketsInUse, Equals, 2)
  113. c.Assert(stats.SocketRefs, Equals, 2)
  114. // Ensure query parameters were cloned.
  115. err = cloneColl.Insert(M{"_id": 2})
  116. c.Assert(err, IsNil)
  117. // Ping the database to ensure the nonce has been received already.
  118. c.Assert(clone.Ping(), IsNil)
  119. mgo.ResetStats()
  120. iter := cloneColl.Find(M{}).Iter()
  121. c.Assert(err, IsNil)
  122. m := M{}
  123. ok := iter.Next(m)
  124. c.Assert(ok, Equals, true)
  125. err = iter.Close()
  126. c.Assert(err, IsNil)
  127. // If Batch(-1) is in effect, a single document must have been received.
  128. stats = mgo.GetStats()
  129. c.Assert(stats.ReceivedDocs, Equals, 1)
  130. }
  131. func (s *S) TestModeStrong(c *C) {
  132. session, err := mgo.Dial("localhost:40012")
  133. c.Assert(err, IsNil)
  134. defer session.Close()
  135. session.SetMode(mgo.Monotonic, false)
  136. session.SetMode(mgo.Strong, false)
  137. c.Assert(session.Mode(), Equals, mgo.Strong)
  138. result := M{}
  139. cmd := session.DB("admin").C("$cmd")
  140. err = cmd.Find(M{"ismaster": 1}).One(&result)
  141. c.Assert(err, IsNil)
  142. c.Assert(result["ismaster"], Equals, true)
  143. coll := session.DB("mydb").C("mycoll")
  144. err = coll.Insert(M{"a": 1})
  145. c.Assert(err, IsNil)
  146. // Wait since the sync also uses sockets.
  147. for len(session.LiveServers()) != 3 {
  148. c.Log("Waiting for cluster sync to finish...")
  149. time.Sleep(5e8)
  150. }
  151. stats := mgo.GetStats()
  152. c.Assert(stats.MasterConns, Equals, 1)
  153. c.Assert(stats.SlaveConns, Equals, 2)
  154. c.Assert(stats.SocketsInUse, Equals, 1)
  155. session.SetMode(mgo.Strong, true)
  156. stats = mgo.GetStats()
  157. c.Assert(stats.SocketsInUse, Equals, 0)
  158. }
  159. func (s *S) TestModeMonotonic(c *C) {
  160. // Must necessarily connect to a slave, otherwise the
  161. // master connection will be available first.
  162. session, err := mgo.Dial("localhost:40012")
  163. c.Assert(err, IsNil)
  164. defer session.Close()
  165. session.SetMode(mgo.Monotonic, false)
  166. c.Assert(session.Mode(), Equals, mgo.Monotonic)
  167. var result struct{ IsMaster bool }
  168. cmd := session.DB("admin").C("$cmd")
  169. err = cmd.Find(M{"ismaster": 1}).One(&result)
  170. c.Assert(err, IsNil)
  171. c.Assert(result.IsMaster, Equals, false)
  172. coll := session.DB("mydb").C("mycoll")
  173. err = coll.Insert(M{"a": 1})
  174. c.Assert(err, IsNil)
  175. err = cmd.Find(M{"ismaster": 1}).One(&result)
  176. c.Assert(err, IsNil)
  177. c.Assert(result.IsMaster, Equals, true)
  178. // Wait since the sync also uses sockets.
  179. for len(session.LiveServers()) != 3 {
  180. c.Log("Waiting for cluster sync to finish...")
  181. time.Sleep(5e8)
  182. }
  183. stats := mgo.GetStats()
  184. c.Assert(stats.MasterConns, Equals, 1)
  185. c.Assert(stats.SlaveConns, Equals, 2)
  186. c.Assert(stats.SocketsInUse, Equals, 2)
  187. session.SetMode(mgo.Monotonic, true)
  188. stats = mgo.GetStats()
  189. c.Assert(stats.SocketsInUse, Equals, 0)
  190. }
  191. func (s *S) TestModeMonotonicAfterStrong(c *C) {
  192. // Test that a strong session shifting to a monotonic
  193. // one preserves the socket untouched.
  194. session, err := mgo.Dial("localhost:40012")
  195. c.Assert(err, IsNil)
  196. defer session.Close()
  197. // Insert something to force a connection to the master.
  198. coll := session.DB("mydb").C("mycoll")
  199. err = coll.Insert(M{"a": 1})
  200. c.Assert(err, IsNil)
  201. session.SetMode(mgo.Monotonic, false)
  202. // Wait since the sync also uses sockets.
  203. for len(session.LiveServers()) != 3 {
  204. c.Log("Waiting for cluster sync to finish...")
  205. time.Sleep(5e8)
  206. }
  207. // Master socket should still be reserved.
  208. stats := mgo.GetStats()
  209. c.Assert(stats.SocketsInUse, Equals, 1)
  210. // Confirm it's the master even though it's Monotonic by now.
  211. result := M{}
  212. cmd := session.DB("admin").C("$cmd")
  213. err = cmd.Find(M{"ismaster": 1}).One(&result)
  214. c.Assert(err, IsNil)
  215. c.Assert(result["ismaster"], Equals, true)
  216. }
  217. func (s *S) TestModeStrongAfterMonotonic(c *C) {
  218. // Test that shifting from Monotonic to Strong while
  219. // using a slave socket will keep the socket reserved
  220. // until the master socket is necessary, so that no
  221. // switch over occurs unless it's actually necessary.
  222. // Must necessarily connect to a slave, otherwise the
  223. // master connection will be available first.
  224. session, err := mgo.Dial("localhost:40012")
  225. c.Assert(err, IsNil)
  226. defer session.Close()
  227. session.SetMode(mgo.Monotonic, false)
  228. // Ensure we're talking to a slave, and reserve the socket.
  229. result := M{}
  230. err = session.Run("ismaster", &result)
  231. c.Assert(err, IsNil)
  232. c.Assert(result["ismaster"], Equals, false)
  233. // Switch to a Strong session.
  234. session.SetMode(mgo.Strong, false)
  235. // Wait since the sync also uses sockets.
  236. for len(session.LiveServers()) != 3 {
  237. c.Log("Waiting for cluster sync to finish...")
  238. time.Sleep(5e8)
  239. }
  240. // Slave socket should still be reserved.
  241. stats := mgo.GetStats()
  242. c.Assert(stats.SocketsInUse, Equals, 1)
  243. // But any operation will switch it to the master.
  244. result = M{}
  245. err = session.Run("ismaster", &result)
  246. c.Assert(err, IsNil)
  247. c.Assert(result["ismaster"], Equals, true)
  248. }
  249. func (s *S) TestModeMonotonicWriteOnIteration(c *C) {
  250. // Must necessarily connect to a slave, otherwise the
  251. // master connection will be available first.
  252. session, err := mgo.Dial("localhost:40012")
  253. c.Assert(err, IsNil)
  254. defer session.Close()
  255. session.SetMode(mgo.Monotonic, false)
  256. c.Assert(session.Mode(), Equals, mgo.Monotonic)
  257. coll1 := session.DB("mydb").C("mycoll1")
  258. coll2 := session.DB("mydb").C("mycoll2")
  259. ns := []int{40, 41, 42, 43, 44, 45, 46}
  260. for _, n := range ns {
  261. err := coll1.Insert(M{"n": n})
  262. c.Assert(err, IsNil)
  263. }
  264. // Release master so we can grab a slave again.
  265. session.Refresh()
  266. // Wait until synchronization is done.
  267. for {
  268. n, err := coll1.Count()
  269. c.Assert(err, IsNil)
  270. if n == len(ns) {
  271. break
  272. }
  273. }
  274. iter := coll1.Find(nil).Batch(2).Iter()
  275. i := 0
  276. m := M{}
  277. for iter.Next(&m) {
  278. i++
  279. if i > 3 {
  280. err := coll2.Insert(M{"n": 47 + i})
  281. c.Assert(err, IsNil)
  282. }
  283. }
  284. c.Assert(i, Equals, len(ns))
  285. }
  286. func (s *S) TestModeEventual(c *C) {
  287. // Must necessarily connect to a slave, otherwise the
  288. // master connection will be available first.
  289. session, err := mgo.Dial("localhost:40012")
  290. c.Assert(err, IsNil)
  291. defer session.Close()
  292. session.SetMode(mgo.Eventual, false)
  293. c.Assert(session.Mode(), Equals, mgo.Eventual)
  294. result := M{}
  295. err = session.Run("ismaster", &result)
  296. c.Assert(err, IsNil)
  297. c.Assert(result["ismaster"], Equals, false)
  298. coll := session.DB("mydb").C("mycoll")
  299. err = coll.Insert(M{"a": 1})
  300. c.Assert(err, IsNil)
  301. result = M{}
  302. err = session.Run("ismaster", &result)
  303. c.Assert(err, IsNil)
  304. c.Assert(result["ismaster"], Equals, false)
  305. // Wait since the sync also uses sockets.
  306. for len(session.LiveServers()) != 3 {
  307. c.Log("Waiting for cluster sync to finish...")
  308. time.Sleep(5e8)
  309. }
  310. stats := mgo.GetStats()
  311. c.Assert(stats.MasterConns, Equals, 1)
  312. c.Assert(stats.SlaveConns, Equals, 2)
  313. c.Assert(stats.SocketsInUse, Equals, 0)
  314. }
  315. func (s *S) TestModeEventualAfterStrong(c *C) {
  316. // Test that a strong session shifting to an eventual
  317. // one preserves the socket untouched.
  318. session, err := mgo.Dial("localhost:40012")
  319. c.Assert(err, IsNil)
  320. defer session.Close()
  321. // Insert something to force a connection to the master.
  322. coll := session.DB("mydb").C("mycoll")
  323. err = coll.Insert(M{"a": 1})
  324. c.Assert(err, IsNil)
  325. session.SetMode(mgo.Eventual, false)
  326. // Wait since the sync also uses sockets.
  327. for len(session.LiveServers()) != 3 {
  328. c.Log("Waiting for cluster sync to finish...")
  329. time.Sleep(5e8)
  330. }
  331. // Master socket should still be reserved.
  332. stats := mgo.GetStats()
  333. c.Assert(stats.SocketsInUse, Equals, 1)
  334. // Confirm it's the master even though it's Eventual by now.
  335. result := M{}
  336. cmd := session.DB("admin").C("$cmd")
  337. err = cmd.Find(M{"ismaster": 1}).One(&result)
  338. c.Assert(err, IsNil)
  339. c.Assert(result["ismaster"], Equals, true)
  340. session.SetMode(mgo.Eventual, true)
  341. stats = mgo.GetStats()
  342. c.Assert(stats.SocketsInUse, Equals, 0)
  343. }
  344. func (s *S) TestModeStrongFallover(c *C) {
  345. if *fast {
  346. c.Skip("-fast")
  347. }
  348. session, err := mgo.Dial("localhost:40021")
  349. c.Assert(err, IsNil)
  350. defer session.Close()
  351. // With strong consistency, this will open a socket to the master.
  352. result := &struct{ Host string }{}
  353. err = session.Run("serverStatus", result)
  354. c.Assert(err, IsNil)
  355. // Kill the master.
  356. host := result.Host
  357. s.Stop(host)
  358. // This must fail, since the connection was broken.
  359. err = session.Run("serverStatus", result)
  360. c.Assert(err, Equals, io.EOF)
  361. // With strong consistency, it fails again until reset.
  362. err = session.Run("serverStatus", result)
  363. c.Assert(err, Equals, io.EOF)
  364. session.Refresh()
  365. // Now we should be able to talk to the new master.
  366. // Increase the timeout since this may take quite a while.
  367. session.SetSyncTimeout(3 * time.Minute)
  368. err = session.Run("serverStatus", result)
  369. c.Assert(err, IsNil)
  370. c.Assert(result.Host, Not(Equals), host)
  371. // Insert some data to confirm it's indeed a master.
  372. err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
  373. c.Assert(err, IsNil)
  374. }
  375. func (s *S) TestModePrimaryHiccup(c *C) {
  376. if *fast {
  377. c.Skip("-fast")
  378. }
  379. session, err := mgo.Dial("localhost:40021")
  380. c.Assert(err, IsNil)
  381. defer session.Close()
  382. // With strong consistency, this will open a socket to the master.
  383. result := &struct{ Host string }{}
  384. err = session.Run("serverStatus", result)
  385. c.Assert(err, IsNil)
  386. // Establish a few extra sessions to create spare sockets to
  387. // the master. This increases a bit the chances of getting an
  388. // incorrect cached socket.
  389. var sessions []*mgo.Session
  390. for i := 0; i < 20; i++ {
  391. sessions = append(sessions, session.Copy())
  392. err = sessions[len(sessions)-1].Run("serverStatus", result)
  393. c.Assert(err, IsNil)
  394. }
  395. for i := range sessions {
  396. sessions[i].Close()
  397. }
  398. // Kill the master, but bring it back immediatelly.
  399. host := result.Host
  400. s.Stop(host)
  401. s.StartAll()
  402. // This must fail, since the connection was broken.
  403. err = session.Run("serverStatus", result)
  404. c.Assert(err, Equals, io.EOF)
  405. // With strong consistency, it fails again until reset.
  406. err = session.Run("serverStatus", result)
  407. c.Assert(err, Equals, io.EOF)
  408. session.Refresh()
  409. // Now we should be able to talk to the new master.
  410. // Increase the timeout since this may take quite a while.
  411. session.SetSyncTimeout(3 * time.Minute)
  412. // Insert some data to confirm it's indeed a master.
  413. err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
  414. c.Assert(err, IsNil)
  415. }
  416. func (s *S) TestModeMonotonicFallover(c *C) {
  417. if *fast {
  418. c.Skip("-fast")
  419. }
  420. session, err := mgo.Dial("localhost:40021")
  421. c.Assert(err, IsNil)
  422. defer session.Close()
  423. session.SetMode(mgo.Monotonic, true)
  424. // Insert something to force a switch to the master.
  425. coll := session.DB("mydb").C("mycoll")
  426. err = coll.Insert(M{"a": 1})
  427. c.Assert(err, IsNil)
  428. // Wait a bit for this to be synchronized to slaves.
  429. time.Sleep(3 * time.Second)
  430. result := &struct{ Host string }{}
  431. err = session.Run("serverStatus", result)
  432. c.Assert(err, IsNil)
  433. // Kill the master.
  434. host := result.Host
  435. s.Stop(host)
  436. // This must fail, since the connection was broken.
  437. err = session.Run("serverStatus", result)
  438. c.Assert(err, Equals, io.EOF)
  439. // With monotonic consistency, it fails again until reset.
  440. err = session.Run("serverStatus", result)
  441. c.Assert(err, Equals, io.EOF)
  442. session.Refresh()
  443. // Now we should be able to talk to the new master.
  444. err = session.Run("serverStatus", result)
  445. c.Assert(err, IsNil)
  446. c.Assert(result.Host, Not(Equals), host)
  447. }
  448. func (s *S) TestModeMonotonicWithSlaveFallover(c *C) {
  449. if *fast {
  450. c.Skip("-fast")
  451. }
  452. session, err := mgo.Dial("localhost:40021")
  453. c.Assert(err, IsNil)
  454. defer session.Close()
  455. ssresult := &struct{ Host string }{}
  456. imresult := &struct{ IsMaster bool }{}
  457. // Figure the master while still using the strong session.
  458. err = session.Run("serverStatus", ssresult)
  459. c.Assert(err, IsNil)
  460. err = session.Run("isMaster", imresult)
  461. c.Assert(err, IsNil)
  462. master := ssresult.Host
  463. c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
  464. // Create new monotonic session with an explicit address to ensure
  465. // a slave is synchronized before the master, otherwise a connection
  466. // with the master may be used below for lack of other options.
  467. var addr string
  468. switch {
  469. case strings.HasSuffix(ssresult.Host, ":40021"):
  470. addr = "localhost:40022"
  471. case strings.HasSuffix(ssresult.Host, ":40022"):
  472. addr = "localhost:40021"
  473. case strings.HasSuffix(ssresult.Host, ":40023"):
  474. addr = "localhost:40021"
  475. default:
  476. c.Fatal("Unknown host: ", ssresult.Host)
  477. }
  478. session, err = mgo.Dial(addr)
  479. c.Assert(err, IsNil)
  480. defer session.Close()
  481. session.SetMode(mgo.Monotonic, true)
  482. // Check the address of the socket associated with the monotonic session.
  483. c.Log("Running serverStatus and isMaster with monotonic session")
  484. err = session.Run("serverStatus", ssresult)
  485. c.Assert(err, IsNil)
  486. err = session.Run("isMaster", imresult)
  487. c.Assert(err, IsNil)
  488. slave := ssresult.Host
  489. c.Assert(imresult.IsMaster, Equals, false, Commentf("%s is not a slave", slave))
  490. c.Assert(master, Not(Equals), slave)
  491. // Kill the master.
  492. s.Stop(master)
  493. // Session must still be good, since we were talking to a slave.
  494. err = session.Run("serverStatus", ssresult)
  495. c.Assert(err, IsNil)
  496. c.Assert(ssresult.Host, Equals, slave,
  497. Commentf("Monotonic session moved from %s to %s", slave, ssresult.Host))
  498. // If we try to insert something, it'll have to hold until the new
  499. // master is available to move the connection, and work correctly.
  500. coll := session.DB("mydb").C("mycoll")
  501. err = coll.Insert(M{"a": 1})
  502. c.Assert(err, IsNil)
  503. // Must now be talking to the new master.
  504. err = session.Run("serverStatus", ssresult)
  505. c.Assert(err, IsNil)
  506. err = session.Run("isMaster", imresult)
  507. c.Assert(err, IsNil)
  508. c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
  509. // ... which is not the old one, since it's still dead.
  510. c.Assert(ssresult.Host, Not(Equals), master)
  511. }
  512. func (s *S) TestModeEventualFallover(c *C) {
  513. if *fast {
  514. c.Skip("-fast")
  515. }
  516. session, err := mgo.Dial("localhost:40021")
  517. c.Assert(err, IsNil)
  518. defer session.Close()
  519. result := &struct{ Host string }{}
  520. err = session.Run("serverStatus", result)
  521. c.Assert(err, IsNil)
  522. master := result.Host
  523. session.SetMode(mgo.Eventual, true)
  524. // Should connect to the master when needed.
  525. coll := session.DB("mydb").C("mycoll")
  526. err = coll.Insert(M{"a": 1})
  527. c.Assert(err, IsNil)
  528. // Wait a bit for this to be synchronized to slaves.
  529. time.Sleep(3 * time.Second)
  530. // Kill the master.
  531. s.Stop(master)
  532. // Should still work, with the new master now.
  533. coll = session.DB("mydb").C("mycoll")
  534. err = coll.Insert(M{"a": 1})
  535. c.Assert(err, IsNil)
  536. err = session.Run("serverStatus", result)
  537. c.Assert(err, IsNil)
  538. c.Assert(result.Host, Not(Equals), master)
  539. }
  540. func (s *S) TestModeSecondaryJustPrimary(c *C) {
  541. if *fast {
  542. c.Skip("-fast")
  543. }
  544. session, err := mgo.Dial("localhost:40001")
  545. c.Assert(err, IsNil)
  546. defer session.Close()
  547. session.SetMode(mgo.Secondary, true)
  548. err = session.Ping()
  549. c.Assert(err, ErrorMatches, "no reachable servers")
  550. }
  551. func (s *S) TestModeSecondaryPreferredJustPrimary(c *C) {
  552. if *fast {
  553. c.Skip("-fast")
  554. }
  555. session, err := mgo.Dial("localhost:40001")
  556. c.Assert(err, IsNil)
  557. defer session.Close()
  558. session.SetMode(mgo.SecondaryPreferred, true)
  559. result := &struct{ Host string }{}
  560. err = session.Run("serverStatus", result)
  561. c.Assert(err, IsNil)
  562. }
  563. func (s *S) TestModeSecondaryPreferredFallover(c *C) {
  564. if *fast {
  565. c.Skip("-fast")
  566. }
  567. session, err := mgo.Dial("localhost:40011")
  568. c.Assert(err, IsNil)
  569. defer session.Close()
  570. // Ensure secondaries are available for being picked up.
  571. for len(session.LiveServers()) != 3 {
  572. c.Log("Waiting for cluster sync to finish...")
  573. time.Sleep(5e8)
  574. }
  575. session.SetMode(mgo.SecondaryPreferred, true)
  576. result := &struct{ Host string }{}
  577. err = session.Run("serverStatus", result)
  578. c.Assert(err, IsNil)
  579. c.Assert(supvName(result.Host), Not(Equals), "rs1a")
  580. secondary := result.Host
  581. // Should connect to the primary when needed.
  582. coll := session.DB("mydb").C("mycoll")
  583. err = coll.Insert(M{"a": 1})
  584. c.Assert(err, IsNil)
  585. // Wait a bit for this to be synchronized to slaves.
  586. time.Sleep(3 * time.Second)
  587. // Kill the primary.
  588. s.Stop("localhost:40011")
  589. // It can still talk to the selected secondary.
  590. err = session.Run("serverStatus", result)
  591. c.Assert(err, IsNil)
  592. c.Assert(result.Host, Equals, secondary)
  593. // But cannot speak to the primary until reset.
  594. coll = session.DB("mydb").C("mycoll")
  595. err = coll.Insert(M{"a": 1})
  596. c.Assert(err, Equals, io.EOF)
  597. session.Refresh()
  598. // Can still talk to a secondary.
  599. err = session.Run("serverStatus", result)
  600. c.Assert(err, IsNil)
  601. c.Assert(supvName(result.Host), Not(Equals), "rs1a")
  602. s.StartAll()
  603. // Should now be able to talk to the primary again.
  604. coll = session.DB("mydb").C("mycoll")
  605. err = coll.Insert(M{"a": 1})
  606. c.Assert(err, IsNil)
  607. }
  608. func (s *S) TestModePrimaryPreferredFallover(c *C) {
  609. if *fast {
  610. c.Skip("-fast")
  611. }
  612. session, err := mgo.Dial("localhost:40011")
  613. c.Assert(err, IsNil)
  614. defer session.Close()
  615. session.SetMode(mgo.PrimaryPreferred, true)
  616. result := &struct{ Host string }{}
  617. err = session.Run("serverStatus", result)
  618. c.Assert(err, IsNil)
  619. c.Assert(supvName(result.Host), Equals, "rs1a")
  620. // Kill the primary.
  621. s.Stop("localhost:40011")
  622. // Should now fail as there was a primary socket in use already.
  623. err = session.Run("serverStatus", result)
  624. c.Assert(err, Equals, io.EOF)
  625. // Refresh so the reserved primary socket goes away.
  626. session.Refresh()
  627. // Should be able to talk to the secondary.
  628. err = session.Run("serverStatus", result)
  629. c.Assert(err, IsNil)
  630. s.StartAll()
  631. // Should wait for the new primary to become available.
  632. coll := session.DB("mydb").C("mycoll")
  633. err = coll.Insert(M{"a": 1})
  634. c.Assert(err, IsNil)
  635. // And should use the new primary in general, as it is preferred.
  636. err = session.Run("serverStatus", result)
  637. c.Assert(err, IsNil)
  638. c.Assert(supvName(result.Host), Equals, "rs1a")
  639. }
  640. func (s *S) TestModePrimaryFallover(c *C) {
  641. if *fast {
  642. c.Skip("-fast")
  643. }
  644. session, err := mgo.Dial("localhost:40011")
  645. c.Assert(err, IsNil)
  646. defer session.Close()
  647. session.SetSyncTimeout(3 * time.Second)
  648. session.SetMode(mgo.Primary, true)
  649. result := &struct{ Host string }{}
  650. err = session.Run("serverStatus", result)
  651. c.Assert(err, IsNil)
  652. c.Assert(supvName(result.Host), Equals, "rs1a")
  653. // Kill the primary.
  654. s.Stop("localhost:40011")
  655. session.Refresh()
  656. err = session.Ping()
  657. c.Assert(err, ErrorMatches, "no reachable servers")
  658. }
  659. func (s *S) TestModeSecondary(c *C) {
  660. if *fast {
  661. c.Skip("-fast")
  662. }
  663. session, err := mgo.Dial("localhost:40011")
  664. c.Assert(err, IsNil)
  665. defer session.Close()
  666. session.SetMode(mgo.Secondary, true)
  667. result := &struct{ Host string }{}
  668. err = session.Run("serverStatus", result)
  669. c.Assert(err, IsNil)
  670. c.Assert(supvName(result.Host), Not(Equals), "rs1a")
  671. secondary := result.Host
  672. coll := session.DB("mydb").C("mycoll")
  673. err = coll.Insert(M{"a": 1})
  674. c.Assert(err, IsNil)
  675. err = session.Run("serverStatus", result)
  676. c.Assert(err, IsNil)
  677. c.Assert(result.Host, Equals, secondary)
  678. }
  679. func (s *S) TestPreserveSocketCountOnSync(c *C) {
  680. if *fast {
  681. c.Skip("-fast")
  682. }
  683. session, err := mgo.Dial("localhost:40011")
  684. c.Assert(err, IsNil)
  685. defer session.Close()
  686. stats := mgo.GetStats()
  687. for stats.SocketsAlive != 3 {
  688. c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
  689. stats = mgo.GetStats()
  690. time.Sleep(5e8)
  691. }
  692. c.Assert(stats.SocketsAlive, Equals, 3)
  693. // Kill the master (with rs1, 'a' is always the master).
  694. s.Stop("localhost:40011")
  695. // Wait for the logic to run for a bit and bring it back.
  696. startedAll := make(chan bool)
  697. go func() {
  698. time.Sleep(5e9)
  699. s.StartAll()
  700. startedAll <- true
  701. }()
  702. // Do not allow the test to return before the goroutine above is done.
  703. defer func() {
  704. <-startedAll
  705. }()
  706. // Do an action to kick the resync logic in, and also to
  707. // wait until the cluster recognizes the server is back.
  708. result := struct{ Ok bool }{}
  709. err = session.Run("getLastError", &result)
  710. c.Assert(err, IsNil)
  711. c.Assert(result.Ok, Equals, true)
  712. for i := 0; i != 20; i++ {
  713. stats = mgo.GetStats()
  714. if stats.SocketsAlive == 3 {
  715. break
  716. }
  717. c.Logf("Waiting for 3 sockets alive, have %d", stats.SocketsAlive)
  718. time.Sleep(5e8)
  719. }
  720. // Ensure the number of sockets is preserved after syncing.
  721. stats = mgo.GetStats()
  722. c.Assert(stats.SocketsAlive, Equals, 3)
  723. c.Assert(stats.SocketsInUse, Equals, 1)
  724. c.Assert(stats.SocketRefs, Equals, 1)
  725. }
  726. // Connect to the master of a deployment with a single server,
  727. // run an insert, and then ensure the insert worked and that a
  728. // single connection was established.
  729. func (s *S) TestTopologySyncWithSingleMaster(c *C) {
  730. // Use hostname here rather than IP, to make things trickier.
  731. session, err := mgo.Dial("localhost:40001")
  732. c.Assert(err, IsNil)
  733. defer session.Close()
  734. coll := session.DB("mydb").C("mycoll")
  735. err = coll.Insert(M{"a": 1, "b": 2})
  736. c.Assert(err, IsNil)
  737. // One connection used for discovery. Master socket recycled for
  738. // insert. Socket is reserved after insert.
  739. stats := mgo.GetStats()
  740. c.Assert(stats.MasterConns, Equals, 1)
  741. c.Assert(stats.SlaveConns, Equals, 0)
  742. c.Assert(stats.SocketsInUse, Equals, 1)
  743. // Refresh session and socket must be released.
  744. session.Refresh()
  745. stats = mgo.GetStats()
  746. c.Assert(stats.SocketsInUse, Equals, 0)
  747. }
  748. func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
  749. // That's supposed to be a slave. Must run discovery
  750. // and find out master to insert successfully.
  751. session, err := mgo.Dial("localhost:40012")
  752. c.Assert(err, IsNil)
  753. defer session.Close()
  754. coll := session.DB("mydb").C("mycoll")
  755. coll.Insert(M{"a": 1, "b": 2})
  756. result := struct{ Ok bool }{}
  757. err = session.Run("getLastError", &result)
  758. c.Assert(err, IsNil)
  759. c.Assert(result.Ok, Equals, true)
  760. // One connection to each during discovery. Master
  761. // socket recycled for insert.
  762. stats := mgo.GetStats()
  763. c.Assert(stats.MasterConns, Equals, 1)
  764. c.Assert(stats.SlaveConns, Equals, 2)
  765. // Only one socket reference alive, in the master socket owned
  766. // by the above session.
  767. c.Assert(stats.SocketsInUse, Equals, 1)
  768. // Refresh it, and it must be gone.
  769. session.Refresh()
  770. stats = mgo.GetStats()
  771. c.Assert(stats.SocketsInUse, Equals, 0)
  772. }
  773. func (s *S) TestSyncTimeout(c *C) {
  774. if *fast {
  775. c.Skip("-fast")
  776. }
  777. session, err := mgo.Dial("localhost:40001")
  778. c.Assert(err, IsNil)
  779. defer session.Close()
  780. s.Stop("localhost:40001")
  781. timeout := 3 * time.Second
  782. session.SetSyncTimeout(timeout)
  783. started := time.Now()
  784. // Do something.
  785. result := struct{ Ok bool }{}
  786. err = session.Run("getLastError", &result)
  787. c.Assert(err, ErrorMatches, "no reachable servers")
  788. c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
  789. c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
  790. }
  791. func (s *S) TestDialWithTimeout(c *C) {
  792. if *fast {
  793. c.Skip("-fast")
  794. }
  795. timeout := 2 * time.Second
  796. started := time.Now()
  797. // 40009 isn't used by the test servers.
  798. session, err := mgo.DialWithTimeout("localhost:40009", timeout)
  799. if session != nil {
  800. session.Close()
  801. }
  802. c.Assert(err, ErrorMatches, "no reachable servers")
  803. c.Assert(session, IsNil)
  804. c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
  805. c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
  806. }
  807. func (s *S) TestSocketTimeout(c *C) {
  808. if *fast {
  809. c.Skip("-fast")
  810. }
  811. session, err := mgo.Dial("localhost:40001")
  812. c.Assert(err, IsNil)
  813. defer session.Close()
  814. s.Freeze("localhost:40001")
  815. timeout := 3 * time.Second
  816. session.SetSocketTimeout(timeout)
  817. started := time.Now()
  818. // Do something.
  819. result := struct{ Ok bool }{}
  820. err = session.Run("getLastError", &result)
  821. c.Assert(err, ErrorMatches, ".*: i/o timeout")
  822. c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
  823. c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
  824. }
  825. func (s *S) TestSocketTimeoutOnDial(c *C) {
  826. if *fast {
  827. c.Skip("-fast")
  828. }
  829. timeout := 1 * time.Second
  830. defer mgo.HackSyncSocketTimeout(timeout)()
  831. s.Freeze("localhost:40001")
  832. started := time.Now()
  833. session, err := mgo.DialWithTimeout("localhost:40001", timeout)
  834. c.Assert(err, ErrorMatches, "no reachable servers")
  835. c.Assert(session, IsNil)
  836. c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
  837. c.Assert(started.After(time.Now().Add(-20*time.Second)), Equals, true)
  838. }
  839. func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) {
  840. if *fast {
  841. c.Skip("-fast")
  842. }
  843. session, err := mgo.Dial("localhost:40001")
  844. c.Assert(err, IsNil)
  845. defer session.Close()
  846. timeout := 2 * time.Second
  847. session.SetSocketTimeout(timeout)
  848. // Do something that relies on the timeout and works.
  849. c.Assert(session.Ping(), IsNil)
  850. // Freeze and wait for the timeout to go by.
  851. s.Freeze("localhost:40001")
  852. time.Sleep(timeout + 500*time.Millisecond)
  853. s.Thaw("localhost:40001")
  854. // Do something again. The timeout above should not have killed
  855. // the socket as there was nothing to be done.
  856. c.Assert(session.Ping(), IsNil)
  857. }
  858. func (s *S) TestDialWithReplicaSetName(c *C) {
  859. seedLists := [][]string{
  860. // rs1 primary and rs2 primary
  861. []string{"localhost:40011", "localhost:40021"},
  862. // rs1 primary and rs2 secondary
  863. []string{"localhost:40011", "localhost:40022"},
  864. // rs1 secondary and rs2 primary
  865. []string{"localhost:40012", "localhost:40021"},
  866. // rs1 secondary and rs2 secondary
  867. []string{"localhost:40012", "localhost:40022"},
  868. }
  869. rs2Members := []string{":40021", ":40022", ":40023"}
  870. verifySyncedServers := func(session *mgo.Session, numServers int) {
  871. // wait for the server(s) to be synced
  872. for len(session.LiveServers()) != numServers {
  873. c.Log("Waiting for cluster sync to finish...")
  874. time.Sleep(5e8)
  875. }
  876. // ensure none of the rs2 set members are communicated with
  877. for _, addr := range session.LiveServers() {
  878. for _, rs2Member := range rs2Members {
  879. c.Assert(strings.HasSuffix(addr, rs2Member), Equals, false)
  880. }
  881. }
  882. }
  883. // only communication with rs1 members is expected
  884. for _, seedList := range seedLists {
  885. info := mgo.DialInfo{
  886. Addrs: seedList,
  887. Timeout: 5 * time.Second,
  888. ReplicaSetName: "rs1",
  889. }
  890. session, err := mgo.DialWithInfo(&info)
  891. c.Assert(err, IsNil)
  892. verifySyncedServers(session, 3)
  893. session.Close()
  894. info.Direct = true
  895. session, err = mgo.DialWithInfo(&info)
  896. c.Assert(err, IsNil)
  897. verifySyncedServers(session, 1)
  898. session.Close()
  899. connectionUrl := fmt.Sprintf("mongodb://%v/?replicaSet=rs1", strings.Join(seedList, ","))
  900. session, err = mgo.Dial(connectionUrl)
  901. c.Assert(err, IsNil)
  902. verifySyncedServers(session, 3)
  903. session.Close()
  904. connectionUrl += "&connect=direct"
  905. session, err = mgo.Dial(connectionUrl)
  906. c.Assert(err, IsNil)
  907. verifySyncedServers(session, 1)
  908. session.Close()
  909. }
  910. }
  911. func (s *S) TestDirect(c *C) {
  912. session, err := mgo.Dial("localhost:40012?connect=direct")
  913. c.Assert(err, IsNil)
  914. defer session.Close()
  915. // We know that server is a slave.
  916. session.SetMode(mgo.Monotonic, true)
  917. result := &struct{ Host string }{}
  918. err = session.Run("serverStatus", result)
  919. c.Assert(err, IsNil)
  920. c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
  921. stats := mgo.GetStats()
  922. c.Assert(stats.SocketsAlive, Equals, 1)
  923. c.Assert(stats.SocketsInUse, Equals, 1)
  924. c.Assert(stats.SocketRefs, Equals, 1)
  925. // We've got no master, so it'll timeout.
  926. session.SetSyncTimeout(5e8 * time.Nanosecond)
  927. coll := session.DB("mydb").C("mycoll")
  928. err = coll.Insert(M{"test": 1})
  929. c.Assert(err, ErrorMatches, "no reachable servers")
  930. // Writing to the local database is okay.
  931. coll = session.DB("local").C("mycoll")
  932. defer coll.RemoveAll(nil)
  933. id := bson.NewObjectId()
  934. err = coll.Insert(M{"_id": id})
  935. c.Assert(err, IsNil)
  936. // Data was stored in the right server.
  937. n, err := coll.Find(M{"_id": id}).Count()
  938. c.Assert(err, IsNil)
  939. c.Assert(n, Equals, 1)
  940. // Server hasn't changed.
  941. result.Host = ""
  942. err = session.Run("serverStatus", result)
  943. c.Assert(err, IsNil)
  944. c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
  945. }
  946. func (s *S) TestDirectToUnknownStateMember(c *C) {
  947. session, err := mgo.Dial("localhost:40041?connect=direct")
  948. c.Assert(err, IsNil)
  949. defer session.Close()
  950. session.SetMode(mgo.Monotonic, true)
  951. result := &struct{ Host string }{}
  952. err = session.Run("serverStatus", result)
  953. c.Assert(err, IsNil)
  954. c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
  955. // We've got no master, so it'll timeout.
  956. session.SetSyncTimeout(5e8 * time.Nanosecond)
  957. coll := session.DB("mydb").C("mycoll")
  958. err = coll.Insert(M{"test": 1})
  959. c.Assert(err, ErrorMatches, "no reachable servers")
  960. // Slave is still reachable.
  961. result.Host = ""
  962. err = session.Run("serverStatus", result)
  963. c.Assert(err, IsNil)
  964. c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
  965. }
  966. func (s *S) TestFailFast(c *C) {
  967. info := mgo.DialInfo{
  968. Addrs: []string{"localhost:99999"},
  969. Timeout: 5 * time.Second,
  970. FailFast: true,
  971. }
  972. started := time.Now()
  973. _, err := mgo.DialWithInfo(&info)
  974. c.Assert(err, ErrorMatches, "no reachable servers")
  975. c.Assert(started.After(time.Now().Add(-time.Second)), Equals, true)
  976. }
  977. func (s *S) countQueries(c *C, server string) (n int) {
  978. defer func() { c.Logf("Queries for %q: %d", server, n) }()
  979. session, err := mgo.Dial(server + "?connect=direct")
  980. c.Assert(err, IsNil)
  981. defer session.Close()
  982. session.SetMode(mgo.Monotonic, true)
  983. var result struct {
  984. OpCounters struct {
  985. Query int
  986. }
  987. Metrics struct {
  988. Commands struct{ Find struct{ Total int } }
  989. }
  990. }
  991. err = session.Run("serverStatus", &result)
  992. c.Assert(err, IsNil)
  993. if s.versionAtLeast(3, 2) {
  994. return result.Metrics.Commands.Find.Total
  995. }
  996. return result.OpCounters.Query
  997. }
  998. func (s *S) countCommands(c *C, server, commandName string) (n int) {
  999. defer func() { c.Logf("Queries for %q: %d", server, n) }()
  1000. session, err := mgo.Dial(server + "?connect=direct")
  1001. c.Assert(err, IsNil)
  1002. defer session.Close()
  1003. session.SetMode(mgo.Monotonic, true)
  1004. var result struct {
  1005. Metrics struct {
  1006. Commands map[string]struct{ Total int }
  1007. }
  1008. }
  1009. err = session.Run("serverStatus", &result)
  1010. c.Assert(err, IsNil)
  1011. return result.Metrics.Commands[commandName].Total
  1012. }
  1013. func (s *S) TestMonotonicSlaveOkFlagWithMongos(c *C) {
  1014. session, err := mgo.Dial("localhost:40021")
  1015. c.Assert(err, IsNil)
  1016. defer session.Close()
  1017. ssresult := &struct{ Host string }{}
  1018. imresult := &struct{ IsMaster bool }{}
  1019. // Figure the master while still using the strong session.
  1020. err = session.Run("serverStatus", ssresult)
  1021. c.Assert(err, IsNil)
  1022. err = session.Run("isMaster", imresult)
  1023. c.Assert(err, IsNil)
  1024. master := ssresult.Host
  1025. c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
  1026. // Ensure mongos is aware about the current topology.
  1027. s.Stop(":40201")
  1028. s.StartAll()
  1029. mongos, err := mgo.Dial("localhost:40202")
  1030. c.Assert(err, IsNil)
  1031. defer mongos.Close()
  1032. // Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
  1033. err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
  1034. c.Assert(err, IsNil)
  1035. // Wait until all servers see the data.
  1036. for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
  1037. session, err := mgo.Dial(addr + "?connect=direct")
  1038. c.Assert(err, IsNil)
  1039. defer session.Close()
  1040. session.SetMode(mgo.Monotonic, true)
  1041. for i := 300; i >= 0; i-- {
  1042. n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
  1043. c.Assert(err, IsNil)
  1044. if n == 1 {
  1045. break
  1046. }
  1047. if i == 0 {
  1048. c.Fatalf("Inserted data never reached " + addr)
  1049. }
  1050. time.Sleep(100 * time.Millisecond)
  1051. }
  1052. }
  1053. // Collect op counters for everyone.
  1054. q21a := s.countQueries(c, "localhost:40021")
  1055. q22a := s.countQueries(c, "localhost:40022")
  1056. q23a := s.countQueries(c, "localhost:40023")
  1057. // Do a SlaveOk query through MongoS
  1058. mongos.SetMode(mgo.Monotonic, true)
  1059. coll := mongos.DB("mydb").C("mycoll")
  1060. var result struct{ N int }
  1061. for i := 0; i != 5; i++ {
  1062. err = coll.Find(nil).One(&result)
  1063. c.Assert(err, IsNil)
  1064. c.Assert(result.N, Equals, 1)
  1065. }
  1066. // Collect op counters for everyone again.
  1067. q21b := s.countQueries(c, "localhost:40021")
  1068. q22b := s.countQueries(c, "localhost:40022")
  1069. q23b := s.countQueries(c, "localhost:40023")
  1070. var masterDelta, slaveDelta int
  1071. switch hostPort(master) {
  1072. case "40021":
  1073. masterDelta = q21b - q21a
  1074. slaveDelta = (q22b - q22a) + (q23b - q23a)
  1075. case "40022":
  1076. masterDelta = q22b - q22a
  1077. slaveDelta = (q21b - q21a) + (q23b - q23a)
  1078. case "40023":
  1079. masterDelta = q23b - q23a
  1080. slaveDelta = (q21b - q21a) + (q22b - q22a)
  1081. default:
  1082. c.Fatal("Uh?")
  1083. }
  1084. c.Check(masterDelta, Equals, 0) // Just the counting itself.
  1085. c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
  1086. }
  1087. func (s *S) TestSecondaryModeWithMongos(c *C) {
  1088. session, err := mgo.Dial("localhost:40021")
  1089. c.Assert(err, IsNil)
  1090. defer session.Close()
  1091. ssresult := &struct{ Host string }{}
  1092. imresult := &struct{ IsMaster bool }{}
  1093. // Figure the master while still using the strong session.
  1094. err = session.Run("serverStatus", ssresult)
  1095. c.Assert(err, IsNil)
  1096. err = session.Run("isMaster", imresult)
  1097. c.Assert(err, IsNil)
  1098. master := ssresult.Host
  1099. c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
  1100. // Ensure mongos is aware about the current topology.
  1101. s.Stop(":40201")
  1102. s.StartAll()
  1103. mongos, err := mgo.Dial("localhost:40202")
  1104. c.Assert(err, IsNil)
  1105. defer mongos.Close()
  1106. mongos.SetSyncTimeout(5 * time.Second)
  1107. // Insert some data as otherwise 3.2+ doesn't seem to run the query at all.
  1108. err = mongos.DB("mydb").C("mycoll").Insert(bson.M{"n": 1})
  1109. c.Assert(err, IsNil)
  1110. // Wait until all servers see the data.
  1111. for _, addr := range []string{"localhost:40021", "localhost:40022", "localhost:40023"} {
  1112. session, err := mgo.Dial(addr + "?connect=direct")
  1113. c.Assert(err, IsNil)
  1114. defer session.Close()
  1115. session.SetMode(mgo.Monotonic, true)
  1116. for i := 300; i >= 0; i-- {
  1117. n, err := session.DB("mydb").C("mycoll").Find(nil).Count()
  1118. c.Assert(err, IsNil)
  1119. if n == 1 {
  1120. break
  1121. }
  1122. if i == 0 {
  1123. c.Fatalf("Inserted data never reached " + addr)
  1124. }
  1125. time.Sleep(100 * time.Millisecond)
  1126. }
  1127. }
  1128. // Collect op counters for everyone.
  1129. q21a := s.countQueries(c, "localhost:40021")
  1130. q22a := s.countQueries(c, "localhost:40022")
  1131. q23a := s.countQueries(c, "localhost:40023")
  1132. // Do a Secondary query through MongoS
  1133. mongos.SetMode(mgo.Secondary, true)
  1134. coll := mongos.DB("mydb").C("mycoll")
  1135. var result struct{ N int }
  1136. for i := 0; i != 5; i++ {
  1137. err = coll.Find(nil).One(&result)
  1138. c.Assert(err, IsNil)
  1139. c.Assert(result.N, Equals, 1)
  1140. }
  1141. // Collect op counters for everyone again.
  1142. q21b := s.countQueries(c, "localhost:40021")
  1143. q22b := s.countQueries(c, "localhost:40022")
  1144. q23b := s.countQueries(c, "localhost:40023")
  1145. var masterDelta, slaveDelta int
  1146. switch hostPort(master) {
  1147. case "40021":
  1148. masterDelta = q21b - q21a
  1149. slaveDelta = (q22b - q22a) + (q23b - q23a)
  1150. case "40022":
  1151. masterDelta = q22b - q22a
  1152. slaveDelta = (q21b - q21a) + (q23b - q23a)
  1153. case "40023":
  1154. masterDelta = q23b - q23a
  1155. slaveDelta = (q21b - q21a) + (q22b - q22a)
  1156. default:
  1157. c.Fatal("Uh?")
  1158. }
  1159. c.Check(masterDelta, Equals, 0) // Just the counting itself.
  1160. c.Check(slaveDelta, Equals, 5) // The counting for both, plus 5 queries above.
  1161. }
  1162. func (s *S) TestSecondaryModeWithMongosInsert(c *C) {
  1163. if *fast {
  1164. c.Skip("-fast")
  1165. }
  1166. session, err := mgo.Dial("localhost:40202")
  1167. c.Assert(err, IsNil)
  1168. defer session.Close()
  1169. session.SetMode(mgo.Secondary, true)
  1170. session.SetSyncTimeout(4 * time.Second)
  1171. coll := session.DB("mydb").C("mycoll")
  1172. err = coll.Insert(M{"a": 1})
  1173. c.Assert(err, IsNil)
  1174. var result struct{ A int }
  1175. coll.Find(nil).One(&result)
  1176. c.Assert(result.A, Equals, 1)
  1177. }
  1178. func (s *S) TestRemovalOfClusterMember(c *C) {
  1179. if *fast {
  1180. c.Skip("-fast")
  1181. }
  1182. master, err := mgo.Dial("localhost:40021")
  1183. c.Assert(err, IsNil)
  1184. defer master.Close()
  1185. // Wait for cluster to fully sync up.
  1186. for i := 0; i < 10; i++ {
  1187. if len(master.LiveServers()) == 3 {
  1188. break
  1189. }
  1190. time.Sleep(5e8)
  1191. }
  1192. if len(master.LiveServers()) != 3 {
  1193. c.Fatalf("Test started with bad cluster state: %v", master.LiveServers())
  1194. }
  1195. result := &struct {
  1196. IsMaster bool
  1197. Me string
  1198. }{}
  1199. slave := master.Copy()
  1200. slave.SetMode(mgo.Monotonic, true) // Monotonic can hold a non-master socket persistently.
  1201. err = slave.Run("isMaster", result)
  1202. c.Assert(err, IsNil)
  1203. c.Assert(result.IsMaster, Equals, false)
  1204. slaveAddr := result.Me
  1205. defer func() {
  1206. config := map[string]string{
  1207. "40021": `{_id: 1, host: "127.0.0.1:40021", priority: 1, tags: {rs2: "a"}}`,
  1208. "40022": `{_id: 2, host: "127.0.0.1:40022", priority: 0, tags: {rs2: "b"}}`,
  1209. "40023": `{_id: 3, host: "127.0.0.1:40023", priority: 0, tags: {rs2: "c"}}`,
  1210. }
  1211. master.Refresh()
  1212. master.Run(bson.D{{"$eval", `rs.add(` + config[hostPort(slaveAddr)] + `)`}}, nil)
  1213. master.Close()
  1214. slave.Close()
  1215. // Ensure suite syncs up with the changes before next test.
  1216. s.Stop(":40201")
  1217. s.StartAll()
  1218. time.Sleep(8 * time.Second)
  1219. // TODO Find a better way to find out when mongos is fully aware that all
  1220. // servers are up. Without that follow up tests that depend on mongos will
  1221. // break due to their expectation of things being in a working state.
  1222. }()
  1223. c.Logf("========== Removing slave: %s ==========", slaveAddr)
  1224. master.Run(bson.D{{"$eval", `rs.remove("` + slaveAddr + `")`}}, nil)
  1225. master.Refresh()
  1226. // Give the cluster a moment to catch up by doing a roundtrip to the master.
  1227. err = master.Ping()
  1228. c.Assert(err, IsNil)
  1229. time.Sleep(3e9)
  1230. // This must fail since the slave has been taken off the cluster.
  1231. err = slave.Ping()
  1232. c.Assert(err, NotNil)
  1233. for i := 0; i < 15; i++ {
  1234. if len(master.LiveServers()) == 2 {
  1235. break
  1236. }
  1237. time.Sleep(time.Second)
  1238. }
  1239. live := master.LiveServers()
  1240. if len(live) != 2 {
  1241. c.Errorf("Removed server still considered live: %#s", live)
  1242. }
  1243. c.Log("========== Test succeeded. ==========")
  1244. }
  1245. func (s *S) TestPoolLimitSimple(c *C) {
  1246. for test := 0; test < 2; test++ {
  1247. var session *mgo.Session
  1248. var err error
  1249. if test == 0 {
  1250. session, err = mgo.Dial("localhost:40001")
  1251. c.Assert(err, IsNil)
  1252. session.SetPoolLimit(1)
  1253. } else {
  1254. session, err = mgo.Dial("localhost:40001?maxPoolSize=1")
  1255. c.Assert(err, IsNil)
  1256. }
  1257. defer session.Close()
  1258. // Put one socket in use.
  1259. c.Assert(session.Ping(), IsNil)
  1260. done := make(chan time.Duration)
  1261. // Now block trying to get another one due to the pool limit.
  1262. go func() {
  1263. copy := session.Copy()
  1264. defer copy.Close()
  1265. started := time.Now()
  1266. c.Check(copy.Ping(), IsNil)
  1267. done <- time.Now().Sub(started)
  1268. }()
  1269. time.Sleep(300 * time.Millisecond)
  1270. // Put the one socket back in the pool, freeing it for the copy.
  1271. session.Refresh()
  1272. delay := <-done
  1273. c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay))
  1274. }
  1275. }
  1276. func (s *S) TestPoolLimitMany(c *C) {
  1277. if *fast {
  1278. c.Skip("-fast")
  1279. }
  1280. session, err := mgo.Dial("localhost:40011")
  1281. c.Assert(err, IsNil)
  1282. defer session.Close()
  1283. stats := mgo.GetStats()
  1284. for stats.SocketsAlive != 3 {
  1285. c.Logf("Waiting for all connections to be established (sockets alive currently %d)...", stats.SocketsAlive)
  1286. stats = mgo.GetStats()
  1287. time.Sleep(5e8)
  1288. }
  1289. const poolLimit = 64
  1290. session.SetPoolLimit(poolLimit)
  1291. // Consume the whole limit for the master.
  1292. var master []*mgo.Session
  1293. for i := 0; i < poolLimit; i++ {
  1294. s := session.Copy()
  1295. defer s.Close()
  1296. c.Assert(s.Ping(), IsNil)
  1297. master = append(master, s)
  1298. }
  1299. before := time.Now()
  1300. go func() {
  1301. time.Sleep(3e9)
  1302. master[0].Refresh()
  1303. }()
  1304. // Then, a single ping must block, since it would need another
  1305. // connection to the master, over the limit. Once the goroutine
  1306. // above releases its socket, it should move on.
  1307. session.Ping()
  1308. delay := time.Now().Sub(before)
  1309. c.Assert(delay > 3e9, Equals, true)
  1310. c.Assert(delay < 6e9, Equals, true)
  1311. }
  1312. func (s *S) TestSetModeEventualIterBug(c *C) {
  1313. session1, err := mgo.Dial("localhost:40011")
  1314. c.Assert(err, IsNil)
  1315. defer session1.Close()
  1316. session1.SetMode(mgo.Eventual, false)
  1317. coll1 := session1.DB("mydb").C("mycoll")
  1318. const N = 100
  1319. for i := 0; i < N; i++ {
  1320. err = coll1.Insert(M{"_id": i})
  1321. c.Assert(err, IsNil)
  1322. }
  1323. c.Logf("Waiting until secondary syncs")
  1324. for {
  1325. n, err := coll1.Count()
  1326. c.Assert(err, IsNil)
  1327. if n == N {
  1328. c.Logf("Found all")
  1329. break
  1330. }
  1331. }
  1332. session2, err := mgo.Dial("localhost:40011")
  1333. c.Assert(err, IsNil)
  1334. defer session2.Close()
  1335. session2.SetMode(mgo.Eventual, false)
  1336. coll2 := session2.DB("mydb").C("mycoll")
  1337. i := 0
  1338. iter := coll2.Find(nil).Batch(10).Iter()
  1339. var result struct{}
  1340. for iter.Next(&result) {
  1341. i++
  1342. }
  1343. c.Assert(iter.Close(), Equals, nil)
  1344. c.Assert(i, Equals, N)
  1345. }
  1346. func (s *S) TestCustomDialOld(c *C) {
  1347. dials := make(chan bool, 16)
  1348. dial := func(addr net.Addr) (net.Conn, error) {
  1349. tcpaddr, ok := addr.(*net.TCPAddr)
  1350. if !ok {
  1351. return nil, fmt.Errorf("unexpected address type: %T", addr)
  1352. }
  1353. dials <- true
  1354. return net.DialTCP("tcp", nil, tcpaddr)
  1355. }
  1356. info := mgo.DialInfo{
  1357. Addrs: []string{"localhost:40012"},
  1358. Dial: dial,
  1359. }
  1360. // Use hostname here rather than IP, to make things trickier.
  1361. session, err := mgo.DialWithInfo(&info)
  1362. c.Assert(err, IsNil)
  1363. defer session.Close()
  1364. const N = 3
  1365. for i := 0; i < N; i++ {
  1366. select {
  1367. case <-dials:
  1368. case <-time.After(5 * time.Second):
  1369. c.Fatalf("expected %d dials, got %d", N, i)
  1370. }
  1371. }
  1372. select {
  1373. case <-dials:
  1374. c.Fatalf("got more dials than expected")
  1375. case <-time.After(100 * time.Millisecond):
  1376. }
  1377. }
  1378. func (s *S) TestCustomDialNew(c *C) {
  1379. dials := make(chan bool, 16)
  1380. dial := func(addr *mgo.ServerAddr) (net.Conn, error) {
  1381. dials <- true
  1382. if addr.TCPAddr().Port == 40012 {
  1383. c.Check(addr.String(), Equals, "localhost:40012")
  1384. }
  1385. return net.DialTCP("tcp", nil, addr.TCPAddr())
  1386. }
  1387. info := mgo.DialInfo{
  1388. Addrs: []string{"localhost:40012"},
  1389. DialServer: dial,
  1390. }
  1391. // Use hostname here rather than IP, to make things trickier.
  1392. session, err := mgo.DialWithInfo(&info)
  1393. c.Assert(err, IsNil)
  1394. defer session.Close()
  1395. const N = 3
  1396. for i := 0; i < N; i++ {
  1397. select {
  1398. case <-dials:
  1399. case <-time.After(5 * time.Second):
  1400. c.Fatalf("expected %d dials, got %d", N, i)
  1401. }
  1402. }
  1403. select {
  1404. case <-dials:
  1405. c.Fatalf("got more dials than expected")
  1406. case <-time.After(100 * time.Millisecond):
  1407. }
  1408. }
  1409. func (s *S) TestPrimaryShutdownOnAuthShard(c *C) {
  1410. if *fast {
  1411. c.Skip("-fast")
  1412. }
  1413. // Dial the shard.
  1414. session, err := mgo.Dial("localhost:40203")
  1415. c.Assert(err, IsNil)
  1416. defer session.Close()
  1417. // Login and insert something to make it more realistic.
  1418. session.DB("admin").Login("root", "rapadura")
  1419. coll := session.DB("mydb").C("mycoll")
  1420. err = coll.Insert(bson.M{"n": 1})
  1421. c.Assert(err, IsNil)
  1422. // Dial the replica set to figure the master out.
  1423. rs, err := mgo.Dial("root:rapadura@localhost:40031")
  1424. c.Assert(err, IsNil)
  1425. defer rs.Close()
  1426. // With strong consistency, this will open a socket to the master.
  1427. result := &struct{ Host string }{}
  1428. err = rs.Run("serverStatus", result)
  1429. c.Assert(err, IsNil)
  1430. // Kill the master.
  1431. host := result.Host
  1432. s.Stop(host)
  1433. // This must fail, since the connection was broken.
  1434. err = rs.Run("serverStatus", result)
  1435. c.Assert(err, Equals, io.EOF)
  1436. // This won't work because the master just died.
  1437. err = coll.Insert(bson.M{"n": 2})
  1438. c.Assert(err, NotNil)
  1439. // Refresh session and wait for re-election.
  1440. session.Refresh()
  1441. for i := 0; i < 60; i++ {
  1442. err = coll.Insert(bson.M{"n": 3})
  1443. if err == nil {
  1444. break
  1445. }
  1446. c.Logf("Waiting for replica set to elect a new master. Last error: %v", err)
  1447. time.Sleep(500 * time.Millisecond)
  1448. }
  1449. c.Assert(err, IsNil)
  1450. count, err := coll.Count()
  1451. c.Assert(count > 1, Equals, true)
  1452. }
  1453. func (s *S) TestNearestSecondary(c *C) {
  1454. defer mgo.HackPingDelay(300 * time.Millisecond)()
  1455. rs1a := "127.0.0.1:40011"
  1456. rs1b := "127.0.0.1:40012"
  1457. rs1c := "127.0.0.1:40013"
  1458. s.Freeze(rs1b)
  1459. session, err := mgo.Dial(rs1a)
  1460. c.Assert(err, IsNil)
  1461. defer session.Close()
  1462. // Wait for the sync up to run through the first couple of servers.
  1463. for len(session.LiveServers()) != 2 {
  1464. c.Log("Waiting for two servers to be alive...")
  1465. time.Sleep(100 * time.Millisecond)
  1466. }
  1467. // Extra delay to ensure the third server gets penalized.
  1468. time.Sleep(500 * time.Millisecond)
  1469. // Release third server.
  1470. s.Thaw(rs1b)
  1471. // Wait for it to come up.
  1472. for len(session.LiveServers()) != 3 {
  1473. c.Log("Waiting for all servers to be alive...")
  1474. time.Sleep(100 * time.Millisecond)
  1475. }
  1476. session.SetMode(mgo.Monotonic, true)
  1477. var result struct{ Host string }
  1478. // See which slave picks the line, several times to avoid chance.
  1479. for i := 0; i < 10; i++ {
  1480. session.Refresh()
  1481. err = session.Run("serverStatus", &result)
  1482. c.Assert(err, IsNil)
  1483. c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
  1484. }
  1485. if *fast {
  1486. // Don't hold back for several seconds.
  1487. return
  1488. }
  1489. // Now hold the other server for long enough to penalize it.
  1490. s.Freeze(rs1c)
  1491. time.Sleep(5 * time.Second)
  1492. s.Thaw(rs1c)
  1493. // Wait for the ping to be processed.
  1494. time.Sleep(500 * time.Millisecond)
  1495. // Repeating the test should now pick the former server consistently.
  1496. for i := 0; i < 10; i++ {
  1497. session.Refresh()
  1498. err = session.Run("serverStatus", &result)
  1499. c.Assert(err, IsNil)
  1500. c.Assert(hostPort(result.Host), Equals, hostPort(rs1b))
  1501. }
  1502. }
  1503. func (s *S) TestNearestServer(c *C) {
  1504. defer mgo.HackPingDelay(300 * time.Millisecond)()
  1505. rs1a := "127.0.0.1:40011"
  1506. rs1b := "127.0.0.1:40012"
  1507. rs1c := "127.0.0.1:40013"
  1508. session, err := mgo.Dial(rs1a)
  1509. c.Assert(err, IsNil)
  1510. defer session.Close()
  1511. s.Freeze(rs1a)
  1512. s.Freeze(rs1b)
  1513. // Extra delay to ensure the first two servers get penalized.
  1514. time.Sleep(500 * time.Millisecond)
  1515. // Release them.
  1516. s.Thaw(rs1a)
  1517. s.Thaw(rs1b)
  1518. // Wait for everyone to come up.
  1519. for len(session.LiveServers()) != 3 {
  1520. c.Log("Waiting for all servers to be alive...")
  1521. time.Sleep(100 * time.Millisecond)
  1522. }
  1523. session.SetMode(mgo.Nearest, true)
  1524. var result struct{ Host string }
  1525. // See which server picks the line, several times to avoid chance.
  1526. for i := 0; i < 10; i++ {
  1527. session.Refresh()
  1528. err = session.Run("serverStatus", &result)
  1529. c.Assert(err, IsNil)
  1530. c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
  1531. }
  1532. if *fast {
  1533. // Don't hold back for several seconds.
  1534. return
  1535. }
  1536. // Now hold the two secondaries for long enough to penalize them.
  1537. s.Freeze(rs1b)
  1538. s.Freeze(rs1c)
  1539. time.Sleep(5 * time.Second)
  1540. s.Thaw(rs1b)
  1541. s.Thaw(rs1c)
  1542. // Wait for the ping to be processed.
  1543. time.Sleep(500 * time.Millisecond)
  1544. // Repeating the test should now pick the primary server consistently.
  1545. for i := 0; i < 10; i++ {
  1546. session.Refresh()
  1547. err = session.Run("serverStatus", &result)
  1548. c.Assert(err, IsNil)
  1549. c.Assert(hostPort(result.Host), Equals, hostPort(rs1a))
  1550. }
  1551. }
  1552. func (s *S) TestConnectCloseConcurrency(c *C) {
  1553. restore := mgo.HackPingDelay(500 * time.Millisecond)
  1554. defer restore()
  1555. var wg sync.WaitGroup
  1556. const n = 500
  1557. wg.Add(n)
  1558. for i := 0; i < n; i++ {
  1559. go func() {
  1560. defer wg.Done()
  1561. session, err := mgo.Dial("localhost:40001")
  1562. if err != nil {
  1563. c.Fatal(err)
  1564. }
  1565. time.Sleep(1)
  1566. session.Close()
  1567. }()
  1568. }
  1569. wg.Wait()
  1570. }
  1571. func (s *S) TestSelectServers(c *C) {
  1572. if !s.versionAtLeast(2, 2) {
  1573. c.Skip("read preferences introduced in 2.2")
  1574. }
  1575. session, err := mgo.Dial("localhost:40011")
  1576. c.Assert(err, IsNil)
  1577. defer session.Close()
  1578. session.SetMode(mgo.Eventual, true)
  1579. var result struct{ Host string }
  1580. session.Refresh()
  1581. session.SelectServers(bson.D{{"rs1", "b"}})
  1582. err = session.Run("serverStatus", &result)
  1583. c.Assert(err, IsNil)
  1584. c.Assert(hostPort(result.Host), Equals, "40012")
  1585. session.Refresh()
  1586. session.SelectServers(bson.D{{"rs1", "c"}})
  1587. err = session.Run("serverStatus", &result)
  1588. c.Assert(err, IsNil)
  1589. c.Assert(hostPort(result.Host), Equals, "40013")
  1590. }
  1591. func (s *S) TestSelectServersWithMongos(c *C) {
  1592. if !s.versionAtLeast(2, 2) {
  1593. c.Skip("read preferences introduced in 2.2")
  1594. }
  1595. session, err := mgo.Dial("localhost:40021")
  1596. c.Assert(err, IsNil)
  1597. defer session.Close()
  1598. ssresult := &struct{ Host string }{}
  1599. imresult := &struct{ IsMaster bool }{}
  1600. // Figure the master while still using the strong session.
  1601. err = session.Run("serverStatus", ssresult)
  1602. c.Assert(err, IsNil)
  1603. err = session.Run("isMaster", imresult)
  1604. c.Assert(err, IsNil)
  1605. master := ssresult.Host
  1606. c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
  1607. var slave1, slave2 string
  1608. switch hostPort(master) {
  1609. case "40021":
  1610. slave1, slave2 = "b", "c"
  1611. case "40022":
  1612. slave1, slave2 = "a", "c"
  1613. case "40023":
  1614. slave1, slave2 = "a", "b"
  1615. }
  1616. // Collect op counters for everyone.
  1617. q21a := s.countQueries(c, "localhost:40021")
  1618. q22a := s.countQueries(c, "localhost:40022")
  1619. q23a := s.countQueries(c, "localhost:40023")
  1620. // Do a SlaveOk query through MongoS
  1621. mongos, err := mgo.Dial("localhost:40202")
  1622. c.Assert(err, IsNil)
  1623. defer mongos.Close()
  1624. mongos.SetMode(mgo.Monotonic, true)
  1625. mongos.Refresh()
  1626. mongos.SelectServers(bson.D{{"rs2", slave1}})
  1627. coll := mongos.DB("mydb").C("mycoll")
  1628. result := &struct{}{}
  1629. for i := 0; i != 5; i++ {
  1630. err := coll.Find(nil).One(result)
  1631. c.Assert(err, Equals, mgo.ErrNotFound)
  1632. }
  1633. mongos.Refresh()
  1634. mongos.SelectServers(bson.D{{"rs2", slave2}})
  1635. coll = mongos.DB("mydb").C("mycoll")
  1636. for i := 0; i != 7; i++ {
  1637. err := coll.Find(nil).One(result)
  1638. c.Assert(err, Equals, mgo.ErrNotFound)
  1639. }
  1640. // Collect op counters for everyone again.
  1641. q21b := s.countQueries(c, "localhost:40021")
  1642. q22b := s.countQueries(c, "localhost:40022")
  1643. q23b := s.countQueries(c, "localhost:40023")
  1644. switch hostPort(master) {
  1645. case "40021":
  1646. c.Check(q21b-q21a, Equals, 0)
  1647. c.Check(q22b-q22a, Equals, 5)
  1648. c.Check(q23b-q23a, Equals, 7)
  1649. case "40022":
  1650. c.Check(q21b-q21a, Equals, 5)
  1651. c.Check(q22b-q22a, Equals, 0)
  1652. c.Check(q23b-q23a, Equals, 7)
  1653. case "40023":
  1654. c.Check(q21b-q21a, Equals, 5)
  1655. c.Check(q22b-q22a, Equals, 7)
  1656. c.Check(q23b-q23a, Equals, 0)
  1657. default:
  1658. c.Fatal("Uh?")
  1659. }
  1660. }
  1661. func (s *S) TestDoNotFallbackToMonotonic(c *C) {
  1662. // There was a bug at some point that some functions were
  1663. // falling back to Monotonic mode. This test ensures all listIndexes
  1664. // commands go to the primary, as should happen since the session is
  1665. // in Strong mode.
  1666. if !s.versionAtLeast(3, 0) {
  1667. c.Skip("command-counting logic depends on 3.0+")
  1668. }
  1669. session, err := mgo.Dial("localhost:40012")
  1670. c.Assert(err, IsNil)
  1671. defer session.Close()
  1672. for i := 0; i < 15; i++ {
  1673. q11a := s.countCommands(c, "localhost:40011", "listIndexes")
  1674. q12a := s.countCommands(c, "localhost:40012", "listIndexes")
  1675. q13a := s.countCommands(c, "localhost:40013", "listIndexes")
  1676. _, err := session.DB("local").C("system.indexes").Indexes()
  1677. c.Assert(err, IsNil)
  1678. q11b := s.countCommands(c, "localhost:40011", "listIndexes")
  1679. q12b := s.countCommands(c, "localhost:40012", "listIndexes")
  1680. q13b := s.countCommands(c, "localhost:40013", "listIndexes")
  1681. c.Assert(q11b, Equals, q11a+1)
  1682. c.Assert(q12b, Equals, q12a)
  1683. c.Assert(q13b, Equals, q13a)
  1684. }
  1685. }