Skip to content

Commit

Permalink
ddl: Atomic rename DeltaMergeStore::db_name in memory (#9246)
Browse files Browse the repository at this point in the history
ref #9233

Make renaming `DeltaMergeStore::db_name` to be atomic
  • Loading branch information
JaySon-Huang authored Jul 23, 2024
1 parent 1a0a7e1 commit 4049a8c
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 66 deletions.
25 changes: 7 additions & 18 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,12 @@ void injectFailPointForLocalRead([[maybe_unused]] const SelectQueryInfo & query_
});
}

String genErrMsgForLocalRead(
const ManageableStoragePtr & storage,
const KeyspaceID keyspace_id,
const TableID & table_id,
const TableID & logical_table_id)
String genErrMsgForLocalRead(const KeyspaceID keyspace_id, const TableID & table_id, const TableID & logical_table_id)
{
return table_id == logical_table_id
? fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
keyspace_id,
table_id)
? fmt::format("(while creating read sources from storage, keyspace_id={} table_id={})", keyspace_id, table_id)
: fmt::format(
"(while creating read sources from storage `{}`.`{}`, keyspace_id={} table_id={} logical_table_id={})",
storage->getDatabaseName(),
storage->getTableName(),
"(while creating read sources from storage, keyspace_id={} table_id={} logical_table_id={})",
keyspace_id,
table_id,
logical_table_id);
Expand Down Expand Up @@ -1084,14 +1073,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
Expand Down Expand Up @@ -1167,14 +1156,14 @@ DM::Remote::DisaggPhysicalTableReadSnapshotPtr DAGStorageInterpreter::buildLocal
else
{
// Throw an exception for TiDB / TiSpark to retry
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
catch (DB::Exception & e)
{
/// Other unknown exceptions
e.addMessage(genErrMsgForLocalRead(storage, keyspace_id, table_id, logical_table_id));
e.addMessage(genErrMsgForLocalRead(keyspace_id, table_id, logical_table_id));
throw;
}
}
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,6 @@ DeltaMergeStore::DeltaMergeStore(
, path_pool(std::make_shared<StoragePathPool>(
global_context.getPathPool().withTable(db_name_, table_name_, data_path_contains_database_name)))
, settings(settings_)
, db_name(db_name_)
, table_name(table_name_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, is_common_handle(is_common_handle_)
Expand All @@ -232,6 +230,12 @@ DeltaMergeStore::DeltaMergeStore(
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
, log(Logger::get(fmt::format("keyspace={} table_id={}", keyspace_id_, physical_table_id_)))
{
{
auto meta = table_meta.lockExclusive();
meta->db_name = db_name_;
meta->table_name = table_name_;
}

replica_exist.store(has_replica);
// for mock test, table_id_ should be DB::InvalidTableID
NamespaceID ns_id = physical_table_id == DB::InvalidTableID ? TEST_NAMESPACE_ID : physical_table_id;
Expand Down Expand Up @@ -325,9 +329,9 @@ void DeltaMergeStore::rename(String /*new_path*/, String new_database_name, Stri
{
path_pool->rename(new_database_name, new_table_name);

// TODO: replacing these two variables is not atomic, but could be good enough?
table_name.swap(new_table_name);
db_name.swap(new_database_name);
auto meta = table_meta.lockExclusive();
meta->table_name.swap(new_table_name);
meta->db_name.swap(new_database_name);
}

void DeltaMergeStore::dropAllSegments(bool keep_first_segment)
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <Storages/KVStore/Decode/DecodingStorageSchemaSnapshot.h>
#include <Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.h>
#include <Storages/Page/PageStorage_fwd.h>
#include <Storages/TableNameMeta.h>
#include <TiDB/Schema/TiDB.h>

#include <queue>
Expand Down Expand Up @@ -277,8 +278,12 @@ class DeltaMergeStore : private boost::noncopyable

void setUpBackgroundTask(const DMContextPtr & dm_context);

const String & getDatabaseName() const { return db_name; }
const String & getTableName() const { return table_name; }
TableNameMeta getTableMeta() const
{
auto meta = table_meta.lockShared();
return TableNameMeta{meta->db_name, meta->table_name};
}
String getIdent() const { return fmt::format("keyspace={} table_id={}", keyspace_id, physical_table_id); }

void rename(String new_path, String new_database_name, String new_table_name);

Expand Down Expand Up @@ -796,8 +801,7 @@ class DeltaMergeStore : private boost::noncopyable
Settings settings;
StoragePoolPtr storage_pool;

String db_name;
String table_name;
SharedMutexProtected<TableNameMeta> table_meta;

const KeyspaceID keyspace_id;
const TableID physical_table_id;
Expand Down Expand Up @@ -838,7 +842,7 @@ class DeltaMergeStore : private boost::noncopyable
mutable std::shared_mutex read_write_mutex;

LoggerPtr log;
}; // namespace DM
};

using DeltaMergeStorePtr = std::shared_ptr<DeltaMergeStore>;

Expand Down
45 changes: 15 additions & 30 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -700,11 +700,7 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
auto segment_bytes = segment->getEstimatedBytes();
if (segment_rows >= dm_context->segment_limit_rows || segment_bytes >= dm_context->segment_limit_bytes)
{
LOG_TRACE(
log,
"GC - Merge skipped because current segment is not small, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Merge skipped because current segment is not small, segment={}", segment->simpleInfo());
return {};
}

Expand All @@ -713,13 +709,12 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMerge(const DMContextPtr & dm_context, c
{
LOG_TRACE(
log,
"GC - Merge skipped because cannot find adjacent segments to merge, segment={} table={}",
segment->simpleInfo(),
table_name);
"GC - Merge skipped because cannot find adjacent segments to merge, segment={}",
segment->simpleInfo());
return {};
}

LOG_INFO(log, "GC - Trigger Merge, segment={} table={}", segment->simpleInfo(), table_name);
LOG_INFO(log, "GC - Trigger Merge, segment={}", segment->simpleInfo());
auto new_segment = segmentMerge(*dm_context, segments_to_merge, SegmentMergeReason::BackgroundGCThread);
if (new_segment)
{
Expand All @@ -743,23 +738,15 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(
// The segment we just retrieved may be dropped from the map. Let's verify it again before creating a snapshot.
if (!isSegmentValid(lock, segment))
{
LOG_TRACE(
log,
"GC - Skip checking MergeDelta because not valid, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Skip checking MergeDelta because not valid, segment={}", segment->simpleInfo());
return {};
}

segment_snap
= segment->createSnapshot(*dm_context, /* for_update */ true, CurrentMetrics::DT_SnapshotOfDeltaMerge);
if (!segment_snap)
{
LOG_TRACE(
log,
"GC - Skip checking MergeDelta because snapshot failed, segment={} table={}",
segment->simpleInfo(),
table_name);
LOG_TRACE(log, "GC - Skip checking MergeDelta because snapshot failed, segment={}", segment->simpleInfo());
return {};
}
}
Expand Down Expand Up @@ -828,26 +815,24 @@ SegmentPtr DeltaMergeStore::gcTrySegmentMergeDelta(

if (!should_compact)
{
LOG_TRACE(log, "GC - MergeDelta skipped, segment={} table={}", segment->simpleInfo(), table_name);
LOG_TRACE(log, "GC - MergeDelta skipped, segment={}", segment->simpleInfo());
return {};
}

LOG_INFO(
log,
"GC - Trigger MergeDelta, compact_reason={} segment={} table={}",
"GC - Trigger MergeDelta, compact_reason={} segment={}",
GC::toString(compact_reason),
segment->simpleInfo(),
table_name);
segment->simpleInfo());
auto new_segment = segmentMergeDelta(*dm_context, segment, MergeDeltaReason::BackgroundGCThread, segment_snap);

if (!new_segment)
{
LOG_DEBUG(
log,
"GC - MergeDelta aborted, compact_reason={} segment={} table={}",
"GC - MergeDelta aborted, compact_reason={} segment={}",
GC::toString(compact_reason),
segment->simpleInfo(),
table_name);
segment->simpleInfo());
return {};
}

Expand Down Expand Up @@ -877,8 +862,7 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)
DB::Timestamp gc_safe_point = latest_gc_safe_point.load(std::memory_order_acquire);
LOG_TRACE(
log,
"GC on table start, table={} check_key={} options={} gc_safe_point={} max_gc_limit={}",
table_name,
"GC on table start, check_key={} options={} gc_safe_point={} max_gc_limit={}",
next_gc_check_key.toDebugString(),
gc_options.toString(),
gc_safe_point,
Expand Down Expand Up @@ -937,15 +921,16 @@ UInt64 DeltaMergeStore::onSyncGc(Int64 limit, const GCOptions & gc_options)

if (!new_seg)
{
LOG_TRACE(log, "GC - Skipped segment, segment={} table={}", segment->simpleInfo(), table_name);
LOG_TRACE(log, "GC - Skipped segment, segment={}", segment->simpleInfo());
continue;
}

gc_segments_num++;
}
catch (Exception & e)
{
e.addMessage(fmt::format("Error while GC segment, segment={} table={}", segment->info(), table_name));
e.addMessage(
fmt::format("Error while GC segment, segment={} log_ident={}", segment->info(), log->identifier()));
e.rethrow();
}
}
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class IManageableStorage : public IStorage
/// Return true is data dir exist
virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); }

