Skip to content

Commit

Permalink
storage: long term cache for integer-like PK column in memory (#9445)
Browse files Browse the repository at this point in the history
ref #9032

storage: cache PK column in memory

Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
Lloyd-Pottiger and ti-chi-bot[bot] authored Sep 21, 2024
1 parent 355ab8c commit 5a41ce2
Show file tree
Hide file tree
Showing 53 changed files with 604 additions and 114 deletions.
8 changes: 8 additions & 0 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,14 @@ void ColumnArray::insertDefault()
getOffsets().push_back(getOffsets().empty() ? 0 : getOffsets().back());
}

/// Appends `length` entries to the offsets array in a single resize.
/// Every new entry repeats the current last offset (or 0 when there are no
/// offsets yet), so the nested data column is left untouched.
void ColumnArray::insertManyDefaults(size_t length)
{
    auto & offsets = getOffsets();
    // Value to repeat: the last existing offset, or 0 for an empty column.
    const size_t last_offset = offsets.empty() ? 0 : offsets.back();
    offsets.resize_fill(offsets.size() + length, last_offset);
}

void ColumnArray::popBack(size_t n)
{
Expand Down
6 changes: 1 addition & 5 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,7 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
}

void insertDefault() override;
/// Inserts `length` default values by calling insertDefault() once per value.
void insertManyDefaults(size_t length) override
{
    size_t remaining = length;
    while (remaining > 0)
    {
        insertDefault();
        --remaining;
    }
}
void insertManyDefaults(size_t length) override;
void popBack(size_t n) override;
/// TODO: If result_size_hint < 0, makes reserve() using size of filtered column, not source column to avoid some OOM issues.
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
Expand Down
24 changes: 24 additions & 0 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include <Storages/BackgroundProcessingPool.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h>
#include <Storages/DeltaMerge/DeltaIndexManager.h>
#include <Storages/DeltaMerge/File/ColumnCacheLongTerm.h>
#include <Storages/DeltaMerge/Index/MinMaxIndex.h>
#include <Storages/DeltaMerge/Index/VectorIndexCache.h>
#include <Storages/DeltaMerge/LocalIndexerScheduler.h>
Expand Down Expand Up @@ -151,6 +152,7 @@ struct ContextShared
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
mutable DM::MinMaxIndexCachePtr minmax_index_cache; /// Cache of minmax index in compressed files.
mutable DM::VectorIndexCachePtr vector_index_cache;
mutable DM::ColumnCacheLongTermPtr column_cache_long_term;
mutable DM::DeltaIndexManagerPtr delta_index_manager; /// Manages the delta indices of segments.
ProcessList process_list; /// Executing queries at the moment.
ViewDependencies view_dependencies; /// Current dependencies
Expand Down Expand Up @@ -1412,6 +1414,28 @@ void Context::dropVectorIndexCache() const
shared->vector_index_cache.reset();
}

/// Creates the shared long-term column cache and stores it in the shared context state.
/// `cache_size_in_bytes` is forwarded unchanged to the DM::ColumnCacheLongTerm constructor.
/// The cache must not have been set before: the RUNTIME_CHECK below rejects a second call.
void Context::setColumnCacheLongTerm(size_t cache_size_in_bytes)
{
// Hold the context lock while inspecting and replacing the shared pointer.
auto lock = getLock();

// Guard against double initialization before allocating a new cache.
RUNTIME_CHECK(!shared->column_cache_long_term);

shared->column_cache_long_term = std::make_shared<DM::ColumnCacheLongTerm>(cache_size_in_bytes);
}

/// Returns the shared long-term column cache pointer, read under the context lock.
/// The result is empty if the cache was never set or has been dropped.
DM::ColumnCacheLongTermPtr Context::getColumnCacheLongTerm() const
{
    auto lock = getLock();
    // Copy the pointer while the lock is held; the caller gets its own reference.
    DM::ColumnCacheLongTermPtr cache = shared->column_cache_long_term;
    return cache;
}

/// Clears the shared context's pointer to the long-term column cache, under the context lock.
/// Does nothing observable if the cache was never set.
void Context::dropColumnCacheLongTerm() const
{
    auto lock = getLock();
    // reset() on an empty pointer is a no-op, so no explicit null check is needed.
    shared->column_cache_long_term.reset();
}

