Skip to content

Commit

Permalink
storage/concurrency: define concurrency control interfaces
Browse files Browse the repository at this point in the history
Informs cockroachdb#41720.

The commit creates a new concurrency package that provides a concurrency manager
structure that encapsulates the details of concurrency control and contention
handling for serializable key-value transactions. Interested readers should
start at `concurrency_control.go` and move out from there.

The new package has a few primary objectives:

1. centralize the handling of request synchronization and transaction contention
handling in a single location, allowing for the topic to be documented,
understood, and tested in isolation.

2. rework contention handling to react to intent state transitions directly.
This simplifies the transaction queueing story, reduces the frequency of
transaction push RPCs, and allows waiters to proceed immediately after intent
resolution.

3. create a framework that naturally permits "update" locking, which is required
for kv-level SELECT FOR UPDATE support (cockroachdb#6583).

4. provide stronger guarantees around fairness when transactions conflict, to
reduce tail latencies under contended sceneries.

5. create a structure that can extend to address the long-term goals of a fully
centralized lock-table laid out in cockroachdb#41720.

This commit pulls over a lot of already reviewed code from cockroachdb#43775. The big
differences are that it updates the lockTable interface to match the one
introduced in cockroachdb#43740 and it addresses the remaining TODOs to document the rest
of the concurrency control mechanisms in CockroachDB. At this point, a reader
of this file should get a good idea of how KV transactions interact in CRDB...
now we just need to make the system actually work this way.
  • Loading branch information
nvanbenschoten committed Feb 7, 2020
1 parent 7ea16bc commit 78533e0
Show file tree
Hide file tree
Showing 13 changed files with 1,022 additions and 527 deletions.
2 changes: 2 additions & 0 deletions pkg/storage/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ func init() {
func declareKeysResolveIntentCombined(
desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
// TODO(nvanbenschoten): declare this span at the txn's MinTimestamp. See
// lockTable.UpdateLocks for more.
DefaultDeclareKeys(desc, header, req, spans)
var status roachpb.TransactionStatus
var txnID uuid.UUID
Expand Down
714 changes: 714 additions & 0 deletions pkg/storage/concurrency/concurrency_control.go

Large diffs are not rendered by default.

35 changes: 35 additions & 0 deletions pkg/storage/concurrency/latch_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package concurrency

import (
"context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
)

// latchManagerImpl implements the latchManager interface.
type latchManagerImpl struct {
m spanlatch.Manager
}

func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) {
lg, err := m.m.Acquire(ctx, req.Spans)
if err != nil {
return nil, roachpb.NewError(err)
}
return lg, nil
}

func (m *latchManagerImpl) Release(lg latchGuard) {
m.m.Release(lg.(*spanlatch.Guard))
}
636 changes: 191 additions & 445 deletions pkg/storage/concurrency/lock_table.go

Large diffs are not rendered by default.

77 changes: 37 additions & 40 deletions pkg/storage/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
)

/*
Test needs to handle caller constraints wrt latches being held. The datadriven test uses the
following format:
Test needs to handle caller constraints wrt latches being held. The datadriven
test uses the following format:
txn txn=<name> ts=<int>[,<int>] epoch=<int>
----
Expand All @@ -45,8 +45,8 @@ scan r=<name>
----
<error string>|start-waiting: <bool>
Calls lockTable.scanAndEnqueue. If the request has an existing guard, uses it. If a guard is
returned, stores it for later use.
Calls lockTable.ScanAndEnqueue. If the request has an existing guard, uses it.
If a guard is returned, stores it for later use.
acquire r=<name> k=<key> durability=r|u
----
Expand Down Expand Up @@ -76,19 +76,21 @@ done r=<name>
----
<error string>
Calls lockTable.done() for the named request. The request and guard are discarded after this.
Calls lockTable.Dequeue() for the named request. The request and guard are
discarded after this.
guard-state r=<name>
----
new|old: state=<state> [txn=<name> ts=<ts>]
Calls requestGuard.newState() in a non-blocking manner, followed by currentState().
Calls lockTableGuard.NewStateChan() in a non-blocking manner, followed by
CurState().
guard-start-waiting r=<name>
----
<bool>
Calls requestGuard.startWaiting().
Calls lockTableGuard.ShouldWait().
print
----
Expand Down Expand Up @@ -163,26 +165,14 @@ func scanSpans(t *testing.T, d *datadriven.TestData, ts hlc.Timestamp) *spanset.
return spans
}

type testRequest struct {
tM *enginepb.TxnMeta
s *spanset.SpanSet
t hlc.Timestamp
}

var _ Request = &testRequest{}

func (r *testRequest) txnMeta() *enginepb.TxnMeta { return r.tM }
func (r *testRequest) spans() *spanset.SpanSet { return r.s }
func (r *testRequest) ts() hlc.Timestamp { return r.t }

func TestLockTableBasic(t *testing.T) {
defer leaktest.AfterTest(t)()

lt := newLockTable(1000)
txnsByName := make(map[string]*enginepb.TxnMeta)
txnCounter := uint128.FromInts(0, 0)
requestsByName := make(map[string]*testRequest)
guardsByReqName := make(map[string]requestGuard)
requestsByName := make(map[string]Request)
guardsByReqName := make(map[string]lockTableGuard)
datadriven.RunTest(t, "testdata/lock_table", func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "txn":
Expand Down Expand Up @@ -227,32 +217,35 @@ func TestLockTableBasic(t *testing.T) {
}
ts := scanTimestamp(t, d)
spans := scanSpans(t, d, ts)
req := &testRequest{
tM: txnMeta,
s: spans,
t: ts,
// Update the transaction's timestamp, if necessary. The transaction
// may have needed to move its timestamp for any number of reasons.
txnMeta.WriteTimestamp = ts
req := Request{
Txn: &roachpb.Transaction{TxnMeta: *txnMeta},
Timestamp: ts,
Spans: spans,
}
requestsByName[reqName] = req
return ""

case "scan":
var reqName string
d.ScanArgs(t, "r", &reqName)
req := requestsByName[reqName]
if req == nil {
req, ok := requestsByName[reqName]
if !ok {
d.Fatalf(t, "unknown request: %s", reqName)
}
g := guardsByReqName[reqName]
g = lt.scanAndEnqueue(req, g)
g = lt.ScanAndEnqueue(req, g)
guardsByReqName[reqName] = g
return fmt.Sprintf("start-waiting: %t", g.startWaiting())
return fmt.Sprintf("start-waiting: %t", g.ShouldWait())

case "acquire":
var reqName string
d.ScanArgs(t, "r", &reqName)
g := guardsByReqName[reqName]
if g == nil {
d.Fatalf(t, "unknown guard: %s", reqName)
req, ok := requestsByName[reqName]
if !ok {
d.Fatalf(t, "unknown request: %s", reqName)
}
var key string
d.ScanArgs(t, "k", &key)
Expand All @@ -265,7 +258,7 @@ func TestLockTableBasic(t *testing.T) {
if s[0] == 'r' {
durability = lock.Replicated
}
if err := lt.acquireLock(roachpb.Key(key), lock.Exclusive, durability, g); err != nil {
if err := lt.AcquireLock(&req.Txn.TxnMeta, roachpb.Key(key), lock.Exclusive, durability); err != nil {
return err.Error()
}
return lt.(*lockTableImpl).String()
Expand All @@ -282,7 +275,7 @@ func TestLockTableBasic(t *testing.T) {
span := getSpan(t, d, s)
// TODO(sbhola): also test ABORTED.
intent := &roachpb.Intent{Span: span, Txn: *txnMeta, Status: roachpb.COMMITTED}
if err := lt.updateLocks(intent); err != nil {
if err := lt.UpdateLocks(intent); err != nil {
return err.Error()
}
return lt.(*lockTableImpl).String()
Expand All @@ -306,7 +299,7 @@ func TestLockTableBasic(t *testing.T) {
span := getSpan(t, d, s)
// TODO(sbhola): also test STAGING.
intent := &roachpb.Intent{Span: span, Txn: *txnMeta, Status: roachpb.PENDING}
if err := lt.updateLocks(intent); err != nil {
if err := lt.UpdateLocks(intent); err != nil {
return err.Error()
}
return lt.(*lockTableImpl).String()
Expand All @@ -326,31 +319,35 @@ func TestLockTableBasic(t *testing.T) {
if !ok {
d.Fatalf(t, "unknown txn %s", txnName)
}
if err := lt.addDiscoveredLock(roachpb.Key(key), txnMeta, txnMeta.WriteTimestamp, g); err != nil {
span := roachpb.Span{Key: roachpb.Key(key)}
intent := &roachpb.Intent{Span: span, Txn: *txnMeta, Status: roachpb.PENDING}
if err := lt.AddDiscoveredLock(intent, g); err != nil {
return err.Error()
}
return lt.(*lockTableImpl).String()

case "done":
// TODO(nvanbenschoten): rename this command to dequeue.
var reqName string
d.ScanArgs(t, "r", &reqName)
g := guardsByReqName[reqName]
if g == nil {
d.Fatalf(t, "unknown guard: %s", reqName)
}
lt.done(g)
lt.Dequeue(g)
delete(guardsByReqName, reqName)
delete(requestsByName, reqName)
return lt.(*lockTableImpl).String()

case "guard-start-waiting":
// TODO(nvanbenschoten): rename this command to should-wait.
var reqName string
d.ScanArgs(t, "r", &reqName)
g := guardsByReqName[reqName]
if g == nil {
d.Fatalf(t, "unknown guard: %s", reqName)
}
return fmt.Sprintf("%t", g.startWaiting())
return fmt.Sprintf("%t", g.ShouldWait())

case "guard-state":
var reqName string
Expand All @@ -361,12 +358,12 @@ func TestLockTableBasic(t *testing.T) {
}
var str string
select {
case <-g.newState():
case <-g.NewStateChan():
str = "new: "
default:
str = "old: "
}
state := g.currentState()
state := g.CurState()
var typeStr string
switch state.stateKind {
case waitForDistinguished:
Expand Down
4 changes: 1 addition & 3 deletions pkg/storage/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,9 +883,7 @@ func (ir *IntentResolver) ResolveIntents(
}
var resolveReqs []resolveReq
var resolveRangeReqs []roachpb.Request
for i := range intents {
intent := intents[i] // avoids a race in `i, intent := range ...`

for _, intent := range intents {
if len(intent.EndKey) == 0 {
resolveReqs = append(resolveReqs,
resolveReq{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ func newUnloadedReplica(
RangeID: desc.RangeID,
store: store,
abortSpan: abortspan.New(desc.RangeID),
txnWaitQueue: txnwait.NewQueue(store),
}
r.txnWaitQueue = txnwait.NewQueue(store, r)
r.mu.pendingLeaseRequest = makePendingLeaseRequest(r)
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (r *Replica) handleTransactionPushError(
return pErr
}
// Enqueue unsuccessfully pushed transaction on the txnWaitQueue and retry.
r.txnWaitQueue.Enqueue(&t.PusheeTxn)
r.txnWaitQueue.EnqueueTxn(&t.PusheeTxn)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/storage/spanset/spanset.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (s *SpanSet) SortAndDedup() {
for sa := SpanAccess(0); sa < NumSpanAccess; sa++ {
for ss := SpanScope(0); ss < NumSpanScope; ss++ {
s.spans[sa][ss], _ /* distinct */ = mergeSpans(s.spans[sa][ss])
// TODO(nvanbenschoten): dedup across accesses.
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/store_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (r *Replica) maybeWaitForPushee(
return nil, nil
}
pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq)
pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, pushReq)
if pErr != nil {
return nil, pErr
} else if pushResp != nil {
Expand All @@ -262,7 +262,7 @@ func (r *Replica) maybeWaitForPushee(
// For query txn requests, wait in the txn wait queue either for
// transaction update or for dependent transactions to change.
queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest)
pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, r, queryReq)
pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, queryReq)
if pErr != nil {
return nil, pErr
}
Expand Down
Loading

0 comments on commit 78533e0

Please sign in to comment.