virtual ::TiDB::StorageEngine engineType() const = 0;
virtual TiDB::StorageEngine engineType() const = 0;

virtual String getDatabaseName() const = 0;

Expand Down Expand Up @@ -186,7 +186,7 @@ class IManageableStorage : public IStorage
throw Exception(
"Method getDecodingSchemaSnapshot is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
};
}

/// The `block_decoding_schema_epoch` is just an internal version for `DecodingStorageSchemaSnapshot`,
/// And it has no relation with the table schema version.
Expand Down
11 changes: 5 additions & 6 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,7 @@ class DMBlockOutputStream : public IBlockOutputStream
}
catch (DB::Exception & e)
{
e.addMessage(
fmt::format("(while writing to table `{}`.`{}`)", store->getDatabaseName(), store->getTableName()));
e.addMessage(fmt::format("(while writing to table `{}`)", store->getIdent()));
throw;
}

Expand Down Expand Up @@ -1457,12 +1456,12 @@ String StorageDeltaMerge::getTableName() const
{
if (storeInited())
{
return _store->getTableName();
return _store->getTableMeta().table_name;
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getTableName();
return _store->getTableMeta().table_name;
}
return table_column_info->table_name;
}
Expand All @@ -1471,12 +1470,12 @@ String StorageDeltaMerge::getDatabaseName() const
{
if (storeInited())
{
return _store->getDatabaseName();
return _store->getTableMeta().db_name;
}
std::lock_guard lock(store_mutex);
if (storeInited())
{
return _store->getDatabaseName();
return _store->getTableMeta().db_name;
}
return table_column_info->db_name;
}
Expand Down
28 changes: 28 additions & 0 deletions dbms/src/Storages/TableNameMeta.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <common/types.h>

namespace DB
{

struct TableNameMeta
{
String db_name;
String table_name;
};

} // namespace DB

0 comments on commit 4049a8c

Please sign in to comment.