Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Fix adding vector index on empty table may blocked forever #9470

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dbms/src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ void InterpreterSelectQuery::getAndLockStorageWithSchemaVersion(const String & d
|| (managed_storage->engineType() != ::TiDB::StorageEngine::DT
&& managed_storage->engineType() != ::TiDB::StorageEngine::TMT))
{
LOG_DEBUG(log, "{}.{} is not ManageableStorage", database_name, table_name);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running queries on the system table will trigger this logging, which is verbose but not provide any information. Remove it.

storage = storage_tmp;
table_lock = storage->lockForShare(context.getCurrentQueryId());
return;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,10 @@ class DeltaMergeStore

StoreStats getStoreStats();
SegmentsStats getSegmentsStats();

LocalIndexesStats getLocalIndexStats();
// Generate local index stats for non inited DeltaMergeStore
static std::optional<LocalIndexesStats> genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info);

bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }
Expand Down
18 changes: 18 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,24 @@ SegmentsStats DeltaMergeStore::getSegmentsStats()
return stats;
}

std::optional<LocalIndexesStats> DeltaMergeStore::genLocalIndexStatsByTableInfo(const TiDB::TableInfo & table_info)
{
auto local_index_infos = DM::initLocalIndexInfos(table_info, Logger::get());
if (!local_index_infos)
return std::nullopt;

DM::LocalIndexesStats stats;
for (const auto & index_info : *local_index_infos)
{
DM::LocalIndexStats index_stats;
index_stats.column_id = index_info.column_id;
index_stats.index_id = index_info.index_id;
index_stats.index_kind = "HNSW";
stats.emplace_back(std::move(index_stats));
}
return stats;
}

LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
{
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1394,16 +1394,24 @@ void StorageDeltaMerge::alterSchemaChange(
LOG_DEBUG(log, "Update table_info: {} => {}", tidb_table_info.serialize(), table_info.serialize());

{
std::lock_guard lock(store_mutex); // Avoid concurrent init store and DDL.
// In order to avoid concurrent issue between init store and DDL,
// we must acquire the lock before schema changes is applied.
std::lock_guard lock(store_mutex);
if (storeInited())
{
_store->applySchemaChanges(table_info);
}
else // it seems we will never come into this branch ?
else
{
// If there is no data need to be stored for this table, the _store instance
// is not inited to reduce fragmentation files that may exhaust the inode of
// disk.
// Under this case, we update some in-memory variables to ensure the correctness.
updateTableColumnInfo();
}
}

// Should generate new decoding snapshot and cache block
decoding_schema_changed = true;

SortDescription pk_desc = getPrimarySortDescription();
Expand Down
21 changes: 14 additions & 7 deletions dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,28 @@ class StorageDeltaMerge
void checkStatus(const Context & context) override;
void deleteRows(const Context &, size_t rows) override;

bool isCommonHandle() const override { return is_common_handle; }

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }

DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); }

public:
const DM::DeltaMergeStorePtr & getStore() { return getAndMaybeInitStore(); }

DM::DeltaMergeStorePtr getStoreIfInited() const;

bool isCommonHandle() const override { return is_common_handle; }

size_t getRowKeyColumnSize() const override { return rowkey_column_size; }
bool initStoreIfDataDirExist(ThreadPool * thread_pool) override;

public:
/// decoding methods
std::pair<DB::DecodingStorageSchemaSnapshotConstPtr, BlockUPtr> getSchemaSnapshotAndBlockForDecoding(
const TableStructureLockHolder & table_structure_lock,
bool need_block,
bool with_version_column) override;

void releaseDecodingBlock(Int64 block_decoding_schema_epoch, BlockUPtr block) override;

bool initStoreIfDataDirExist(ThreadPool * thread_pool) override;

DM::DMConfigurationOpt createChecksumConfig() const { return DM::DMChecksumConfig::fromDBContext(global_context); }

#ifndef DBMS_PUBLIC_GTEST
protected:
#endif
Expand Down Expand Up @@ -238,8 +241,12 @@ class StorageDeltaMerge

DataTypePtr getPKTypeImpl() const override;

// Return the DeltaMergeStore instance
// If the instance is not inited, this method will initialize the instance
// and return it.
DM::DeltaMergeStorePtr & getAndMaybeInitStore(ThreadPool * thread_pool = nullptr);
bool storeInited() const { return store_inited.load(std::memory_order_acquire); }

