Skip to content

Commit

Permalink
storage: Add system.dt_local_indexes (#9379)
Browse files Browse the repository at this point in the history
ref #9032

storage: Add system.dt_local_indexes

Signed-off-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
Lloyd-Pottiger committed Aug 28, 2024
1 parent 5f08ae6 commit 3e3f23c
Show file tree
Hide file tree
Showing 6 changed files with 301 additions and 0 deletions.
20 changes: 20 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <Storages/DeltaMerge/DeltaMergeInterfaces.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/DeltaMerge/Filter/PushDownFilter.h>
#include <Storages/DeltaMerge/Index/IndexInfo.h>
#include <Storages/DeltaMerge/Remote/DisaggSnapshot_fwd.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/ScanContext_fwd.h>
Expand Down Expand Up @@ -173,6 +174,19 @@ struct StoreStats
UInt64 background_tasks_length = 0;
};

// Statistics of a single local index of a store, as produced by
// DeltaMergeStore::getLocalIndexStats().
struct LocalIndexStats
{
    // Name of the column the index is defined on.
    String column_name;
    // ID of the column the index is defined on.
    UInt64 column_id = 0;
    // Kind of the index, e.g. "HNSW".
    String index_kind;

    // The counters below are row counts accumulated over all segments.

    // Rows of stable layers regarded as indexed for this column.
    UInt64 rows_stable_indexed = 0;
    // Rows of stable layers that have at least one DMFile holding column
    // data without any index data.
    UInt64 rows_stable_not_indexed = 0;
    // Rows of delta layers that are indexed.
    // NOTE(review): nothing visible here ever increases this counter.
    UInt64 rows_delta_indexed = 0;
    // Rows of delta layers that are not indexed.
    UInt64 rows_delta_not_indexed = 0;
};
// One entry per local index.
using LocalIndexesStats = std::vector<LocalIndexStats>;

class DeltaMergeStore : private boost::noncopyable
{
public:
Expand Down Expand Up @@ -527,6 +541,7 @@ class DeltaMergeStore : private boost::noncopyable

StoreStats getStoreStats();
SegmentsStats getSegmentsStats();
LocalIndexesStats getLocalIndexStats();

bool isCommonHandle() const { return is_common_handle; }
size_t getRowKeyColumnSize() const { return rowkey_column_size; }
Expand Down Expand Up @@ -838,6 +853,11 @@ class DeltaMergeStore : private boost::noncopyable

RowKeyValue next_gc_check_key;

// Some indexes are built in TiFlash locally. For example, Vector Index.
// Compared to the lightweight RoughSet Indexes, these indexes require a lot
// of resources to build, so they will be built in a separate background pool.
IndexInfosPtr local_index_infos;

// Synchronize between write threads and read threads.
mutable std::shared_mutex read_write_mutex;

Expand Down
54 changes: 54 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,60 @@ SegmentsStats DeltaMergeStore::getSegmentsStats()
return stats;
}

// Collects, for every local index defined on this store, how many rows are
// covered by an index and how many are not.
//
// Returns one entry per element of `local_index_infos`, in the same order.
// Returns an empty vector when no local index is defined.
// A shared lock on `read_write_mutex` is held for the whole scan.
LocalIndexesStats DeltaMergeStore::getLocalIndexStats()
{
    std::shared_lock lock(read_write_mutex);

    if (!local_index_infos || local_index_infos->empty())
        return {};

    LocalIndexesStats stats;
    // The final size is known up front: exactly one entry per index.
    stats.reserve(local_index_infos->size());
    for (const auto & index_info : *local_index_infos)
    {
        // Build the entry directly inside the vector instead of filling a
        // local and copying it (and its strings) afterwards.
        auto & index_stats = stats.emplace_back();
        index_stats.column_id = index_info.column_id;
        index_stats.column_name = index_info.column_name;
        index_stats.index_kind = "HNSW"; // TODO: Support more.

        for (const auto & [handle, segment] : segments)
        {
            UNUSED(handle);

            // Currently Delta is always not indexed.
            index_stats.rows_delta_not_indexed
                += segment->getDelta()->getRows(); // TODO: More precisely count column bytes instead.

            // The stable layer counts as indexed only if none of its DMFiles
            // holds data for the column without index data.
            const auto & stable = segment->getStable();
            bool is_stable_indexed = true;
            for (const auto & dmfile : stable->getDMFiles())
            {
                if (!dmfile->isColumnExist(index_info.column_id))
                    continue; // Regard as indexed, because column does not need any index

                // Bind by reference: only two fields are read, no copy needed.
                const auto & column_stat = dmfile->getColumnStat(index_info.column_id);

                if (column_stat.index_bytes == 0 && column_stat.data_bytes > 0)
                {
                    is_stable_indexed = false;
                    break;
                }
            }

            if (is_stable_indexed)
                index_stats.rows_stable_indexed += stable->getRows();
            else
                index_stats.rows_stable_not_indexed += stable->getRows();
        }
    }

    return stats;
}

} // namespace DM
} // namespace DB
41 changes: 41 additions & 0 deletions dbms/src/Storages/DeltaMerge/Index/IndexInfo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/KVStore/Types.h>
#include <TiDB/Schema/VectorIndex.h>

