Skip to content

Commit

Permalink
ddl: Fix adding vector index on empty table may blocked forever (#9470)
Browse files Browse the repository at this point in the history
ref #9032
  • Loading branch information
JaySon-Huang authored Sep 29, 2024
1 parent bf3d4d3 commit cd204f5
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 18 deletions.
1 change: 0 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
|| (managed_storage->engineType() != ::TiDB::StorageEngine::DT
&& managed_storage->engineType() != ::TiDB::StorageEngine::TMT))
{
LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name);
storage = storage_tmp;
table_lock = storage->lockForShare(context.getCurrentQueryId());
return;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,10 @@ class DeltaMergeStore

StoreStats getStoreStats();
SegmentsStats getSegmentsStats();

LocalIndexesStats getLocalIndexStats();
// Generate local index stats for a DeltaMergeStore that has not been inited
static std::optional<LocalIndexesStats> genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info);

bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ SegmentsStats DeltaMergeStore::getSegmentsStats()
return stats;
}

/// Build the local index stats of a table straight from its `table_info`,
/// without going through an inited DeltaMergeStore instance.
///
/// Returns `std::nullopt` when `DM::initLocalIndexInfos` yields nothing for
/// this table. Otherwise returns one entry per local index info, with only
/// `column_id`, `index_id` and `index_kind` assigned here; all the other
/// fields of an entry keep whatever value default construction gives them.
std::optional<LocalIndexesStats> DeltaMergeStore::genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info)
{
    auto index_infos = DM::initLocalIndexInfos(table_info, Logger::get());
    if (index_infos)
    {
        DM::LocalIndexesStats result;
        for (const auto & info : *index_infos)
        {
            DM::LocalIndexStats entry;
            // The kind is a fixed string here; it is not read from `info`.
            entry.index_kind = "HNSW";
            entry.index_id = info.index_id;
            entry.column_id = info.column_id;
            result.emplace_back(std::move(entry));
        }
        return result;
    }

    return std::nullopt;
}

LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
{
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,16 +1394,24 @@ void StorageDeltaMerge::alterSchemaChange(
LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize());

{
std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL.
// To avoid concurrency issues between store initialization and DDL,
// we must acquire the lock before the schema changes are applied.
std::lock_guard lock(store_mutex);
if (storeInited())
{
_store->applySchemaChanges(table_info);
}
else // it seems we will never come into this branch ?
else
{
// If no data needs to be stored for this table, the _store instance
// is left uninitialized to avoid creating fragmented files that could
// exhaust the inodes of the disk.
// In this case, we update some in-memory variables to ensure correctness.
updateTableColumnInfo();
}
}

// Should generate new decoding snapshot and cache block
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
Expand Down
21 changes: 14 additions & 7 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,28 @@ class StorageDeltaMerge
void checkStatus(const Context & context) override;
void deleteRows(const Context &, size_t rows) override;

bool isCommonHandle() const override { return is_common_handle; }

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }

DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); }

public:
const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); }

DM::DeltaMergeStorePtr getStoreIfInited() const;

bool isCommonHandle() const override { return is_common_handle; }

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }
bool initStoreIfDataDirExist(ThreadPool * thread_pool) override;

public:
/// decoding methods
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(
const TableStructureLockHolder & table_structure_lock,
bool need_block,
bool with_version_column) override;

void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override;

bool initStoreIfDataDirExist(ThreadPool * thread_pool) override;

DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); }

#ifndef DBMS_PUBLIC_GTEST
protected:
#endif
Expand Down Expand Up @@ -238,8 +241,12 @@ class StorageDeltaMerge

DataTypePtr getPKTypeImpl() const override;

// Return the DeltaMergeStore instance
// If the instance is not inited, this method will initialize the instance
// and return it.
DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr);
bool storeInited() const { return store_inited.load(std::memory_order_acquire); }

void updateTableColumnInfo();
ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info);
DM::ColumnDefines getStoreColumnDefines() const override;
Expand Down
28 changes: 20 additions & 8 deletions dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/KVStore/TMTStorages.h>
#include <Storages/KVStore/Types.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/System/StorageSystemDTLocalIndexes.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDB.h>

namespace DB
{
Expand Down Expand Up @@ -58,6 +61,19 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam
}));
}

