Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Add system.dt_local_indexes #9379

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand Down Expand Up @@ -173,6 +174,19 @@ struct StoreStats
UInt64 background_tasks_length = 0;
};

// Row statistics of one local index of a DeltaMergeStore.
// One instance is produced per index by DeltaMergeStore::getLocalIndexStats(),
// with the row counters summed over all segments of the store.
struct LocalIndexStats
{
String column_name{}; // Name of the indexed column
UInt64 column_id{}; // ID of the indexed column
String index_kind{}; // Kind of the index as text; getLocalIndexStats() currently always reports "HNSW"

UInt64 rows_stable_indexed{}; // Stable-layer rows, summed over all segments whose stable is regarded as indexed
UInt64 rows_stable_not_indexed{}; // Stable-layer rows, summed over all segments whose stable has column data without an index
Copy link
Member

@CalvinNeo CalvinNeo Aug 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What're these comments?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

total rows of all segments.

UInt64 rows_delta_indexed{}; // Delta-layer rows regarded as indexed; getLocalIndexStats() never adds to it, as delta is currently never indexed
UInt64 rows_delta_not_indexed{}; // Delta-layer rows, summed over all segments (delta is currently always counted as not indexed)
};
// Statistics of all local indexes of one store, one element per index.
using LocalIndexesStats = std::vector<LocalIndexStats>;

class DeltaMergeStore : private boost::noncopyable
{
public:
Expand Down Expand Up @@ -527,6 +541,7 @@ class DeltaMergeStore : private boost::noncopyable

StoreStats getStoreStats();
SegmentsStats getSegmentsStats();
LocalIndexesStats getLocalIndexStats();

bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }
Expand Down Expand Up @@ -838,6 +853,11 @@ class DeltaMergeStore : private boost::noncopyable

RowKeyValue next_gc_check_key;

// Some indexes are built in TiFlash locally. For example, Vector Index.
// Compared to the lightweight RoughSet Indexes, these indexes require a lot
// of resources to build, so they will be built in a separate background pool.
IndexInfosPtr local_index_infos;

// Synchronize between write threads and read threads.
mutable std::shared_mutex read_write_mutex;

Expand Down
54 changes: 54 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,60 @@ SegmentsStats DeltaMergeStore::getSegmentsStats()
return stats;
}

/// Collects row statistics for every local index defined on this store.
///
/// One LocalIndexStats is produced per entry of `local_index_infos`, with the
/// counters accumulated over all segments:
///  - delta rows are always added to `rows_delta_not_indexed`;
///  - stable rows are added to `rows_stable_indexed`, unless at least one DMFile
///    of the stable holds data for the indexed column but no index bytes, in
///    which case they are added to `rows_stable_not_indexed`.
///
/// A shared lock on `read_write_mutex` is held for the whole scan.
///
/// @return one element per local index, in the order of `local_index_infos`;
///         empty when no local index is defined.
LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
{
    std::shared_lock lock(read_write_mutex);

    if (!local_index_infos || local_index_infos->empty())
        return {};

    LocalIndexesStats stats;
    // The final size is known up front: exactly one entry per index.
    stats.reserve(local_index_infos->size());

    for (const auto & index_info : *local_index_infos)
    {
        // Construct the entry in place and fill it through the reference, so the
        // strings it owns are not copied a second time into the vector.
        auto & index_stats = stats.emplace_back();
        index_stats.column_id = index_info.column_id;
        index_stats.column_name = index_info.column_name;
        index_stats.index_kind = "HNSW"; // TODO: Support more.

        for (const auto & [handle, segment] : segments)
        {
            UNUSED(handle);

            // Currently Delta is always not indexed.
            index_stats.rows_delta_not_indexed
                += segment->getDelta()->getRows(); // TODO: More precisely count column bytes instead.

            const auto & stable = segment->getStable();
            bool is_stable_indexed = true;
            for (const auto & dmfile : stable->getDMFiles())
            {
                if (!dmfile->isColumnExist(index_info.column_id))
                    continue; // Regard as indexed, because column does not need any index

                // Bind by const reference: only two fields are read, no copy is needed.
                const auto & column_stat = dmfile->getColumnStat(index_info.column_id);

                // Column data without any index bytes: this file is not indexed,
                // so the whole stable is regarded as not indexed.
                if (column_stat.index_bytes == 0 && column_stat.data_bytes > 0)
                {
                    is_stable_indexed = false;
                    break;
                }
            }

            if (is_stable_indexed)
                index_stats.rows_stable_indexed += stable->getRows();
            else
                index_stats.rows_stable_not_indexed += stable->getRows();
        }
    }

    return stats;
}

} // namespace DM
} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/Storages/DeltaMerge/Index/IndexInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/KVStore/Types.h>
#include <TiDB/Schema/VectorIndex.h>

