Skip to content

Commit

Permalink
*: Introduce MutexProtected and SharedMutexProtected (#9115)
Browse files Browse the repository at this point in the history
ref #6233

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
Lloyd-Pottiger and ti-chi-bot[bot] committed Jul 16, 2024
1 parent bf69caf commit 1ee7da7
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 46 deletions.
101 changes: 101 additions & 0 deletions dbms/src/Common/MutexProtected.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/nocopyable.h>
#include <common/defines.h>
#include <common/types.h>

namespace DB
{

template <typename T>
class MutexProtected
{
DISALLOW_COPY_AND_MOVE(MutexProtected);

private:
template <typename U>
class Locked
{
DISALLOW_COPY_AND_MOVE(Locked);

public:
Locked(U & value_, std::mutex & mutex_)
: value(value_)
, locker(mutex_)
{}

ALWAYS_INLINE inline U const * operator->() const { return &value; }
ALWAYS_INLINE inline U const & operator*() const { return value; }

ALWAYS_INLINE inline U * operator->() { return &value; }
ALWAYS_INLINE inline U & operator*() { return value; }

ALWAYS_INLINE inline U const & get() const { return value; }
ALWAYS_INLINE inline U & get() { return value; }

private:
U & value;
std::scoped_lock<std::mutex> locker;
};

auto lockConst() const { return Locked<T const>(value, mutex); }
auto lockMutable() { return Locked<T>(value, mutex); }

public:
template <typename... Args>
explicit MutexProtected(Args &&... args)
: value(forward<Args>(args)...)
{}

template <typename Callback>
decltype(auto) with(Callback callback) const
{
auto lock = lockConst();
return callback(*lock);
}

template <typename Callback>
decltype(auto) with(Callback callback)
{
auto lock = lockMutable();
return callback(*lock);
}

template <typename Callback>
void forEachConst(Callback callback) const
{
with([&](auto const & value) {
for (auto & item : value)
callback(item);
});
}

template <typename Callback>
void forEach(Callback callback)
{
with([&](auto & value) {
for (auto & item : value)
callback(item);
});
}

private:
T value;
mutable std::mutex mutex;
};

} // namespace DB
116 changes: 116 additions & 0 deletions dbms/src/Common/SharedMutexProtected.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/nocopyable.h>
#include <common/defines.h>
#include <common/types.h>

#include <shared_mutex>

namespace DB
{

template <typename T>
class SharedMutexProtected
{
DISALLOW_COPY_AND_MOVE(SharedMutexProtected);

private:
template <typename U, typename Lock>
class Locked
{
DISALLOW_COPY_AND_MOVE(Locked);

public:
Locked(U & value_, std::shared_mutex & mutex_)
: value(value_)
, locker(mutex_)
{}

ALWAYS_INLINE inline U const * operator->() const { return &value; }
ALWAYS_INLINE inline U const & operator*() const { return value; }

ALWAYS_INLINE inline U * operator->()
requires(!std::is_const_v<U>)
{
return &value;
}
ALWAYS_INLINE inline U & operator*()
requires(!std::is_const_v<U>)
{
return value;
}

ALWAYS_INLINE inline U const & get() const { return &value; }
ALWAYS_INLINE inline U & get()
requires(!std::is_const_v<U>)
{
return &value;
}

private:
U & value;
Lock locker;
};

public:
// Return a locked object that can be used to shared access the protected value.
// Please destroy the object ASAP to release the lock.
auto lockShared() const { return Locked<T const, std::shared_lock<std::shared_mutex>>(value, mutex); }
// Return a locked object that can be used to exclusive access the protected value.
// Please destroy the object ASAP to release the lock.
auto lockExclusive() { return Locked<T, std::unique_lock<std::shared_mutex>>(value, mutex); }

SharedMutexProtected() = default;

template <typename Callback>
decltype(auto) withShared(Callback callback) const
{
auto lock = lockShared();
return callback(*lock);
}

template <typename Callback>
decltype(auto) withExclusive(Callback callback)
{
auto lock = lockExclusive();
return callback(*lock);
}

template <typename Callback>
void forEachShared(Callback callback) const
{
withShared([&](auto const & value) {
for (auto & item : value)
callback(item);
});
}

template <typename Callback>
void forEachExclusive(Callback callback)
{
withExclusive([&](auto & value) {
for (auto & item : value)
callback(item);
});
}

private:
T value;
mutable std::shared_mutex mutex;
};

} // namespace DB
31 changes: 16 additions & 15 deletions dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,25 @@ bool WNDisaggSnapshotManager::unregisterSnapshotIfEmpty(const DisaggTaskId & tas

void WNDisaggSnapshotManager::clearExpiredSnapshots()
{
std::unique_lock lock(mtx);
Timepoint now = Clock::now();
for (auto iter = snapshots.begin(); iter != snapshots.end(); /*empty*/)
{
if (iter->second.expired_at < now)
{
LOG_INFO(
log,
"Remove expired Disaggregated Snapshot, task_id={} expired_at={:%Y-%m-%d %H:%M:%S}",
iter->first,
iter->second.expired_at);
iter = snapshots.erase(iter);
}
else
snapshots.withExclusive([&](auto & snapshots) {
for (auto iter = snapshots.begin(); iter != snapshots.end(); /*empty*/)
{
++iter;
if (iter->second.expired_at < now)
{
LOG_INFO(
log,
"Remove expired Disaggregated Snapshot, task_id={} expired_at={:%Y-%m-%d %H:%M:%S}",
iter->first,
iter->second.expired_at);
iter = snapshots.erase(iter);
}
else
{
++iter;
}
}
}
});
}

} // namespace DB::DM::Remote
46 changes: 23 additions & 23 deletions dbms/src/Storages/DeltaMerge/Remote/WNDisaggSnapshotManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <Common/Logger.h>
#include <Common/SharedMutexProtected.h>
#include <Common/nocopyable.h>
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
Expand All @@ -25,9 +26,6 @@
#include <common/types.h>
#include <fmt/chrono.h>

#include <memory>
#include <mutex>
#include <shared_mutex>

namespace DB::DM::Remote
{
Expand All @@ -54,21 +52,23 @@ class WNDisaggSnapshotManager
const DisaggReadSnapshotPtr & snap,
const Timepoint & expired_at)
{
std::unique_lock lock(mtx);
LOG_INFO(log, "Register Disaggregated Snapshot, task_id={}", task_id);
return snapshots.withExclusive([&](auto & snapshots) {
LOG_INFO(log, "Register Disaggregated Snapshot, task_id={}", task_id);

// Since EstablishDisagg may be retried, there may be existing snapshot.
// We replace these existing snapshot using a new one.
snapshots.insert_or_assign(task_id, SnapshotWithExpireTime{.snap = snap, .expired_at = expired_at});
return true;
// Since EstablishDisagg may be retried, there may be existing snapshot.
// We replace these existing snapshot using a new one.
snapshots[task_id] = SnapshotWithExpireTime{.snap = snap, .expired_at = expired_at};
return true;
});
}

DisaggReadSnapshotPtr getSnapshot(const DisaggTaskId & task_id) const
{
std::shared_lock read_lock(mtx);
if (auto iter = snapshots.find(task_id); iter != snapshots.end())
return iter->second.snap;
return nullptr;
return snapshots.withShared([&](auto & snapshots) {
if (auto iter = snapshots.find(task_id); iter != snapshots.end())
return iter->second.snap;
return DisaggReadSnapshotPtr{nullptr};
});
}

bool unregisterSnapshotIfEmpty(const DisaggTaskId & task_id);
Expand All @@ -78,21 +78,21 @@ class WNDisaggSnapshotManager
private:
bool unregisterSnapshot(const DisaggTaskId & task_id)
{
std::unique_lock lock(mtx);
if (auto iter = snapshots.find(task_id); iter != snapshots.end())
{
LOG_INFO(log, "Unregister Disaggregated Snapshot, task_id={}", task_id);
snapshots.erase(iter);
return true;
}
return false;
return snapshots.withExclusive([&](auto & snapshots) {
if (auto iter = snapshots.find(task_id); iter != snapshots.end())
{
LOG_INFO(log, "Unregister Disaggregated Snapshot, task_id={}", task_id);
snapshots.erase(iter);
return true;
}
return false;
});
}

void clearExpiredSnapshots();

private:
mutable std::shared_mutex mtx;
std::unordered_map<DisaggTaskId, SnapshotWithExpireTime> snapshots;
SharedMutexProtected<std::unordered_map<DisaggTaskId, SnapshotWithExpireTime>> snapshots;

BackgroundProcessingPool & pool;
BackgroundProcessingPool::TaskHandle handle;
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/Storages/DeltaMerge/RowKeyRange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/RedactHelpers.h>
#include <Common/SharedMutexProtected.h>
#include <Storages/DeltaMerge/RowKeyRange.h>

namespace DB::DM
Expand Down Expand Up @@ -59,9 +60,8 @@ RowKeyValue RowKeyValueRef::toRowKeyValue() const
}
}