/// Collect the local index stats of `dm_storage`.
///
/// - A tombstone storage yields `std::nullopt`.
/// - When the underlying store is already inited, the stats are taken from it.
/// - Otherwise the stats are generated from the table info alone, which may
///   itself yield `std::nullopt`.
std::optional<DM::LocalIndexesStats> getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage)
{
    if (dm_storage->isTombstone())
    {
        return std::nullopt;
    }

    const auto & tbl_info = dm_storage->getTableInfo();
    if (auto inited_store = dm_storage->getStoreIfInited(); inited_store)
    {
        return inited_store->getLocalIndexStats();
    }

    // The store is not inited, fall back to the table info.
    return DM::DeltaMergeStore::genLocalIndexStatsByTableInfo(tbl_info);
}

BlockInputStreams StorageSystemDTLocalIndexes::read(
const Names & column_names,
const SelectQueryInfo &,
Expand Down Expand Up @@ -90,16 +106,12 @@ BlockInputStreams StorageSystemDTLocalIndexes::read(

auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
const auto & table_info = dm_storage->getTableInfo();
auto table_id = table_info.id;
auto store = dm_storage->getStoreIfInited();
if (!store)
continue;
const auto table_id = table_info.id;

if (dm_storage->isTombstone())
const auto index_stats = getLocalIndexesStatsFromStorage(dm_storage);
if (!index_stats)
continue;

auto index_stats = store->getLocalIndexStats();
for (auto & stat : index_stats)
for (const auto & stat : *index_stats)
{
size_t j = 0;
res_columns[j++]->insert(database_name);
Expand Down
120 changes: 120 additions & 0 deletions dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ extern const char force_context_path[];
extern const char force_set_num_regions_for_table[];
extern const char random_ddl_fail_when_rename_partitions[];
} // namespace FailPoints

// defined in StorageSystemDTLocalIndexes.cpp
std::optional<DM::LocalIndexesStats> getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage);
} // namespace DB
namespace DB::tests
{
Expand Down Expand Up @@ -904,4 +907,121 @@ try
}
CATCH

// Case 1: the table is synced first without any vector index, then a vector
// index is added and the table schema is synced again. The `DeltaMergeStore`
// is expected to stay un-inited through the whole case.
TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1)
try
{
    auto pd = global_ctx.getTMTContext().getPDClient();

    const String db_name = "mock_db";
    MockTiDB::instance().newDataBase(db_name);

    auto columns = ColumnsDescription({
        {"col1", typeFromString("Int64")},
        {"vec", typeFromString("Array(Float32)")},
    });
    const auto create_ts = pd->getTS();
    auto table_id = MockTiDB::instance().newTable(db_name, "t1", columns, create_ts, "");
    refreshSchema();

    // At this point only the `StorageDeltaMerge` exists, its `DeltaMergeStore` is not inited
    StorageDeltaMergePtr dm_storage = std::static_pointer_cast<StorageDeltaMerge>(mustGetSyncedTable(table_id));
    {
        ASSERT_EQ(nullptr, dm_storage->getStoreIfInited());
        // No vector index yet, so no stats are expected
        auto index_stats = getLocalIndexesStatsFromStorage(dm_storage);
        ASSERT_FALSE(index_stats.has_value());
    }

    // Add a vector index on the last physical column
    IndexID index_id = 11;
    auto index_def = std::make_shared<const TiDB::VectorIndexDefinition>(TiDB::VectorIndexDefinition{
        .kind = tipb::VectorIndexKind::HNSW,
        .dimension = 3,
        .distance_metric = tipb::VectorDistanceMetric::L2,
    });
    const auto vec_column = columns.getAllPhysical().back();
    MockTiDB::instance().addVectorIndexToTable(db_name, "t1", index_id, vec_column, 0, index_def);

    // Sync the table schema again, the vector index should show up in the TableInfo
    refreshTableSchema(table_id);
    {
        // Still not inited
        ASSERT_EQ(nullptr, dm_storage->getStoreIfInited());
        auto index_stats = getLocalIndexesStatsFromStorage(dm_storage);
        ASSERT_TRUE(index_stats.has_value());
        ASSERT_EQ(index_stats->size(), 1);
        auto & first = (*index_stats)[0];
        ASSERT_EQ(first.index_id, index_id);
        // All row counters are expected to be zero
        ASSERT_EQ(first.rows_delta_indexed, 0);
        ASSERT_EQ(first.rows_delta_not_indexed, 0);
        ASSERT_EQ(first.rows_stable_indexed, 0);
        ASSERT_EQ(first.rows_stable_not_indexed, 0);
    }

    // Check the index definition stored in the synced table info
    auto synced_table_info = mustGetSyncedTable(table_id)->getTableInfo();
    auto synced_indexes = synced_table_info.index_infos;
    ASSERT_EQ(synced_indexes.size(), 1);
    for (const auto & synced_index : synced_indexes)
    {
        ASSERT_EQ(synced_index.id, index_id);
        ASSERT_NE(synced_index.vector_index, nullptr);
        ASSERT_EQ(synced_index.vector_index->kind, index_def->kind);
        ASSERT_EQ(synced_index.vector_index->dimension, index_def->dimension);
        ASSERT_EQ(synced_index.vector_index->distance_metric, index_def->distance_metric);
    }
}
CATCH