namespace DB::DM
{

// Kind of a local index described by IndexInfo.
enum class IndexType
{
// The only kind declared so far.
Vector = 1,
};

// Describes one local index defined on a column.
//
// Every scalar member has a default member initializer, so that a
// default-constructed IndexInfo never holds indeterminate values.
// The struct stays an aggregate, so brace initialization keeps working.
struct IndexInfo
{
    // Kind of the index. Defaults to the only kind declared so far.
    IndexType type = IndexType::Vector;
    // ID of the indexed column.
    ColumnID column_id{};
    // Name of the indexed column.
    String column_name{};
    // Now we only support vector index.
    // In the future, we may support more types of indexes, using std::variant.
    TiDB::VectorIndexDefinitionPtr index_definition{};
};

// A list of local index descriptors, one IndexInfo per index.
using IndexInfos = std::vector<IndexInfo>;
// Shared-ownership handle to an IndexInfos list.
using IndexInfosPtr = std::shared_ptr<IndexInfos>;

} // namespace DB::DM
135 changes: 135 additions & 0 deletions dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseTiFlash.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/KVStore/Types.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/System/StorageSystemDTLocalIndexes.h>
#include <TiDB/Schema/SchemaNameMapper.h>

namespace DB
{
/// Creates the storage and declares its fixed column layout.
/// The order of the columns here must match the order in which read() fills them.
StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & name_)
    : name(name_)
{
    // Data type instances are named once and reused by every column of that type.
    const auto type_string = std::make_shared<DataTypeString>();
    const auto type_uint64 = std::make_shared<DataTypeUInt64>();
    const auto type_int64 = std::make_shared<DataTypeInt64>();
    const auto type_nullable_uint64 = std::make_shared<DataTypeNullable>(type_uint64);

    setColumns(ColumnsDescription({
        // Names as seen by this server.
        {"database", type_string},
        {"table", type_string},

        // Names and identifiers taken from the TiDB schema.
        {"tidb_database", type_string},
        {"tidb_table", type_string},
        {"keyspace_id", type_nullable_uint64},
        {"table_id", type_int64},

        // Which column is indexed, and by what kind of index.
        {"column_name", type_string},
        {"column_id", type_uint64},
        {"index_kind", type_string},

        // Row counters, each one a total over all segments of the table.
        {"rows_stable_indexed", type_uint64},
        {"rows_stable_not_indexed", type_uint64},
        {"rows_delta_indexed", type_uint64},
        {"rows_delta_not_indexed", type_uint64},
    }));
}

