diff --git a/pkg/storage/batcheval/cmd_resolve_intent.go b/pkg/storage/batcheval/cmd_resolve_intent.go index 5b0836545361..184b7604147b 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent.go +++ b/pkg/storage/batcheval/cmd_resolve_intent.go @@ -28,6 +28,8 @@ func init() { func declareKeysResolveIntentCombined( desc *roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { + // TODO(nvanbenschoten): declare this span at the txn's MinTimestamp. See + // lockTable.UpdateLocks for more. DefaultDeclareKeys(desc, header, req, spans) var status roachpb.TransactionStatus var txnID uuid.UUID diff --git a/pkg/storage/concurrency/concurrency_control.go b/pkg/storage/concurrency/concurrency_control.go new file mode 100644 index 000000000000..383761f05cbb --- /dev/null +++ b/pkg/storage/concurrency/concurrency_control.go @@ -0,0 +1,714 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package concurrency provides a concurrency manager structure that +// encapsulates the details of concurrency control and contention handling for +// serializable key-value transactions. +package concurrency + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/uuid" +) + +// Manager is a structure that sequences incoming requests and provides +// isolation between requests that intend to perform conflicting operations. +// During sequencing, conflicts are discovered and any found are resolved +// through a combination of passive queuing and active pushing. Once a request +// has been sequenced, it is free to evaluate without concerns of conflicting +// with other in-flight requests due to the isolation provided by the manager. +// This isolation is guaranteed for the lifetime of the request but terminates +// once the request completes. +// +// Transactions require isolation both within requests and across requests. The +// manager accommodates this by allowing transactional requests to acquire +// locks, which outlive the requests themselves. Locks extend the duration of +// the isolation provided over specific keys to the lifetime of the lock-holder +// transaction itself. They are (typically) only released when the transaction +// commits or aborts. Other requests that find these locks while being sequenced +// wait on them to be released in a queue before proceeding. Because locks are +// checked during sequencing, requests are guaranteed access to all declared +// keys after they have been sequenced. In other words, locks don't need to be +// checked again during evaluation. +// +// However, at the time of writing, not all locks are stored directly under the +// manager's control, so not all locks are discoverable during sequencing. +// Specifically, write intents (replicated, exclusive locks) are stored inline +// in the MVCC keyspace, so they are not detectable until request evaluation +// time. 
To accommodate this form of lock storage, the manager exposes a +// HandleWriterIntentError method, which can be used in conjunction with a retry +// loop around evaluation to integrate external locks with the concurrency +// manager structure. In the future, we intend to pull all locks, including +// those associated with write intents, into the concurrency manager directly +// through a replicated lock table structure. +// +// Fairness is ensured between requests. In general, if any two requests +// conflict then the request that arrived first will be sequenced first. As +// such, sequencing guarantees FIFO semantics. The primary exception to this is +// that a request that is part of a transaction which has already acquired a +// lock does not need to wait on that lock during sequencing, and can therefore +// ignore any queue that has formed on the lock. For other exceptions, see the +// later comment for lockTable. +// +// Internal Components +// +// The concurrency manager is composed of a number of internal synchronization, +// bookkeeping, and queueing structures. Each of these is discussed in more +// detail on their interface definition. The following diagram details how the +// components are tied together: +// +// +---------------------+---------------------------------------------+ +// | concurrency.Manager | | +// +---------------------+ | +// | | +// +------------+ acquire +--------------+ acquire | +// Sequence() |--->--->---| latchManager |<---<---<---<---<---<---+ | +// +------------+ +--------------+ | | +// | / check locks + wait queues | | +// | v if conflict, enter q & drop latches ^ | +// | +---------------------------------------------------+ | | +// | | [ lockTable ] | | | +// | | [ key1 ] -------------+-----------------+ | ^ | +// | | [ key2 ] / lockState: | lockWaitQueue: |----<---<---<----+ +// | | [ key3 ]-{ - lock type | +-[a]<-[b]<-[c] | | | | | +// | | [ key4 ] \ - txn meta | | (no latches) |-->-^ | | +// | | [ key5 ] -------------+-|---------------+ | | | +// | | [ ... ] v | | ^ +// | +---------------------------------|-----------------+ | | if lock found, HandleWriterIntentError() +// | | | | | - enter lockWaitQueue +// | | +- may be remote -+--+ | | - drop latches +// | | | | | | - wait for lock update / release +// | v v ^ | | +// | | +--------------------------+ | ^ +// | | | txnWaitQueue: | | | +// | | | (located on txn record's | | | +// | v | leaseholder replica) | | | +// | | |--------------------------| | ^ +// | | | [txn1] [txn2] [txn3] ... |----<---<---<---<----+ | +// | | +--------------------------+ | | if txn push failed, HandleTransactionPushError() +// | | | | - enter txnWaitQueue +// | | | ^ - drop latches +// | | | | - wait for txn record update +// | | | | | +// | | | | | +// | +--> retain latches --> remain at head of queues ---> evaluate ---> Finish() +// | | +// +----------+ | +// Finish() | ---> exit wait queues ---> drop latches -----------------> respond ... +// +----------+ | +// | | +// +-------------------------------------------------------------------+ +// +// See the comments on individual components for a more detailed look at their +// interface and inner-workings. +// +// At a high-level, a request enter the concurrency manager and immediately +// acquire latches from the latchManager to serialize access to the keys that +// they intend to touch. This latching takes into account the keys being +// accessed, the MVCC timestamp of accesses, and the access method being used +// (read vs. 
write) to allow for concurrency where possible. This has the effect +// of queuing on conflicting in-flight operations until their completion. +// +// Once latched, the request consults the lockTable to check for any conflicting +// locks owned by other transactions. If any are found, the request enters the +// corresponding lockWaitQueue and its latches are dropped. Requests in the +// queue wait for the corresponding lock to be released by intent resolution. +// While waiting, the head of the lockWaitQueue pushes the owner of the lock +// through a remote RPC that ends up in the pushee's txnWaitQueue. This queue +// exists on the leaseholder replica of the range that contains the pushee's +// transaction record. Other entries in the queue wait for the head of the +// queue, eventually pushing it to detect coordinator failures and transaction +// deadlocks. Once the lock is released, the head of the queue reacquires +// latches and attempts to proceed while remaining at the head of that +// lockWaitQueue to ensure fairness. +// +// Once a request is latched and observes no conflicting locks in the lockTable +// and no conflicting lockWaitQueues that it is not already the head of, the +// request can proceed to evaluate. During evaluation, the request may insert or +// remove locks from the lockTable for its own transaction. +// +// When the request completes, it exits any lockWaitQueues that it was a part of +// and releases its latches. However, if the request was successful, any locks +// that it inserted into the lockTable remain. +type Manager interface { + requestSequencer + contentionHandler + lockManager + transactionManager + rangeStateListener + metricExporter +} + +// requestSequencer is concerned with the sequencing of concurrent requests. +type requestSequencer interface { + // SequenceReq acquires latches, checks for locks, and queues behind and/or + // pushes other transactions to resolve any conflicts. Once sequenced, the + // request is guaranteed sufficient isolation for the duration of its + // evaluation, until the returned request guard is released. + // NOTE: this last part will not be true until replicated locks are pulled + // into the concurrency manager. + // + // An optional existing request guard can be provided to SequenceReq. This + // allows the request's position in lock wait-queues to be retained across + // sequencing attempts. If provided, the guard should not be holding latches + // already. The expected usage of this parameter is that it will only be + // provided after acquiring a Guard from a contentionHandler method. + // + // If the method returns a non-nil request guard then the caller must ensure + // that the guard is eventually released by passing it to FinishReq. + // + // Alternatively, the concurrency manager may be able to serve the request + // directly, in which case it will return a Response for the request. If it + // does so, it will not return a request guard. + SequenceReq(context.Context, *Guard, Request) (*Guard, Response, *Error) + + // FinishReq marks the request as complete, releasing any protection + // the request had against conflicting requests and allowing conflicting + // requests that are blocked on this one to proceed. The guard should not + // be used after being released. + FinishReq(*Guard) +} + +// contentionHandler is concerned with handling contention-related errors. This +// typically involves preparing the request to be queued upon a retry. 
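+//
+// The expected integration is a retry loop around request evaluation, roughly
+// like the following sketch (illustrative only; m is a Manager and evaluate is
+// a placeholder for the caller's evaluation logic, neither defined here):
+//
+//   var g *Guard
+//   for {
+//       var resp Response
+//       var err *Error
+//       if g, resp, err = m.SequenceReq(ctx, g, req); err != nil || resp != nil {
+//           return resp, err // served or rejected during sequencing
+//       }
+//       br, wiErr := evaluate(ctx, req) // evaluate while holding latches
+//       if wiErr != nil {
+//           // Queue on the discovered intent, then retry sequencing.
+//           g = m.HandleWriterIntentError(ctx, g, wiErr)
+//           continue
+//       }
+//       m.FinishReq(g)
+//       return br, nil
+//   }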
+type contentionHandler interface {
+	// HandleWriterIntentError consumes a WriteIntentError by informing the
+	// concurrency manager about the replicated write intent that was missing
+	// from its lock table which was found during request evaluation (while
+	// holding latches). After doing so, it enqueues the request that hit the
+	// error in the lock's wait-queue (but does not wait) and releases the
+	// guard's latches. It returns an updated guard reflecting this change.
+	// After the method returns, the original guard should no longer be used.
+	HandleWriterIntentError(context.Context, *Guard, *roachpb.WriteIntentError) *Guard
+
+	// HandleTransactionPushError consumes a TransactionPushError by informing
+	// the concurrency manager about a transaction record that could not be
+	// pushed during request evaluation (while holding latches). After doing so,
+	// it releases the guard's latches. It returns an updated guard reflecting
+	// this change. After the method returns, the original guard should no
+	// longer be used.
+	HandleTransactionPushError(context.Context, *Guard, *roachpb.TransactionPushError) *Guard
+}
+
+// lockManager is concerned with tracking locks that are stored on the manager's
+// range.
+type lockManager interface {
+	// OnLockAcquired informs the concurrency manager that a transaction has
+	// acquired a new lock or updated an existing lock that it already held.
+	OnLockAcquired(context.Context, roachpb.Intent)
+
+	// OnLockReleased informs the concurrency manager that a transaction has
+	// updated or released a lock or range of locks that it previously held.
+	OnLockReleased(context.Context, roachpb.Intent)
+}
+
+// transactionManager is concerned with tracking transactions that have their
+// record stored on the manager's range.
+type transactionManager interface {
+	// OnTransactionUpdated informs the concurrency manager that a transaction's
+	// status was updated by a successful transaction state transition.
+	OnTransactionUpdated(context.Context, *roachpb.Transaction)
+
+	// GetDependents returns a set of transactions waiting on the specified
+	// transaction either directly or indirectly. The method is used to perform
+	// deadlock detection. See txnWaitQueue for more.
+	GetDependents(uuid.UUID) []uuid.UUID
+}
+
+// rangeStateListener is concerned with observing updates to the concurrency
+// manager's range.
+type rangeStateListener interface {
+	// OnDescriptorUpdated informs the manager that its range's descriptor has
+	// been updated.
+	OnDescriptorUpdated(*roachpb.RangeDescriptor)
+
+	// OnLeaseUpdated informs the concurrency manager that its range's lease has
+	// been updated. The argument indicates whether this manager's replica is
+	// the leaseholder going forward.
+	OnLeaseUpdated(isleaseholder bool)
+
+	// OnSplit informs the concurrency manager that its range has split off a
+	// new range to its RHS.
+	OnSplit()
+
+	// OnMerge informs the concurrency manager that its range has merged into
+	// its LHS neighbor. This is not called on the LHS range being merged into.
+	OnMerge()
+}
+
+// metricExporter is concerned with providing observability into the state of
+// the concurrency manager.
+type metricExporter interface {
+	// TODO(nvanbenschoten): fill out this interface to provide observability
+	// into the state of the concurrency manager.
+ // LatchMetrics() + // LockTableMetrics() + // TxnWaitQueueMetrics() +} + +/////////////////////////////////// +// External API Type Definitions // +/////////////////////////////////// + +// Request is the input to Manager.SequenceReq. The struct contains all of the +// information necessary to sequence a KV request and determine which locks and +// other in-flight requests it conflicts with. +type Request struct { + // The (optional) transaction that sent the request. + Txn *roachpb.Transaction + + // The timestamp that the request should evaluate at. + // Should be set to Txn.ReadTimestamp if Txn is non-nil. + Timestamp hlc.Timestamp + + // The priority of the request. Only set if Txn is nil. + Priority roachpb.UserPriority + + // The consistency level of the request. Only set if Txn is nil. + ReadConsistency roachpb.ReadConsistencyType + + // The individual requests in the batch. + Requests []roachpb.RequestUnion + + // The maximal set of spans that the request will access. + Spans *spanset.SpanSet +} + +// Guard is returned from Manager.SequenceReq. The guard is passed back in to +// Manager.FinishReq to release the request's resources when it has completed. +type Guard struct { + req Request + lag latchGuard + ltg lockTableGuard +} + +// Response is a slice of responses to requests in a batch. This type is used +// when the concurrency manager is able to respond to a request directly during +// sequencing. +type Response = []roachpb.ResponseUnion + +// Error is an alias for a roachpb.Error. +type Error = roachpb.Error + +/////////////////////////////////// +// Internal Structure Interfaces // +/////////////////////////////////// + +// latchManager serializes access to keys and key ranges. +// +// See additional documentation in pkg/storage/spanlatch. +type latchManager interface { + // Acquires latches, providing mutual exclusion for conflicting requests. + Acquire(context.Context, Request) (latchGuard, *Error) + + // Releases latches, relinquish its protection from conflicting requests. + Release(latchGuard) +} + +// latchGuard is a handle to a set of acquired key latches. +type latchGuard interface{} + +// lockTable holds a collection of locks acquired by in-progress transactions. +// Each lock in the table has a possibly-empty lock wait-queue associated with +// it, where conflicting transactions can queue while waiting for the lock to be +// released. +// +// +---------------------------------------------------+ +// | [ lockTable ] | +// | [ key1 ] -------------+-----------------+ | +// | [ key2 ] / lockState: | lockWaitQueue: | | +// | [ key3 ]-{ - lock type | <-[a]<-[b]<-[c] | | +// | [ key4 ] \ - txn meta | | | +// | [ key5 ] -------------+-----------------+ | +// | [ ... ] | +// +---------------------------------------------------+ +// +// The database is read and written using "requests". Transactions are composed +// of one or more requests. Isolation is needed across requests. Additionally, +// since transactions represent a group of requests, isolation is needed across +// such groups. Part of this isolation is accomplished by maintaining multiple +// versions and part by allowing requests to acquire locks. Even the isolation +// based on multiple versions requires some form of mutual exclusion to ensure +// that a read and a conflicting lock acquisition do not happen concurrently. +// The lock table provides both locking and sequencing of requests (in concert +// with the use of latches). 
+// +// Locks outlive the requests themselves and thereby extend the duration of the +// isolation provided over specific keys to the lifetime of the lock-holder +// transaction itself. They are (typically) only released when the transaction +// commits or aborts. Other requests that find these locks while being sequenced +// wait on them to be released in a queue before proceeding. Because locks are +// checked during sequencing, requests are guaranteed access to all declared +// keys after they have been sequenced. In other words, locks don't need to be +// checked again during evaluation. +// +// However, at the time of writing, not all locks are stored directly under +// lock table control, so not all locks are discoverable during sequencing. +// Specifically, write intents (replicated, exclusive locks) are stored inline +// in the MVCC keyspace, so they are often not detectable until request +// evaluation time. To accommodate this form of lock storage, the lock table +// exposes an AddDiscoveredLock method. In the future, we intend to pull all +// locks, including those associated with write intents, into the lock table +// directly. +// +// The lock table also provides fairness between requests. If two requests +// conflict then the request that arrived first will typically be sequenced +// first. There are some exceptions: +// +// - a request that is part of a transaction which has already acquired a lock +// does not need to wait on that lock during sequencing, and can therefore +// ignore any queue that has formed on the lock. +// +// - contending requests that encounter different levels of contention may be +// sequenced in non-FIFO order. This is to allow for more concurrency. e.g. +// if request R1 and R2 contend on key K2, but R1 is also waiting at key K1, +// R2 could slip past R1 and evaluate. +// +type lockTable interface { + // ScanAndEnqueue scans over the spans that the request will access and + // enqueues the request in the lock wait-queue of any conflicting locks + // encountered. + // + // The first call to ScanAndEnqueue for a given request uses a nil + // lockTableGuard and the subsequent calls reuse the previously returned + // one. The latches needed by the request must be held when calling this + // function. + ScanAndEnqueue(Request, lockTableGuard) lockTableGuard + + // Dequeue removes the request from its lock wait-queues. It should be + // called when the request is finished, whether it evaluated or not. + // + // This method does not release any locks. This method must be called on the + // last guard returned from ScanAndEnqueue for the request, even if one of + // the (a) lockTable calls that use a lockTableGuard parameter, or (b) a + // lockTableGuard call, returned an error. The method allows but does not + // require latches to be held. + Dequeue(lockTableGuard) + + // AddDiscoveredLock informs the lockTable of a lock that was discovered + // during evaluation which the lockTable wasn't previously tracking. + // + // The method is called when an exclusive replicated lock held by a + // different transaction is discovered when reading the MVCC keys during + // evaluation of this request. It adds the lock and enqueues this requester + // in its wait-queue. It is required that request evaluation discover such + // locks before acquiring its own locks, since the request needs to repeat + // ScanAndEnqueue. + // + // A latch consistent with the access desired by the guard must be held on + // the span containing the discovered lock's key. 
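+	//
+	// A rough sketch of the expected pattern, assuming lt is a lockTable and
+	// conflictingIntent is a placeholder for the caller's evaluation-time
+	// check (illustrative only):
+	//
+	//   g := lt.ScanAndEnqueue(req, nil)
+	//   // ... evaluate while holding latches ...
+	//   if intent := conflictingIntent(); intent != nil {
+	//       _ = lt.AddDiscoveredLock(intent, g)
+	//       // Release latches, wait for the lock, then call ScanAndEnqueue
+	//       // again with the same guard before re-evaluating.
+	//   }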
+ AddDiscoveredLock(*roachpb.Intent, lockTableGuard) error + + // AcquireLock informs the lockTable that a new lock was acquired or an + // existing lock was updated. + // + // The provided TxnMeta must be the same one used when the request scanned + // the lockTable initially. It must only be called in the evaluation phase + // before calling Dequeue, which means all the latches needed by the request + // are held. The key must be in the request's SpanSet with the appropriate + // SpanAccess: currently the strength is always Exclusive, so the span + // containing this key must be SpanReadWrite. This contract ensures that the + // lock is not held in a conflicting manner by a different transaction. + // Acquiring a lock that is already held by this transaction upgrades the + // lock's timestamp and strength, if necessary. + // + // For replicated locks, this must be called after the corresponding write + // intent has been applied to the replicated state machine. + AcquireLock(*enginepb.TxnMeta, roachpb.Key, lock.Strength, lock.Durability) error + + // UpdateLocks informs the lockTable that an existing lock or range of locks + // was either updated or released. + // + // The method is called during intent resolution. For spans containing + // Replicated locks, this must be called after intent resolution has been + // applied to the replicated state machine. + // + // A latch with SpanReadWrite must be held on span with the lowest timestamp + // at which any of the locks could be held. This is explained below. + // + // Note that spans can be wider than the actual keys on which locks were + // acquired, and it is ok if no locks are found or locks held by other + // transactions are found (for those lock this call is a noop). + // + // For COMMITTED or ABORTED transactions, all locks are released. + // + // For PENDING or STAGING transactions, the behavior is: + // + // - All replicated locks known to the lockTable are dropped. This is not + // because those intents are necessarily deleted, but because in the + // current code where intents are not managed by the lockTable (this will + // change when we have a segregated lock table), we do not want to risk + // code divergence between lockTable and mvccResolveWriteIntent: the + // danger is that the latter removes or changes an intent while the + // lockTable retains it, and a waiter is stuck forever. + // + // Note that even the conservative behavior of dropping locks requires + // that intent resolution acquire latches using the oldest timestamp at + // which the intent could have been written: if the intent was at ts=5 and + // the intent resolution is using ts=10 (since the transaction has been + // pushed), there is a race where a reader at ts=8 can be concurrently + // holding latches and the following bad sequence occurs (both thread1 and + // thread2 are concurrent since their latches do not conflict): + // + // - [thread1-txn1] reader sees intent at ts=5 + // - [thread2-txn2] intent resolution changes that intent to ts=10 + // - [thread2-txn2] updateLocks is called and lock is removed since it is a + // replicated lock. + // - [thread1-txn1] reader calls addDiscoveredLock() for ts=5. + // + // Now the lockTable thinks there is a lock and subsequent pushes of txn2 + // by txn1 will do nothing since the txn2 is already at timestamp 10. Txn1 + // will unnecessarily block until txn2 is done. + // + // - Unreplicated locks: + // - for epochs older than txn.Epoch, locks are dropped. 
+ // - locks in the current epoch that are at a TxnMeta.Sequence + // contained in IgnoredSeqNums are dropped. + // - the remaining locks are changed to timestamp equal to + // txn.WriteTimestamp. + UpdateLocks(*roachpb.Intent) error + + // Clear removes all locks and lock wait-queues from the lockTable. + Clear() +} + +// lockTableGuard is a handle to a request as it waits on conflicting locks in a +// lockTable or as it holds a place in lock wait-queues as it evaluates. +type lockTableGuard interface { + // ShouldWait must be called after each ScanAndEnqueue. The request should + // proceed to evaluation if it returns false, else it releases latches and + // listens to the channel returned by NewStateChan. + ShouldWait() bool + + // NewStateChan returns the channel to listen on for notification that the + // state may have changed. If ShouldWait returns true, this channel will + // have an initial notification. Note that notifications are collapsed if + // not retrieved, since it is not necessary for the waiter to see every + // state transition. + NewStateChan() <-chan struct{} + + // CurState returns the latest waiting state. + CurState() waitingState +} + +// lockTableWaiter is concerned with waiting in lock wait-queues for locks held +// by conflicting transactions. It ensures that waiting requests continue to +// make forward progress even in the presence of faulty transaction coordinators +// and transaction deadlocks. +// +// The waiter implements logic for a request to wait on conflicting locks in the +// lockTable until they are released. Similarly, it implements logic to wait on +// conflicting requests ahead of the caller's request in any lock wait-queues +// that it is a part of. +// +// This waiting state responds to a set of state transitions in the lock table: +// - a conflicting lock is released +// - a conflicting lock is updated such that it no longer conflicts +// - a conflicting request in the lock wait-queue acquires the lock +// - a conflicting request in the lock wait-queue exits the lock wait-queue +// +// These state transitions are typically reactive - the waiter can simply wait +// for locks to be released or lock wait-queues to be exited by other actors. +// Reacting to state transitions for conflicting locks is powered by the +// lockManager and reacting to state transitions for conflicting lock +// wait-queues is powered by the requestSequencer interface. +// +// However, in the case of transaction coordinator failures or transaction +// deadlocks, a state transition may never occur without intervention from the +// waiter. To ensure forward-progress, the waiter may need to actively push +// either a lock holder of a conflicting lock or the head of a conflicting lock +// wait-queue. This active pushing requires an RPC to the leaseholder of the +// conflicting transaction's record, and will typically result in the RPC +// queuing in that leaseholder's txnWaitQueue. Because this can be expensive, +// the push is not immediately performed. Instead, it is only performed after a +// delay. +type lockTableWaiter interface { + // WaitOn accepts and waits on a lockTableGuard that has returned true from + // ShouldWait. + // + // The method should be called after dropping any latches that a request + // has acquired. It returns when the request is at the front of all lock + // wait-queues and it is safe to re-acquire latches and scan the lockTable + // again. 
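+	//
+	// A minimal sketch of the expected sequencing loop, assuming lm, lt, and w
+	// are the manager's latchManager, lockTable, and lockTableWaiter
+	// (illustrative only):
+	//
+	//   lg, _ := lm.Acquire(ctx, req)
+	//   var ltg lockTableGuard
+	//   for {
+	//       ltg = lt.ScanAndEnqueue(req, ltg)
+	//       if !ltg.ShouldWait() {
+	//           break // holding latches; safe to evaluate
+	//       }
+	//       lm.Release(lg)
+	//       if err := w.WaitOn(ctx, req, ltg); err != nil {
+	//           lt.Dequeue(ltg)
+	//           return err
+	//       }
+	//       lg, _ = lm.Acquire(ctx, req)
+	//   }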
+	WaitOn(context.Context, Request, lockTableGuard) *Error
+}
+
+// txnWaitQueue holds a collection of wait-queues for transaction records.
+// Conflicting transactions, known as "pushers", sit in a queue associated with
+// an extant transaction that they conflict with, known as the "pushee", and
+// wait for the pushee transaction to commit or abort.
+//
+// Typically, waiting for a pushee's transaction record to undergo a state
+// transition is sufficient to satisfy a pusher transaction. Reacting to state
+// transitions for conflicting transactions is powered by the transactionManager
+// interface.
+//
+// Just like with the lockTableWaiter, there are cases where reacting to state
+// transitions alone is insufficient to make forward progress. However, unlike
+// with the lockTableWaiter, the location of the txnWaitQueue on the range
+// containing the conflicting transaction's record instead of on the range
+// containing the conflicting transaction's lock presents an opportunity to
+// actively resolve these situations. This is because a transaction's record
+// reflects its authoritative status.
+//
+// The first of these situations is failure of the conflicting transaction's
+// coordinator. This situation comes in two flavors:
+// - before a transaction has been finalized (committed or aborted)
+// - after a transaction has been finalized but before all of its intents have
+//   been resolved
+//
+// In the first of these flavors, the transaction record may still have a
+// PENDING status. Without a live transaction coordinator heartbeating it, the
+// record will eventually expire and be abortable. In the second of these
+// flavors, the transaction's record will already be committed or aborted.
+// Regardless of which case the push falls into, once the transaction record
+// is observed in a finalized state, the push will succeed, kick off intent
+// resolution, and return to the sender.
+//
+// The second of these situations is transaction deadlock. Deadlocks occur when
+// the lock acquisition patterns of two or more transactions interact in such a
+// way that a cycle emerges in the "waits-for" graph of transactions. To break
+// this cycle, one of the transactions must be aborted or it is impossible for
+// any of the transactions that are part of the deadlock to continue making
+// progress.
+//
+// The txnWaitQueue provides a mechanism for detecting these cycles across a
+// distributed graph of transactions. Distributed deadlock detection works by
+// having each pusher transaction that is waiting in the queue for a different
+// transaction periodically query its own record using a QueryTxn request. While
+// on the pusher's own transaction record range, the QueryTxn request uses the
+// GetDependents method to collect the IDs of all locally-known transactions
+// that are waiting for the pusher itself to release its locks. Of course, this
+// local view of the dependency graph is incomplete, as it does not initially
+// take into consideration transitive dependencies. To address this, when the
+// QueryTxn returns to the initial txnWaitQueue, the pusher records its own
+// dependencies as dependencies of its pushee transaction. As this process
+// continues and pushers periodically query for their own dependencies and
+// transfer these to their pushee, each txnWaitQueue accumulates more
+// information about the global "waits-for" graph.
Eventually, one of the txnWaitQueues is +// able to observe a full cycle in this graph and aborts one of the transactions +// in the cycle to break the deadlock. +// +// Example of Distributed Deadlock Detection +// +// The following diagream demonstrates how the txnWaitQueue interacts with +// distributed deadlock detection. +// +// - txnA enters txnB's txnWaitQueue during a PushTxn request (MaybeWaitForPush) +// - txnB enters txnC's txnWaitQueue during a PushTxn request (MaybeWaitForPush) +// - txnC enters txnA's txnWaitQueue during a PushTxn request (MaybeWaitForPush) +// +// .-----------------------------------. +// | | +// v | +// [txnA record] --> [txnB record] --> [txnC record] +// deps: deps: deps: +// - txnC - txnA - txnB +// +// - txnA queries its own txnWaitQueue using a QueryTxn request (MaybeWaitForQuery) +// +// .-----------------------------------. +// | ............ | +// v v . | +// [txnA record] --> [txnB record] --> [txnC record] +// deps: deps: deps: +// - txnC - txnA - txnB +// +// - txnA finds that txnC is a dependent. It transfers this dependency to txnB +// +// .-----------------------------------. +// | | +// v | +// [txnA record] --> [txnB record] --> [txnC record] +// deps: deps: deps: +// - txnC - txnA - txnB +// - txnC +// +// - txnC queries its own txnWaitQueue using a QueryTxn request (MaybeWaitForQuery) +// - txnB queries its own txnWaitQueue using a QueryTxn request (MaybeWaitForQuery) +// - txnC finds that txnB is a dependent. It transfers this dependency to txnA +// - txnB finds that txnA and txnC are dependents. It transfers these dependencies to txnC +// +// .-----------------------------------. +// | | +// v | +// [txnA record] --> [txnB record] --> [txnC record] +// deps: deps: deps: +// - txnC - txnA - txnB +// - txnB - txnC - txnA +// - txnC +// +// - txnB notices that txnC is a transitive dependency of itself. This indicates +// a cycle in the global wait-for graph. txnC is aborted, breaking the cycle +// and the deadlock +// +// [txnA record] --> [txnB record] --> [txnC record: ABORTED] +// +// - txnC releases its locks and the transactions proceed in order. +// +// [txnA record] --> [txnB record] --> (free to commit) +// +// TODO(nvanbenschoten): if we exposed a "queue guard" interface, we could make +// stronger guarantees around cleaning up enqueued txns when there are no +// waiters. +type txnWaitQueue interface { + // EnqueueTxn creates a queue associated with the provided transaction. Once + // a queue is established, pushers of this transaction can wait in the queue + // and will be informed of state transitions that the transaction undergoes. + EnqueueTxn(*roachpb.Transaction) + + // UpdateTxn informs the queue that the provided transaction has undergone + // a state transition. This will be communicated to any waiting pushers. + UpdateTxn(context.Context, *roachpb.Transaction) + + // GetDependents returns a set of transactions waiting on the specified + // transaction either directly or indirectly. The method is used to perform + // deadlock detection. + GetDependents(uuid.UUID) []uuid.UUID + + // MaybeWaitForPush checks whether there is a queue already established for + // transaction being pushed by the provided request. If not, or if the + // PushTxn request isn't queueable, the method returns immediately. If there + // is a queue, the method enqueues this request as a waiter and waits for + // the transaction to be pushed/finalized. 
+ // + // If the transaction is successfully pushed while this method is waiting, + // the first return value is a non-nil PushTxnResponse object. + MaybeWaitForPush(context.Context, *roachpb.PushTxnRequest) (*roachpb.PushTxnResponse, *Error) + + // MaybeWaitForQuery checks whether there is a queue already established for + // transaction being queried. If not, or if the QueryTxn request hasn't + // specified WaitForUpdate, the method returns immediately. If there is a + // queue, the method enqueues this request as a waiter and waits for any + // updates to the target transaction. + MaybeWaitForQuery(context.Context, *roachpb.QueryTxnRequest) *Error + + // Enable allows transactions to be enqueued and waiting pushers added. + // The method is idempotent. + Enable() + + // Clear empties all queues and causes all waiters to return. If disable is + // true, future transactions may not be enqueued or waiting pushers added. + Clear(disable bool) +} + +// Silence unused warnings until this package is used. +var _ = Manager(nil) +var _ = latchManager(nil) +var _ = lockTableWaiter(nil) +var _ = txnWaitQueue(nil) +var _ = Guard{req: Request{}, lag: nil, ltg: nil} +var _ = latchManagerImpl{} diff --git a/pkg/storage/concurrency/latch_manager.go b/pkg/storage/concurrency/latch_manager.go new file mode 100644 index 000000000000..9033e4d04cce --- /dev/null +++ b/pkg/storage/concurrency/latch_manager.go @@ -0,0 +1,35 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package concurrency + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/spanlatch" +) + +// latchManagerImpl implements the latchManager interface. +type latchManagerImpl struct { + m spanlatch.Manager +} + +func (m *latchManagerImpl) Acquire(ctx context.Context, req Request) (latchGuard, *Error) { + lg, err := m.m.Acquire(ctx, req.Spans) + if err != nil { + return nil, roachpb.NewError(err) + } + return lg, nil +} + +func (m *latchManagerImpl) Release(lg latchGuard) { + m.m.Release(lg.(*spanlatch.Guard)) +} diff --git a/pkg/storage/concurrency/lock_table.go b/pkg/storage/concurrency/lock_table.go index 959edaee7ade..85b5f8cb0267 100644 --- a/pkg/storage/concurrency/lock_table.go +++ b/pkg/storage/concurrency/lock_table.go @@ -29,156 +29,6 @@ import ( "github.com/google/btree" ) -// The database is read and written using "requests". Transactions are -// composed of one or more requests. Isolation is needed across requests. -// Additionally, since transactions represent a group of requests, isolation -// is needed across such groups. Part of this isolation is accomplished by -// maintaining multiple versions and part by allowing requests to acquire -// locks. Even the isolation based on multiple versions requires some form of -// mutual exclusion to ensure that a read and a conflicting lock acquisition -// do not happen concurrently. The lock table provides both locking and -// sequencing of requests (in concert with the use of latches, as specified -// below). 
-// -// Locks outlive the requests themselves and thereby extend the duration of the -// isolation provided over specific keys to the lifetime of the lock-holder -// transaction itself. They are (typically) only released when the transaction -// commits or aborts. Other requests that find these locks while being sequenced -// wait on them to be released in a queue before proceeding. Because locks are -// checked during sequencing, requests are guaranteed access to all declared -// keys after they have been sequenced. In other words, locks don't need to be -// checked again during evaluation. -// -// However, at the time of writing, not all locks are stored directly under lock -// table control, so not all locks are discoverable during sequencing. -// Specifically, write intents (replicated, exclusive locks) are stored inline -// in the MVCC keyspace, so they are often not detectable until request -// evaluation time. To accommodate this form of lock storage, the lock table -// exposes an addDiscoveredLock method. In the future, we intend to pull all -// locks, including those associated with write intents, into the lock table -// directly. -// -// The lock table also provides fairness between requests. If two requests -// conflict then the request that arrived first will typically be sequenced -// first. There are some exceptions: -// -// - a request that is part of a transaction which has already acquired a lock -// does not need to wait on that lock during sequencing, and can therefore -// ignore any queue that has formed on the lock. -// -// - contending requests that encounter different levels of contention may be -// sequenced in non-FIFO order. This is to allow for more concurrency. e.g. if -// request R1 and R2 contend on key K2, but R1 is also waiting at key K1, R2 -// could slip past R1 and evaluate. - -// Request encapsulates a request that needs to be evaluated on the state -// machine. -type Request interface { - // nil when not a transactional request -- such requests can only have - // SpanReadOnly spans and not acquire any locks, but are sequenced through - // lockTable. - txnMeta() *enginepb.TxnMeta - - // A SpanAccess of SpanReadWrite allows the requester to acquire an Exclusive - // lock for a key contained in the corresponding Span (when evaluating). - // SpanReadOnly spans do not permit any lock acquisition for their contained - // keys but are sequenced by the lockTable. A key must not appear in more than - // one span, i.e., it must be either SpanReadOnly or SpanReadWrite but not - // both. - spans() *spanset.SpanSet - - // The timestamp of the request. This must be equal to the Span.Timestamp in - // all of the spans in the SpanSet. - ts() hlc.Timestamp -} - -// A guard that is returned to the request the first time it calls -// lockTable.scanAndEnqueue() and used in later calls to scanAndEnqueue() and -// done(). After a call to scanAndEnqueue() (which is made while holding -// latches), the caller must first call requestGuard.startWaiting() and if it -// returns true release the latches and continue interacting with the -// requestGuard. If startWaiting() returns false, the request can proceed to -// evaluation. -// -// Waiting logic: The interface hides the queues that the request is waiting on, -// and the request's position in the queue. 
One of the reasons for this hiding -// is that queues are not FIFO since a request that did not wait on a queue for -// key k in a preceding call to scanAndEnqueue() (because k was not locked and -// there was no queue) may need to wait on the queue in a later call to -// scanAndEnqueue(). So sequencing of requests arriving at the lockTable is -// partially decided by a sequence number assigned to a request when it first -// called scanAndEnqueue() and queues are ordered by this sequence number. -// However the sequencing is not fully described by the sequence numbers -- a -// request R1 encountering contention over some keys in its span does not -// prevent a request R2 that has a higher sequence number and overlapping span -// to proceed if R2 does not encounter contention. This concurrency (that is not -// completely fair) is deemed desirable. -// -// The interface exposes an abstracted version of the waiting logic in a way -// that the request that starts waiting is considered waiting for at most one -// other request or transaction. This is exposed as a series of state -// transitions where the transitions are notified via newState() and the current -// state can be read using currentState(). -// -// - The waitFor* states provide information on who the request is waiting for. -// The waitForDistinguished state is a sub-case -- a distinguished waiter is -// responsible for taking extra actions e.g. immediately pushing the transaction -// it is waiting for. The implementation ensures that if there are multiple -// requests in waitFor state waiting on the same transaction at least one will -// be a distinguished waiter. *Note*: there is one rare case when multiple -// requests from the same transaction are waiting on the same lock. These -// requests wait independently, and it is possible for one of these requests to -// grab a reservation (see the detailed comment in lockState below regarding -// reservations), while the other request keeps waiting. This other request is -// waiting on its own transaction and it is even possible for it to be the -// distinguished waiter. We rely on code layers above lockTable to recognize -// this trivial loop and not place oneself in the txn wait queue or push itself. -// -// TODO(sbhola): investigate removing the waitForDistinguished state which -// will simplify the code here. All waitFor requests would wait (currently -// 50ms) before pushing the transaction (for deadlock detection) they are -// waiting on, say T. Typically T will be done before 50ms which is considered -// ok: the one exception we will need to make is if T has the min priority or -// the waiting transaction has max priority -- in both cases it will push -// immediately. The bad case is if T is ABORTED: the push will succeed after, -// and if T left N intents, each push would wait for 50ms, incurring a latency -// of 50*N ms. A cache of recently encountered ABORTED transactions on each -// Store should mitigate this latency increase. Whenever a transaction sees a -// waitFor state, it will consult this cache and if T is found, push -// immediately (if there isn't already a push in-flight) -- even if T is not -// initially in the cache, the first push will place it in the cache, so the -// maximum latency increase is 50ms. -// -// - The waitElsewhere state is a rare state that is used when the lockTable is -// under memory pressure and is clearing its internal queue state. Like the -// waitFor* states, it informs the request who it is waiting for so that -// deadlock detection works. 
However, sequencing information inside the -// lockTable is mostly discarded. -// -// - The waitSelf state is a rare state when a different request from the same -// transaction has a reservation. See the comment about "Reservations" in -// lockState. -// -// - The doneWaiting state is used to indicate that the request should make -// another call to scanAndEnqueue() (that next call is more likely to return a -// requestGuard that returns false from startWaiting()). -type requestGuard interface { - // startWaiting must be called after each scanAndEnqueue(). The request should - // proceed to evaluation if it returns false, else it releases latches and - // listens to the channel returned by newState(). - startWaiting() bool - - // newState returns the channel to listen on for notification that the state - // may have changed. If startWaiting() returns true, this channel will have an - // initial notification. Note that notifications are collapsed if not - // retrieved, since it is not necessary for the waiter to see every state - // transition. - newState() <-chan struct{} - - // currentState returns the latest waiting state. - currentState() waitingState -} - // The kind of waiting that the request is subject to. See the detailed comment // above for the meaning of each kind. type stateKind int @@ -202,185 +52,6 @@ type waitingState struct { access spanset.SpanAccess // Currently only SpanReadWrite. } -// Concurrency: in addition to holding latches, we require for a particular -// request scanAndEnqueue() and currentState() must be called by the same -// thread. -// -// Recommended usage: -// -// Request evaluation: -// -// g = nil -// // Outer for loop that repeatedly calls scanAndEnqueue() until request can -// // "evaluate". -// for { -// acquire all latches for req.spans() -// // Discovers "all" locks and queues in req.spans() and queues itself -// // where necessary. -// g := lockTable.scanAndEnqueue(..., g) -// if !g.startWaiting() { -// // "Evaluate" request while holding latches -// ... -// if found an exclusive-replicated lock { -// lockTable.addDiscoveredLock(...) -// release latches -// continue -// } -// // May call lockTable.acquireLock() if wants to lock something for -// // later requests in this transaction, or if placed a write intent -// // and it has been applied to the state machine. -// ... -// -// lockTable.done(handle) // Does not release locks. -// break -// } -// // Either there is a lock held by some other txn for which this -// // request has queued, or there isn't a lock but this request is not -// // at the front of the queue so needs to wait its turn. -// -// release all span latches -// var timer *time.Timer -// // Inner for loop that repeats until it is time to call scanAndEnqueue() -// // again, or to return without "evaluation". -// for { -// select { -// case c <- g.newState(): -// state := g.currentState(); -// // Act on state: deadlock detection, pushing other txns etc. -// if event.eventType == doneWaiting { -// break -// } -// if event.eventType == waitFor { -// if timer == nil { -// // Create timer for placing oneself in txn wait queue. -// timer = NewTimer(...) -// } -// continue -// } -// if event.eventType == waitForDistinguished { -// Do blocking push -// // When returns will be doneWaiting if there are no more lock -// // queues where it needs to wait. 
-// continue -// } -// if event.eventType == waitElsewhere { -// Do blocking call to put oneself in txn wait queue -// break -// } -// if event.eventType == waitSelf { -// continue -// } -// case <- timer.C: -// // Need to put oneself in txn wait queue. -// timer = nil -// Do blocking call to put oneself in txn wait queue -// // When return, continue waiting on this handle. -// continue -// case deadline or cancellation: -// lockTable.done(g) -// return -// } -// } // end inner for loop -// } // end outer for loop -// -// Transaction is done: -// call lockTable.releaseLocks() -// -// Transaction is pushed and holds some locks: -// call lockTable.updateLocks() -type lockTable interface { - // scanAndEnqueue is used to find locks and queues to add the request to. The - // first call to scanAndEnqueue for a request uses a nil requestGuard and the - // subsequent calls reuse the previously returned one. The latches needed by - // the request must be held when calling this function. - scanAndEnqueue(req Request, guard requestGuard) requestGuard - - // done is called when the request is finished, whether it evaluated or not. - // This causes it to be removed from any queues. This method does not release - // any locks. This method must be called on the last guard returned from - // scanAndEnqueue() for the request even if one of the (a) lockTable calls - // that use a requestGuard parameter, or (b) a requestGuard call, returned an - // error. The lockTable does not require latches to be held. - done(guard requestGuard) - - // acquireLock is used to acquire a lock in the lockTable. It is only - // permitted for requests that have a non-nil TxnMeta. It must only be called - // in the evaluation phase before calling done(), which means all the latches - // needed by the request are held. The key must be in the request's SpanSet - // with the appropriate SpanAccess: currently the strength is always - // Exclusive, so the span containing this key must be SpanReadWrite. This - // contract ensures that the lock is not held in a conflicting manner by a - // different transaction. Acquiring a lock that is already held by this - // transaction is a noop. - // - // For replicated locks, this must be called after the intent has been applied - // to the replicated state machine. - acquireLock(key roachpb.Key, strength lock.Strength, durability lock.Durability, guard requestGuard) error - - // Called when intent resolution or other adjustments (e.g. pushing a - // transaction with no intents) occur on transaction. - // - // For spans containing Replicated locks, this must be called after intent - // resolution has been applied to the replicated state machine. - // - // A latch with SpanReadWrite must be held on span with the lowest timestamp - // at which any of the locks could be held. This is explained below. - // - // Note that spans can be wider than the actual keys on which locks were - // acquired, and it is ok if no locks are found or locks held by other - // transactions are found (for those lock this call is a noop). - // - // For COMMITTED or ABORTED transactions, all locks are released. - // - // For PENDING or STAGING transactions, the behavior is: - // - // - All replicated locks known to the lockTable are dropped. 
This is not - // because those intents are necessarily deleted, but because in the - // current code where intents are not managed by the lockTable (this will - // change when we have a segregated lock table), we do not want to risk - // code divergence between lockTable and mvccResolveWriteIntent: the - // danger is that the latter removes or changes an intent while the - // lockTable retains it, and a waiter is stuck forever. - // - // Note that even the conservative behavior of dropping locks requires - // that intent resolution acquire latches using the oldest timestamp at - // which the intent could have been written: if the intent was at ts=5 and - // the intent resolution is using ts=10 (since the transaction has been - // pushed), there is a race where a reader at ts=8 can be concurrently - // holding latches and the following bad sequence occurs (both thread1 and - // thread2 are concurrent since their latches do not conflict): - // - // - [thread1-txn1] reader sees intent at ts=5 - // - [thread2-txn2] intent resolution changes that intent to ts=10 - // - [thread2-txn2] updateLocks is called and lock is removed since it is a - // replicated lock. - // - [thread1-txn1] reader calls addDiscoveredLock() for ts=5. - // - // Now the lockTable thinks there is a lock and subsequent pushes of txn2 - // by txn1 will do nothing since the txn2 is already at timestamp 10. Txn1 - // will unnecessarily block until txn2 is done. - // - // - Unreplicated locks: - // - for epochs older than txn.Epoch, locks are dropped. - // - locks in the current epoch that are at a TxnMeta.Sequence contained - // in ignoredSeqNums are dropped. - // - the remaining locks are changed to timestamp equal to - // txn.WriteTimestamp. - updateLocks(intent *roachpb.Intent) error - - // addDiscoveredLock is called when an exclusive replicated lock held by a - // different transaction is discovered when reading the MVCC keys during - // evaluation of this request. It adds the lock and enqueues this requester. - // It is required that request evaluation discover such locks before - // acquiring its own locks, since the request needs to repeat - // scanAndEnqueue(). - // - // A latch consistent with the access desired by guard and guard.ts() must - // be held on the span containing key. - addDiscoveredLock( - key roachpb.Key, txn *enginepb.TxnMeta, ts hlc.Timestamp, guard requestGuard) error -} - // Implementation // TODO(sbhola): // - use the cow btree. @@ -404,6 +75,13 @@ type treeMu struct { numLocks int64 } +// lockTableImpl is an implementation of lockTable. +// +// Concurrency: in addition to holding latches, we require for a particular +// request ScanAndEnqueue() and CurState() must be called by the same +// thread. +// +// Mutex ordering: treeMu.mu > lockState.mu > lockTableGuardImpl.mu type lockTableImpl struct { // A sequence number is assigned to each request seen by the lockTable. This // is to preserve fairness despite the design choice of allowing @@ -461,14 +139,79 @@ func newLockTable(maxLocks int64) lockTable { var _ lockTable = &lockTableImpl{} -// Synchronization: +// lockTableGuardImpl is an implementation of lockTableGuard. // -// mutex ordering: treeMu.mu > lockState.mu > requestGuardImpl.mu +// The struct is a guard that is returned to the request the first time it calls +// lockTable.ScanAndEnqueue() and used in later calls to ScanAndEnqueue() and +// done(). 
After a call to ScanAndEnqueue() (which is made while holding +// latches), the caller must first call lockTableGuard.StartWaiting() and if it +// returns true release the latches and continue interacting with the +// lockTableGuard. If StartWaiting() returns false, the request can proceed to +// evaluation. // -// queuedGuard.active is protected by lockState.mu - -// Implementation of requestGuard. -type requestGuardImpl struct { +// Waiting logic: The interface hides the queues that the request is waiting on, +// and the request's position in the queue. One of the reasons for this hiding +// is that queues are not FIFO since a request that did not wait on a queue for +// key k in a preceding call to ScanAndEnqueue() (because k was not locked and +// there was no queue) may need to wait on the queue in a later call to +// ScanAndEnqueue(). So sequencing of requests arriving at the lockTable is +// partially decided by a sequence number assigned to a request when it first +// called ScanAndEnqueue() and queues are ordered by this sequence number. +// However the sequencing is not fully described by the sequence numbers -- a +// request R1 encountering contention over some keys in its span does not +// prevent a request R2 that has a higher sequence number and overlapping span +// to proceed if R2 does not encounter contention. This concurrency (that is not +// completely fair) is deemed desirable. +// +// The interface exposes an abstracted version of the waiting logic in a way +// that the request that starts waiting is considered waiting for at most one +// other request or transaction. This is exposed as a series of state +// transitions where the transitions are notified via newState() and the current +// state can be read using CurState(). +// +// - The waitFor* states provide information on who the request is waiting for. +// The waitForDistinguished state is a sub-case -- a distinguished waiter is +// responsible for taking extra actions e.g. immediately pushing the transaction +// it is waiting for. The implementation ensures that if there are multiple +// requests in waitFor state waiting on the same transaction at least one will +// be a distinguished waiter. *Note*: there is one rare case when multiple +// requests from the same transaction are waiting on the same lock. These +// requests wait independently, and it is possible for one of these requests to +// grab a reservation (see the detailed comment in lockState below regarding +// reservations), while the other request keeps waiting. This other request is +// waiting on its own transaction and it is even possible for it to be the +// distinguished waiter. We rely on code layers above lockTable to recognize +// this trivial loop and not place oneself in the txn wait queue or push itself. +// +// TODO(sbhola): investigate removing the waitForDistinguished state which +// will simplify the code here. All waitFor requests would wait (currently +// 50ms) before pushing the transaction (for deadlock detection) they are +// waiting on, say T. Typically T will be done before 50ms which is considered +// ok: the one exception we will need to make is if T has the min priority or +// the waiting transaction has max priority -- in both cases it will push +// immediately. The bad case is if T is ABORTED: the push will succeed after, +// and if T left N intents, each push would wait for 50ms, incurring a latency +// of 50*N ms. A cache of recently encountered ABORTED transactions on each +// Store should mitigate this latency increase. 
Whenever a transaction sees a +// waitFor state, it will consult this cache and if T is found, push +// immediately (if there isn't already a push in-flight) -- even if T is not +// initially in the cache, the first push will place it in the cache, so the +// maximum latency increase is 50ms. +// +// - The waitElsewhere state is a rare state that is used when the lockTable is +// under memory pressure and is clearing its internal queue state. Like the +// waitFor* states, it informs the request who it is waiting for so that +// deadlock detection works. However, sequencing information inside the +// lockTable is mostly discarded. +// +// - The waitSelf state is a rare state when a different request from the same +// transaction has a reservation. See the comment about "Reservations" in +// lockState. +// +// - The doneWaiting state is used to indicate that the request should make +// another call to ScanAndEnqueue() (that next call is more likely to return a +// lockTableGuard that returns false from StartWaiting()). +type lockTableGuardImpl struct { seqNum uint64 table *lockTableImpl @@ -477,7 +220,7 @@ type requestGuardImpl struct { spans *spanset.SpanSet ts hlc.Timestamp - // A request whose startWait is set to true in scanAndEnqueue is actively + // A request whose startWait is set to true in ScanAndEnqueue is actively // waiting at a particular key. This is the first key encountered when // iterating through spans that it needs to wait at. A future event (lock // release etc.) may cause the request to no longer need to wait at this @@ -508,7 +251,7 @@ type requestGuardImpl struct { // If this is true, the state has changed and the channel has been // signaled, but what the state should be has not been computed. The call - // to currentState() needs to compute that current state. Deferring the + // to CurState() needs to compute that current state. Deferring the // computation makes the waiters do this work themselves instead of making // the call to release/update locks or release reservations do this work // (proportional to number of waiters). @@ -516,21 +259,21 @@ type requestGuardImpl struct { } } -var _ requestGuard = &requestGuardImpl{} +var _ lockTableGuard = &lockTableGuardImpl{} -func (g *requestGuardImpl) startWaiting() bool { +func (g *lockTableGuardImpl) ShouldWait() bool { g.mu.Lock() defer g.mu.Unlock() return g.mu.startWait } -func (g *requestGuardImpl) newState() <-chan struct{} { +func (g *lockTableGuardImpl) NewStateChan() <-chan struct{} { g.mu.Lock() defer g.mu.Unlock() return g.mu.signal } -func (g *requestGuardImpl) currentState() waitingState { +func (g *lockTableGuardImpl) CurState() waitingState { g.mu.Lock() defer g.mu.Unlock() if !g.mu.mustFindNextLockAfter { @@ -545,7 +288,7 @@ func (g *requestGuardImpl) currentState() waitingState { return g.mu.state } -func (g *requestGuardImpl) notify() { +func (g *lockTableGuardImpl) notify() { select { case g.mu.signal <- struct{}{}: default: @@ -554,12 +297,12 @@ func (g *requestGuardImpl) notify() { // Waiting writers in a lockState are wrapped in a queuedGuard. A waiting // writer is typically waiting in an active state, i.e., the -// requestGuardImpl.key refers to this lockState. However, breaking of +// lockTableGuardImpl.key refers to this lockState. However, breaking of // reservations (see the comment on reservations below, in lockState) can // cause a writer to be an inactive waiter. 
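+//
+// As a hypothetical illustration (not part of this change) of the waiting
+// states described above, a signaled waiter might react to CurState() roughly
+// as follows:
+//
+//   switch state := g.CurState(); state.stateKind {
+//   case waitForDistinguished:
+//       // Push the conflicting transaction immediately (deadlock detection).
+//   case waitFor, waitElsewhere:
+//       // Push after a short delay, or immediately if priorities demand it.
+//   case waitSelf:
+//       // A different request from the same transaction holds the
+//       // reservation; keep waiting without pushing.
+//   case doneWaiting:
+//       // Re-acquire latches and call ScanAndEnqueue() again.
+//   }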
type queuedGuard struct { - guard *requestGuardImpl - active bool + guard *lockTableGuardImpl + active bool // protected by lockState.mu } // Information about a lock holder. @@ -592,7 +335,7 @@ type lockState struct { mu syncutil.Mutex // Protects everything below. // Invariant summary (see detailed comments below): - // - both holder.locked and reservation != nil cannot be true. + // - both holder.locked and waitQ.reservation != nil cannot be true. // - if holder.locked and multiple holderInfos have txn != nil: all the // txns must have the same txn.ID. // - !holder.locked => waitingReaders.Len() == 0. That is, readers wait @@ -609,6 +352,11 @@ type lockState struct { holder [lock.MaxDurability + 1]lockHolderInfo } + // Information about the requests waiting on the lock. + lockWaitQueue +} + +type lockWaitQueue struct { // Reservations: // // A not-held lock can be "reserved". A reservation is just a claim that @@ -711,12 +459,12 @@ type lockState struct { // first waiter since that waiter must be desiring a lock that is // incompatible with a shared lock. - reservation *requestGuardImpl + reservation *lockTableGuardImpl // TODO(sbhola): There are a number of places where we iterate over these // lists looking for something, as described below. If some of these turn // out to be inefficient, consider better data-structures. One idea is that - // for cases that find a particular guard the requestGuardImpl.locks can be + // for cases that find a particular guard the lockTableGuardImpl.locks can be // a map instead of a set to point directly to the *list.Element. // // queuedWriters: @@ -737,14 +485,14 @@ type lockState struct { // non-empty, either the lock is held or there is a reservation. queuedWriters list.List - // List of *requestGuardImpl. All of these are actively waiting. If + // List of *lockTableGuardImpl. All of these are actively waiting. If // non-empty, the lock must be held. By definition these cannot be in // waitSelf state since that state is only used when there is a reservation. waitingReaders list.List // If there is a non-empty set of active waiters that are not waitSelf, then // at least one must be distinguished. - distinguishedWaiter *requestGuardImpl + distinguishedWaiter *lockTableGuardImpl } // Less implements the btree.Item interface. @@ -797,7 +545,7 @@ func (l *lockState) informActiveWaiters() { // Since there are waiting readers we could not have transitioned out of // or into a state with a reservation, since readers do not wait for // reservations. - g := e.Value.(*requestGuardImpl) + g := e.Value.(*lockTableGuardImpl) if findDistinguished { l.distinguishedWaiter = g findDistinguished = false @@ -841,9 +589,9 @@ func (l *lockState) informActiveWaiters() { // waiter. // REQUIRES: l.mu is locked. func (l *lockState) tryMakeNewDistinguished() { - var g *requestGuardImpl + var g *lockTableGuardImpl if l.waitingReaders.Len() > 0 { - g = l.waitingReaders.Front().Value.(*requestGuardImpl) + g = l.waitingReaders.Front().Value.(*lockTableGuardImpl) } else if l.queuedWriters.Len() > 0 { for e := l.queuedWriters.Front(); e != nil; e = e.Next() { qg := e.Value.(*queuedGuard) @@ -910,10 +658,10 @@ func (l *lockState) getLockerInfo() (*enginepb.TxnMeta, hlc.Timestamp) { // it is set to false when the call to tryActiveWait is happening due to an // event for a different request or transaction (like a lock release) since in // that case the channel is notified first and the call to tryActiveWait() -// happens later in requestGuard.currentState(). 
The return value is true iff +// happens later in lockTableGuard.CurState(). The return value is true iff // it is actively waiting. // Acquires l.mu, g.mu. -func (l *lockState) tryActiveWait(g *requestGuardImpl, sa spanset.SpanAccess, notify bool) bool { +func (l *lockState) tryActiveWait(g *lockTableGuardImpl, sa spanset.SpanAccess, notify bool) bool { l.mu.Lock() defer l.mu.Unlock() @@ -1047,13 +795,13 @@ func (l *lockState) tryActiveWait(g *requestGuardImpl, sa spanset.SpanAccess, no // that is acquiring the lock. // Acquires l.mu. func (l *lockState) acquireLock( - _ lock.Strength, durability lock.Durability, g *requestGuardImpl, -) (doneWaiting []*requestGuardImpl, err error) { + txn *enginepb.TxnMeta, _ lock.Strength, durability lock.Durability, +) (doneWaiting []*lockTableGuardImpl, err error) { l.mu.Lock() defer l.mu.Unlock() if l.holder.locked { // Already held. - updateTxn, updateTs := g.txn, g.ts + updateTxn, updateTs := txn, txn.WriteTimestamp beforeTxn, beforeTs := l.getLockerInfo() if beforeTxn.ID != updateTxn.ID { return nil, errors.Errorf("caller violated contract") @@ -1089,15 +837,15 @@ func (l *lockState) acquireLock( } // Not already held, so may be reserved by this request. if l.reservation != nil { - if l.reservation != g { + if l.reservation.txn.ID != txn.ID { return nil, errors.Errorf("caller violated contract") } if l.waitingReaders.Len() > 0 { panic("lockTable bug") } - g.mu.Lock() - delete(g.mu.locks, l) - g.mu.Unlock() + l.reservation.mu.Lock() + delete(l.reservation.mu.locks, l) + l.reservation.mu.Unlock() } else { if l.queuedWriters.Len() > 0 || l.waitingReaders.Len() > 0 { panic("lockTable bug") @@ -1105,11 +853,10 @@ func (l *lockState) acquireLock( } l.reservation = nil l.holder.locked = true - l.holder.holder[durability].txn = g.txn - l.holder.holder[durability].ts = g.ts - l.holder.holder[durability].seqs = append([]enginepb.TxnSeq(nil), g.txn.Sequence) + l.holder.holder[durability].txn = txn + l.holder.holder[durability].ts = txn.WriteTimestamp + l.holder.holder[durability].seqs = append([]enginepb.TxnSeq(nil), txn.Sequence) - txnID := g.txn.ID // No effect on queuedWriters from other txns, but if there are waiting // requests from the same txn, they no longer need to wait. for e := l.queuedWriters.Front(); e != nil; { @@ -1117,7 +864,7 @@ func (l *lockState) acquireLock( curr := e e = e.Next() g := qg.guard - if g.txn.ID == txnID { + if g.txn.ID == txn.ID { if qg.active { doneWaiting = append(doneWaiting, g) if g == l.distinguishedWaiter { @@ -1137,7 +884,7 @@ func (l *lockState) acquireLock( // where g is trying to access this key with access sa. // Acquires l.mu. func (l *lockState) discoveredLock( - txn *enginepb.TxnMeta, ts hlc.Timestamp, g *requestGuardImpl, sa spanset.SpanAccess, + txn *enginepb.TxnMeta, ts hlc.Timestamp, g *lockTableGuardImpl, sa spanset.SpanAccess, ) error { l.mu.Lock() defer l.mu.Unlock() @@ -1222,7 +969,7 @@ func (l *lockState) tryClearLock() bool { l.reservation = nil } for e := l.waitingReaders.Front(); e != nil; { - g := e.Value.(*requestGuardImpl) + g := e.Value.(*lockTableGuardImpl) curr := e e = e.Next() l.waitingReaders.Remove(curr) @@ -1296,7 +1043,7 @@ func removeIgnored( // Acquires l.mu. 
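+//
+// A hypothetical caller-side sketch (variable names are illustrative, not
+// part of this change) of how the return values are typically consumed:
+//
+//   dw, gc, err := l.tryUpdateLock(intent)
+//   processDoneWaiting(dw)               // wake guards done waiting at this key
+//   if gc {
+//       locksToGC = append(locksToGC, l) // candidate for tryGCLocks()
+//   }
+//   // err, if any, is surfaced to the caller of UpdateLocks().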
func (l *lockState) tryUpdateLock( intent *roachpb.Intent, -) (doneWaiting []*requestGuardImpl, gc bool, err error) { +) (doneWaiting []*lockTableGuardImpl, gc bool, err error) { l.mu.Lock() defer l.mu.Unlock() if !l.isLockedBy(intent.Txn.ID) { @@ -1356,11 +1103,11 @@ func (l *lockState) tryUpdateLock( // The lock holder timestamp has increased. Returns the list of guards that // are done actively waiting at this key. // REQUIRES: l.mu is locked. -func (l *lockState) increasedLockTs(newTs hlc.Timestamp) []*requestGuardImpl { - var doneWaiting []*requestGuardImpl +func (l *lockState) increasedLockTs(newTs hlc.Timestamp) []*lockTableGuardImpl { + var doneWaiting []*lockTableGuardImpl distinguishedRemoved := false for e := l.waitingReaders.Front(); e != nil; { - g := e.Value.(*requestGuardImpl) + g := e.Value.(*lockTableGuardImpl) curr := e e = e.Next() if !g.ts.LessEq(newTs) { @@ -1391,7 +1138,9 @@ func (l *lockState) increasedLockTs(newTs hlc.Timestamp) []*requestGuardImpl { // list of guards that are done actively waiting at this key and whether the // lockState can be garbage collected. // Acquires l.mu. -func (l *lockState) requestDone(g *requestGuardImpl) (doneWaiting []*requestGuardImpl, gc bool) { +func (l *lockState) requestDone( + g *lockTableGuardImpl, +) (doneWaiting []*lockTableGuardImpl, gc bool) { l.mu.Lock() defer l.mu.Unlock() @@ -1424,7 +1173,7 @@ func (l *lockState) requestDone(g *requestGuardImpl) (doneWaiting []*requestGuar } if !doneRemoval { for e := l.waitingReaders.Front(); e != nil; e = e.Next() { - gg := e.Value.(*requestGuardImpl) + gg := e.Value.(*lockTableGuardImpl) if gg == g { l.waitingReaders.Remove(e) if g == l.distinguishedWaiter { @@ -1448,7 +1197,7 @@ func (l *lockState) requestDone(g *requestGuardImpl) (doneWaiting []*requestGuar // The lock has transitioned from locked/reserved to unlocked. There could be // waiters, but there cannot be a reservation. // REQUIRES: l.mu is locked. -func (l *lockState) lockIsFree() (doneWaiting []*requestGuardImpl, gc bool) { +func (l *lockState) lockIsFree() (doneWaiting []*lockTableGuardImpl, gc bool) { if l.reservation != nil { panic("lockTable bug") } @@ -1457,7 +1206,7 @@ func (l *lockState) lockIsFree() (doneWaiting []*requestGuardImpl, gc bool) { findDistinguished := l.distinguishedWaiter == nil // All waiting readers don't need to wait here anymore. for e := l.waitingReaders.Front(); e != nil; { - g := e.Value.(*requestGuardImpl) + g := e.Value.(*lockTableGuardImpl) curr := e e = e.Next() l.waitingReaders.Remove(curr) @@ -1523,23 +1272,23 @@ func (l *lockState) lockIsFree() (doneWaiting []*requestGuardImpl, gc bool) { return doneWaiting, false } -// scanAndEnqueue implements the lockTable interface. -func (t *lockTableImpl) scanAndEnqueue(req Request, guard requestGuard) requestGuard { - var g *requestGuardImpl +// ScanAndEnqueue implements the lockTable interface. 
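+//
+// A hypothetical sketch (not part of this change) of the sequencing loop a
+// caller is expected to drive, with latch handling elided:
+//
+//   g := lt.ScanAndEnqueue(req, nil)       // called while holding latches
+//   for g.ShouldWait() {
+//       // Release latches, then wait for the lockTable's signal.
+//       for {
+//           <-g.NewStateChan()
+//           if state := g.CurState(); state.stateKind == doneWaiting {
+//               break
+//           }
+//           // Otherwise react to the state (push, keep waiting, ...).
+//       }
+//       // Re-acquire latches and rescan with the same guard.
+//       g = lt.ScanAndEnqueue(req, g)
+//   }
+//   // ... evaluate ...
+//   lt.Dequeue(g)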
+func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTableGuard { + var g *lockTableGuardImpl if guard == nil { seqNum := atomic.AddUint64(&t.seqNum, 1) - g = &requestGuardImpl{ + g = &lockTableGuardImpl{ seqNum: seqNum, table: t, - txn: req.txnMeta(), - spans: req.spans(), - ts: req.ts(), + txn: &req.Txn.TxnMeta, + spans: req.Spans, + ts: req.Timestamp, index: -1, } g.mu.signal = make(chan struct{}, 1) g.mu.locks = make(map[*lockState]struct{}) } else { - g = guard.(*requestGuardImpl) + g = guard.(*lockTableGuardImpl) g.mu.Lock() g.key = nil g.sa = spanset.SpanAccess(0) @@ -1553,7 +1302,7 @@ func (t *lockTableImpl) scanAndEnqueue(req Request, guard requestGuard) requestG return g } -func processDoneWaiting(doneWaiting []*requestGuardImpl) { +func processDoneWaiting(doneWaiting []*lockTableGuardImpl) { for _, g := range doneWaiting { g.mu.Lock() g.mu.mustFindNextLockAfter = true @@ -1562,16 +1311,16 @@ func processDoneWaiting(doneWaiting []*requestGuardImpl) { } } -// done implements the lockTable interface. -func (t *lockTableImpl) done(guard requestGuard) { - g := guard.(*requestGuardImpl) +// Dequeue implements the lockTable interface. +func (t *lockTableImpl) Dequeue(guard lockTableGuard) { + g := guard.(*lockTableGuardImpl) var candidateLocks []*lockState g.mu.Lock() for l := range g.mu.locks { candidateLocks = append(candidateLocks, l) } g.mu.Unlock() - var doneWaiting []*requestGuardImpl + var doneWaiting []*lockTableGuardImpl var locksToGC [spanset.NumSpanScope][]*lockState for _, l := range candidateLocks { dw, gc := l.requestDone(g) @@ -1589,14 +1338,46 @@ func (t *lockTableImpl) done(guard requestGuard) { processDoneWaiting(doneWaiting) } -// acquireLock implements the lockTable interface. -func (t *lockTableImpl) acquireLock( - key roachpb.Key, strength lock.Strength, durability lock.Durability, guard requestGuard, +// AddDiscoveredLock implements the lockTable interface. +func (t *lockTableImpl) AddDiscoveredLock(intent *roachpb.Intent, guard lockTableGuard) error { + g := guard.(*lockTableGuardImpl) + key := intent.Key + ss := spanset.SpanGlobal + if keys.IsLocal(key) { + ss = spanset.SpanLocal + } + sa, err := findAccessInSpans(key, ss, g.spans) + if err != nil { + return err + } + var l *lockState + tree := &t.locks[ss] + tree.mu.Lock() + // Can't release tree.mu until call l.discoveredLock() since someone may + // find an empty lock and remove it from the tree. If we expect that + // lockState will already be in tree we can optimize this by first trying + // with a tree.mu.RLock(). + defer tree.mu.Unlock() + i := tree.Get(&lockState{key: key}) + if i == nil { + l = &lockState{key: key, ss: ss} + l.queuedWriters.Init() + l.waitingReaders.Init() + tree.ReplaceOrInsert(l) + atomic.AddInt64(&tree.numLocks, 1) + } else { + l = i.(*lockState) + } + return l.discoveredLock(&intent.Txn, intent.Txn.WriteTimestamp, g, sa) +} + +// AcquireLock implements the lockTable interface. 
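+//
+// A hypothetical sketch (not part of this change; txn and the key are
+// placeholders) of an acquisition made on behalf of a transactional write
+// after it has been sequenced:
+//
+//   err := lt.AcquireLock(&txn.TxnMeta, roachpb.Key("k"), lock.Exclusive, lock.Unreplicated)
+//
+// Only lock.Exclusive is accepted today; other strengths return an error.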
+func (t *lockTableImpl) AcquireLock( + txn *enginepb.TxnMeta, key roachpb.Key, strength lock.Strength, durability lock.Durability, ) error { if strength != lock.Exclusive { return errors.Errorf("caller violated contract") } - g := guard.(*requestGuardImpl) ss := spanset.SpanGlobal if keys.IsLocal(key) { ss = spanset.SpanLocal @@ -1623,11 +1404,8 @@ func (t *lockTableImpl) acquireLock( } else { l = i.(*lockState) } - doneWaiting, err := l.acquireLock(strength, durability, g) + doneWaiting, err := l.acquireLock(txn, strength, durability) tree.mu.Unlock() - if err != nil { - return err - } processDoneWaiting(doneWaiting) var totalLocks int64 @@ -1637,15 +1415,15 @@ func (t *lockTableImpl) acquireLock( if totalLocks > t.maxLocks { t.clearMostLocks() } - return nil + return err } // Removes all locks that have active waiters and tells those waiters to wait // elsewhere. A replicated lock which has been discovered by a request but the // request is not yet actively waiting on it will be preserved since we need // to tell that request who it is waiting for when it next calls -// scanAndEnqueue(). If we aggressively removed even those requests, the next -// scanAndEnqueue() would not find that lock, the request would evaluate +// ScanAndEnqueue(). If we aggressively removed even those requests, the next +// ScanAndEnqueue() would not find that lock, the request would evaluate // again, again discover that lock and if clearMostLocks() keeps getting // called would be stuck in this loop without pushing. func (t *lockTableImpl) clearMostLocks() { @@ -1682,40 +1460,6 @@ func findAccessInSpans( return spanset.NumSpanAccess, errors.Errorf("caller violated contract") } -// addDiscoveredLock implements the lockTable interface. -func (t *lockTableImpl) addDiscoveredLock( - key roachpb.Key, txn *enginepb.TxnMeta, ts hlc.Timestamp, guard requestGuard, -) error { - g := guard.(*requestGuardImpl) - ss := spanset.SpanGlobal - if keys.IsLocal(key) { - ss = spanset.SpanLocal - } - sa, err := findAccessInSpans(key, ss, g.spans) - if err != nil { - return err - } - var l *lockState - tree := &t.locks[ss] - tree.mu.Lock() - // Can't release tree.mu until call l.discoveredLock() since someone may - // find an empty lock and remove it from the tree. If we expect that - // lockState will already be in tree we can optimize this by first trying - // with a tree.mu.RLock(). - defer tree.mu.Unlock() - i := tree.Get(&lockState{key: key}) - if i == nil { - l = &lockState{key: key, ss: ss} - l.queuedWriters.Init() - l.waitingReaders.Init() - tree.ReplaceOrInsert(l) - atomic.AddInt64(&tree.numLocks, 1) - } else { - l = i.(*lockState) - } - return l.discoveredLock(txn, ts, g, sa) -} - // Tries to GC locks that were previously known to have become empty. func (t *lockTableImpl) tryGCLocks(tree *treeMu, locks []*lockState) { tree.mu.Lock() @@ -1741,8 +1485,8 @@ func (t *lockTableImpl) tryGCLocks(tree *treeMu, locks []*lockState) { } } -// updateLocks implements the lockTable interface. -func (t *lockTableImpl) updateLocks(intent *roachpb.Intent) error { +// UpdateLocks implements the lockTable interface. 
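+//
+// A hypothetical sketch (not part of this change) of the call made during
+// intent resolution, mirroring the datadriven test harness:
+//
+//   intent := &roachpb.Intent{Span: span, Txn: txn.TxnMeta, Status: roachpb.COMMITTED}
+//   err := lt.UpdateLocks(intent)
+//
+// Depending on the intent's status, epoch, timestamp, and ignored sequence
+// numbers, the corresponding locks are released or updated and any waiters
+// at those keys are signaled accordingly.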
+func (t *lockTableImpl) UpdateLocks(intent *roachpb.Intent) error { span := intent.Span ss := spanset.SpanGlobal if keys.IsLocal(span.Key) { @@ -1750,7 +1494,7 @@ func (t *lockTableImpl) updateLocks(intent *roachpb.Intent) error { } tree := &t.locks[ss] var err error - var doneWaiting []*requestGuardImpl + var doneWaiting []*lockTableGuardImpl var locksToGC []*lockState changeFunc := func(i btree.Item) bool { l := i.(*lockState) @@ -1774,20 +1518,18 @@ func (t *lockTableImpl) updateLocks(intent *roachpb.Intent) error { } } tree.mu.RUnlock() - if err != nil { - return err - } + if len(locksToGC) > 0 { t.tryGCLocks(tree, locksToGC) } processDoneWaiting(doneWaiting) - return nil + return err } // Iteration helper for findNextLockAfter. Returns the next span to search // over, or nil if the iteration is done. // REQUIRES: g.mu is locked. -func stepToNextSpan(g *requestGuardImpl) *spanset.Span { +func stepToNextSpan(g *lockTableGuardImpl) *spanset.Span { g.index++ for ; g.ss < spanset.NumSpanScope; g.ss++ { for ; g.sa < spanset.NumSpanAccess; g.sa++ { @@ -1808,7 +1550,7 @@ func stepToNextSpan(g *requestGuardImpl) *spanset.Span { // finds the next lock the request starts actively waiting there, else it is // told that it is done waiting. // Acquires g.mu. Acquires treeMu.mu's in read mode. -func (t *lockTableImpl) findNextLockAfter(g *requestGuardImpl, notify bool) { +func (t *lockTableImpl) findNextLockAfter(g *lockTableGuardImpl, notify bool) { spans := g.spans.GetSpans(g.sa, g.ss) var span *spanset.Span resumingInSameSpan := false @@ -1879,6 +1621,10 @@ func (t *lockTableImpl) findNextLockAfter(g *requestGuardImpl, notify bool) { } } +func (t *lockTableImpl) Clear() { + // TODO(nvanbenschoten): implement and test this. +} + // For tests. func (t *lockTableImpl) String() string { var buf strings.Builder @@ -1904,7 +1650,7 @@ func (t *lockTableImpl) String() string { if l.waitingReaders.Len() > 0 { fmt.Fprintln(&buf, " waiting readers:") for e := l.waitingReaders.Front(); e != nil; e = e.Next() { - g := e.Value.(*requestGuardImpl) + g := e.Value.(*lockTableGuardImpl) fmt.Fprintf(&buf, " req: %d, txn: %v\n", g.seqNum, g.txn.ID) } } diff --git a/pkg/storage/concurrency/lock_table_test.go b/pkg/storage/concurrency/lock_table_test.go index e1aada2bac16..2a9f8af5a4e6 100644 --- a/pkg/storage/concurrency/lock_table_test.go +++ b/pkg/storage/concurrency/lock_table_test.go @@ -28,8 +28,8 @@ import ( ) /* -Test needs to handle caller constraints wrt latches being held. The datadriven test uses the -following format: +Test needs to handle caller constraints wrt latches being held. The datadriven +test uses the following format: txn txn= ts=[,] epoch= ---- @@ -45,8 +45,8 @@ scan r= ---- |start-waiting: - Calls lockTable.scanAndEnqueue. If the request has an existing guard, uses it. If a guard is - returned, stores it for later use. + Calls lockTable.ScanAndEnqueue. If the request has an existing guard, uses it. + If a guard is returned, stores it for later use. acquire r= k= durability=r|u ---- @@ -76,19 +76,21 @@ done r= ---- - Calls lockTable.done() for the named request. The request and guard are discarded after this. + Calls lockTable.Dequeue() for the named request. The request and guard are + discarded after this. guard-state r= ---- new|old: state= [txn= ts=] - Calls requestGuard.newState() in a non-blocking manner, followed by currentState(). + Calls lockTableGuard.NewStateChan() in a non-blocking manner, followed by + CurState(). guard-start-waiting r= ---- - Calls requestGuard.startWaiting(). 
+ Calls lockTableGuard.ShouldWait(). print ---- @@ -163,26 +165,14 @@ func scanSpans(t *testing.T, d *datadriven.TestData, ts hlc.Timestamp) *spanset. return spans } -type testRequest struct { - tM *enginepb.TxnMeta - s *spanset.SpanSet - t hlc.Timestamp -} - -var _ Request = &testRequest{} - -func (r *testRequest) txnMeta() *enginepb.TxnMeta { return r.tM } -func (r *testRequest) spans() *spanset.SpanSet { return r.s } -func (r *testRequest) ts() hlc.Timestamp { return r.t } - func TestLockTableBasic(t *testing.T) { defer leaktest.AfterTest(t)() lt := newLockTable(1000) txnsByName := make(map[string]*enginepb.TxnMeta) txnCounter := uint128.FromInts(0, 0) - requestsByName := make(map[string]*testRequest) - guardsByReqName := make(map[string]requestGuard) + requestsByName := make(map[string]Request) + guardsByReqName := make(map[string]lockTableGuard) datadriven.RunTest(t, "testdata/lock_table", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "txn": @@ -227,10 +217,13 @@ func TestLockTableBasic(t *testing.T) { } ts := scanTimestamp(t, d) spans := scanSpans(t, d, ts) - req := &testRequest{ - tM: txnMeta, - s: spans, - t: ts, + // Update the transaction's timestamp, if necessary. The transaction + // may have needed to move its timestamp for any number of reasons. + txnMeta.WriteTimestamp = ts + req := Request{ + Txn: &roachpb.Transaction{TxnMeta: *txnMeta}, + Timestamp: ts, + Spans: spans, } requestsByName[reqName] = req return "" @@ -238,21 +231,21 @@ func TestLockTableBasic(t *testing.T) { case "scan": var reqName string d.ScanArgs(t, "r", &reqName) - req := requestsByName[reqName] - if req == nil { + req, ok := requestsByName[reqName] + if !ok { d.Fatalf(t, "unknown request: %s", reqName) } g := guardsByReqName[reqName] - g = lt.scanAndEnqueue(req, g) + g = lt.ScanAndEnqueue(req, g) guardsByReqName[reqName] = g - return fmt.Sprintf("start-waiting: %t", g.startWaiting()) + return fmt.Sprintf("start-waiting: %t", g.ShouldWait()) case "acquire": var reqName string d.ScanArgs(t, "r", &reqName) - g := guardsByReqName[reqName] - if g == nil { - d.Fatalf(t, "unknown guard: %s", reqName) + req, ok := requestsByName[reqName] + if !ok { + d.Fatalf(t, "unknown request: %s", reqName) } var key string d.ScanArgs(t, "k", &key) @@ -265,7 +258,7 @@ func TestLockTableBasic(t *testing.T) { if s[0] == 'r' { durability = lock.Replicated } - if err := lt.acquireLock(roachpb.Key(key), lock.Exclusive, durability, g); err != nil { + if err := lt.AcquireLock(&req.Txn.TxnMeta, roachpb.Key(key), lock.Exclusive, durability); err != nil { return err.Error() } return lt.(*lockTableImpl).String() @@ -282,7 +275,7 @@ func TestLockTableBasic(t *testing.T) { span := getSpan(t, d, s) // TODO(sbhola): also test ABORTED. intent := &roachpb.Intent{Span: span, Txn: *txnMeta, Status: roachpb.COMMITTED} - if err := lt.updateLocks(intent); err != nil { + if err := lt.UpdateLocks(intent); err != nil { return err.Error() } return lt.(*lockTableImpl).String() @@ -306,7 +299,7 @@ func TestLockTableBasic(t *testing.T) { span := getSpan(t, d, s) // TODO(sbhola): also test STAGING. 
intent := &roachpb.Intent{Span: span, Txn: *txnMeta, Status: roachpb.PENDING} - if err := lt.updateLocks(intent); err != nil { + if err := lt.UpdateLocks(intent); err != nil { return err.Error() } return lt.(*lockTableImpl).String() @@ -326,31 +319,35 @@ func TestLockTableBasic(t *testing.T) { if !ok { d.Fatalf(t, "unknown txn %s", txnName) } - if err := lt.addDiscoveredLock(roachpb.Key(key), txnMeta, txnMeta.WriteTimestamp, g); err != nil { + span := roachpb.Span{Key: roachpb.Key(key)} + intent := &roachpb.Intent{Span: span, Txn: *txnMeta, Status: roachpb.PENDING} + if err := lt.AddDiscoveredLock(intent, g); err != nil { return err.Error() } return lt.(*lockTableImpl).String() case "done": + // TODO(nvanbenschoten): rename this command to dequeue. var reqName string d.ScanArgs(t, "r", &reqName) g := guardsByReqName[reqName] if g == nil { d.Fatalf(t, "unknown guard: %s", reqName) } - lt.done(g) + lt.Dequeue(g) delete(guardsByReqName, reqName) delete(requestsByName, reqName) return lt.(*lockTableImpl).String() case "guard-start-waiting": + // TODO(nvanbenschoten): rename this command to should-wait. var reqName string d.ScanArgs(t, "r", &reqName) g := guardsByReqName[reqName] if g == nil { d.Fatalf(t, "unknown guard: %s", reqName) } - return fmt.Sprintf("%t", g.startWaiting()) + return fmt.Sprintf("%t", g.ShouldWait()) case "guard-state": var reqName string @@ -361,12 +358,12 @@ func TestLockTableBasic(t *testing.T) { } var str string select { - case <-g.newState(): + case <-g.NewStateChan(): str = "new: " default: str = "old: " } - state := g.currentState() + state := g.CurState() var typeStr string switch state.stateKind { case waitForDistinguished: diff --git a/pkg/storage/intentresolver/intent_resolver.go b/pkg/storage/intentresolver/intent_resolver.go index acd9cda729ee..8819fa923b57 100644 --- a/pkg/storage/intentresolver/intent_resolver.go +++ b/pkg/storage/intentresolver/intent_resolver.go @@ -883,9 +883,7 @@ func (ir *IntentResolver) ResolveIntents( } var resolveReqs []resolveReq var resolveRangeReqs []roachpb.Request - for i := range intents { - intent := intents[i] // avoids a race in `i, intent := range ...` - + for _, intent := range intents { if len(intent.EndKey) == 0 { resolveReqs = append(resolveReqs, resolveReq{ diff --git a/pkg/storage/replica_init.go b/pkg/storage/replica_init.go index d393dd1630d6..39f25b07fa20 100644 --- a/pkg/storage/replica_init.go +++ b/pkg/storage/replica_init.go @@ -70,8 +70,8 @@ func newUnloadedReplica( RangeID: desc.RangeID, store: store, abortSpan: abortspan.New(desc.RangeID), - txnWaitQueue: txnwait.NewQueue(store), } + r.txnWaitQueue = txnwait.NewQueue(store, r) r.mu.pendingLeaseRequest = makePendingLeaseRequest(r) r.mu.stateLoader = stateloader.Make(desc.RangeID) r.mu.quiescent = true diff --git a/pkg/storage/replica_send.go b/pkg/storage/replica_send.go index cae0bd8f8ece..14d5a802c6c7 100644 --- a/pkg/storage/replica_send.go +++ b/pkg/storage/replica_send.go @@ -311,7 +311,7 @@ func (r *Replica) handleTransactionPushError( return pErr } // Enqueue unsuccessfully pushed transaction on the txnWaitQueue and retry. 
- r.txnWaitQueue.Enqueue(&t.PusheeTxn) + r.txnWaitQueue.EnqueueTxn(&t.PusheeTxn) return nil } diff --git a/pkg/storage/spanset/spanset.go b/pkg/storage/spanset/spanset.go index da68a2479ee5..b616c37fef48 100644 --- a/pkg/storage/spanset/spanset.go +++ b/pkg/storage/spanset/spanset.go @@ -137,6 +137,7 @@ func (s *SpanSet) SortAndDedup() { for sa := SpanAccess(0); sa < NumSpanAccess; sa++ { for ss := SpanScope(0); ss < NumSpanScope; ss++ { s.spans[sa][ss], _ /* distinct */ = mergeSpans(s.spans[sa][ss]) + // TODO(nvanbenschoten): dedup across accesses. } } } diff --git a/pkg/storage/store_send.go b/pkg/storage/store_send.go index ab2c89c504a5..ce36e965dbe7 100644 --- a/pkg/storage/store_send.go +++ b/pkg/storage/store_send.go @@ -250,7 +250,7 @@ func (r *Replica) maybeWaitForPushee( return nil, nil } pushReq := ba.Requests[0].GetInner().(*roachpb.PushTxnRequest) - pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, r, pushReq) + pushResp, pErr := r.txnWaitQueue.MaybeWaitForPush(ctx, pushReq) if pErr != nil { return nil, pErr } else if pushResp != nil { @@ -262,7 +262,7 @@ func (r *Replica) maybeWaitForPushee( // For query txn requests, wait in the txn wait queue either for // transaction update or for dependent transactions to change. queryReq := ba.Requests[0].GetInner().(*roachpb.QueryTxnRequest) - pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, r, queryReq) + pErr := r.txnWaitQueue.MaybeWaitForQuery(ctx, queryReq) if pErr != nil { return nil, pErr } diff --git a/pkg/storage/txn_wait_queue_test.go b/pkg/storage/txn_wait_queue_test.go index 868c5573dd3e..3f5e557c9eff 100644 --- a/pkg/storage/txn_wait_queue_test.go +++ b/pkg/storage/txn_wait_queue_test.go @@ -90,7 +90,7 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { t.Fatal(err.Error()) } - q.Enqueue(txn) + q.EnqueueTxn(txn) if _, ok := q.TrackedTxns()[txn.ID]; !ok { t.Fatalf("expected pendingTxn to be in txns map after enqueue") } @@ -106,7 +106,7 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { retCh := make(chan RespWithErr, 1) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req) retCh <- RespWithErr{resp, pErr} }() @@ -146,7 +146,7 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { t.Errorf("expected GetDependents to return nil as queue is disabled; got %+v", deps) } - q.Enqueue(txn) + q.EnqueueTxn(txn) if q.IsEnabled() { t.Errorf("expected enqueue to silently fail since queue is disabled") } @@ -159,7 +159,7 @@ func TestTxnWaitQueueEnableDisable(t *testing.T) { t.Fatalf("expected update to silently fail since queue is disabled") } - if resp, pErr := q.MaybeWaitForPush(context.TODO(), tc.repl, &req); resp != nil || pErr != nil { + if resp, pErr := q.MaybeWaitForPush(context.TODO(), &req); resp != nil || pErr != nil { t.Errorf("expected nil resp and err as queue is disabled; got %+v, %s", resp, pErr) } if err := checkAllGaugesZero(tc); err != nil { @@ -190,7 +190,7 @@ func TestTxnWaitQueueCancel(t *testing.T) { if err := checkAllGaugesZero(tc); err != nil { t.Fatal(err.Error()) } - q.Enqueue(txn) + q.EnqueueTxn(txn) m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 1, m.PusheeWaiting.Value()) assert.EqualValues(tc, 0, m.PusherWaiting.Value()) @@ -198,7 +198,7 @@ func TestTxnWaitQueueCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) retCh := make(chan RespWithErr, 1) go func() { - resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, &req) + resp, pErr := q.MaybeWaitForPush(ctx, &req) retCh <- 
RespWithErr{resp, pErr} }() @@ -253,13 +253,13 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) { q := tc.repl.txnWaitQueue q.Enable() - q.Enqueue(txn) + q.EnqueueTxn(txn) m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 1, m.PusheeWaiting.Value()) retCh := make(chan RespWithErr, 2) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req1) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req1) retCh <- RespWithErr{resp, pErr} }() testutils.SucceedsSoon(t, func() error { @@ -283,7 +283,7 @@ func TestTxnWaitQueueUpdateTxn(t *testing.T) { }) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req2) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req2) retCh <- RespWithErr{resp, pErr} }() testutils.SucceedsSoon(t, func() error { @@ -371,11 +371,11 @@ func TestTxnWaitQueueTxnSilentlyCompletes(t *testing.T) { q := tc.repl.txnWaitQueue q.Enable() - q.Enqueue(txn) + q.EnqueueTxn(txn) retCh := make(chan RespWithErr, 2) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, req) + resp, pErr := q.MaybeWaitForPush(context.Background(), req) retCh <- RespWithErr{resp, pErr} }() @@ -446,11 +446,11 @@ func TestTxnWaitQueueUpdateNotPushedTxn(t *testing.T) { q := tc.repl.txnWaitQueue q.Enable() - q.Enqueue(txn) + q.EnqueueTxn(txn) retCh := make(chan RespWithErr, 1) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req) retCh <- RespWithErr{resp, pErr} }() @@ -521,11 +521,11 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) { q := tc.repl.txnWaitQueue q.Enable() - q.Enqueue(txn) + q.EnqueueTxn(txn) retCh := make(chan RespWithErr, 2) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req1) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req1) retCh <- RespWithErr{resp, pErr} }() testutils.SucceedsSoon(t, func() error { @@ -537,7 +537,7 @@ func TestTxnWaitQueuePusheeExpires(t *testing.T) { }) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req2) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req2) retCh <- RespWithErr{resp, pErr} }() testutils.SucceedsSoon(t, func() error { @@ -609,11 +609,11 @@ func TestTxnWaitQueuePusherUpdate(t *testing.T) { q := tc.repl.txnWaitQueue q.Enable() - q.Enqueue(txn) + q.EnqueueTxn(txn) retCh := make(chan RespWithErr, 1) go func() { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, &req) + resp, pErr := q.MaybeWaitForPush(context.Background(), &req) retCh <- RespWithErr{resp, pErr} }() @@ -724,7 +724,7 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() for _, txn := range []*roachpb.Transaction{txnA, txnB, txnC} { - q.Enqueue(txn) + q.EnqueueTxn(txn) } m := tc.store.GetTxnWaitMetrics() assert.EqualValues(tc, 0, m.DeadlocksTotal.Count()) @@ -733,7 +733,7 @@ func TestTxnWaitQueueDependencyCycle(t *testing.T) { retCh := make(chan ReqWithRespAndErr, len(reqs)) for _, req := range reqs { go func(req *roachpb.PushTxnRequest) { - resp, pErr := q.MaybeWaitForPush(ctx, tc.repl, req) + resp, pErr := q.MaybeWaitForPush(ctx, req) retCh <- ReqWithRespAndErr{req, resp, pErr} }(req) } @@ -813,7 +813,7 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) { q.Enable() for _, txn := range []*roachpb.Transaction{txnA, txnB} { - q.Enqueue(txn) + q.EnqueueTxn(txn) } m := tc.store.GetTxnWaitMetrics() 
assert.EqualValues(tc, 0, m.DeadlocksTotal.Count()) @@ -822,7 +822,7 @@ func TestTxnWaitQueueDependencyCycleWithPriorityInversion(t *testing.T) { retCh := make(chan ReqWithRespAndErr, len(reqs)) for _, req := range reqs { go func(req *roachpb.PushTxnRequest) { - resp, pErr := q.MaybeWaitForPush(context.Background(), tc.repl, req) + resp, pErr := q.MaybeWaitForPush(context.Background(), req) retCh <- ReqWithRespAndErr{req, resp, pErr} }(req) } diff --git a/pkg/storage/txnwait/queue.go b/pkg/storage/txnwait/queue.go index 69fd59322bc7..02bab41d9a8a 100644 --- a/pkg/storage/txnwait/queue.go +++ b/pkg/storage/txnwait/queue.go @@ -186,6 +186,7 @@ type TestingKnobs struct { // Queue is thread safe. type Queue struct { store StoreInterface + repl ReplicaInterface mu struct { syncutil.Mutex txns map[uuid.UUID]*pendingTxn @@ -194,9 +195,10 @@ type Queue struct { } // NewQueue instantiates a new Queue. -func NewQueue(store StoreInterface) *Queue { +func NewQueue(store StoreInterface, repl ReplicaInterface) *Queue { return &Queue{ store: store, + repl: repl, } } @@ -275,10 +277,10 @@ func (q *Queue) IsEnabled() bool { return q.mu.txns != nil } -// Enqueue creates a new pendingTxn for the target txn of a failed +// EnqueueTxn creates a new pendingTxn for the target txn of a failed // PushTxn command. Subsequent PushTxn requests for the same txn // will be enqueued behind the pendingTxn via MaybeWait(). -func (q *Queue) Enqueue(txn *roachpb.Transaction) { +func (q *Queue) EnqueueTxn(txn *roachpb.Transaction) { q.mu.Lock() defer q.mu.Unlock() if q.mu.txns == nil { @@ -329,7 +331,7 @@ func (q *Queue) UpdateTxn(ctx context.Context, txn *roachpb.Transaction) { q.store.GetTxnWaitMetrics().PusherWaiting.Dec(int64(len(waitingPushes))) if log.V(1) && len(waitingPushes) > 0 { - log.Infof(context.Background(), "updating %d push waiters for %s", len(waitingPushes), txn.ID.Short()) + log.Infof(ctx, "updating %d push waiters for %s", len(waitingPushes), txn.ID.Short()) } // Send on pending waiter channels outside of the mutex lock. for _, w := range waitingPushes { @@ -399,7 +401,7 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID // If the transaction is successfully pushed while this method is waiting, // the first return value is a non-nil PushTxnResponse object. func (q *Queue) MaybeWaitForPush( - ctx context.Context, repl ReplicaInterface, req *roachpb.PushTxnRequest, + ctx context.Context, req *roachpb.PushTxnRequest, ) (*roachpb.PushTxnResponse, *roachpb.Error) { if ShouldPushImmediately(req) { return nil, nil @@ -411,7 +413,7 @@ func (q *Queue) MaybeWaitForPush( // outside of the replica after a split or merge. Note that the // ContainsKey check is done under the txn wait queue's lock to // ensure that it's not cleared before an incorrect insertion happens. - if q.mu.txns == nil || !repl.ContainsKey(req.Key) { + if q.mu.txns == nil || !q.repl.ContainsKey(req.Key) { q.mu.Unlock() return nil, nil } @@ -668,7 +670,7 @@ func (q *Queue) MaybeWaitForPush( // there is a queue, enqueue this request as a waiter and enter a // select loop waiting for any updates to the target transaction. func (q *Queue) MaybeWaitForQuery( - ctx context.Context, repl ReplicaInterface, req *roachpb.QueryTxnRequest, + ctx context.Context, req *roachpb.QueryTxnRequest, ) *roachpb.Error { if !req.WaitForUpdate { return nil @@ -680,7 +682,7 @@ func (q *Queue) MaybeWaitForQuery( // outside of the replica after a split or merge. 
Note that the // ContainsKey check is done under the txn wait queue's lock to // ensure that it's not cleared before an incorrect insertion happens. - if q.mu.txns == nil || !repl.ContainsKey(req.Key) { + if q.mu.txns == nil || !q.repl.ContainsKey(req.Key) { q.mu.Unlock() return nil } diff --git a/pkg/storage/txnwait/queue_test.go b/pkg/storage/txnwait/queue_test.go index 040d75481305..ae90a1759bb6 100644 --- a/pkg/storage/txnwait/queue_test.go +++ b/pkg/storage/txnwait/queue_test.go @@ -205,14 +205,14 @@ func TestMaybeWaitForQueryWithContextCancellation(t *testing.T) { defer leaktest.AfterTest(t)() ms := newMockStore(nil) defer ms.Stopper().Stop(context.Background()) - q := NewQueue(ms) + q := NewQueue(ms, mockRepl{}) q.Enable() ctx, cancel := context.WithCancel(context.Background()) waitingRes := make(chan *roachpb.Error) go func() { req := &roachpb.QueryTxnRequest{WaitForUpdate: true} - waitingRes <- q.MaybeWaitForQuery(ctx, mockRepl{}, req) + waitingRes <- q.MaybeWaitForQuery(ctx, req) }() cancel() @@ -247,7 +247,7 @@ func TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn(t *testing.T) { return mockSender(ctx, ba) }) defer ms.Stopper().Stop(context.Background()) - q := NewQueue(ms) + q := NewQueue(ms, mockRepl{}) q.Enable() // Set an extremely high transaction liveness threshold so that the pushee @@ -256,7 +256,7 @@ func TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn(t *testing.T) { // Enqueue pushee transaction in the queue. txn := roachpb.MakeTransaction("test", nil, 0, ms.Clock().Now(), 0) - q.Enqueue(&txn) + q.EnqueueTxn(&txn) const numPushees = 3 var queryTxnCount int32 @@ -285,7 +285,7 @@ func TestPushersReleasedAfterAnyQueryTxnFindsAbortedTxn(t *testing.T) { defer wg.Done() ctx := context.Background() req := roachpb.PushTxnRequest{PusheeTxn: txn.TxnMeta, PushType: roachpb.PUSH_ABORT} - res, err := q.MaybeWaitForPush(ctx, mockRepl{}, &req) + res, err := q.MaybeWaitForPush(ctx, &req) require.Nil(t, err) require.NotNil(t, res) require.Equal(t, roachpb.ABORTED, res.PusheeTxn.Status)