file_table.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. package rpl
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "path"
  9. "sync"
  10. "time"
  11. "github.com/siddontang/go/log"
  12. "github.com/siddontang/go/sync2"
  13. )
  14. var (
  15. magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17")
  16. errTableNeedFlush = errors.New("write table need flush")
  17. errNilHandler = errors.New("nil write handler")
  18. )
  19. const tableReaderKeepaliveInterval int64 = 30
  20. func fmtTableDataName(base string, index int64) string {
  21. return path.Join(base, fmt.Sprintf("%08d.data", index))
  22. }
  23. func fmtTableMetaName(base string, index int64) string {
  24. return path.Join(base, fmt.Sprintf("%08d.meta", index))
  25. }
  26. type tableReader struct {
  27. sync.Mutex
  28. base string
  29. index int64
  30. data readFile
  31. meta readFile
  32. first uint64
  33. last uint64
  34. lastTime uint32
  35. lastReadTime sync2.AtomicInt64
  36. useMmap bool
  37. }
  38. func newTableReader(base string, index int64, useMmap bool) (*tableReader, error) {
  39. if index <= 0 {
  40. return nil, fmt.Errorf("invalid index %d", index)
  41. }
  42. t := new(tableReader)
  43. t.base = base
  44. t.index = index
  45. t.useMmap = useMmap
  46. var err error
  47. if err = t.check(); err != nil {
  48. log.Errorf("check %d error: %s, try to repair", t.index, err.Error())
  49. if err = t.repair(); err != nil {
  50. log.Errorf("repair %d error: %s", t.index, err.Error())
  51. return nil, err
  52. }
  53. }
  54. t.close()
  55. return t, nil
  56. }
  57. func (t *tableReader) String() string {
  58. return fmt.Sprintf("%d", t.index)
  59. }
  60. func (t *tableReader) Close() {
  61. t.Lock()
  62. t.close()
  63. t.Unlock()
  64. }
  65. func (t *tableReader) close() {
  66. if t.data != nil {
  67. t.data.Close()
  68. t.data = nil
  69. }
  70. if t.meta != nil {
  71. t.meta.Close()
  72. t.meta = nil
  73. }
  74. }
  75. func (t *tableReader) Keepalived() bool {
  76. l := t.lastReadTime.Get()
  77. if l > 0 && time.Now().Unix()-l > tableReaderKeepaliveInterval {
  78. return false
  79. }
  80. return true
  81. }
  82. func (t *tableReader) getLogPos(index int) (uint32, error) {
  83. var buf [4]byte
  84. if _, err := t.meta.ReadAt(buf[0:4], int64(index)*4); err != nil {
  85. return 0, err
  86. }
  87. return binary.BigEndian.Uint32(buf[0:4]), nil
  88. }
  89. func (t *tableReader) checkData() error {
  90. var err error
  91. //check will use raw file mode
  92. if t.data, err = newReadFile(false, fmtTableDataName(t.base, t.index)); err != nil {
  93. return err
  94. }
  95. if t.data.Size() < len(magic) {
  96. return fmt.Errorf("data file %s size %d too short", t.data.Name(), t.data.Size())
  97. }
  98. buf := make([]byte, len(magic))
  99. if _, err := t.data.ReadAt(buf, int64(t.data.Size()-len(magic))); err != nil {
  100. return err
  101. }
  102. if !bytes.Equal(magic, buf) {
  103. return fmt.Errorf("data file %s invalid magic data %q", t.data.Name(), buf)
  104. }
  105. return nil
  106. }
  107. func (t *tableReader) checkMeta() error {
  108. var err error
  109. //check will use raw file mode
  110. if t.meta, err = newReadFile(false, fmtTableMetaName(t.base, t.index)); err != nil {
  111. return err
  112. }
  113. if t.meta.Size()%4 != 0 || t.meta.Size() == 0 {
  114. return fmt.Errorf("meta file %s invalid offset len %d, must 4 multiple and not 0", t.meta.Name(), t.meta.Size())
  115. }
  116. return nil
  117. }
  118. func (t *tableReader) check() error {
  119. var err error
  120. if err := t.checkMeta(); err != nil {
  121. return err
  122. }
  123. if err := t.checkData(); err != nil {
  124. return err
  125. }
  126. firstLogPos, _ := t.getLogPos(0)
  127. lastLogPos, _ := t.getLogPos(t.meta.Size()/4 - 1)
  128. if firstLogPos != 0 {
  129. return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos)
  130. }
  131. var l Log
  132. if _, err = t.decodeLogHead(&l, t.data, int64(firstLogPos)); err != nil {
  133. return fmt.Errorf("decode first log err %s", err.Error())
  134. }
  135. t.first = l.ID
  136. var n int64
  137. if n, err = t.decodeLogHead(&l, t.data, int64(lastLogPos)); err != nil {
  138. return fmt.Errorf("decode last log err %s", err.Error())
  139. } else if n+int64(len(magic)) != int64(t.data.Size()) {
  140. return fmt.Errorf("extra log data at offset %d", n)
  141. }
  142. t.last = l.ID
  143. t.lastTime = l.CreateTime
  144. if t.first > t.last {
  145. return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last)
  146. } else if (t.last - t.first + 1) != uint64(t.meta.Size()/4) {
  147. return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.meta.Size()/4)
  148. }
  149. return nil
  150. }
  151. func (t *tableReader) repair() error {
  152. t.close()
  153. var err error
  154. var data writeFile
  155. var meta writeFile
  156. //repair will use raw file mode
  157. data, err = newWriteFile(false, fmtTableDataName(t.base, t.index), 0)
  158. data.SetOffset(int64(data.Size()))
  159. meta, err = newWriteFile(false, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4))
  160. var l Log
  161. var pos int64 = 0
  162. var nextPos int64 = 0
  163. b := make([]byte, 4)
  164. t.first = 0
  165. t.last = 0
  166. for {
  167. nextPos, err = t.decodeLogHead(&l, data, pos)
  168. if err != nil {
  169. //if error, we may lost all logs from pos
  170. log.Errorf("%s may lost logs from %d", data.Name(), pos)
  171. break
  172. }
  173. if l.ID == 0 {
  174. log.Errorf("%s may lost logs from %d, invalid log 0", data.Name(), pos)
  175. break
  176. }
  177. if t.first == 0 {
  178. t.first = l.ID
  179. }
  180. if t.last == 0 {
  181. t.last = l.ID
  182. } else if l.ID <= t.last {
  183. log.Errorf("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID)
  184. break
  185. }
  186. t.last = l.ID
  187. t.lastTime = l.CreateTime
  188. binary.BigEndian.PutUint32(b, uint32(pos))
  189. meta.Write(b)
  190. pos = nextPos
  191. t.lastTime = l.CreateTime
  192. }
  193. var e error
  194. if err := meta.Close(); err != nil {
  195. e = err
  196. }
  197. data.SetOffset(pos)
  198. if _, err = data.Write(magic); err != nil {
  199. log.Errorf("write magic error %s", err.Error())
  200. }
  201. if err = data.Close(); err != nil {
  202. return err
  203. }
  204. return e
  205. }
  206. func (t *tableReader) decodeLogHead(l *Log, r io.ReaderAt, pos int64) (int64, error) {
  207. dataLen, err := l.DecodeHeadAt(r, pos)
  208. if err != nil {
  209. return 0, err
  210. }
  211. return pos + int64(l.HeadSize()) + int64(dataLen), nil
  212. }
  213. func (t *tableReader) GetLog(id uint64, l *Log) error {
  214. if id < t.first || id > t.last {
  215. return ErrLogNotFound
  216. }
  217. t.lastReadTime.Set(time.Now().Unix())
  218. t.Lock()
  219. if err := t.openTable(); err != nil {
  220. t.close()
  221. t.Unlock()
  222. return err
  223. }
  224. t.Unlock()
  225. pos, err := t.getLogPos(int(id - t.first))
  226. if err != nil {
  227. return err
  228. }
  229. if err := l.DecodeAt(t.data, int64(pos)); err != nil {
  230. return err
  231. } else if l.ID != id {
  232. return fmt.Errorf("invalid log id %d != %d", l.ID, id)
  233. }
  234. return nil
  235. }
  236. func (t *tableReader) openTable() error {
  237. var err error
  238. if t.data == nil {
  239. if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil {
  240. return err
  241. }
  242. }
  243. if t.meta == nil {
  244. if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil {
  245. return err
  246. }
  247. }
  248. return nil
  249. }
  250. type tableWriter struct {
  251. sync.RWMutex
  252. data writeFile
  253. meta writeFile
  254. base string
  255. index int64
  256. first uint64
  257. last uint64
  258. lastTime uint32
  259. maxLogSize int64
  260. closed bool
  261. syncType int
  262. posBuf []byte
  263. useMmap bool
  264. }
  265. func newTableWriter(base string, index int64, maxLogSize int64, useMmap bool) *tableWriter {
  266. if index <= 0 {
  267. panic(fmt.Errorf("invalid index %d", index))
  268. }
  269. t := new(tableWriter)
  270. t.base = base
  271. t.index = index
  272. t.maxLogSize = maxLogSize
  273. t.closed = false
  274. t.posBuf = make([]byte, 4)
  275. t.useMmap = useMmap
  276. return t
  277. }
  278. func (t *tableWriter) String() string {
  279. return fmt.Sprintf("%d", t.index)
  280. }
  281. func (t *tableWriter) SetMaxLogSize(s int64) {
  282. t.maxLogSize = s
  283. }
  284. func (t *tableWriter) SetSyncType(tp int) {
  285. t.syncType = tp
  286. }
  287. func (t *tableWriter) close() {
  288. if t.meta != nil {
  289. if err := t.meta.Close(); err != nil {
  290. log.Fatalf("close log meta error %s", err.Error())
  291. }
  292. t.meta = nil
  293. }
  294. if t.data != nil {
  295. if _, err := t.data.Write(magic); err != nil {
  296. log.Fatalf("write magic error %s", err.Error())
  297. }
  298. if err := t.data.Close(); err != nil {
  299. log.Fatalf("close log data error %s", err.Error())
  300. }
  301. t.data = nil
  302. }
  303. }
  304. func (t *tableWriter) Close() {
  305. t.Lock()
  306. t.closed = true
  307. t.close()
  308. t.Unlock()
  309. }
  310. func (t *tableWriter) First() uint64 {
  311. t.Lock()
  312. id := t.first
  313. t.Unlock()
  314. return id
  315. }
  316. func (t *tableWriter) Last() uint64 {
  317. t.Lock()
  318. id := t.last
  319. t.Unlock()
  320. return id
  321. }
  322. func (t *tableWriter) Flush() (*tableReader, error) {
  323. t.Lock()
  324. if t.data == nil || t.meta == nil {
  325. t.Unlock()
  326. return nil, errNilHandler
  327. }
  328. tr := new(tableReader)
  329. tr.base = t.base
  330. tr.index = t.index
  331. tr.first = t.first
  332. tr.last = t.last
  333. tr.lastTime = t.lastTime
  334. tr.useMmap = t.useMmap
  335. t.close()
  336. t.first = 0
  337. t.last = 0
  338. t.index = t.index + 1
  339. t.Unlock()
  340. return tr, nil
  341. }
  342. func (t *tableWriter) StoreLog(l *Log) error {
  343. t.Lock()
  344. err := t.storeLog(l)
  345. t.Unlock()
  346. return err
  347. }
  348. func (t *tableWriter) openFile() error {
  349. var err error
  350. if t.data == nil {
  351. if t.data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), t.maxLogSize+t.maxLogSize/10+int64(len(magic))); err != nil {
  352. return err
  353. }
  354. }
  355. if t.meta == nil {
  356. if t.meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)); err != nil {
  357. return err
  358. }
  359. }
  360. return err
  361. }
  362. func (t *tableWriter) storeLog(l *Log) error {
  363. if l.ID == 0 {
  364. return ErrStoreLogID
  365. }
  366. if t.closed {
  367. return fmt.Errorf("table writer is closed")
  368. }
  369. if t.last > 0 && l.ID != t.last+1 {
  370. return ErrStoreLogID
  371. }
  372. if t.data != nil && t.data.Offset() > t.maxLogSize {
  373. return errTableNeedFlush
  374. }
  375. var err error
  376. if err = t.openFile(); err != nil {
  377. return err
  378. }
  379. offsetPos := t.data.Offset()
  380. if err = l.Encode(t.data); err != nil {
  381. return err
  382. }
  383. binary.BigEndian.PutUint32(t.posBuf, uint32(offsetPos))
  384. if _, err = t.meta.Write(t.posBuf); err != nil {
  385. return err
  386. }
  387. if t.first == 0 {
  388. t.first = l.ID
  389. }
  390. t.last = l.ID
  391. t.lastTime = l.CreateTime
  392. if t.syncType == 2 {
  393. if err := t.data.Sync(); err != nil {
  394. log.Errorf("sync table error %s", err.Error())
  395. }
  396. }
  397. return nil
  398. }
  399. func (t *tableWriter) GetLog(id uint64, l *Log) error {
  400. t.RLock()
  401. defer t.RUnlock()
  402. if id < t.first || id > t.last {
  403. return ErrLogNotFound
  404. }
  405. var buf [4]byte
  406. if _, err := t.meta.ReadAt(buf[0:4], int64((id-t.first)*4)); err != nil {
  407. return err
  408. }
  409. offset := binary.BigEndian.Uint32(buf[0:4])
  410. if err := l.DecodeAt(t.data, int64(offset)); err != nil {
  411. return err
  412. } else if l.ID != id {
  413. return fmt.Errorf("invalid log id %d != %d", id, l.ID)
  414. }
  415. return nil
  416. }
  417. func (t *tableWriter) Sync() error {
  418. t.Lock()
  419. var err error
  420. if t.data != nil {
  421. err = t.data.Sync()
  422. t.Unlock()
  423. return err
  424. }
  425. if t.meta != nil {
  426. err = t.meta.Sync()
  427. }
  428. t.Unlock()
  429. return err
  430. }