Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: Use mmap to view vector index #9313

Merged
merged 18 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ namespace DB
M(proactive_flush_force_set_type) \
M(exception_when_fetch_disagg_pages) \
M(cop_send_failure) \
M(file_cache_fg_download_fail) \
M(force_set_parallel_prehandle_threshold) \
M(force_raise_prehandle_exception) \
M(force_agg_on_partial_block) \
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Common/LRUCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ class LRUCache
return res;
}

/// Checks for the presence of `key` in the cache.
/// This is a pure membership probe: it only looks the key up in `cells`
/// and, unlike a regular lookup, does not change the LRU order.
bool contains(const Key & key)
{
    // Hold the cache mutex for the duration of the lookup.
    std::scoped_lock guard(mutex);
    return cells.find(key) != cells.end();
}

void set(const Key & key, const MappedPtr & mapped)
{
std::scoped_lock cache_lock(mutex);
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/StoragePool/GlobalPageIdAllocator.h>
#include <Storages/DeltaMerge/StoragePool/GlobalStoragePool.h>
#include <Storages/DeltaMerge/StoragePool/StoragePool.h>
Expand Down Expand Up @@ -1388,13 +1388,13 @@ void Context::dropMinMaxIndexCache() const
shared->minmax_index_cache->reset();
}

void Context::setVectorIndexCache(size_t cache_size_in_bytes)
void Context::setVectorIndexCache(size_t cache_entities)
{
auto lock = getLock();

RUNTIME_CHECK(!shared->vector_index_cache);

shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_size_in_bytes);
shared->vector_index_cache = std::make_shared<DM::VectorIndexCache>(cache_entities);
}

DM::VectorIndexCachePtr Context::getVectorIndexCache() const
Expand All @@ -1407,7 +1407,7 @@ void Context::dropVectorIndexCache() const
{
auto lock = getLock();
if (shared->vector_index_cache)
shared->vector_index_cache->reset();
shared->vector_index_cache.reset();
}

bool Context::isDeltaIndexLimited() const
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ class Context
std::shared_ptr<DM::MinMaxIndexCache> getMinMaxIndexCache() const;
void dropMinMaxIndexCache() const;

void setVectorIndexCache(size_t cache_size_in_bytes);
void setVectorIndexCache(size_t cache_entities);
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1439,10 +1439,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (minmax_index_cache_size)
global_context->setMinMaxIndexCache(minmax_index_cache_size);

// 1GiB vector index cache.
size_t vec_index_cache_size = config().getUInt64("vec_index_cache_size", 1ULL * 1024 * 1024 * 1024);
if (vec_index_cache_size)
global_context->setVectorIndexCache(vec_index_cache_size);
/// The vector index cache is limited by the number of entries instead of bytes, because it uses `mmap` and lets the operating system decide the memory usage.
size_t vec_index_cache_entities = config().getUInt64("vec_index_cache_entities", 1000);
if (vec_index_cache_entities)
global_context->setVectorIndexCache(vec_index_cache_entities);
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ struct DMContext : private boost::noncopyable
TableID physical_table_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings)
const DB::Settings & settings,
const ScanContextPtr & scan_context = nullptr)
{
return std::unique_ptr<DMContext>(new DMContext(
session_context_,
Expand All @@ -145,7 +146,7 @@ struct DMContext : private boost::noncopyable
is_common_handle_,
rowkey_column_size_,
settings,
nullptr,
scan_context,
""));
}

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,14 @@ struct ColumnDefine
/// Note: ColumnDefine is used in both Write path and Read path.
/// In the read path, vector_index is usually not available. Use AnnQueryInfo for
/// read related vector index information.
TiDB::VectorIndexInfoPtr vector_index;
TiDB::VectorIndexDefinitionPtr vector_index;

explicit ColumnDefine(
ColId id_ = 0,
String name_ = "",
DataTypePtr type_ = nullptr,
Field default_value_ = Field{},
TiDB::VectorIndexInfoPtr vector_index_ = nullptr)
TiDB::VectorIndexDefinitionPtr vector_index_ = nullptr)
: id(id_)
, name(std::move(name_))
, type(std::move(type_))
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct ColumnStat
size_t array_sizes_bytes = 0;
size_t array_sizes_mark_bytes = 0;

std::optional<dtpb::ColumnVectorIndexInfo> vector_index = std::nullopt;
std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;

dtpb::ColumnStat toProto() const
{
Expand Down
Loading