namespace DB::DM
{

// Kind of a local index described by an IndexInfo.
enum class IndexType
{
    // Vector index; the only kind defined so far.
    Vector = 1,
};

// Describes one local index defined on a column.
struct IndexInfo
{
    // Kind of the index. Defaults to the only kind defined so far, so that a
    // default-constructed IndexInfo never carries an indeterminate value.
    IndexType type = IndexType::Vector;
    // ID of the column the index is defined on (value-initialized by default).
    ColumnID column_id{};
    // Name of the column the index is defined on.
    String column_name;
    // Now we only support vector index.
    // In the future, we may support more types of indexes, using std::variant.
    TiDB::VectorIndexDefinitionPtr index_definition;
};

// All local indexes of a table.
using IndexInfos = std::vector<IndexInfo>;
using IndexInfosPtr = std::shared_ptr<IndexInfos>;

} // namespace DB::DM
135 changes: 135 additions & 0 deletions dbms/src/Storages/System/StorageSystemDTLocalIndexes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Columns/ColumnString.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/DatabaseTiFlash.h>
#include <Databases/IDatabase.h>
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/KVStore/Types.h>
#include <Storages/MutableSupport.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/System/StorageSystemDTLocalIndexes.h>
#include <TiDB/Schema/SchemaNameMapper.h>

namespace DB
{
// Declares the schema of the table. The column order here must stay in sync
// with the order in which read() fills the columns.
StorageSystemDTLocalIndexes::StorageSystemDTLocalIndexes(const std::string & name_)
    : name(name_)
{
    // Tiny factories; each call yields a fresh data type instance.
    const auto string_type = [] {
        return std::make_shared<DataTypeString>();
    };
    const auto uint64_type = [] {
        return std::make_shared<DataTypeUInt64>();
    };

    setColumns(ColumnsDescription({
        // Names as known to TiFlash
        {"database", string_type()},
        {"table", string_type()},

        // Identity as known to TiDB
        {"tidb_database", string_type()},
        {"tidb_table", string_type()},
        {"keyspace_id", std::make_shared<DataTypeNullable>(uint64_type())},
        {"table_id", std::make_shared<DataTypeInt64>()},

        // Which index
        {"column_name", string_type()},
        {"column_id", uint64_type()},
        {"index_kind", string_type()},

        // Row counters
        {"rows_stable_indexed", uint64_type()},
        {"rows_stable_not_indexed", uint64_type()},
        {"rows_delta_indexed", uint64_type()},
        {"rows_delta_not_indexed", uint64_type()},
    }));
}

// Produces a single block with one row per (DeltaTree table, local index).
//
// Walks every database and table known to `context`, skips tables that are
// not DeltaTree storages, whose store is not initialized, or that are
// tombstoned, and emits the statistics returned by
// DeltaMergeStore::getLocalIndexStats() for the rest.
//
// `column_names` is only validated via check(); the returned block always
// carries all columns of the sample block. `processed_stage` is set to
// FetchColumns. `max_block_size` and `num_streams` are ignored.
BlockInputStreams StorageSystemDTLocalIndexes::read(
    const Names & column_names,
    const SelectQueryInfo &,
    const Context & context,
    QueryProcessingStage::Enum & processed_stage,
    const size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    check(column_names);
    processed_stage = QueryProcessingStage::FetchColumns;

    MutableColumns res_columns = getSampleBlock().cloneEmptyColumns();

    const auto databases = context.getDatabases();
    for (const auto & [database_name, database] : databases)
    {
        // Null when the database is not a DatabaseTiFlash; TiDB-side info is
        // then reported as empty name / NULL keyspace.
        const DatabaseTiFlash * db_tiflash = typeid_cast<DatabaseTiFlash *>(database.get());

        for (auto it = database->getIterator(context); it->isValid(); it->next())
        {
            const auto & table_name = it->name();
            auto & storage = it->table();
            if (storage->getName() != MutableSupport::delta_tree_storage_name)
                continue;

            auto dm_storage = std::dynamic_pointer_cast<StorageDeltaMerge>(storage);
            if (!dm_storage)
                continue; // Defensive: the storage name matched but the type did not.

            const auto & table_info = dm_storage->getTableInfo();
            const auto table_id = table_info.id;
            auto store = dm_storage->getStoreIfInited();
            if (!store)
                continue;

            if (dm_storage->isTombstone())
                continue;

            const auto index_stats = store->getLocalIndexStats();
            if (index_stats.empty())
                continue;

            // These values are the same for every row of this table, so
            // resolve them once here rather than once per index.
            String tidb_db_name;
            KeyspaceID keyspace_id = NullspaceID;
            if (db_tiflash)
            {
                const auto & db_info = db_tiflash->getDatabaseInfo();
                tidb_db_name = db_info.name;
                keyspace_id = db_info.keyspace_id;
            }
            const String tidb_table_name = table_info.name;

            for (const auto & stat : index_stats)
            {
                // `j` walks the columns in the order declared by the constructor.
                size_t j = 0;
                res_columns[j++]->insert(database_name);
                res_columns[j++]->insert(table_name);

                res_columns[j++]->insert(tidb_db_name);
                res_columns[j++]->insert(tidb_table_name);
                if (keyspace_id == NullspaceID)
                    res_columns[j++]->insert(Field()); // NULL keyspace_id
                else
                    res_columns[j++]->insert(static_cast<UInt64>(keyspace_id));
                res_columns[j++]->insert(table_id);

                res_columns[j++]->insert(stat.column_name);
                res_columns[j++]->insert(stat.column_id);
                res_columns[j++]->insert(stat.index_kind);

                res_columns[j++]->insert(stat.rows_stable_indexed);
                res_columns[j++]->insert(stat.rows_stable_not_indexed);
                res_columns[j++]->insert(stat.rows_delta_indexed);
                res_columns[j++]->insert(stat.rows_delta_not_indexed);
            }
        }
    }

    return BlockInputStreams(
        1,
        std::make_shared<OneBlockInputStream>(getSampleBlock().cloneWithColumns(std::move(res_columns))));
}

} // namespace DB
49 changes: 49 additions & 0 deletions dbms/src/Storages/System/StorageSystemDTLocalIndexes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/IStorage.h>

