tf: Add TfSpinRWMutex, which performs better than tbb::spin_rw_mutex with low
contention (which is when spin locks are appropriate) and when there is high
reader contention with low writer contention (which is how we mostly use
reader/writer locks).  It performs slightly worse when there is simultaneous
reader and writer contention, but spin locks should not be used in those
situations.  Also change TfBigRWMutex to use TfSpinRWMutex internally,
improving its throughput under high reader contention.

(Internal change: 2256317)
(Internal change: 2257556)
gitamohr authored and pixar-oss committed Dec 8, 2022
1 parent a7f81a0 commit dcfd31b
Showing 6 changed files with 495 additions and 74 deletions.
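
For orientation, the sketch below shows the usage pattern the commit message is optimizing for: many short, concurrent read locks and only occasional write locks. It is a hypothetical example, not part of this commit, written against the TfSpinRWMutex calls that appear in the diff (TryAcquireRead, ReleaseRead, ReleaseWrite) plus AcquireRead/AcquireWrite entry points that are assumed here; the authoritative interface is pxr/base/tf/spinRWMutex.h.

// Hypothetical usage sketch, not part of this commit.  AcquireRead() and
// AcquireWrite() are assumed blocking entry points alongside the calls that
// appear in the diff below.
#include "pxr/base/tf/spinRWMutex.h"

#include <string>
#include <unordered_map>

PXR_NAMESPACE_USING_DIRECTIVE

static TfSpinRWMutex cacheMutex;
static std::unordered_map<std::string, int> cache;

// Hot path: many threads read concurrently and hold the lock briefly.
int LookupOrDefault(const std::string &key)
{
    cacheMutex.AcquireRead();
    auto it = cache.find(key);
    int value = (it != cache.end()) ? it->second : -1;
    cacheMutex.ReleaseRead();
    return value;
}

// Cold path: an occasional writer inserts a new entry.
void Insert(const std::string &key, int value)
{
    cacheMutex.AcquireWrite();
    cache[key] = value;
    cacheMutex.ReleaseWrite();
}
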
1 change: 1 addition & 0 deletions pxr/base/tf/CMakeLists.txt
@@ -70,6 +70,7 @@ pxr_library(tf
         setenv
         singleton
         smallVector
+        spinRWMutex
         stackTrace
         stacked
         status
77 changes: 32 additions & 45 deletions pxr/base/tf/bigRWMutex.cpp
@@ -31,76 +31,63 @@ PXR_NAMESPACE_OPEN_SCOPE
 
 TfBigRWMutex::TfBigRWMutex()
     : _states(std::make_unique<_LockState []>(NumStates))
-    , _writerWaiting(false)
+    , _writerActive(false)
 {
 }
 
-int
+void
 TfBigRWMutex::_AcquireReadContended(int stateIndex)
 {
-  retry:
-    // First check _writerWaiting and wait until we see that set to false if
-    // need be.
-    while (_writerWaiting == true) {
-        std::this_thread::yield();
-    }
-
-    // Now try to bump the reader count on our state index. If we see a write
-    // lock state, go back to waiting for any pending writer. If we fail to
-    // bump the count, move to the next slot (and wrap around).
-    for (int i = (stateIndex + 1) % NumStates; ; i = (i + 1) % NumStates) {
-        _LockState &lockState = _states[i];
-
-        int stateVal = lockState.state;
-        if (stateVal == WriteLocked) {
+    // First check _writerActive and wait until we see that set to false.
+    while (true) {
+        if (_writerActive) {
             std::this_thread::yield();
-            goto retry;
         }
-
-        // Otherwise try to increment the count.
-        if (lockState.state.compare_exchange_strong(stateVal, stateVal+1)) {
-            // Success! Record the state we used to mark this lock as
-            // acquired.
-            return i;
+        else if (_states[stateIndex].mutex.TryAcquireRead()) {
+            break;
         }
-        // Otherwise we advance to the next state index and try there.
     }
 }
 
 void
 TfBigRWMutex::_AcquireWrite()
 {
-    // First, we need to take _writerWaiting from false -> true.
-    bool writerWaits = false;
-    while (!_writerWaiting.compare_exchange_weak(writerWaits, true)) {
-        std::this_thread::yield();
-        writerWaits = false;
-    }
-
-    // Now, we need to wait for all pending readers to finish and lock out any
-    // new ones.
-    for (_LockState *lockState = _states.get(),
-             *end = _states.get() + NumStates; lockState != end;
-         ++lockState) {
-
-        int expected = NotLocked;
-        while (!lockState->state.compare_exchange_weak(expected, WriteLocked)) {
+    while (_writerActive.exchange(true) == true) {
+        // Another writer is active, wait to see false and retry.
+        do {
             std::this_thread::yield();
-            expected = NotLocked;
-        }
+        } while (_writerActive);
     }
+
+    // Use the staged-acquire API that TfSpinRWMutex supplies so that we can
+    // acquire the write locks while simultaneously waiting for readers on the
+    // other locks to complete. Otherwise we'd have to wait for all pending
+    // readers on the Nth lock before beginning to take the N+1th lock.
+    TfSpinRWMutex::_StagedAcquireWriteState
+        stageStates[NumStates] { TfSpinRWMutex::_StageNotAcquired };
+
+    bool allAcquired;
+    do {
+        allAcquired = true;
+        for (int i = 0; i != NumStates; ++i) {
+            stageStates[i] =
+                _states[i].mutex._StagedAcquireWriteStep(stageStates[i]);
+            allAcquired &= (stageStates[i] == TfSpinRWMutex::_StageAcquired);
+        }
+    } while (!allAcquired);
 }
 
 void
 TfBigRWMutex::_ReleaseWrite()
 {
-    // Restore all the read lock states to 0 and set writerWaits to false.
+    _writerActive = false;
+
+    // Release all the write locks.
     for (_LockState *lockState = _states.get(),
              *end = _states.get() + NumStates; lockState != end;
          ++lockState) {
-        lockState->state = NotLocked;
+        lockState->mutex.ReleaseWrite();
     }
-    _writerWaiting = false;
 }
 
 PXR_NAMESPACE_CLOSE_SCOPE
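
The staged-acquire comment in the new _AcquireWrite() above is the key change: rather than fully draining readers on stripe 0 before touching stripe 1, the writer makes repeated passes and grabs each internal lock as soon as it becomes free, so the waits overlap. Below is a generic, hypothetical illustration of that pattern (not USD code), using plain std::atomic stripes where 0 means free and -1 means write-locked.

// Illustrative sketch of staged multi-stripe write acquisition.  Assumption:
// each stripe is an atomic reader count, 0 = free, -1 = write-locked.
#include <array>
#include <atomic>
#include <thread>

constexpr int NumStripes = 8;
static std::array<std::atomic<int>, NumStripes> stripes{};

void AcquireWriteOnAllStripes()
{
    std::array<bool, NumStripes> held{};   // stripes we have locked so far
    int remaining = NumStripes;
    while (remaining != 0) {
        for (int i = 0; i != NumStripes; ++i) {
            if (held[i]) {
                continue;
            }
            int expected = 0;              // no readers, no writer
            if (stripes[i].compare_exchange_strong(expected, -1)) {
                held[i] = true;
                --remaining;
            }
        }
        if (remaining != 0) {
            std::this_thread::yield();     // some stripes still have readers
        }
    }
}
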
40 changes: 12 additions & 28 deletions pxr/base/tf/bigRWMutex.h
@@ -31,6 +31,7 @@
 #include "pxr/base/arch/hints.h"
 #include "pxr/base/tf/diagnosticLite.h"
 #include "pxr/base/tf/hash.h"
+#include "pxr/base/tf/spinRWMutex.h"
 
 #include <atomic>
 #include <memory>
@@ -152,12 +153,6 @@ class TfBigRWMutex
     void AcquireRead() {
         TF_AXIOM(_acqState == NotAcquired);
         _acqState = _mutex->_AcquireRead(_GetSeed());
-        // Inform the compiler that the value of _acqState must be >=0 now.
-        // Without this hint, the compiler must generate code to handle
-        // releasing from the WriteAcquired state and the NotAcquired state
-        // in the subsequent Release() call, since it does not know that
-        // _AcquireRead() can only return values >=0.
-        ARCH_GUARANTEE_TO_COMPILER(_acqState >= 0);
     }
 
     /// Acquire a write lock on this lock's associated mutex. This lock
@@ -196,8 +191,7 @@
     }
 
     // Helper for returning a seed value associated with this lock object.
-    // This used to help determine which lock state a read-locking scope
-    // should start with.
+    // This helps determine which lock state a read-lock should use.
     inline int _GetSeed() const {
         return static_cast<int>(
             static_cast<unsigned>(TfHash()(this)) >> 8);
@@ -213,45 +207,35 @@
 
     // Optimistic read-lock case inlined.
     inline int _AcquireRead(int seed) {
-        // Determine an initial lock state index to use.
+        // Determine a lock state index to use.
         int stateIndex = seed % NumStates;
 
-        // Optimistic path:
-        {
-            _LockState &lockState = _states[stateIndex];
-            int state = lockState.state.load();
-            if (state != WriteLocked && !_writerWaiting &&
-                lockState.state.compare_exchange_strong(state, state + 1)) {
-                // Success!
-                return stateIndex;
-            }
+        if (ARCH_UNLIKELY(_writerActive) ||
+            !_states[stateIndex].mutex.TryAcquireRead()) {
+            _AcquireReadContended(stateIndex);
         }
-        // Contended case out-of-line:
-        return _AcquireReadContended(stateIndex);
+        return stateIndex;
     }
 
     // Contended read-lock helper.
-    TF_API int _AcquireReadContended(int stateIndex);
+    TF_API void _AcquireReadContended(int stateIndex);
 
     void _ReleaseRead(int stateIndex) {
-        // Just decrement the read count on the state we hold.
-        --_states[stateIndex].state;
+        _states[stateIndex].mutex.ReleaseRead();
     }
 
     TF_API void _AcquireWrite();
     TF_API void _ReleaseWrite();
 
     struct _LockState {
-        _LockState() : state(0) {}
-        std::atomic<int> state;
+        TfSpinRWMutex mutex;
         // This padding ensures that \p state instances sit on different cache
         // lines.
         char _unused_padding[
-            ARCH_CACHE_LINE_SIZE-(sizeof(state) % ARCH_CACHE_LINE_SIZE)];
+            ARCH_CACHE_LINE_SIZE-(sizeof(mutex) % ARCH_CACHE_LINE_SIZE)];
     };
 
     std::unique_ptr<_LockState []> _states;
-    std::atomic<bool> _writerWaiting;
+    std::atomic<bool> _writerActive;
 
 };
 
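
The header changes keep the same striping scheme as before: _GetSeed() hashes the scoped-lock object so each reader lands on one of NumStates internal locks (now TfSpinRWMutex instances, each padded onto its own cache line), while a writer must acquire every stripe. Below is a rough, hypothetical standard-library analogue of that layout, for illustration only; StripeFor, ReadSide, and WriteSide are made-up names.

// Illustrative sketch of a striped reader/writer lock; not USD code.
#include <array>
#include <cstddef>
#include <functional>
#include <shared_mutex>

constexpr std::size_t NumStripes = 16;
static std::array<std::shared_mutex, NumStripes> stripes;

// Mirrors the _GetSeed() idea: derive a stripe index from an address so
// different reader scopes tend to hit different stripes (and cache lines).
std::size_t StripeFor(const void *scope)
{
    return (std::hash<const void *>{}(scope) >> 8) % NumStripes;
}

void ReadSide(const void *scope)
{
    std::size_t i = StripeFor(scope);
    stripes[i].lock_shared();      // readers touch only their own stripe
    // ... read shared data ...
    stripes[i].unlock_shared();
}

void WriteSide()
{
    for (auto &m : stripes) { m.lock(); }      // a writer takes every stripe
    // ... modify shared data ...
    for (auto &m : stripes) { m.unlock(); }
}
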
80 changes: 80 additions & 0 deletions pxr/base/tf/spinRWMutex.cpp
@@ -0,0 +1,80 @@
//
// Copyright 2022 Pixar
//
// Licensed under the Apache License, Version 2.0 (the "Apache License")
// with the following modification; you may not use this file except in
// compliance with the Apache License and the following modification to it:
// Section 6. Trademarks. is deleted and replaced with:
//
// 6. Trademarks. This License does not grant permission to use the trade
// names, trademarks, service marks, or product names of the Licensor
// and its affiliates, except as required to comply with Section 4(c) of
// the License and to reproduce the content of the NOTICE file.
//
// You may obtain a copy of the Apache License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the Apache License with the above modification is
// distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the Apache License for the specific
// language governing permissions and limitations under the Apache License.
//

#include "pxr/pxr.h"

#include "pxr/base/tf/spinRWMutex.h"
#include "pxr/base/arch/defines.h"

// Needed for ARCH_SPIN_PAUSE on Windows in builds with precompiled
// headers disabled.
#ifdef ARCH_COMPILER_MSVC
#include <intrin.h>
#endif

#include <thread>

PXR_NAMESPACE_OPEN_SCOPE

static constexpr int SpinsBeforeBackoff = 32;

template <class Fn>
static void WaitWithBackoff(Fn &&fn) {
    // Hope for the best...
    if (ARCH_LIKELY(fn())) {
        return;
    }
    // Otherwise spin for a bit...
    for (int i = 0; i != SpinsBeforeBackoff; ++i) {
        ARCH_SPIN_PAUSE();
        if (fn()) {
            return;
        }
    }
    // Keep checking but yield our thread...
    do {
        std::this_thread::yield();
    } while (!fn());
}


void
TfSpinRWMutex::_WaitForWriter() const
{
    // Wait until we see a cleared WriterFlag.
    WaitWithBackoff([this]() {
        return !(_lockState.load() & WriterFlag);
    });
}

void
TfSpinRWMutex::_WaitForReaders() const
{
    // Wait until we see zero readers.
    WaitWithBackoff([this]() {
        return _lockState.load() == WriterFlag;
    });
}

PXR_NAMESPACE_CLOSE_SCOPE
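
The two wait helpers suggest the lock-state encoding the rest of the class builds on: one bit (WriterFlag) marks a pending or active writer and the bits above it count readers, so _WaitForWriter() spins until the flag clears, while _WaitForReaders() spins until the word is exactly WriterFlag (writer flagged, zero readers). Below is a hedged, hypothetical sketch of that encoding; the real constants and acquire/release paths live in spinRWMutex.h, and TryAcquireReadSketch/ReleaseReadSketch are made-up names.

// Hypothetical sketch of the assumed state encoding; not the actual header.
#include <atomic>

constexpr int WriterFlag = 1;      // assumed: low bit marks a writer
constexpr int OneReader  = 2;      // assumed: readers counted above that bit

static std::atomic<int> lockState{0};

bool TryAcquireReadSketch()
{
    // Optimistically register a reader, then back out if a writer is present.
    int state = lockState.fetch_add(OneReader) + OneReader;
    if (state & WriterFlag) {
        lockState.fetch_sub(OneReader);
        return false;
    }
    return true;
}

void ReleaseReadSketch()
{
    lockState.fetch_sub(OneReader);
}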