123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571 |
- package rpl
- import (
- "bytes"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "path"
- "sync"
- "time"
- "github.com/siddontang/go/log"
- "github.com/siddontang/go/sync2"
- )
- var (
- magic = []byte("\x1c\x1d\xb8\x88\xff\x9e\x45\x55\x40\xf0\x4c\xda\xe0\xce\x47\xde\x65\x48\x71\x17")
- errTableNeedFlush = errors.New("write table need flush")
- errNilHandler = errors.New("nil write handler")
- )
- const tableReaderKeepaliveInterval int64 = 30
- func fmtTableDataName(base string, index int64) string {
- return path.Join(base, fmt.Sprintf("%08d.data", index))
- }
- func fmtTableMetaName(base string, index int64) string {
- return path.Join(base, fmt.Sprintf("%08d.meta", index))
- }
- type tableReader struct {
- sync.Mutex
- base string
- index int64
- data readFile
- meta readFile
- first uint64
- last uint64
- lastTime uint32
- lastReadTime sync2.AtomicInt64
- useMmap bool
- }
- func newTableReader(base string, index int64, useMmap bool) (*tableReader, error) {
- if index <= 0 {
- return nil, fmt.Errorf("invalid index %d", index)
- }
- t := new(tableReader)
- t.base = base
- t.index = index
- t.useMmap = useMmap
- var err error
- if err = t.check(); err != nil {
- log.Errorf("check %d error: %s, try to repair", t.index, err.Error())
- if err = t.repair(); err != nil {
- log.Errorf("repair %d error: %s", t.index, err.Error())
- return nil, err
- }
- }
- t.close()
- return t, nil
- }
- func (t *tableReader) String() string {
- return fmt.Sprintf("%d", t.index)
- }
- func (t *tableReader) Close() {
- t.Lock()
- t.close()
- t.Unlock()
- }
- func (t *tableReader) close() {
- if t.data != nil {
- t.data.Close()
- t.data = nil
- }
- if t.meta != nil {
- t.meta.Close()
- t.meta = nil
- }
- }
- func (t *tableReader) Keepalived() bool {
- l := t.lastReadTime.Get()
- if l > 0 && time.Now().Unix()-l > tableReaderKeepaliveInterval {
- return false
- }
- return true
- }
- func (t *tableReader) getLogPos(index int) (uint32, error) {
- var buf [4]byte
- if _, err := t.meta.ReadAt(buf[0:4], int64(index)*4); err != nil {
- return 0, err
- }
- return binary.BigEndian.Uint32(buf[0:4]), nil
- }
- func (t *tableReader) checkData() error {
- var err error
- //check will use raw file mode
- if t.data, err = newReadFile(false, fmtTableDataName(t.base, t.index)); err != nil {
- return err
- }
- if t.data.Size() < len(magic) {
- return fmt.Errorf("data file %s size %d too short", t.data.Name(), t.data.Size())
- }
- buf := make([]byte, len(magic))
- if _, err := t.data.ReadAt(buf, int64(t.data.Size()-len(magic))); err != nil {
- return err
- }
- if !bytes.Equal(magic, buf) {
- return fmt.Errorf("data file %s invalid magic data %q", t.data.Name(), buf)
- }
- return nil
- }
- func (t *tableReader) checkMeta() error {
- var err error
- //check will use raw file mode
- if t.meta, err = newReadFile(false, fmtTableMetaName(t.base, t.index)); err != nil {
- return err
- }
- if t.meta.Size()%4 != 0 || t.meta.Size() == 0 {
- return fmt.Errorf("meta file %s invalid offset len %d, must 4 multiple and not 0", t.meta.Name(), t.meta.Size())
- }
- return nil
- }
- func (t *tableReader) check() error {
- var err error
- if err := t.checkMeta(); err != nil {
- return err
- }
- if err := t.checkData(); err != nil {
- return err
- }
- firstLogPos, _ := t.getLogPos(0)
- lastLogPos, _ := t.getLogPos(t.meta.Size()/4 - 1)
- if firstLogPos != 0 {
- return fmt.Errorf("invalid first log pos %d, must 0", firstLogPos)
- }
- var l Log
- if _, err = t.decodeLogHead(&l, t.data, int64(firstLogPos)); err != nil {
- return fmt.Errorf("decode first log err %s", err.Error())
- }
- t.first = l.ID
- var n int64
- if n, err = t.decodeLogHead(&l, t.data, int64(lastLogPos)); err != nil {
- return fmt.Errorf("decode last log err %s", err.Error())
- } else if n+int64(len(magic)) != int64(t.data.Size()) {
- return fmt.Errorf("extra log data at offset %d", n)
- }
- t.last = l.ID
- t.lastTime = l.CreateTime
- if t.first > t.last {
- return fmt.Errorf("invalid log table first %d > last %d", t.first, t.last)
- } else if (t.last - t.first + 1) != uint64(t.meta.Size()/4) {
- return fmt.Errorf("invalid log table, first %d, last %d, and log num %d", t.first, t.last, t.meta.Size()/4)
- }
- return nil
- }
- func (t *tableReader) repair() error {
- t.close()
- var err error
- var data writeFile
- var meta writeFile
- //repair will use raw file mode
- data, err = newWriteFile(false, fmtTableDataName(t.base, t.index), 0)
- data.SetOffset(int64(data.Size()))
- meta, err = newWriteFile(false, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4))
- var l Log
- var pos int64 = 0
- var nextPos int64 = 0
- b := make([]byte, 4)
- t.first = 0
- t.last = 0
- for {
- nextPos, err = t.decodeLogHead(&l, data, pos)
- if err != nil {
- //if error, we may lost all logs from pos
- log.Errorf("%s may lost logs from %d", data.Name(), pos)
- break
- }
- if l.ID == 0 {
- log.Errorf("%s may lost logs from %d, invalid log 0", data.Name(), pos)
- break
- }
- if t.first == 0 {
- t.first = l.ID
- }
- if t.last == 0 {
- t.last = l.ID
- } else if l.ID <= t.last {
- log.Errorf("%s may lost logs from %d, invalid logid %d", t.data.Name(), pos, l.ID)
- break
- }
- t.last = l.ID
- t.lastTime = l.CreateTime
- binary.BigEndian.PutUint32(b, uint32(pos))
- meta.Write(b)
- pos = nextPos
- t.lastTime = l.CreateTime
- }
- var e error
- if err := meta.Close(); err != nil {
- e = err
- }
- data.SetOffset(pos)
- if _, err = data.Write(magic); err != nil {
- log.Errorf("write magic error %s", err.Error())
- }
- if err = data.Close(); err != nil {
- return err
- }
- return e
- }
- func (t *tableReader) decodeLogHead(l *Log, r io.ReaderAt, pos int64) (int64, error) {
- dataLen, err := l.DecodeHeadAt(r, pos)
- if err != nil {
- return 0, err
- }
- return pos + int64(l.HeadSize()) + int64(dataLen), nil
- }
- func (t *tableReader) GetLog(id uint64, l *Log) error {
- if id < t.first || id > t.last {
- return ErrLogNotFound
- }
- t.lastReadTime.Set(time.Now().Unix())
- t.Lock()
- if err := t.openTable(); err != nil {
- t.close()
- t.Unlock()
- return err
- }
- t.Unlock()
- pos, err := t.getLogPos(int(id - t.first))
- if err != nil {
- return err
- }
- if err := l.DecodeAt(t.data, int64(pos)); err != nil {
- return err
- } else if l.ID != id {
- return fmt.Errorf("invalid log id %d != %d", l.ID, id)
- }
- return nil
- }
- func (t *tableReader) openTable() error {
- var err error
- if t.data == nil {
- if t.data, err = newReadFile(t.useMmap, fmtTableDataName(t.base, t.index)); err != nil {
- return err
- }
- }
- if t.meta == nil {
- if t.meta, err = newReadFile(t.useMmap, fmtTableMetaName(t.base, t.index)); err != nil {
- return err
- }
- }
- return nil
- }
- type tableWriter struct {
- sync.RWMutex
- data writeFile
- meta writeFile
- base string
- index int64
- first uint64
- last uint64
- lastTime uint32
- maxLogSize int64
- closed bool
- syncType int
- posBuf []byte
- useMmap bool
- }
- func newTableWriter(base string, index int64, maxLogSize int64, useMmap bool) *tableWriter {
- if index <= 0 {
- panic(fmt.Errorf("invalid index %d", index))
- }
- t := new(tableWriter)
- t.base = base
- t.index = index
- t.maxLogSize = maxLogSize
- t.closed = false
- t.posBuf = make([]byte, 4)
- t.useMmap = useMmap
- return t
- }
- func (t *tableWriter) String() string {
- return fmt.Sprintf("%d", t.index)
- }
- func (t *tableWriter) SetMaxLogSize(s int64) {
- t.maxLogSize = s
- }
- func (t *tableWriter) SetSyncType(tp int) {
- t.syncType = tp
- }
- func (t *tableWriter) close() {
- if t.meta != nil {
- if err := t.meta.Close(); err != nil {
- log.Fatalf("close log meta error %s", err.Error())
- }
- t.meta = nil
- }
- if t.data != nil {
- if _, err := t.data.Write(magic); err != nil {
- log.Fatalf("write magic error %s", err.Error())
- }
- if err := t.data.Close(); err != nil {
- log.Fatalf("close log data error %s", err.Error())
- }
- t.data = nil
- }
- }
- func (t *tableWriter) Close() {
- t.Lock()
- t.closed = true
- t.close()
- t.Unlock()
- }
- func (t *tableWriter) First() uint64 {
- t.Lock()
- id := t.first
- t.Unlock()
- return id
- }
- func (t *tableWriter) Last() uint64 {
- t.Lock()
- id := t.last
- t.Unlock()
- return id
- }
- func (t *tableWriter) Flush() (*tableReader, error) {
- t.Lock()
- if t.data == nil || t.meta == nil {
- t.Unlock()
- return nil, errNilHandler
- }
- tr := new(tableReader)
- tr.base = t.base
- tr.index = t.index
- tr.first = t.first
- tr.last = t.last
- tr.lastTime = t.lastTime
- tr.useMmap = t.useMmap
- t.close()
- t.first = 0
- t.last = 0
- t.index = t.index + 1
- t.Unlock()
- return tr, nil
- }
- func (t *tableWriter) StoreLog(l *Log) error {
- t.Lock()
- err := t.storeLog(l)
- t.Unlock()
- return err
- }
- func (t *tableWriter) openFile() error {
- var err error
- if t.data == nil {
- if t.data, err = newWriteFile(t.useMmap, fmtTableDataName(t.base, t.index), t.maxLogSize+t.maxLogSize/10+int64(len(magic))); err != nil {
- return err
- }
- }
- if t.meta == nil {
- if t.meta, err = newWriteFile(t.useMmap, fmtTableMetaName(t.base, t.index), int64(defaultLogNumInFile*4)); err != nil {
- return err
- }
- }
- return err
- }
- func (t *tableWriter) storeLog(l *Log) error {
- if l.ID == 0 {
- return ErrStoreLogID
- }
- if t.closed {
- return fmt.Errorf("table writer is closed")
- }
- if t.last > 0 && l.ID != t.last+1 {
- return ErrStoreLogID
- }
- if t.data != nil && t.data.Offset() > t.maxLogSize {
- return errTableNeedFlush
- }
- var err error
- if err = t.openFile(); err != nil {
- return err
- }
- offsetPos := t.data.Offset()
- if err = l.Encode(t.data); err != nil {
- return err
- }
- binary.BigEndian.PutUint32(t.posBuf, uint32(offsetPos))
- if _, err = t.meta.Write(t.posBuf); err != nil {
- return err
- }
- if t.first == 0 {
- t.first = l.ID
- }
- t.last = l.ID
- t.lastTime = l.CreateTime
- if t.syncType == 2 {
- if err := t.data.Sync(); err != nil {
- log.Errorf("sync table error %s", err.Error())
- }
- }
- return nil
- }
- func (t *tableWriter) GetLog(id uint64, l *Log) error {
- t.RLock()
- defer t.RUnlock()
- if id < t.first || id > t.last {
- return ErrLogNotFound
- }
- var buf [4]byte
- if _, err := t.meta.ReadAt(buf[0:4], int64((id-t.first)*4)); err != nil {
- return err
- }
- offset := binary.BigEndian.Uint32(buf[0:4])
- if err := l.DecodeAt(t.data, int64(offset)); err != nil {
- return err
- } else if l.ID != id {
- return fmt.Errorf("invalid log id %d != %d", id, l.ID)
- }
- return nil
- }
- func (t *tableWriter) Sync() error {
- t.Lock()
- var err error
- if t.data != nil {
- err = t.data.Sync()
- t.Unlock()
- return err
- }
- if t.meta != nil {
- err = t.meta.Sync()
- }
- t.Unlock()
- return err
- }
|