Skip to content

Commit

Permalink
storage: remove vector_index in column level (#9487)
Browse files Browse the repository at this point in the history
storage: remove vector_index in column level
  • Loading branch information
Lloyd-Pottiger authored Sep 30, 2024
1 parent ae57df9 commit 76e729e
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 400 deletions.
52 changes: 30 additions & 22 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Common/ComputeLabelHolder.h>
#include <Common/Exception.h>
#include <Common/ProcessCollector_fwd.h>
#include <Common/TiFlashBuildInfo.h>
Expand Down Expand Up @@ -657,25 +656,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
Gauge, \
F(type_send, {{"type", "send_queue"}}), \
F(type_receive, {{"type", "recv_queue"}})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute", \
Counter, \
F(type_mpp, \
{{"type", "mpp"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, \
{{"type", "cop"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop_stream, \
{{"type", "cop_stream"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, \
{{"type", "batch"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()})) \
M(tiflash_shared_block_schemas, \
"statistics about shared block schemas of ColumnFiles", \
Gauge, \
Expand Down Expand Up @@ -856,7 +836,18 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_gac_req_acquire_tokens, {"type", "gac_req_acquire_tokens"}), \
F(type_gac_req_ru_consumption_delta, {"type", "gac_req_ru_consumption_delta"}), \
F(type_gac_resp_tokens, {"type", "gac_resp_tokens"}), \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"})) \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"}), \
F(type_handling_mpp_task_dispatch, {"type", "handling_mpp_task_dispatch"}), \
F(type_handling_mpp_task_establish, {"type", "handling_mpp_task_establish"}), \
F(type_handling_mpp_task_cancel, {"type", "handling_mpp_task_cancel"}), \
F(type_handling_mpp_task_run, {"type", "handling_mpp_task_run"})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute for each resource group", \
Counter, \
F(type_mpp, {"type", "mpp"}), \
F(type_cop, {"type", "cop"}), \
F(type_cop_stream, {"type", "cop_stream"}), \
F(type_batch, {"type", "batch"}), ) \
M(tiflash_vector_index_memory_usage, \
"Vector index memory usage", \
Gauge, \
Expand Down Expand Up @@ -884,7 +875,24 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
M(tiflash_read_thread_internal_us, \
"Durations of read thread internal components", \
Histogram, \
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}))
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}), \
F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20})) \
M(tiflash_storage_pack_compression_algorithm_count, \
"The count of the compression algorithm used by each data part", \
Counter, \
F(type_constant, {"type", "constant"}), \
F(type_constant_delta, {"type", "constant_delta"}), \
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression bytes of lz4 and lightweight", \
Counter, \
F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \
F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \
F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \
F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
13 changes: 1 addition & 12 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,22 +89,11 @@ struct ColumnDefine
DataTypePtr type;
Field default_value;

/// Note: ColumnDefine is used in both Write path and Read path.
/// In the read path, vector_index is usually not available. Use AnnQueryInfo for
/// read related vector index information.
TiDB::VectorIndexDefinitionPtr vector_index;

explicit ColumnDefine(
ColId id_ = 0,
String name_ = "",
DataTypePtr type_ = nullptr,
Field default_value_ = Field{},
TiDB::VectorIndexDefinitionPtr vector_index_ = nullptr)
explicit ColumnDefine(ColId id_ = 0, String name_ = "", DataTypePtr type_ = nullptr, Field default_value_ = Field{})
: id(id_)
, name(std::move(name_))
, type(std::move(type_))
, default_value(std::move(default_value_))
, vector_index(vector_index_)
{}
};

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2022,7 +2022,7 @@ void DeltaMergeStore::applySchemaChanges(TiDB::TableInfo & table_info)

std::atomic_store(&original_table_header, std::make_shared<Block>(toEmptyBlock(original_table_columns)));

// release the lock because `applyLocalIndexChange ` will try to acquire the lock
// release the lock because `applyLocalIndexChange` will try to acquire the lock
// and generate tasks on segments
lock.unlock();

