diff --git a/pxr/base/tf/CMakeLists.txt b/pxr/base/tf/CMakeLists.txt
index 1d31f71372..98fe862524 100644
--- a/pxr/base/tf/CMakeLists.txt
+++ b/pxr/base/tf/CMakeLists.txt
@@ -70,6 +70,7 @@ pxr_library(tf
        setenv
        singleton
        smallVector
+        spinRWMutex
        stackTrace
        stacked
        status
diff --git a/pxr/base/tf/bigRWMutex.cpp b/pxr/base/tf/bigRWMutex.cpp
index d3fef3fad2..8ebc88273b 100644
--- a/pxr/base/tf/bigRWMutex.cpp
+++ b/pxr/base/tf/bigRWMutex.cpp
@@ -31,76 +31,63 @@ PXR_NAMESPACE_OPEN_SCOPE
 
 TfBigRWMutex::TfBigRWMutex()
     : _states(std::make_unique<_LockState []>(NumStates))
-    , _writerWaiting(false)
+    , _writerActive(false)
 {
 }
 
-int
+void
 TfBigRWMutex::_AcquireReadContended(int stateIndex)
 {
-  retry:
-    // First check _writerWaiting and wait until we see that set to false if
-    // need be.
-    while (_writerWaiting == true) {
-        std::this_thread::yield();
-    }
-
-    // Now try to bump the reader count on our state index.  If we see a write
-    // lock state, go back to waiting for any pending writer.  If we fail to
-    // bump the count, move to the next slot (and wrap around).
-    for (int i = (stateIndex + 1) % NumStates; ; i = (i + 1) % NumStates) {
-        _LockState &lockState = _states[i];
-
-        int stateVal = lockState.state;
-        if (stateVal == WriteLocked) {
+    // First check _writerActive and wait until we see that set to false.
+    while (true) {
+        if (_writerActive) {
             std::this_thread::yield();
-            goto retry;
         }
-
-        // Otherwise try to increment the count.
-        if (lockState.state.compare_exchange_strong(stateVal, stateVal+1)) {
-            // Success!  Record the state we used to mark this lock as
-            // acquired.
-            return i;
+        else if (_states[stateIndex].mutex.TryAcquireRead()) {
+            break;
         }
-        // Otherwise we advance to the next state index and try there.
     }
 }
 
 void
 TfBigRWMutex::_AcquireWrite()
 {
-    // First, we need to take _writerWaiting from false -> true.
-    bool writerWaits = false;
-    while (!_writerWaiting.compare_exchange_weak(writerWaits, true)) {
-        std::this_thread::yield();
-        writerWaits = false;
-    }
-
-    // Now, we need to wait for all pending readers to finish and lock out any
-    // new ones.
-    for (_LockState *lockState = _states.get(),
-             *end = _states.get() + NumStates; lockState != end;
-         ++lockState) {
-
-        int expected = NotLocked;
-        while (!lockState->state.compare_exchange_weak(expected, WriteLocked)) {
+    while (_writerActive.exchange(true) == true) {
+        // Another writer is active, wait to see false and retry.
+        do {
             std::this_thread::yield();
-            expected = NotLocked;
-        }
+        } while (_writerActive);
     }
+
+    // Use the staged-acquire API that TfSpinRWMutex supplies so that we can
+    // acquire the write locks while simultaneously waiting for readers on the
+    // other locks to complete.  Otherwise we'd have to wait for all pending
+    // readers on the Nth lock before beginning to take the N+1th lock.
+    TfSpinRWMutex::_StagedAcquireWriteState
+        stageStates[NumStates] { TfSpinRWMutex::_StageNotAcquired };
+
+    bool allAcquired;
+    do {
+        allAcquired = true;
+        for (int i = 0; i != NumStates; ++i) {
+            stageStates[i] =
+                _states[i].mutex._StagedAcquireWriteStep(stageStates[i]);
+            allAcquired &= (stageStates[i] == TfSpinRWMutex::_StageAcquired);
+        }
+    } while (!allAcquired);
 }
 
 void
 TfBigRWMutex::_ReleaseWrite()
 {
-    // Restore all the read lock states to 0 and set writerWaits to false.
+    _writerActive = false;
+
+    // Release all the write locks.
     for (_LockState *lockState = _states.get(),
              *end = _states.get() + NumStates; lockState != end;
          ++lockState) {
-        lockState->state = NotLocked;
+        lockState->mutex.ReleaseWrite();
     }
-    _writerWaiting = false;
 }
 
 PXR_NAMESPACE_CLOSE_SCOPE
diff --git a/pxr/base/tf/bigRWMutex.h b/pxr/base/tf/bigRWMutex.h
index 3f3d440aa5..0b1061d259 100644
--- a/pxr/base/tf/bigRWMutex.h
+++ b/pxr/base/tf/bigRWMutex.h
@@ -31,6 +31,7 @@
 #include "pxr/base/arch/hints.h"
 #include "pxr/base/tf/diagnosticLite.h"
 #include "pxr/base/tf/hash.h"
+#include "pxr/base/tf/spinRWMutex.h"
 
 #include <atomic>
 #include <memory>
 
@@ -152,12 +153,6 @@ class TfBigRWMutex
         void AcquireRead() {
             TF_AXIOM(_acqState == NotAcquired);
             _acqState = _mutex->_AcquireRead(_GetSeed());
-            // Inform the compiler that the value of _acqState must be >=0 now.
-            // Without this hint, the compiler must generate code to handle
-            // releasing from the WriteAcquired state and the NotAcquired state
-            // in the subsequent Release() call, since it does not know that
-            // _AcquireRead() can only return values >=0.
-            ARCH_GUARANTEE_TO_COMPILER(_acqState >= 0);
         }
 
         /// Acquire a write lock on this lock's associated mutex.  This lock
@@ -196,8 +191,7 @@ class TfBigRWMutex
         }
 
         // Helper for returning a seed value associated with this lock object.
-        // This used to help determine which lock state a read-locking scope
-        // should start with.
+        // This helps determine which lock state a read-lock should use.
         inline int _GetSeed() const {
             return static_cast<int>(
                 static_cast<unsigned>(TfHash()(this)) >> 8);
         }
@@ -213,45 +207,35 @@ class TfBigRWMutex
 
     // Optimistic read-lock case inlined.
     inline int _AcquireRead(int seed) {
-        // Determine an initial lock state index to use.
+        // Determine a lock state index to use.
         int stateIndex = seed % NumStates;
-
-        // Optimistic path:
-        {
-            _LockState &lockState = _states[stateIndex];
-            int state = lockState.state.load();
-            if (state != WriteLocked && !_writerWaiting &&
-                lockState.state.compare_exchange_strong(state, state + 1)) {
-                // Success!
-                return stateIndex;
-            }
+        if (ARCH_UNLIKELY(_writerActive) ||
+            !_states[stateIndex].mutex.TryAcquireRead()) {
+            _AcquireReadContended(stateIndex);
         }
-        // Contended case out-of-line:
-        return _AcquireReadContended(stateIndex);
+        return stateIndex;
     }
 
     // Contended read-lock helper.
-    TF_API int _AcquireReadContended(int stateIndex);
+    TF_API void _AcquireReadContended(int stateIndex);
 
     void _ReleaseRead(int stateIndex) {
-        // Just decrement the read count on the state we hold.
-        --_states[stateIndex].state;
+        _states[stateIndex].mutex.ReleaseRead();
     }
 
     TF_API void _AcquireWrite();
     TF_API void _ReleaseWrite();
 
     struct _LockState {
-        _LockState() : state(0) {}
-        std::atomic<int> state;
+        TfSpinRWMutex mutex;
         // This padding ensures that \p state instances sit on different cache
         // lines.
         char _unused_padding[
-            ARCH_CACHE_LINE_SIZE-(sizeof(state) % ARCH_CACHE_LINE_SIZE)];
+            ARCH_CACHE_LINE_SIZE-(sizeof(mutex) % ARCH_CACHE_LINE_SIZE)];
     };
 
     std::unique_ptr<_LockState []> _states;
-    std::atomic<bool> _writerWaiting;
+    std::atomic<bool> _writerActive;
 };
diff --git a/pxr/base/tf/spinRWMutex.cpp b/pxr/base/tf/spinRWMutex.cpp
new file mode 100644
index 0000000000..9fdbdd8bdc
--- /dev/null
+++ b/pxr/base/tf/spinRWMutex.cpp
@@ -0,0 +1,80 @@
+//
+// Copyright 2022 Pixar
+//
+// Licensed under the Apache License, Version 2.0 (the "Apache License")
+// with the following modification; you may not use this file except in
+// compliance with the Apache License and the following modification to it:
+// Section 6. Trademarks. is deleted and replaced with:
+//
+// 6. Trademarks. This License does not grant permission to use the trade
+//    names, trademarks, service marks, or product names of the Licensor
+//    and its affiliates, except as required to comply with Section 4(c) of
+//    the License and to reproduce the content of the NOTICE file.
+//
+// You may obtain a copy of the Apache License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the Apache License with the above modification is
+// distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the Apache License for the specific
+// language governing permissions and limitations under the Apache License.
+//
+
+#include "pxr/pxr.h"
+
+#include "pxr/base/tf/spinRWMutex.h"
+#include "pxr/base/arch/defines.h"
+
+// Needed for ARCH_SPIN_PAUSE on Windows in builds with precompiled
+// headers disabled.
+#ifdef ARCH_COMPILER_MSVC
+#include <intrin.h>
+#endif
+
+#include <thread>
+
+PXR_NAMESPACE_OPEN_SCOPE
+
+static constexpr int SpinsBeforeBackoff = 32;
+
+template <class Fn>
+static void WaitWithBackoff(Fn &&fn) {
+    // Hope for the best...
+    if (ARCH_LIKELY(fn())) {
+        return;
+    }
+    // Otherwise spin for a bit...
+    for (int i = 0; i != SpinsBeforeBackoff; ++i) {
+        ARCH_SPIN_PAUSE();
+        if (fn()) {
+            return;
+        }
+    }
+    // Keep checking but yield our thread...
+    do {
+        std::this_thread::yield();
+    } while (!fn());
+}
+
+
+void
+TfSpinRWMutex::_WaitForWriter() const
+{
+    // Wait until we see a cleared WriterFlag.
+    WaitWithBackoff([this]() {
+        return !(_lockState.load() & WriterFlag);
+    });
+}
+
+void
+TfSpinRWMutex::_WaitForReaders() const
+{
+    // Wait until we see zero readers.
+    WaitWithBackoff([this]() {
+        return _lockState.load() == WriterFlag;
+    });
+}
+
+PXR_NAMESPACE_CLOSE_SCOPE
diff --git a/pxr/base/tf/spinRWMutex.h b/pxr/base/tf/spinRWMutex.h
new file mode 100644
index 0000000000..34213cff9d
--- /dev/null
+++ b/pxr/base/tf/spinRWMutex.h
@@ -0,0 +1,360 @@
+//
+// Copyright 2022 Pixar
+//
+// Licensed under the Apache License, Version 2.0 (the "Apache License")
+// with the following modification; you may not use this file except in
+// compliance with the Apache License and the following modification to it:
+// Section 6. Trademarks. is deleted and replaced with:
+//
+// 6. Trademarks. This License does not grant permission to use the trade
+//    names, trademarks, service marks, or product names of the Licensor
+//    and its affiliates, except as required to comply with Section 4(c) of
+//    the License and to reproduce the content of the NOTICE file.
+//
+// You may obtain a copy of the Apache License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the Apache License with the above modification is
+// distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the Apache License for the specific
+// language governing permissions and limitations under the Apache License.
+//
+#ifndef PXR_BASE_TF_SPIN_RW_MUTEX_H
+#define PXR_BASE_TF_SPIN_RW_MUTEX_H
+
+#include "pxr/pxr.h"
+#include "pxr/base/tf/api.h"
+
+#include "pxr/base/arch/hints.h"
+#include "pxr/base/tf/diagnosticLite.h"
+
+#include <atomic>
+
+PXR_NAMESPACE_OPEN_SCOPE
+
+/// \class TfSpinRWMutex
+///
+/// This class implements a readers-writer spin lock that emphasizes throughput
+/// when there is light contention or moderate contention dominated by readers.
+/// Like all spin locks, significant contention performs poorly; consider a
+/// different algorithm design or synchronization strategy in that case.
+///
+/// In the best case, acquiring a read lock is an atomic add followed by a
+/// conditional branch, and acquiring a write lock is an atomic bitwise-or
+/// followed by a conditional branch.
+///
+/// When contended by only readers, acquiring a read lock is the same: an atomic
+/// add followed by a conditional branch.  Of course the shared cache line being
+/// concurrently read and modified will affect performance.
+///
+/// In the worst case, acquiring a read lock does the atomic add and conditional
+/// branch, but the condition shows writer activity, so the add must be undone
+/// by a subtraction, and then the thread must wait to see no writer activity
+/// before trying again.
+///
+/// Similarly in the worst case for acquiring a write lock, the thread does the
+/// atomic bitwise-or, but sees another active writer, and then must wait to see
+/// no writer activity before trying again.  Once the bitwise-or is done
+/// successfully, then the writer must wait for any pending readers to clear out
+/// before it can proceed.
+///
+/// This class provides a nested TfSpinRWMutex::ScopedLock that makes it easy to
+/// acquire locks, upgrade reader to writer, downgrade writer to reader, and
+/// have those locks automatically release when the ScopedLock is destroyed.
+///
+class TfSpinRWMutex
+{
+    static constexpr int OneReader = 2;
+    static constexpr int WriterFlag = 1;
+
+public:
+
+    /// Construct a mutex, initially unlocked.
+    TfSpinRWMutex() : _lockState(0) {}
+
+    friend struct ScopedLock;
+
+    /// Scoped lock utility class.  API modeled roughly after
+    /// tbb::spin_rw_mutex::scoped_lock.
+    struct ScopedLock {
+
+        // Acquisition states.
+        static constexpr int NotAcquired = 0;
+        static constexpr int ReadAcquired = 1;
+        static constexpr int WriteAcquired = 2;
+
+        /// Construct a scoped lock for mutex \p m and acquire either a read or
+        /// a write lock depending on \p write.
+        explicit ScopedLock(TfSpinRWMutex &m, bool write=true)
+            : _mutex(&m)
+            , _acqState(NotAcquired) {
+            Acquire(write);
+        }
+
+        /// Construct a scoped lock not associated with a \p mutex.
+        ScopedLock() : _mutex(nullptr), _acqState(NotAcquired) {}
+
+        /// If this scoped lock is acquired for either read or write, Release()
+        /// it.
+        ~ScopedLock() {
+            Release();
+        }
+
+        /// If the current scoped lock is acquired, Release() it, then associate
+        /// this lock with \p m and acquire either a read or a write lock,
+        /// depending on \p write.
+        void Acquire(TfSpinRWMutex &m, bool write=true) {
+            Release();
+            _mutex = &m;
+            Acquire(write);
+        }
+
+        /// Acquire either a read or write lock on this lock's associated mutex
+        /// depending on \p write.  This lock must be associated with a mutex
+        /// (typically by construction or by a call to Acquire() that takes a
+        /// mutex).  This lock must not already be acquired when calling
+        /// Acquire().
+        void Acquire(bool write=true) {
+            if (write) {
+                AcquireWrite();
+            }
+            else {
+                AcquireRead();
+            }
+        }
+
+        /// Release the currently acquired lock on the associated mutex.  If
+        /// this lock is not currently acquired, silently do nothing.
+        void Release() {
+            switch (_acqState) {
+            default:
+            case NotAcquired:
+                break;
+            case ReadAcquired:
+                _ReleaseRead();
+                break;
+            case WriteAcquired:
+                _ReleaseWrite();
+                break;
+            };
+        }
+
+        /// Acquire a read lock on this lock's associated mutex.  This lock must
+        /// not already be acquired when calling \p AcquireRead().
+        void AcquireRead() {
+            TF_DEV_AXIOM(_acqState == NotAcquired);
+            _mutex->AcquireRead();
+            _acqState = ReadAcquired;
+        }
+
+        /// Acquire a write lock on this lock's associated mutex.  This lock
+        /// must not already be acquired when calling \p AcquireWrite().
+        void AcquireWrite() {
+            TF_DEV_AXIOM(_acqState == NotAcquired);
+            _mutex->AcquireWrite();
+            _acqState = WriteAcquired;
+        }
+
+        /// Change this lock's acquisition state from a read lock to a write
+        /// lock.  This lock must already be acquired for reading.  Return true
+        /// if the upgrade occurred without releasing the read lock, false if it
+        /// was released.
+        bool UpgradeToWriter() {
+            TF_DEV_AXIOM(_acqState == ReadAcquired);
+            _acqState = WriteAcquired;
+            return _mutex->UpgradeToWriter();
+        }
+
+        /// Change this lock's acquisition state from a write lock to a read
+        /// lock.  This lock must already be acquired for writing.  Return true
+        /// if the downgrade occurred without releasing the write in the
+        /// interim, false if it was released and other writers may have
+        /// intervened.
+        bool DowngradeToReader() {
+            TF_DEV_AXIOM(_acqState == WriteAcquired);
+            _acqState = ReadAcquired;
+            return _mutex->DowngradeToReader();
+        }
+
+    private:
+
+        void _ReleaseRead() {
+            TF_DEV_AXIOM(_acqState == ReadAcquired);
+            _mutex->ReleaseRead();
+            _acqState = NotAcquired;
+        }
+
+        void _ReleaseWrite() {
+            TF_DEV_AXIOM(_acqState == WriteAcquired);
+            _mutex->ReleaseWrite();
+            _acqState = NotAcquired;
+        }
+
+        TfSpinRWMutex *_mutex;
+        int _acqState; // NotAcquired (0), ReadAcquired (1), WriteAcquired (2)
+    };
+
+    /// Attempt to acquire a read lock on this mutex without waiting for
+    /// writers.  This thread must not already hold a lock on this mutex (either
+    /// read or write).  Return true if the lock is acquired, false otherwise.
+    inline bool TryAcquireRead() {
+        // Optimistically increment the reader count.
+        if (ARCH_LIKELY(!(_lockState.fetch_add(OneReader) & WriterFlag))) {
+            // We incremented the reader count and observed no writer activity,
+            // we have a read lock.
+            return true;
+        }
+
+        // Otherwise there's writer activity.  Undo the increment and return
+        // false.
+        _lockState -= OneReader;
+        return false;
+    }
+
+    /// Acquire a read lock on this mutex.  This thread must not already hold a
+    /// lock on this mutex (either read or write).  Consider calling
+    /// DowngradeToReader() if this thread holds a write lock.
+    inline void AcquireRead() {
+        while (true) {
+            if (TryAcquireRead()) {
+                return;
+            }
+            // There's writer activity.  Wait to see no writer activity and
+            // retry.
+            _WaitForWriter();
+        }
+    }
+
+    /// Release this thread's read lock on this mutex.
+    inline void ReleaseRead() {
+        // Just decrement the count.
+        _lockState -= OneReader;
+    }
+
+    /// Attempt to acquire a write lock on this mutex without waiting for other
+    /// writers.  This thread must not already hold a lock on this mutex (either
+    /// read or write).  Return true if the lock is acquired, false otherwise.
+    inline bool TryAcquireWrite() {
+        int state = _lockState.fetch_or(WriterFlag);
+        if (!(state & WriterFlag)) {
+            // We set the flag, wait for readers.
+            if (state != 0) {
+                // Wait for pending readers.
+                _WaitForReaders();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /// Acquire a write lock on this mutex.  This thread must not already hold a
+    /// lock on this mutex (either read or write).  Consider calling
+    /// UpgradeToWriter() if this thread holds a read lock.
+    void AcquireWrite() {
+        // Attempt to acquire -- if we fail then wait to see no other writer and
+        // retry.
+        while (true) {
+            if (TryAcquireWrite()) {
+                return;
+            }
+            _WaitForWriter();
+        }
+    }
+
+    /// Release this thread's write lock on this mutex.
+    inline void ReleaseWrite() {
+        _lockState &= ~WriterFlag;
+    }
+
+    /// Upgrade this thread's lock on this mutex (which must be a read lock) to
+    /// a write lock.  Return true if the upgrade is done "atomically" meaning
+    /// that the read lock was not released (and thus no other writer could have
+    /// acquired the lock in the interim).  Return false if this lock was
+    /// released and thus another writer could have taken the lock in the
+    /// interim.
+    bool UpgradeToWriter() {
+        // This thread owns a read lock, attempt to upgrade to write lock.  If
+        // we do so without an intervening writer, return true, otherwise return
+        // false.
+        bool atomic = true;
+        while (true) {
+            int state = _lockState.fetch_or(WriterFlag);
+            if (!(state & WriterFlag)) {
+                // We set the flag, release our reader count and wait for any
+                // other pending readers.
+                if (_lockState.fetch_sub(
+                        OneReader) != (OneReader | WriterFlag)) {
+                    _WaitForReaders();
+                }
+                return atomic;
+            }
+            // There was other writer activity -- wait for it to clear, then
+            // retry.
+            atomic = false;
+            _WaitForWriter();
+        }
+    }
+
+    /// Downgrade this mutex, which must be locked for write by this thread, to
+    /// being locked for read by this thread.  Return true if the downgrade
+    /// happened "atomically", meaning that the write lock was not released (and
+    /// thus possibly acquired by another thread).  This implementation
+    /// currently always returns true.
+    bool DowngradeToReader() {
+        // Simultaneously add a reader count and clear the writer bit by adding
+        // (OneReader-1).
+        _lockState += (OneReader-1);
+        return true;
+    }
+
+private:
+    friend class TfBigRWMutex;
+
+    // Helpers for staged-acquire-write that BigRWMutex uses.
+    enum _StagedAcquireWriteState {
+        _StageNotAcquired,
+        _StageAcquiring,
+        _StageAcquired
+    };
+
+    // This API lets TfBigRWMutex acquire a write lock step-by-step so that it
+    // can begin acquiring write locks on several mutexes without waiting
+    // serially for pending readers to complete.  Call _StagedAcquireWriteStep
+    // with _StageNotAcquired initially, and save the returned value.  Continue
+    // repeatedly calling _StagedAcquireWriteStep, passing the previously
+    // returned value until this function returns _StageAcquired.  At this
+    // point the write lock is acquired.
+    _StagedAcquireWriteState
+    _StagedAcquireWriteStep(_StagedAcquireWriteState curState) {
+        int state;
+        switch (curState) {
+        case _StageNotAcquired:
+            state = _lockState.fetch_or(WriterFlag);
+            if (!(state & WriterFlag)) {
+                // We set the flag.  If there were no readers we're done,
+                // otherwise we'll have to wait for them, next step.
+                return state == 0 ? _StageAcquired : _StageAcquiring;
+            }
+            // Other writer activity, must retry next step.
+            return _StageNotAcquired;
+        case _StageAcquiring:
+            // We have set the writer flag but must wait to see no readers.
+            _WaitForReaders();
+            return _StageAcquired;
+        case _StageAcquired:
+        default:
+            return _StageAcquired;
+        };
+    }
+
+    TF_API void _WaitForReaders() const;
+    TF_API void _WaitForWriter() const;
+
+    std::atomic<int> _lockState;
+};
+
+PXR_NAMESPACE_CLOSE_SCOPE
+
+#endif // PXR_BASE_TF_SPIN_RW_MUTEX_H
diff --git a/pxr/base/tf/testenv/rwMutexes.cpp b/pxr/base/tf/testenv/rwMutexes.cpp
index ea5683c6f0..6070bdc417 100644
--- a/pxr/base/tf/testenv/rwMutexes.cpp
+++ b/pxr/base/tf/testenv/rwMutexes.cpp
@@ -25,6 +25,7 @@
 #include "pxr/pxr.h"
 #include "pxr/base/tf/regTest.h"
 #include "pxr/base/tf/bigRWMutex.h"
+#include "pxr/base/tf/spinRWMutex.h"
 #include "pxr/base/tf/stringUtils.h"
 #include "pxr/base/tf/stopwatch.h"
 
@@ -40,6 +41,13 @@ struct BigRW
     using LockType = TfBigRWMutex::ScopedLock;
 };
 
+struct SpinRW
+{
+    static constexpr const char *Label = "TfSpinRWMutex";
+    using MutexType = TfSpinRWMutex;
+    using LockType = TfSpinRWMutex::ScopedLock;
+};
+
 struct TbbSpinRW
 {
     static constexpr const char *Label = "tbb::spin_rw_mutex";
     using MutexType = tbb::spin_rw_mutex;
@@ -66,7 +74,7 @@ Test_RWMutexThroughput()
     // Make a bunch of threads that mostly read, but occasionally write.
 
-    int numThreads = std::thread::hardware_concurrency()-1;
+    const int numThreads = std::thread::hardware_concurrency()-1;
     std::vector<std::thread> threads(numThreads);
     for (auto &t: threads) {
         t = std::thread([&mutex, &value, numSeconds]() {
@@ -106,6 +114,7 @@ static bool
 Test_TfRWMutexes()
 {
+    Test_RWMutexThroughput<SpinRW>();
     Test_RWMutexThroughput<BigRW>();
     Test_RWMutexThroughput<TbbSpinRW>();
     Test_RWMutexThroughput<TbbQueueRW>();