123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- // Copyright 2016 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package raft
- import pb "github.com/coreos/etcd/raft/raftpb"
- // ReadState provides state for read only query.
- // It's caller's responsibility to call ReadIndex first before getting
- // this state from ready, it's also caller's duty to differentiate if this
- // state is what it requests through RequestCtx, eg. given a unique id as
- // RequestCtx
- type ReadState struct {
- Index uint64
- RequestCtx []byte
- }
- type readIndexStatus struct {
- req pb.Message
- index uint64
- acks map[uint64]struct{}
- }
- type readOnly struct {
- option ReadOnlyOption
- pendingReadIndex map[string]*readIndexStatus
- readIndexQueue []string
- }
- func newReadOnly(option ReadOnlyOption) *readOnly {
- return &readOnly{
- option: option,
- pendingReadIndex: make(map[string]*readIndexStatus),
- }
- }
- // addRequest adds a read only reuqest into readonly struct.
- // `index` is the commit index of the raft state machine when it received
- // the read only request.
- // `m` is the original read only request message from the local or remote node.
- func (ro *readOnly) addRequest(index uint64, m pb.Message) {
- ctx := string(m.Entries[0].Data)
- if _, ok := ro.pendingReadIndex[ctx]; ok {
- return
- }
- ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
- ro.readIndexQueue = append(ro.readIndexQueue, ctx)
- }
- // recvAck notifies the readonly struct that the raft state machine received
- // an acknowledgment of the heartbeat that attached with the read only request
- // context.
- func (ro *readOnly) recvAck(m pb.Message) int {
- rs, ok := ro.pendingReadIndex[string(m.Context)]
- if !ok {
- return 0
- }
- rs.acks[m.From] = struct{}{}
- // add one to include an ack from local node
- return len(rs.acks) + 1
- }
- // advance advances the read only request queue kept by the readonly struct.
- // It dequeues the requests until it finds the read only request that has
- // the same context as the given `m`.
- func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
- var (
- i int
- found bool
- )
- ctx := string(m.Context)
- rss := []*readIndexStatus{}
- for _, okctx := range ro.readIndexQueue {
- i++
- rs, ok := ro.pendingReadIndex[okctx]
- if !ok {
- panic("cannot find corresponding read state from pending map")
- }
- rss = append(rss, rs)
- if okctx == ctx {
- found = true
- break
- }
- }
- if found {
- ro.readIndexQueue = ro.readIndexQueue[i:]
- for _, rs := range rss {
- delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
- }
- return rss
- }
- return nil
- }
- // lastPendingRequestCtx returns the context of the last pending read only
- // request in readonly struct.
- func (ro *readOnly) lastPendingRequestCtx() string {
- if len(ro.readIndexQueue) == 0 {
- return ""
- }
- return ro.readIndexQueue[len(ro.readIndexQueue)-1]
- }
|