#include <ext/shared_ptr_helper.h>


namespace DB
{
class Context;

// Read-only system storage exposing statistics of local indexes of
// DeltaTree tables. It is attached to the system database under the name
// `dt_local_indexes`.
//
// Instances are created through ext::SharedPtrHelper::create(); the
// constructor is therefore not public.
class StorageSystemDTLocalIndexes
    : public ext::SharedPtrHelper<StorageSystemDTLocalIndexes>
    , public IStorage
{
public:
    // Storage engine name.
    std::string getName() const override { return "SystemDTLocalIndexes"; }

    // Name this table was created with.
    std::string getTableName() const override { return name; }

    // Returns the statistics as a single stream.
    BlockInputStreams read(
        const Names & column_names,
        const SelectQueryInfo & query_info,
        const Context & context,
        QueryProcessingStage::Enum & processed_stage,
        size_t max_block_size,
        unsigned num_streams) override;

protected:
    explicit StorageSystemDTLocalIndexes(const std::string & name_);

private:
    // Table name, fixed at construction.
    const std::string name;
};

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/System/attachSystemTables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Storages/System/StorageSystemAsynchronousMetrics.h>
#include <Storages/System/StorageSystemBuildOptions.h>
#include <Storages/System/StorageSystemColumns.h>
#include <Storages/System/StorageSystemDTLocalIndexes.h>
#include <Storages/System/StorageSystemDTSegments.h>
#include <Storages/System/StorageSystemDTTables.h>
#include <Storages/System/StorageSystemDatabases.h>
Expand All @@ -42,6 +43,7 @@ void attachSystemTablesLocal(IDatabase & system_database)
system_database.attachTable("databases", StorageSystemDatabases::create("databases"));
system_database.attachTable("dt_tables", StorageSystemDTTables::create("dt_tables"));
system_database.attachTable("dt_segments", StorageSystemDTSegments::create("dt_segments"));
system_database.attachTable("dt_local_indexes", StorageSystemDTLocalIndexes::create("dt_local_indexes"));
system_database.attachTable("tables", StorageSystemTables::create("tables"));
system_database.attachTable("columns", StorageSystemColumns::create("columns"));
system_database.attachTable("functions", StorageSystemFunctions::create("functions"));
Expand Down

0 comments on commit 3e3f23c

Please sign in to comment.