Skip to content

Commit

Permalink
Revert "Remove useless func"
Browse files Browse the repository at this point in the history
This reverts commit 682892a.
  • Loading branch information
JaySon-Huang committed Sep 26, 2024
1 parent b9bc77f commit 2b98e1f
Show file tree
Hide file tree
Showing 13 changed files with 46 additions and 197 deletions.
1 change: 1 addition & 0 deletions dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
|| (managed_storage->engineType() != ::TiDB::StorageEngine::DT
&& managed_storage->engineType() != ::TiDB::StorageEngine::TMT))
{
LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name);
storage = storage_tmp;
table_lock = storage->lockForShare(context.getCurrentQueryId());
return;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/BgStorageInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void doInitStores(Context & global_context, const LoggerPtr & log)
const auto & [ks_id, table_id] = ks_table_id;
try
{
init_cnt += storage->initStoreIfNeed(restore_segments_thread_pool) ? 1 : 0;
init_cnt += storage->initStoreIfDataDirExist(restore_segments_thread_pool) ? 1 : 0;
LOG_INFO(log, "Storage inited done, keyspace={} table_id={}", ks_id, table_id);
}
catch (...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ bool SSTFilesToDTFilesOutputStream<ChildStream>::newDTFileStream()
RUNTIME_CHECK(dt_stream == nullptr);

// The parent_path and file_id are generated by the storage.
auto [parent_path, file_id] = storage->getAndMaybeInitStore()->preAllocateIngestFile();
auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile();
if (parent_path.empty())
{
// Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown)
Expand Down Expand Up @@ -223,7 +223,7 @@ bool SSTFilesToDTFilesOutputStream<ChildStream>::finalizeDTFileStream()

// If remote data store is not enabled, add the DTFile to StoragePathPool so that we can restore it later
// Else just add it's size to disk delegator
storage->getAndMaybeInitStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);
storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written);

dt_stream.reset();

Expand Down Expand Up @@ -360,7 +360,7 @@ void SSTFilesToDTFilesOutputStream<ChildStream>::cancel()
try
{
// If DMFile has pre-ingested, remove it.
storage->getAndMaybeInitStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false);
storage->getStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false);
// Remove local DMFile.
file->remove(context.getFileProvider());
}
Expand Down
19 changes: 0 additions & 19 deletions dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,4 @@ LocalIndexInfosPtr generateLocalIndexInfos(
return new_index_infos;
}

bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger)
{
if (!isVectorIndexSupported(logger))
return false;

for (const auto & col : table_info.columns)
{
if (col.vector_index)
{
return true;
}
}
for (const auto & idx : table_info.index_infos)
{
if (idx.vector_index)
return true;
}
return false;
}
} // namespace DB::DM
2 changes: 0 additions & 2 deletions dbms/src/Storages/DeltaMerge/Index/IndexInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,4 @@ LocalIndexInfosPtr generateLocalIndexInfos(
const TiDB::TableInfo & new_table_info,
const LoggerPtr & logger);


bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger);
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ try
EXPECT_EQ(col_value->getDataAt(i), String(DB::toString(num_rows_write)));
}
}
auto delta_store = storage->getAndMaybeInitStore();
auto delta_store = storage->getStore();
size_t total_segment_rows = 0;
auto segment_stats = delta_store->getSegmentsStats();
for (auto & stat : segment_stats)
Expand Down Expand Up @@ -308,7 +308,7 @@ try
auto sort_desc = storage->getPrimarySortDescription();
ASSERT_FALSE(storage->storeInited());

const auto & store = storage->getAndMaybeInitStore();
const auto & store = storage->getStore();
ASSERT_TRUE(storage->storeInited());
auto pk_type2 = store->getPKDataType();
auto sort_desc2 = store->getPrimarySortDescription();
Expand Down Expand Up @@ -826,11 +826,11 @@ try
{
write_data(num_rows_write, 1000);
num_rows_write += 1000;
if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1)
if (storage->getStore()->getSegmentsStats().size() > 1)
break;
}
{
ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1);
ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1);
ASSERT_EQ(read_data(), num_rows_write);
}
storage->flushCache(*ctx);
Expand All @@ -847,13 +847,13 @@ try
// write more data make sure segments more than 1
for (size_t i = 0; i < 100000; i++)
{
if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1)
if (storage->getStore()->getSegmentsStats().size() > 1)
break;
write_data(num_rows_write, 1000);
num_rows_write += 1000;
}
{
ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1);
ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1);
ASSERT_EQ(read_data(), num_rows_write);
}
storage->flushCache(*ctx);
Expand All @@ -873,7 +873,7 @@ try
// restore the table and make sure there is just one segment left
create_table();
{
ASSERT_EQ(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1);
ASSERT_EQ(storage->getStore()->getSegmentsStats().size(), 1);
ASSERT_LT(read_data(), num_rows_write);
}
storage->drop();
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/IManageableStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ class IManageableStorage : public IStorage
/// `limit` is the max number of segments to gc, return value is the number of segments gced
virtual UInt64 onSyncGc(Int64 /*limit*/, const DM::GCOptions &) { throw Exception("Unsupported"); }

