Skip to content

Commit

Permalink
storage: Use mmap to view vector index (pingcap#165)
Browse files Browse the repository at this point in the history
Signed-off-by: Wish <[email protected]>
  • Loading branch information
breezewish authored and Lloyd-Pottiger committed Aug 12, 2024
1 parent 32d911b commit ba076ab
Show file tree
Hide file tree
Showing 36 changed files with 2,109 additions and 836 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
8 changes: 8 additions & 0 deletions dbms/src/Common/LRUCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ class LRUCache
return res;
}

/// Checks whether `key` currently has an entry in the cache.
/// This is a pure lookup: unlike a regular read it does not touch
/// the LRU order of the entry.
bool contains(const Key & key)
{
    std::scoped_lock cache_lock(mutex);
    const auto found = cells.find(key);
    return found != cells.end();
}

void set(const Key & key, const MappedPtr & mapped)
{
std::scoped_lock cache_lock(mutex);
Expand Down
125 changes: 125 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/BlockIO.h>
#include <Flash/Coprocessor/ChunkCodec.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/DAGStorageInterpreter.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/TableLockHolder.h>
#include <TiDB/Schema/TiDB.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#pragma GCC diagnostic ignored "-Wnon-virtual-dtor"
#include <kvproto/coprocessor.pb.h>
#include <pingcap/coprocessor/Client.h>
#include <tipb/select.pb.h>
#pragma GCC diagnostic pop

namespace DB
{
class DAGQueryBlock;
class ExchangeReceiver;
class DAGExpressionAnalyzer;
struct SubqueryForSet;
class Join;
class Expand2;
using JoinPtr = std::shared_ptr<Join>;
using Expand2Ptr = std::shared_ptr<Expand2>;

/** Build a ch plan from a dag request: dag executors -> ch plan.
  *
  * Only the constructor, the destructor and execute() are unconditionally public.
  * Everything after the `#ifndef DBMS_PUBLIC_GTEST` guard is private in normal
  * builds and becomes public when DBMS_PUBLIC_GTEST is defined (presumably so
  * that unit tests can reach the helpers -- confirm against the test build).
  *
  * This header only declares the members; all definitions live elsewhere.
  */
class DAGQueryBlockInterpreter
{
public:
/// @param context_            context, kept as a reference member (see `context`).
/// @param input_streams_vec_  one BlockInputStreams per input; presumably the
///                            outputs of child query blocks -- confirm in the definition.
/// @param query_block_        the query block to interpret, kept as a const reference member.
/// @param max_streams_        see the `max_streams` member.
DAGQueryBlockInterpreter(
Context & context_,
const std::vector<BlockInputStreams> & input_streams_vec_,
const DAGQueryBlock & query_block_,
size_t max_streams_);

/// NOTE(review): `analyzer` is a unique_ptr to a type that is only forward-declared
/// in this header; the complete type must be visible wherever this defaulted
/// destructor is instantiated -- confirm one of the includes provides it.
~DAGQueryBlockInterpreter() = default;

/// Entry point: returns the resulting input streams for this query block.
BlockInputStreams execute();

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
/// Presumably the implementation behind execute(), operating on a caller-provided pipeline -- confirm.
void executeImpl(DAGPipeline & pipeline);

/// handle* helpers: each takes the DAGPipeline being worked on, and most also
/// take the tipb/TiDB executor description they handle.
void handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline);
void handleJoin(
const tipb::Join & join,
DAGPipeline & pipeline,
SubqueryForSet & right_query,
size_t fine_grained_shuffle_count);
void handleExchangeReceiver(DAGPipeline & pipeline);
void handleMockExchangeReceiver(DAGPipeline & pipeline);
void handleProjection(DAGPipeline & pipeline, const tipb::Projection & projection);
void handleWindow(DAGPipeline & pipeline, const tipb::Window & window, bool enable_fine_grained_shuffle);
void handleWindowOrder(DAGPipeline & pipeline, const tipb::Sort & window_sort, bool enable_fine_grained_shuffle);
void handleExpand2(DAGPipeline & pipeline, const tipb::Expand2 & expand2);

/// execute* helpers: each takes the DAGPipeline plus already-prepared
/// descriptions (expression actions, sort/window/aggregate descriptions, ...).
/// Note that `filter_column` is taken by non-const reference; `extra_info`
/// defaults to an empty string.
void executeWhere(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expressionActionsPtr,
String & filter_column,
const String & extra_info = "");
/// `sort_desc` is taken by value here.
void executeWindowOrder(DAGPipeline & pipeline, SortDescription sort_desc, bool enable_fine_grained_shuffle);
void executeOrder(DAGPipeline & pipeline, const NamesAndTypes & order_columns);
void executeLimit(DAGPipeline & pipeline);
void executeExpand(DAGPipeline & pipeline, const ExpressionActionsPtr & expr);
void executeExpand2(DAGPipeline & pipeline, const Expand2Ptr & expand);
/// `window_description` is taken by non-const reference.
void executeWindow(
DAGPipeline & pipeline,
WindowDescription & window_description,
bool enable_fine_grained_shuffle);
/// `aggregate_descriptions` is taken by non-const reference.
void executeAggregation(
DAGPipeline & pipeline,
const ExpressionActionsPtr & expression_actions_ptr,
const Names & key_names,
const TiDB::TiDBCollators & collators,
AggregateDescriptions & aggregate_descriptions,
bool is_final_agg,
bool enable_fine_grained_shuffle);
/// `project_cols` is taken by non-const reference; `extra_info` defaults to an empty string.
void executeProject(DAGPipeline & pipeline, NamesWithAliases & project_cols, const String & extra_info = "");
void handleExchangeSender(DAGPipeline & pipeline);
void handleMockExchangeSender(DAGPipeline & pipeline);

/// Presumably records the pipeline's streams for profiling under `key` -- confirm in the definition.
void recordProfileStreams(DAGPipeline & pipeline, const String & key);

void recordJoinExecuteInfo(size_t build_side_index, const JoinPtr & join_ptr);

void restorePipelineConcurrency(DAGPipeline & pipeline);

/// Accessor for the DAGContext; presumably obtained through `context` -- confirm.
DAGContext & dagContext() const;

/// Reference member: the referenced Context must outlive this interpreter.
Context & context;
/// Held by value (not a reference), unlike `context` and `query_block`.
std::vector<BlockInputStreams> input_streams_vec;
/// Reference member: the referenced DAGQueryBlock must outlive this interpreter.
const DAGQueryBlock & query_block;

NamesWithAliases final_project;

/// How many streams we ask for storage to produce, and in how many threads we will do further processing.
size_t max_streams = 1;

/// Owned exclusively by this interpreter.
std::unique_ptr<DAGExpressionAnalyzer> analyzer;

LoggerPtr log;
};
} // namespace DB
10 changes: 5 additions & 5 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -1385,16 +1385,16 @@ void Context::dropMinMaxIndexCache() const
{
auto lock = getLock();
if (shared->minmax_index_cache)
shared->minmax_index_cache->reset();
shared->minmax_index_cache.reset();
}