std::unordered_map<KeyspaceTableID, RowKeyRange::TableRangeMinMax, boost::hash<KeyspaceTableID>>
SharedMutexProtected<std::unordered_map<KeyspaceTableID, RowKeyRange::TableRangeMinMax, boost::hash<KeyspaceTableID>>>
RowKeyRange::table_min_max_data;
std::shared_mutex RowKeyRange::table_mutex;

const RowKeyRange::TableRangeMinMax & RowKeyRange::getTableMinMaxData(
KeyspaceID keyspace_id,
Expand All @@ -70,12 +70,12 @@ const RowKeyRange::TableRangeMinMax & RowKeyRange::getTableMinMaxData(
{
auto keyspace_table_id = KeyspaceTableID{keyspace_id, table_id};
{
std::shared_lock lock(table_mutex);
if (auto it = table_min_max_data.find(keyspace_table_id); it != table_min_max_data.end())
auto lock = table_min_max_data.lockShared();
if (auto it = lock->find(keyspace_table_id); it != lock->end())
return it->second;
}
std::unique_lock lock(table_mutex);
return table_min_max_data.try_emplace(keyspace_table_id, keyspace_id, table_id, is_common_handle).first->second;
auto lock = table_min_max_data.lockExclusive();
return lock->try_emplace(keyspace_table_id, keyspace_id, table_id, is_common_handle).first->second;
}

template <bool enable_redact, bool right_open = true>
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/RowKeyRange.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#pragma once
#include <Columns/ColumnString.h>
#include <Common/SharedMutexProtected.h>
#include <Core/Types.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
Expand Down Expand Up @@ -447,8 +448,9 @@ struct RowKeyRange
};

/// maybe use a LRU cache in case there are massive tables
static std::unordered_map<KeyspaceTableID, TableRangeMinMax, boost::hash<KeyspaceTableID>> table_min_max_data;
static std::shared_mutex table_mutex;
static SharedMutexProtected<
std::unordered_map<KeyspaceTableID, RowKeyRange::TableRangeMinMax, boost::hash<KeyspaceTableID>>>
table_min_max_data;
static const TableRangeMinMax & getTableMinMaxData(KeyspaceID keyspace_id, TableID table_id, bool is_common_handle);

RowKeyRange(
Expand Down

0 comments on commit 1ee7da7

Please sign in to comment.