/// Return true if the DeltaMergeStore instance need to be inited
virtual bool initStoreIfNeed(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); }
/// Return true is data dir exist
virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); }

virtual TiDB::StorageEngine engineType() const = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore(
if (storage && storage->engineType() == TiDB::StorageEngine::DT)
{
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
auto dm_context
= dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef());
auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef());
for (const auto & seg_persisted : ingest_info_persisted.segments())
{
ReadBufferFromString buf(seg_persisted.segment_meta());
Expand Down Expand Up @@ -198,8 +197,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region,
if (storage && storage->engineType() == TiDB::StorageEngine::DT)
{
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
auto dm_context
= dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef());
auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef());
for (const auto & segment_to_drop : segments)
{
DM::WriteBatches wbs(*dm_context->storage_pool, dm_context->getWriteLimiter());
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ try
{
if (ingest_using_split)
{
auto stats = storage->getAndMaybeInitStore()->getStoreStats();
auto stats = storage->getStore()->getStoreStats();
// Including 0..20, 20..100, 100..inf.
ASSERT_EQ(3, stats.segment_count);
}
Expand Down Expand Up @@ -1128,7 +1128,7 @@ try
}
}
{
auto stats = storage->getAndMaybeInitStore()->getStoreStats();
auto stats = storage->getStore()->getStoreStats();
ASSERT_NE(0, stats.total_stable_size_on_disk);
ASSERT_NE(0, stats.total_rows);
ASSERT_NE(0, stats.total_size);
Expand All @@ -1150,7 +1150,7 @@ try
auto gc_n = storage->onSyncGc(100, DM::GCOptions::newAllForTest());
ASSERT_EQ(0, gc_n);

auto stats = storage->getAndMaybeInitStore()->getStoreStats();
auto stats = storage->getStore()->getStoreStats();
// The data of [20, 100), is not reclaimed during Apply Snapshot.
if (ingest_using_split)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,7 @@ void assertNoSegment(
auto storage = storages.get(keyspace_id, table_id);
RUNTIME_CHECK(storage && storage->engineType() == TiDB::StorageEngine::DT);
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
auto dm_context
= dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef());
auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef());
for (const auto & seg_persisted : ingest_info_persisted.segments())
{
ReadBufferFromString buf(seg_persisted.segment_meta());
Expand Down Expand Up @@ -486,7 +485,7 @@ try
auto storage = global_context.getTMTContext().getStorages().get(keyspace_id, table_id);
ASSERT_TRUE(storage && storage->engineType() == TiDB::StorageEngine::DT);
auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
auto store = dm_storage->getAndMaybeInitStore();
auto store = dm_storage->getStore();
ASSERT_EQ(store->getRowKeyColumnSize(), 1);
verifyRows(
global_context,
Expand Down
69 changes: 18 additions & 51 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ StorageDeltaMerge::StorageDeltaMerge(
, global_context(global_context_.getGlobalContext())
, log(Logger::get(fmt::format("{}.{}", db_name_, table_name_)))
{
RUNTIME_CHECK_MSG(!primary_expr_ast_->children.empty(), "No primary key, ident={}", log->identifier());
if (primary_expr_ast_->children.empty())
throw Exception("No primary key");

// save schema from TiDB
if (likely(table_info_))
if (table_info_)
{
tidb_table_info = table_info_->get();
is_common_handle = tidb_table_info.is_common_handle;
Expand All @@ -112,11 +113,6 @@ StorageDeltaMerge::StorageDeltaMerge(
table_column_info = std::make_unique<TableColumnInfo>(db_name_, table_name_, primary_expr_ast_);

updateTableColumnInfo();

if (table_info_ && containsLocalIndexInfos(table_info_.value(), log))
{
getAndMaybeInitStore();
}
}

void StorageDeltaMerge::updateTableColumnInfo()
Expand Down Expand Up @@ -1398,34 +1394,16 @@ void StorageDeltaMerge::alterSchemaChange(
LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize());

{
// In order to avoid concurrent issue between init store and DDL,
// we must acquire the lock before schema changes is applied.
std::lock_guard lock(store_mutex);
std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL.
if (storeInited())
{
_store->applySchemaChanges(table_info);
}
else
else // it seems we will never come into this branch ?
{
if (containsLocalIndexInfos(table_info, log))
{
// If there exist vector index, then we must init the store to create
// at least 1 segment. So that tidb can detect the index is added.
initStore(lock);
_store->applySchemaChanges(table_info);
}
else
{
// If there is no data need to be stored for this table, the _store instance
// is not inited to reduce fragmentation files that may exhaust the inode of
// disk.
// Under this case, we update some in-memory variables to ensure the correctness.
updateTableColumnInfo();
}
updateTableColumnInfo();
}
}

// Should generate new decoding snapshot and cache block
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
Expand Down Expand Up @@ -1824,9 +1802,14 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const
return desc;
}

DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard<std::mutex> &, ThreadPool * thread_pool)
DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool)
{
if (likely(_store == nullptr))
if (storeInited())
{
return _store;
}
std::lock_guard lock(store_mutex);
if (_store == nullptr)
{
auto index_infos = initLocalIndexInfos(tidb_table_info, log);
_store = DeltaMergeStore::create(
Expand All @@ -1851,20 +1834,8 @@ DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard<std::mut
return _store;
}

DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool)
{
if (storeInited())
{
return _store;
}
std::lock_guard lock(store_mutex);
return initStore(lock, thread_pool);
}

bool StorageDeltaMerge::initStoreIfNeed(ThreadPool * thread_pool)
bool StorageDeltaMerge::initStoreIfDataDirExist(ThreadPool * thread_pool)
{
// If the table is tombstone, then wait for it exceeds the gc_safepoint and
// get physically drop without initing the instance.
if (shutdown_called.load(std::memory_order_relaxed) || isTombstone())
{
return false;
Expand All @@ -1874,16 +1845,12 @@ bool StorageDeltaMerge::initStoreIfNeed(ThreadPool * thread_pool)
{
return true;
}
if (dataDirExist() || containsLocalIndexInfos(tidb_table_info, log))
if (!dataDirExist())
{
// - there exist some data stored on disk
// - there exist tiflash local index
// We need to init the DeltaMergeStore instance for reporting the disk usage, local index
// status, etc.
getAndMaybeInitStore(thread_pool);
return true;
return false;
}
return false;
getAndMaybeInitStore(thread_pool);
return true;
}

bool StorageDeltaMerge::dataDirExist()
Expand Down
15 changes: 5 additions & 10 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,9 @@ class StorageDeltaMerge
void checkStatus(const Context & context) override;
void deleteRows(const Context &, size_t rows) override;

// Return the DeltaMergeStore instance if it has been inited
// Else return nullptr
DM::DeltaMergeStorePtr getStoreIfInited() const;

// Return the DeltaMergeStore instance
// If the instance is not inited, this method will initialize the instance
// and return it.
DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr);
const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); }

bool initStoreIfNeed(ThreadPool * thread_pool) override;
DM::DeltaMergeStorePtr getStoreIfInited() const;

bool isCommonHandle() const override { return is_common_handle; }

Expand All @@ -213,6 +206,8 @@ class StorageDeltaMerge

void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override;

bool initStoreIfDataDirExist(ThreadPool * thread_pool) override;

DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); }

#ifndef DBMS_PUBLIC_GTEST
Expand Down Expand Up @@ -243,7 +238,7 @@ class StorageDeltaMerge

DataTypePtr getPKTypeImpl() const override;

DM::DeltaMergeStorePtr & initStore(const std::lock_guard<std::mutex> &, ThreadPool * thread_pool = nullptr);
DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr);
bool storeInited() const { return store_inited.load(std::memory_order_acquire); }
void updateTableColumnInfo();
ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info);
Expand Down
Loading

0 comments on commit 2b98e1f

Please sign in to comment.