void Context::setVectorIndexCache(size_t cache_size_in_bytes)
void Context::setVectorIndexCache(size_t cache_entities)
{
auto lock = getLock();

RUNTIME_CHECK(!shared->vector_index_cache);

shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_size_in_bytes);
shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_entities);
}

DM::VectorIndexCachePtr Context::getVectorIndexCache() const
Expand All @@ -1407,7 +1407,7 @@ void Context::dropVectorIndexCache() const
{
auto lock = getLock();
if (shared->vector_index_cache)
shared->vector_index_cache->reset();
shared->vector_index_cache.reset();
}

bool Context::isDeltaIndexLimited() const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class Context
std::shared_ptr<DM::MinMaxIndexCache> getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;

void setVectorIndexCache(size_t cache_size_in_bytes);
void setVectorIndexCache(size_t cache_entities);
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,10 +1439,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (minmax_index_cache_size)
global_context->setMinMaxIndexCache(minmax_index_cache_size);

// 1GiB vector index cache.
size_t vec_index_cache_size = config().getUInt64("vec_index_cache_size", 1ULL * 1024 * 1024 * 1024);
if (vec_index_cache_size)
global_context->setVectorIndexCache(vec_index_cache_size);
size_t vec_index_cache_entities = config().getUInt64("vec_index_cache_entities", 1000);
if (vec_index_cache_entities)
global_context->setVectorIndexCache(vec_index_cache_entities);

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ struct DMContext : private boost::noncopyable
TableID physical_table_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings)
const DB::Settings & settings,
const ScanContextPtr & scan_context = nullptr)
{
return std::unique_ptr<DMContext>(new DMContext(
session_context_,
Expand All @@ -145,7 +146,7 @@ struct DMContext : private boost::noncopyable
is_common_handle_,
rowkey_column_size_,
settings,
nullptr,
scan_context,
""));
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ struct ColumnDefine
/// Note: ColumnDefine is used in both Write path and Read path.
/// In the read path, vector_index is usually not available. Use AnnQueryInfo for
/// read related vector index information.
TiDB::VectorIndexInfoPtr vector_index;
TiDB::VectorIndexDefinitionPtr vector_index;

explicit ColumnDefine(
ColId id_ = 0,
String name_ = "",
DataTypePtr type_ = nullptr,
Field default_value_ = Field{},
TiDB::VectorIndexInfoPtr vector_index_ = nullptr)
TiDB::VectorIndexDefinitionPtr vector_index_ = nullptr)
: id(id_)
, name(std::move(name_))
, type(std::move(type_))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct ColumnStat
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

std::optional<dtpb::ColumnVectorIndexInfo> vector_index = std::nullopt;
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;

dtpb::ColumnStat toProto() const
{
Expand Down
Loading

0 comments on commit ba076ab

Please sign in to comment.