Skip to content

Commit

Permalink
etcdserver: make serializable Range() fine grained
Browse files Browse the repository at this point in the history
Current Range() tries to read entire requested range of keys in a
single read transaction. It can introduce long waiting of writer
transactions which can be observed as high latency spikes. For solving
this problem, this commit lets serializable Range() split its read
transaction in a fine grained manner. In the interval of the read
transactions, concurrent write RPCs (e.g. Put(), DeleteRange()) can
have a chance of starting their transactions. Serializable read only
Txn() is also turned into the smaller transactions.
  • Loading branch information
mitake committed Jan 23, 2018
1 parent 4a12eaf commit 2c0b061
Showing 1 changed file with 70 additions and 0 deletions.
70 changes: 70 additions & 0 deletions etcdserver/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,77 @@ func (a *applierV3backend) DeleteRange(txn mvcc.TxnWrite, dr *pb.DeleteRangeRequ
return resp, nil
}

func nextKey(key []byte) []byte {
for i := len(key) - 1; 0 <= i; i-- {
if key[i] < 0xff {
key[i]++
return key[:i+1]
}
}

return []byte{0}
}

func (a *applierV3backend) fineGrainedRange(r *pb.RangeRequest) (*pb.RangeResponse, error) {
resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

var res mvcc.RangeResult
startKey := r.Key

res.Rev = r.Revision
rangeEnd := mkGteRange(r.RangeEnd)

lim := int64(100) // TODO: configurable
if r.Limit != 0 && r.Limit < lim {
lim = r.Limit
}
for bytes.Compare(rangeEnd, []byte{0}) != 0 || bytes.Compare(startKey, rangeEnd) == -1 {
txn := a.s.kv.Read()

ro := mvcc.RangeOptions{
Limit: lim,
Rev: r.Revision,
Count: r.CountOnly,
}

rr, err := txn.Range(startKey, rangeEnd, ro)
txn.End()
if err != nil {
return nil, err
}

if rr.Count == 0 {
break
}

res.KVs = append(res.KVs, rr.KVs...)
res.Count += len(rr.KVs)

startKey = nextKey(rr.KVs[len(rr.KVs)-1].Key)
if bytes.Compare(startKey, []byte{0}) == 0 {
break
}
}

resp.Header.Revision = res.Rev
resp.Count = int64(res.Count)
resp.Kvs = make([]*mvccpb.KeyValue, len(res.KVs))
for i := range res.KVs {
if r.KeysOnly {
res.KVs[i].Value = nil
}
resp.Kvs[i] = &res.KVs[i]
}

return resp, nil
}

func (a *applierV3backend) Range(txn mvcc.TxnRead, r *pb.RangeRequest) (*pb.RangeResponse, error) {
if r.Serializable {
return a.fineGrainedRange(r)
}

resp := &pb.RangeResponse{}
resp.Header = &pb.ResponseHeader{}

Expand Down

0 comments on commit 2c0b061

Please sign in to comment.