wal_test.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
  14. package wal
  15. import (
  16. "bytes"
  17. "io"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "reflect"
  23. "testing"
  24. "github.com/coreos/etcd/pkg/fileutil"
  25. "github.com/coreos/etcd/pkg/pbutil"
  26. "github.com/coreos/etcd/raft/raftpb"
  27. "github.com/coreos/etcd/wal/walpb"
  28. )
  29. func TestNew(t *testing.T) {
  30. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. defer os.RemoveAll(p)
  35. w, err := Create(p, []byte("somedata"))
  36. if err != nil {
  37. t.Fatalf("err = %v, want nil", err)
  38. }
  39. if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
  40. t.Errorf("name = %+v, want %+v", g, walName(0, 0))
  41. }
  42. defer w.Close()
  43. // file is preallocated to segment size; only read data written by wal
  44. off, err := w.tail().Seek(0, io.SeekCurrent)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. gd := make([]byte, off)
  49. f, err := os.Open(filepath.Join(p, filepath.Base(w.tail().Name())))
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. defer f.Close()
  54. if _, err = io.ReadFull(f, gd); err != nil {
  55. t.Fatalf("err = %v, want nil", err)
  56. }
  57. var wb bytes.Buffer
  58. e := newEncoder(&wb, 0, 0)
  59. err = e.encode(&walpb.Record{Type: crcType, Crc: 0})
  60. if err != nil {
  61. t.Fatalf("err = %v, want nil", err)
  62. }
  63. err = e.encode(&walpb.Record{Type: metadataType, Data: []byte("somedata")})
  64. if err != nil {
  65. t.Fatalf("err = %v, want nil", err)
  66. }
  67. r := &walpb.Record{
  68. Type: snapshotType,
  69. Data: pbutil.MustMarshal(&walpb.Snapshot{}),
  70. }
  71. if err = e.encode(r); err != nil {
  72. t.Fatalf("err = %v, want nil", err)
  73. }
  74. e.flush()
  75. if !bytes.Equal(gd, wb.Bytes()) {
  76. t.Errorf("data = %v, want %v", gd, wb.Bytes())
  77. }
  78. }
  79. func TestNewForInitedDir(t *testing.T) {
  80. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. defer os.RemoveAll(p)
  85. os.Create(filepath.Join(p, walName(0, 0)))
  86. if _, err = Create(p, nil); err == nil || err != os.ErrExist {
  87. t.Errorf("err = %v, want %v", err, os.ErrExist)
  88. }
  89. }
  90. func TestOpenAtIndex(t *testing.T) {
  91. dir, err := ioutil.TempDir(os.TempDir(), "waltest")
  92. if err != nil {
  93. t.Fatal(err)
  94. }
  95. defer os.RemoveAll(dir)
  96. f, err := os.Create(filepath.Join(dir, walName(0, 0)))
  97. if err != nil {
  98. t.Fatal(err)
  99. }
  100. f.Close()
  101. w, err := Open(dir, walpb.Snapshot{})
  102. if err != nil {
  103. t.Fatalf("err = %v, want nil", err)
  104. }
  105. if g := filepath.Base(w.tail().Name()); g != walName(0, 0) {
  106. t.Errorf("name = %+v, want %+v", g, walName(0, 0))
  107. }
  108. if w.seq() != 0 {
  109. t.Errorf("seq = %d, want %d", w.seq(), 0)
  110. }
  111. w.Close()
  112. wname := walName(2, 10)
  113. f, err = os.Create(filepath.Join(dir, wname))
  114. if err != nil {
  115. t.Fatal(err)
  116. }
  117. f.Close()
  118. w, err = Open(dir, walpb.Snapshot{Index: 5})
  119. if err != nil {
  120. t.Fatalf("err = %v, want nil", err)
  121. }
  122. if g := filepath.Base(w.tail().Name()); g != wname {
  123. t.Errorf("name = %+v, want %+v", g, wname)
  124. }
  125. if w.seq() != 2 {
  126. t.Errorf("seq = %d, want %d", w.seq(), 2)
  127. }
  128. w.Close()
  129. emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
  130. if err != nil {
  131. t.Fatal(err)
  132. }
  133. defer os.RemoveAll(emptydir)
  134. if _, err = Open(emptydir, walpb.Snapshot{}); err != ErrFileNotFound {
  135. t.Errorf("err = %v, want %v", err, ErrFileNotFound)
  136. }
  137. }
  138. // TestVerify tests that Verify throws a non-nil error when the WAL is corrupted.
  139. // The test creates a WAL directory and cuts out multiple WAL files. Then
  140. // it corrupts one of the files by completely truncating it.
  141. func TestVerify(t *testing.T) {
  142. walDir, err := ioutil.TempDir(os.TempDir(), "waltest")
  143. if err != nil {
  144. t.Fatal(err)
  145. }
  146. defer os.RemoveAll(walDir)
  147. // create WAL
  148. w, err := Create(walDir, nil)
  149. if err != nil {
  150. t.Fatal(err)
  151. }
  152. defer w.Close()
  153. // make 5 separate files
  154. for i := 0; i < 5; i++ {
  155. es := []raftpb.Entry{{Index: uint64(i), Data: []byte("waldata" + string(i+1))}}
  156. if err = w.Save(raftpb.HardState{}, es); err != nil {
  157. t.Fatal(err)
  158. }
  159. if err = w.cut(); err != nil {
  160. t.Fatal(err)
  161. }
  162. }
  163. // to verify the WAL is not corrupted at this point
  164. err = Verify(walDir, walpb.Snapshot{})
  165. if err != nil {
  166. t.Errorf("expected a nil error, got %v", err)
  167. }
  168. walFiles, err := ioutil.ReadDir(walDir)
  169. if err != nil {
  170. t.Fatal(err)
  171. }
  172. // corrupt the WAL by truncating one of the WAL files completely
  173. err = os.Truncate(path.Join(walDir, walFiles[2].Name()), 0)
  174. if err != nil {
  175. t.Fatal(err)
  176. }
  177. err = Verify(walDir, walpb.Snapshot{})
  178. if err == nil {
  179. t.Error("expected a non-nil error, got nil")
  180. }
  181. }
  182. // TODO: split it into smaller tests for better readability
  183. func TestCut(t *testing.T) {
  184. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  185. if err != nil {
  186. t.Fatal(err)
  187. }
  188. defer os.RemoveAll(p)
  189. w, err := Create(p, nil)
  190. if err != nil {
  191. t.Fatal(err)
  192. }
  193. defer w.Close()
  194. state := raftpb.HardState{Term: 1}
  195. if err = w.Save(state, nil); err != nil {
  196. t.Fatal(err)
  197. }
  198. if err = w.cut(); err != nil {
  199. t.Fatal(err)
  200. }
  201. wname := walName(1, 1)
  202. if g := filepath.Base(w.tail().Name()); g != wname {
  203. t.Errorf("name = %s, want %s", g, wname)
  204. }
  205. es := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}}
  206. if err = w.Save(raftpb.HardState{}, es); err != nil {
  207. t.Fatal(err)
  208. }
  209. if err = w.cut(); err != nil {
  210. t.Fatal(err)
  211. }
  212. snap := walpb.Snapshot{Index: 2, Term: 1}
  213. if err = w.SaveSnapshot(snap); err != nil {
  214. t.Fatal(err)
  215. }
  216. wname = walName(2, 2)
  217. if g := filepath.Base(w.tail().Name()); g != wname {
  218. t.Errorf("name = %s, want %s", g, wname)
  219. }
  220. // check the state in the last WAL
  221. // We do check before closing the WAL to ensure that Cut syncs the data
  222. // into the disk.
  223. f, err := os.Open(filepath.Join(p, wname))
  224. if err != nil {
  225. t.Fatal(err)
  226. }
  227. defer f.Close()
  228. nw := &WAL{
  229. decoder: newDecoder(f),
  230. start: snap,
  231. }
  232. _, gst, _, err := nw.ReadAll()
  233. if err != nil {
  234. t.Fatal(err)
  235. }
  236. if !reflect.DeepEqual(gst, state) {
  237. t.Errorf("state = %+v, want %+v", gst, state)
  238. }
  239. }
  240. func TestSaveWithCut(t *testing.T) {
  241. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  242. if err != nil {
  243. t.Fatal(err)
  244. }
  245. defer os.RemoveAll(p)
  246. w, err := Create(p, []byte("metadata"))
  247. if err != nil {
  248. t.Fatal(err)
  249. }
  250. state := raftpb.HardState{Term: 1}
  251. if err = w.Save(state, nil); err != nil {
  252. t.Fatal(err)
  253. }
  254. bigData := make([]byte, 500)
  255. strdata := "Hello World!!"
  256. copy(bigData, strdata)
  257. // set a lower value for SegmentSizeBytes, else the test takes too long to complete
  258. restoreLater := SegmentSizeBytes
  259. const EntrySize int = 500
  260. SegmentSizeBytes = 2 * 1024
  261. defer func() { SegmentSizeBytes = restoreLater }()
  262. var index uint64 = 0
  263. for totalSize := 0; totalSize < int(SegmentSizeBytes); totalSize += EntrySize {
  264. ents := []raftpb.Entry{{Index: index, Term: 1, Data: bigData}}
  265. if err = w.Save(state, ents); err != nil {
  266. t.Fatal(err)
  267. }
  268. index++
  269. }
  270. w.Close()
  271. neww, err := Open(p, walpb.Snapshot{})
  272. if err != nil {
  273. t.Fatalf("err = %v, want nil", err)
  274. }
  275. defer neww.Close()
  276. wname := walName(1, index)
  277. if g := filepath.Base(neww.tail().Name()); g != wname {
  278. t.Errorf("name = %s, want %s", g, wname)
  279. }
  280. _, newhardstate, entries, err := neww.ReadAll()
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. if !reflect.DeepEqual(newhardstate, state) {
  285. t.Errorf("Hard State = %+v, want %+v", newhardstate, state)
  286. }
  287. if len(entries) != int(SegmentSizeBytes/int64(EntrySize)) {
  288. t.Errorf("Number of entries = %d, expected = %d", len(entries), int(SegmentSizeBytes/int64(EntrySize)))
  289. }
  290. for _, oneent := range entries {
  291. if !bytes.Equal(oneent.Data, bigData) {
  292. t.Errorf("the saved data does not match at Index %d : found: %s , want :%s", oneent.Index, oneent.Data, bigData)
  293. }
  294. }
  295. }
  296. func TestRecover(t *testing.T) {
  297. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  298. if err != nil {
  299. t.Fatal(err)
  300. }
  301. defer os.RemoveAll(p)
  302. w, err := Create(p, []byte("metadata"))
  303. if err != nil {
  304. t.Fatal(err)
  305. }
  306. if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
  307. t.Fatal(err)
  308. }
  309. ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
  310. if err = w.Save(raftpb.HardState{}, ents); err != nil {
  311. t.Fatal(err)
  312. }
  313. sts := []raftpb.HardState{{Term: 1, Vote: 1, Commit: 1}, {Term: 2, Vote: 2, Commit: 2}}
  314. for _, s := range sts {
  315. if err = w.Save(s, nil); err != nil {
  316. t.Fatal(err)
  317. }
  318. }
  319. w.Close()
  320. if w, err = Open(p, walpb.Snapshot{}); err != nil {
  321. t.Fatal(err)
  322. }
  323. metadata, state, entries, err := w.ReadAll()
  324. if err != nil {
  325. t.Fatal(err)
  326. }
  327. if !bytes.Equal(metadata, []byte("metadata")) {
  328. t.Errorf("metadata = %s, want %s", metadata, "metadata")
  329. }
  330. if !reflect.DeepEqual(entries, ents) {
  331. t.Errorf("ents = %+v, want %+v", entries, ents)
  332. }
  333. // only the latest state is recorded
  334. s := sts[len(sts)-1]
  335. if !reflect.DeepEqual(state, s) {
  336. t.Errorf("state = %+v, want %+v", state, s)
  337. }
  338. w.Close()
  339. }
  340. func TestSearchIndex(t *testing.T) {
  341. tests := []struct {
  342. names []string
  343. index uint64
  344. widx int
  345. wok bool
  346. }{
  347. {
  348. []string{
  349. "0000000000000000-0000000000000000.wal",
  350. "0000000000000001-0000000000001000.wal",
  351. "0000000000000002-0000000000002000.wal",
  352. },
  353. 0x1000, 1, true,
  354. },
  355. {
  356. []string{
  357. "0000000000000001-0000000000004000.wal",
  358. "0000000000000002-0000000000003000.wal",
  359. "0000000000000003-0000000000005000.wal",
  360. },
  361. 0x4000, 1, true,
  362. },
  363. {
  364. []string{
  365. "0000000000000001-0000000000002000.wal",
  366. "0000000000000002-0000000000003000.wal",
  367. "0000000000000003-0000000000005000.wal",
  368. },
  369. 0x1000, -1, false,
  370. },
  371. }
  372. for i, tt := range tests {
  373. idx, ok := searchIndex(tt.names, tt.index)
  374. if idx != tt.widx {
  375. t.Errorf("#%d: idx = %d, want %d", i, idx, tt.widx)
  376. }
  377. if ok != tt.wok {
  378. t.Errorf("#%d: ok = %v, want %v", i, ok, tt.wok)
  379. }
  380. }
  381. }
  382. func TestScanWalName(t *testing.T) {
  383. tests := []struct {
  384. str string
  385. wseq, windex uint64
  386. wok bool
  387. }{
  388. {"0000000000000000-0000000000000000.wal", 0, 0, true},
  389. {"0000000000000000.wal", 0, 0, false},
  390. {"0000000000000000-0000000000000000.snap", 0, 0, false},
  391. }
  392. for i, tt := range tests {
  393. s, index, err := parseWalName(tt.str)
  394. if g := err == nil; g != tt.wok {
  395. t.Errorf("#%d: ok = %v, want %v", i, g, tt.wok)
  396. }
  397. if s != tt.wseq {
  398. t.Errorf("#%d: seq = %d, want %d", i, s, tt.wseq)
  399. }
  400. if index != tt.windex {
  401. t.Errorf("#%d: index = %d, want %d", i, index, tt.windex)
  402. }
  403. }
  404. }
  405. func TestRecoverAfterCut(t *testing.T) {
  406. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  407. if err != nil {
  408. t.Fatal(err)
  409. }
  410. defer os.RemoveAll(p)
  411. md, err := Create(p, []byte("metadata"))
  412. if err != nil {
  413. t.Fatal(err)
  414. }
  415. for i := 0; i < 10; i++ {
  416. if err = md.SaveSnapshot(walpb.Snapshot{Index: uint64(i)}); err != nil {
  417. t.Fatal(err)
  418. }
  419. es := []raftpb.Entry{{Index: uint64(i)}}
  420. if err = md.Save(raftpb.HardState{}, es); err != nil {
  421. t.Fatal(err)
  422. }
  423. if err = md.cut(); err != nil {
  424. t.Fatal(err)
  425. }
  426. }
  427. md.Close()
  428. if err := os.Remove(filepath.Join(p, walName(4, 4))); err != nil {
  429. t.Fatal(err)
  430. }
  431. for i := 0; i < 10; i++ {
  432. w, err := Open(p, walpb.Snapshot{Index: uint64(i)})
  433. if err != nil {
  434. if i <= 4 {
  435. if err != ErrFileNotFound {
  436. t.Errorf("#%d: err = %v, want %v", i, err, ErrFileNotFound)
  437. }
  438. } else {
  439. t.Errorf("#%d: err = %v, want nil", i, err)
  440. }
  441. continue
  442. }
  443. metadata, _, entries, err := w.ReadAll()
  444. if err != nil {
  445. t.Errorf("#%d: err = %v, want nil", i, err)
  446. continue
  447. }
  448. if !bytes.Equal(metadata, []byte("metadata")) {
  449. t.Errorf("#%d: metadata = %s, want %s", i, metadata, "metadata")
  450. }
  451. for j, e := range entries {
  452. if e.Index != uint64(j+i+1) {
  453. t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
  454. }
  455. }
  456. w.Close()
  457. }
  458. }
  459. func TestOpenAtUncommittedIndex(t *testing.T) {
  460. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  461. if err != nil {
  462. t.Fatal(err)
  463. }
  464. defer os.RemoveAll(p)
  465. w, err := Create(p, nil)
  466. if err != nil {
  467. t.Fatal(err)
  468. }
  469. if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
  470. t.Fatal(err)
  471. }
  472. if err = w.Save(raftpb.HardState{}, []raftpb.Entry{{Index: 0}}); err != nil {
  473. t.Fatal(err)
  474. }
  475. w.Close()
  476. w, err = Open(p, walpb.Snapshot{})
  477. if err != nil {
  478. t.Fatal(err)
  479. }
  480. // commit up to index 0, try to read index 1
  481. if _, _, _, err = w.ReadAll(); err != nil {
  482. t.Errorf("err = %v, want nil", err)
  483. }
  484. w.Close()
  485. }
  486. // TestOpenForRead tests that OpenForRead can load all files.
  487. // The tests creates WAL directory, and cut out multiple WAL files. Then
  488. // it releases the lock of part of data, and excepts that OpenForRead
  489. // can read out all files even if some are locked for write.
  490. func TestOpenForRead(t *testing.T) {
  491. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  492. if err != nil {
  493. t.Fatal(err)
  494. }
  495. defer os.RemoveAll(p)
  496. // create WAL
  497. w, err := Create(p, nil)
  498. if err != nil {
  499. t.Fatal(err)
  500. }
  501. defer w.Close()
  502. // make 10 separate files
  503. for i := 0; i < 10; i++ {
  504. es := []raftpb.Entry{{Index: uint64(i)}}
  505. if err = w.Save(raftpb.HardState{}, es); err != nil {
  506. t.Fatal(err)
  507. }
  508. if err = w.cut(); err != nil {
  509. t.Fatal(err)
  510. }
  511. }
  512. // release the lock to 5
  513. unlockIndex := uint64(5)
  514. w.ReleaseLockTo(unlockIndex)
  515. // All are available for read
  516. w2, err := OpenForRead(p, walpb.Snapshot{})
  517. if err != nil {
  518. t.Fatal(err)
  519. }
  520. defer w2.Close()
  521. _, _, ents, err := w2.ReadAll()
  522. if err != nil {
  523. t.Fatalf("err = %v, want nil", err)
  524. }
  525. if g := ents[len(ents)-1].Index; g != 9 {
  526. t.Errorf("last index read = %d, want %d", g, 9)
  527. }
  528. }
  529. func TestSaveEmpty(t *testing.T) {
  530. var buf bytes.Buffer
  531. var est raftpb.HardState
  532. w := WAL{
  533. encoder: newEncoder(&buf, 0, 0),
  534. }
  535. if err := w.saveState(&est); err != nil {
  536. t.Errorf("err = %v, want nil", err)
  537. }
  538. if len(buf.Bytes()) != 0 {
  539. t.Errorf("buf.Bytes = %d, want 0", len(buf.Bytes()))
  540. }
  541. }
  542. func TestReleaseLockTo(t *testing.T) {
  543. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  544. if err != nil {
  545. t.Fatal(err)
  546. }
  547. defer os.RemoveAll(p)
  548. // create WAL
  549. w, err := Create(p, nil)
  550. defer func() {
  551. if err = w.Close(); err != nil {
  552. t.Fatal(err)
  553. }
  554. }()
  555. if err != nil {
  556. t.Fatal(err)
  557. }
  558. // release nothing if no files
  559. err = w.ReleaseLockTo(10)
  560. if err != nil {
  561. t.Errorf("err = %v, want nil", err)
  562. }
  563. // make 10 separate files
  564. for i := 0; i < 10; i++ {
  565. es := []raftpb.Entry{{Index: uint64(i)}}
  566. if err = w.Save(raftpb.HardState{}, es); err != nil {
  567. t.Fatal(err)
  568. }
  569. if err = w.cut(); err != nil {
  570. t.Fatal(err)
  571. }
  572. }
  573. // release the lock to 5
  574. unlockIndex := uint64(5)
  575. w.ReleaseLockTo(unlockIndex)
  576. // expected remaining are 4,5,6,7,8,9,10
  577. if len(w.locks) != 7 {
  578. t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 7)
  579. }
  580. for i, l := range w.locks {
  581. var lockIndex uint64
  582. _, lockIndex, err = parseWalName(filepath.Base(l.Name()))
  583. if err != nil {
  584. t.Fatal(err)
  585. }
  586. if lockIndex != uint64(i+4) {
  587. t.Errorf("#%d: lockindex = %d, want %d", i, lockIndex, uint64(i+4))
  588. }
  589. }
  590. // release the lock to 15
  591. unlockIndex = uint64(15)
  592. w.ReleaseLockTo(unlockIndex)
  593. // expected remaining is 10
  594. if len(w.locks) != 1 {
  595. t.Errorf("len(w.locks) = %d, want %d", len(w.locks), 1)
  596. }
  597. _, lockIndex, err := parseWalName(filepath.Base(w.locks[0].Name()))
  598. if err != nil {
  599. t.Fatal(err)
  600. }
  601. if lockIndex != uint64(10) {
  602. t.Errorf("lockindex = %d, want %d", lockIndex, 10)
  603. }
  604. }
  605. // TestTailWriteNoSlackSpace ensures that tail writes append if there's no preallocated space.
  606. func TestTailWriteNoSlackSpace(t *testing.T) {
  607. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  608. if err != nil {
  609. t.Fatal(err)
  610. }
  611. defer os.RemoveAll(p)
  612. // create initial WAL
  613. w, err := Create(p, []byte("metadata"))
  614. if err != nil {
  615. t.Fatal(err)
  616. }
  617. // write some entries
  618. for i := 1; i <= 5; i++ {
  619. es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
  620. if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
  621. t.Fatal(err)
  622. }
  623. }
  624. // get rid of slack space by truncating file
  625. off, serr := w.tail().Seek(0, io.SeekCurrent)
  626. if serr != nil {
  627. t.Fatal(serr)
  628. }
  629. if terr := w.tail().Truncate(off); terr != nil {
  630. t.Fatal(terr)
  631. }
  632. w.Close()
  633. // open, write more
  634. w, err = Open(p, walpb.Snapshot{})
  635. if err != nil {
  636. t.Fatal(err)
  637. }
  638. _, _, ents, rerr := w.ReadAll()
  639. if rerr != nil {
  640. t.Fatal(rerr)
  641. }
  642. if len(ents) != 5 {
  643. t.Fatalf("got entries %+v, expected 5 entries", ents)
  644. }
  645. // write more entries
  646. for i := 6; i <= 10; i++ {
  647. es := []raftpb.Entry{{Index: uint64(i), Term: 1, Data: []byte{byte(i)}}}
  648. if err = w.Save(raftpb.HardState{Term: 1}, es); err != nil {
  649. t.Fatal(err)
  650. }
  651. }
  652. w.Close()
  653. // confirm all writes
  654. w, err = Open(p, walpb.Snapshot{})
  655. if err != nil {
  656. t.Fatal(err)
  657. }
  658. _, _, ents, rerr = w.ReadAll()
  659. if rerr != nil {
  660. t.Fatal(rerr)
  661. }
  662. if len(ents) != 10 {
  663. t.Fatalf("got entries %+v, expected 10 entries", ents)
  664. }
  665. w.Close()
  666. }
  667. // TestRestartCreateWal ensures that an interrupted WAL initialization is clobbered on restart
  668. func TestRestartCreateWal(t *testing.T) {
  669. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  670. if err != nil {
  671. t.Fatal(err)
  672. }
  673. defer os.RemoveAll(p)
  674. // make temporary directory so it looks like initialization is interrupted
  675. tmpdir := filepath.Clean(p) + ".tmp"
  676. if err = os.Mkdir(tmpdir, fileutil.PrivateDirMode); err != nil {
  677. t.Fatal(err)
  678. }
  679. if _, err = os.OpenFile(filepath.Join(tmpdir, "test"), os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode); err != nil {
  680. t.Fatal(err)
  681. }
  682. w, werr := Create(p, []byte("abc"))
  683. if werr != nil {
  684. t.Fatal(werr)
  685. }
  686. w.Close()
  687. if Exist(tmpdir) {
  688. t.Fatalf("got %q exists, expected it to not exist", tmpdir)
  689. }
  690. if w, err = OpenForRead(p, walpb.Snapshot{}); err != nil {
  691. t.Fatal(err)
  692. }
  693. defer w.Close()
  694. if meta, _, _, rerr := w.ReadAll(); rerr != nil || string(meta) != "abc" {
  695. t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
  696. }
  697. }
  698. // TestOpenOnTornWrite ensures that entries past the torn write are truncated.
  699. func TestOpenOnTornWrite(t *testing.T) {
  700. maxEntries := 40
  701. clobberIdx := 20
  702. overwriteEntries := 5
  703. p, err := ioutil.TempDir(os.TempDir(), "waltest")
  704. if err != nil {
  705. t.Fatal(err)
  706. }
  707. defer os.RemoveAll(p)
  708. w, err := Create(p, nil)
  709. defer func() {
  710. if err = w.Close(); err != nil && err != os.ErrInvalid {
  711. t.Fatal(err)
  712. }
  713. }()
  714. if err != nil {
  715. t.Fatal(err)
  716. }
  717. // get offset of end of each saved entry
  718. offsets := make([]int64, maxEntries)
  719. for i := range offsets {
  720. es := []raftpb.Entry{{Index: uint64(i)}}
  721. if err = w.Save(raftpb.HardState{}, es); err != nil {
  722. t.Fatal(err)
  723. }
  724. if offsets[i], err = w.tail().Seek(0, io.SeekCurrent); err != nil {
  725. t.Fatal(err)
  726. }
  727. }
  728. fn := filepath.Join(p, filepath.Base(w.tail().Name()))
  729. w.Close()
  730. // clobber some entry with 0's to simulate a torn write
  731. f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
  732. if ferr != nil {
  733. t.Fatal(ferr)
  734. }
  735. defer f.Close()
  736. _, err = f.Seek(offsets[clobberIdx], io.SeekStart)
  737. if err != nil {
  738. t.Fatal(err)
  739. }
  740. zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
  741. _, err = f.Write(zeros)
  742. if err != nil {
  743. t.Fatal(err)
  744. }
  745. f.Close()
  746. w, err = Open(p, walpb.Snapshot{})
  747. if err != nil {
  748. t.Fatal(err)
  749. }
  750. // seek up to clobbered entry
  751. _, _, _, err = w.ReadAll()
  752. if err != nil {
  753. t.Fatal(err)
  754. }
  755. // write a few entries past the clobbered entry
  756. for i := 0; i < overwriteEntries; i++ {
  757. // Index is different from old, truncated entries
  758. es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
  759. if err = w.Save(raftpb.HardState{}, es); err != nil {
  760. t.Fatal(err)
  761. }
  762. }
  763. w.Close()
  764. // read back the entries, confirm number of entries matches expectation
  765. w, err = OpenForRead(p, walpb.Snapshot{})
  766. if err != nil {
  767. t.Fatal(err)
  768. }
  769. _, _, ents, rerr := w.ReadAll()
  770. if rerr != nil {
  771. // CRC error? the old entries were likely never truncated away
  772. t.Fatal(rerr)
  773. }
  774. wEntries := (clobberIdx - 1) + overwriteEntries
  775. if len(ents) != wEntries {
  776. t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
  777. }
  778. }