From 2f4a62bdee5efd2281edee773abe3f7c424e2d51 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Wed, 25 Sep 2024 16:55:09 +0800 Subject: [PATCH 1/7] Remove useless func --- .../Interpreters/InterpreterSelectQuery.cpp | 1 - dbms/src/Server/BgStorageInit.cpp | 2 +- .../Decode/SSTFilesToDTFilesOutputStream.cpp | 6 +- .../DeltaMerge/Index/LocalIndexInfo.cpp | 19 ++++ .../DeltaMerge/Index/LocalIndexInfo.h | 2 + .../tests/gtest_dm_storage_delta_merge.cpp | 14 +-- dbms/src/Storages/IManageableStorage.h | 4 +- .../MultiRaft/Disagg/CheckpointIngestInfo.cpp | 6 +- .../Storages/KVStore/tests/gtest_kvstore.cpp | 6 +- .../tests/gtest_kvstore_fast_add_peer.cpp | 5 +- dbms/src/Storages/StorageDeltaMerge.cpp | 69 ++++++++++---- dbms/src/Storages/StorageDeltaMerge.h | 15 ++- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 94 ++++++++++++++++++- 13 files changed, 197 insertions(+), 46 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index bf32782f770..490670bf66b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,7 +243,6 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d || (managed_storage->engineType() != ::TiDB::StorageEngine::DT && managed_storage->engineType() != ::TiDB::StorageEngine::TMT)) { - LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name); storage = storage_tmp; table_lock = storage->lockForShare(context.getCurrentQueryId()); return; diff --git a/dbms/src/Server/BgStorageInit.cpp b/dbms/src/Server/BgStorageInit.cpp index 5c53db7083e..6fbb2060348 100644 --- a/dbms/src/Server/BgStorageInit.cpp +++ b/dbms/src/Server/BgStorageInit.cpp @@ -53,7 +53,7 @@ void doInitStores(Context & global_context, const LoggerPtr & log) const auto & [ks_id, table_id] = ks_table_id; try { - init_cnt += storage->initStoreIfDataDirExist(restore_segments_thread_pool) ? 1 : 0; + init_cnt += storage->initStoreIfNeed(restore_segments_thread_pool) ? 1 : 0; LOG_INFO(log, "Storage inited done, keyspace={} table_id={}", ks_id, table_id); } catch (...) diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp index 3a4a47975a7..72543d32c16 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp @@ -175,7 +175,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() RUNTIME_CHECK(dt_stream == nullptr); // The parent_path and file_id are generated by the storage. - auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); + auto [parent_path, file_id] = storage->getAndMaybeInitStore()->preAllocateIngestFile(); if (parent_path.empty()) { // Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown) @@ -223,7 +223,7 @@ bool SSTFilesToDTFilesOutputStream::finalizeDTFileStream() // If remote data store is not enabled, add the DTFile to StoragePathPool so that we can restore it later // Else just add it's size to disk delegator - storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + storage->getAndMaybeInitStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); dt_stream.reset(); @@ -360,7 +360,7 @@ void SSTFilesToDTFilesOutputStream::cancel() try { // If DMFile has pre-ingested, remove it. - storage->getStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); + storage->getAndMaybeInitStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); // Remove local DMFile. file->remove(context.getFileProvider()); } diff --git a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp index fe527ac7ad2..1d2911dbcea 100644 --- a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp @@ -314,4 +314,23 @@ LocalIndexInfosChangeset generateLocalIndexInfos( }; } +bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger) +{ + if (!isVectorIndexSupported(logger)) + return false; + + for (const auto & col : table_info.columns) + { + if (col.vector_index) + { + return true; + } + } + for (const auto & idx : table_info.index_infos) + { + if (idx.vector_index) + return true; + } + return false; +} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h index 68f3fd82111..055503bc39d 100644 --- a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h @@ -68,4 +68,6 @@ LocalIndexInfosChangeset generateLocalIndexInfos( const TiDB::TableInfo & new_table_info, const LoggerPtr & logger); + +bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 01d55d503b9..8860110ee1c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -162,7 +162,7 @@ try EXPECT_EQ(col_value->getDataAt(i), String(DB::toString(num_rows_write))); } } - auto delta_store = storage->getStore(); + auto delta_store = storage->getAndMaybeInitStore(); size_t total_segment_rows = 0; auto segment_stats = delta_store->getSegmentsStats(); for (auto & stat : segment_stats) @@ -308,7 +308,7 @@ try auto sort_desc = storage->getPrimarySortDescription(); ASSERT_FALSE(storage->storeInited()); - const auto & store = storage->getStore(); + const auto & store = storage->getAndMaybeInitStore(); ASSERT_TRUE(storage->storeInited()); auto pk_type2 = store->getPKDataType(); auto sort_desc2 = store->getPrimarySortDescription(); @@ -826,11 +826,11 @@ try { write_data(num_rows_write, 1000); num_rows_write += 1000; - if (storage->getStore()->getSegmentsStats().size() > 1) + if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) break; } { - ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -847,13 +847,13 @@ try // write more data make sure segments more than 1 for (size_t i = 0; i < 100000; i++) { - if (storage->getStore()->getSegmentsStats().size() > 1) + if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) break; write_data(num_rows_write, 1000); num_rows_write += 1000; } { - ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -873,7 +873,7 @@ try // restore the table and make sure there is just one segment left create_table(); { - ASSERT_EQ(storage->getStore()->getSegmentsStats().size(), 1); + ASSERT_EQ(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); ASSERT_LT(read_data(), num_rows_write); } storage->drop(); diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 27d761f064c..343ecdee2ea 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -90,8 +90,8 @@ class IManageableStorage : public IStorage /// `limit` is the max number of segments to gc, return value is the number of segments gced virtual UInt64 onSyncGc(Int64 /*limit*/, const DM::GCOptions &) { throw Exception("Unsupported"); } - /// Return true is data dir exist - virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } + /// Return true if the DeltaMergeStore instance need to be inited + virtual bool initStoreIfNeed(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } virtual TiDB::StorageEngine engineType() const = 0; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index be220967915..cb1ef198828 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -80,7 +80,8 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context + = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -197,7 +198,8 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context + = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & segment_to_drop : segments) { DM::WriteBatches wbs(*dm_context->storage_pool, dm_context->getWriteLimiter()); diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 7a70e673fad..4508c7af3c2 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -1085,7 +1085,7 @@ try { if (ingest_using_split) { - auto stats = storage->getStore()->getStoreStats(); + auto stats = storage->getAndMaybeInitStore()->getStoreStats(); // Including 0..20, 20..100, 100..inf. ASSERT_EQ(3, stats.segment_count); } @@ -1128,7 +1128,7 @@ try } } { - auto stats = storage->getStore()->getStoreStats(); + auto stats = storage->getAndMaybeInitStore()->getStoreStats(); ASSERT_NE(0, stats.total_stable_size_on_disk); ASSERT_NE(0, stats.total_rows); ASSERT_NE(0, stats.total_size); @@ -1150,7 +1150,7 @@ try auto gc_n = storage->onSyncGc(100, DM::GCOptions::newAllForTest()); ASSERT_EQ(0, gc_n); - auto stats = storage->getStore()->getStoreStats(); + auto stats = storage->getAndMaybeInitStore()->getStoreStats(); // The data of [20, 100), is not reclaimed during Apply Snapshot. if (ingest_using_split) { diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 15e2373a34f..87626e2e72d 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -246,7 +246,8 @@ void assertNoSegment( auto storage = storages.get(keyspace_id, table_id); RUNTIME_CHECK(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context + = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -485,7 +486,7 @@ try auto storage = global_context.getTMTContext().getStorages().get(keyspace_id, table_id); ASSERT_TRUE(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto store = dm_storage->getStore(); + auto store = dm_storage->getAndMaybeInitStore(); ASSERT_EQ(store->getRowKeyColumnSize(), 1); verifyRows( global_context, diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 30a86d897c3..39b2af08509 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -93,11 +93,10 @@ StorageDeltaMerge::StorageDeltaMerge( , global_context(global_context_.getGlobalContext()) , log(Logger::get(fmt::format("{}.{}", db_name_, table_name_))) { - if (primary_expr_ast_->children.empty()) - throw Exception("No primary key"); + RUNTIME_CHECK_MSG(!primary_expr_ast_->children.empty(), "No primary key, ident={}", log->identifier()); // save schema from TiDB - if (table_info_) + if (likely(table_info_)) { tidb_table_info = table_info_->get(); is_common_handle = tidb_table_info.is_common_handle; @@ -113,6 +112,11 @@ StorageDeltaMerge::StorageDeltaMerge( table_column_info = std::make_unique(db_name_, table_name_, primary_expr_ast_); updateTableColumnInfo(); + + if (table_info_ && containsLocalIndexInfos(table_info_.value(), log)) + { + getAndMaybeInitStore(); + } } void StorageDeltaMerge::updateTableColumnInfo() @@ -1394,16 +1398,34 @@ void StorageDeltaMerge::alterSchemaChange( LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize()); { - std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL. + // In order to avoid concurrent issue between init store and DDL, + // we must acquire the lock before schema changes is applied. + std::lock_guard lock(store_mutex); if (storeInited()) { _store->applySchemaChanges(table_info); } - else // it seems we will never come into this branch ? + else { - updateTableColumnInfo(); + if (containsLocalIndexInfos(table_info, log)) + { + // If there exist vector index, then we must init the store to create + // at least 1 segment. So that tidb can detect the index is added. + initStore(lock); + _store->applySchemaChanges(table_info); + } + else + { + // If there is no data need to be stored for this table, the _store instance + // is not inited to reduce fragmentation files that may exhaust the inode of + // disk. + // Under this case, we update some in-memory variables to ensure the correctness. + updateTableColumnInfo(); + } } } + + // Should generate new decoding snapshot and cache block decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); @@ -1802,14 +1824,9 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const return desc; } -DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) +DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard &, ThreadPool * thread_pool) { - if (storeInited()) - { - return _store; - } - std::lock_guard lock(store_mutex); - if (_store == nullptr) + if (likely(_store == nullptr)) { auto index_infos = initLocalIndexInfos(tidb_table_info, log); _store = DeltaMergeStore::create( @@ -1834,8 +1851,20 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread return _store; } -bool StorageDeltaMerge::initStoreIfDataDirExist(ThreadPool * thread_pool) +DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) +{ + if (storeInited()) + { + return _store; + } + std::lock_guard lock(store_mutex); + return initStore(lock, thread_pool); +} + +bool StorageDeltaMerge::initStoreIfNeed(ThreadPool * thread_pool) { + // If the table is tombstone, then wait for it exceeds the gc_safepoint and + // get physically drop without initing the instance. if (shutdown_called.load(std::memory_order_relaxed) || isTombstone()) { return false; @@ -1845,12 +1874,16 @@ bool StorageDeltaMerge::initStoreIfDataDirExist(ThreadPool * thread_pool) { return true; } - if (!dataDirExist()) + if (dataDirExist() || containsLocalIndexInfos(tidb_table_info, log)) { - return false; + // - there exist some data stored on disk + // - there exist tiflash local index + // We need to init the DeltaMergeStore instance for reporting the disk usage, local index + // status, etc. + getAndMaybeInitStore(thread_pool); + return true; } - getAndMaybeInitStore(thread_pool); - return true; + return false; } bool StorageDeltaMerge::dataDirExist() diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4d9813a4c44..28f87200452 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -191,10 +191,17 @@ class StorageDeltaMerge void checkStatus(const Context & context) override; void deleteRows(const Context &, size_t rows) override; - const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); } - + // Return the DeltaMergeStore instance if it has been inited + // Else return nullptr DM::DeltaMergeStorePtr getStoreIfInited() const; + // Return the DeltaMergeStore instance + // If the instance is not inited, this method will initialize the instance + // and return it. + DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); + + bool initStoreIfNeed(ThreadPool * thread_pool) override; + bool isCommonHandle() const override { return is_common_handle; } size_t getRowKeyColumnSize() const override { return rowkey_column_size; } @@ -206,8 +213,6 @@ class StorageDeltaMerge void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override; - bool initStoreIfDataDirExist(ThreadPool * thread_pool) override; - DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } #ifndef DBMS_PUBLIC_GTEST @@ -238,7 +243,7 @@ class StorageDeltaMerge DataTypePtr getPKTypeImpl() const override; - DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); + DM::DeltaMergeStorePtr & initStore(const std::lock_guard &, ThreadPool * thread_pool = nullptr); bool storeInited() const { return store_inited.load(std::memory_order_acquire); } void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index eb41dd6e2af..9a531f4578a 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -796,7 +796,7 @@ try DM::tests::DeltaMergeStoreVectorBase dmsv; StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - dmsv.store = storage->getStore(); + dmsv.store = storage->getAndMaybeInitStore(); dmsv.db_context = std::make_shared(global_ctx.getGlobalContext()); dmsv.vec_column_name = cols.getAllPhysical().back().name; dmsv.vec_column_id = mustGetSyncedTable(t1_id)->getTableInfo().getColumnID(dmsv.vec_column_name); @@ -818,7 +818,6 @@ try // sync table schema, the VectorIndex in TableInfo should be updated refreshTableSchema(t1_id); auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); - tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); idx_infos = tbl_info.index_infos; ASSERT_EQ(idx_infos.size(), 1); for (const auto & idx : idx_infos) @@ -904,4 +903,95 @@ try } CATCH + +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + refreshSchema(); + + // The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + ASSERT_EQ(nullptr,storage->getStoreIfInited()); + + // add a vector index + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // sync table schema, the VectorIndex in TableInfo should be updated + refreshTableSchema(t1_id); + // The `DeltaMergeStore` instanced should be inited + ASSERT_NE(nullptr,storage->getStoreIfInited()); + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } + +} +CATCH + +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + // The table is created and vector index is added. After that, the table info is synced to TiFlash + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // Synced with mock tidb, and create the StorageDeltaMerge instance + refreshTableSchema(t1_id); + // The `DeltaMergeStore` instanced should be inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + ASSERT_NE(nullptr, storage->getStoreIfInited()); + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } +} +CATCH + } // namespace DB::tests From f3b2358c7bbd70054f7aefb5d86aa3606ccf1ebf Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2024 13:31:55 +0800 Subject: [PATCH 2/7] Add fullstack test case --- tests/fullstack-test2/vector/vector-index.test | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/fullstack-test2/vector/vector-index.test b/tests/fullstack-test2/vector/vector-index.test index a151ea2a668..77fd1431cbc 100644 --- a/tests/fullstack-test2/vector/vector-index.test +++ b/tests/fullstack-test2/vector/vector-index.test @@ -16,6 +16,15 @@ # Preparation. mysql> drop table if exists test.t; +# Build vector index on empty table, it should return quickly +mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_l2 ((VEC_L2_DISTANCE(v))) USING HNSW; +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_COSINE_DISTANCE(v))) USING HNSW; +mysql> drop table if exists test.t; + +# Build vector index on table with data on the stable layer mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL); mysql> INSERT INTO test.t VALUES ('[8.7, 5.7, 7.7, 9.8, 1.5]'),('[3.6, 9.7, 2.4, 6.6, 4.9]'),('[4.7, 4.9, 2.6, 5.2, 7.4]'),('[7.7, 6.7, 8.3, 7.8, 5.7]'),('[1.4, 4.5, 8.5, 7.7, 6.2]'); mysql> alter table test.t set tiflash replica 1; From 5bbc7830b99c78634b444ed7b5c23d7b2674b8ab Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2024 15:42:07 +0800 Subject: [PATCH 3/7] Generate index stats from TableInfo for non-inited store --- .../System/StorageSystemDTLocalIndexes.cpp | 48 +++++++++++++++---- 1 file changed, 40 insertions(+), 8 deletions(-) diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index 6da97c76aca..45e6cda1d8f 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -21,11 +21,14 @@ #include #include #include +#include +#include #include #include #include #include #include +#include namespace DB { @@ -58,6 +61,39 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam })); } +DM::LocalIndexesStats generateLocalIndexesStatsFromTableInfo(const TiDB::TableInfo & table_info) +{ + auto local_index_infos = DM::generateLocalIndexInfos(nullptr, table_info, Logger::get()); + + DM::LocalIndexesStats stats; + if (!local_index_infos) + return stats; + + for (const auto & index_info : *local_index_infos) + { + DM::LocalIndexStats index_stats; + index_stats.column_id = index_info.column_id; + index_stats.index_id = index_info.index_id; + index_stats.column_name = index_info.column_name; + index_stats.index_kind = "HNSW"; + stats.emplace_back(std::move(index_stats)); + } + return stats; +} + +std::optional getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage) +{ + if (dm_storage->isTombstone()) + return std::nullopt; + + const auto & table_info = dm_storage->getTableInfo(); + auto store = dm_storage->getStoreIfInited(); + if (!store) + return generateLocalIndexesStatsFromTableInfo(table_info); + + return store->getLocalIndexStats(); +} + BlockInputStreams StorageSystemDTLocalIndexes::read( const Names & column_names, const SelectQueryInfo &, @@ -90,16 +126,12 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( auto dm_storage = std::dynamic_pointer_cast(storage); const auto & table_info = dm_storage->getTableInfo(); - auto table_id = table_info.id; - auto store = dm_storage->getStoreIfInited(); - if (!store) - continue; + const auto table_id = table_info.id; - if (dm_storage->isTombstone()) + const auto index_stats = getLocalIndexesStatsFromStorage(dm_storage); + if (!index_stats) continue; - - auto index_stats = store->getLocalIndexStats(); - for (auto & stat : index_stats) + for (const auto & stat : *index_stats) { size_t j = 0; res_columns[j++]->insert(database_name); From eec731ea034877cf788de10442d200f65089d9af Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2024 15:45:42 +0800 Subject: [PATCH 4/7] Revert "Remove useless func" This reverts commit 682892adc09727fc48797aeacc3c40631622b397. --- .../Interpreters/InterpreterSelectQuery.cpp | 1 + dbms/src/Server/BgStorageInit.cpp | 2 +- .../Decode/SSTFilesToDTFilesOutputStream.cpp | 6 +- .../DeltaMerge/Index/LocalIndexInfo.cpp | 19 ---- .../DeltaMerge/Index/LocalIndexInfo.h | 2 - .../tests/gtest_dm_storage_delta_merge.cpp | 14 +-- dbms/src/Storages/IManageableStorage.h | 4 +- .../MultiRaft/Disagg/CheckpointIngestInfo.cpp | 6 +- .../Storages/KVStore/tests/gtest_kvstore.cpp | 6 +- .../tests/gtest_kvstore_fast_add_peer.cpp | 5 +- dbms/src/Storages/StorageDeltaMerge.cpp | 69 ++++---------- dbms/src/Storages/StorageDeltaMerge.h | 15 +-- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 94 +------------------ 13 files changed, 46 insertions(+), 197 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 490670bf66b..bf32782f770 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,6 +243,7 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d || (managed_storage->engineType() != ::TiDB::StorageEngine::DT && managed_storage->engineType() != ::TiDB::StorageEngine::TMT)) { + LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name); storage = storage_tmp; table_lock = storage->lockForShare(context.getCurrentQueryId()); return; diff --git a/dbms/src/Server/BgStorageInit.cpp b/dbms/src/Server/BgStorageInit.cpp index 6fbb2060348..5c53db7083e 100644 --- a/dbms/src/Server/BgStorageInit.cpp +++ b/dbms/src/Server/BgStorageInit.cpp @@ -53,7 +53,7 @@ void doInitStores(Context & global_context, const LoggerPtr & log) const auto & [ks_id, table_id] = ks_table_id; try { - init_cnt += storage->initStoreIfNeed(restore_segments_thread_pool) ? 1 : 0; + init_cnt += storage->initStoreIfDataDirExist(restore_segments_thread_pool) ? 1 : 0; LOG_INFO(log, "Storage inited done, keyspace={} table_id={}", ks_id, table_id); } catch (...) diff --git a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp index 72543d32c16..3a4a47975a7 100644 --- a/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/Decode/SSTFilesToDTFilesOutputStream.cpp @@ -175,7 +175,7 @@ bool SSTFilesToDTFilesOutputStream::newDTFileStream() RUNTIME_CHECK(dt_stream == nullptr); // The parent_path and file_id are generated by the storage. - auto [parent_path, file_id] = storage->getAndMaybeInitStore()->preAllocateIngestFile(); + auto [parent_path, file_id] = storage->getStore()->preAllocateIngestFile(); if (parent_path.empty()) { // Can not allocate path and id for storing DTFiles (the storage may be dropped / shutdown) @@ -223,7 +223,7 @@ bool SSTFilesToDTFilesOutputStream::finalizeDTFileStream() // If remote data store is not enabled, add the DTFile to StoragePathPool so that we can restore it later // Else just add it's size to disk delegator - storage->getAndMaybeInitStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); + storage->getStore()->preIngestFile(dt_file->parentPath(), dt_file->fileId(), bytes_written); dt_stream.reset(); @@ -360,7 +360,7 @@ void SSTFilesToDTFilesOutputStream::cancel() try { // If DMFile has pre-ingested, remove it. - storage->getAndMaybeInitStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); + storage->getStore()->removePreIngestFile(file->fileId(), /*throw_on_not_exist*/ false); // Remove local DMFile. file->remove(context.getFileProvider()); } diff --git a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp index 1d2911dbcea..fe527ac7ad2 100644 --- a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp +++ b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp @@ -314,23 +314,4 @@ LocalIndexInfosChangeset generateLocalIndexInfos( }; } -bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger) -{ - if (!isVectorIndexSupported(logger)) - return false; - - for (const auto & col : table_info.columns) - { - if (col.vector_index) - { - return true; - } - } - for (const auto & idx : table_info.index_infos) - { - if (idx.vector_index) - return true; - } - return false; -} } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h index 055503bc39d..68f3fd82111 100644 --- a/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.h @@ -68,6 +68,4 @@ LocalIndexInfosChangeset generateLocalIndexInfos( const TiDB::TableInfo & new_table_info, const LoggerPtr & logger); - -bool containsLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp index 8860110ee1c..01d55d503b9 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_storage_delta_merge.cpp @@ -162,7 +162,7 @@ try EXPECT_EQ(col_value->getDataAt(i), String(DB::toString(num_rows_write))); } } - auto delta_store = storage->getAndMaybeInitStore(); + auto delta_store = storage->getStore(); size_t total_segment_rows = 0; auto segment_stats = delta_store->getSegmentsStats(); for (auto & stat : segment_stats) @@ -308,7 +308,7 @@ try auto sort_desc = storage->getPrimarySortDescription(); ASSERT_FALSE(storage->storeInited()); - const auto & store = storage->getAndMaybeInitStore(); + const auto & store = storage->getStore(); ASSERT_TRUE(storage->storeInited()); auto pk_type2 = store->getPKDataType(); auto sort_desc2 = store->getPrimarySortDescription(); @@ -826,11 +826,11 @@ try { write_data(num_rows_write, 1000); num_rows_write += 1000; - if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) + if (storage->getStore()->getSegmentsStats().size() > 1) break; } { - ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -847,13 +847,13 @@ try // write more data make sure segments more than 1 for (size_t i = 0; i < 100000; i++) { - if (storage->getAndMaybeInitStore()->getSegmentsStats().size() > 1) + if (storage->getStore()->getSegmentsStats().size() > 1) break; write_data(num_rows_write, 1000); num_rows_write += 1000; } { - ASSERT_GT(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); + ASSERT_GT(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_EQ(read_data(), num_rows_write); } storage->flushCache(*ctx); @@ -873,7 +873,7 @@ try // restore the table and make sure there is just one segment left create_table(); { - ASSERT_EQ(storage->getAndMaybeInitStore()->getSegmentsStats().size(), 1); + ASSERT_EQ(storage->getStore()->getSegmentsStats().size(), 1); ASSERT_LT(read_data(), num_rows_write); } storage->drop(); diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 343ecdee2ea..27d761f064c 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -90,8 +90,8 @@ class IManageableStorage : public IStorage /// `limit` is the max number of segments to gc, return value is the number of segments gced virtual UInt64 onSyncGc(Int64 /*limit*/, const DM::GCOptions &) { throw Exception("Unsupported"); } - /// Return true if the DeltaMergeStore instance need to be inited - virtual bool initStoreIfNeed(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } + /// Return true is data dir exist + virtual bool initStoreIfDataDirExist(ThreadPool * /*thread_pool*/) { throw Exception("Unsupported"); } virtual TiDB::StorageEngine engineType() const = 0; diff --git a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp index cb1ef198828..be220967915 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/Disagg/CheckpointIngestInfo.cpp @@ -80,8 +80,7 @@ CheckpointIngestInfoPtr CheckpointIngestInfo::restore( if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context - = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -198,8 +197,7 @@ void CheckpointIngestInfo::deleteWrittenData(TMTContext & tmt, RegionPtr region, if (storage && storage->engineType() == TiDB::StorageEngine::DT) { auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context - = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & segment_to_drop : segments) { DM::WriteBatches wbs(*dm_context->storage_pool, dm_context->getWriteLimiter()); diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 4508c7af3c2..7a70e673fad 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -1085,7 +1085,7 @@ try { if (ingest_using_split) { - auto stats = storage->getAndMaybeInitStore()->getStoreStats(); + auto stats = storage->getStore()->getStoreStats(); // Including 0..20, 20..100, 100..inf. ASSERT_EQ(3, stats.segment_count); } @@ -1128,7 +1128,7 @@ try } } { - auto stats = storage->getAndMaybeInitStore()->getStoreStats(); + auto stats = storage->getStore()->getStoreStats(); ASSERT_NE(0, stats.total_stable_size_on_disk); ASSERT_NE(0, stats.total_rows); ASSERT_NE(0, stats.total_size); @@ -1150,7 +1150,7 @@ try auto gc_n = storage->onSyncGc(100, DM::GCOptions::newAllForTest()); ASSERT_EQ(0, gc_n); - auto stats = storage->getAndMaybeInitStore()->getStoreStats(); + auto stats = storage->getStore()->getStoreStats(); // The data of [20, 100), is not reclaimed during Apply Snapshot. if (ingest_using_split) { diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp index 87626e2e72d..15e2373a34f 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore_fast_add_peer.cpp @@ -246,8 +246,7 @@ void assertNoSegment( auto storage = storages.get(keyspace_id, table_id); RUNTIME_CHECK(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto dm_context - = dm_storage->getAndMaybeInitStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); + auto dm_context = dm_storage->getStore()->newDMContext(tmt.getContext(), tmt.getContext().getSettingsRef()); for (const auto & seg_persisted : ingest_info_persisted.segments()) { ReadBufferFromString buf(seg_persisted.segment_meta()); @@ -486,7 +485,7 @@ try auto storage = global_context.getTMTContext().getStorages().get(keyspace_id, table_id); ASSERT_TRUE(storage && storage->engineType() == TiDB::StorageEngine::DT); auto dm_storage = std::dynamic_pointer_cast(storage); - auto store = dm_storage->getAndMaybeInitStore(); + auto store = dm_storage->getStore(); ASSERT_EQ(store->getRowKeyColumnSize(), 1); verifyRows( global_context, diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 39b2af08509..30a86d897c3 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -93,10 +93,11 @@ StorageDeltaMerge::StorageDeltaMerge( , global_context(global_context_.getGlobalContext()) , log(Logger::get(fmt::format("{}.{}", db_name_, table_name_))) { - RUNTIME_CHECK_MSG(!primary_expr_ast_->children.empty(), "No primary key, ident={}", log->identifier()); + if (primary_expr_ast_->children.empty()) + throw Exception("No primary key"); // save schema from TiDB - if (likely(table_info_)) + if (table_info_) { tidb_table_info = table_info_->get(); is_common_handle = tidb_table_info.is_common_handle; @@ -112,11 +113,6 @@ StorageDeltaMerge::StorageDeltaMerge( table_column_info = std::make_unique(db_name_, table_name_, primary_expr_ast_); updateTableColumnInfo(); - - if (table_info_ && containsLocalIndexInfos(table_info_.value(), log)) - { - getAndMaybeInitStore(); - } } void StorageDeltaMerge::updateTableColumnInfo() @@ -1398,34 +1394,16 @@ void StorageDeltaMerge::alterSchemaChange( LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize()); { - // In order to avoid concurrent issue between init store and DDL, - // we must acquire the lock before schema changes is applied. - std::lock_guard lock(store_mutex); + std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL. if (storeInited()) { _store->applySchemaChanges(table_info); } - else + else // it seems we will never come into this branch ? { - if (containsLocalIndexInfos(table_info, log)) - { - // If there exist vector index, then we must init the store to create - // at least 1 segment. So that tidb can detect the index is added. - initStore(lock); - _store->applySchemaChanges(table_info); - } - else - { - // If there is no data need to be stored for this table, the _store instance - // is not inited to reduce fragmentation files that may exhaust the inode of - // disk. - // Under this case, we update some in-memory variables to ensure the correctness. - updateTableColumnInfo(); - } + updateTableColumnInfo(); } } - - // Should generate new decoding snapshot and cache block decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); @@ -1824,9 +1802,14 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const return desc; } -DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard &, ThreadPool * thread_pool) +DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) { - if (likely(_store == nullptr)) + if (storeInited()) + { + return _store; + } + std::lock_guard lock(store_mutex); + if (_store == nullptr) { auto index_infos = initLocalIndexInfos(tidb_table_info, log); _store = DeltaMergeStore::create( @@ -1851,20 +1834,8 @@ DeltaMergeStorePtr & StorageDeltaMerge::initStore(const std::lock_guard &, ThreadPool * thread_pool = nullptr); + DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); bool storeInited() const { return store_inited.load(std::memory_order_acquire); } void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index 9a531f4578a..eb41dd6e2af 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -796,7 +796,7 @@ try DM::tests::DeltaMergeStoreVectorBase dmsv; StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - dmsv.store = storage->getAndMaybeInitStore(); + dmsv.store = storage->getStore(); dmsv.db_context = std::make_shared(global_ctx.getGlobalContext()); dmsv.vec_column_name = cols.getAllPhysical().back().name; dmsv.vec_column_id = mustGetSyncedTable(t1_id)->getTableInfo().getColumnID(dmsv.vec_column_name); @@ -818,6 +818,7 @@ try // sync table schema, the VectorIndex in TableInfo should be updated refreshTableSchema(t1_id); auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); idx_infos = tbl_info.index_infos; ASSERT_EQ(idx_infos.size(), 1); for (const auto & idx : idx_infos) @@ -903,95 +904,4 @@ try } CATCH - -TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1) -try -{ - auto pd_client = global_ctx.getTMTContext().getPDClient(); - - const String db_name = "mock_db"; - MockTiDB::instance().newDataBase(db_name); - - auto cols = ColumnsDescription({ - {"col1", typeFromString("Int64")}, - {"vec", typeFromString("Array(Float32)")}, - }); - auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); - refreshSchema(); - - // The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited - StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - ASSERT_EQ(nullptr,storage->getStoreIfInited()); - - // add a vector index - IndexID idx_id = 11; - auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ - .kind = tipb::VectorIndexKind::HNSW, - .dimension = 3, - .distance_metric = tipb::VectorDistanceMetric::L2, - }); - MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); - - // sync table schema, the VectorIndex in TableInfo should be updated - refreshTableSchema(t1_id); - // The `DeltaMergeStore` instanced should be inited - ASSERT_NE(nullptr,storage->getStoreIfInited()); - - auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); - auto idx_infos = tbl_info.index_infos; - ASSERT_EQ(idx_infos.size(), 1); - for (const auto & idx : idx_infos) - { - ASSERT_EQ(idx.id, idx_id); - ASSERT_NE(idx.vector_index, nullptr); - ASSERT_EQ(idx.vector_index->kind, vector_index->kind); - ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); - ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); - } - -} -CATCH - -TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2) -try -{ - auto pd_client = global_ctx.getTMTContext().getPDClient(); - - const String db_name = "mock_db"; - MockTiDB::instance().newDataBase(db_name); - - // The table is created and vector index is added. After that, the table info is synced to TiFlash - auto cols = ColumnsDescription({ - {"col1", typeFromString("Int64")}, - {"vec", typeFromString("Array(Float32)")}, - }); - auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); - IndexID idx_id = 11; - auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ - .kind = tipb::VectorIndexKind::HNSW, - .dimension = 3, - .distance_metric = tipb::VectorDistanceMetric::L2, - }); - MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); - - // Synced with mock tidb, and create the StorageDeltaMerge instance - refreshTableSchema(t1_id); - // The `DeltaMergeStore` instanced should be inited - StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); - ASSERT_NE(nullptr, storage->getStoreIfInited()); - - auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); - auto idx_infos = tbl_info.index_infos; - ASSERT_EQ(idx_infos.size(), 1); - for (const auto & idx : idx_infos) - { - ASSERT_EQ(idx.id, idx_id); - ASSERT_NE(idx.vector_index, nullptr); - ASSERT_EQ(idx.vector_index->kind, vector_index->kind); - ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); - ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); - } -} -CATCH - } // namespace DB::tests From b362ba3d9b25356b46a2fe99db2fa701adea8513 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2024 16:01:03 +0800 Subject: [PATCH 5/7] Pick some comment refine --- .../Interpreters/InterpreterSelectQuery.cpp | 1 - dbms/src/Storages/StorageDeltaMerge.cpp | 12 +++++++++-- dbms/src/Storages/StorageDeltaMerge.h | 21 ++++++++++++------- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index bf32782f770..490670bf66b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,7 +243,6 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d || (managed_storage->engineType() != ::TiDB::StorageEngine::DT && managed_storage->engineType() != ::TiDB::StorageEngine::TMT)) { - LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name); storage = storage_tmp; table_lock = storage->lockForShare(context.getCurrentQueryId()); return; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 30a86d897c3..43777c309a3 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1394,16 +1394,24 @@ void StorageDeltaMerge::alterSchemaChange( LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize()); { - std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL. + // In order to avoid concurrent issue between init store and DDL, + // we must acquire the lock before schema changes is applied. + std::lock_guard lock(store_mutex); if (storeInited()) { _store->applySchemaChanges(table_info); } - else // it seems we will never come into this branch ? + else { + // If there is no data need to be stored for this table, the _store instance + // is not inited to reduce fragmentation files that may exhaust the inode of + // disk. + // Under this case, we update some in-memory variables to ensure the correctness. updateTableColumnInfo(); } } + + // Should generate new decoding snapshot and cache block decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4d9813a4c44..ba4f7ab1172 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -191,14 +191,21 @@ class StorageDeltaMerge void checkStatus(const Context & context) override; void deleteRows(const Context &, size_t rows) override; + bool isCommonHandle() const override { return is_common_handle; } + + size_t getRowKeyColumnSize() const override { return rowkey_column_size; } + + DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } + +public: const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); } DM::DeltaMergeStorePtr getStoreIfInited() const; - bool isCommonHandle() const override { return is_common_handle; } - - size_t getRowKeyColumnSize() const override { return rowkey_column_size; } + bool initStoreIfDataDirExist(ThreadPool * thread_pool) override; +public: + /// decoding methods std::pair getSchemaSnapshotAndBlockForDecoding( const TableStructureLockHolder & table_structure_lock, bool need_block, @@ -206,10 +213,6 @@ class StorageDeltaMerge void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override; - bool initStoreIfDataDirExist(ThreadPool * thread_pool) override; - - DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } - #ifndef DBMS_PUBLIC_GTEST protected: #endif @@ -238,8 +241,12 @@ class StorageDeltaMerge DataTypePtr getPKTypeImpl() const override; + // Return the DeltaMergeStore instance + // If the instance is not inited, this method will initialize the instance + // and return it. DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); bool storeInited() const { return store_inited.load(std::memory_order_acquire); } + void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); DM::ColumnDefines getStoreColumnDefines() const override; From 051702d440bed036be39ae849ca0dfdf4adc0e72 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Thu, 26 Sep 2024 16:12:50 +0800 Subject: [PATCH 6/7] Add unit test --- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 3 + .../DeltaMerge/DeltaMergeStore_Statistics.cpp | 18 +++ .../System/StorageSystemDTLocalIndexes.cpp | 22 +--- .../TiDB/Schema/tests/gtest_schema_sync.cpp | 120 ++++++++++++++++++ 4 files changed, 142 insertions(+), 21 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 341f8efb91f..171c0957a2d 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -575,7 +575,10 @@ class DeltaMergeStore StoreStats getStoreStats(); SegmentsStats getSegmentsStats(); + LocalIndexesStats getLocalIndexStats(); + // Generate local index stats for non inited DeltaMergeStore + static std::optional genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info); bool isCommonHandle() const { return is_common_handle; } size_t getRowKeyColumnSize() const { return rowkey_column_size; } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 29fb04e42cf..89961f92c08 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -193,6 +193,24 @@ SegmentsStats DeltaMergeStore::getSegmentsStats() return stats; } +std::optional DeltaMergeStore::genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info) +{ + auto local_index_infos = DM::generateLocalIndexInfos(nullptr, table_info, Logger::get()); + if (!local_index_infos) + return std::nullopt; + + DM::LocalIndexesStats stats; + for (const auto & index_info : *local_index_infos) + { + DM::LocalIndexStats index_stats; + index_stats.column_id = index_info.column_id; + index_stats.index_id = index_info.index_id; + index_stats.index_kind = "HNSW"; + stats.emplace_back(std::move(index_stats)); + } + return stats; +} + LocalIndexesStats DeltaMergeStore::getLocalIndexStats() { auto local_index_infos_snap = getLocalIndexInfosSnapshot(); diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index 45e6cda1d8f..bf0419dc635 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -61,26 +61,6 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam })); } -DM::LocalIndexesStats generateLocalIndexesStatsFromTableInfo(const TiDB::TableInfo & table_info) -{ - auto local_index_infos = DM::generateLocalIndexInfos(nullptr, table_info, Logger::get()); - - DM::LocalIndexesStats stats; - if (!local_index_infos) - return stats; - - for (const auto & index_info : *local_index_infos) - { - DM::LocalIndexStats index_stats; - index_stats.column_id = index_info.column_id; - index_stats.index_id = index_info.index_id; - index_stats.column_name = index_info.column_name; - index_stats.index_kind = "HNSW"; - stats.emplace_back(std::move(index_stats)); - } - return stats; -} - std::optional getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage) { if (dm_storage->isTombstone()) @@ -89,7 +69,7 @@ std::optional getLocalIndexesStatsFromStorage(const Stora const auto & table_info = dm_storage->getTableInfo(); auto store = dm_storage->getStoreIfInited(); if (!store) - return generateLocalIndexesStatsFromTableInfo(table_info); + return DM::DeltaMergeStore::genLocalIndexStatsByTableInfo(table_info); return store->getLocalIndexStats(); } diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index eb41dd6e2af..7f6d33487d7 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -52,6 +52,9 @@ extern const char force_context_path[]; extern const char force_set_num_regions_for_table[]; extern const char random_ddl_fail_when_rename_partitions[]; } // namespace FailPoints + +// defined in StorageSystemDTLocalIndexes.cpp +std::optional getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage); } // namespace DB namespace DB::tests { @@ -904,4 +907,121 @@ try } CATCH +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + refreshSchema(); + + // The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + { + // The `DeltaMergeStore` is not inited + ASSERT_EQ(nullptr, storage->getStoreIfInited()); + auto stats = getLocalIndexesStatsFromStorage(storage); + ASSERT_FALSE(stats.has_value()); + } + + // add a vector index + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // sync table schema, the VectorIndex in TableInfo should be updated + refreshTableSchema(t1_id); + { + // The `DeltaMergeStore` is not inited + ASSERT_EQ(nullptr, storage->getStoreIfInited()); + auto stats = getLocalIndexesStatsFromStorage(storage); + ASSERT_TRUE(stats.has_value()); + ASSERT_EQ(stats->size(), 1); + auto & s = (*stats)[0]; + ASSERT_EQ(s.index_id, idx_id); + ASSERT_EQ(s.rows_delta_indexed, 0); + ASSERT_EQ(s.rows_delta_not_indexed, 0); + ASSERT_EQ(s.rows_stable_indexed, 0); + ASSERT_EQ(s.rows_stable_not_indexed, 0); + } + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } +} +CATCH + +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + // The table is created and vector index is added. After that, the table info is synced to TiFlash + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // Synced with mock tidb, and create the StorageDeltaMerge instance + refreshTableSchema(t1_id); + { + // The `DeltaMergeStore` is not inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + ASSERT_EQ(nullptr, storage->getStoreIfInited()); + auto stats = getLocalIndexesStatsFromStorage(storage); + ASSERT_TRUE(stats.has_value()); + ASSERT_EQ(stats->size(), 1); + auto & s = (*stats)[0]; + ASSERT_EQ(s.index_id, idx_id); + ASSERT_EQ(s.rows_delta_indexed, 0); + ASSERT_EQ(s.rows_delta_not_indexed, 0); + ASSERT_EQ(s.rows_stable_indexed, 0); + ASSERT_EQ(s.rows_stable_not_indexed, 0); + } + + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } +} +CATCH + } // namespace DB::tests From 679765dc9e5e1acfb03ab99d3da67711a3f93677 Mon Sep 17 00:00:00 2001 From: JaySon-Huang Date: Fri, 27 Sep 2024 19:08:25 +0800 Subject: [PATCH 7/7] Resolve conflict --- dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp | 2 +- dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 89961f92c08..b77855d497b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -195,7 +195,7 @@ SegmentsStats DeltaMergeStore::getSegmentsStats() std::optional DeltaMergeStore::genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info) { - auto local_index_infos = DM::generateLocalIndexInfos(nullptr, table_info, Logger::get()); + auto local_index_infos = DM::initLocalIndexInfos(table_info, Logger::get()); if (!local_index_infos) return std::nullopt; diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index bf0419dc635..afd14ff6381 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include