// Case 2: the table is created and the vector index is added on the mock TiDB
// side before the first schema sync, so the table is synced with the index
// already present in its table info.
TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2)
try
{
    auto pd = global_ctx.getTMTContext().getPDClient();

    const String db_name = "mock_db";
    MockTiDB::instance().newDataBase(db_name);

    // Create the table and add the vector index before anything is synced
    auto columns = ColumnsDescription({
        {"col1", typeFromString("Int64")},
        {"vec", typeFromString("Array(Float32)")},
    });
    const auto create_ts = pd->getTS();
    auto table_id = MockTiDB::instance().newTable(db_name, "t1", columns, create_ts, "");
    IndexID index_id = 11;
    auto index_def = std::make_shared<const TiDB::VectorIndexDefinition>(TiDB::VectorIndexDefinition{
        .kind = tipb::VectorIndexKind::HNSW,
        .dimension = 3,
        .distance_metric = tipb::VectorDistanceMetric::L2,
    });
    const auto vec_column = columns.getAllPhysical().back();
    MockTiDB::instance().addVectorIndexToTable(db_name, "t1", index_id, vec_column, 0, index_def);

    // Sync with the mock TiDB, which creates the StorageDeltaMerge instance
    refreshTableSchema(table_id);
    {
        StorageDeltaMergePtr dm_storage = std::static_pointer_cast<StorageDeltaMerge>(mustGetSyncedTable(table_id));
        // The `DeltaMergeStore` is not inited
        ASSERT_EQ(nullptr, dm_storage->getStoreIfInited());
        auto index_stats = getLocalIndexesStatsFromStorage(dm_storage);
        ASSERT_TRUE(index_stats.has_value());
        ASSERT_EQ(index_stats->size(), 1);
        auto & first = (*index_stats)[0];
        ASSERT_EQ(first.index_id, index_id);
        // All row counters are expected to be zero
        ASSERT_EQ(first.rows_delta_indexed, 0);
        ASSERT_EQ(first.rows_delta_not_indexed, 0);
        ASSERT_EQ(first.rows_stable_indexed, 0);
        ASSERT_EQ(first.rows_stable_not_indexed, 0);
    }

    // Check the index definition stored in the synced table info
    auto synced_table_info = mustGetSyncedTable(table_id)->getTableInfo();
    auto synced_indexes = synced_table_info.index_infos;
    ASSERT_EQ(synced_indexes.size(), 1);
    for (const auto & synced_index : synced_indexes)
    {
        ASSERT_EQ(synced_index.id, index_id);
        ASSERT_NE(synced_index.vector_index, nullptr);
        ASSERT_EQ(synced_index.vector_index->kind, index_def->kind);
        ASSERT_EQ(synced_index.vector_index->dimension, index_def->dimension);
        ASSERT_EQ(synced_index.vector_index->distance_metric, index_def->distance_metric);
    }
}
CATCH

} // namespace DB::tests
9 changes: 9 additions & 0 deletions tests/fullstack-test2/vector/vector-index.test
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
# Preparation.
mysql> drop table if exists test.t;

# Build vector index on empty table, it should return quickly
mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL);
mysql> alter table test.t set tiflash replica 1;
func> wait_table test t
mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_l2 ((VEC_L2_DISTANCE(v))) USING HNSW;
mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_COSINE_DISTANCE(v))) USING HNSW;
mysql> drop table if exists test.t;

# Build vector index on table with data on the stable layer
mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL);
mysql> INSERT INTO test.t VALUES ('[8.7, 5.7, 7.7, 9.8, 1.5]'),('[3.6, 9.7, 2.4, 6.6, 4.9]'),('[4.7, 4.9, 2.6, 5.2, 7.4]'),('[7.7, 6.7, 8.3, 7.8, 5.7]'),('[1.4, 4.5, 8.5, 7.7, 6.2]');
mysql> alter table test.t set tiflash replica 1;
Expand Down

0 comments on commit cd204f5

Please sign in to comment.