Expand Down
119 changes: 14 additions & 105 deletions dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,6 @@ extern const char force_not_support_vector_index[];
namespace DB::DM
{

struct ComplexIndexID
{
IndexID index_id;
ColumnID column_id;
};

bool operator==(const ComplexIndexID & lhs, const ComplexIndexID & rhs)
{
return lhs.index_id == rhs.index_id && lhs.column_id == rhs.column_id;
}

struct ComplexIndexIDHasher
{
std::size_t operator()(const ComplexIndexID & id) const
{
using boost::hash_combine;
using boost::hash_value;

std::size_t seed = 0;
if (id.index_id > EmptyIndexID)
{
hash_combine(seed, hash_value(0x01));
hash_combine(seed, hash_value(id.index_id));
}
else
{
hash_combine(seed, hash_value(0x02));
hash_combine(seed, hash_value(id.column_id));
}
return seed;
}
};


bool isVectorIndexSupported(const LoggerPtr & logger)
{
// Vector Index requires a specific storage format to work.
Expand Down Expand Up @@ -141,48 +107,21 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
}

// Keep a map of "indexes in existing_indexes" -> "offset in new_index_infos"
std::unordered_map<ComplexIndexID, size_t, ComplexIndexIDHasher> original_local_index_id_map;
std::unordered_map<IndexID, size_t> original_local_index_id_map;
if (existing_indexes)
{
// Create a copy of existing indexes
for (size_t offset = 0; offset < existing_indexes->size(); ++offset)
{
const auto & index = (*existing_indexes)[offset];
original_local_index_id_map.emplace(
ComplexIndexID{.index_id = index.index_id, .column_id = index.column_id},
offset);
original_local_index_id_map.emplace(index.index_id, offset);
new_index_infos->emplace_back(index);
}
}

std::unordered_set<ComplexIndexID, ComplexIndexIDHasher> index_ids_in_new_table;
std::vector<ComplexIndexID> newly_added;
std::vector<ComplexIndexID> newly_dropped;

// In the serverless branch, previously we define vector index on TiDB::ColumnInfo
for (const auto & col : new_table_info.columns)
{
if (!col.vector_index)
continue;

// We do the check at the beginning, only assert check under debug mode
// is enough
assert(isVectorIndexSupported(logger));

const ComplexIndexID cindex_id{.index_id = EmptyIndexID, .column_id = col.id};
index_ids_in_new_table.emplace(cindex_id);
// already exist in `existing_indexes`
if (original_local_index_id_map.contains(cindex_id))
continue;
// newly added
new_index_infos->emplace_back(LocalIndexInfo{
.type = IndexType::Vector,
.index_id = EmptyIndexID, // the vector index created on ColumnInfo, use EmptyIndexID as the index_id
.column_id = col.id,
.index_definition = col.vector_index,
});
newly_added.emplace_back(cindex_id);
}
std::unordered_set<IndexID> index_ids_in_new_table;
std::vector<IndexID> newly_added;
std::vector<IndexID> newly_dropped;

for (const auto & idx : new_table_info.index_infos)
{
Expand All @@ -193,9 +132,7 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
if (column_id <= EmptyColumnID)
continue;

const ComplexIndexID cindex_id{.index_id = idx.id, .column_id = column_id};
auto iter = original_local_index_id_map.find(cindex_id);
if (iter == original_local_index_id_map.end())
if (!original_local_index_id_map.contains(idx.id))
{
if (idx.state == TiDB::StatePublic || idx.state == TiDB::StateWriteReorganization)
{
Expand All @@ -206,15 +143,15 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
.column_id = column_id,
.index_definition = idx.vector_index,
});
newly_added.emplace_back(cindex_id);
index_ids_in_new_table.emplace(cindex_id);
newly_added.emplace_back(idx.id);
index_ids_in_new_table.emplace(idx.id);
}
// else the index is not public or write reorg, consider this index as not exist
}
else
{
if (idx.state != TiDB::StateDeleteReorganization)
index_ids_in_new_table.emplace(cindex_id);
index_ids_in_new_table.emplace(idx.id);
// else exist in both `existing_indexes` and `new_table_info`, but enter "delete reorg". We consider this
// index as not exist in the `new_table_info` and drop it later
}
Expand Down Expand Up @@ -244,12 +181,7 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
buf.joinStr(
original_local_index_id_map.begin(),
original_local_index_id_map.end(),
[](const auto & id, FmtBuffer & fb) {
if (id.first.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.first.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.first.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id.first); },
",");
buf.append("]");
return buf.toString();
Expand All @@ -266,51 +198,28 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
buf.joinStr(
original_local_index_id_map.begin(),
original_local_index_id_map.end(),
[](const auto & id, FmtBuffer & fb) {
if (id.first.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.first.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.first.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id.first); },
",");
buf.append("] added=[");
buf.joinStr(
newly_added.begin(),
newly_added.end(),
[](const ComplexIndexID & id, FmtBuffer & fb) {
if (id.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id); },
",");
buf.append("] dropped=[");
buf.joinStr(
newly_dropped.begin(),
newly_dropped.end(),
[](const ComplexIndexID & id, FmtBuffer & fb) {
if (id.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id); },
",");
buf.append("]");
return buf.toString();
};
LOG_INFO(logger, "Local index info generated, {}", get_changed_logging());

// only return the newly dropped index with index_id > EmptyIndexID
std::vector<IndexID> dropped_indexes;
for (const auto & i : newly_dropped)
{
if (i.index_id <= EmptyIndexID)
continue;
dropped_indexes.emplace_back(i.index_id);
}
return LocalIndexInfosChangeset{
.new_local_index_infos = new_index_infos,
.dropped_indexes = std::move(dropped_indexes),
.dropped_indexes = std::move(newly_dropped),
};
}

Expand Down
Loading

0 comments on commit 76e729e

Please sign in to comment.