void updateTableColumnInfo();
ColumnsDescription getNewColumnsDescription(const TiDB::TableInfo & table_info);
DM::ColumnDefines getStoreColumnDefines() const override;
Expand Down
28 changes: 20 additions & 8 deletions dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/Index/LocalIndexInfo.h>
#include <Storages/KVStore/TMTStorages.h>
#include <Storages/KVStore/Types.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/System/StorageSystemDTLocalIndexes.h>
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDB.h>

namespace DB
{
Expand Down Expand Up @@ -58,6 +61,19 @@ StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & nam
}));
}

std::optional<DM::LocalIndexesStats> getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage)
{
if (dm_storage->isTombstone())
return std::nullopt;

const auto & table_info = dm_storage->getTableInfo();
auto store = dm_storage->getStoreIfInited();
if (!store)
return DM::DeltaMergeStore::genLocalIndexStatsByTableInfo(table_info);

return store->getLocalIndexStats();
}

BlockInputStreams StorageSystemDTLocalIndexes::read(
const Names & column_names,
const SelectQueryInfo &,
Expand Down Expand Up @@ -90,16 +106,12 @@ BlockInputStreams StorageSystemDTLocalIndexes::read(

auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
const auto & table_info = dm_storage->getTableInfo();
auto table_id = table_info.id;
auto store = dm_storage->getStoreIfInited();
if (!store)
continue;
const auto table_id = table_info.id;

if (dm_storage->isTombstone())
const auto index_stats = getLocalIndexesStatsFromStorage(dm_storage);
if (!index_stats)
continue;

auto index_stats = store->getLocalIndexStats();
for (auto & stat : index_stats)
for (const auto & stat : *index_stats)
{
size_t j = 0;
res_columns[j++]->insert(database_name);
Expand Down
120 changes: 120 additions & 0 deletions dbms/src/TiDB/Schema/tests/gtest_schema_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ extern const char force_context_path[];
extern const char force_set_num_regions_for_table[];
extern const char random_ddl_fail_when_rename_partitions[];
} // namespace FailPoints

// defined in StorageSystemDTLocalIndexes.cpp
std::optional<DM::LocalIndexesStats> getLocalIndexesStatsFromStorage(const StorageDeltaMergePtr & dm_storage);
} // namespace DB
namespace DB::tests
{
Expand Down Expand Up @@ -904,4 +907,121 @@ try
}
CATCH

TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase1)
try
{
auto pd_client = global_ctx.getTMTContext().getPDClient();

const String db_name = "mock_db";
MockTiDB::instance().newDataBase(db_name);

auto cols = ColumnsDescription({
{"col1", typeFromString("Int64")},
{"vec", typeFromString("Array(Float32)")},
});
auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), "");
refreshSchema();

// The `StorageDeltaMerge` is created but `DeltaMergeStore` is not inited
StorageDeltaMergePtr storage = std::static_pointer_cast<StorageDeltaMerge>(mustGetSyncedTable(t1_id));
{
// The `DeltaMergeStore` is not inited
ASSERT_EQ(nullptr, storage->getStoreIfInited());
auto stats = getLocalIndexesStatsFromStorage(storage);
ASSERT_FALSE(stats.has_value());
}

// add a vector index
IndexID idx_id = 11;
auto vector_index = std::make_shared<const TiDB::VectorIndexDefinition>(TiDB::VectorIndexDefinition{
.kind = tipb::VectorIndexKind::HNSW,
.dimension = 3,
.distance_metric = tipb::VectorDistanceMetric::L2,
});
MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index);

// sync table schema, the VectorIndex in TableInfo should be updated
refreshTableSchema(t1_id);
{
// The `DeltaMergeStore` is not inited
ASSERT_EQ(nullptr, storage->getStoreIfInited());
auto stats = getLocalIndexesStatsFromStorage(storage);
ASSERT_TRUE(stats.has_value());
ASSERT_EQ(stats->size(), 1);
auto & s = (*stats)[0];
ASSERT_EQ(s.index_id, idx_id);
ASSERT_EQ(s.rows_delta_indexed, 0);
ASSERT_EQ(s.rows_delta_not_indexed, 0);
ASSERT_EQ(s.rows_stable_indexed, 0);
ASSERT_EQ(s.rows_stable_not_indexed, 0);
}

auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo();
auto idx_infos = tbl_info.index_infos;
ASSERT_EQ(idx_infos.size(), 1);
for (const auto & idx : idx_infos)
{
ASSERT_EQ(idx.id, idx_id);
ASSERT_NE(idx.vector_index, nullptr);
ASSERT_EQ(idx.vector_index->kind, vector_index->kind);
ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension);
ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric);
}
}
CATCH

TEST_F(SchemaSyncTest, SyncTableWithVectorIndexCase2)
try
{
auto pd_client = global_ctx.getTMTContext().getPDClient();

const String db_name = "mock_db";
MockTiDB::instance().newDataBase(db_name);

// The table is created and vector index is added. After that, the table info is synced to TiFlash
auto cols = ColumnsDescription({
{"col1", typeFromString("Int64")},
{"vec", typeFromString("Array(Float32)")},
});
auto t1_id = MockTiDB::instance().newTable(db_name, "t1", cols, pd_client->getTS(), "");
IndexID idx_id = 11;
auto vector_index = std::make_shared<const TiDB::VectorIndexDefinition>(TiDB::VectorIndexDefinition{
.kind = tipb::VectorIndexKind::HNSW,
.dimension = 3,
.distance_metric = tipb::VectorDistanceMetric::L2,
});
MockTiDB::instance().addVectorIndexToTable(db_name, "t1", idx_id, cols.getAllPhysical().back(), 0, vector_index);

// Synced with mock tidb, and create the StorageDeltaMerge instance
refreshTableSchema(t1_id);
{
// The `DeltaMergeStore` is not inited
StorageDeltaMergePtr storage = std::static_pointer_cast<StorageDeltaMerge>(mustGetSyncedTable(t1_id));
ASSERT_EQ(nullptr, storage->getStoreIfInited());
auto stats = getLocalIndexesStatsFromStorage(storage);
ASSERT_TRUE(stats.has_value());
ASSERT_EQ(stats->size(), 1);
auto & s = (*stats)[0];
ASSERT_EQ(s.index_id, idx_id);
ASSERT_EQ(s.rows_delta_indexed, 0);
ASSERT_EQ(s.rows_delta_not_indexed, 0);
ASSERT_EQ(s.rows_stable_indexed, 0);
ASSERT_EQ(s.rows_stable_not_indexed, 0);
}


auto tbl_info = mustGetSyncedTable(t1_id)->getTableInfo();
auto idx_infos = tbl_info.index_infos;
ASSERT_EQ(idx_infos.size(), 1);
for (const auto & idx : idx_infos)
{
ASSERT_EQ(idx.id, idx_id);
ASSERT_NE(idx.vector_index, nullptr);
ASSERT_EQ(idx.vector_index->kind, vector_index->kind);
ASSERT_EQ(idx.vector_index->dimension, vector_index->dimension);
ASSERT_EQ(idx.vector_index->distance_metric, vector_index->distance_metric);
}
}
CATCH

} // namespace DB::tests
9 changes: 9 additions & 0 deletions tests/fullstack-test2/vector/vector-index.test
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@
# Preparation.
mysql> drop table if exists test.t;

# Build vector index on empty table, it should return quickly
mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL);
mysql> alter table test.t set tiflash replica 1;
func> wait_table test t
mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_l2 ((VEC_L2_DISTANCE(v))) USING HNSW;
mysql> ALTER TABLE test.t ADD VECTOR INDEX idx_v_cos ((VEC_COSINE_DISTANCE(v))) USING HNSW;
mysql> drop table if exists test.t;

# Build vector index on table with data on the stable layer
mysql> CREATE TABLE test.t (`v` vector(5) DEFAULT NULL);
mysql> INSERT INTO test.t VALUES ('[8.7, 5.7, 7.7, 9.8, 1.5]'),('[3.6, 9.7, 2.4, 6.6, 4.9]'),('[4.7, 4.9, 2.6, 5.2, 7.4]'),('[7.7, 6.7, 8.3, 7.8, 5.7]'),('[1.4, 4.5, 8.5, 7.7, 6.2]');
mysql> alter table test.t set tiflash replica 1;
Expand Down