/// Produces one block with one row per (DeltaMerge table, local index).
///
/// Walks every database known to `context` and every table in it. Tables are
/// skipped when they are not DeltaMerge storages, when their store is not
/// initialized yet, or when they are tombstoned. For each remaining table the
/// statistics returned by DeltaMergeStore::getLocalIndexStats() are written
/// out, one row per index.
///
/// @param column_names     requested columns; only validated via check().
/// @param context          used to enumerate databases and tables.
/// @param processed_stage  set to QueryProcessingStage::FetchColumns.
/// @return a single stream holding one block with all rows.
BlockInputStreams StorageSystemDTLocalIndexes::read(
    const Names & column_names,
    const SelectQueryInfo &,
    const Context & context,
    QueryProcessingStage::Enum & processed_stage,
    const size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    check(column_names);
    processed_stage = QueryProcessingStage::FetchColumns;

    MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();

    auto databases = context.getDatabases();
    for (const auto & [database_name, database] : databases)
    {
        // Null when the database is not a DatabaseTiFlash; the TiDB-side database
        // name is then left empty and the keyspace is reported as NULL.
        const DatabaseTiFlash * db_tiflash = typeid_cast<DatabaseTiFlash *>(database.get());

        auto it = database->getIterator(context);
        for (; it->isValid(); it->next())
        {
            const auto & table_name = it->name();
            auto & storage = it->table();
            if (storage->getName() != MutableSupport::delta_tree_storage_name)
                continue;

            auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
            // Defensive: never dereference a failed downcast.
            if (!dm_storage)
                continue;

            const auto & table_info = dm_storage->getTableInfo();
            auto table_id = table_info.id;
            auto store = dm_storage->getStoreIfInited();
            if (!store)
                continue;

            if (dm_storage->isTombstone())
                continue;

            const auto index_stats = store->getLocalIndexStats();
            if (index_stats.empty())
                continue;

            // These values are the same for every index of the table, so they are
            // resolved once per table instead of once per emitted row.
            String tidb_db_name;
            KeyspaceID keyspace_id = NullspaceID;
            if (db_tiflash)
            {
                const auto & db_info = db_tiflash->getDatabaseInfo();
                tidb_db_name = db_info.name;
                keyspace_id = db_info.keyspace_id;
            }
            const String & tidb_table_name = table_info.name;

            for (const auto & stat : index_stats)
            {
                // Columns are filled positionally; the order must match the
                // column list declared in the constructor.
                size_t j = 0;
                res_columns[j++]->insert(database_name);
                res_columns[j++]->insert(table_name);

                res_columns[j++]->insert(tidb_db_name);
                res_columns[j++]->insert(tidb_table_name);
                if (keyspace_id == NullspaceID)
                    res_columns[j++]->insert(Field()); // NULL keyspace
                else
                    res_columns[j++]->insert(static_cast<UInt64>(keyspace_id));
                res_columns[j++]->insert(table_id);

                res_columns[j++]->insert(stat.column_name);
                res_columns[j++]->insert(stat.column_id);
                res_columns[j++]->insert(stat.index_kind);

                res_columns[j++]->insert(stat.rows_stable_indexed);
                res_columns[j++]->insert(stat.rows_stable_not_indexed);
                res_columns[j++]->insert(stat.rows_delta_indexed);
                res_columns[j++]->insert(stat.rows_delta_not_indexed);
            }
        }
    }

    return BlockInputStreams(
        1,
        std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}

} // namespace DB
49 changes: 49 additions & 0 deletions dbms/src/Storages/System/StorageSystemDTLocalIndexes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/IStorage.h>

#include <ext/shared_ptr_helper.h>


namespace DB
{
class Context;

/// Read-only system storage that reports statistics of local indexes.
/// The constructor is protected; instances are presumably meant to be obtained
/// through the `create` helper provided by ext::SharedPtrHelper.
class StorageSystemDTLocalIndexes
    : public ext::SharedPtrHelper<StorageSystemDTLocalIndexes>
    , public IStorage
{
public:
    /// Name of the storage engine.
    std::string getName() const override { return "SystemDTLocalIndexes"; }

    /// Name of the table this storage was created with.
    std::string getTableName() const override { return name; }

    /// Builds the result rows; see the definition for details.
    BlockInputStreams read(
        const Names & column_names,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum & processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

protected:
    explicit StorageSystemDTLocalIndexes(const std::string & name_);

private:
    /// Table name, fixed at construction.
    const std::string name;
};

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/System/attachSystemTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Storages/System/StorageSystemAsynchronousMetrics.h>
#include <Storages/System/StorageSystemBuildOptions.h>
#include <Storages/System/StorageSystemColumns.h>
#include <Storages/System/StorageSystemDTLocalIndexes.h>
#include <Storages/System/StorageSystemDTSegments.h>
#include <Storages/System/StorageSystemDTTables.h>
#include <Storages/System/StorageSystemDatabases.h>
Expand All @@ -42,6 +43,7 @@ void attachSystemTablesLocal(IDatabase & system_database)
system_database.attachTable("databases", StorageSystemDatabases::create("databases"));
system_database.attachTable("dt_tables", StorageSystemDTTables::create("dt_tables"));
system_database.attachTable("dt_segments", StorageSystemDTSegments::create("dt_segments"));
system_database.attachTable("dt_local_indexes", StorageSystemDTLocalIndexes::create("dt_local_indexes"));
system_database.attachTable("tables", StorageSystemTables::create("tables"));
system_database.attachTable("columns", StorageSystemColumns::create("columns"));
system_database.attachTable("functions", StorageSystemFunctions::create("functions"));
Expand Down