diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index bf32782f770..490670bf66b 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -243,7 +243,6 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d || (managed_storage->engineType() != ::TiDB::StorageEngine::DT && managed_storage->engineType() != ::TiDB::StorageEngine::TMT)) { - LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name); storage = storage_tmp; table_lock = storage->lockForShare(context.getCurrentQueryId()); return; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 341f8efb91f..171c0957a2d 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -575,7 +575,10 @@ class DeltaMergeStore StoreStats getStoreStats(); SegmentsStats getSegmentsStats(); + LocalIndexesStats getLocalIndexStats(); + // Generate local index stats for non inited DeltaMergeStore + static std::optional genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info); bool isCommonHandle() const { return is_common_handle; } size_t getRowKeyColumnSize() const { return rowkey_column_size; } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 29fb04e42cf..b77855d497b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -193,6 +193,24 @@ SegmentsStats DeltaMergeStore::getSegmentsStats() return stats; } +std::optional DeltaMergeStore::genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info) +{ + auto local_index_infos = DM::initLocalIndexInfos(table_info, Logger::get()); + if (!local_index_infos) + return std::nullopt; + + DM::LocalIndexesStats stats; + for (const auto & index_info : *local_index_infos) + { + DM::LocalIndexStats index_stats; + index_stats.column_id = index_info.column_id; + index_stats.index_id = index_info.index_id; + index_stats.index_kind = "HNSW"; + stats.emplace_back(std::move(index_stats)); + } + return stats; +} + LocalIndexesStats DeltaMergeStore::getLocalIndexStats() { auto local_index_infos_snap = getLocalIndexInfosSnapshot(); diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 30a86d897c3..43777c309a3 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -1394,16 +1394,24 @@ void StorageDeltaMerge::alterSchemaChange( LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize()); { - std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL. + // In order to avoid concurrent issue between init store and DDL, + // we must acquire the lock before schema changes is applied. + std::lock_guard lock(store_mutex); if (storeInited()) { _store->applySchemaChanges(table_info); } - else // it seems we will never come into this branch ? + else { + // If there is no data need to be stored for this table, the _store instance + // is not inited to reduce fragmentation files that may exhaust the inode of + // disk. + // Under this case, we update some in-memory variables to ensure the correctness. updateTableColumnInfo(); } } + + // Should generate new decoding snapshot and cache block decoding_schema_changed = true; SortDescription pk_desc = getPrimarySortDescription(); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4d9813a4c44..ba4f7ab1172 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -191,14 +191,21 @@ class StorageDeltaMerge void checkStatus(const Context & context) override; void deleteRows(const Context &, size_t rows) override; + bool isCommonHandle() const override { return is_common_handle; } + + size_t getRowKeyColumnSize() const override { return rowkey_column_size; } + + DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } + +public: const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); } DM::DeltaMergeStorePtr getStoreIfInited() const; - bool isCommonHandle() const override { return is_common_handle; } - - size_t getRowKeyColumnSize() const override { return rowkey_column_size; } + bool initStoreIfDataDirExist(ThreadPool * thread_pool) override; +public: + /// decoding methods std::pair getSchemaSnapshotAndBlockForDecoding( const TableStructureLockHolder & table_structure_lock, bool need_block, @@ -206,10 +213,6 @@ class StorageDeltaMerge void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override; - bool initStoreIfDataDirExist(ThreadPool * thread_pool) override; - - DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); } - #ifndef DBMS_PUBLIC_GTEST protected: #endif @@ -238,8 +241,12 @@ class StorageDeltaMerge DataTypePtr getPKTypeImpl() const override; + // Return the DeltaMergeStore instance + // If the instance is not inited, this method will initialize the instance + // and return it. DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr); bool storeInited() const { return store_inited.load(std::memory_order_acquire); } + void updateTableColumnInfo(); ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info); DM::ColumnDefines getStoreColumnDefines() const override; diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index 6da97c76aca..afd14ff6381 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -21,11 +21,14 @@ #include #include #include +#include +#include #include #include #include #include #include +#include namespace DB { @@ -58,6 +61,19 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam })); } +std::optional getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage) +{ + if (dm_storage->isTombstone()) + return std::nullopt; + + const auto & table_info = dm_storage->getTableInfo(); + auto store = dm_storage->getStoreIfInited(); + if (!store) + return DM::DeltaMergeStore::genLocalIndexStatsByTableInfo(table_info); + + return store->getLocalIndexStats(); +} + BlockInputStreams StorageSystemDTLocalIndexes::read( const Names & column_names, const SelectQueryInfo &, @@ -90,16 +106,12 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( auto dm_storage = std::dynamic_pointer_cast(storage); const auto & table_info = dm_storage->getTableInfo(); - auto table_id = table_info.id; - auto store = dm_storage->getStoreIfInited(); - if (!store) - continue; + const auto table_id = table_info.id; - if (dm_storage->isTombstone()) + const auto index_stats = getLocalIndexesStatsFromStorage(dm_storage); + if (!index_stats) continue; - - auto index_stats = store->getLocalIndexStats(); - for (auto & stat : index_stats) + for (const auto & stat : *index_stats) { size_t j = 0; res_columns[j++]->insert(database_name); diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index eb41dd6e2af..7f6d33487d7 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -52,6 +52,9 @@ extern const char force_context_path[]; extern const char force_set_num_regions_for_table[]; extern const char random_ddl_fail_when_rename_partitions[]; } // namespace FailPoints + +// defined in StorageSystemDTLocalIndexes.cpp +std::optional getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage); } // namespace DB namespace DB::tests { @@ -904,4 +907,121 @@ try } CATCH +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + refreshSchema(); + + // The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + { + // The `DeltaMergeStore` is not inited + ASSERT_EQ(nullptr, storage->getStoreIfInited()); + auto stats = getLocalIndexesStatsFromStorage(storage); + ASSERT_FALSE(stats.has_value()); + } + + // add a vector index + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // sync table schema, the VectorIndex in TableInfo should be updated + refreshTableSchema(t1_id); + { + // The `DeltaMergeStore` is not inited + ASSERT_EQ(nullptr, storage->getStoreIfInited()); + auto stats = getLocalIndexesStatsFromStorage(storage); + ASSERT_TRUE(stats.has_value()); + ASSERT_EQ(stats->size(), 1); + auto & s = (*stats)[0]; + ASSERT_EQ(s.index_id, idx_id); + ASSERT_EQ(s.rows_delta_indexed, 0); + ASSERT_EQ(s.rows_delta_not_indexed, 0); + ASSERT_EQ(s.rows_stable_indexed, 0); + ASSERT_EQ(s.rows_stable_not_indexed, 0); + } + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } +} +CATCH + +TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + // The table is created and vector index is added. After that, the table info is synced to TiFlash + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + IndexID idx_id = 11; + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // Synced with mock tidb, and create the StorageDeltaMerge instance + refreshTableSchema(t1_id); + { + // The `DeltaMergeStore` is not inited + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + ASSERT_EQ(nullptr, storage->getStoreIfInited()); + auto stats = getLocalIndexesStatsFromStorage(storage); + ASSERT_TRUE(stats.has_value()); + ASSERT_EQ(stats->size(), 1); + auto & s = (*stats)[0]; + ASSERT_EQ(s.index_id, idx_id); + ASSERT_EQ(s.rows_delta_indexed, 0); + ASSERT_EQ(s.rows_delta_not_indexed, 0); + ASSERT_EQ(s.rows_stable_indexed, 0); + ASSERT_EQ(s.rows_stable_not_indexed, 0); + } + + + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + auto idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } +} +CATCH + } // namespace DB::tests diff --git a/tests/fullstack-test2/vector/vector-index.test b/tests/fullstack-test2/vector/vector-index.test index a151ea2a668..77fd1431cbc 100644 --- a/tests/fullstack-test2/vector/vector-index.test +++ b/tests/fullstack-test2/vector/vector-index.test @@ -16,6 +16,15 @@ # Preparation. mysql> drop table if exists test.t; +# Build vector index on empty table, it should return quickly +mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL); +mysql> alter table test.t set tiflash replica 1; +func> wait_table test t +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_l2 ((VEC_L2_DISTANCE(v))) USING HNSW; +mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_COSINE_DISTANCE(v))) USING HNSW; +mysql> drop table if exists test.t; + +# Build vector index on table with data on the stable layer mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL); mysql> INSERT INTO test.t VALUES ('[8.7, 5.7, 7.7, 9.8, 1.5]'),('[3.6, 9.7, 2.4, 6.6, 4.9]'),('[4.7, 4.9, 2.6, 5.2, 7.4]'),('[7.7, 6.7, 8.3, 7.8, 5.7]'),('[1.4, 4.5, 8.5, 7.7, 6.2]'); mysql> alter table test.t set tiflash replica 1;