Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Sep 30, 2024
1 parent c48f6bd commit e7cccaa
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 96 deletions.
52 changes: 30 additions & 22 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <Common/ComputeLabelHolder.h>
#include <Common/Exception.h>
#include <Common/ProcessCollector_fwd.h>
#include <Common/TiFlashBuildInfo.h>
Expand Down Expand Up @@ -657,25 +656,6 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
Gauge, \
F(type_send, {{"type", "send_queue"}}), \
F(type_receive, {{"type", "recv_queue"}})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute", \
Counter, \
F(type_mpp, \
{{"type", "mpp"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop, \
{{"type", "cop"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_cop_stream, \
{{"type", "cop_stream"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()}), \
F(type_batch, \
{{"type", "batch"}, \
ComputeLabelHolder::instance().getClusterIdLabel(), \
ComputeLabelHolder::instance().getProcessIdLabel()})) \
M(tiflash_shared_block_schemas, \
"statistics about shared block schemas of ColumnFiles", \
Gauge, \
Expand Down Expand Up @@ -856,7 +836,18 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_gac_req_acquire_tokens, {"type", "gac_req_acquire_tokens"}), \
F(type_gac_req_ru_consumption_delta, {"type", "gac_req_ru_consumption_delta"}), \
F(type_gac_resp_tokens, {"type", "gac_resp_tokens"}), \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"})) \
F(type_gac_resp_capacity, {"type", "gac_resp_capacity"}), \
F(type_handling_mpp_task_dispatch, {"type", "handling_mpp_task_dispatch"}), \
F(type_handling_mpp_task_establish, {"type", "handling_mpp_task_establish"}), \
F(type_handling_mpp_task_cancel, {"type", "handling_mpp_task_cancel"}), \
F(type_handling_mpp_task_run, {"type", "handling_mpp_task_run"})) \
M(tiflash_compute_request_unit, \
"Request Unit used by tiflash compute for each resource group", \
Counter, \
F(type_mpp, {"type", "mpp"}), \
F(type_cop, {"type", "cop"}), \
F(type_cop_stream, {"type", "cop_stream"}), \
F(type_batch, {"type", "batch"}), ) \
M(tiflash_vector_index_memory_usage, \
"Vector index memory usage", \
Gauge, \
Expand Down Expand Up @@ -884,7 +875,24 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
M(tiflash_read_thread_internal_us, \
"Durations of read thread internal components", \
Histogram, \
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}))
F(type_block_queue_pop_latency, {{"type", "block_queue_pop_latency"}}, ExpBuckets{1, 2, 20}), \
F(type_schedule_one_round, {{"type", "schedule_one_round"}}, ExpBuckets{1, 2, 20})) \
M(tiflash_storage_pack_compression_algorithm_count, \
"The count of the compression algorithm used by each data part", \
Counter, \
F(type_constant, {"type", "constant"}), \
F(type_constant_delta, {"type", "constant_delta"}), \
F(type_runlength, {"type", "runlength"}), \
F(type_for, {"type", "for"}), \
F(type_delta_for, {"type", "delta_for"}), \
F(type_lz4, {"type", "lz4"})) \
M(tiflash_storage_pack_compression_bytes, \
"The uncompression/compression bytes of lz4 and lightweight", \
Counter, \
F(type_lz4_compressed_bytes, {"type", "lz4_compressed_bytes"}), \
F(type_lz4_uncompressed_bytes, {"type", "lz4_uncompressed_bytes"}), \
F(type_lightweight_compressed_bytes, {"type", "lightweight_compressed_bytes"}), \
F(type_lightweight_uncompressed_bytes, {"type", "lightweight_uncompressed_bytes"}))


