Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: remove vector_index in column level #9487

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Common/ComputeLabelHolder.h>
#include <Common/Exception.h>
#include <Common/ProcessCollector_fwd.h>
#include <Common/TiFlashBuildInfo.h>
Expand Down Expand Up @@ -657,25 +656,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
Gauge, \
F(type_send, {{"type", "send_queue"}}), \
F(type_receive, {{"type", "recv_queue"}})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute", \
Counter, \
F(type_mpp, \
{{"type", "mpp"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, \
{{"type", "cop"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop_stream, \
{{"type", "cop_stream"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, \
{{"type", "batch"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()})) \
M(tiflash_shared_block_schemas, \
"statistics about shared block schemas of ColumnFiles", \
Gauge, \
Expand Down Expand Up @@ -856,7 +836,18 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_gac_req_acquire_tokens, {"type", "gac_req_acquire_tokens"}), \
F(type_gac_req_ru_consumption_delta, {"type", "gac_req_ru_consumption_delta"}), \
F(type_gac_resp_tokens, {"type", "gac_resp_tokens"}), \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"})) \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"}), \
F(type_handling_mpp_task_dispatch, {"type", "handling_mpp_task_dispatch"}), \
F(type_handling_mpp_task_establish, {"type", "handling_mpp_task_establish"}), \
F(type_handling_mpp_task_cancel, {"type", "handling_mpp_task_cancel"}), \
F(type_handling_mpp_task_run, {"type", "handling_mpp_task_run"})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute for each resource group", \
Counter, \
F(type_mpp, {"type", "mpp"}), \
F(type_cop, {"type", "cop"}), \
F(type_cop_stream, {"type", "cop_stream"}), \
F(type_batch, {"type", "batch"}), ) \
M(tiflash_vector_index_memory_usage, \
"Vector index memory usage", \
Gauge, \
Expand Down Expand Up @@ -884,7 +875,24 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
M(tiflash_read_thread_internal_us, \
"Durations of read thread internal components", \
Histogram, \
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}))
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}), \
F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20})) \
M(tiflash_storage_pack_compression_algorithm_count, \
"The count of the compression algorithm used by each data part", \
Counter, \
F(type_constant, {"type", "constant"}), \
F(type_constant_delta, {"type", "constant_delta"}), \
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression bytes of lz4 and lightweight", \
Counter, \
F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \
F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \
F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \
F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
13 changes: 1 addition & 12 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,11 @@ struct ColumnDefine
DataTypePtr type;
Field default_value;

/// Note: ColumnDefine is used in both Write path and Read path.
/// In the read path, vector_index is usually not available. Use AnnQueryInfo for
/// read related vector index information.
TiDB::VectorIndexDefinitionPtr vector_index;

explicit ColumnDefine(
ColId id_ = 0,
String name_ = "",
DataTypePtr type_ = nullptr,
Field default_value_ = Field{},
TiDB::VectorIndexDefinitionPtr vector_index_ = nullptr)
explicit ColumnDefine(ColId id_ = 0, String name_ = "", DataTypePtr type_ = nullptr, Field default_value_ = Field{})
: id(id_)
, name(std::move(name_))
, type(std::move(type_))
, default_value(std::move(default_value_))
, vector_index(vector_index_)
{}
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2022,7 +2022,7 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `applyLocalIndexChange ` will try to acquire the lock
// release the lock because `applyLocalIndexChange` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

Expand Down
119 changes: 14 additions & 105 deletions dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,6 @@ extern const char force_not_support_vector_index[];
namespace DB::DM
{

struct ComplexIndexID
{
IndexID index_id;
ColumnID column_id;
};

bool operator==(const ComplexIndexID & lhs, const ComplexIndexID & rhs)
{
return lhs.index_id == rhs.index_id && lhs.column_id == rhs.column_id;
}

struct ComplexIndexIDHasher
{
std::size_t operator()(const ComplexIndexID & id) const
{
using boost::hash_combine;
using boost::hash_value;

std::size_t seed = 0;
if (id.index_id > EmptyIndexID)
{
hash_combine(seed, hash_value(0x01));
hash_combine(seed, hash_value(id.index_id));
}
else
{
hash_combine(seed, hash_value(0x02));
hash_combine(seed, hash_value(id.column_id));
}
return seed;
}
};


bool isVectorIndexSupported(const LoggerPtr & logger)
{
// Vector Index requires a specific storage format to work.
Expand Down Expand Up @@ -141,48 +107,21 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
}

// Keep a map of "indexes in existing_indexes" -> "offset in new_index_infos"
std::unordered_map<ComplexIndexID, size_t, ComplexIndexIDHasher> original_local_index_id_map;
std::unordered_map<IndexID, size_t> original_local_index_id_map;
if (existing_indexes)
{
// Create a copy of existing indexes
for (size_t offset = 0; offset < existing_indexes->size(); ++offset)
{
const auto & index = (*existing_indexes)[offset];
original_local_index_id_map.emplace(
ComplexIndexID{.index_id = index.index_id, .column_id = index.column_id},
offset);
original_local_index_id_map.emplace(index.index_id, offset);
new_index_infos->emplace_back(index);
}
}

std::unordered_set<ComplexIndexID, ComplexIndexIDHasher> index_ids_in_new_table;
std::vector<ComplexIndexID> newly_added;
std::vector<ComplexIndexID> newly_dropped;

// In the serverless branch, previously we define vector index on TiDB::ColumnInfo
for (const auto & col : new_table_info.columns)
{
if (!col.vector_index)
continue;

// We do the check at the beginning, only assert check under debug mode
// is enough
assert(isVectorIndexSupported(logger));

const ComplexIndexID cindex_id{.index_id = EmptyIndexID, .column_id = col.id};
index_ids_in_new_table.emplace(cindex_id);
// already exist in `existing_indexes`
if (original_local_index_id_map.contains(cindex_id))
continue;
// newly added
new_index_infos->emplace_back(LocalIndexInfo{
.type = IndexType::Vector,
.index_id = EmptyIndexID, // the vector index created on ColumnInfo, use EmptyIndexID as the index_id
.column_id = col.id,
.index_definition = col.vector_index,
});
newly_added.emplace_back(cindex_id);
}
std::unordered_set<IndexID> index_ids_in_new_table;
std::vector<IndexID> newly_added;
std::vector<IndexID> newly_dropped;

for (const auto & idx : new_table_info.index_infos)
{
Expand All @@ -193,9 +132,7 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
if (column_id <= EmptyColumnID)
continue;

const ComplexIndexID cindex_id{.index_id = idx.id, .column_id = column_id};
auto iter = original_local_index_id_map.find(cindex_id);
if (iter == original_local_index_id_map.end())
if (!original_local_index_id_map.contains(idx.id))
{
if (idx.state == TiDB::StatePublic || idx.state == TiDB::StateWriteReorganization)
{
Expand All @@ -206,15 +143,15 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
.column_id = column_id,
.index_definition = idx.vector_index,
});
newly_added.emplace_back(cindex_id);
index_ids_in_new_table.emplace(cindex_id);
newly_added.emplace_back(idx.id);
index_ids_in_new_table.emplace(idx.id);
}
// else the index is not public or write reorg, consider this index as not exist
}
else
{
if (idx.state != TiDB::StateDeleteReorganization)
index_ids_in_new_table.emplace(cindex_id);
index_ids_in_new_table.emplace(idx.id);
// else exist in both `existing_indexes` and `new_table_info`, but enter "delete reorg". We consider this
// index as not exist in the `new_table_info` and drop it later
}
Expand Down Expand Up @@ -244,12 +181,7 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
buf.joinStr(
original_local_index_id_map.begin(),
original_local_index_id_map.end(),
[](const auto & id, FmtBuffer & fb) {
if (id.first.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.first.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.first.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id.first); },
",");
buf.append("]");
return buf.toString();
Expand All @@ -266,51 +198,28 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
buf.joinStr(
original_local_index_id_map.begin(),
original_local_index_id_map.end(),
[](const auto & id, FmtBuffer & fb) {
if (id.first.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.first.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.first.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id.first); },
",");
buf.append("] added=[");
buf.joinStr(
newly_added.begin(),
newly_added.end(),
[](const ComplexIndexID & id, FmtBuffer & fb) {
if (id.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id); },
",");
buf.append("] dropped=[");
buf.joinStr(
newly_dropped.begin(),
newly_dropped.end(),
[](const ComplexIndexID & id, FmtBuffer & fb) {
if (id.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id); },
",");
buf.append("]");
return buf.toString();
};
LOG_INFO(logger, "Local index info generated, {}", get_changed_logging());

// only return the newly dropped index with index_id > EmptyIndexID
std::vector<IndexID> dropped_indexes;
for (const auto & i : newly_dropped)
{
if (i.index_id <= EmptyIndexID)
continue;
dropped_indexes.emplace_back(i.index_id);
}
return LocalIndexInfosChangeset{
.new_local_index_infos = new_index_infos,
.dropped_indexes = std::move(dropped_indexes),
.dropped_indexes = std::move(newly_dropped),
};
}

Expand Down
Loading