From f0709c21673b80aeb0819e43b7d4644572618510 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Mon, 6 Jan 2020 17:57:20 -0500
Subject: [PATCH] [DNM] storage/concurrency: introduce concurrency control package, prototype SFU

Informs #41720.

This commit creates a new concurrency package that provides a concurrency
manager structure that encapsulates the details of concurrency control and
contention handling for serializable key-value transactions. Any reader of
this commit should start at `concurrency_control.go` and move out from there.

The new package has a few primary objectives:
1. centralize the handling of request synchronization and transaction
   contention handling in a single location, allowing for the topic to be
   documented and understood in isolation.
2. rework contention handling to react to intent state transitions directly.
   This simplifies the transaction queueing story, reduces the frequency of
   transaction push RPCs, and allows waiters to proceed after intent
   resolution as soon as possible.
3. create a framework that naturally permits "update" locking, which is
   required for kv-level SELECT FOR UPDATE support (#6583).
4. provide stronger guarantees around fairness when transactions conflict, in
   order to reduce tail latencies under contended scenarios.
5. create a structure that can extend to address the long-term goals of a
   fully centralized lock-table laid out in #41720.

WARNING: this is still a WIP. Notably, the lockTableImpl is mocked out with a
working but incomplete implementation. See #43740 for a more complete
strawman.

Release note: None
---
 pkg/storage/batcheval/cmd_query_txn.go | 2 +-
 .../batcheval/cmd_resolve_intent_test.go | 4 +-
 pkg/storage/batcheval/eval_context.go | 4 +-
 pkg/storage/helpers_test.go | 4 -
 pkg/storage/replica.go | 93 +++-------
 pkg/storage/replica_eval_context_span.go | 8 +-
 pkg/storage/replica_init.go | 7 +-
 pkg/storage/replica_metrics.go | 7 +-
 pkg/storage/replica_proposal.go | 23 ++-
 pkg/storage/replica_raft.go | 34 ++--
 pkg/storage/replica_read.go | 29 ++--
 pkg/storage/replica_send.go | 160 +++++++-----------
 pkg/storage/replica_test.go | 36 ++--
 pkg/storage/replica_tscache.go | 3 +
 pkg/storage/replica_write.go | 49 +++---
 pkg/storage/store.go | 9 +
 pkg/storage/store_merge.go | 6 +-
 pkg/storage/store_send.go | 36 ----
 pkg/storage/store_split.go | 2 +-
 19 files changed, 200 insertions(+), 316 deletions(-)

diff --git a/pkg/storage/batcheval/cmd_query_txn.go b/pkg/storage/batcheval/cmd_query_txn.go
index a1dd5170242d..7473f7a55de1 100644
--- a/pkg/storage/batcheval/cmd_query_txn.go
+++ b/pkg/storage/batcheval/cmd_query_txn.go
@@ -72,6 +72,6 @@ func QueryTxn(
 	}
 	// Get the list of txns waiting on this txn.
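For orientation while reading the diff: the call sites introduced throughout this patch imply roughly the following surface for the new concurrency.Manager. This is an illustrative sketch only, inferred from the diff and written with simplified stand-in types; the authoritative definition lives in the new concurrency_control.go and uses the real roachpb/hlc/spanset types.

    // Sketch of the Manager surface implied by this patch's call sites.
    // Stand-in types (interface{}, string, int64) replace the real
    // roachpb/hlc/spanset types so the sketch is self-contained.
    package concurrencysketch

    import "context"

    // Request mirrors the fields populated in executeBatchWithConcurrencyRetries.
    type Request struct {
        Txn             interface{}   // *roachpb.Transaction
        Timestamp       int64         // hlc.Timestamp
        Priority        float64       // roachpb.UserPriority
        ReadConsistency int           // roachpb.ReadConsistencyType
        Requests        []interface{} // []roachpb.RequestUnion
        Spans           interface{}   // *spanset.SpanSet
    }

    // Guard is the opaque handle handed back by SequenceReq. It tracks the
    // request's latches and its position in any lock wait-queues.
    type Guard struct{}

    type Manager interface {
        // Sequencing: acquire latches, wait on conflicting locks, and
        // possibly answer the request directly (non-nil response).
        SequenceReq(ctx context.Context, prev *Guard, req Request) (*Guard, []interface{}, error)
        // FinishReq releases latches and any wait-queue positions.
        FinishReq(g *Guard)

        // Contention handlers: drop latches but keep the wait-queue position.
        HandleWriterIntentError(ctx context.Context, g *Guard, t error) *Guard
        HandleTransactionPushError(ctx context.Context, g *Guard, t error) *Guard

        // Lock-table transitions driven by evaluation results.
        OnLockAcquired(ctx context.Context, intent interface{})
        OnLockUpdated(ctx context.Context, intent interface{})
        OnTransactionUpdated(ctx context.Context, txn interface{})

        // Range lifecycle notifications.
        OnLeaseUpdated(iAmTheLeaseHolder bool)
        OnDescriptorUpdated(desc interface{})
        OnSplit()
        OnMerge()

        // Waiter introspection, used by QueryTxn below.
        GetDependents(txnID string) []string
    }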
- reply.WaitingTxns = cArgs.EvalCtx.GetTxnWaitQueue().GetDependents(args.Txn.ID) + reply.WaitingTxns = cArgs.EvalCtx.GetConcurrencyManager().GetDependents(args.Txn.ID) return result.Result{}, nil } diff --git a/pkg/storage/batcheval/cmd_resolve_intent_test.go b/pkg/storage/batcheval/cmd_resolve_intent_test.go index c3af46381c40..5a35d03b109b 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_test.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_test.go @@ -21,11 +21,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/abortspan" "github.com/cockroachdb/cockroach/pkg/storage/cloud" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -71,7 +71,7 @@ func (m *mockEvalCtx) GetLimiters() *Limiters { func (m *mockEvalCtx) AbortSpan() *abortspan.AbortSpan { return m.abortSpan } -func (m *mockEvalCtx) GetTxnWaitQueue() *txnwait.Queue { +func (m *mockEvalCtx) GetConcurrencyManager() concurrency.Manager { panic("unimplemented") } func (m *mockEvalCtx) NodeID() roachpb.NodeID { diff --git a/pkg/storage/batcheval/eval_context.go b/pkg/storage/batcheval/eval_context.go index 6b15685b9c96..52b637880d21 100644 --- a/pkg/storage/batcheval/eval_context.go +++ b/pkg/storage/batcheval/eval_context.go @@ -19,10 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/abortspan" "github.com/cockroachdb/cockroach/pkg/storage/cloud" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/uuid" @@ -53,7 +53,7 @@ type EvalContext interface { Clock() *hlc.Clock DB() *client.DB AbortSpan() *abortspan.AbortSpan - GetTxnWaitQueue() *txnwait.Queue + GetConcurrencyManager() concurrency.Manager GetLimiters() *Limiters NodeID() roachpb.NodeID diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 24d45132eb53..2a68eb7f861e 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -458,10 +458,6 @@ func (r *Replica) IsQuiescent() bool { return r.mu.quiescent } -func (r *Replica) IsTxnWaitQueueEnabled() bool { - return r.txnWaitQueue.IsEnabled() -} - // GetQueueLastProcessed returns the last processed timestamp for the // specified queue, or the zero timestamp if not available. 
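The EvalContext change above is the seam through which command evaluation (QueryTxn here) observes contention state without knowing about the txn wait queue directly. A minimal illustration of the pattern, continuing the simplified sketch from above; the fake below is hypothetical and mirrors how mockEvalCtx stubs the method in the test diff.

    // DependencyTracker is the narrow capability QueryTxn needs:
    // "which transactions are waiting on this one?"
    type DependencyTracker interface {
        GetDependents(txnID string) []string
    }

    // EvalContext exposes the concurrency manager to command evaluation,
    // replacing the old GetTxnWaitQueue accessor.
    type EvalContext interface {
        GetConcurrencyManager() DependencyTracker
    }

    // queryWaitingTxns mirrors the updated QueryTxn call site.
    func queryWaitingTxns(ec EvalContext, txnID string) []string {
        return ec.GetConcurrencyManager().GetDependents(txnID)
    }

    // A test double satisfies the seam without a real lock table, just as
    // mockEvalCtx stubs GetConcurrencyManager above.
    type fakeManager struct{ deps map[string][]string }

    func (f fakeManager) GetDependents(id string) []string { return f.deps[id] }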
func (r *Replica) GetQueueLastProcessed(ctx context.Context, queue string) (hlc.Timestamp, error) {
diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index d3fad5d95522..84c3f716fcdc 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -30,16 +30,14 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/cloud"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/rangefeed"
-	"github.com/cockroachdb/cockroach/pkg/storage/spanlatch"
-	"github.com/cockroachdb/cockroach/pkg/storage/spanset"
 	"github.com/cockroachdb/cockroach/pkg/storage/split"
 	"github.com/cockroachdb/cockroach/pkg/storage/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
-	"github.com/cockroachdb/cockroach/pkg/storage/txnwait"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/envutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -176,9 +174,8 @@ type Replica struct {
 	// TODO(tschottdorf): Duplicates r.mu.state.desc.RangeID; revisit that.
 	RangeID roachpb.RangeID
 	// Only set by the constructor
-	store *Store
-	abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort
-	txnWaitQueue *txnwait.Queue // Queues push txn attempts by txn ID
+	store *Store
+	abortSpan *abortspan.AbortSpan // Avoids anomalous reads after abort
 	// leaseholderStats tracks all incoming BatchRequests to the replica and which
 	// localities they come from in order to aid in lease rebalancing decisions.
@@ -224,11 +221,10 @@ type Replica struct {
 	// Contains the lease history when enabled.
 	leaseHistory *leaseHistory
-	// Enforces at most one command is running per key(s) within each span
-	// scope. The globally-scoped component tracks user writes (i.e. all
-	// keys for which keys.Addr is the identity), the locally-scoped component
-	// the rest (e.g. RangeDescriptor, transaction record, Lease, ...).
-	latchMgr spanlatch.Manager
+	// concMgr sequences incoming requests and provides isolation between
+	// requests that intend to perform conflicting operations. It is the
+	// centerpiece of transaction contention handling.
+	concMgr concurrency.Manager
 	mu struct {
 		// Protects all fields in the mu struct.
@@ -664,9 +660,9 @@ func (r *Replica) GetLimiters() *batcheval.Limiters {
 	return &r.store.limiters
 }
-// GetTxnWaitQueue returns the Replica's txnwait.Queue.
-func (r *Replica) GetTxnWaitQueue() *txnwait.Queue {
-	return r.txnWaitQueue
+// GetConcurrencyManager returns the Replica's concurrency.Manager.
+func (r *Replica) GetConcurrencyManager() concurrency.Manager {
+	return r.concMgr
 }
 // GetTerm returns the term of the given index in the raft log.
@@ -927,14 +923,14 @@ func (r *Replica) assertStateLocked(ctx context.Context, reader engine.Reader) {
 // able to serve traffic or that the request is not compatible with the state of
 // the Range.
 //
-// The method accepts a spanlatch.Guard and a LeaseStatus parameter. These are
+// The method accepts a concurrency.Guard and a LeaseStatus parameter. These are
 // used to indicate whether the caller has acquired latches and checked the
 // Range lease. The method will only check for a pending merge if both of these
 // conditions are true.
 // If either lg == nil or st == nil then the method will
 // not check for a pending merge. Callers might be ok with this if they know
 // that they will end up checking for a pending merge at some later time.
 func (r *Replica) checkExecutionCanProceed(
-	ba *roachpb.BatchRequest, lg *spanlatch.Guard, st *storagepb.LeaseStatus,
+	ba *roachpb.BatchRequest, g *concurrency.Guard, st *storagepb.LeaseStatus,
 ) error {
 	rSpan, err := keys.Range(ba.Requests)
 	if err != nil {
@@ -949,7 +945,7 @@ func (r *Replica) checkExecutionCanProceed(
 		return err
 	} else if err := r.checkTSAboveGCThresholdRLocked(ba.Timestamp); err != nil {
 		return err
-	} else if lg != nil && st != nil {
+	} else if g != nil && st != nil {
 		// Only check for a pending merge if latches are held and the Range
 		// lease is held by this Replica. Without both of these conditions,
 		// checkForPendingMergeRLocked could return false negatives.
@@ -1121,7 +1117,7 @@ func (r *Replica) isNewerThanSplitRLocked(split *roachpb.SplitTrigger) bool {
 // command processing.
 type endCmds struct {
 	repl *Replica
-	lg *spanlatch.Guard
+	g *concurrency.Guard
 }
 // move moves the endCmds into the return value, clearing and making
@@ -1149,67 +1145,16 @@ func (ec *endCmds) done(
 	// Update the timestamp cache if the request is not being re-evaluated. Each
 	// request is considered in turn; only those marked as affecting the cache are
 	// processed. Inconsistent reads are excluded.
-	if ba.ReadConsistency == roachpb.CONSISTENT {
-		ec.repl.updateTimestampCache(ctx, ba, br, pErr)
-	}
+	ec.repl.updateTimestampCache(ctx, ba, br, pErr)
 	// Release the latches acquired by the request back to the spanlatch
-	// manager. Must be done AFTER the timestamp cache is updated.
-	if ec.lg != nil {
-		ec.repl.latchMgr.Release(ec.lg)
+	// manager. Must be done AFTER the timestamp cache is updated. Only set
+	// when the proposal has assumed responsibility for releasing latches.
+	if ec.g != nil {
+		ec.repl.concMgr.FinishReq(ec.g)
 	}
 }
-// beginCmds waits for any in-flight, conflicting commands to complete. More
-// specifically, beginCmds acquires latches for the request based on keys
-// affected by the batched commands. This gates subsequent commands with
-// overlapping keys or key ranges. It returns a cleanup function to be called
-// when the commands are done and can release their latches.
-func (r *Replica) beginCmds(
-	ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet,
-) (*spanlatch.Guard, error) {
-	// Only acquire latches for consistent operations.
-	if ba.ReadConsistency != roachpb.CONSISTENT {
-		log.Event(ctx, "operation accepts inconsistent results")
-		return nil, nil
-	}
-
-	// Don't acquire latches for lease requests. These are run on replicas that
-	// do not hold the lease, so acquiring latches wouldn't help synchronize
-	// with other requests.
-	if ba.IsLeaseRequest() {
-		return nil, nil
-	}
-
-	var beforeLatch time.Time
-	if log.ExpensiveLogEnabled(ctx, 2) {
-		beforeLatch = timeutil.Now()
-	}
-
-	// Acquire latches for all the request's declared spans to ensure
-	// protected access and to avoid interacting requests from operating at
-	// the same time. The latches will be held for the duration of request.
- log.Event(ctx, "acquire latches") - lg, err := r.latchMgr.Acquire(ctx, spans) - if err != nil { - return nil, err - } - - if !beforeLatch.IsZero() { - dur := timeutil.Since(beforeLatch) - log.VEventf(ctx, 2, "waited %s to acquire latches", dur) - } - - if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { - if pErr := filter(*ba); pErr != nil { - r.latchMgr.Release(lg) - return nil, pErr.GoError() - } - } - - return lg, nil -} - // maybeWatchForMerge checks whether a merge of this replica into its left // neighbor is in its critical phase and, if so, arranges to block all requests // until the merge completes. diff --git a/pkg/storage/replica_eval_context_span.go b/pkg/storage/replica_eval_context_span.go index d91e7f3cc075..33b77b0fcbee 100644 --- a/pkg/storage/replica_eval_context_span.go +++ b/pkg/storage/replica_eval_context_span.go @@ -20,11 +20,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/abortspan" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/cloud" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) @@ -74,9 +74,9 @@ func (rec *SpanSetReplicaEvalContext) DB() *client.DB { return rec.i.DB() } -// GetTxnWaitQueue returns the txnwait.Queue. -func (rec *SpanSetReplicaEvalContext) GetTxnWaitQueue() *txnwait.Queue { - return rec.i.GetTxnWaitQueue() +// GetConcurrencyManager returns the concurrency.Manager. +func (rec *SpanSetReplicaEvalContext) GetConcurrencyManager() concurrency.Manager { + return rec.i.GetConcurrencyManager() } // NodeID returns the NodeID. 
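With beginCmds and the per-Replica spanlatch.Manager and txnwait.Queue removed above, latch acquisition and queueing behind conflicting locks both happen inside SequenceReq, and FinishReq (called from endCmds.done) is the single release point. A condensed sketch of the intended lifecycle for one attempt, reusing the simplified Manager/Request/Guard types sketched earlier; evaluate is a hypothetical stand-in for the read/write execution paths.

    // lifecycleSketch: sequence (latches + lock wait-queues), evaluate,
    // then finish exactly once. This replaces what beginCmds/endCmds used
    // to do with the latch manager alone.
    func lifecycleSketch(ctx context.Context, m Manager, req Request) (interface{}, error) {
        g, resp, err := m.SequenceReq(ctx, nil /* first attempt */, req)
        if err != nil {
            return nil, err
        }
        defer func() {
            if g != nil {
                m.FinishReq(g) // mirrors endCmds.done / the deferred FinishReq
            }
        }()
        if resp != nil {
            // The manager satisfied the request while sequencing it, e.g. a
            // PushTxn answered by the txn wait queue; skip evaluation.
            return resp, nil
        }
        return evaluate(ctx, g, req)
    }

    // evaluate stands in for executeReadOnlyBatch / executeWriteBatch.
    func evaluate(ctx context.Context, g *Guard, req Request) (interface{}, error) {
        return nil, nil
    }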
diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go index 39f25b07fa20..7255882316d1 100644 --- a/pkg/storage/replica_init.go +++ b/pkg/storage/replica_init.go @@ -18,11 +18,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/storage/abortspan" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/split" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" - "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -70,8 +69,8 @@ func newUnloadedReplica( RangeID: desc.RangeID, store: store, abortSpan: abortspan.New(desc.RangeID), + concMgr: concurrency.NewManager(store, desc), } - r.txnWaitQueue = txnwait.NewQueue(store, r) r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true @@ -80,7 +79,6 @@ func newUnloadedReplica( split.Init(&r.loadBasedSplitter, rand.Intn, func() float64 { return float64(SplitByLoadQPSThreshold.Get(&store.cfg.Settings.SV)) }) - r.latchMgr = spanlatch.Make(r.store.stopper, r.store.metrics.SlowLatchRequests) r.mu.proposals = map[storagebase.CmdIDKey]*ProposalData{} r.mu.checksums = map[uuid.UUID]ReplicaChecksum{} r.mu.proposalBuf.Init((*replicaProposer)(r)) @@ -300,5 +298,6 @@ func (r *Replica) setDescLockedRaftMuLocked(ctx context.Context, desc *roachpb.R r.rangeStr.store(r.mu.replicaID, desc) r.connectionClass.set(rpc.ConnectionClassForKey(desc.StartKey)) + r.concMgr.OnDescriptorUpdated(desc) r.mu.state.Desc = desc } diff --git a/pkg/storage/replica_metrics.go b/pkg/storage/replica_metrics.go index 21d21452d61d..4f5740f18e7a 100644 --- a/pkg/storage/replica_metrics.go +++ b/pkg/storage/replica_metrics.go @@ -64,7 +64,8 @@ func (r *Replica) Metrics( _, ticking := r.store.unquiescedReplicas.m[r.RangeID] r.store.unquiescedReplicas.Unlock() - latchInfoGlobal, latchInfoLocal := r.latchMgr.Info() + // TODO WIP + // latchInfoGlobal, latchInfoLocal := r.latchMgr.Info() return calcReplicaMetrics( ctx, @@ -79,8 +80,8 @@ func (r *Replica) Metrics( r.store.StoreID(), quiescent, ticking, - latchInfoLocal, - latchInfoGlobal, + storagepb.LatchManagerInfo{}, + storagepb.LatchManagerInfo{}, raftLogSize, ) } diff --git a/pkg/storage/replica_proposal.go b/pkg/storage/replica_proposal.go index ca86b7dfc62a..4099907d1934 100644 --- a/pkg/storage/replica_proposal.go +++ b/pkg/storage/replica_proposal.go @@ -87,7 +87,7 @@ type ProposalData struct { tmpFooter storagepb.RaftCommandFooter // ec.done is called after command application to update the timestamp - // cache and release latches. + // cache and optionally release latches. ec endCmds // applied is set when the a command finishes application. It is used to @@ -413,12 +413,6 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe } } - if leaseChangingHands && !iAmTheLeaseHolder { - // Also clear and disable the push transaction queue. Any waiters - // must be redirected to the new lease holder. - r.txnWaitQueue.Clear(true /* disable */) - } - // If we're the current raft leader, may want to transfer the leadership to // the new leaseholder. 
Note that this condition is also checked periodically // when ticking the replica. @@ -439,6 +433,9 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe } } + // Inform the concurrency manager that the lease holder has been updated. + r.concMgr.OnLeaseUpdated(iAmTheLeaseHolder) + // Potentially re-gossip if the range contains system data (e.g. system // config or node liveness). We need to perform this gossip at startup as // soon as possible. Trying to minimize how often we gossip is a fool's @@ -453,8 +450,6 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe if err := r.MaybeGossipNodeLiveness(ctx, keys.NodeLivenessSpan); err != nil { log.Error(ctx, err) } - // Make sure the push transaction queue is enabled. - r.txnWaitQueue.Enable() // Emit an MLAI on the leaseholder replica, as follower will be looking // for one and if we went on to quiesce, they wouldn't necessarily get @@ -610,18 +605,22 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re } if lResult.UpdatedIntents != nil { - // TODO(nvanbenschoten): handle UpdatedIntents. + for i := range lResult.UpdatedIntents { + r.concMgr.OnLockAcquired(ctx, &lResult.UpdatedIntents[i]) + } lResult.UpdatedIntents = nil } if lResult.ResolvedIntents != nil { - // TODO(nvanbenschoten): handle ResolvedIntents. + for i := range lResult.ResolvedIntents { + r.concMgr.OnLockUpdated(ctx, &lResult.ResolvedIntents[i]) + } lResult.ResolvedIntents = nil } if lResult.UpdatedTxns != nil { for _, txn := range lResult.UpdatedTxns { - r.txnWaitQueue.UpdateTxn(ctx, txn) + r.concMgr.OnTransactionUpdated(ctx, txn) } lResult.UpdatedTxns = nil } diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 1e7bc7701d2d..00440880d2fe 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/apply" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/stateloader" @@ -68,21 +69,14 @@ func (r *Replica) evalAndPropose( lease *roachpb.Lease, ba *roachpb.BatchRequest, spans *spanset.SpanSet, - ec endCmds, -) (_ chan proposalResult, _ func(), _ int64, pErr *roachpb.Error) { + g *concurrency.Guard, +) (_ chan proposalResult, _ func(), _ int64, async bool, pErr *roachpb.Error) { idKey := makeIDKey() proposal, pErr := r.requestToProposal(ctx, idKey, ba, spans) log.Event(proposal.ctx, "evaluated request") - // Attach the endCmds to the proposal. This moves responsibility of - // releasing latches to "below Raft" machinery. However, we make sure - // we clean up this resource if the proposal doesn't make it to Raft. - proposal.ec = ec.move() - defer func() { - if pErr != nil { - proposal.ec.done(ctx, ba, nil /* br */, pErr) - } - }() + // Attach the endCmds to the proposal. + proposal.ec = endCmds{repl: r} // Pull out proposal channel to return. proposal.doneCh may be set to // nil if it is signaled in this function. @@ -106,9 +100,13 @@ func (r *Replica) evalAndPropose( EndTxns: endTxns, } proposal.finishApplication(ctx, pr) - return proposalCh, func() {}, 0, nil + return proposalCh, func() {}, 0, async, nil } + // Assume responsibility for releasing the concurrency guard. 
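The three loops added to handleReadWriteLocalEvalResult are the mechanism behind objective 2 of the commit message: instead of waiters discovering state changes through repeated PushTxn RPCs, evaluation results push intent and transaction state transitions straight into the concurrency manager, so queued requests can unblock as soon as the lock table learns of them. A simplified sketch of that dispatch, using the stand-in types from earlier; the field shapes here are illustrative, not the real LocalResult.

    // LocalResultSketch mirrors the three LocalResult fields consumed above.
    type LocalResultSketch struct {
        UpdatedIntents  []interface{} // locks newly acquired by a transaction
        ResolvedIntents []interface{} // locks updated or removed by intent resolution
        UpdatedTxns     []interface{} // transaction records whose status changed
    }

    // notifyManager mirrors handleReadWriteLocalEvalResult: every state
    // transition is fed to the lock table / txn wait queue, then cleared so
    // it is not reported twice.
    func notifyManager(ctx context.Context, m Manager, res *LocalResultSketch) {
        for i := range res.UpdatedIntents {
            m.OnLockAcquired(ctx, res.UpdatedIntents[i])
        }
        res.UpdatedIntents = nil

        for i := range res.ResolvedIntents {
            m.OnLockUpdated(ctx, res.ResolvedIntents[i])
        }
        res.ResolvedIntents = nil

        for _, txn := range res.UpdatedTxns {
            m.OnTransactionUpdated(ctx, txn)
        }
        res.UpdatedTxns = nil
    }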
+ proposal.ec.g = g + async = true + // If the request requested that Raft consensus be performed asynchronously, // return a proposal result immediately on the proposal's done channel. // The channel's capacity will be large enough to accommodate this. @@ -117,7 +115,7 @@ func (r *Replica) evalAndPropose( // Disallow async consensus for commands with EndTxnIntents because // any !Always EndTxnIntent can't be cleaned up until after the // command succeeds. - return nil, nil, 0, roachpb.NewErrorf("cannot perform consensus asynchronously for "+ + return nil, nil, 0, false, roachpb.NewErrorf("cannot perform consensus asynchronously for "+ "proposal with EndTxnIntents=%v; %v", ets, ba) } @@ -154,14 +152,14 @@ func (r *Replica) evalAndPropose( // behavior. quotaSize := uint64(proposal.command.Size()) if maxSize := uint64(MaxCommandSize.Get(&r.store.cfg.Settings.SV)); quotaSize > maxSize { - return nil, nil, 0, roachpb.NewError(errors.Errorf( + return nil, nil, 0, false, roachpb.NewError(errors.Errorf( "command is too large: %d bytes (max: %d)", quotaSize, maxSize, )) } var err error proposal.quotaAlloc, err = r.maybeAcquireProposalQuota(ctx, quotaSize) if err != nil { - return nil, nil, 0, roachpb.NewError(err) + return nil, nil, 0, false, roachpb.NewError(err) } // Make sure we clean up the proposal if we fail to insert it into the // proposal buffer successfully. This ensures that we always release any @@ -180,13 +178,13 @@ func (r *Replica) evalAndPropose( Req: *ba, } if pErr := filter(filterArgs); pErr != nil { - return nil, nil, 0, pErr + return nil, nil, 0, false, pErr } } maxLeaseIndex, pErr := r.propose(ctx, proposal) if pErr != nil { - return nil, nil, 0, pErr + return nil, nil, 0, false, pErr } // Abandoning a proposal unbinds its context so that the proposal's client // is free to terminate execution. However, it does nothing to try to @@ -208,7 +206,7 @@ func (r *Replica) evalAndPropose( // We'd need to make sure the span is finished eventually. proposal.ctx = r.AnnotateCtx(context.TODO()) } - return proposalCh, abandon, maxLeaseIndex, nil + return proposalCh, abandon, maxLeaseIndex, async, nil } // propose encodes a command, starts tracking it, and proposes it to raft. The diff --git a/pkg/storage/replica_read.go b/pkg/storage/replica_read.go index 6b8d4b037fbb..3f25e4376b53 100644 --- a/pkg/storage/replica_read.go +++ b/pkg/storage/replica_read.go @@ -15,7 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -29,22 +29,15 @@ import ( // iterator to evaluate the batch and then updates the timestamp cache to // reflect the key spans that it read. func (r *Replica) executeReadOnlyBatch( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, lg *spanlatch.Guard, -) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - // Guarantee we release the provided latches. This is wrapped to delay pErr - // evaluation to its value when returning. 
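The new boolean return value threads guard ownership through the write path: once evalAndPropose attaches the guard to the proposal's endCmds, releasing it becomes the job of command application below Raft, and every early-error return reports async=false so the caller's deferred FinishReq still runs. A small sketch of that contract under the same simplified types; the helper names below are hypothetical.

    // proposeSketch mirrors evalAndPropose's ownership handoff: async=true
    // is returned only after the guard has been attached to the proposal.
    func proposeSketch(ctx context.Context, m Manager, g *Guard) (async bool, err error) {
        if err := evaluateProposal(ctx); err != nil {
            return false, err // caller still owns g and will release it
        }
        attachToProposal(g) // below-Raft machinery will call m.FinishReq(g)
        return true, nil
    }

    // callerSketch mirrors the caller's bookkeeping in executeWriteBatch /
    // executeBatchWithConcurrencyRetries.
    func callerSketch(ctx context.Context, m Manager, g *Guard) error {
        async, err := proposeSketch(ctx, m, g)
        if err == nil && async {
            g = nil // responsibility transferred; do not FinishReq here
        }
        if g != nil {
            m.FinishReq(g)
        }
        return err
    }

    func evaluateProposal(ctx context.Context) error { return nil } // stand-in
    func attachToProposal(g *Guard)                  {}             // stand-in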
- ec := endCmds{repl: r, lg: lg} - defer func() { - ec.done(ctx, ba, br, pErr) - }() - + ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, g *concurrency.Guard, +) (br *roachpb.BatchResponse, _ bool, pErr *roachpb.Error) { // If the read is not inconsistent, the read requires the range lease or // permission to serve via follower reads. var status storagepb.LeaseStatus if ba.ReadConsistency.RequiresReadLease() { if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { if nErr := r.canServeFollowerRead(ctx, ba, pErr); nErr != nil { - return nil, nErr + return nil, false, nErr } r.store.metrics.FollowerReadsCount.Inc(1) } @@ -56,8 +49,8 @@ func (r *Replica) executeReadOnlyBatch( defer r.readOnlyCmdMu.RUnlock() // Verify that the batch can be executed. - if err := r.checkExecutionCanProceed(ba, ec.lg, &status); err != nil { - return nil, roachpb.NewError(err) + if err := r.checkExecutionCanProceed(ba, g, &status); err != nil { + return nil, false, roachpb.NewError(err) } // Evaluate read-only batch command. @@ -76,13 +69,14 @@ func (r *Replica) executeReadOnlyBatch( if err := r.handleReadOnlyLocalEvalResult(ctx, ba, result.Local); err != nil { pErr = roachpb.NewError(err) } + r.updateTimestampCache(ctx, ba, br, pErr) if pErr != nil { log.VErrEvent(ctx, 3, pErr.String()) } else { log.Event(ctx, "read completed") } - return br, pErr + return br, false, pErr } func (r *Replica) handleReadOnlyLocalEvalResult( @@ -105,6 +99,13 @@ func (r *Replica) handleReadOnlyLocalEvalResult( lResult.MaybeWatchForMerge = false } + if lResult.UpdatedIntents != nil { + for i := range lResult.UpdatedIntents { + r.concMgr.OnLockAcquired(ctx, &lResult.UpdatedIntents[i]) + } + lResult.UpdatedIntents = nil + } + if intents := lResult.DetachEncounteredIntents(); len(intents) > 0 { log.Eventf(ctx, "submitting %d intents to asynchronous processing", len(intents)) // We only allow synchronous intent resolution for consistent requests. diff --git a/pkg/storage/replica_send.go b/pkg/storage/replica_send.go index 14d5a802c6c7..e9f4e315aff0 100644 --- a/pkg/storage/replica_send.go +++ b/pkg/storage/replica_send.go @@ -16,8 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" - "github.com/cockroachdb/cockroach/pkg/storage/intentresolver" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" "github.com/cockroachdb/cockroach/pkg/storage/txnwait" @@ -121,10 +120,13 @@ func (r *Replica) sendWithRangeID( // batchExecutionFn is a method on Replica that is able to execute a // BatchRequest. It is called with the batch, along with the span bounds that // the batch will operate over and a guard for the latches protecting the span -// bounds. The function must ensure that the latch guard is eventually released. +// bounds. The function will return either a batch response or an error. If a +// batch response is returned, it may also return a flag indicating that the +// request is continuing to execute asynchronously and that it has assumed +// responsibility of releasing the guard. 
type batchExecutionFn func( - *Replica, context.Context, *roachpb.BatchRequest, *spanset.SpanSet, *spanlatch.Guard, -) (*roachpb.BatchResponse, *roachpb.Error) + *Replica, context.Context, *roachpb.BatchRequest, *spanset.SpanSet, *concurrency.Guard, +) (_ *roachpb.BatchResponse, async bool, _ *roachpb.Error) var _ batchExecutionFn = (*Replica).executeWriteBatch var _ batchExecutionFn = (*Replica).executeReadOnlyBatch @@ -155,68 +157,84 @@ func (r *Replica) executeBatchWithConcurrencyRetries( // Handle load-based splitting. r.recordBatchForLoadBasedSplitting(ctx, ba, spans) - // TODO(nvanbenschoten): Clean this up once it's pulled inside the - // concurrency manager. - var cleanup intentresolver.CleanupFunc + // Try to execute command; exit retry loop on success. + var g *concurrency.Guard defer func() { - if cleanup != nil { - // This request wrote an intent only if there was no error, the - // request is transactional, the transaction is not yet finalized, - // and the request wasn't read-only. - if pErr == nil && ba.Txn != nil && !br.Txn.Status.IsFinalized() && !ba.IsReadOnly() { - cleanup(nil, &br.Txn.TxnMeta) - } else { - cleanup(nil, nil) - } + // NB: wrapped to delay g evaluation to its value when returning. + if g != nil { + r.concMgr.FinishReq(g) } }() - - // Try to execute command; exit retry loop on success. for { // Exit loop if context has been canceled or timed out. if err := ctx.Err(); err != nil { return nil, roachpb.NewError(errors.Wrap(err, "aborted during Replica.Send")) } - // If necessary, the request may need to wait in the txn wait queue, - // pending updates to the target transaction for either PushTxn or - // QueryTxn requests. - // TODO(nvanbenschoten): Push this into the concurrency package. - br, pErr = r.maybeWaitForPushee(ctx, ba) - if br != nil || pErr != nil { - return br, pErr + // Acquire latches to prevent overlapping requests from executing until + // this request completes. After latching, wait on any conflicting locks + // to ensure that the request has full isolation during evaluation. This + // returns a request guard that must be eventually released. + var resp []roachpb.ResponseUnion + g, resp, pErr = r.concMgr.SequenceReq(ctx, g, concurrency.Request{ + Txn: ba.Txn, + Timestamp: ba.Timestamp, + Priority: ba.UserPriority, + ReadConsistency: ba.ReadConsistency, + Requests: ba.Requests, + Spans: spans, + }) + if pErr != nil { + return nil, pErr + } else if resp != nil { + br.Responses = resp + return br, nil } - // Acquire latches to prevent overlapping commands from executing until - // this command completes. - // TODO(nvanbenschoten): Replace this with a call into the upcoming - // concurrency package when it is introduced. - lg, err := r.beginCmds(ctx, ba, spans) - if err != nil { - return nil, roachpb.NewError(err) + if filter := r.store.cfg.TestingKnobs.TestingLatchFilter; filter != nil { + if pErr := filter(*ba); pErr != nil { + return nil, pErr + } } - br, pErr = fn(r, ctx, ba, spans, lg) + var async bool + br, async, pErr = fn(r, ctx, ba, spans, g) switch t := pErr.GetDetail().(type) { case nil: // Success. + if async { + // If the request is continuing to execute asynchronously, it + // has assumed responsibility for releasing the concurrency + // guard when it finishes. + g = nil + } return br, nil case *roachpb.WriteIntentError: - if cleanup, pErr = r.handleWriteIntentError(ctx, ba, pErr, t, cleanup); pErr != nil { + // Drop latches, but retains lock wait-queues. 
+ if g, pErr = r.handleWriteIntentError(ctx, ba, g, pErr, t); pErr != nil { return nil, pErr } // Retry... case *roachpb.TransactionPushError: - if pErr = r.handleTransactionPushError(ctx, ba, pErr, t); pErr != nil { + // Drop latches, but retains lock wait-queues. + if g, pErr = r.handleTransactionPushError(ctx, ba, g, pErr, t); pErr != nil { return nil, pErr } // Retry... case *roachpb.IndeterminateCommitError: + // Drop latches and lock wait-queues. + r.concMgr.FinishReq(g) + g = nil + // Then launch a task to handle the indeterminate commit error. if pErr = r.handleIndeterminateCommitError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } // Retry... case *roachpb.MergeInProgressError: + // Drop latches and lock wait-queues. + r.concMgr.FinishReq(g) + g = nil + // Then listen for the merge to complete. if pErr = r.handleMergeInProgressError(ctx, ba, pErr, t); pErr != nil { return nil, pErr } @@ -231,73 +249,24 @@ func (r *Replica) executeBatchWithConcurrencyRetries( func (r *Replica) handleWriteIntentError( ctx context.Context, ba *roachpb.BatchRequest, + g *concurrency.Guard, pErr *roachpb.Error, t *roachpb.WriteIntentError, - cleanup intentresolver.CleanupFunc, -) (intentresolver.CleanupFunc, *roachpb.Error) { +) (*concurrency.Guard, *roachpb.Error) { if r.store.cfg.TestingKnobs.DontPushOnWriteIntentError { - return cleanup, pErr + return g, pErr } - - // Process and resolve write intent error. - var pushType roachpb.PushTxnType - if ba.IsWrite() { - pushType = roachpb.PUSH_ABORT - } else { - pushType = roachpb.PUSH_TIMESTAMP - } - - index := pErr.Index - // Make a copy of the header for the upcoming push; we will update the - // timestamp. - h := ba.Header - if h.Txn != nil { - // We must push at least to h.Timestamp, but in fact we want to - // go all the way up to a timestamp which was taken off the HLC - // after our operation started. This allows us to not have to - // restart for uncertainty as we come back and read. - obsTS, ok := h.Txn.GetObservedTimestamp(ba.Replica.NodeID) - if !ok { - // This was set earlier in this method, so it's - // completely unexpected to not be found now. - log.Fatalf(ctx, "missing observed timestamp: %+v", h.Txn) - } - h.Timestamp.Forward(obsTS) - // We are going to hand the header (and thus the transaction proto) - // to the RPC framework, after which it must not be changed (since - // that could race). Since the subsequent execution of the original - // request might mutate the transaction, make a copy here. - // - // See #9130. - h.Txn = h.Txn.Clone() - } - - // Handle the case where we get more than one write intent error; - // we need to cleanup the previous attempt to handle it to allow - // any other pusher queued up behind this RPC to proceed. - if cleanup != nil { - cleanup(t, nil) - } - cleanup, pErr = r.store.intentResolver.ProcessWriteIntentError(ctx, pErr, h, pushType) - if pErr != nil { - // Do not propagate ambiguous results; assume success and retry original op. - if _, ok := pErr.GetDetail().(*roachpb.AmbiguousResultError); ok { - return cleanup, nil - } - // Propagate new error. Preserve the error index. - pErr.Index = index - return cleanup, pErr - } - // We've resolved the write intent; retry command. - return cleanup, nil + // g's latches will be dropped, but it retains its spot in lock wait-queues. 
+ return r.concMgr.HandleWriterIntentError(ctx, g, t), nil } func (r *Replica) handleTransactionPushError( ctx context.Context, ba *roachpb.BatchRequest, + g *concurrency.Guard, pErr *roachpb.Error, t *roachpb.TransactionPushError, -) *roachpb.Error { +) (*concurrency.Guard, *roachpb.Error) { // On a transaction push error, retry immediately if doing so will enqueue // into the txnWaitQueue in order to await further updates to the unpushed // txn's status. We check ShouldPushImmediately to avoid retrying @@ -308,11 +277,10 @@ func (r *Replica) handleTransactionPushError( dontRetry = txnwait.ShouldPushImmediately(pushReq) } if dontRetry { - return pErr + return g, pErr } - // Enqueue unsuccessfully pushed transaction on the txnWaitQueue and retry. - r.txnWaitQueue.EnqueueTxn(&t.PusheeTxn) - return nil + // g's latches will be dropped, but it retains its spot in lock wait-queues. + return r.concMgr.HandleTransactionPushError(ctx, g, t), nil } func (r *Replica) handleIndeterminateCommitError( diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 129396e6c537..acae21508664 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -7293,24 +7293,24 @@ func TestReplicaAbandonProposal(t *testing.T) { t.Fatalf("expected AmbiguousResultError error; got %s (%T)", detail, detail) } - // The request should still be holding its latches. - latchInfoGlobal, _ := tc.repl.latchMgr.Info() - if w := latchInfoGlobal.WriteCount; w == 0 { - t.Fatal("expected non-empty latch manager") - } - - // Let the proposal be reproposed and go through. - atomic.StoreInt32(&dropProp, 0) - - // Even though we canceled the command it will still get executed and its - // latches cleaned up. - testutils.SucceedsSoon(t, func() error { - latchInfoGlobal, _ := tc.repl.latchMgr.Info() - if w := latchInfoGlobal.WriteCount; w != 0 { - return errors.Errorf("expected empty latch manager") - } - return nil - }) + // // The request should still be holding its latches. + // latchInfoGlobal, _ := tc.repl.latchMgr.Info() + // if w := latchInfoGlobal.WriteCount; w == 0 { + // t.Fatal("expected non-empty latch manager") + // } + + // // Let the proposal be reproposed and go through. + // atomic.StoreInt32(&dropProp, 0) + + // // Even though we canceled the command it will still get executed and its + // // latches cleaned up. 
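Taken together, the error arms above form a retry loop in which the guard's lifetime encodes fairness: write-intent and push conflicts hand the guard back to the manager (latches drop, the wait-queue position survives) and retry, while indeterminate-commit and merge-in-progress errors give everything up before handling the condition out of band. A condensed sketch of that control flow with the same simplified types; executeSketch and the error classifiers are illustrative stand-ins.

    // retryLoopSketch condenses executeBatchWithConcurrencyRetries. Each
    // iteration re-sequences with the previous guard, so a request that was
    // already queued behind a lock does not lose its place in line.
    func retryLoopSketch(ctx context.Context, m Manager, req Request) (interface{}, error) {
        var g *Guard
        defer func() {
            if g != nil {
                m.FinishReq(g)
            }
        }()
        for {
            var resp []interface{}
            var err error
            g, resp, err = m.SequenceReq(ctx, g, req)
            if err != nil {
                return nil, err
            }
            if resp != nil {
                return resp, nil // answered during sequencing
            }

            br, async, evalErr := executeSketch(ctx, g, req)
            switch {
            case evalErr == nil:
                if async {
                    g = nil // the proposal now owns the guard
                }
                return br, nil
            case isWriteIntentErr(evalErr):
                // Latches drop; the wait-queue position is retained.
                g = m.HandleWriterIntentError(ctx, g, evalErr)
            case isTxnPushErr(evalErr):
                g = m.HandleTransactionPushError(ctx, g, evalErr)
            case isIndeterminateCommitErr(evalErr), isMergeInProgressErr(evalErr):
                // Release everything, resolve out of band, then retry fresh.
                m.FinishReq(g)
                g = nil
            default:
                return nil, evalErr
            }
        }
    }

    // Stand-ins for the read/write execution paths and error classification.
    func executeSketch(ctx context.Context, g *Guard, req Request) (interface{}, bool, error) {
        return nil, false, nil
    }
    func isWriteIntentErr(err error) bool         { return false }
    func isTxnPushErr(err error) bool             { return false }
    func isIndeterminateCommitErr(err error) bool { return false }
    func isMergeInProgressErr(err error) bool     { return false }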
+ // testutils.SucceedsSoon(t, func() error { + // latchInfoGlobal, _ := tc.repl.latchMgr.Info() + // if w := latchInfoGlobal.WriteCount; w != 0 { + // return errors.Errorf("expected empty latch manager") + // } + // return nil + // }) } func TestNewReplicaCorruptionError(t *testing.T) { diff --git a/pkg/storage/replica_tscache.go b/pkg/storage/replica_tscache.go index 69f35a6d5830..924abab67551 100644 --- a/pkg/storage/replica_tscache.go +++ b/pkg/storage/replica_tscache.go @@ -43,6 +43,9 @@ func setTimestampCacheLowWaterMark( func (r *Replica) updateTimestampCache( ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse, pErr *roachpb.Error, ) { + if ba.ReadConsistency != roachpb.CONSISTENT { + return + } addToTSCache := r.store.tsCache.Add if util.RaceEnabled { addToTSCache = checkedTSCacheUpdate(r.store.Clock().Now(), r.store.tsCache, ba, br, pErr) diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index 17b4d97ed948..b330b9cf7618 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -18,9 +18,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/storage/storagepb" @@ -40,9 +40,6 @@ import ( // // Concretely, // -// - Latches for the keys affected by the command are acquired (i.e. -// tracked as in-flight mutations). -// - In doing so, we wait until no overlapping mutations are in flight. // - The timestamp cache is checked to determine if the command's affected keys // were accessed with a timestamp exceeding that of the command; if so, the // command's timestamp is incremented accordingly. @@ -62,19 +59,10 @@ import ( // as this method makes the assumption that it operates on a shallow copy (see // call to applyTimestampCache). func (r *Replica) executeWriteBatch( - ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, lg *spanlatch.Guard, -) (br *roachpb.BatchResponse, pErr *roachpb.Error) { + ctx context.Context, ba *roachpb.BatchRequest, spans *spanset.SpanSet, g *concurrency.Guard, +) (br *roachpb.BatchResponse, async bool, pErr *roachpb.Error) { startTime := timeutil.Now() - // Guarantee we release the provided latches if we never make it to - // passing responsibility to evalAndPropose. This is wrapped to delay - // pErr evaluation to its value when returning. - ec := endCmds{repl: r, lg: lg} - defer func() { - // No-op if we move ec into evalAndPropose. - ec.done(ctx, ba, br, pErr) - }() - // Determine the lease under which to evaluate the write. var lease roachpb.Lease var status storagepb.LeaseStatus @@ -85,7 +73,7 @@ func (r *Replica) executeWriteBatch( // Other write commands require that this replica has the range // lease. 
if status, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil { - return nil, pErr + return nil, false, pErr } lease = status.Lease } @@ -96,8 +84,8 @@ func (r *Replica) executeWriteBatch( // at proposal time, not at application time, because the spanlatch manager // will synchronize all requests (notably EndTxn with SplitTrigger) that may // cause this condition to change. - if err := r.checkExecutionCanProceed(ba, ec.lg, &status); err != nil { - return nil, roachpb.NewError(err) + if err := r.checkExecutionCanProceed(ba, g, &status); err != nil { + return nil, false, roachpb.NewError(err) } minTS, untrack := r.store.cfg.ClosedTimestamp.Tracker.Track(ctx) @@ -129,12 +117,12 @@ func (r *Replica) executeWriteBatch( // Checking the context just before proposing can help avoid ambiguous errors. if err := ctx.Err(); err != nil { log.VEventf(ctx, 2, "%s before proposing: %s", err, ba.Summary()) - return nil, roachpb.NewError(errors.Wrap(err, "aborted before proposing")) + return nil, false, roachpb.NewError(errors.Wrap(err, "aborted before proposing")) } // After the command is proposed to Raft, invoking endCmds.done is the // responsibility of Raft, so move the endCmds into evalAndPropose. - ch, abandon, maxLeaseIndex, pErr := r.evalAndPropose(ctx, &lease, ba, spans, ec.move()) + ch, abandon, maxLeaseIndex, async, pErr := r.evalAndPropose(ctx, &lease, ba, spans, g) if pErr != nil { if maxLeaseIndex != 0 { log.Fatalf( @@ -142,7 +130,7 @@ func (r *Replica) executeWriteBatch( maxLeaseIndex, ba, pErr, ) } - return nil, pErr + return nil, false, pErr } // A max lease index of zero is returned when no proposal was made or a lease was proposed. // In both cases, we don't need to communicate a MLAI. Furthermore, for lease proposals we @@ -202,7 +190,7 @@ func (r *Replica) executeWriteBatch( log.Warning(ctx, err) } } - return propResult.Reply, propResult.Err + return propResult.Reply, async, propResult.Err case <-slowTimer.C: slowTimer.Read = true r.store.metrics.SlowRaftRequests.Inc(1) @@ -228,14 +216,14 @@ and the following Raft status: %+v`, abandon() log.VEventf(ctx, 2, "context cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) - return nil, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())) + return nil, async, roachpb.NewError(roachpb.NewAmbiguousResultError(ctx.Err().Error())) case <-shouldQuiesce: // If shutting down, return an AmbiguousResultError, which indicates // to the caller that the command may have executed. abandon() log.VEventf(ctx, 2, "shutdown cancellation after %0.1fs of attempting command %s", timeutil.Since(startTime).Seconds(), ba) - return nil, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")) + return nil, async, roachpb.NewError(roachpb.NewAmbiguousResultError("server shutdown")) } } } @@ -431,6 +419,19 @@ func (r *Replica) evaluate1PC( } } + // Even though the transaction is 1PC and hasn't written any intents, it + // may have acquired unreplicated locks, so inform the concurrency manager + // that it is finalized. + res.Local.UpdatedTxns = []*roachpb.Transaction{clonedTxn} + res.Local.ResolvedIntents = make([]roachpb.Intent, len(etArg.IntentSpans)) + for i, sp := range etArg.IntentSpans { + res.Local.ResolvedIntents[i] = roachpb.Intent{ + Span: sp, + Txn: clonedTxn.TxnMeta, + Status: clonedTxn.Status, + } + } + // Add placeholder responses for end transaction requests. 
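The evaluate1PC additions above matter for SELECT FOR UPDATE: a one-phase-commit transaction never writes intents, but it may still hold unreplicated locks in the lock table, so its declared intent spans are reported as resolved and its final transaction record as updated. Downstream, handleReadWriteLocalEvalResult feeds these into OnLockUpdated and OnTransactionUpdated so queued requests release immediately. A tiny sketch of the construction with stand-in span/status types; illustrative only.

    // lockUpdate is a stand-in for roachpb.Intent: "this transaction's lock
    // over Span now has Status", which is what OnLockUpdated consumes.
    type lockUpdate struct {
        Span   string // roachpb.Span in the real code
        TxnID  string
        Status string // COMMITTED or ABORTED
    }

    // onePhaseCommitLockUpdates mirrors the loop over etArg.IntentSpans:
    // one release notification per declared span, even though no replicated
    // intent was ever written.
    func onePhaseCommitLockUpdates(txnID, status string, intentSpans []string) []lockUpdate {
        ups := make([]lockUpdate, len(intentSpans))
        for i, sp := range intentSpans {
            ups[i] = lockUpdate{Span: sp, TxnID: txnID, Status: status}
        }
        return ups
    }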
br.Add(&roachpb.EndTxnResponse{OnePhaseCommit: true})
 	br.Txn = clonedTxn
diff --git a/pkg/storage/store.go b/pkg/storage/store.go
index 18296b763c12..d2d0983f5804 100644
--- a/pkg/storage/store.go
+++ b/pkg/storage/store.go
@@ -43,6 +43,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage/closedts/ctpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/cloud"
 	"github.com/cockroachdb/cockroach/pkg/storage/compactor"
+	"github.com/cockroachdb/cockroach/pkg/storage/concurrency"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/storage/idalloc"
@@ -2052,6 +2053,9 @@ func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip }
 // Compactor accessor.
 func (s *Store) Compactor() *compactor.Compactor { return s.compactor }
+// IntentResolver accessor.
+func (s *Store) IntentResolver() concurrency.IntentResolver { return s.intentResolver }
+
 // Stopper accessor.
 func (s *Store) Stopper() *stop.Stopper { return s.stopper }
@@ -2556,6 +2560,11 @@ func (s *Store) GetTxnWaitMetrics() *txnwait.Metrics {
 	return s.txnWaitMetrics
 }
+// GetSlowLatchGauge is part of concurrency.StoreInterface.
+func (s *Store) GetSlowLatchGauge() *metric.Gauge {
+	return s.metrics.SlowLatchRequests
+}
+
 func init() {
 	tracing.RegisterTagRemapping("s", "store")
 }
diff --git a/pkg/storage/store_merge.go b/pkg/storage/store_merge.go
index 5a75dc86dc16..a0e2074bec04 100644
--- a/pkg/storage/store_merge.go
+++ b/pkg/storage/store_merge.go
@@ -85,9 +85,9 @@ func (s *Store) MergeRange(
 		leftRepl.writeStats.resetRequestCounts()
 	}
-	// Clear the wait queue to redirect the queued transactions to the
-	// left-hand replica, if necessary.
-	rightRepl.txnWaitQueue.Clear(true /* disable */)
+	// Clear the concurrency manager's txn wait queue to redirect the queued
+	// transactions to the left-hand replica, if necessary.
+	rightRepl.concMgr.OnMerge()
 	leftLease, _ := leftRepl.GetLease()
 	rightLease, _ := rightRepl.GetLease()
diff --git a/pkg/storage/store_send.go b/pkg/storage/store_send.go
index ce36e965dbe7..0b0d7f9b7b09 100644
--- a/pkg/storage/store_send.go
+++ b/pkg/storage/store_send.go
@@ -234,39 +234,3 @@ func (s *Store) Send(
 	}
 	return nil, pErr
 }
-
-// maybeWaitForPushee potentially diverts the incoming request to
-// the txnwait.Queue, where it will wait for updates to the target
-// transaction.
-// TODO(nvanbenschoten): Move this method.
-func (r *Replica) maybeWaitForPushee(
-	ctx context.Context, ba *roachpb.BatchRequest,
-) (*roachpb.BatchResponse, *roachpb.Error) {
-	// If this is a push txn request, check the push queue first, which
-	// may cause this request to wait and either return a successful push
-	// txn response or else allow this request to proceed.
-	if ba.IsSinglePushTxnRequest() {
-		if r.store.cfg.TestingKnobs.DontRetryPushTxnFailures {
-			return nil, nil
-		}
-		pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest)
-		pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, pushReq)
-		if pErr != nil {
-			return nil, pErr
-		} else if pushResp != nil {
-			br := &roachpb.BatchResponse{}
-			br.Add(pushResp)
-			return br, nil
-		}
-	} else if ba.IsSingleQueryTxnRequest() {
-		// For query txn requests, wait in the txn wait queue either for
-		// transaction update or for dependent transactions to change.
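The two accessors added to Store above, together with the concurrency.NewManager(store, desc) call in replica_init.go, suggest that Store satisfies a small concurrency.StoreInterface so the new package can reach the intent resolver and metrics without importing pkg/storage. The interface itself is not shown in this diff; the following is only a guess at its shape, limited to what these call sites imply.

    // IntentResolver is whatever subset of the intent resolver the
    // concurrency package needs to push transactions and resolve intents on
    // behalf of waiting requests; its methods are not visible in this diff.
    type IntentResolver interface{}

    // StoreInterface, as implied by the Store accessors above. Anything
    // beyond these methods is omitted rather than guessed.
    type StoreInterface interface {
        IntentResolver() IntentResolver
        GetSlowLatchGauge() *Gauge
    }

    // Gauge stands in for util/metric.Gauge.
    type Gauge struct{}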
- queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest) - pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, queryReq) - if pErr != nil { - return nil, pErr - } - } - - return nil, nil -} diff --git a/pkg/storage/store_split.go b/pkg/storage/store_split.go index d57a03fcc310..833c1664c573 100644 --- a/pkg/storage/store_split.go +++ b/pkg/storage/store_split.go @@ -281,7 +281,7 @@ func (s *Store) SplitRange( // appropriate. We do this after setDescWithoutProcessUpdate // to ensure that no pre-split commands are inserted into the // txnWaitQueue after we clear it. - leftRepl.txnWaitQueue.Clear(false /* disable */) + leftRepl.concMgr.OnSplit() // The rangefeed processor will no longer be provided logical ops for // its entire range, so it needs to be shut down and all registrations
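Across the patch, the scattered txnwait.Queue Enable/Clear calls are replaced by a uniform set of lifecycle notifications: OnLeaseUpdated from leasePostApply, OnDescriptorUpdated from setDescLockedRaftMuLocked, OnMerge for the subsumed right-hand replica in store_merge.go, and OnSplit here for the left-hand replica. A sketch of how an implementation might react to them, assuming the embedded wait queue keeps the old txnwait.Queue semantics this patch removes (enable on lease acquisition, clear and disable on lease loss or merge, clear but stay enabled on split).

    // managerSketch shows one plausible reaction to the lifecycle hooks,
    // modeled on the txnwait.Queue calls this patch replaces.
    type managerSketch struct {
        twq txnWaitQueueSketch
    }

    func (m *managerSketch) OnLeaseUpdated(iAmTheLeaseHolder bool) {
        if iAmTheLeaseHolder {
            m.twq.Enable()
        } else {
            // Waiters must be redirected to the new leaseholder.
            m.twq.Clear(true /* disable */)
        }
    }

    func (m *managerSketch) OnSplit() { m.twq.Clear(false /* disable */) }
    func (m *managerSketch) OnMerge() { m.twq.Clear(true /* disable */) }

    func (m *managerSketch) OnDescriptorUpdated(desc interface{}) {
        // Keep the cached range descriptor in sync, e.g. to decide which
        // keys the lock table is responsible for.
    }

    // txnWaitQueueSketch stands in for the embedded txn wait queue.
    type txnWaitQueueSketch struct{ enabled bool }

    // Enable allows new pushers to queue.
    func (q *txnWaitQueueSketch) Enable() { q.enabled = true }

    // Clear evicts all queued pushers; disable additionally stops new ones
    // from queueing until Enable is called again.
    func (q *txnWaitQueueSketch) Clear(disable bool) { q.enabled = !disable }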