/// Buckets with boundaries [start * base^0, start * base^1, ..., start * base^(size-1)]
Expand Down
90 changes: 16 additions & 74 deletions dbms/src/Storages/DeltaMerge/Index/LocalIndexInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,6 @@ extern const char force_not_support_vector_index[];
namespace DB::DM
{

/// Composite key that carries an index id together with a column id.
/// The member order is significant: callers build this aggregate with
/// designated initializers (`.index_id = ..., .column_id = ...`).
struct ComplexIndexID
{
    /// Id of the index.
    IndexID index_id;
    /// Id of the column. NOTE(review): presumably the column the index is built on — confirm.
    ColumnID column_id;
};

/// Two ComplexIndexID values are equal only when both the index id and the
/// column id match. The index id is compared first; the column id is only
/// inspected when the index ids agree.
bool operator==(const ComplexIndexID & lhs, const ComplexIndexID & rhs)
{
    if (!(lhs.index_id == rhs.index_id))
        return false;
    return lhs.column_id == rhs.column_id;
}

/// Hash functor for ComplexIndexID, usable as the hasher of unordered containers.
///
/// Only one of the two ids contributes to the hash:
///  - when `index_id > EmptyIndexID`, the tag 0x01 and `index_id` are combined;
///  - otherwise the tag 0x02 and `column_id` are combined.
/// The distinct tags keep an index id and a column id with the same numeric
/// value from feeding identical inputs into the hash.
struct ComplexIndexIDHasher
{
    std::size_t operator()(const ComplexIndexID & id) const
    {
        using boost::hash_combine;
        using boost::hash_value;

        std::size_t seed = 0;

        // No valid index id: fall back to hashing the column id.
        if (!(id.index_id > EmptyIndexID))
        {
            hash_combine(seed, hash_value(0x02));
            hash_combine(seed, hash_value(id.column_id));
            return seed;
        }

        // Valid index id: the column id is intentionally ignored.
        hash_combine(seed, hash_value(0x01));
        hash_combine(seed, hash_value(id.index_id));
        return seed;
    }
};


bool isVectorIndexSupported(const LoggerPtr & logger)
{
// Vector Index requires a specific storage format to work.
Expand Down Expand Up @@ -141,23 +107,21 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
}

// Keep a map of "indexes in existing_indexes" -> "offset in new_index_infos"
std::unordered_map<ComplexIndexID, size_t, ComplexIndexIDHasher> original_local_index_id_map;
std::unordered_map<IndexID, size_t> original_local_index_id_map;
if (existing_indexes)
{
// Create a copy of existing indexes
for (size_t offset = 0; offset < existing_indexes->size(); ++offset)
{
const auto & index = (*existing_indexes)[offset];
original_local_index_id_map.emplace(
ComplexIndexID{.index_id = index.index_id, .column_id = index.column_id},
offset);
original_local_index_id_map.emplace(index.index_id, offset);
new_index_infos->emplace_back(index);
}
}

std::unordered_set<ComplexIndexID, ComplexIndexIDHasher> index_ids_in_new_table;
std::vector<ComplexIndexID> newly_added;
std::vector<ComplexIndexID> newly_dropped;
std::unordered_set<IndexID> index_ids_in_new_table;
std::vector<IndexID> newly_added;
std::vector<IndexID> newly_dropped;

for (const auto & idx : new_table_info.index_infos)
{
Expand All @@ -168,9 +132,7 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
if (column_id <= EmptyColumnID)
continue;

const ComplexIndexID cindex_id{.index_id = idx.id, .column_id = column_id};
auto iter = original_local_index_id_map.find(cindex_id);
if (iter == original_local_index_id_map.end())
if (!original_local_index_id_map.contains(idx.id))
{
if (idx.state == TiDB::StatePublic || idx.state == TiDB::StateWriteReorganization)
{
Expand All @@ -181,15 +143,15 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
.column_id = column_id,
.index_definition = idx.vector_index,
});
newly_added.emplace_back(cindex_id);
index_ids_in_new_table.emplace(cindex_id);
newly_added.emplace_back(idx.id);
index_ids_in_new_table.emplace(idx.id);
}
// else the index is not public or write reorg, consider this index as not exist
}
else
{
if (idx.state != TiDB::StateDeleteReorganization)
index_ids_in_new_table.emplace(cindex_id);
index_ids_in_new_table.emplace(idx.id);
// else exist in both `existing_indexes` and `new_table_info`, but enter "delete reorg". We consider this
// index as not exist in the `new_table_info` and drop it later
}
Expand Down Expand Up @@ -219,12 +181,7 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
buf.joinStr(
original_local_index_id_map.begin(),
original_local_index_id_map.end(),
[](const auto & id, FmtBuffer & fb) {
if (id.first.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.first.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.first.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id.first); },
",");
buf.append("]");
return buf.toString();
Expand All @@ -241,34 +198,19 @@ LocalIndexInfosChangeset generateLocalIndexInfos(
buf.joinStr(
original_local_index_id_map.begin(),
original_local_index_id_map.end(),
[](const auto & id, FmtBuffer & fb) {
if (id.first.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.first.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.first.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id.first); },
",");
buf.append("] added=[");
buf.joinStr(
newly_added.begin(),
newly_added.end(),
[](const ComplexIndexID & id, FmtBuffer & fb) {
if (id.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id); },
",");
buf.append("] dropped=[");
buf.joinStr(
newly_dropped.begin(),
newly_dropped.end(),
[](const ComplexIndexID & id, FmtBuffer & fb) {
if (id.index_id != EmptyIndexID)
fb.fmtAppend("index_id={}", id.index_id);
else
fb.fmtAppend("index_on_column_id={}", id.column_id);
},
[](const auto & id, FmtBuffer & fb) { fb.fmtAppend("index_id={}", id); },
",");
buf.append("]");
return buf.toString();
Expand All @@ -277,11 +219,11 @@ LocalIndexInfosChangeset generateLocalIndexInfos(

// only return the newly dropped index with index_id > EmptyIndexID
std::vector<IndexID> dropped_indexes;
for (const auto & i : newly_dropped)
for (const auto & idx_id : newly_dropped)
{
if (i.index_id <= EmptyIndexID)
if (idx_id <= EmptyIndexID)
continue;
dropped_indexes.emplace_back(i.index_id);
dropped_indexes.emplace_back(idx_id);
}
return LocalIndexInfosChangeset{
.new_local_index_infos = new_index_infos,
Expand Down

0 comments on commit e7cccaa

Please sign in to comment.