observe.go 6.1 KB


  1. package couchbase
  2. import (
  3. "fmt"
  4. "github.com/couchbase/goutils/logging"
  5. "sync"
  6. )
  7. type PersistTo uint8
  8. const (
  9. PersistNone = PersistTo(0x00)
  10. PersistMaster = PersistTo(0x01)
  11. PersistOne = PersistTo(0x02)
  12. PersistTwo = PersistTo(0x03)
  13. PersistThree = PersistTo(0x04)
  14. PersistFour = PersistTo(0x05)
  15. )
  16. type ObserveTo uint8
  17. const (
  18. ObserveNone = ObserveTo(0x00)
  19. ObserveReplicateOne = ObserveTo(0x01)
  20. ObserveReplicateTwo = ObserveTo(0x02)
  21. ObserveReplicateThree = ObserveTo(0x03)
  22. ObserveReplicateFour = ObserveTo(0x04)
  23. )
  24. type JobType uint8
  25. const (
  26. OBSERVE = JobType(0x00)
  27. PERSIST = JobType(0x01)
  28. )
  29. type ObservePersistJob struct {
  30. vb uint16
  31. vbuuid uint64
  32. hostname string
  33. jobType JobType
  34. failover uint8
  35. lastPersistedSeqNo uint64
  36. currentSeqNo uint64
  37. resultChan chan *ObservePersistJob
  38. errorChan chan *OPErrResponse
  39. }
  40. type OPErrResponse struct {
  41. vb uint16
  42. vbuuid uint64
  43. err error
  44. job *ObservePersistJob
  45. }
  46. var ObservePersistPool = NewPool(1024)
  47. var OPJobChan = make(chan *ObservePersistJob, 1024)
  48. var OPJobDone = make(chan bool)
  49. var wg sync.WaitGroup
  50. func (b *Bucket) StartOPPollers(maxWorkers int) {
  51. for i := 0; i < maxWorkers; i++ {
  52. go b.OPJobPoll()
  53. wg.Add(1)
  54. }
  55. wg.Wait()
  56. }
  57. func (b *Bucket) SetObserveAndPersist(nPersist PersistTo, nObserve ObserveTo) (err error) {
  58. numNodes := len(b.Nodes())
  59. if int(nPersist) > numNodes || int(nObserve) > numNodes {
  60. return fmt.Errorf("Not enough healthy nodes in the cluster")
  61. }
  62. if int(nPersist) > (b.Replicas+1) || int(nObserve) > b.Replicas {
  63. return fmt.Errorf("Not enough replicas in the cluster")
  64. }
  65. if EnableMutationToken == false {
  66. return fmt.Errorf("Mutation Tokens not enabled ")
  67. }
  68. b.ds = &DurablitySettings{Persist: PersistTo(nPersist), Observe: ObserveTo(nObserve)}
  69. return
  70. }
  71. func (b *Bucket) ObserveAndPersistPoll(vb uint16, vbuuid uint64, seqNo uint64) (err error, failover bool) {
  72. b.RLock()
  73. ds := b.ds
  74. b.RUnlock()
  75. if ds == nil {
  76. return
  77. }
  78. nj := 0 // total number of jobs
  79. resultChan := make(chan *ObservePersistJob, 10)
  80. errChan := make(chan *OPErrResponse, 10)
  81. nodes := b.GetNodeList(vb)
  82. if int(ds.Observe) > len(nodes) || int(ds.Persist) > len(nodes) {
  83. return fmt.Errorf("Not enough healthy nodes in the cluster"), false
  84. }
  85. logging.Infof("Node list %v", nodes)
  86. if ds.Observe >= ObserveReplicateOne {
  87. // create a job for each host
  88. for i := ObserveReplicateOne; i < ds.Observe+1; i++ {
  89. opJob := ObservePersistPool.Get()
  90. opJob.vb = vb
  91. opJob.vbuuid = vbuuid
  92. opJob.jobType = OBSERVE
  93. opJob.hostname = nodes[i]
  94. opJob.resultChan = resultChan
  95. opJob.errorChan = errChan
  96. OPJobChan <- opJob
  97. nj++
  98. }
  99. }
  100. if ds.Persist >= PersistMaster {
  101. for i := PersistMaster; i < ds.Persist+1; i++ {
  102. opJob := ObservePersistPool.Get()
  103. opJob.vb = vb
  104. opJob.vbuuid = vbuuid
  105. opJob.jobType = PERSIST
  106. opJob.hostname = nodes[i]
  107. opJob.resultChan = resultChan
  108. opJob.errorChan = errChan
  109. OPJobChan <- opJob
  110. nj++
  111. }
  112. }
  113. ok := true
  114. for ok {
  115. select {
  116. case res := <-resultChan:
  117. jobDone := false
  118. if res.failover == 0 {
  119. // no failover
  120. if res.jobType == PERSIST {
  121. if res.lastPersistedSeqNo >= seqNo {
  122. jobDone = true
  123. }
  124. } else {
  125. if res.currentSeqNo >= seqNo {
  126. jobDone = true
  127. }
  128. }
  129. if jobDone == true {
  130. nj--
  131. ObservePersistPool.Put(res)
  132. } else {
  133. // requeue this job
  134. OPJobChan <- res
  135. }
  136. } else {
  137. // Not currently handling failover scenarios TODO
  138. nj--
  139. ObservePersistPool.Put(res)
  140. failover = true
  141. }
  142. if nj == 0 {
  143. // done with all the jobs
  144. ok = false
  145. close(resultChan)
  146. close(errChan)
  147. }
  148. case Err := <-errChan:
  149. logging.Errorf("Error in Observe/Persist %v", Err.err)
  150. err = fmt.Errorf("Error in Observe/Persist job %v", Err.err)
  151. nj--
  152. ObservePersistPool.Put(Err.job)
  153. if nj == 0 {
  154. close(resultChan)
  155. close(errChan)
  156. ok = false
  157. }
  158. }
  159. }
  160. return
  161. }
  162. func (b *Bucket) OPJobPoll() {
  163. ok := true
  164. for ok == true {
  165. select {
  166. case job := <-OPJobChan:
  167. pool := b.getConnPoolByHost(job.hostname, false /* bucket not already locked */)
  168. if pool == nil {
  169. errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
  170. errRes.err = fmt.Errorf("Pool not found for host %v", job.hostname)
  171. errRes.job = job
  172. job.errorChan <- errRes
  173. continue
  174. }
  175. conn, err := pool.Get()
  176. if err != nil {
  177. errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
  178. errRes.err = fmt.Errorf("Unable to get connection from pool %v", err)
  179. errRes.job = job
  180. job.errorChan <- errRes
  181. continue
  182. }
  183. res, err := conn.ObserveSeq(job.vb, job.vbuuid)
  184. if err != nil {
  185. errRes := &OPErrResponse{vb: job.vb, vbuuid: job.vbuuid}
  186. errRes.err = fmt.Errorf("Command failed %v", err)
  187. errRes.job = job
  188. job.errorChan <- errRes
  189. continue
  190. }
  191. pool.Return(conn)
  192. job.lastPersistedSeqNo = res.LastPersistedSeqNo
  193. job.currentSeqNo = res.CurrentSeqNo
  194. job.failover = res.Failover
  195. job.resultChan <- job
  196. case <-OPJobDone:
  197. logging.Infof("Observe Persist Poller exitting")
  198. ok = false
  199. }
  200. }
  201. wg.Done()
  202. }
  203. func (b *Bucket) GetNodeList(vb uint16) []string {
  204. vbm := b.VBServerMap()
  205. if len(vbm.VBucketMap) < int(vb) {
  206. logging.Infof("vbmap smaller than vblist")
  207. return nil
  208. }
  209. nodes := make([]string, len(vbm.VBucketMap[vb]))
  210. for i := 0; i < len(vbm.VBucketMap[vb]); i++ {
  211. n := vbm.VBucketMap[vb][i]
  212. if n < 0 {
  213. continue
  214. }
  215. node := b.getMasterNode(n)
  216. if len(node) > 1 {
  217. nodes[i] = node
  218. }
  219. continue
  220. }
  221. return nodes
  222. }
  223. //pool of ObservePersist Jobs
  224. type OPpool struct {
  225. pool chan *ObservePersistJob
  226. }
  227. // NewPool creates a new pool of jobs
  228. func NewPool(max int) *OPpool {
  229. return &OPpool{
  230. pool: make(chan *ObservePersistJob, max),
  231. }
  232. }
  233. // Borrow a Client from the pool.
  234. func (p *OPpool) Get() *ObservePersistJob {
  235. var o *ObservePersistJob
  236. select {
  237. case o = <-p.pool:
  238. default:
  239. o = &ObservePersistJob{}
  240. }
  241. return o
  242. }
  243. // Return returns a Client to the pool.
  244. func (p *OPpool) Put(o *ObservePersistJob) {
  245. select {
  246. case p.pool <- o:
  247. default:
  248. // let it go, let it go...
  249. }
  250. }