bool Context::isDeltaIndexLimited() const
{
// Don't need to use a lock here, as delta_index_manager should be set at starting up.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ namespace DM
{
class MinMaxIndexCache;
class VectorIndexCache;
class ColumnCacheLongTerm;
class DeltaIndexManager;
class GlobalStoragePool;
class SharedBlockSchemas;
Expand Down Expand Up @@ -397,6 +398,10 @@ class Context
std::shared_ptr<DM::VectorIndexCache> getVectorIndexCache() const;
void dropVectorIndexCache() const;

void setColumnCacheLongTerm(size_t cache_size_in_bytes);
std::shared_ptr<DM::ColumnCacheLongTerm> getColumnCacheLongTerm() const;
void dropColumnCacheLongTerm() const;

bool isDeltaIndexLimited() const;
void setDeltaIndexManager(size_t cache_size_in_bytes);
std::shared_ptr<DM::DeltaIndexManager> getDeltaIndexManager() const;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ int benchEntry(const std::vector<std::string> & opts)
/*min_version_*/ 0,
NullspaceID,
/*physical_table_id*/ 1,
/*pk_col_id*/ 0,
false,
1,
db_context->getSettingsRef());
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1480,6 +1480,11 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (vec_index_cache_entities)
global_context->setVectorIndexCache(vec_index_cache_entities);

size_t column_cache_long_term_size
= config().getUInt64("column_cache_long_term_size", 512 * 1024 * 1024 /* 512MB */);
if (column_cache_long_term_size)
global_context->setColumnCacheLongTerm(column_cache_long_term_size);

/// Size of max memory usage of DeltaIndex, used by DeltaMerge engine.
/// - In non-disaggregated mode, its default value is 0, which means unlimited, and it
/// controls the total number of bytes kept in memory.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Server/tests/gtest_dttool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
/*min_version_*/ 0,
NullspaceID,
/*physical_table_id*/ 1,
/*pk_col_id*/ 0,
false,
1,
db_context->getSettingsRef());
Expand Down
9 changes: 0 additions & 9 deletions dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,6 @@ class BitmapFilterView
inline UInt32 size() const { return filter_size; }

inline UInt32 offset() const { return filter_offset; }

