Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: Support multiple vec indexes on the same column #9469

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2020,24 +2020,20 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)
original_table_columns.swap(new_original_table_columns);
store_columns.swap(new_store_columns);

// copy the local_index_infos to check whether any new index is created
LocalIndexInfosPtr local_index_infos_copy = nullptr;
{
std::shared_lock index_read_lock(mtx_local_index_infos);
local_index_infos_copy = std::shared_ptr<LocalIndexInfos>(local_index_infos);
}
// Get a snapshot on the local_index_infos to check whether any new index is created
LocalIndexInfosSnapshot local_index_infos_snap = getLocalIndexInfosSnapshot();

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `checkAllSegmentsLocalIndex` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_copy, table_info, log);
auto new_local_index_infos = generateLocalIndexInfos(local_index_infos_snap, table_info, log);
if (new_local_index_infos)
{
{
// new index created, update the info in-memory
// new index created, update the info in-memory thread safety between `getLocalIndexInfosSnapshot`
std::unique_lock index_write_lock(mtx_local_index_infos);
local_index_infos.swap(new_local_index_infos);
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ struct StoreStats

struct LocalIndexStats
{
String column_name{};
UInt64 column_id{};
UInt64 index_id{};
String index_kind{};
Expand Down Expand Up @@ -864,13 +863,16 @@ class DeltaMergeStore
const SegmentPtr & segment,
const DMFiles & new_dm_files);

// Get a snap of local_index_infos to check whether any new index is created.
LocalIndexInfosPtr getLocalIndexInfosSnapshot() const
// Get a snap of local_index_infos for checking.
// Note that this is just a shallow copy of `local_index_infos`, do not
// modify the local indexes inside the snapshot.
LocalIndexInfosSnapshot getLocalIndexInfosSnapshot() const
{
std::shared_lock index_read_lock(mtx_local_index_infos);
if (!local_index_infos || local_index_infos->empty())
return nullptr;
return std::make_shared<LocalIndexInfos>(*local_index_infos);
// only make a shallow copy on the shared_ptr is OK
return local_index_infos;
}

/**
Expand Down
13 changes: 5 additions & 8 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,6 @@ bool DeltaMergeStore::segmentEnsureStableIndexAsync(const SegmentPtr & segment)
{
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return false;
Expand Down Expand Up @@ -569,7 +568,6 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
{
RUNTIME_CHECK(segment != nullptr);

// TODO(local index): There could be some indexes are built while some indexes is not yet built after DDL
auto local_index_infos_snap = getLocalIndexInfosSnapshot();
if (!local_index_infos_snap)
return true;
Expand Down Expand Up @@ -597,12 +595,11 @@ bool DeltaMergeStore::segmentWaitStableIndexReady(const SegmentPtr & segment) co
bool all_indexes_built = true;
for (const auto & index : *build_info.indexes_to_build)
{
auto col_id = index.column_id;
// The dmfile may be built before col_id is added. Skip build indexes for it
if (!dmfile->isColumnExist(col_id))
continue;

all_indexes_built = all_indexes_built && dmfile->getColumnStat(col_id).index_bytes > 0;
const auto [state, bytes] = dmfile->getLocalIndexState(index.column_id, index.index_id);
UNUSED(bytes);
all_indexes_built = all_indexes_built
// dmfile built before the column_id added or index already built
&& (state == DMFileMeta::LocalIndexState::NoNeed || state == DMFileMeta::LocalIndexState::IndexBuilt);
}
if (all_indexes_built)
return true;
Expand Down
14 changes: 7 additions & 7 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
LocalIndexStats index_stats;
index_stats.column_id = index_info.column_id;
index_stats.index_id = index_info.index_id;
index_stats.column_name = index_info.column_name;
index_stats.index_kind = "HNSW"; // TODO: Support more.

for (const auto & [handle, segment] : segments)
Expand All @@ -221,13 +220,14 @@ LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
bool is_stable_indexed = true;
for (const auto & dmfile : stable->getDMFiles())
{
if (!dmfile->isColumnExist(index_info.column_id))
continue; // Regard as indexed, because column does not need any index

auto column_stat = dmfile->getColumnStat(index_info.column_id);

if (column_stat.index_bytes == 0 && column_stat.data_bytes > 0)
const auto [state, bytes] = dmfile->getLocalIndexState(index_info.column_id, index_info.index_id);
UNUSED(bytes);
switch (state)
{
case DMFileMeta::LocalIndexState::NoNeed: // Regard as indexed, because column does not need any index
case DMFileMeta::LocalIndexState::IndexBuilt:
break;
case DMFileMeta::LocalIndexState::IndexPending:
is_stable_indexed = false;
break;
}
Expand Down
66 changes: 49 additions & 17 deletions dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@
#include <IO/WriteHelpers.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/dtpb/dmfile.pb.h>
#include <Storages/KVStore/Types.h>

namespace DB
{
namespace DM
namespace DB::DM
{
struct ColumnStat
{
ColId col_id;
DataTypePtr type;
// The average size of values. A hint for speeding up deserialize.
double avg_size;
// The serialized size of the column data on disk.
// The serialized size of the column data on disk. (including column data and nullmap)
size_t serialized_bytes = 0;

// These members are only useful when using metav2
Expand All @@ -41,7 +40,7 @@ struct ColumnStat
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;
std::vector<dtpb::VectorIndexFileProps> vector_index;

#ifndef NDEBUG
// This field is only used for testing
Expand All @@ -63,8 +62,11 @@ struct ColumnStat
stat.set_array_sizes_bytes(array_sizes_bytes);
stat.set_array_sizes_mark_bytes(array_sizes_mark_bytes);

if (vector_index.has_value())
stat.mutable_vector_index()->CopyFrom(vector_index.value());
for (const auto & vec_idx : vector_index)
{
auto * pb_idx = stat.add_vector_indexes();
pb_idx->CopyFrom(vec_idx);
}

#ifndef NDEBUG
stat.set_additional_data_for_test(additional_data_for_test);
Expand All @@ -88,14 +90,29 @@ struct ColumnStat
array_sizes_mark_bytes = proto.array_sizes_mark_bytes();

if (proto.has_vector_index())
vector_index = proto.vector_index();
{
// For backward compatibility, loaded `vector_index` into `vector_indexes`
// with index_id == EmptyIndexID
vector_index.emplace_back(proto.vector_index());
auto & idx = vector_index.back();
idx.set_index_id(EmptyIndexID);
idx.set_index_bytes(index_bytes);
}
vector_index.reserve(vector_index.size() + proto.vector_indexes_size());
for (const auto & pb_idx : proto.vector_indexes())
{
vector_index.emplace_back(pb_idx);
}

#ifndef NDEBUG
additional_data_for_test = proto.additional_data_for_test();
#endif
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
void serializeToBuffer(WriteBuffer & buf) const
// New fields should be added via protobuf. Use `toProto` instead
[[deprecated("Use ColumnStat::toProto instead")]] //
void
serializeToBuffer(WriteBuffer & buf) const
{
writeIntBinary(col_id, buf);
writeStringBinary(type->getName(), buf);
Expand All @@ -108,8 +125,10 @@ struct ColumnStat
writeIntBinary(index_bytes, buf);
}

// @deprecated. This only presents for reading with old data. Use `mergeFromProto` instead
void parseFromBuffer(ReadBuffer & buf)
// This only presents for reading with old data. Use `mergeFromProto` instead
[[deprecated("Use ColumnStat::mergeFromProto instead")]] //
void
parseFromBuffer(ReadBuffer & buf)
{
readIntBinary(col_id, buf);
String type_name;
Expand All @@ -127,7 +146,9 @@ struct ColumnStat

using ColumnStats = std::unordered_map<ColId, ColumnStat>;

inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf)
[[deprecated("Used by DMFileMeta v1. Use ColumnStat::mergeFromProto instead")]] //
inline void
readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadBuffer & buf)
{
DataTypeFactory & data_type_factory = DataTypeFactory::instance();

Expand Down Expand Up @@ -155,11 +176,23 @@ inline void readText(ColumnStats & column_sats, DMFileFormat::Version ver, ReadB
DB::assertChar('\n', buf);

auto type = data_type_factory.getOrSet(type_name);
column_sats.emplace(id, ColumnStat{id, type, avg_size, serialized_bytes});
column_sats.emplace(
id,
ColumnStat{
.col_id = id,
.type = type,
.avg_size = avg_size,
.serialized_bytes = serialized_bytes,
// ... here ignore some fields with default initializers
.vector_index = {},
.additional_data_for_test = {},
});
}
}

inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf)
[[deprecated("Used by DMFileMeta v1. Use ColumnStat::toProto instead")]] //
inline void
writeText(const ColumnStats & column_sats, DMFileFormat::Version ver, WriteBuffer & buf)
{
DB::writeString("Columns: ", buf);
DB::writeText(column_sats.size(), buf);
Expand All @@ -182,5 +215,4 @@ inline void writeText(const ColumnStats & column_sats, DMFileFormat::Version ver
}
}

} // namespace DM
} // namespace DB
} // namespace DB::DM
30 changes: 29 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

#pragma once

#include <DataTypes/IDataType.h>
#include <IO/WriteHelpers.h>
#include <Poco/File.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFileMetaV2.h>
#include <Storages/DeltaMerge/File/DMFileUtil.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/FormatVersion.h>
Expand Down Expand Up @@ -144,6 +147,29 @@ class DMFile : private boost::noncopyable
}
bool isColumnExist(ColId col_id) const { return meta->column_stats.contains(col_id); }

std::tuple<DMFileMeta::LocalIndexState, size_t> getLocalIndexState(ColId col_id, IndexID index_id) const
{
return meta->getLocalIndexState(col_id, index_id);
}

// Check whether the local index of given col_id and index_id has been built on this dmfile.
// Return false if
// - the col_id is not exist in the dmfile
// - the index has not been built
bool isLocalIndexExist(ColId col_id, IndexID index_id) const
{
return std::get<0>(meta->getLocalIndexState(col_id, index_id)) == DMFileMeta::LocalIndexState::IndexBuilt;
}

// Try to get the local index of given col_id and index_id.
// Return std::nullopt if
// - the col_id is not exist in the dmfile
// - the index has not been built
std::optional<dtpb::VectorIndexFileProps> getLocalIndex(ColId col_id, IndexID index_id) const
{
return meta->getLocalIndex(col_id, index_id);
}

/*
* TODO: This function is currently unused. We could use it when:
* 1. The content is polished (e.g. including at least file ID, and use a format easy for grep).
Expand Down Expand Up @@ -183,7 +209,6 @@ class DMFile : private boost::noncopyable
void switchToRemote(const S3::DMFileOID & oid) const;

UInt32 metaVersion() const { return meta->metaVersion(); }
UInt32 bumpMetaVersion() const { return meta->bumpMetaVersion(); }

private:
DMFile(
Expand Down Expand Up @@ -279,6 +304,9 @@ class DMFile : private boost::noncopyable
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

static String vectorIndexFileName(IndexID index_id) { return fmt::format("idx_{}.vector", index_id); }
String vectorIndexPath(IndexID index_id) const { return subFilePath(vectorIndexFileName(index_id)); }

void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); }

DMFileStatus getStatus() const { return meta->status; }
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn
bool is_matching_ann_query = false;
for (const auto & cd : read_columns)
{
// Note that it requires ann_query_info->column_id match
if (cd.id == ann_query_info->column_id())
{
is_matching_ann_query = true;
Expand Down Expand Up @@ -191,8 +192,10 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn

RUNTIME_CHECK(rest_columns.size() + 1 == read_columns.size(), rest_columns.size(), read_columns.size());

const auto & vec_column_stat = dmfile->getColumnStat(vec_column->id);
if (vec_column_stat.index_bytes == 0 || !vec_column_stat.vector_index.has_value())
const IndexID ann_query_info_index_id = ann_query_info->index_id() > 0 //
? ann_query_info->index_id()
: EmptyIndexID;
if (!dmfile->isLocalIndexExist(vec_column->id, ann_query_info_index_id))
// Vector index is defined but does not exist on the data file,
// or there is no data at all
return fallback();
Expand Down
Loading