Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: long term cache for integer-like PK column in memory #9445

Merged
merged 7 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ void ColumnArray::insertDefault()
getOffsets().push_back(getOffsets().empty() ? 0 : getOffsets().back());
}

/// Appends `length` entries to the offsets array in a single resize.
/// Every new entry equals the current last offset (or 0 when there are no
/// offsets yet), which is the value insertDefault() would push each time.
void ColumnArray::insertManyDefaults(size_t length)
{
    auto & offsets = getOffsets();
    const size_t last_offset = offsets.empty() ? 0 : offsets.back();
    offsets.resize_fill(offsets.size() + length, last_offset);
}

void ColumnArray::popBack(size_t n)
{
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,7 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
}

void insertDefault() override;
/// Inserts `length` default values by calling insertDefault() once per value.
void insertManyDefaults(size_t length) override
{
    for (size_t remaining = length; remaining != 0; --remaining)
        insertDefault();
}
void insertManyDefaults(size_t length) override;
void popBack(size_t n) override;
/// TODO: If result_size_hint < 0, makes reserve() using size of filtered column, not source column to avoid some OOM issues.
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/File/ColumnCacheLongTerm.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
Expand Down Expand Up @@ -151,6 +152,7 @@ struct ContextShared
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable DM::MinMaxIndexCachePtr minmax_index_cache; /// Cache of minmax index in compressed files.
mutable DM::VectorIndexCachePtr vector_index_cache;
mutable DM::ColumnCacheLongTermPtr column_cache_long_term;
mutable DM::DeltaIndexManagerPtr delta_index_manager; /// Manage the Delta Indies of Segments.
ProcessList process_list; /// Executing queries at the moment.
ViewDependencies view_dependencies; /// Current dependencies
Expand Down Expand Up @@ -1412,6 +1414,28 @@ void Context::dropVectorIndexCache() const
shared->vector_index_cache.reset();
}

/// Creates the shared long-term column cache, passing `cache_size_in_bytes`
/// to its constructor. The cache must not have been set already.
void Context::setColumnCacheLongTerm(size_t cache_size_in_bytes)
{
    // Keep the object returned by getLock() alive until the function returns.
    auto guard = getLock();

    // Setting the cache twice is treated as a programming error.
    RUNTIME_CHECK(!shared->column_cache_long_term);
    shared->column_cache_long_term = std::make_shared<DM::ColumnCacheLongTerm>(cache_size_in_bytes);
}

/// Returns the shared long-term column cache pointer.
/// The pointer is null when no cache has been set (or after it was dropped).
DM::ColumnCacheLongTermPtr Context::getColumnCacheLongTerm() const
{
    // Read the shared pointer while holding the object returned by getLock().
    auto guard = getLock();
    return shared->column_cache_long_term;
}

/// Releases this context's reference to the long-term column cache, if any.
void Context::dropColumnCacheLongTerm() const
{
    auto guard = getLock();
    // reset() on an empty shared_ptr is a no-op, so no emptiness check is needed.
    shared->column_cache_long_term.reset();
}

bool Context::isDeltaIndexLimited() const
{
// Don't need to use a lock here, as delta_index_manager should be set at starting up.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ namespace DM
{
class MinMaxIndexCache;
class VectorIndexCache;
class ColumnCacheLongTerm;
class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
Expand Down Expand Up @@ -397,6 +398,10 @@ class Context
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

void setColumnCacheLongTerm(size_t cache_size_in_bytes);
std::shared_ptr<DM::ColumnCacheLongTerm> getColumnCacheLongTerm() const;
void dropColumnCacheLongTerm() const;

bool isDeltaIndexLimited() const;
void setDeltaIndexManager(size_t cache_size_in_bytes);
std::shared_ptr<DM::DeltaIndexManager> getDeltaIndexManager() const;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ int benchEntry(const std::vector<std::string> & opts)
/*min_version_*/ 0,
NullspaceID,
/*physical_table_id*/ 1,
/*pk_col_id*/ 0,
false,
1,
db_context->getSettingsRef());
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1480,6 +1480,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (vec_index_cache_entities)
global_context->setVectorIndexCache(vec_index_cache_entities);

size_t column_cache_long_term_size
= config().getUInt64("column_cache_long_term_size", 512 * 1024 * 1024 /* 512MB */);
if (column_cache_long_term_size)
global_context->setColumnCacheLongTerm(column_cache_long_term_size);

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, means unlimited, and it
/// controls the number of total bytes keep in the memory.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
/*min_version_*/ 0,
NullspaceID,
/*physical_table_id*/ 1,
/*pk_col_id*/ 0,
false,
1,
db_context->getSettingsRef());
Expand Down
9 changes: 0 additions & 9 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,6 @@ class BitmapFilterView
inline UInt32 size() const { return filter_size; }

inline UInt32 offset() const { return filter_offset; }

