rpl.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. package rpl
  2. import (
  3. "encoding/binary"
  4. "os"
  5. "path"
  6. "sync"
  7. "time"
  8. "github.com/siddontang/go/log"
  9. "github.com/siddontang/go/snappy"
  10. "github.com/siddontang/ledisdb/config"
  11. )
  // Stat is a point-in-time summary of the replication log, as returned by
  // (*Replication).Stat.
  type Stat struct {
  	// FirstID is the ID reported by the log store's FirstID call.
  	FirstID uint64

  	// LastID is the ID reported by the log store's LastID call.
  	LastID uint64

  	// CommitID is the in-memory commit ID at the time of the snapshot.
  	CommitID uint64
  }
  17. type Replication struct {
  18. m sync.Mutex
  19. cfg *config.Config
  20. s LogStore
  21. commitID uint64
  22. commitLog *os.File
  23. quit chan struct{}
  24. wg sync.WaitGroup
  25. nc chan struct{}
  26. ncm sync.Mutex
  27. }
  28. func NewReplication(cfg *config.Config) (*Replication, error) {
  29. if len(cfg.Replication.Path) == 0 {
  30. cfg.Replication.Path = path.Join(cfg.DataDir, "rpl")
  31. }
  32. base := cfg.Replication.Path
  33. r := new(Replication)
  34. r.quit = make(chan struct{})
  35. r.nc = make(chan struct{})
  36. r.cfg = cfg
  37. var err error
  38. switch cfg.Replication.StoreName {
  39. case "goleveldb":
  40. if r.s, err = NewGoLevelDBStore(path.Join(base, "wal"), cfg.Replication.SyncLog); err != nil {
  41. return nil, err
  42. }
  43. default:
  44. if r.s, err = NewFileStore(path.Join(base, "ldb"), cfg); err != nil {
  45. return nil, err
  46. }
  47. }
  48. if r.commitLog, err = os.OpenFile(path.Join(base, "commit.log"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
  49. return nil, err
  50. }
  51. if s, _ := r.commitLog.Stat(); s.Size() == 0 {
  52. r.commitID = 0
  53. } else if err = binary.Read(r.commitLog, binary.BigEndian, &r.commitID); err != nil {
  54. return nil, err
  55. }
  56. log.Infof("staring replication with commit ID %d", r.commitID)
  57. r.wg.Add(1)
  58. go r.run()
  59. return r, nil
  60. }
  61. func (r *Replication) Close() error {
  62. close(r.quit)
  63. r.wg.Wait()
  64. r.m.Lock()
  65. defer r.m.Unlock()
  66. log.Infof("closing replication with commit ID %d", r.commitID)
  67. if r.s != nil {
  68. r.s.Close()
  69. r.s = nil
  70. }
  71. if err := r.updateCommitID(r.commitID, true); err != nil {
  72. log.Errorf("update commit id err %s", err.Error())
  73. }
  74. if r.commitLog != nil {
  75. r.commitLog.Close()
  76. r.commitLog = nil
  77. }
  78. return nil
  79. }
  80. func (r *Replication) Log(data []byte) (*Log, error) {
  81. if r.cfg.Replication.Compression {
  82. //todo optimize
  83. var err error
  84. if data, err = snappy.Encode(nil, data); err != nil {
  85. return nil, err
  86. }
  87. }
  88. r.m.Lock()
  89. lastID, err := r.s.LastID()
  90. if err != nil {
  91. r.m.Unlock()
  92. return nil, err
  93. }
  94. commitId := r.commitID
  95. if lastID < commitId {
  96. lastID = commitId
  97. } else if lastID > commitId {
  98. r.m.Unlock()
  99. return nil, ErrCommitIDBehind
  100. }
  101. l := new(Log)
  102. l.ID = lastID + 1
  103. l.CreateTime = uint32(time.Now().Unix())
  104. if r.cfg.Replication.Compression {
  105. l.Compression = 1
  106. } else {
  107. l.Compression = 0
  108. }
  109. l.Data = data
  110. if err = r.s.StoreLog(l); err != nil {
  111. r.m.Unlock()
  112. return nil, err
  113. }
  114. r.m.Unlock()
  115. r.ncm.Lock()
  116. close(r.nc)
  117. r.nc = make(chan struct{})
  118. r.ncm.Unlock()
  119. return l, nil
  120. }
  121. func (r *Replication) WaitLog() <-chan struct{} {
  122. r.ncm.Lock()
  123. ch := r.nc
  124. r.ncm.Unlock()
  125. return ch
  126. }
  127. func (r *Replication) StoreLog(log *Log) error {
  128. r.m.Lock()
  129. err := r.s.StoreLog(log)
  130. r.m.Unlock()
  131. return err
  132. }
  133. func (r *Replication) FirstLogID() (uint64, error) {
  134. r.m.Lock()
  135. id, err := r.s.FirstID()
  136. r.m.Unlock()
  137. return id, err
  138. }
  139. func (r *Replication) LastLogID() (uint64, error) {
  140. r.m.Lock()
  141. id, err := r.s.LastID()
  142. r.m.Unlock()
  143. return id, err
  144. }
  145. func (r *Replication) LastCommitID() (uint64, error) {
  146. r.m.Lock()
  147. id := r.commitID
  148. r.m.Unlock()
  149. return id, nil
  150. }
  151. func (r *Replication) UpdateCommitID(id uint64) error {
  152. r.m.Lock()
  153. err := r.updateCommitID(id, r.cfg.Replication.SyncLog == 2)
  154. r.m.Unlock()
  155. return err
  156. }
  157. func (r *Replication) Stat() (*Stat, error) {
  158. r.m.Lock()
  159. defer r.m.Unlock()
  160. s := &Stat{}
  161. var err error
  162. if s.FirstID, err = r.s.FirstID(); err != nil {
  163. return nil, err
  164. }
  165. if s.LastID, err = r.s.LastID(); err != nil {
  166. return nil, err
  167. }
  168. s.CommitID = r.commitID
  169. return s, nil
  170. }
  171. func (r *Replication) updateCommitID(id uint64, force bool) error {
  172. if force {
  173. if _, err := r.commitLog.Seek(0, os.SEEK_SET); err != nil {
  174. return err
  175. }
  176. if err := binary.Write(r.commitLog, binary.BigEndian, id); err != nil {
  177. return err
  178. }
  179. }
  180. r.commitID = id
  181. return nil
  182. }
  183. func (r *Replication) CommitIDBehind() (bool, error) {
  184. r.m.Lock()
  185. id, err := r.s.LastID()
  186. if err != nil {
  187. r.m.Unlock()
  188. return false, err
  189. }
  190. behind := id > r.commitID
  191. r.m.Unlock()
  192. return behind, nil
  193. }
  194. func (r *Replication) GetLog(id uint64, log *Log) error {
  195. return r.s.GetLog(id, log)
  196. }
  197. func (r *Replication) NextNeedCommitLog(log *Log) error {
  198. r.m.Lock()
  199. defer r.m.Unlock()
  200. id, err := r.s.LastID()
  201. if err != nil {
  202. return err
  203. }
  204. if id <= r.commitID {
  205. return ErrNoBehindLog
  206. }
  207. return r.s.GetLog(r.commitID+1, log)
  208. }
  209. func (r *Replication) Clear() error {
  210. return r.ClearWithCommitID(0)
  211. }
  212. func (r *Replication) ClearWithCommitID(id uint64) error {
  213. r.m.Lock()
  214. defer r.m.Unlock()
  215. if err := r.s.Clear(); err != nil {
  216. return err
  217. }
  218. return r.updateCommitID(id, true)
  219. }
  220. func (r *Replication) run() {
  221. defer r.wg.Done()
  222. syncTc := time.NewTicker(1 * time.Second)
  223. purgeTc := time.NewTicker(1 * time.Hour)
  224. for {
  225. select {
  226. case <-purgeTc.C:
  227. n := (r.cfg.Replication.ExpiredLogDays * 24 * 3600)
  228. r.m.Lock()
  229. err := r.s.PurgeExpired(int64(n))
  230. r.m.Unlock()
  231. if err != nil {
  232. log.Errorf("purge expired log error %s", err.Error())
  233. }
  234. case <-syncTc.C:
  235. if r.cfg.Replication.SyncLog == 1 {
  236. r.m.Lock()
  237. err := r.s.Sync()
  238. r.m.Unlock()
  239. if err != nil {
  240. log.Errorf("sync store error %s", err.Error())
  241. }
  242. }
  243. if r.cfg.Replication.SyncLog != 2 {
  244. //we will sync commit id every 1 second
  245. r.m.Lock()
  246. err := r.updateCommitID(r.commitID, true)
  247. r.m.Unlock()
  248. if err != nil {
  249. log.Errorf("sync commitid error %s", err.Error())
  250. }
  251. }
  252. case <-r.quit:
  253. syncTc.Stop()
  254. purgeTc.Stop()
  255. return
  256. }
  257. }
  258. }