[DNM] storage/concurrency: introduce concurrency control package, prototype SFU

Informs cockroachdb#41720.

This commit creates a new concurrency package whose concurrency manager
encapsulates the details of concurrency control and contention handling for
serializable key-value transactions. Readers of this commit should start at
`concurrency_control.go` and work outward from there.

The new package has a few primary objectives (a usage sketch follows the list):
1. centralize request synchronization and transaction contention handling in a
   single location, allowing the topic to be documented and understood in
   isolation.
2. rework contention handling to react to intent state transitions directly. This
   simplifies the transaction queueing story, reduces the frequency of transaction
   push RPCs, and allows waiters to proceed as soon as intents are resolved.
3. create a framework that naturally permits "update" locking, which is required for
   kv-level SELECT FOR UPDATE support (cockroachdb#6583).
4. provide stronger guarantees around fairness when transactions conflict, in order
   to reduce tail latencies in contended scenarios.
5. create a structure that can be extended to address the long-term goal of a fully
   centralized lock-table laid out in cockroachdb#41720.

WARNING: this is still a WIP. Notably, the lockTableImpl is mocked out with a
working but incomplete implementation. See cockroachdb#43740 for a more complete strawman.

Release note: None
nvanbenschoten committed Feb 14, 2020
1 parent edf88b4 commit f0709c2
Showing 19 changed files with 200 additions and 316 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/batcheval/cmd_query_txn.go
@@ -72,6 +72,6 @@ func QueryTxn(
}

// Get the list of txns waiting on this txn.
reply.WaitingTxns = cArgs.EvalCtx.GetTxnWaitQueue().GetDependents(args.Txn.ID)
reply.WaitingTxns = cArgs.EvalCtx.GetConcurrencyManager().GetDependents(args.Txn.ID)
return result.Result{}, nil
}
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/cmd_resolve_intent_test.go
@@ -21,11 +21,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -71,7 +71,7 @@ func (m *mockEvalCtx) GetLimiters() *Limiters {
func (m *mockEvalCtx) AbortSpan() *abortspan.AbortSpan {
return m.abortSpan
}
func (m *mockEvalCtx) GetTxnWaitQueue() *txnwait.Queue {
func (m *mockEvalCtx) GetConcurrencyManager() concurrency.Manager {
panic("unimplemented")
}
func (m *mockEvalCtx) NodeID() roachpb.NodeID {
4 changes: 2 additions & 2 deletions pkg/storage/batcheval/eval_context.go
@@ -19,10 +19,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -53,7 +53,7 @@ type EvalContext interface {
Clock() *hlc.Clock
DB() *client.DB
AbortSpan() *abortspan.AbortSpan
GetTxnWaitQueue() *txnwait.Queue
GetConcurrencyManager() concurrency.Manager
GetLimiters() *Limiters

NodeID() roachpb.NodeID
4 changes: 0 additions & 4 deletions pkg/storage/helpers_test.go
@@ -458,10 +458,6 @@ func (r *Replica) IsQuiescent() bool {
return r.mu.quiescent
}

func (r *Replica) IsTxnWaitQueueEnabled() bool {
return r.txnWaitQueue.IsEnabled()
}

// GetQueueLastProcessed returns the last processed timestamp for the
// specified queue, or the zero timestamp if not available.
func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.Timestamp, error) {
93 changes: 19 additions & 74 deletions pkg/storage/replica.go
@@ -30,16 +30,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rangefeed"
"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/split"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -176,9 +174,8 @@ type Replica struct {
// TODO(tschottdorf): Duplicates r.mu.state.desc.RangeID; revisit that.
RangeID roachpb.RangeID // Only set by the constructor

store *Store
abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort
txnWaitQueue *txnwait.Queue // Queues push txn attempts by txn ID
store *Store
abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort

// leaseholderStats tracks all incoming BatchRequests to the replica and which
// localities they come from in order to aid in lease rebalancing decisions.
@@ -224,11 +221,10 @@ type Replica struct {
// Contains the lease history when enabled.
leaseHistory *leaseHistory

// Enforces at most one command is running per key(s) within each span
// scope. The globally-scoped component tracks user writes (i.e. all
// keys for which keys.Addr is the identity), the locally-scoped component
// the rest (e.g. RangeDescriptor, transaction record, Lease, ...).
latchMgr spanlatch.Manager
// concMgr sequences incoming requests and provides isolation between
// requests that intend to perform conflicting operations. It is the
// centerpiece of transaction contention handling.
concMgr concurrency.Manager

mu struct {
// Protects all fields in the mu struct.
@@ -664,9 +660,9 @@ func (r *Replica) GetLimiters() *batcheval.Limiters {
return &r.store.limiters
}

// GetTxnWaitQueue returns the Replica's txnwait.Queue.
func (r *Replica) GetTxnWaitQueue() *txnwait.Queue {
return r.txnWaitQueue
// GetConcurrencyManager returns the Replica's concurrency.Manager.
func (r *Replica) GetConcurrencyManager() concurrency.Manager {
return r.concMgr
}

// GetTerm returns the term of the given index in the raft log.
@@ -927,14 +923,14 @@ func (r *Replica) assertStateLocked(ctx context.Context, reader engine.Reader) {
// able to serve traffic or that the request is not compatible with the state of
// the Range.
//
// The method accepts a spanlatch.Guard and a LeaseStatus parameter. These are
// The method accepts a concurrency.Guard and a LeaseStatus parameter. These are
// used to indicate whether the caller has acquired latches and checked the
// Range lease. The method will only check for a pending merge if both of these
// conditions are true. If either lg == nil or st == nil then the method will
// not check for a pending merge. Callers might be ok with this if they know
// that they will end up checking for a pending merge at some later time.
func (r *Replica) checkExecutionCanProceed(
ba *roachpb.BatchRequest, lg *spanlatch.Guard, st *storagepb.LeaseStatus,
ba *roachpb.BatchRequest, g *concurrency.Guard, st *storagepb.LeaseStatus,
) error {
rSpan, err := keys.Range(ba.Requests)
if err != nil {
@@ -949,7 +945,7 @@ func (r *Replica) checkExecutionCanProceed(
return err
} else if err := r.checkTSAboveGCThresholdRLocked(ba.Timestamp); err != nil {
return err
} else if lg != nil && st != nil {
} else if g != nil && st != nil {
// Only check for a pending merge if latches are held and the Range
// lease is held by this Replica. Without both of these conditions,
// checkForPendingMergeRLocked could return false negatives.
@@ -1121,7 +1117,7 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
// command processing.
type endCmds struct {
repl *Replica
lg *spanlatch.Guard
g *concurrency.Guard
}

// move moves the endCmds into the return value, clearing and making
@@ -1149,67 +1145,16 @@ func (ec *endCmds) done(
// Update the timestamp cache if the request is not being re-evaluated. Each
// request is considered in turn; only those marked as affecting the cache are
// processed. Inconsistent reads are excluded.
if ba.ReadConsistency == roachpb.CONSISTENT {
ec.repl.updateTimestampCache(ctx, ba, br, pErr)
}
ec.repl.updateTimestampCache(ctx, ba, br, pErr)

// Release the latches acquired by the request back to the spanlatch
// manager. Must be done AFTER the timestamp cache is updated.
if ec.lg != nil {
ec.repl.latchMgr.Release(ec.lg)
// manager. Must be done AFTER the timestamp cache is updated. Only set
// when the proposal has assumed responsibility for releasing latches.
if ec.g != nil {
ec.repl.concMgr.FinishReq(ec.g)
}
}

// beginCmds waits for any in-flight, conflicting commands to complete. More
// specifically, beginCmds acquires latches for the request based on keys
// affected by the batched commands. This gates subsequent commands with
// overlapping keys or key ranges. It returns a cleanup function to be called
// when the commands are done and can release their latches.
func (r *Replica) beginCmds(
ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
) (*spanlatch.Guard, error) {
// Only acquire latches for consistent operations.
if ba.ReadConsistency != roachpb.CONSISTENT {
log.Event(ctx, "operation accepts inconsistent results")
return nil, nil
}

// Don't acquire latches for lease requests. These are run on replicas that
// do not hold the lease, so acquiring latches wouldn't help synchronize
// with other requests.
if ba.IsLeaseRequest() {
return nil, nil
}

var beforeLatch time.Time
if log.ExpensiveLogEnabled(ctx, 2) {
beforeLatch = timeutil.Now()
}

// Acquire latches for all the request's declared spans to ensure
// protected access and to avoid interacting requests from operating at
// the same time. The latches will be held for the duration of request.
log.Event(ctx, "acquire latches")
lg, err := r.latchMgr.Acquire(ctx, spans)
if err != nil {
return nil, err
}

if !beforeLatch.IsZero() {
dur := timeutil.Since(beforeLatch)
log.VEventf(ctx, 2, "waited %s to acquire latches", dur)
}

if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil {
if pErr := filter(*ba); pErr != nil {
r.latchMgr.Release(lg)
return nil, pErr.GoError()
}
}

return lg, nil
}
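
Note what is deleted along with `beginCmds`: the bypass for inconsistent
reads and lease requests, the latch-wait logging, and the
`TestingLatchFilter` testing knob. Those behaviors presumably move inside the
manager's sequencing step. A hedged sketch of just the bypass predicate (the
function name is invented; the two conditions are taken from the deleted code
above):

```go
// shouldSequence reports whether a batch needs latching/sequencing at all,
// mirroring the two early-outs in the deleted beginCmds.
func shouldSequence(ba *roachpb.BatchRequest) bool {
	// Inconsistent reads accept stale results, so they don't synchronize
	// with in-flight writes.
	if ba.ReadConsistency != roachpb.CONSISTENT {
		return false
	}
	// Lease requests run on replicas that do not hold the lease, so
	// latches wouldn't synchronize them with other requests.
	return !ba.IsLeaseRequest()
}
```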

// maybeWatchForMerge checks whether a merge of this replica into its left
// neighbor is in its critical phase and, if so, arranges to block all requests
// until the merge completes.
8 changes: 4 additions & 4 deletions pkg/storage/replica_eval_context_span.go
@@ -20,11 +20,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/cloud"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)
@@ -74,9 +74,9 @@ func (rec *SpanSetReplicaEvalContext) DB() *client.DB {
return rec.i.DB()
}

// GetTxnWaitQueue returns the txnwait.Queue.
func (rec *SpanSetReplicaEvalContext) GetTxnWaitQueue() *txnwait.Queue {
return rec.i.GetTxnWaitQueue()
// GetConcurrencyManager returns the concurrency.Manager.
func (rec *SpanSetReplicaEvalContext) GetConcurrencyManager() concurrency.Manager {
return rec.i.GetConcurrencyManager()
}

// NodeID returns the NodeID.
7 changes: 3 additions & 4 deletions pkg/storage/replica_init.go
@@ -18,11 +18,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
"github.com/cockroachdb/cockroach/pkg/storage/split"
"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -70,8 +69,8 @@ func newUnloadedReplica(
RangeID: desc.RangeID,
store: store,
abortSpan: abortspan.New(desc.RangeID),
concMgr: concurrency.NewManager(store, desc),
}
r.txnWaitQueue = txnwait.NewQueue(store, r)
r.mu.pendingLeaseRequest = makePendingLeaseRequest(r)
r.mu.stateLoader = stateloader.Make(desc.RangeID)
r.mu.quiescent = true
@@ -80,7 +79,6 @@
split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 {
return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV))
})
r.latchMgr = spanlatch.Make(r.store.stopper, r.store.metrics.SlowLatchRequests)
r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{}
r.mu.checksums = map[uuid.UUID]ReplicaChecksum{}
r.mu.proposalBuf.Init((*replicaProposer)(r))
@@ -300,5 +298,6 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R

r.rangeStr.store(r.mu.replicaID, desc)
r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey))
r.concMgr.OnDescriptorUpdated(desc)
r.mu.state.Desc = desc
}
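
Taken together, the call sites in this commit pin down a minimum surface for
`concurrency.Manager`. The sketch below aggregates only the methods the hunks
actually invoke; parameter and result types are inferred from the call sites
and may differ from `concurrency_control.go`, which also covers request
sequencing and error handling:

```go
// Methods of concurrency.Manager exercised by this commit's call sites.
// Inferred signatures; treat as a floor on the interface, not the API.
type managerSurfaceSeenInDiff interface {
	FinishReq(g *concurrency.Guard)                         // endCmds.done
	GetDependents(txnID uuid.UUID) []uuid.UUID              // QueryTxn
	OnLockAcquired(ctx context.Context, in *roachpb.Intent) // eval results
	OnLockUpdated(ctx context.Context, in *roachpb.Intent)  // intent resolution
	OnTransactionUpdated(ctx context.Context, txn *roachpb.Transaction) // pushed txns
	OnLeaseUpdated(iAmTheLeaseHolder bool)             // replaces Clear/Enable
	OnDescriptorUpdated(desc *roachpb.RangeDescriptor) // splits, merges
}
```

`NewManager(store, desc)` in `newUnloadedReplica` above constructs one manager
per replica; `OnDescriptorUpdated` then keeps its view of the range's bounds
current as the descriptor changes.
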
7 changes: 4 additions & 3 deletions pkg/storage/replica_metrics.go
@@ -64,7 +64,8 @@ func (r *Replica) Metrics(
_, ticking := r.store.unquiescedReplicas.m[r.RangeID]
r.store.unquiescedReplicas.Unlock()

latchInfoGlobal, latchInfoLocal := r.latchMgr.Info()
// TODO WIP
// latchInfoGlobal, latchInfoLocal := r.latchMgr.Info()

return calcReplicaMetrics(
ctx,
@@ -79,8 +80,8 @@
r.store.StoreID(),
quiescent,
ticking,
latchInfoLocal,
latchInfoGlobal,
storagepb.LatchManagerInfo{},
storagepb.LatchManagerInfo{},
raftLogSize,
)
}
23 changes: 11 additions & 12 deletions pkg/storage/replica_proposal.go
@@ -87,7 +87,7 @@ type ProposalData struct {
tmpFooter storagepb.RaftCommandFooter

// ec.done is called after command application to update the timestamp
// cache and release latches.
// cache and optionally release latches.
ec endCmds

// applied is set when a command finishes application. It is used to
@@ -413,12 +413,6 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
}
}

if leaseChangingHands && !iAmTheLeaseHolder {
// Also clear and disable the push transaction queue. Any waiters
// must be redirected to the new lease holder.
r.txnWaitQueue.Clear(true /* disable */)
}

// If we're the current raft leader, may want to transfer the leadership to
// the new leaseholder. Note that this condition is also checked periodically
// when ticking the replica.
@@ -439,6 +433,9 @@
}
}

// Inform the concurrency manager that the lease holder has been updated.
r.concMgr.OnLeaseUpdated(iAmTheLeaseHolder)

// Potentially re-gossip if the range contains system data (e.g. system
// config or node liveness). We need to perform this gossip at startup as
// soon as possible. Trying to minimize how often we gossip is a fool's
@@ -453,8 +450,6 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil {
log.Error(ctx, err)
}
// Make sure the push transaction queue is enabled.
r.txnWaitQueue.Enable()

// Emit an MLAI on the leaseholder replica, as follower will be looking
// for one and if we went on to quiesce, they wouldn't necessarily get
@@ -610,18 +605,22 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
}

if lResult.UpdatedIntents != nil {
// TODO(nvanbenschoten): handle UpdatedIntents.
for i := range lResult.UpdatedIntents {
r.concMgr.OnLockAcquired(ctx, &lResult.UpdatedIntents[i])
}
lResult.UpdatedIntents = nil
}

if lResult.ResolvedIntents != nil {
// TODO(nvanbenschoten): handle ResolvedIntents.
for i := range lResult.ResolvedIntents {
r.concMgr.OnLockUpdated(ctx, &lResult.ResolvedIntents[i])
}
lResult.ResolvedIntents = nil
}

if lResult.UpdatedTxns != nil {
for _, txn := range lResult.UpdatedTxns {
r.txnWaitQueue.UpdateTxn(ctx, txn)
r.concMgr.OnTransactionUpdated(ctx, txn)
}
lResult.UpdatedTxns = nil
}
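
These hooks are how objective 2 is realized: evaluation results feed lock
state transitions directly into the concurrency manager, so waiters are
signaled at resolution time instead of rediscovering intents through
transaction push RPCs. Below is a self-contained toy model of that
wake-on-resolution behavior; every name is invented, and the commit's real
`lockTableImpl` remains a mock (see cockroachdb#43740 for the fuller
strawman):

```go
package main

import (
	"fmt"
	"sync"
)

// lockTable is a toy: one channel per locked key, closed on resolution.
type lockTable struct {
	mu    sync.Mutex
	locks map[string]chan struct{}
}

func newLockTable() *lockTable {
	return &lockTable{locks: map[string]chan struct{}{}}
}

// onLockAcquired plays the role of concMgr.OnLockAcquired: record the lock.
func (lt *lockTable) onLockAcquired(key string) {
	lt.mu.Lock()
	defer lt.mu.Unlock()
	lt.locks[key] = make(chan struct{})
}

// onLockUpdated plays the role of concMgr.OnLockUpdated for a resolved
// intent: wake every waiter at once rather than letting each retry a push.
func (lt *lockTable) onLockUpdated(key string) {
	lt.mu.Lock()
	defer lt.mu.Unlock()
	if ch, ok := lt.locks[key]; ok {
		close(ch)
		delete(lt.locks, key)
	}
}

// wait blocks until the key's lock, if any, is resolved.
func (lt *lockTable) wait(key string) {
	lt.mu.Lock()
	ch, ok := lt.locks[key]
	lt.mu.Unlock()
	if ok {
		<-ch
	}
}

func main() {
	lt := newLockTable()
	lt.onLockAcquired("k")
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		lt.wait("k") // blocks until resolution, no push-RPC retry loop
		fmt.Println("waiter proceeded")
	}()
	lt.onLockUpdated("k") // intent resolved -> waiter wakes immediately
	wg.Wait()
}
```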