diff --git a/dbms/src/Debug/MockTiDB.cpp b/dbms/src/Debug/MockTiDB.cpp index d053e5669df..c872526948e 100644 --- a/dbms/src/Debug/MockTiDB.cpp +++ b/dbms/src/Debug/MockTiDB.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -46,6 +47,7 @@ extern const int UNKNOWN_TABLE; } // namespace ErrorCodes using ColumnInfo = TiDB::ColumnInfo; +using IndexInfo = TiDB::IndexInfo; using TableInfo = TiDB::TableInfo; using PartitionInfo = TiDB::PartitionInfo; using PartitionDefinition = TiDB::PartitionDefinition; @@ -546,6 +548,87 @@ void MockTiDB::dropPartition(const String & database_name, const String & table_ version_diff[version] = diff; } +IndexInfo reverseGetIndexInfo( + IndexID id, + const NameAndTypePair & column, + Int32 offset, + TiDB::VectorIndexDefinitionPtr vector_index) +{ + IndexInfo index_info; + index_info.id = id; + index_info.state = TiDB::StatePublic; + + std::vector idx_cols; + Poco::JSON::Object::Ptr idx_col_json = new Poco::JSON::Object(); + Poco::JSON::Object::Ptr name_json = new Poco::JSON::Object(); + name_json->set("O", column.name); + name_json->set("L", column.name); + idx_col_json->set("name", name_json); + idx_col_json->set("length", -1); + idx_col_json->set("offset", offset); + TiDB::IndexColumnInfo idx_col(idx_col_json); + index_info.idx_cols.push_back(idx_col); + index_info.vector_index = vector_index; + + return index_info; +} + +void MockTiDB::addVectorIndexToTable( + const String & database_name, + const String & table_name, + const IndexID index_id, + const NameAndTypePair & column_name, + Int32 offset, + TiDB::VectorIndexDefinitionPtr vector_index) +{ + std::lock_guard lock(tables_mutex); + + TablePtr table = getTableByNameInternal(database_name, table_name); + String qualified_name = database_name + "." + table_name; + auto & indexes = table->table_info.index_infos; + if (std::find_if(indexes.begin(), indexes.end(), [&](const IndexInfo & index_) { return index_.id == index_id; }) + != indexes.end()) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Index {} already exists in TiDB table {}", + index_id, + qualified_name); + IndexInfo index_info = reverseGetIndexInfo(index_id, column_name, offset, vector_index); + indexes.push_back(index_info); + + version++; + + SchemaDiff diff; + diff.type = SchemaActionType::ActionAddVectorIndex; + diff.schema_id = table->database_id; + diff.table_id = table->id(); + diff.version = version; + version_diff[version] = diff; +} + +void MockTiDB::dropVectorIndexFromTable(const String & database_name, const String & table_name, IndexID index_id) +{ + std::lock_guard lock(tables_mutex); + + TablePtr table = getTableByNameInternal(database_name, table_name); + String qualified_name = database_name + "." + table_name; + + auto & indexes = table->table_info.index_infos; + auto it + = std::find_if(indexes.begin(), indexes.end(), [&](const IndexInfo & index_) { return index_.id == index_id; }); + RUNTIME_CHECK_MSG(it != indexes.end(), "Index {} does not exist in TiDB table {}", index_id, qualified_name); + indexes.erase(it); + + version++; + + SchemaDiff diff; + diff.type = SchemaActionType::DropIndex; + diff.schema_id = table->database_id; + diff.table_id = table->id(); + diff.version = version; + version_diff[version] = diff; +} + void MockTiDB::addColumnToTable( const String & database_name, const String & table_name, diff --git a/dbms/src/Debug/MockTiDB.h b/dbms/src/Debug/MockTiDB.h index 55c7386d6bb..911710f93d0 100644 --- a/dbms/src/Debug/MockTiDB.h +++ b/dbms/src/Debug/MockTiDB.h @@ -125,6 +125,16 @@ class MockTiDB : public ext::Singleton void dropDB(Context & context, const String & database_name, bool drop_regions); + void addVectorIndexToTable( + const String & database_name, + const String & table_name, + IndexID index_id, + const NameAndTypePair & column_name, + Int32 offset, + TiDB::VectorIndexDefinitionPtr vector_index); + + void dropVectorIndexFromTable(const String & database_name, const String & table_name, IndexID index_id); + void addColumnToTable( const String & database_name, const String & table_name, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0b7bf52fa09..b67c78ce102 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -62,6 +63,8 @@ #include #include #include +#include +#include namespace ProfileEvents @@ -219,7 +222,7 @@ DeltaMergeStore::DeltaMergeStore( const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, - IndexInfosPtr local_index_infos_, + LocalIndexInfosPtr local_index_infos_, const Settings & settings_, ThreadPool * thread_pool) : global_context(db_context.getGlobalContext()) @@ -339,7 +342,7 @@ DeltaMergeStorePtr DeltaMergeStore::create( const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, - IndexInfosPtr local_index_infos_, + LocalIndexInfosPtr local_index_infos_, const Settings & settings_, ThreadPool * thread_pool) { @@ -2017,8 +2020,29 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info) original_table_columns.swap(new_original_table_columns); store_columns.swap(new_store_columns); - // TODO(local index): There could be some local indexes added/dropped after DDL + // copy the local_index_infos to check whether any new index is created + LocalIndexInfosPtr local_index_infos_copy = nullptr; + { + std::shared_lock index_read_lock(mtx_local_index_infos); + local_index_infos_copy = std::shared_ptr(local_index_infos); + } + std::atomic_store(&original_table_header, std::make_shared(toEmptyBlock(original_table_columns))); + + // release the lock because `checkAllSegmentsLocalIndex` will try to acquire the lock + // and generate tasks on segments + lock.unlock(); + + auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_copy, table_info, log); + if (new_local_index_infos) + { + { + // new index created, update the info in-memory + std::unique_lock index_write_lock(mtx_local_index_infos); + local_index_infos.swap(new_local_index_infos); + } + checkAllSegmentsLocalIndex(); + } // else no new index is created } SortDescription DeltaMergeStore::getPrimarySortDescription() const diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 2c3aff36e63..194315eb905 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -178,6 +178,7 @@ struct LocalIndexStats { String column_name{}; UInt64 column_id{}; + UInt64 index_id{}; String index_kind{}; UInt64 rows_stable_indexed{}; // Total rows @@ -296,7 +297,7 @@ class DeltaMergeStore const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, - IndexInfosPtr local_index_infos_, + LocalIndexInfosPtr local_index_infos_, const Settings & settings_ = EMPTY_SETTINGS, ThreadPool * thread_pool = nullptr); @@ -314,7 +315,7 @@ class DeltaMergeStore const ColumnDefine & handle, bool is_common_handle_, size_t rowkey_column_size_, - IndexInfosPtr local_index_infos_, + LocalIndexInfosPtr local_index_infos_, const Settings & settings_ = EMPTY_SETTINGS, ThreadPool * thread_pool = nullptr); @@ -720,7 +721,7 @@ class DeltaMergeStore void segmentEnsureStableIndex( DMContext & dm_context, - const IndexInfosPtr & index_info, + const LocalIndexInfosPtr & index_info, const DMFiles & dm_files, const String & source_segment_info); @@ -863,6 +864,15 @@ class DeltaMergeStore const SegmentPtr & segment, const DMFiles & new_dm_files); + // Get a snap of local_index_infos to check whether any new index is created. + LocalIndexInfosPtr getLocalIndexInfosSnapshot() const + { + std::shared_lock index_read_lock(mtx_local_index_infos); + if (!local_index_infos || local_index_infos->empty()) + return nullptr; + return std::make_shared(*local_index_infos); + } + /** * Check whether there are new local indexes should be built for all segments. */ @@ -950,7 +960,8 @@ class DeltaMergeStore // Some indexes are built in TiFlash locally. For example, Vector Index. // Compares to the lightweight RoughSet Indexes, these indexes require lot // of resources to build, so they will be built in separated background pool. - IndexInfosPtr local_index_infos; + LocalIndexInfosPtr local_index_infos; + mutable std::shared_mutex mtx_local_index_infos; struct DMFileIDToSegmentIDs { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 3226b6b618b..d20acabfa2e 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -452,7 +452,7 @@ SegmentPtr DeltaMergeStore::segmentMerge( void DeltaMergeStore::checkAllSegmentsLocalIndex() { - if (!local_index_infos || local_index_infos->empty()) + if (!getLocalIndexInfosSnapshot()) return; LOG_INFO(log, "CheckAllSegmentsLocalIndex - Begin"); @@ -529,12 +529,13 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment) RUNTIME_CHECK(segment != nullptr); // TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL - if (!local_index_infos || local_index_infos->empty()) + auto local_index_infos_snap = getLocalIndexInfosSnapshot(); + if (!local_index_infos_snap) return false; // No lock is needed, stable meta is immutable. auto dm_files = segment->getStable()->getDMFiles(); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files); if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) return false; @@ -569,13 +570,14 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co RUNTIME_CHECK(segment != nullptr); // TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL - if (!local_index_infos || local_index_infos->empty()) + auto local_index_infos_snap = getLocalIndexInfosSnapshot(); + if (!local_index_infos_snap) return true; // No lock is needed, stable meta is immutable. auto segment_id = segment->segmentId(); auto dm_files = segment->getStable()->getDMFiles(); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files); + auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, dm_files); if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) return true; @@ -653,7 +655,7 @@ SegmentPtr DeltaMergeStore::segmentUpdateMeta( void DeltaMergeStore::segmentEnsureStableIndex( DMContext & dm_context, - const IndexInfosPtr & index_info, + const LocalIndexInfosPtr & index_info, const DMFiles & dm_files, const String & source_segment_info) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp index 12e1ab15d62..90e899e53f7 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp @@ -194,16 +194,18 @@ SegmentsStats DeltaMergeStore::getSegmentsStats() LocalIndexesStats DeltaMergeStore::getLocalIndexStats() { - std::shared_lock lock(read_write_mutex); - - if (!local_index_infos || local_index_infos->empty()) + auto local_index_infos_snap = getLocalIndexInfosSnapshot(); + if (!local_index_infos_snap) return {}; + std::shared_lock lock(read_write_mutex); + LocalIndexesStats stats; - for (const auto & index_info : *local_index_infos) + for (const auto & index_info : *local_index_infos_snap) { LocalIndexStats index_stats; index_stats.column_id = index_info.column_id; + index_stats.index_id = index_info.index_id; index_stats.column_name = index_info.column_name; index_stats.index_kind = "HNSW"; // TODO: Support more. diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp index 7e438773438..01410a57970 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -32,14 +33,14 @@ namespace DB::DM { DMFileIndexWriter::LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( - const IndexInfosPtr & index_infos, + const LocalIndexInfosPtr & index_infos, const DMFiles & dm_files) { assert(index_infos != nullptr); static constexpr double VECTOR_INDEX_SIZE_FACTOR = 1.2; LocalIndexBuildInfo build; - build.indexes_to_build = std::make_shared(); + build.indexes_to_build = std::make_shared(); build.file_ids.reserve(dm_files.size()); for (const auto & dmfile : dm_files) { diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h index 09b64999809..3b4e13868bd 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h @@ -44,15 +44,15 @@ class DMFileIndexWriter { std::vector file_ids; size_t estimated_memory_bytes = 0; - IndexInfosPtr indexes_to_build; + LocalIndexInfosPtr indexes_to_build; }; - static LocalIndexBuildInfo getLocalIndexBuildInfo(const IndexInfosPtr & index_infos, const DMFiles & dm_files); + static LocalIndexBuildInfo getLocalIndexBuildInfo(const LocalIndexInfosPtr & index_infos, const DMFiles & dm_files); struct Options { const StoragePathPoolPtr path_pool; - const IndexInfosPtr index_infos; + const LocalIndexInfosPtr index_infos; const DMFiles dm_files; const DMContext & dm_context; }; diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp new file mode 100644 index 00000000000..afeaf90a02f --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.cpp @@ -0,0 +1,208 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include + +namespace DB::DM +{ + +bool isVectorIndexSupported(const LoggerPtr & logger) +{ + // Vector Index requires a specific storage format to work. + if ((STORAGE_FORMAT_CURRENT.identifier > 0 && STORAGE_FORMAT_CURRENT.identifier < 6) + || STORAGE_FORMAT_CURRENT.identifier == 100) + { + LOG_ERROR( + logger, + "The current storage format is {}, which does not support building vector index. TiFlash will " + "write data without vector index.", + STORAGE_FORMAT_CURRENT.identifier); + return false; + } + + return true; +} + +TiDB::ColumnInfo getVectorIndxColumnInfo( + const TiDB::TableInfo & table_info, + const TiDB::IndexInfo & idx_info, + const LoggerPtr & logger) +{ + if (!idx_info.vector_index + || (idx_info.state != TiDB::StatePublic && idx_info.state != TiDB::StateWriteReorganization)) + { + return {}; + } + + // Vector Index requires a specific storage format to work. + if (!isVectorIndexSupported(logger)) + { + return {}; + } + + if (idx_info.idx_cols.size() != 1) + { + LOG_ERROR( + logger, + "The index columns length is {}, which does not support building vector index, index_id={}, table_id={}.", + idx_info.idx_cols.size(), + idx_info.id, + table_info.id); + return {}; + } + + for (const auto & col : table_info.columns) + { + if (col.name == idx_info.idx_cols[0].name) + { + return col; + } + } + + LOG_ERROR( + logger, + "The index column does not exist, table_id={} index_id={} idx_col_name={}.", + table_info.id, + idx_info.id, + idx_info.idx_cols[0].name); + return {}; +} + +LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger) +{ + LocalIndexInfosPtr index_infos = std::make_shared(); + index_infos->reserve(table_info.columns.size() + table_info.index_infos.size()); + for (const auto & col : table_info.columns) + { + if (col.vector_index && isVectorIndexSupported(logger)) + { + index_infos->emplace_back(LocalIndexInfo{ + .type = IndexType::Vector, + .column_id = col.id, + .column_name = col.name, + .index_definition = col.vector_index, + }); + LOG_INFO(logger, "Add a new index by column comments, column_id={}, table_id={}.", col.id, table_info.id); + } + } + + for (const auto & idx : table_info.index_infos) + { + auto column = getVectorIndxColumnInfo(table_info, idx, logger); + // column.id <= 0 means we don't get the valid column ID. + if (column.id <= DB::EmptyColumnID) + { + LOG_ERROR( + Logger::get(), + "The current storage format is {}, which does not support building vector index. TiFlash will " + "write data without vector index.", + STORAGE_FORMAT_CURRENT.identifier); + return {}; + } + + LOG_INFO(logger, "Add a new index, index_id={}, table_id={}.", idx.id, table_info.id); + index_infos->emplace_back(LocalIndexInfo{ + .type = IndexType::Vector, + .index_id = idx.id, + .column_id = column.id, + .column_name = column.name, + .index_definition = idx.vector_index, + }); + } + + index_infos->shrink_to_fit(); + return index_infos; +} + +LocalIndexInfosPtr generateLocalIndexInfos( + const LocalIndexInfosPtr & existing_indexes, + const TiDB::TableInfo & new_table_info, + const LoggerPtr & logger) +{ + LocalIndexInfosPtr new_index_infos = std::make_shared>(); + // The first time generate index infos. + if (!existing_indexes) + { + auto index_infos = initLocalIndexInfos(new_table_info, logger); + if (!index_infos || index_infos->empty()) + return nullptr; + new_index_infos = std::move(index_infos); + return new_index_infos; + } + + new_index_infos->insert(new_index_infos->cend(), existing_indexes->begin(), existing_indexes->end()); + + std::unordered_map original_local_index_id_map; + for (size_t index = 0; index < new_index_infos->size(); ++index) + { + original_local_index_id_map[new_index_infos->at(index).index_id] = index; + } + + bool any_new_index_created = false; + bool any_index_removed = false; + for (const auto & idx : new_table_info.index_infos) + { + if (!idx.vector_index) + continue; + + auto iter = original_local_index_id_map.find(idx.id); + if (iter == original_local_index_id_map.end()) + { + if (idx.state == TiDB::StatePublic || idx.state == TiDB::StateWriteReorganization) + { + // create a new index + auto column = getVectorIndxColumnInfo(new_table_info, idx, logger); + LocalIndexInfo index_info{ + .type = IndexType::Vector, + .index_id = idx.id, + .column_id = column.id, + .column_name = column.name, + .index_definition = idx.vector_index, + }; + new_index_infos->emplace_back(std::move(index_info)); + any_new_index_created = true; + LOG_INFO(logger, "Add a new index, index_id={}, table_id={}.", idx.id, new_table_info.id); + } + } + else + { + if (idx.state == TiDB::StateDeleteReorganization) + continue; + // remove the existing index + original_local_index_id_map.erase(iter); + } + } + + // drop nonexistent indices + for (auto & iter : original_local_index_id_map) + { + // It means this index is create by column comments which we don't support drop index. + if (iter.first == DB::EmptyIndexID) + continue; + new_index_infos->erase(new_index_infos->begin() + iter.second); + any_index_removed = true; + LOG_INFO(logger, "Drop a index, index_id={}, table_id={}.", iter.first, new_table_info.id); + } + + if (!any_new_index_created && !any_index_removed) + return nullptr; + return new_index_infos; +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h index 5837c432251..34a5db7d1b6 100644 --- a/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h +++ b/dbms/src/Storages/DeltaMerge/Index/IndexInfo.h @@ -17,25 +17,48 @@ #include #include -namespace DB::DM +namespace TiDB { +struct TableInfo; +struct ColumnInfo; +struct IndexInfo; +} // namespace TiDB +namespace DB +{ +class Logger; +using LoggerPtr = std::shared_ptr; +} // namespace DB +namespace DB::DM +{ enum class IndexType { Vector = 1, }; -struct IndexInfo +struct LocalIndexInfo { IndexType type; - ColumnID column_id; + IndexID index_id = DB::EmptyIndexID; + ColumnID column_id = DB::EmptyColumnID; String column_name; // Now we only support vector index. // In the future, we may support more types of indexes, using std::variant. TiDB::VectorIndexDefinitionPtr index_definition; }; -using IndexInfos = std::vector; -using IndexInfosPtr = std::shared_ptr; +using LocalIndexInfos = std::vector; +using LocalIndexInfosPtr = std::shared_ptr; + +bool isVectorIndexSupported(const LoggerPtr & logger); +LocalIndexInfosPtr initLocalIndexInfos(const TiDB::TableInfo & table_info, const LoggerPtr & logger); +TiDB::ColumnInfo getVectorIndxColumnInfo( + const TiDB::TableInfo & table_info, + const TiDB::IndexInfo & idx_info, + const LoggerPtr & logger); +LocalIndexInfosPtr generateLocalIndexInfos( + const LocalIndexInfosPtr & existing_indexes, + const TiDB::TableInfo & new_table_info, + const LoggerPtr & logger); } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 3d6bafb3746..1f80e5b3d02 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -11,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#pragma once #include #include diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp index a649b2791e2..277174a5493 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -79,7 +79,7 @@ class DeltaMergeStoreVectorTest {cdVec()}, {range}, /* num_streams= */ 1, - /* max_version= */ std::numeric_limits::max(), + /* start_ts= */ std::numeric_limits::max(), filter, std::vector{}, 0, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h index 1174a991523..b28c6d91239 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index_utils.h @@ -15,10 +15,13 @@ #pragma once #include +#include #include #include +#include #include #include +#include #include @@ -28,8 +31,8 @@ namespace DB::DM::tests class VectorIndexTestUtils { public: - const ColumnID vec_column_id = 100; - const String vec_column_name = "vec"; + ColumnID vec_column_id = 100; + String vec_column_name = "vec"; /// Create a column with values like [1], [2], [3], ... /// Each value is a VectorFloat32 with exactly one dimension. @@ -62,7 +65,7 @@ class VectorIndexTestUtils return wb.str(); } - ColumnDefine cdVec() + ColumnDefine cdVec() const { // When used in read, no need to assign vector_index. return ColumnDefine(vec_column_id, vec_column_name, ::DB::tests::typeFromString("Array(Float32)")); @@ -73,22 +76,132 @@ class VectorIndexTestUtils return cache->cleanOutdatedCacheEntries(); } - IndexInfosPtr indexInfo( + LocalIndexInfosPtr indexInfo( TiDB::VectorIndexDefinition definition = TiDB::VectorIndexDefinition{ .kind = tipb::VectorIndexKind::HNSW, .dimension = 1, .distance_metric = tipb::VectorDistanceMetric::L2, }) { - const IndexInfos index_infos = IndexInfos{ - IndexInfo{ + const LocalIndexInfos index_infos = LocalIndexInfos{ + LocalIndexInfo{ .type = IndexType::Vector, .column_id = vec_column_id, + .column_name = "", .index_definition = std::make_shared(definition), }, }; - return std::make_shared(index_infos); + return std::make_shared(index_infos); } }; +class DeltaMergeStoreVectorBase : public VectorIndexTestUtils +{ +public: + DeltaMergeStorePtr reload() + { + auto cols = DMTestEnv::getDefaultColumns(); + cols->push_back(cdVec()); + ColumnDefine handle_column_define = (*cols)[0]; + + DeltaMergeStorePtr s = DeltaMergeStore::create( + *db_context, + false, + "test", + "t_100", + NullspaceID, + 100, + /*pk_col_id*/ 0, + true, + *cols, + handle_column_define, + false, + 1, + indexInfo(), + DeltaMergeStore::Settings()); + return s; + } + + void write(size_t num_rows_write) + { + String sequence = fmt::format("[0, {})", num_rows_write); + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of vector for test + block.insert(colVecFloat32(sequence, vec_column_name, vec_column_id)); + } + store->write(*db_context, db_context->getSettingsRef(), block); + } + + void writeWithVecData(size_t num_rows_write) + { + String sequence = fmt::format("[0, {})", num_rows_write); + Block block; + { + block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); + // Add a column of vector for test + block.insert(createVecFloat32Column( + {{1.0, 2.0, 3.0}, {0.0, 0.0, 0.0}, {1.0, 2.0, 3.5}}, + vec_column_name, + vec_column_id)); + } + store->write(*db_context, db_context->getSettingsRef(), block); + } + + void read(const RowKeyRange & range, const PushDownFilterPtr & filter, const ColumnWithTypeAndName & out) + { + auto in = store->read( + *db_context, + db_context->getSettingsRef(), + {cdVec()}, + {range}, + /* num_streams= */ 1, + /* start_ts= */ std::numeric_limits::max(), + filter, + std::vector{}, + 0, + TRACING_NAME, + /*keep_order=*/false)[0]; + ASSERT_INPUTSTREAM_COLS_UR( + in, + Strings({vec_column_name}), + createColumns({ + out, + })); + } + + void triggerMergeDelta() const + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef()); + for (const auto & segment : all_segments) + ASSERT_TRUE( + store->segmentMergeDelta(*dm_context, segment, DeltaMergeStore::MergeDeltaReason::Manual) != nullptr); + } + + void waitStableIndexReady() const + { + std::vector all_segments; + { + std::shared_lock lock(store->read_write_mutex); + for (const auto & [_, segment] : store->id_to_segment) + all_segments.push_back(segment); + } + for (const auto & segment : all_segments) + ASSERT_TRUE(store->segmentWaitStableIndexReady(segment)); + } + + ContextPtr db_context; + DeltaMergeStorePtr store; + +protected: + constexpr static const char * TRACING_NAME = "DeltaMergeStoreVectorTest"; +}; + } // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp new file mode 100644 index 00000000000..1f767c45ac6 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_local_index_info.cpp @@ -0,0 +1,290 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include + +namespace DB::tests +{ + +TEST(LocalIndexInfo, CheckIndexChanged) +try +{ + TiDB::TableInfo table_info; + { + TiDB::ColumnInfo column_info; + column_info.name = "vec"; + column_info.id = 100; + table_info.columns.emplace_back(column_info); + } + + auto logger = Logger::get(); + DM::LocalIndexInfosPtr index_info = nullptr; + // check the same + { + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + ASSERT_EQ(new_index_info, nullptr); + // check again, nothing changed, return nullptr + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + + // update + index_info = new_index_info; + } + + // Add a vector index to the TableInfo. + TiDB::IndexColumnInfo default_index_col_info; + default_index_col_info.name = "vec"; + default_index_col_info.length = -1; + default_index_col_info.offset = 0; + TiDB::IndexInfo expect_idx; + { + expect_idx.id = 1; + expect_idx.idx_cols.emplace_back(default_index_col_info); + expect_idx.vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + table_info.index_infos.emplace_back(expect_idx); + } + + // check the different + { + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + ASSERT_NE(new_index_info, nullptr); + ASSERT_EQ(new_index_info->size(), 1); + const auto & idx = (*new_index_info)[0]; + ASSERT_EQ(DM::IndexType::Vector, idx.type); + ASSERT_EQ(expect_idx.id, idx.index_id); + ASSERT_EQ(100, idx.column_id); + ASSERT_NE(nullptr, idx.index_definition); + ASSERT_EQ(expect_idx.vector_index->kind, idx.index_definition->kind); + ASSERT_EQ(expect_idx.vector_index->dimension, idx.index_definition->dimension); + ASSERT_EQ(expect_idx.vector_index->distance_metric, idx.index_definition->distance_metric); + + // check again, nothing changed, return nullptr + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + + // update + index_info = new_index_info; + } + + // Add another vector index to the TableInfo. + TiDB::IndexInfo expect_idx2; + { + expect_idx2.id = 2; // another index_id + expect_idx2.idx_cols.emplace_back(default_index_col_info); + expect_idx2.vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 2, + .distance_metric = tipb::VectorDistanceMetric::COSINE, // another distance + }); + table_info.index_infos.emplace_back(expect_idx2); + } + // check the different + { + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + ASSERT_NE(new_index_info, nullptr); + ASSERT_EQ(new_index_info->size(), 2); + const auto & idx0 = (*new_index_info)[0]; + ASSERT_EQ(DM::IndexType::Vector, idx0.type); + ASSERT_EQ(expect_idx.id, idx0.index_id); + ASSERT_EQ(100, idx0.column_id); + ASSERT_NE(nullptr, idx0.index_definition); + ASSERT_EQ(expect_idx.vector_index->kind, idx0.index_definition->kind); + ASSERT_EQ(expect_idx.vector_index->dimension, idx0.index_definition->dimension); + ASSERT_EQ(expect_idx.vector_index->distance_metric, idx0.index_definition->distance_metric); + const auto & idx1 = (*new_index_info)[1]; + ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(expect_idx2.id, idx1.index_id); + ASSERT_EQ(100, idx1.column_id); + ASSERT_NE(nullptr, idx1.index_definition); + ASSERT_EQ(expect_idx2.vector_index->kind, idx1.index_definition->kind); + ASSERT_EQ(expect_idx2.vector_index->dimension, idx1.index_definition->dimension); + ASSERT_EQ(expect_idx2.vector_index->distance_metric, idx1.index_definition->distance_metric); + + // check again, nothing changed, return nullptr + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + + // update + index_info = new_index_info; + } + + // Remove the second vecotr index and add a new vector index to the TableInfo. + TiDB::IndexInfo expect_idx3; + { + // drop the second index + table_info.index_infos.pop_back(); + // add a new index + expect_idx3.id = 3; // another index_id + expect_idx3.idx_cols.emplace_back(default_index_col_info); + expect_idx3.vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::COSINE, // another distance + }); + table_info.index_infos.emplace_back(expect_idx3); + } + // check the different + { + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + ASSERT_NE(new_index_info, nullptr); + ASSERT_EQ(new_index_info->size(), 2); + const auto & idx0 = (*new_index_info)[0]; + ASSERT_EQ(DM::IndexType::Vector, idx0.type); + ASSERT_EQ(expect_idx.id, idx0.index_id); + ASSERT_EQ(100, idx0.column_id); + ASSERT_NE(nullptr, idx0.index_definition); + ASSERT_EQ(expect_idx.vector_index->kind, idx0.index_definition->kind); + ASSERT_EQ(expect_idx.vector_index->dimension, idx0.index_definition->dimension); + ASSERT_EQ(expect_idx.vector_index->distance_metric, idx0.index_definition->distance_metric); + const auto & idx1 = (*new_index_info)[1]; + ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(expect_idx3.id, idx1.index_id); + ASSERT_EQ(100, idx1.column_id); + ASSERT_NE(nullptr, idx1.index_definition); + ASSERT_EQ(expect_idx3.vector_index->kind, idx1.index_definition->kind); + ASSERT_EQ(expect_idx3.vector_index->dimension, idx1.index_definition->dimension); + ASSERT_EQ(expect_idx3.vector_index->distance_metric, idx1.index_definition->distance_metric); + + // check again, nothing changed, return nullptr + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + } +} +CATCH + +TEST(LocalIndexInfo, CheckIndexAddWithVecIndexOnColumnInfo) +try +{ + // The serverless branch, vector index may directly defined on the ColumnInfo. + // Create table info with a vector index by column comments. + auto col_vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::INNER_PRODUCT, + }); + TiDB::TableInfo table_info; + { + TiDB::ColumnInfo column_info; + column_info.name = "vec"; + column_info.id = 98; + table_info.columns.emplace_back(column_info); + + TiDB::ColumnInfo column_info_v1; + column_info_v1.name = "vec1"; + column_info_v1.id = 99; + column_info_v1.vector_index = col_vector_index; + table_info.columns.emplace_back(column_info_v1); + } + + // Add a vector index by add vector index dirctly. + TiDB::IndexColumnInfo default_index_col_info; + default_index_col_info.name = "vec"; + default_index_col_info.length = -1; + default_index_col_info.offset = 0; + TiDB::IndexInfo expect_idx; + { + expect_idx.id = 1; + expect_idx.idx_cols.emplace_back(default_index_col_info); + expect_idx.vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 1, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + table_info.index_infos.emplace_back(expect_idx); + } + + // check the different + auto logger = Logger::get(); + DM::LocalIndexInfosPtr index_info = nullptr; + { + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + ASSERT_NE(new_index_info, nullptr); + ASSERT_EQ(new_index_info->size(), 2); + + const auto & idx0 = (*new_index_info)[0]; + ASSERT_EQ(DM::IndexType::Vector, idx0.type); + ASSERT_EQ(EmptyIndexID, idx0.index_id); + ASSERT_EQ(99, idx0.column_id); + ASSERT_NE(nullptr, idx0.index_definition); + ASSERT_EQ(col_vector_index->kind, idx0.index_definition->kind); + ASSERT_EQ(col_vector_index->dimension, idx0.index_definition->dimension); + ASSERT_EQ(col_vector_index->distance_metric, idx0.index_definition->distance_metric); + + const auto & idx1 = (*new_index_info)[1]; + ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(expect_idx.id, idx1.index_id); + ASSERT_EQ(98, idx1.column_id); + ASSERT_NE(nullptr, idx1.index_definition); + ASSERT_EQ(expect_idx.vector_index->kind, idx1.index_definition->kind); + ASSERT_EQ(expect_idx.vector_index->dimension, idx1.index_definition->dimension); + ASSERT_EQ(expect_idx.vector_index->distance_metric, idx1.index_definition->distance_metric); + // check again, table_info.index_infos doesn't change and return them + DM::LocalIndexInfosPtr empty_index_info = nullptr; + ASSERT_EQ(2, generateLocalIndexInfos(empty_index_info, table_info, logger)->size()); + + // update + index_info = new_index_info; + } + + // Drop the first vector index on column vec1. + table_info.index_infos.erase(table_info.index_infos.begin()); + + // Add another vector index to the TableInfo + TiDB::IndexInfo expect_idx2; + { + expect_idx2.id = 2; // another index_id + expect_idx2.idx_cols.emplace_back(default_index_col_info); + expect_idx2.vector_index = TiDB::VectorIndexDefinitionPtr(new TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 2, + .distance_metric = tipb::VectorDistanceMetric::COSINE, // another distance + }); + table_info.index_infos.emplace_back(expect_idx2); + } + // check the different + { + auto new_index_info = generateLocalIndexInfos(index_info, table_info, logger); + ASSERT_NE(new_index_info, nullptr); + ASSERT_EQ(new_index_info->size(), 2); + + const auto & idx0 = (*new_index_info)[0]; + ASSERT_EQ(DM::IndexType::Vector, idx0.type); + ASSERT_EQ(EmptyIndexID, idx0.index_id); + ASSERT_EQ(99, idx0.column_id); + ASSERT_NE(nullptr, idx0.index_definition); + ASSERT_EQ(col_vector_index->kind, idx0.index_definition->kind); + ASSERT_EQ(col_vector_index->dimension, idx0.index_definition->dimension); + ASSERT_EQ(col_vector_index->distance_metric, idx0.index_definition->distance_metric); + + const auto & idx1 = (*new_index_info)[1]; + ASSERT_EQ(DM::IndexType::Vector, idx1.type); + ASSERT_EQ(expect_idx2.id, idx1.index_id); + ASSERT_EQ(98, idx1.column_id); + ASSERT_NE(nullptr, idx1.index_definition); + ASSERT_EQ(expect_idx2.vector_index->kind, idx1.index_definition->kind); + ASSERT_EQ(expect_idx2.vector_index->dimension, idx1.index_definition->dimension); + ASSERT_EQ(expect_idx2.vector_index->distance_metric, idx1.index_definition->distance_metric); + + // check again, nothing changed, return nullptr + ASSERT_EQ(nullptr, generateLocalIndexInfos(new_index_info, table_info, logger)); + } +} +CATCH + +} // namespace DB::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 3664801fdd0..732607589ec 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -736,7 +736,7 @@ bool SegmentTestBasic::replaceSegmentStableData(PageIdU64 segment_id, const DMFi return success; } -bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, IndexInfosPtr local_index_infos) +bool SegmentTestBasic::ensureSegmentStableIndex(PageIdU64 segment_id, LocalIndexInfosPtr local_index_infos) { LOG_INFO(logger_op, "EnsureSegmentStableIndex, segment_id={}", segment_id); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 6cde2f65a3c..6bbecb41593 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -104,7 +104,7 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic /** * Returns whether segment stable index is created. */ - bool ensureSegmentStableIndex(PageIdU64 segment_id, IndexInfosPtr local_index_infos); + bool ensureSegmentStableIndex(PageIdU64 segment_id, LocalIndexInfosPtr local_index_infos); Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); Block prepareWriteBlockInSegmentRange( diff --git a/dbms/src/Storages/KVStore/Types.h b/dbms/src/Storages/KVStore/Types.h index db0f1c68b00..be11a1b28be 100644 --- a/dbms/src/Storages/KVStore/Types.h +++ b/dbms/src/Storages/KVStore/Types.h @@ -45,6 +45,18 @@ using KeyspaceDatabaseID = std::pair; using ColumnID = Int64; +enum : ColumnID +{ + EmptyColumnID = 0, +}; + +using IndexID = Int64; + +enum : IndexID +{ + EmptyIndexID = 0, +}; + // Constants for column id, prevent conflict with TiDB. static constexpr ColumnID TiDBPkColumnID = -1; static constexpr ColumnID ExtraTableIDColumnID = -3; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 9a584d454fd..9188e84f4a6 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -50,6 +50,7 @@ #include #include #include +#include #include #include #include @@ -1801,40 +1802,6 @@ SortDescription StorageDeltaMerge::getPrimarySortDescription() const return desc; } -IndexInfosPtr extractLocalIndexInfos(const TiDB::TableInfo & table_info) -{ - IndexInfosPtr index_infos = std::make_shared(); - index_infos->reserve(table_info.columns.size()); - for (const auto & col : table_info.columns) - { - // TODO: support more index type - if (col.vector_index) - { - // Vector Index requires a specific storage format to work. - if ((STORAGE_FORMAT_CURRENT.identifier > 0 && STORAGE_FORMAT_CURRENT.identifier < 6) - || STORAGE_FORMAT_CURRENT.identifier == 100) - { - LOG_ERROR( - Logger::get(), - "The current storage format is {}, which does not support building vector index. TiFlash will " - "write data without vector index.", - STORAGE_FORMAT_CURRENT.identifier); - return {}; - } - - index_infos->emplace_back(IndexInfo{ - .type = IndexType::Vector, - .column_id = col.id, - .column_name = col.name, - .index_definition = col.vector_index, - }); - } - } - - index_infos->shrink_to_fit(); - return index_infos; -} - DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread_pool) { if (storeInited()) @@ -1844,7 +1811,7 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread std::lock_guard lock(store_mutex); if (_store == nullptr) { - auto index_infos = extractLocalIndexInfos(tidb_table_info); + auto index_infos = initLocalIndexInfos(tidb_table_info, log); _store = DeltaMergeStore::create( global_context, data_path_contains_database_name, diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index fd88b9fbcf2..a66b9de2572 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -308,6 +309,4 @@ class StorageDeltaMerge friend class MockStorage; }; - - } // namespace DB diff --git a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp index bc73dec0f63..54b1a34446a 100644 --- a/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp +++ b/dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp @@ -45,6 +45,7 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam {"column_name", std::make_shared()}, {"column_id", std::make_shared()}, + {"index_id", std::make_shared()}, {"index_kind", std::make_shared()}, {"rows_stable_indexed", std::make_shared()}, // Total rows @@ -120,6 +121,7 @@ BlockInputStreams StorageSystemDTLocalIndexes::read( res_columns[j++]->insert(stat.column_name); res_columns[j++]->insert(stat.column_id); + res_columns[j++]->insert(stat.index_id); res_columns[j++]->insert(stat.index_kind); res_columns[j++]->insert(stat.rows_stable_indexed); diff --git a/dbms/src/TiDB/Schema/SchemaGetter.h b/dbms/src/TiDB/Schema/SchemaGetter.h index 17ec507d9d4..3fb04e5370a 100644 --- a/dbms/src/TiDB/Schema/SchemaGetter.h +++ b/dbms/src/TiDB/Schema/SchemaGetter.h @@ -103,12 +103,13 @@ enum class SchemaActionType : Int8 ActionDropResourceGroup = 70, ActionAlterTablePartitioning = 71, ActionRemovePartitioning = 72, + ActionAddVectorIndex = 73, // If we support new type from TiDB. // MaxRecognizedType also needs to be changed. // It should always be equal to the maximum supported type + 1 - MaxRecognizedType = 73, + MaxRecognizedType = 74, }; struct AffectedOption diff --git a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp index 35f76da50d2..90675aa00c0 100644 --- a/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp +++ b/dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp @@ -22,10 +22,14 @@ #include #include #include +#include +#include +#include #include #include #include #include +#include #include #include #include @@ -36,6 +40,7 @@ #include #include +#include namespace DB { @@ -282,7 +287,6 @@ TEST_F(SchemaSyncTest, PhysicalDropTable) try { auto pd_client = global_ctx.getTMTContext().getPDClient(); - const String db_name = "mock_db"; MockTiDB::instance().newDataBase(db_name); @@ -766,4 +770,133 @@ try } CATCH +TEST_F(SchemaSyncTest, VectorIndex) +try +{ + auto pd_client = global_ctx.getTMTContext().getPDClient(); + + const String db_name = "mock_db"; + MockTiDB::instance().newDataBase(db_name); + + auto cols = ColumnsDescription({ + {"col1", typeFromString("Int64")}, + {"vec", typeFromString("Array(Float32)")}, + }); + + // table_name, cols, pk_name + auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), ""); + refreshSchema(); + + auto vector_index = std::make_shared(TiDB::VectorIndexDefinition{ + .kind = tipb::VectorIndexKind::HNSW, + .dimension = 3, + .distance_metric = tipb::VectorDistanceMetric::L2, + }); + + DM::tests::DeltaMergeStoreVectorBase dmsv; + StorageDeltaMergePtr storage = std::static_pointer_cast(mustGetSyncedTable(t1_id)); + dmsv.store = storage->getStore(); + dmsv.db_context = std::make_shared(global_ctx.getGlobalContext()); + dmsv.vec_column_name = cols.getAllPhysical().back().name; + dmsv.vec_column_id = mustGetSyncedTable(t1_id)->getTableInfo().getColumnID(dmsv.vec_column_name); + const size_t num_rows_write = vector_index->dimension; + // write to store + dmsv.writeWithVecData(num_rows_write); + // trigger mergeDelta for all segments + dmsv.triggerMergeDelta(); + + // add a vector index + IndexID idx_id = 11; + MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index); + + // sync schema, the VectorIndex in TableInfo is not get updated + refreshSchema(); + auto idx_infos = mustGetSyncedTable(t1_id)->getTableInfo().index_infos; + ASSERT_EQ(idx_infos.size(), 0); + + // sync table schema, the VectorIndex in TableInfo should be updated + refreshTableSchema(t1_id); + auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + tbl_info = mustGetSyncedTable(t1_id)->getTableInfo(); + idx_infos = tbl_info.index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + ASSERT_EQ(idx.id, idx_id); + ASSERT_NE(idx.vector_index, nullptr); + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } + + // test read with ANN query after add a vector index + { + // check stable index has built for all segments + dmsv.waitStableIndexReady(); + const auto range = DM::RowKeyRange::newAll(dmsv.store->is_common_handle, dmsv.store->rowkey_column_size); + + // read from store + { + dmsv.read( + range, + DM::EMPTY_FILTER, + createVecFloat32Column( + {{1.0, 2.0, 3.0}, {0.0, 0.0, 0.0}, {1.0, 2.0, 3.5}}, + dmsv.vec_column_name, + dmsv.vec_column_id)); + } + + auto ann_query_info = std::make_shared(); + ann_query_info->set_column_id(dmsv.vec_column_id); + ann_query_info->set_distance_metric(tipb::VectorDistanceMetric::L2); + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(dmsv.encodeVectorFloat32({1.0, 2.0, 3.5})); + + auto filter = std::make_shared(DM::wrapWithANNQueryInfo(nullptr, ann_query_info)); + + dmsv.read(range, filter, createVecFloat32Column({{1.0, 2.0, 3.5}})); + } + + // read with ANN query + { + ann_query_info->set_top_k(1); + ann_query_info->set_ref_vec_f32(dmsv.encodeVectorFloat32({1.0, 2.0, 3.8})); + + auto filter = std::make_shared(DM::wrapWithANNQueryInfo(nullptr, ann_query_info)); + + dmsv.read(range, filter, createVecFloat32Column({{1.0, 2.0, 3.5}})); + } + } + + // drop a vector index + MockTiDB::instance().dropVectorIndexFromTable(db_name, "t1", idx_id); + + // sync schema, the VectorIndex in TableInfo is not get updated + { + refreshSchema(); + idx_infos = mustGetSyncedTable(t1_id)->getTableInfo().index_infos; + ASSERT_EQ(idx_infos.size(), 1); + for (const auto & idx : idx_infos) + { + if (idx.vector_index) + { + ASSERT_EQ(idx.vector_index->kind, vector_index->kind); + ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension); + ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric); + } + } + } + + // sync table schema, the VectorIndex in TableInfo should be updated + { + refreshTableSchema(t1_id); + idx_infos = mustGetSyncedTable(t1_id)->getTableInfo().index_infos; + ASSERT_EQ(idx_infos.size(), 0); + } +} +CATCH + } // namespace DB::tests