// Return how many valid rows.
// Counts the `true` entries of the underlying filter within this view's window,
// i.e. the `filter_size` entries starting at `filter_offset`.
size_t count() const
{
    const auto window_begin = filter->filter.cbegin() + filter_offset;
    const auto window_end = window_begin + filter_size;
    return std::count(window_begin, window_end, true);
}
};

} // namespace DB::DM
4 changes: 3 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ DMContext::DMContext(
const Context & session_context_,
const StoragePathPoolPtr & path_pool_,
const StoragePoolPtr & storage_pool_,
const DB::Timestamp min_version_,
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand All @@ -47,6 +48,7 @@ DMContext::DMContext(
, min_version(min_version_)
, keyspace_id(keyspace_id_)
, physical_table_id(physical_table_id_)
, pk_col_id(pk_col_id_)
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, segment_limit_rows(settings.dt_segment_limit_rows)
Expand Down
15 changes: 14 additions & 1 deletion dbms/src/Storages/DeltaMerge/DMContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ struct DMContext : private boost::noncopyable
const KeyspaceID keyspace_id;
const TableID physical_table_id;

/// The user-defined PK column. If multi-column PK, or no PK, it is 0.
/// Note that user-defined PK will never be _tidb_rowid.
///
/// @warning This field was added later. It is just set to 0 in existing tests
/// for convenience. If you develop a feature that relies on this field, remember
/// to update the related unit tests.
const ColumnID pk_col_id;

bool is_common_handle;
// The number of columns in primary key if is_common_handle = true, otherwise, should always be 1.
size_t rowkey_column_size;
Expand Down Expand Up @@ -104,6 +112,7 @@ struct DMContext : private boost::noncopyable
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand All @@ -117,6 +126,7 @@ struct DMContext : private boost::noncopyable
min_version_,
keyspace_id_,
physical_table_id_,
pk_col_id_,
is_common_handle_,
rowkey_column_size_,
settings,
Expand All @@ -131,6 +141,7 @@ struct DMContext : private boost::noncopyable
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand All @@ -143,6 +154,7 @@ struct DMContext : private boost::noncopyable
min_version_,
keyspace_id_,
physical_table_id_,
pk_col_id_,
is_common_handle_,
rowkey_column_size_,
settings,
Expand All @@ -160,9 +172,10 @@ struct DMContext : private boost::noncopyable
const Context & session_context_,
const StoragePathPoolPtr & path_pool_,
const StoragePoolPtr & storage_pool_,
const DB::Timestamp min_version_,
DB::Timestamp min_version_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool is_common_handle_,
size_t rowkey_column_size_,
const DB::Settings & settings,
Expand Down
42 changes: 6 additions & 36 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ DeltaMergeStore::DeltaMergeStore(
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand All @@ -230,6 +231,7 @@ DeltaMergeStore::DeltaMergeStore(
, is_common_handle(is_common_handle_)
, rowkey_column_size(rowkey_column_size_)
, original_table_handle_define(handle)
, pk_col_id(pk_col_id_)
, background_pool(db_context.getBackgroundPool())
, blockable_background_pool(db_context.getBlockableBackgroundPool())
, next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY)
Expand Down Expand Up @@ -331,6 +333,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand All @@ -347,6 +350,7 @@ DeltaMergeStorePtr DeltaMergeStore::create(
table_name_,
keyspace_id_,
physical_table_id_,
pk_col_id_,
has_replica,
columns,
handle,
Expand All @@ -360,42 +364,6 @@ DeltaMergeStorePtr DeltaMergeStore::create(
return store_shared_ptr;
}

/// Builds a DeltaMergeStore owned by a std::unique_ptr.
/// All arguments are forwarded unchanged to the DeltaMergeStore constructor.
/// checkAllSegmentsLocalIndex() is called on the new store before it is returned.
std::unique_ptr<DeltaMergeStore> DeltaMergeStore::createUnique(
    Context & db_context,
    bool data_path_contains_database_name,
    const String & db_name_,
    const String & table_name_,
    KeyspaceID keyspace_id_,
    TableID physical_table_id_,
    bool has_replica,
    const ColumnDefines & columns,
    const ColumnDefine & handle,
    bool is_common_handle_,
    size_t rowkey_column_size_,
    IndexInfosPtr local_index_infos_,
    const Settings & settings_,
    ThreadPool * thread_pool)
{
    // A plain `new` is kept on purpose, as in the original.
    // NOTE(review): presumably the constructor is not accessible to std::make_unique — confirm.
    std::unique_ptr<DeltaMergeStore> owned_store(new DeltaMergeStore(
        db_context,
        data_path_contains_database_name,
        db_name_,
        table_name_,
        keyspace_id_,
        physical_table_id_,
        has_replica,
        columns,
        handle,
        is_common_handle_,
        rowkey_column_size_,
        local_index_infos_,
        settings_,
        thread_pool));

    // Run the local index check only after the store is fully constructed and owned.
    owned_store->checkAllSegmentsLocalIndex();
    return owned_store;
}

DeltaMergeStore::~DeltaMergeStore()
{
LOG_INFO(log, "Release DeltaMerge Store start");
Expand Down Expand Up @@ -548,6 +516,7 @@ DMContextPtr DeltaMergeStore::newDMContext(
latest_gc_safe_point.load(std::memory_order_acquire),
keyspace_id,
physical_table_id,
pk_col_id,
is_common_handle,
rowkey_column_size,
db_settings,
Expand Down Expand Up @@ -1517,6 +1486,7 @@ Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemote

return std::make_unique<Remote::DisaggPhysicalTableReadSnapshot>(
KeyspaceTableID{keyspace_id, physical_table_id},
pk_col_id,
std::move(tasks));
}

Expand Down
22 changes: 6 additions & 16 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class DeltaMergeStore
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand All @@ -307,22 +308,7 @@ class DeltaMergeStore
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
bool is_common_handle_,
size_t rowkey_column_size_,
IndexInfosPtr local_index_infos_,
const Settings & settings_ = EMPTY_SETTINGS,
ThreadPool * thread_pool = nullptr);

static std::unique_ptr<DeltaMergeStore> createUnique(
Context & db_context,
bool data_path_contains_database_name,
const String & db_name,
const String & table_name_,
KeyspaceID keyspace_id_,
TableID physical_table_id_,
ColumnID pk_col_id_,
bool has_replica,
const ColumnDefines & columns,
const ColumnDefine & handle,
Expand Down Expand Up @@ -932,6 +918,10 @@ class DeltaMergeStore
BlockPtr original_table_header; // Used to speed up getHeader()
ColumnDefine original_table_handle_define;

/// The user-defined PK column. If multi-column PK, or no PK, it is 0.
/// Note that user-defined PK will never be _tidb_rowid.
ColumnID pk_col_id;

// The columns we actually store.
// First three columns are always _tidb_rowid, _INTERNAL_VERSION, _INTERNAL_DELMARK
// No matter `tidb_rowid` exist in `table_columns` or not.
Expand Down
Loading

0 comments on commit 5a41ce2

Please sign in to comment.