// Return how many valid rows, i.e. how many entries equal `true` in the
// range [filter_offset, filter_offset + filter_size) of the underlying filter.
size_t count() const
{
    const auto window_begin = filter->filter.cbegin() + filter_offset;
    const auto window_end = window_begin + filter_size;
    return std::count(window_begin, window_end, true);
}
};

} // namespace DB::DM
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ DMContext::DMContext(
const Context & session_context_,
const StoragePathPoolPtr & path_pool_,
const StoragePoolPtr & storage_pool_,
const DB::Timestamp min_version_,
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand All @@ -47,6 +48,7 @@ DMContext::DMContext(
, min_version(min_version_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, pk_col_id(pk_col_id_)
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, segment_limit_rows(settings.dt_segment_limit_rows)
Expand Down
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ struct DMContext : private boost::noncopyable
const KeyspaceID keyspace_id;
const TableID physical_table_id;

/// The user-defined PK column. If multi-column PK, or no PK, it is 0.
/// Note that user-defined PK will never be _tidb_rowid.
///
/// @warning This field was added later. It is simply set to 0 in existing tests
/// for convenience. If you develop a feature that relies on this field, remember
/// to update the related unit tests.
const ColumnID pk_col_id;

bool is_common_handle;
// The number of columns in primary key if is_common_handle = true, otherwise, should always be 1.
size_t rowkey_column_size;
Expand Down Expand Up @@ -104,6 +112,7 @@ struct DMContext : private boost::noncopyable
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand All @@ -117,6 +126,7 @@ struct DMContext : private boost::noncopyable
min_version_,
keyspace_id_,
physical_table_id_,
pk_col_id_,
is_common_handle_,
rowkey_column_size_,
settings,
Expand All @@ -131,6 +141,7 @@ struct DMContext : private boost::noncopyable
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand All @@ -143,6 +154,7 @@ struct DMContext : private boost::noncopyable
min_version_,
keyspace_id_,
physical_table_id_,
pk_col_id_,
is_common_handle_,
rowkey_column_size_,
settings,
Expand All @@ -160,9 +172,10 @@ struct DMContext : private boost::noncopyable
const Context & session_context_,
const StoragePathPoolPtr & path_pool_,
const StoragePoolPtr & storage_pool_,
const DB::Timestamp min_version_,
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand Down
42 changes: 6 additions & 36 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ DeltaMergeStore::DeltaMergeStore(
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand All @@ -230,6 +231,7 @@ DeltaMergeStore::DeltaMergeStore(
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, original_table_handle_define(handle)
, pk_col_id(pk_col_id_)
, background_pool(db_context.getBackgroundPool())
, blockable_background_pool(db_context.getBlockableBackgroundPool())
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
Expand Down Expand Up @@ -331,6 +333,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand All @@ -347,6 +350,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
table_name_,
keyspace_id_,
physical_table_id_,
pk_col_id_,
has_replica,
columns,
handle,
Expand All @@ -360,42 +364,6 @@ DeltaMergeStorePtr DeltaMergeStore::create(
return store_shared_ptr;
}

/// Builds a DeltaMergeStore from the given arguments, runs
/// checkAllSegmentsLocalIndex() on it, and returns it as a uniquely owned
/// pointer. All arguments are forwarded unchanged to the constructor.
std::unique_ptr<DeltaMergeStore> DeltaMergeStore::createUnique(
    Context & db_context,
    bool data_path_contains_database_name,
    const String & db_name_,
    const String & table_name_,
    KeyspaceID keyspace_id_,
    TableID physical_table_id_,
    bool has_replica,
    const ColumnDefines & columns,
    const ColumnDefine & handle,
    bool is_common_handle_,
    size_t rowkey_column_size_,
    IndexInfosPtr local_index_infos_,
    const Settings & settings_,
    ThreadPool * thread_pool)
{
    // Take ownership immediately so the store is released if the check below throws.
    std::unique_ptr<DeltaMergeStore> owned_store(new DeltaMergeStore(
        db_context,
        data_path_contains_database_name,
        db_name_,
        table_name_,
        keyspace_id_,
        physical_table_id_,
        has_replica,
        columns,
        handle,
        is_common_handle_,
        rowkey_column_size_,
        local_index_infos_,
        settings_,
        thread_pool));
    owned_store->checkAllSegmentsLocalIndex();
    return owned_store;
}

DeltaMergeStore::~DeltaMergeStore()
{
LOG_INFO(log, "Release DeltaMerge Store start");
Expand Down Expand Up @@ -548,6 +516,7 @@ DMContextPtr DeltaMergeStore::newDMContext(
latest_gc_safe_point.load(std::memory_order_acquire),
keyspace_id,
physical_table_id,
pk_col_id,
is_common_handle,
rowkey_column_size,
db_settings,
Expand Down Expand Up @@ -1517,6 +1486,7 @@ Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemote

return std::make_unique<Remote::DisaggPhysicalTableReadSnapshot>(
KeyspaceTableID{keyspace_id, physical_table_id},
pk_col_id,
std::move(tasks));
}

Expand Down
22 changes: 6 additions & 16 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class DeltaMergeStore
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand All @@ -307,22 +308,7 @@ class DeltaMergeStore
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

static std::unique_ptr<DeltaMergeStore> createUnique(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand Down Expand Up @@ -932,6 +918,10 @@ class DeltaMergeStore
BlockPtr original_table_header; // Used to speed up getHeader()
ColumnDefine original_table_handle_define;

/// The user-defined PK column. If multi-column PK, or no PK, it is 0.
/// Note that user-defined PK will never be _tidb_rowid.
ColumnID pk_col_id;

// The columns we actually store.
// The first three columns are always _tidb_rowid, _INTERNAL_VERSION, _INTERNAL_DELMARK,
// regardless of whether `tidb_rowid` exists in `table_columns` or not.
Expand Down
Loading