From b13cbcccf3acbd8bd852381d1177e343c1cd1eea Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Sat, 13 Jul 2024 10:07:43 +0800 Subject: [PATCH 1/6] storage: Optimize Vector Index [2] (#234) Signed-off-by: Wish --- dbms/src/Columns/ColumnArray.cpp | 8 + dbms/src/Columns/ColumnArray.h | 6 +- dbms/src/Interpreters/Context.cpp | 24 +++ dbms/src/Interpreters/Context.h | 5 + dbms/src/Server/DTTool/DTToolBench.cpp | 1 + dbms/src/Server/Server.cpp | 5 + dbms/src/Server/tests/gtest_dttool.cpp | 1 + .../BitmapFilter/BitmapFilterView.h | 16 +- dbms/src/Storages/DeltaMerge/DMContext.cpp | 4 +- dbms/src/Storages/DeltaMerge/DMContext.h | 15 +- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 42 +---- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 22 +-- .../DeltaMerge/File/ColumnCacheLongTerm.h | 104 +++++++++++ .../DeltaMerge/File/ColumnCacheLongTerm_fwd.h | 26 +++ .../File/DMFileBlockInputStream.cpp | 7 +- .../DeltaMerge/File/DMFileBlockInputStream.h | 19 +- .../Storages/DeltaMerge/File/DMFileReader.cpp | 1 + .../Storages/DeltaMerge/File/DMFileReader.h | 13 ++ .../File/VectorColumnFromIndexReader.cpp | 1 - dbms/src/Storages/DeltaMerge/ReadUtil.cpp | 1 + .../DeltaMerge/Remote/DisaggSnapshot.cpp | 2 + .../DeltaMerge/Remote/DisaggSnapshot.h | 4 +- .../DeltaMerge/Remote/Proto/remote.proto | 19 +- .../Storages/DeltaMerge/Remote/Serializer.cpp | 1 + .../Storages/DeltaMerge/SegmentReadTask.cpp | 4 +- .../src/Storages/DeltaMerge/SegmentReadTask.h | 3 +- .../Storages/DeltaMerge/StableValueSpace.cpp | 1 + .../DeltaMerge/tests/gtest_dm_column_file.cpp | 1 + .../tests/gtest_dm_delta_merge_store.cpp | 2 + ...est_dm_delta_merge_store_fast_add_peer.cpp | 1 + .../gtest_dm_delta_merge_store_test_basic.h | 2 + ...test_dm_delta_merge_store_vector_index.cpp | 1 + .../tests/gtest_dm_delta_value_space.cpp | 1 + .../DeltaMerge/tests/gtest_dm_file.cpp | 2 + .../tests/gtest_dm_minmax_index.cpp | 1 + .../DeltaMerge/tests/gtest_dm_segment.cpp | 1 + .../tests/gtest_dm_segment_common_handle.cpp | 1 + .../DeltaMerge/tests/gtest_dm_segment_s3.cpp | 1 + .../tests/gtest_dm_simple_pk_test_basic.cpp | 1 + .../tests/gtest_dm_vector_index.cpp | 164 ++++++++++++++++++ .../tests/gtest_segment_read_task.cpp | 9 +- .../gtest_segment_replace_stable_data.cpp | 1 + .../tests/gtest_segment_test_basic.cpp | 1 + .../tests/gtest_segment_test_basic.h | 19 +- .../DeltaMerge/workload/DTWorkload.cpp | 3 +- .../Storages/DeltaMerge/workload/DTWorkload.h | 2 +- dbms/src/Storages/StorageDeltaMerge.cpp | 16 ++ dbms/src/Storages/StorageDeltaMerge.h | 3 + .../Storages/StorageDisaggregatedRemote.cpp | 3 +- dbms/src/TestUtils/TiFlashTestEnv.cpp | 1 + dbms/src/TiDB/Decode/Vector.cpp | 10 +- dbms/src/TiDB/Schema/VectorIndex.h | 2 +- 52 files changed, 493 insertions(+), 111 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h create mode 100644 dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h diff --git a/dbms/src/Columns/ColumnArray.cpp b/dbms/src/Columns/ColumnArray.cpp index 6c5d57e3006..575f21c796a 100644 --- a/dbms/src/Columns/ColumnArray.cpp +++ b/dbms/src/Columns/ColumnArray.cpp @@ -352,6 +352,14 @@ void ColumnArray::insertDefault() getOffsets().push_back(getOffsets().empty() ? 0 : getOffsets().back()); } +void ColumnArray::insertManyDefaults(size_t length) +{ + auto & offsets = getOffsets(); + size_t v = 0; + if (!offsets.empty()) + v = offsets.back(); + offsets.resize_fill(offsets.size() + length, v); +} void ColumnArray::popBack(size_t n) { diff --git a/dbms/src/Columns/ColumnArray.h b/dbms/src/Columns/ColumnArray.h index 9c0e7d5589e..f18068e6ea0 100644 --- a/dbms/src/Columns/ColumnArray.h +++ b/dbms/src/Columns/ColumnArray.h @@ -103,11 +103,7 @@ class ColumnArray final : public COWPtrHelper } void insertDefault() override; - void insertManyDefaults(size_t length) override - { - for (size_t i = 0; i < length; ++i) - insertDefault(); - } + void insertManyDefaults(size_t length) override; void popBack(size_t n) override; /// TODO: If result_size_hint < 0, makes reserve() using size of filtered column, not source column to avoid some OOM issues. ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override; diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index 95edc01b2bd..f59c1e36559 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -53,6 +53,7 @@ #include #include #include +#include #include #include #include @@ -151,6 +152,7 @@ struct ContextShared mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. mutable DM::MinMaxIndexCachePtr minmax_index_cache; /// Cache of minmax index in compressed files. mutable DM::VectorIndexCachePtr vector_index_cache; + mutable DM::ColumnCacheLongTermPtr column_cache_long_term; mutable DM::DeltaIndexManagerPtr delta_index_manager; /// Manage the Delta Indies of Segments. ProcessList process_list; /// Executing queries at the moment. ViewDependencies view_dependencies; /// Current dependencies @@ -1412,6 +1414,28 @@ void Context::dropVectorIndexCache() const shared->vector_index_cache.reset(); } +void Context::setColumnCacheLongTerm(size_t cache_size_in_bytes) +{ + auto lock = getLock(); + + RUNTIME_CHECK(!shared->column_cache_long_term); + + shared->column_cache_long_term = std::make_shared(cache_size_in_bytes); +} + +DM::ColumnCacheLongTermPtr Context::getColumnCacheLongTerm() const +{ + auto lock = getLock(); + return shared->column_cache_long_term; +} + +void Context::dropColumnCacheLongTerm() const +{ + auto lock = getLock(); + if (shared->column_cache_long_term) + shared->column_cache_long_term.reset(); +} + bool Context::isDeltaIndexLimited() const { // Don't need to use a lock here, as delta_index_manager should be set at starting up. diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 257c92ed0b8..edbe1451873 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -109,6 +109,7 @@ namespace DM { class MinMaxIndexCache; class VectorIndexCache; +class ColumnCacheLongTerm; class DeltaIndexManager; class GlobalStoragePool; class SharedBlockSchemas; @@ -397,6 +398,10 @@ class Context std::shared_ptr getVectorIndexCache() const; void dropVectorIndexCache() const; + void setColumnCacheLongTerm(size_t cache_size_in_bytes); + std::shared_ptr getColumnCacheLongTerm() const; + void dropColumnCacheLongTerm() const; + bool isDeltaIndexLimited() const; void setDeltaIndexManager(size_t cache_size_in_bytes); std::shared_ptr getDeltaIndexManager() const; diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index 5d57f2c4a35..37e1a543752 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -359,6 +359,7 @@ int benchEntry(const std::vector & opts) /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 1, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index fbcbac0c33e..92f530b3215 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1480,6 +1480,11 @@ int Server::main(const std::vector & /*args*/) if (vec_index_cache_entities) global_context->setVectorIndexCache(vec_index_cache_entities); + size_t column_cache_long_term_size + = config().getUInt64("column_cache_long_term_size", 512 * 1024 * 1024 /* 512MB */); + if (column_cache_long_term_size) + global_context->setColumnCacheLongTerm(column_cache_long_term_size); + /// Size of max memory usage of DeltaIndex, used by DeltaMerge engine. /// - In non-disaggregated mode, its default value is 0, means unlimited, and it /// controls the number of total bytes keep in the memory. diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 23442999036..54e315f2f66 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -91,6 +91,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 1, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h index 09804aaf077..26438fab8d0 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h @@ -46,13 +46,17 @@ class BitmapFilterView inline UInt32 offset() const { return filter_offset; } - // Return how many valid rows. - size_t count() const + String toDebugString() const { - return std::count( - filter->filter.cbegin() + filter_offset, - filter->filter.cbegin() + filter_offset + filter_size, - true); + String s(size(), '1'); + for (UInt32 i = 0; i < size(); i++) + { + if (!get(i)) + { + s[i] = '0'; + } + } + return s; } }; diff --git a/dbms/src/Storages/DeltaMerge/DMContext.cpp b/dbms/src/Storages/DeltaMerge/DMContext.cpp index 12217345489..f845820c8fb 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.cpp +++ b/dbms/src/Storages/DeltaMerge/DMContext.cpp @@ -33,9 +33,10 @@ DMContext::DMContext( const Context & session_context_, const StoragePathPoolPtr & path_pool_, const StoragePoolPtr & storage_pool_, - const DB::Timestamp min_version_, + DB::Timestamp min_version_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, @@ -47,6 +48,7 @@ DMContext::DMContext( , min_version(min_version_) , keyspace_id(keyspace_id_) , physical_table_id(physical_table_id_) + , pk_col_id(pk_col_id_) , is_common_handle(is_common_handle_) , rowkey_column_size(rowkey_column_size_) , segment_limit_rows(settings.dt_segment_limit_rows) diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index 1a1d5266958..aed897ef365 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -59,6 +59,14 @@ struct DMContext : private boost::noncopyable const KeyspaceID keyspace_id; const TableID physical_table_id; + /// The user-defined PK column. If multi-column PK, or no PK, it is 0. + /// Note that user-defined PK will never be _tidb_rowid. + /// + /// @warning This field is later added. It is just set to 0 in existing tests + /// for convenience. If you develop some feature rely on this field, remember + /// to modify related unit tests. + const ColumnID pk_col_id; + bool is_common_handle; // The number of columns in primary key if is_common_handle = true, otherwise, should always be 1. size_t rowkey_column_size; @@ -104,6 +112,7 @@ struct DMContext : private boost::noncopyable DB::Timestamp min_version_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, @@ -117,6 +126,7 @@ struct DMContext : private boost::noncopyable min_version_, keyspace_id_, physical_table_id_, + pk_col_id_, is_common_handle_, rowkey_column_size_, settings, @@ -131,6 +141,7 @@ struct DMContext : private boost::noncopyable DB::Timestamp min_version_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, @@ -143,6 +154,7 @@ struct DMContext : private boost::noncopyable min_version_, keyspace_id_, physical_table_id_, + pk_col_id_, is_common_handle_, rowkey_column_size_, settings, @@ -160,9 +172,10 @@ struct DMContext : private boost::noncopyable const Context & session_context_, const StoragePathPoolPtr & path_pool_, const StoragePoolPtr & storage_pool_, - const DB::Timestamp min_version_, + DB::Timestamp min_version_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 3966121b6f5..0b7bf52fa09 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -213,6 +213,7 @@ DeltaMergeStore::DeltaMergeStore( const String & table_name_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool has_replica, const ColumnDefines & columns, const ColumnDefine & handle, @@ -230,6 +231,7 @@ DeltaMergeStore::DeltaMergeStore( , is_common_handle(is_common_handle_) , rowkey_column_size(rowkey_column_size_) , original_table_handle_define(handle) + , pk_col_id(pk_col_id_) , background_pool(db_context.getBackgroundPool()) , blockable_background_pool(db_context.getBlockableBackgroundPool()) , next_gc_check_key(is_common_handle ? RowKeyValue::COMMON_HANDLE_MIN_KEY : RowKeyValue::INT_HANDLE_MIN_KEY) @@ -331,6 +333,7 @@ DeltaMergeStorePtr DeltaMergeStore::create( const String & table_name_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool has_replica, const ColumnDefines & columns, const ColumnDefine & handle, @@ -347,6 +350,7 @@ DeltaMergeStorePtr DeltaMergeStore::create( table_name_, keyspace_id_, physical_table_id_, + pk_col_id_, has_replica, columns, handle, @@ -360,42 +364,6 @@ DeltaMergeStorePtr DeltaMergeStore::create( return store_shared_ptr; } -std::unique_ptr DeltaMergeStore::createUnique( - Context & db_context, - bool data_path_contains_database_name, - const String & db_name_, - const String & table_name_, - KeyspaceID keyspace_id_, - TableID physical_table_id_, - bool has_replica, - const ColumnDefines & columns, - const ColumnDefine & handle, - bool is_common_handle_, - size_t rowkey_column_size_, - IndexInfosPtr local_index_infos_, - const Settings & settings_, - ThreadPool * thread_pool) -{ - auto * store = new DeltaMergeStore( - db_context, - data_path_contains_database_name, - db_name_, - table_name_, - keyspace_id_, - physical_table_id_, - has_replica, - columns, - handle, - is_common_handle_, - rowkey_column_size_, - local_index_infos_, - settings_, - thread_pool); - std::unique_ptr store_unique_ptr(store); - store_unique_ptr->checkAllSegmentsLocalIndex(); - return store_unique_ptr; -} - DeltaMergeStore::~DeltaMergeStore() { LOG_INFO(log, "Release DeltaMerge Store start"); @@ -548,6 +516,7 @@ DMContextPtr DeltaMergeStore::newDMContext( latest_gc_safe_point.load(std::memory_order_acquire), keyspace_id, physical_table_id, + pk_col_id, is_common_handle, rowkey_column_size, db_settings, @@ -1517,6 +1486,7 @@ Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemote return std::make_unique( KeyspaceTableID{keyspace_id, physical_table_id}, + pk_col_id, std::move(tasks)); } diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 8af9fdb525e..2c3aff36e63 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -290,6 +290,7 @@ class DeltaMergeStore const String & table_name_, KeyspaceID keyspace_id_, TableID physical_table_id_, + ColumnID pk_col_id_, bool has_replica, const ColumnDefines & columns, const ColumnDefine & handle, @@ -307,22 +308,7 @@ class DeltaMergeStore const String & table_name_, KeyspaceID keyspace_id_, TableID physical_table_id_, - bool has_replica, - const ColumnDefines & columns, - const ColumnDefine & handle, - bool is_common_handle_, - size_t rowkey_column_size_, - IndexInfosPtr local_index_infos_, - const Settings & settings_ = EMPTY_SETTINGS, - ThreadPool * thread_pool = nullptr); - - static std::unique_ptr createUnique( - Context & db_context, - bool data_path_contains_database_name, - const String & db_name, - const String & table_name_, - KeyspaceID keyspace_id_, - TableID physical_table_id_, + ColumnID pk_col_id_, bool has_replica, const ColumnDefines & columns, const ColumnDefine & handle, @@ -932,6 +918,10 @@ class DeltaMergeStore BlockPtr original_table_header; // Used to speed up getHeader() ColumnDefine original_table_handle_define; + /// The user-defined PK column. If multi-column PK, or no PK, it is 0. + /// Note that user-defined PK will never be _tidb_rowid. + ColumnID pk_col_id; + // The columns we actually store. // First three columns are always _tidb_rowid, _INTERNAL_VERSION, _INTERNAL_DELMARK // No matter `tidb_rowid` exist in `table_columns` or not. diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h b/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h new file mode 100644 index 00000000000..1522c2f173c --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h @@ -0,0 +1,104 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace DB::DM +{ + +/** + * @brief ColumnCacheLongTerm exists for the lifetime of the process to reduce. + * repeated reading of some frequently used columns (like PK) involved in queries. + * It is unlike ColumnCache, which only exists for the lifetime of a snapshot. + * + * Currently ColumnCacheLongTerm is only filled in Vector Search, which requires + * high QPS. + */ +class ColumnCacheLongTerm +{ +private: + struct CacheKey + { + String dmfile_parent_path; + PageIdU64 dmfile_id; + ColumnID column_id; + + bool operator==(const CacheKey & other) const + { + return dmfile_parent_path == other.dmfile_parent_path // + && dmfile_id == other.dmfile_id // + && column_id == other.column_id; + } + }; + + struct CacheKeyHasher + { + std::size_t operator()(const CacheKey & id) const + { + using boost::hash_combine; + using boost::hash_value; + + std::size_t seed = 0; + hash_combine(seed, hash_value(id.dmfile_parent_path)); + hash_combine(seed, hash_value(id.dmfile_id)); + hash_combine(seed, hash_value(id.column_id)); + return seed; + } + }; + + struct CacheWeightFn + { + size_t operator()(const CacheKey & key, const IColumn::Ptr & col) const + { + return sizeof(key) + key.dmfile_parent_path.size() + col->byteSize(); + } + }; + + using LRUCache = DB::LRUCache; + +public: + explicit ColumnCacheLongTerm(size_t cache_size_bytes) + : cache(cache_size_bytes) + {} + + static bool isCacheableColumn(const ColumnDefine & cd) { return cd.type->isInteger(); } + + IColumn::Ptr get(const DMFilePtr & dmfile, ColumnID column_id, std::function load_fn) + { + auto key = CacheKey{ + .dmfile_parent_path = dmfile->parentPath(), + .dmfile_id = dmfile->fileId(), + .column_id = column_id, + }; + auto [result, _] = cache.getOrSet(key, [&load_fn] { return std::make_shared(load_fn()); }); + return *result; + } + + void clear() { cache.reset(); } + + void getStats(size_t & out_hits, size_t & out_misses) const { cache.getStats(out_hits, out_misses); } + +private: + LRUCache cache; +}; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h b/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h new file mode 100644 index 00000000000..7f94a742164 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm_fwd.h @@ -0,0 +1,26 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM +{ + +class ColumnCacheLongTerm; + +using ColumnCacheLongTermPtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 39683006f84..ec90d605feb 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -32,7 +32,8 @@ DMFileBlockInputStreamBuilder::DMFileBlockInputStreamBuilder(const Context & con setCaches( global_context.getMarkCache(), global_context.getMinMaxIndexCache(), - global_context.getVectorIndexCache()); + global_context.getVectorIndexCache(), + global_context.getColumnCacheLongTerm()); // init from settings setFromSettings(context.getSettingsRef()); } @@ -236,6 +237,10 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn scan_context, ReadTag::Query); + if (column_cache_long_term && pk_col_id) + // ColumnCacheLongTerm is only filled in Vector Search. + rest_columns_reader.setColumnCacheLongTerm(column_cache_long_term, pk_col_id); + DMFileWithVectorIndexBlockInputStreamPtr reader = DMFileWithVectorIndexBlockInputStream::create( ann_query_info, dmfile, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h index efe98004ca0..456999aa4c9 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -179,6 +180,16 @@ class DMFileBlockInputStreamBuilder return *this; } + /** + * @note To really enable the long term cache, you also need to ensure + * ColumnCacheLongTerm is initialized in the global context. + */ + DMFileBlockInputStreamBuilder & enableColumnCacheLongTerm(ColumnID pk_col_id_) + { + pk_col_id = pk_col_id_; + return *this; + } + private: // These methods are called by the ctor @@ -187,11 +198,13 @@ class DMFileBlockInputStreamBuilder DMFileBlockInputStreamBuilder & setCaches( const MarkCachePtr & mark_cache_, const MinMaxIndexCachePtr & index_cache_, - const VectorIndexCachePtr & vector_index_cache_) + const VectorIndexCachePtr & vector_index_cache_, + const ColumnCacheLongTermPtr & column_cache_long_term_) { mark_cache = mark_cache_; index_cache = index_cache_; vector_index_cache = vector_index_cache_; + column_cache_long_term = column_cache_long_term_; return *this; } @@ -224,6 +237,10 @@ class DMFileBlockInputStreamBuilder VectorIndexCachePtr vector_index_cache; // Note: Currently thie field is assigned only for Stable streams, not available for ColumnFileBig std::optional bitmap_filter; + + // Note: column_cache_long_term is currently only filled when performing Vector Search. + ColumnCacheLongTermPtr column_cache_long_term = nullptr; + ColumnID pk_col_id = 0; }; /** diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 4dfe5b4605f..0dd042f9568 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index a75c7bff6d7..ccac3686d0d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -182,6 +183,18 @@ class DMFileReader // Each pair object indicates several continuous packs with RSResult::All and will be read as a Block. // It is sorted by start_pack. std::queue> all_match_block_infos; + std::unordered_map last_read_from_cache{}; + +public: + void setColumnCacheLongTerm(ColumnCacheLongTermPtr column_cache_long_term_, ColumnID pk_col_id_) + { + column_cache_long_term = column_cache_long_term_; + pk_col_id = pk_col_id_; + } + +private: + ColumnCacheLongTermPtr column_cache_long_term = nullptr; + ColumnID pk_col_id = 0; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp index 8f6238bce02..4ac5fe274a1 100644 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp @@ -17,7 +17,6 @@ #include - namespace DB::DM { diff --git a/dbms/src/Storages/DeltaMerge/ReadUtil.cpp b/dbms/src/Storages/DeltaMerge/ReadUtil.cpp index 88cb634b201..719f51c865b 100644 --- a/dbms/src/Storages/DeltaMerge/ReadUtil.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadUtil.cpp @@ -71,6 +71,7 @@ std::pair readBlockWithReturnFilter( stable = nullptr; if (delta != nullptr) { + filter = nullptr; block = delta->read(filter, true); } return {block, true}; diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp index 6f34f5302b8..849f9f3ca18 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.cpp @@ -83,8 +83,10 @@ SegmentReadTasks DisaggReadSnapshot::releaseNoNeedFetchTasks() DisaggPhysicalTableReadSnapshot::DisaggPhysicalTableReadSnapshot( KeyspaceTableID ks_table_id_, + ColumnID pk_col_id_, SegmentReadTasks && tasks_) : ks_physical_table_id(ks_table_id_) + , pk_col_id(pk_col_id_) { for (auto && t : tasks_) { diff --git a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h index 86ac2a7e62f..1ff0a68935b 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h +++ b/dbms/src/Storages/DeltaMerge/Remote/DisaggSnapshot.h @@ -96,7 +96,7 @@ class DisaggPhysicalTableReadSnapshot friend struct Serializer; public: - DisaggPhysicalTableReadSnapshot(KeyspaceTableID ks_table_id_, SegmentReadTasks && tasks_); + DisaggPhysicalTableReadSnapshot(KeyspaceTableID ks_table_id_, ColumnID pk_col_id_, SegmentReadTasks && tasks_); SegmentReadTaskPtr popTask(UInt64 segment_id); @@ -117,6 +117,8 @@ class DisaggPhysicalTableReadSnapshot // maybe we can reuse them to reduce memory consumption. DM::ColumnDefinesPtr column_defines; + ColumnID pk_col_id = 0; + private: mutable std::shared_mutex mtx; // segment_id -> SegmentReadTaskPtr diff --git a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto index c495c823cc6..62b4022dc76 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto +++ b/dbms/src/Storages/DeltaMerge/Remote/Proto/remote.proto @@ -21,6 +21,11 @@ message RemotePhysicalTable { uint64 table_id = 2; uint32 keyspace_id = 4; repeated RemoteSegment segments = 3; + + // Note: PK column is not handle column. For example, for a String PK, + // pk_col_id is the col_id of the String column, but handle column is -1. + // If PK is clustered, this field is kept 0. + int64 pk_col_id = 5; } message RemoteSegment { @@ -102,17 +107,3 @@ message CheckpointInfo { // whether the data reclaimed on the write node or not bool is_local_data_reclaimed = 4; } - -message TiFlashColumnInfo { - int64 column_id = 1; - // serialized name by IDataType::getName() - // TODO: deseri this name is costly, consider another way - // like the tipb.ColumnInfo - bytes type_full_name = 2; - // maybe this is not need - bytes column_name = 3; -} - -message TiFlashSchema { - repeated TiFlashColumnInfo columns = 1; -} diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 28ccdcfead3..68dbb4683b4 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -57,6 +57,7 @@ RemotePb::RemotePhysicalTable Serializer::serializePhysicalTable( remote_table.set_snapshot_id(task_id.toMeta().SerializeAsString()); remote_table.set_keyspace_id(snap->ks_physical_table_id.first); remote_table.set_table_id(snap->ks_physical_table_id.second); + remote_table.set_pk_col_id(snap->pk_col_id); for (const auto & [seg_id, seg_task] : snap->tasks) { auto remote_seg = Serializer::serializeSegment( diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 274a83554d1..0acbb1c8d53 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -65,7 +65,8 @@ SegmentReadTask::SegmentReadTask( StoreID store_id_, const String & store_address, KeyspaceID keyspace_id, - TableID physical_table_id) + TableID physical_table_id, + ColumnID pk_col_id) : store_id(store_id_) { CurrentMetrics::add(CurrentMetrics::DT_SegmentReadTasks); @@ -86,6 +87,7 @@ SegmentReadTask::SegmentReadTask( /* min_version */ 0, keyspace_id, physical_table_id, + pk_col_id, /* is_common_handle */ segment_range.is_common_handle, /* rowkey_column_size */ segment_range.rowkey_column_size, db_context.getSettingsRef(), diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index c24d52a782f..be28df7dc8e 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -83,7 +83,8 @@ struct SegmentReadTask StoreID store_id, const String & store_address, KeyspaceID keyspace_id, - TableID physical_table_id); + TableID physical_table_id, + ColumnID pk_col_id); ~SegmentReadTask(); diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 71ebc988d60..0b6e2c1ba28 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -489,6 +489,7 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream( { DMFileBlockInputStreamBuilder builder(context.global_context); builder.enableCleanRead(enable_handle_clean_read, is_fast_scan, enable_del_clean_read, max_data_version) + .enableColumnCacheLongTerm(context.pk_col_id) .setRSOperator(filter) .setColumnCache(column_caches[i]) .setTracingID(context.tracing_id) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp index 98081586a44..f8b2f2a66d8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp @@ -72,6 +72,7 @@ class ColumnFileTest /*min_version_*/ 0, keyspace_id, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp index eb203caefea..39f8620e81c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp @@ -277,6 +277,7 @@ try "t_200", NullspaceID, 200, + /*pk_col_id*/ 0, true, *new_cols, handle_column_define, @@ -3347,6 +3348,7 @@ class DeltaMergeStoreMergeDeltaBySegmentTest DB::base::TiFlashStorageTestBasic::getCurrentFullTestName(), NullspaceID, 101, + /*pk_col_id*/ 0, true, *cols, (*cols)[0], diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 00b135c1b61..0f79e62dd4d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -168,6 +168,7 @@ class DeltaMergeStoreTestFastAddPeer fmt::format("t_{}", table_id), keyspace_id, table_id, + /*pk_col_id*/ 0, true, *cols, handle_column_define, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h index 1b51b208e5f..3d6bafb3746 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_test_basic.h @@ -74,6 +74,7 @@ class DeltaMergeStoreTest : public DB::base::TiFlashStorageTestBasic "t_100", NullspaceID, 100, + /*pk_col_id*/ 0, true, *cols, handle_column_define, @@ -191,6 +192,7 @@ class DeltaMergeStoreRWTest "t_101", NullspaceID, 101, + /*pk_col_id*/ 0, true, *cols, handle_column_define, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp index 5d49358444f..7fe08268f0a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_vector_index.cpp @@ -48,6 +48,7 @@ class DeltaMergeStoreVectorTest "t_100", NullspaceID, 100, + /*pk_col_id*/ 0, true, *cols, handle_column_define, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp index 2a71a638aee..6c8f7abccb4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp @@ -110,6 +110,7 @@ class DeltaValueSpaceTest : public DB::base::TiFlashStorageTestBasic /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 33c91a7efb3..8e743da56c2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -135,6 +135,7 @@ class DMFileMetaV2Test : public DB::base::TiFlashStorageTestBasic /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); @@ -1969,6 +1970,7 @@ class DMFileClusteredIndexTest /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, is_common_handle, rowkey_column_size, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp index bae9f31d5af..b2104bfc571 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_minmax_index.cpp @@ -118,6 +118,7 @@ bool checkMatch( name, NullspaceID, /*table_id*/ next_table_id++, + /*pk_col_id*/ 0, true, table_columns, getExtraHandleColumnDefine(is_common_handle), diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index a323171836c..8b7af3d0b93 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -120,6 +120,7 @@ class SegmentTest : public DB::base::TiFlashStorageTestBasic /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index 65f3e4c7ae2..9d33fea52de 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -92,6 +92,7 @@ class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, is_common_handle, rowkey_column_size, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp index 22ad30f0dd0..01694d3c0e4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp @@ -159,6 +159,7 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp index d356970e408..919ad3c830e 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_simple_pk_test_basic.cpp @@ -54,6 +54,7 @@ void SimplePKTestBasic::reload() DB::base::TiFlashStorageTestBasic::getCurrentFullTestName(), NullspaceID, 101, + /*pk_col_id*/ 0, true, *cols, (*cols)[0], diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index 9dcbbfea0c1..6b3f4babb87 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -99,6 +100,7 @@ class VectorIndexDMFileTest /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef()); @@ -808,6 +810,14 @@ class VectorIndexSegmentTestBase , public SegmentTestBasic { public: + void SetUp() override + { + auto options = SegmentTestBasic::SegmentTestOptions{}; + if (enable_column_cache_long_term) + options.pk_col_id = EXTRA_HANDLE_COLUMN_ID; + SegmentTestBasic::SetUp(options); + } + BlockInputStreamPtr annQuery( PageIdU64 segment_id, Int64 begin, @@ -880,6 +890,7 @@ class VectorIndexSegmentTestBase // DMFile has different logic when there is only vec column. // So we test it independently. bool test_only_vec_column = false; + bool enable_column_cache_long_term = false; int pack_size = 10; ColumnsWithTypeAndName createColumnData(const ColumnsWithTypeAndName & columns) const @@ -1109,6 +1120,158 @@ try } CATCH +class ColumnCacheLongTermTestCacheNotEnabled + : public VectorIndexSegmentTestBase + , public testing::WithParamInterface +{ +public: + ColumnCacheLongTermTestCacheNotEnabled() + { + enable_column_cache_long_term = false; + test_only_vec_column = GetParam(); + } +}; + +INSTANTIATE_TEST_CASE_P( // + VectorIndex, + ColumnCacheLongTermTestCacheNotEnabled, + /* vec_only */ ::testing::Bool()); + +TEST_P(ColumnCacheLongTermTestCacheNotEnabled, Basic) +try +{ + // When cache is not enabled, no matter we read from vec column or not, we should not record + // any cache hit or miss. + + ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 100, /* at */ 0, /* clear */ false); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + + size_t cache_hit = 0; + size_t cache_miss = 0; + db_context->getColumnCacheLongTerm()->clear(); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 0); + + auto stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {100.0}); + assertStreamOut(stream, "[99, 100)"); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 0); +} +CATCH + +class ColumnCacheLongTermTestCacheEnabledAndNoReadPK + : public VectorIndexSegmentTestBase + , public testing::WithParamInterface +{ +public: + ColumnCacheLongTermTestCacheEnabledAndNoReadPK() + { + enable_column_cache_long_term = true; + test_only_vec_column = true; + } +}; + +INSTANTIATE_TEST_CASE_P( // + VectorIndex, + ColumnCacheLongTermTestCacheEnabledAndNoReadPK, + /* unused */ ::testing::Bool()); + +TEST_P(ColumnCacheLongTermTestCacheEnabledAndNoReadPK, Basic) +try +{ + // When cache is enabled, if we do not read PK, we should not record + // any cache hit or miss. + + ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 100, /* at */ 0, /* clear */ false); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + + size_t cache_hit = 0; + size_t cache_miss = 0; + db_context->getColumnCacheLongTerm()->clear(); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 0); + + auto stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {100.0}); + assertStreamOut(stream, "[99, 100)"); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 0); +} +CATCH + +class ColumnCacheLongTermTestCacheEnabledAndReadPK + : public VectorIndexSegmentTestBase + , public testing::WithParamInterface +{ +public: + ColumnCacheLongTermTestCacheEnabledAndReadPK() + { + enable_column_cache_long_term = true; + test_only_vec_column = false; + pack_size = GetParam(); + } +}; + +INSTANTIATE_TEST_CASE_P( // + VectorIndex, + ColumnCacheLongTermTestCacheEnabledAndReadPK, + /* pack_size */ ::testing::Values(1, 2, 3, 4, 5)); + + +TEST_P(ColumnCacheLongTermTestCacheEnabledAndReadPK, Basic) +try +{ + // When cache is enabled, if we read from PK column, we could record + // cache hit and miss. + + ingestDTFileIntoDelta(DELTA_MERGE_FIRST_SEGMENT_ID, 100, /* at */ 0, /* clear */ false); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ensureSegmentStableIndex(DELTA_MERGE_FIRST_SEGMENT_ID, indexInfo()); + + size_t cache_hit = 0; + size_t cache_miss = 0; + db_context->getColumnCacheLongTerm()->clear(); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 0); + + auto stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {100.0}); + assertStreamOut(stream, "[99, 100)"); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 1); + + stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {100.0}); + assertStreamOut(stream, "[99, 100)"); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 1); + ASSERT_EQ(cache_miss, 1); + + // Read from possibly another pack, should still hit cache. + stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 1, {0.0}); + assertStreamOut(stream, "[0, 1)"); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 2); + ASSERT_EQ(cache_miss, 1); + + // Query over multiple packs (for example, when pack_size=1, this should query over 10 packs) + stream = annQuery(DELTA_MERGE_FIRST_SEGMENT_ID, createQueryColumns(), 10, {100.0}); + assertStreamOut(stream, "[90, 100)"); + db_context->getColumnCacheLongTerm()->getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 3); + ASSERT_EQ(cache_miss, 1); +} +CATCH + + class VectorIndexSegmentExtraColumnTest : public VectorIndexSegmentTestBase , public testing::WithParamInterface> @@ -1417,6 +1580,7 @@ class VectorIndexSegmentOnS3Test /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef(), diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp index 83f7d71972f..0ce3c04918f 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp @@ -205,7 +205,8 @@ class DMStoreForSegmentReadTaskTest : public DeltaMergeStoreTest /*store_id*/ 1, /*store_address*/ "127.0.0.1", store->keyspace_id, - store->physical_table_id); + store->physical_table_id, + /*pk_col_id*/ 0); } void initReadNodePageCacheIfUninitialized() @@ -716,7 +717,8 @@ try /*store_id*/ 1, /*store_address*/ "127.0.0.1", store->keyspace_id, - store->physical_table_id); + store->physical_table_id, + /*pk_col_id*/ 0); auto seg_id = seg_task->segment->segmentId(); @@ -856,7 +858,8 @@ try /*store_id*/ 1, /*store_address*/ "127.0.0.1", store->keyspace_id, - store->physical_table_id); + store->physical_table_id, + /*pk_col_id*/ 0); const auto & cfs = seg_task->read_snapshot->delta->getMemTableSetSnapshot()->getColumnFiles(); ASSERT_EQ(cfs.size(), 1); const auto & cf = cfs.front(); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp index b939f2028af..0ae0248cb3c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_stable_data.cpp @@ -503,6 +503,7 @@ class SegmentReplaceStableDataDisaggregated /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ 0, false, 1, db_context->getSettingsRef(), diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 5f898376049..3664801fdd0 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -920,6 +920,7 @@ std::unique_ptr SegmentTestBasic::createDMContext() /*min_version_*/ 0, NullspaceID, /*physical_table_id*/ 100, + /*pk_col_id*/ options.pk_col_id, options.is_common_handle, 1, db_context->getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 180bcd57cb2..6cde2f65a3c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -27,25 +27,25 @@ #include #include -namespace DB -{ -namespace DM -{ -namespace tests +namespace DB::DM::tests { + class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic { public: struct SegmentTestOptions { bool is_common_handle = false; + ColumnID pk_col_id = 0; DB::Settings db_settings; }; - void SetUp() override + void SetUp() override { SetUp({}); } + + void SetUp(const SegmentTestOptions & options) { TiFlashStorageTestBasic::SetUp(); - reloadWithOptions({}); + reloadWithOptions(options); } public: @@ -188,6 +188,5 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic LoggerPtr logger_op; LoggerPtr logger; }; -} // namespace tests -} // namespace DM -} // namespace DB + +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp index 57543880c15..0a14c9df287 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.cpp @@ -59,13 +59,14 @@ DTWorkload::DTWorkload( context->initializeGlobalPageIdAllocator(); context->initializeGlobalStoragePoolIfNeed(context->getPathPool()); Stopwatch sw; - store = DeltaMergeStore::createUnique( + store = DeltaMergeStore::create( *context, true, table_info->db_name, table_info->table_name, NullspaceID, table_info->table_id, + /*pk_col_id*/ 0, true, *table_info->columns, table_info->handle, diff --git a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.h b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.h index 12032899409..c9ca1ede352 100644 --- a/dbms/src/Storages/DeltaMerge/workload/DTWorkload.h +++ b/dbms/src/Storages/DeltaMerge/workload/DTWorkload.h @@ -137,7 +137,7 @@ class DTWorkload std::unique_ptr table_info; std::unique_ptr key_gen; std::unique_ptr ts_gen; - std::unique_ptr store; + std::shared_ptr store; std::unique_ptr handle_lock; std::shared_ptr handle_table; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index dd186653ecf..c5822965711 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -302,6 +302,21 @@ void StorageDeltaMerge::updateTableColumnInfo() } rowkey_column_size = rowkey_column_defines.size(); + { + std::vector pk_col_ids; + for (const auto & col : tidb_table_info.columns) + { + if (col.hasPriKeyFlag()) + pk_col_ids.push_back(col.id); + } + if (pk_col_ids.size() == 1) + pk_col_id = pk_col_ids[0]; + else + pk_col_id = 0; + + // TODO: Handle with PK change? + } + LOG_INFO( log, "updateTableColumnInfo finished, table_name={} table_column_defines={}", @@ -1837,6 +1852,7 @@ DeltaMergeStorePtr & StorageDeltaMerge::getAndMaybeInitStore(ThreadPool * thread table_column_info->table_name, tidb_table_info.keyspace_id, tidb_table_info.id, + pk_col_id, tidb_table_info.replica_info.count > 0, std::move(table_column_info->table_column_defines), std::move(table_column_info->handle_column_define), diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index c9f87a9601a..4bc83b24374 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -275,6 +275,9 @@ class StorageDeltaMerge bool is_common_handle = false; bool pk_is_handle = false; size_t rowkey_column_size = 0; + /// The user-defined PK column. If multi-column PK, or no PK, it is 0. + /// Note that user-defined PK will never be _tidb_rowid. + ColumnID pk_col_id; OrderedNameSet hidden_columns; // The table schema synced from TiDB diff --git a/dbms/src/Storages/StorageDisaggregatedRemote.cpp b/dbms/src/Storages/StorageDisaggregatedRemote.cpp index 8637baa24b7..1dcc0d41acf 100644 --- a/dbms/src/Storages/StorageDisaggregatedRemote.cpp +++ b/dbms/src/Storages/StorageDisaggregatedRemote.cpp @@ -410,7 +410,8 @@ void StorageDisaggregated::buildReadTaskForWriteNodeTable( store_id, store_address, table.keyspace_id(), - table.table_id()); + table.table_id(), + table.pk_col_id()); std::lock_guard lock(output_lock); output_seg_tasks.push_back(seg_read_task); }, diff --git a/dbms/src/TestUtils/TiFlashTestEnv.cpp b/dbms/src/TestUtils/TiFlashTestEnv.cpp index b8ba74d477e..a6dc1d81980 100644 --- a/dbms/src/TestUtils/TiFlashTestEnv.cpp +++ b/dbms/src/TestUtils/TiFlashTestEnv.cpp @@ -170,6 +170,7 @@ void TiFlashTestEnv::addGlobalContext( global_context->createTMTContext(raft_config, pingcap::ClusterConfig()); global_context->setDeltaIndexManager(1024 * 1024 * 100 /*100MB*/); + global_context->setColumnCacheLongTerm(1024 * 1024 * 100 /*100MB*/); auto & path_pool = global_context->getPathPool(); global_context->getTMTContext().restore(path_pool); diff --git a/dbms/src/TiDB/Decode/Vector.cpp b/dbms/src/TiDB/Decode/Vector.cpp index 86b5f79c70a..8ab1e53a3de 100644 --- a/dbms/src/TiDB/Decode/Vector.cpp +++ b/dbms/src/TiDB/Decode/Vector.cpp @@ -31,15 +31,7 @@ extern const int BAD_ARGUMENTS; VectorFloat32Ref::VectorFloat32Ref(const Float32 * elements, size_t n) : elements(elements) , elements_n(n) -{ - for (size_t i = 0; i < n; ++i) - { - if (unlikely(std::isnan(elements[i]))) - throw Exception("NaN not allowed in vector", ErrorCodes::BAD_ARGUMENTS); - if (unlikely(std::isinf(elements[i]))) - throw Exception("infinite value not allowed in vector", ErrorCodes::BAD_ARGUMENTS); - } -} +{} void VectorFloat32Ref::checkDims(VectorFloat32Ref b) const { diff --git a/dbms/src/TiDB/Schema/VectorIndex.h b/dbms/src/TiDB/Schema/VectorIndex.h index 2af77b2e3e6..848af229a3e 100644 --- a/dbms/src/TiDB/Schema/VectorIndex.h +++ b/dbms/src/TiDB/Schema/VectorIndex.h @@ -69,7 +69,7 @@ struct fmt::formatter auto format(const TiDB::VectorIndexDefinitionPtr & vi, FormatContext & ctx) const -> decltype(ctx.out()) { if (!vi) - return fmt::format_to(ctx.out(), ""); + return fmt::format_to(ctx.out(), ""); return fmt::format_to(ctx.out(), "{}", *vi); } }; From b815f6cb8cef65514ebcc58e31e6b0b1748a194b Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Mon, 15 Jul 2024 13:01:17 +0800 Subject: [PATCH 2/6] storage: Optimize Vector Index [3] (#236) Signed-off-by: Wish --- .../DeltaMerge/File/ColumnCacheLongTerm.h | 10 +- .../DeltaMerge/Remote/RNWorkerFetchPages.cpp | 371 ++++++++++++++++++ .../tests/gtest_column_cache_long_term.cpp | 99 +++++ 3 files changed, 477 insertions(+), 3 deletions(-) create mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp create mode 100644 dbms/src/Storages/DeltaMerge/tests/gtest_column_cache_long_term.cpp diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h b/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h index 1522c2f173c..9f087eef826 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h +++ b/dbms/src/Storages/DeltaMerge/File/ColumnCacheLongTerm.h @@ -82,11 +82,15 @@ class ColumnCacheLongTerm static bool isCacheableColumn(const ColumnDefine & cd) { return cd.type->isInteger(); } - IColumn::Ptr get(const DMFilePtr & dmfile, ColumnID column_id, std::function load_fn) + IColumn::Ptr get( + const String & dmf_parent_path, + PageIdU64 dmf_id, + ColumnID column_id, + std::function load_fn) { auto key = CacheKey{ - .dmfile_parent_path = dmfile->parentPath(), - .dmfile_id = dmfile->fileId(), + .dmfile_parent_path = dmf_parent_path, + .dmfile_id = dmf_id, .column_id = column_id, }; auto [result, _] = cache.getOrSet(key, [&load_fn] { return std::make_shared(load_fn()); }); diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp new file mode 100644 index 00000000000..69bd9dd09b8 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp @@ -0,0 +1,371 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace std::chrono_literals; + +namespace DB::DM::Remote +{ + +RNLocalPageCache::OccupySpaceResult blockingOccupySpaceForTask(const RNReadSegmentTaskPtr & seg_task) +{ + std::vector cf_tiny_oids; + { + cf_tiny_oids.reserve(seg_task->meta.delta_tinycf_page_ids.size()); + for (const auto & page_id : seg_task->meta.delta_tinycf_page_ids) + { + auto page_oid = PageOID{ + .store_id = seg_task->meta.store_id, + .ks_table_id = {seg_task->meta.keyspace_id, seg_task->meta.physical_table_id}, + .page_id = page_id, + }; + cf_tiny_oids.emplace_back(page_oid); + } + } + + // Note: We must occupySpace segment by segment, because we need to read + // at least the complete data of one segment in order to drive everything forward. + // Currently we call occupySpace for each FetchPagesRequest, which is fine, + // because we send one request each seg_task. If we want to split + // FetchPagesRequest into multiples in future, then we need to change + // the moment of calling `occupySpace`. + auto page_cache = seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + auto scan_context = seg_task->meta.dm_context->scan_context; + + Stopwatch w_occupy; + auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->meta.delta_tinycf_page_sizes, scan_context); + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy).Observe(w_occupy.elapsedSeconds()); + + return occupy_result; +} + +disaggregated::FetchDisaggPagesRequest buildFetchPagesRequest( + const RNReadSegmentTaskPtr & seg_task, + const std::vector & pages_not_in_cache) +{ + disaggregated::FetchDisaggPagesRequest req; + auto meta = seg_task->meta.snapshot_id.toMeta(); + // The keyspace_id here is not vital, as we locate the table and segment by given + // snapshot_id. But it could be helpful for debugging. + auto keyspace_id = seg_task->meta.keyspace_id; + meta.set_keyspace_id(keyspace_id); + meta.set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); + *req.mutable_snapshot_id() = meta; + req.set_table_id(seg_task->meta.physical_table_id); + req.set_segment_id(seg_task->meta.segment_id); + + for (auto page_id : pages_not_in_cache) + req.add_page_ids(page_id.page_id); + + return req; +} + +RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task) +{ + if (seg_task->meta.delta_tinycf_page_ids.empty()) + { + // No page need to be fetched or guarded. + return seg_task; + } + + MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); + Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page) + .Observe(watch_work.elapsedSeconds()); + }); + + auto occupy_result = blockingOccupySpaceForTask(seg_task); + auto req = buildFetchPagesRequest(seg_task, occupy_result.pages_not_in_cache); + { + auto cftiny_total = seg_task->meta.delta_tinycf_page_ids.size(); + auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); + LOG_DEBUG( + log, + "Ready to fetch pages, seg_task={} page_hit_rate={} pages_not_in_cache={}", + seg_task->info(), + cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), + occupy_result.pages_not_in_cache); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); + GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); + } + + const size_t max_retry_times = 3; + std::exception_ptr last_exception; + + // TODO: Maybe don't need to re-fetch all pages when retry. + for (size_t i = 0; i < max_retry_times; ++i) + { + try + { + doFetchPages(seg_task, req); + seg_task->initColumnFileDataProvider(occupy_result.pages_guard); + + // We finished fetch all pages for this seg task, just return it for downstream + // workers. If we have met any errors, page guard will not be persisted. + return seg_task; + } + catch (const pingcap::Exception & e) + { + last_exception = std::current_exception(); + LOG_WARNING( + log, + "Meet RPC client exception when fetching pages: {}, will be retried. seg_task={}", + e.displayText(), + seg_task->info()); + std::this_thread::sleep_for(1s); + } + catch (...) + { + LOG_ERROR(log, "{}: {}", seg_task->info(), getCurrentExceptionMessage(true)); + throw; + } + } + + // Still failed after retry... + RUNTIME_CHECK(last_exception); + std::rethrow_exception(last_exception); +} + +// In order to make network and disk run parallelly, +// `doFetchPages` will receive data pages from WN, +// package these data pages into several `WritePageTask` objects +// and send them to `RNWritePageCachePool` to write into local page cache. +struct WritePageTask +{ + explicit WritePageTask(RNLocalPageCache * page_cache_) + : page_cache(page_cache_) + {} + RNLocalPageCache * page_cache; + UniversalWriteBatch wb; + std::list remote_pages; // Hold the data of wb. + std::list remote_page_mem_tracker_wrappers; // Hold the memory stat of remote_pages. +}; +using WritePageTaskPtr = std::unique_ptr; + + +void RNWorkerFetchPages::doFetchPages( + const RNReadSegmentTaskPtr & seg_task, + const disaggregated::FetchDisaggPagesRequest & request) +{ + // No page need to be fetched. + if (request.page_ids_size() == 0) + return; + + Stopwatch sw_total; + Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; + bool rpc_is_observed = false; + double total_write_page_cache_sec = 0.0; + + pingcap::kv::RpcCall rpc( + cluster->rpc_client, + seg_task->meta.store_address); + + grpc::ClientContext client_context; + // set timeout for the streaming call to avoid inf wait before `Finish()` + rpc.setClientContext( + client_context, + seg_task->meta.dm_context->db_context.getSettingsRef().disagg_fetch_pages_timeout); + auto stream_resp = rpc.call(&client_context, request); + + SCOPE_EXIT({ + // Most of the time, it will call `Finish()` and check the status of grpc when `Read()` return false. + // `Finish()` will be called here when exceptions thrown. + if (unlikely(stream_resp != nullptr)) + { + stream_resp->Finish(); + } + }); + + // Used to verify all pages are fetched. + std::set remaining_pages_to_fetch; + for (auto p : request.page_ids()) + remaining_pages_to_fetch.insert(p); + + UInt64 read_stream_ns = 0; + UInt64 deserialize_page_ns = 0; + UInt64 schedule_write_page_ns = 0; + UInt64 packet_count = 0; + UInt64 task_count = 0; + UInt64 page_count = request.page_ids_size(); + + auto schedule_task = [&task_count, &schedule_write_page_ns](WritePageTaskPtr && write_page_task) { + task_count += 1; + auto task = std::make_shared>([write_page_task = std::move(write_page_task)]() { + write_page_task->page_cache->write(std::move(write_page_task->wb)); + }); + Stopwatch sw; + RNWritePageCachePool::get().scheduleOrThrowOnError([task]() { (*task)(); }); + schedule_write_page_ns += sw.elapsed(); + return task->get_future(); + }; + + WritePageTaskPtr write_page_task; + std::vector> write_page_results; + + // Keep reading packets. + while (true) + { + Stopwatch sw_packet; + auto packet = std::make_shared(); + if (bool more = stream_resp->Read(packet.get()); !more) + { + auto status = stream_resp->Finish(); + stream_resp.reset(); // Reset to avoid calling `Finish()` repeatedly. + RUNTIME_CHECK_MSG( + status.ok(), + "Failed to fetch all pages for {}, status={}, message={}, wn_address={}", + seg_task->info(), + static_cast(status.error_code()), + status.error_message(), + seg_task->meta.store_address); + break; + } + + MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get()); + + read_stream_ns += sw_packet.elapsedFromLastTime(); + packet_count += 1; + if (!rpc_is_observed) + { + // Count RPC time as sending request + receive first response packet. + rpc_is_observed = true; + // This metric is per-segment, because we only count once for each task. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_rpc_fetch_page) + .Observe(watch_rpc.elapsedSeconds()); + } + + if (packet->has_error()) + { + throw Exception(fmt::format("{} (from {})", packet->error().msg(), seg_task->info())); + } + + Stopwatch watch_write_page_cache{CLOCK_MONOTONIC_COARSE}; + SCOPE_EXIT({ total_write_page_cache_sec += watch_write_page_cache.elapsedSeconds(); }); + + std::vector received_page_ids; + for (const String & page : packet->pages()) + { + if (write_page_task == nullptr) + { + write_page_task = std::make_unique( + seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache.get()); + } + auto & remote_page = write_page_task->remote_pages.emplace_back(); // NOLINT(bugprone-use-after-move) + bool parsed = remote_page.ParseFromString(page); + RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", seg_task->info()); + write_page_task->remote_page_mem_tracker_wrappers.emplace_back( + remote_page.SpaceUsedLong(), + fetch_pages_mem_tracker.get()); + + RUNTIME_CHECK( + remaining_pages_to_fetch.contains(remote_page.page_id()), + remaining_pages_to_fetch, + remote_page.page_id()); + + received_page_ids.emplace_back(remote_page.page_id()); + remaining_pages_to_fetch.erase(remote_page.page_id()); + + // Write page into LocalPageCache. Note that the page must be occupied. + auto oid = Remote::PageOID{ + .store_id = seg_task->meta.store_id, + .ks_table_id = {seg_task->meta.keyspace_id, seg_task->meta.physical_table_id}, + .page_id = remote_page.page_id(), + }; + auto read_buffer + = std::make_shared(remote_page.data().data(), remote_page.data().size()); + PageFieldSizes field_sizes; + field_sizes.reserve(remote_page.field_sizes_size()); + for (const auto & field_sz : remote_page.field_sizes()) + { + field_sizes.emplace_back(field_sz); + } + deserialize_page_ns += sw_packet.elapsedFromLastTime(); + + auto page_id = RNLocalPageCache::buildCacheId(oid); + write_page_task->wb + .putPage(page_id, 0, std::move(read_buffer), remote_page.data().size(), std::move(field_sizes)); + auto write_batch_limit_size + = seg_task->meta.dm_context->db_context.getSettingsRef().dt_write_page_cache_limit_size; + if (write_page_task->wb.getTotalDataSize() >= write_batch_limit_size) + { + write_page_results.push_back( + schedule_task(std::move(write_page_task))); // write_page_task is moved and reset. + } + } + } + + if (write_page_task != nullptr && write_page_task->wb.getTotalDataSize() > 0) + { + write_page_results.push_back(schedule_task(std::move(write_page_task))); + } + + Stopwatch sw_wait_write_page_finished; + for (auto & f : write_page_results) + { + f.get(); + } + auto wait_write_page_finished_ns = sw_wait_write_page_finished.elapsed(); + + // This metric is per-segment. + GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_write_page_cache) + .Observe(total_write_page_cache_sec); + + // Verify all pending pages are now received. + RUNTIME_CHECK_MSG( + remaining_pages_to_fetch.empty(), + "Failed to fetch all pages for {}, remaining_pages_to_fetch={}, wn_address={}", + seg_task->info(), + remaining_pages_to_fetch, + seg_task->meta.store_address); + + LOG_DEBUG( + log, + "Finished fetch pages, seg_task={}, page_count={}, packet_count={}, task_count={}, " + "total_ms={}, read_stream_ms={}, deserialize_page_ms={}, schedule_write_page_ms={}, " + "wait_write_page_finished_ms={}", + seg_task->info(), + page_count, + packet_count, + task_count, + sw_total.elapsed() / 1000000, + read_stream_ns / 1000000, + deserialize_page_ns / 1000000, + schedule_write_page_ns / 1000000, + wait_write_page_finished_ns / 1000000); +} + +} // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_column_cache_long_term.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_column_cache_long_term.cpp new file mode 100644 index 00000000000..9f276bfbd65 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_column_cache_long_term.cpp @@ -0,0 +1,99 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +namespace DB::DM::tests +{ + + +TEST(VectorIndexColumnCacheTest, Evict) +try +{ + size_t cache_hit = 0; + size_t cache_miss = 0; + + auto cache = ColumnCacheLongTerm(150); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 0); + + auto col = cache.get("/", 1, 2, [] { + // key=40, value=40 + auto data = genSequence("[0, 5)"); + auto col = ::DB::tests::createColumn(data, "", 0).column; + return col; + }); + ASSERT_EQ(col->size(), 5); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 0); + ASSERT_EQ(cache_miss, 1); + + col = cache.get("/", 1, 2, [] { + // key=40, value=40 + auto data = genSequence("[0, 5)"); + return ::DB::tests::createColumn(data, "", 0).column; + }); + ASSERT_EQ(col->size(), 5); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 1); + ASSERT_EQ(cache_miss, 1); + + col = cache.get("/", 1, 3, [] { + // key=40, value=400 + auto data = genSequence("[0, 100)"); + return ::DB::tests::createColumn(data, "", 0).column; + }); + ASSERT_EQ(col->size(), 100); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 1); + ASSERT_EQ(cache_miss, 2); + + col = cache.get("/", 1, 2, [] { + // key=40, value=40 + auto data = genSequence("[0, 5)"); + return ::DB::tests::createColumn(data, "", 0).column; + }); + ASSERT_EQ(col->size(), 5); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 1); + ASSERT_EQ(cache_miss, 3); + + col = cache.get("/", 1, 4, [] { + // key=40, value=8 + auto data = genSequence("[0, 1)"); + return ::DB::tests::createColumn(data, "", 0).column; + }); + ASSERT_EQ(col->size(), 1); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 1); + ASSERT_EQ(cache_miss, 4); + + col = cache.get("/", 1, 2, [] { + // key=40, value=40 + auto data = genSequence("[0, 5)"); + return ::DB::tests::createColumn(data, "", 0).column; + }); + ASSERT_EQ(col->size(), 5); + cache.getStats(cache_hit, cache_miss); + ASSERT_EQ(cache_hit, 2); + ASSERT_EQ(cache_miss, 4); +} +CATCH + + +} // namespace DB::DM::tests From 785a1d99e3196181ceecfa94a37d2a915aa58e1c Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 20 Sep 2024 13:53:35 +0800 Subject: [PATCH 3/6] fix Signed-off-by: Lloyd-Pottiger --- .../Storages/DeltaMerge/File/DMFileReader.cpp | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 0dd042f9568..fa24ef54723 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -457,6 +457,23 @@ ColumnPtr DMFileReader::readExtraColumn( { assert(cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == TAG_COLUMN_ID || cd.id == VERSION_COLUMN_ID); + if (column_cache_long_term && cd.id == pk_col_id && ColumnCacheLongTerm::isCacheableColumn(cd)) + { + // ColumnCacheLongTerm only caches user assigned PrimaryKey column. + auto column_all_data + = column_cache_long_term->get(dmfile->parentPath(), dmfile->fileId(), cd.id, [&]() -> IColumn::Ptr { + // Always read all packs when filling cache + ColumnPtr column; + readFromDiskOrSharingCache(cd, column, 0, dmfile->getPacks(), dmfile->getRows()); + return column; + }); + + auto column = cd.type->createColumn(); + column->reserve(read_rows); + column->insertRangeFrom(*column_all_data, next_row_offset - read_rows, read_rows); + return column; + } + const auto & pack_stats = dmfile->getPackStats(); auto read_strategy = ColumnCache::getReadStrategy(start_pack_id, pack_count, clean_read_packs); if (read_strategy.size() != 1 && cd.id == EXTRA_HANDLE_COLUMN_ID) From e01854a55f90da93c79e2c438a41544f807b3e7e Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 20 Sep 2024 14:12:15 +0800 Subject: [PATCH 4/6] fix Signed-off-by: Lloyd-Pottiger --- .../BitmapFilter/BitmapFilterView.h | 13 ------- .../Storages/DeltaMerge/File/DMFileReader.cpp | 35 ++++++++++--------- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h index 26438fab8d0..f84dd6aca74 100644 --- a/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h +++ b/dbms/src/Storages/DeltaMerge/BitmapFilter/BitmapFilterView.h @@ -45,19 +45,6 @@ class BitmapFilterView inline UInt32 size() const { return filter_size; } inline UInt32 offset() const { return filter_offset; } - - String toDebugString() const - { - String s(size(), '1'); - for (UInt32 i = 0; i < size(); i++) - { - if (!get(i)) - { - s[i] = '0'; - } - } - return s; - } }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index fa24ef54723..e127eba17da 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -457,23 +457,6 @@ ColumnPtr DMFileReader::readExtraColumn( { assert(cd.id == EXTRA_HANDLE_COLUMN_ID || cd.id == TAG_COLUMN_ID || cd.id == VERSION_COLUMN_ID); - if (column_cache_long_term && cd.id == pk_col_id && ColumnCacheLongTerm::isCacheableColumn(cd)) - { - // ColumnCacheLongTerm only caches user assigned PrimaryKey column. - auto column_all_data - = column_cache_long_term->get(dmfile->parentPath(), dmfile->fileId(), cd.id, [&]() -> IColumn::Ptr { - // Always read all packs when filling cache - ColumnPtr column; - readFromDiskOrSharingCache(cd, column, 0, dmfile->getPacks(), dmfile->getRows()); - return column; - }); - - auto column = cd.type->createColumn(); - column->reserve(read_rows); - column->insertRangeFrom(*column_all_data, next_row_offset - read_rows, read_rows); - return column; - } - const auto & pack_stats = dmfile->getPackStats(); auto read_strategy = ColumnCache::getReadStrategy(start_pack_id, pack_count, clean_read_packs); if (read_strategy.size() != 1 && cd.id == EXTRA_HANDLE_COLUMN_ID) @@ -534,6 +517,24 @@ ColumnPtr DMFileReader::readColumn(const ColumnDefine & cd, size_t start_pack_id if (!column_streams.contains(DMFile::getFileNameBase(cd.id))) return createColumnWithDefaultValue(cd, read_rows); + if (column_cache_long_term && cd.id == pk_col_id && ColumnCacheLongTerm::isCacheableColumn(cd)) + { + // ColumnCacheLongTerm only caches user assigned PrimaryKey column. + auto data_type = dmfile->getColumnStat(cd.id).type; + auto column_all_data + = column_cache_long_term->get(dmfile->parentPath(), dmfile->fileId(), cd.id, [&]() -> IColumn::Ptr { + // Always read all packs when filling cache + ColumnPtr column; + readFromDiskOrSharingCache(cd, column, 0, dmfile->getPacks(), dmfile->getRows()); + return column; + }); + + auto column = data_type->createColumn(); + column->reserve(read_rows); + column->insertRangeFrom(*column_all_data, next_row_offset - read_rows, read_rows); + return convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); + } + // Not cached if (!enable_column_cache || !isCacheableColumn(cd)) { From b3dae7968b706ad26356ba1ce960a91314538069 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 20 Sep 2024 14:16:23 +0800 Subject: [PATCH 5/6] fix Signed-off-by: Lloyd-Pottiger --- .../DeltaMerge/Remote/RNWorkerFetchPages.cpp | 371 ------------------ 1 file changed, 371 deletions(-) delete mode 100644 dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp deleted file mode 100644 index 69bd9dd09b8..00000000000 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp +++ /dev/null @@ -1,371 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -using namespace std::chrono_literals; - -namespace DB::DM::Remote -{ - -RNLocalPageCache::OccupySpaceResult blockingOccupySpaceForTask(const RNReadSegmentTaskPtr & seg_task) -{ - std::vector cf_tiny_oids; - { - cf_tiny_oids.reserve(seg_task->meta.delta_tinycf_page_ids.size()); - for (const auto & page_id : seg_task->meta.delta_tinycf_page_ids) - { - auto page_oid = PageOID{ - .store_id = seg_task->meta.store_id, - .ks_table_id = {seg_task->meta.keyspace_id, seg_task->meta.physical_table_id}, - .page_id = page_id, - }; - cf_tiny_oids.emplace_back(page_oid); - } - } - - // Note: We must occupySpace segment by segment, because we need to read - // at least the complete data of one segment in order to drive everything forward. - // Currently we call occupySpace for each FetchPagesRequest, which is fine, - // because we send one request each seg_task. If we want to split - // FetchPagesRequest into multiples in future, then we need to change - // the moment of calling `occupySpace`. - auto page_cache = seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache; - auto scan_context = seg_task->meta.dm_context->scan_context; - - Stopwatch w_occupy; - auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->meta.delta_tinycf_page_sizes, scan_context); - // This metric is per-segment. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy).Observe(w_occupy.elapsedSeconds()); - - return occupy_result; -} - -disaggregated::FetchDisaggPagesRequest buildFetchPagesRequest( - const RNReadSegmentTaskPtr & seg_task, - const std::vector & pages_not_in_cache) -{ - disaggregated::FetchDisaggPagesRequest req; - auto meta = seg_task->meta.snapshot_id.toMeta(); - // The keyspace_id here is not vital, as we locate the table and segment by given - // snapshot_id. But it could be helpful for debugging. - auto keyspace_id = seg_task->meta.keyspace_id; - meta.set_keyspace_id(keyspace_id); - meta.set_api_version(keyspace_id == NullspaceID ? kvrpcpb::APIVersion::V1 : kvrpcpb::APIVersion::V2); - *req.mutable_snapshot_id() = meta; - req.set_table_id(seg_task->meta.physical_table_id); - req.set_segment_id(seg_task->meta.segment_id); - - for (auto page_id : pages_not_in_cache) - req.add_page_ids(page_id.page_id); - - return req; -} - -RNReadSegmentTaskPtr RNWorkerFetchPages::doWork(const RNReadSegmentTaskPtr & seg_task) -{ - if (seg_task->meta.delta_tinycf_page_ids.empty()) - { - // No page need to be fetched or guarded. - return seg_task; - } - - MemoryTrackerSetter setter(true, fetch_pages_mem_tracker.get()); - Stopwatch watch_work{CLOCK_MONOTONIC_COARSE}; - SCOPE_EXIT({ - // This metric is per-segment. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_worker_fetch_page) - .Observe(watch_work.elapsedSeconds()); - }); - - auto occupy_result = blockingOccupySpaceForTask(seg_task); - auto req = buildFetchPagesRequest(seg_task, occupy_result.pages_not_in_cache); - { - auto cftiny_total = seg_task->meta.delta_tinycf_page_ids.size(); - auto cftiny_fetch = occupy_result.pages_not_in_cache.size(); - LOG_DEBUG( - log, - "Ready to fetch pages, seg_task={} page_hit_rate={} pages_not_in_cache={}", - seg_task->info(), - cftiny_total == 0 ? "N/A" : fmt::format("{:.2f}%", 100.0 - 100.0 * cftiny_fetch / cftiny_total), - occupy_result.pages_not_in_cache); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_read).Increment(cftiny_total); - GET_METRIC(tiflash_disaggregated_details, type_cftiny_fetch).Increment(cftiny_fetch); - } - - const size_t max_retry_times = 3; - std::exception_ptr last_exception; - - // TODO: Maybe don't need to re-fetch all pages when retry. - for (size_t i = 0; i < max_retry_times; ++i) - { - try - { - doFetchPages(seg_task, req); - seg_task->initColumnFileDataProvider(occupy_result.pages_guard); - - // We finished fetch all pages for this seg task, just return it for downstream - // workers. If we have met any errors, page guard will not be persisted. - return seg_task; - } - catch (const pingcap::Exception & e) - { - last_exception = std::current_exception(); - LOG_WARNING( - log, - "Meet RPC client exception when fetching pages: {}, will be retried. seg_task={}", - e.displayText(), - seg_task->info()); - std::this_thread::sleep_for(1s); - } - catch (...) - { - LOG_ERROR(log, "{}: {}", seg_task->info(), getCurrentExceptionMessage(true)); - throw; - } - } - - // Still failed after retry... - RUNTIME_CHECK(last_exception); - std::rethrow_exception(last_exception); -} - -// In order to make network and disk run parallelly, -// `doFetchPages` will receive data pages from WN, -// package these data pages into several `WritePageTask` objects -// and send them to `RNWritePageCachePool` to write into local page cache. -struct WritePageTask -{ - explicit WritePageTask(RNLocalPageCache * page_cache_) - : page_cache(page_cache_) - {} - RNLocalPageCache * page_cache; - UniversalWriteBatch wb; - std::list remote_pages; // Hold the data of wb. - std::list remote_page_mem_tracker_wrappers; // Hold the memory stat of remote_pages. -}; -using WritePageTaskPtr = std::unique_ptr; - - -void RNWorkerFetchPages::doFetchPages( - const RNReadSegmentTaskPtr & seg_task, - const disaggregated::FetchDisaggPagesRequest & request) -{ - // No page need to be fetched. - if (request.page_ids_size() == 0) - return; - - Stopwatch sw_total; - Stopwatch watch_rpc{CLOCK_MONOTONIC_COARSE}; - bool rpc_is_observed = false; - double total_write_page_cache_sec = 0.0; - - pingcap::kv::RpcCall rpc( - cluster->rpc_client, - seg_task->meta.store_address); - - grpc::ClientContext client_context; - // set timeout for the streaming call to avoid inf wait before `Finish()` - rpc.setClientContext( - client_context, - seg_task->meta.dm_context->db_context.getSettingsRef().disagg_fetch_pages_timeout); - auto stream_resp = rpc.call(&client_context, request); - - SCOPE_EXIT({ - // Most of the time, it will call `Finish()` and check the status of grpc when `Read()` return false. - // `Finish()` will be called here when exceptions thrown. - if (unlikely(stream_resp != nullptr)) - { - stream_resp->Finish(); - } - }); - - // Used to verify all pages are fetched. - std::set remaining_pages_to_fetch; - for (auto p : request.page_ids()) - remaining_pages_to_fetch.insert(p); - - UInt64 read_stream_ns = 0; - UInt64 deserialize_page_ns = 0; - UInt64 schedule_write_page_ns = 0; - UInt64 packet_count = 0; - UInt64 task_count = 0; - UInt64 page_count = request.page_ids_size(); - - auto schedule_task = [&task_count, &schedule_write_page_ns](WritePageTaskPtr && write_page_task) { - task_count += 1; - auto task = std::make_shared>([write_page_task = std::move(write_page_task)]() { - write_page_task->page_cache->write(std::move(write_page_task->wb)); - }); - Stopwatch sw; - RNWritePageCachePool::get().scheduleOrThrowOnError([task]() { (*task)(); }); - schedule_write_page_ns += sw.elapsed(); - return task->get_future(); - }; - - WritePageTaskPtr write_page_task; - std::vector> write_page_results; - - // Keep reading packets. - while (true) - { - Stopwatch sw_packet; - auto packet = std::make_shared(); - if (bool more = stream_resp->Read(packet.get()); !more) - { - auto status = stream_resp->Finish(); - stream_resp.reset(); // Reset to avoid calling `Finish()` repeatedly. - RUNTIME_CHECK_MSG( - status.ok(), - "Failed to fetch all pages for {}, status={}, message={}, wn_address={}", - seg_task->info(), - static_cast(status.error_code()), - status.error_message(), - seg_task->meta.store_address); - break; - } - - MemTrackerWrapper packet_mem_tracker_wrapper(packet->SpaceUsedLong(), fetch_pages_mem_tracker.get()); - - read_stream_ns += sw_packet.elapsedFromLastTime(); - packet_count += 1; - if (!rpc_is_observed) - { - // Count RPC time as sending request + receive first response packet. - rpc_is_observed = true; - // This metric is per-segment, because we only count once for each task. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_rpc_fetch_page) - .Observe(watch_rpc.elapsedSeconds()); - } - - if (packet->has_error()) - { - throw Exception(fmt::format("{} (from {})", packet->error().msg(), seg_task->info())); - } - - Stopwatch watch_write_page_cache{CLOCK_MONOTONIC_COARSE}; - SCOPE_EXIT({ total_write_page_cache_sec += watch_write_page_cache.elapsedSeconds(); }); - - std::vector received_page_ids; - for (const String & page : packet->pages()) - { - if (write_page_task == nullptr) - { - write_page_task = std::make_unique( - seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache.get()); - } - auto & remote_page = write_page_task->remote_pages.emplace_back(); // NOLINT(bugprone-use-after-move) - bool parsed = remote_page.ParseFromString(page); - RUNTIME_CHECK_MSG(parsed, "Failed to parse page data (from {})", seg_task->info()); - write_page_task->remote_page_mem_tracker_wrappers.emplace_back( - remote_page.SpaceUsedLong(), - fetch_pages_mem_tracker.get()); - - RUNTIME_CHECK( - remaining_pages_to_fetch.contains(remote_page.page_id()), - remaining_pages_to_fetch, - remote_page.page_id()); - - received_page_ids.emplace_back(remote_page.page_id()); - remaining_pages_to_fetch.erase(remote_page.page_id()); - - // Write page into LocalPageCache. Note that the page must be occupied. - auto oid = Remote::PageOID{ - .store_id = seg_task->meta.store_id, - .ks_table_id = {seg_task->meta.keyspace_id, seg_task->meta.physical_table_id}, - .page_id = remote_page.page_id(), - }; - auto read_buffer - = std::make_shared(remote_page.data().data(), remote_page.data().size()); - PageFieldSizes field_sizes; - field_sizes.reserve(remote_page.field_sizes_size()); - for (const auto & field_sz : remote_page.field_sizes()) - { - field_sizes.emplace_back(field_sz); - } - deserialize_page_ns += sw_packet.elapsedFromLastTime(); - - auto page_id = RNLocalPageCache::buildCacheId(oid); - write_page_task->wb - .putPage(page_id, 0, std::move(read_buffer), remote_page.data().size(), std::move(field_sizes)); - auto write_batch_limit_size - = seg_task->meta.dm_context->db_context.getSettingsRef().dt_write_page_cache_limit_size; - if (write_page_task->wb.getTotalDataSize() >= write_batch_limit_size) - { - write_page_results.push_back( - schedule_task(std::move(write_page_task))); // write_page_task is moved and reset. - } - } - } - - if (write_page_task != nullptr && write_page_task->wb.getTotalDataSize() > 0) - { - write_page_results.push_back(schedule_task(std::move(write_page_task))); - } - - Stopwatch sw_wait_write_page_finished; - for (auto & f : write_page_results) - { - f.get(); - } - auto wait_write_page_finished_ns = sw_wait_write_page_finished.elapsed(); - - // This metric is per-segment. - GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_write_page_cache) - .Observe(total_write_page_cache_sec); - - // Verify all pending pages are now received. - RUNTIME_CHECK_MSG( - remaining_pages_to_fetch.empty(), - "Failed to fetch all pages for {}, remaining_pages_to_fetch={}, wn_address={}", - seg_task->info(), - remaining_pages_to_fetch, - seg_task->meta.store_address); - - LOG_DEBUG( - log, - "Finished fetch pages, seg_task={}, page_count={}, packet_count={}, task_count={}, " - "total_ms={}, read_stream_ms={}, deserialize_page_ms={}, schedule_write_page_ms={}, " - "wait_write_page_finished_ms={}", - seg_task->info(), - page_count, - packet_count, - task_count, - sw_total.elapsed() / 1000000, - read_stream_ns / 1000000, - deserialize_page_ns / 1000000, - schedule_write_page_ns / 1000000, - wait_write_page_finished_ns / 1000000); -} - -} // namespace DB::DM::Remote From 321ac8ae0106a60f10a8033e7dfd6d08a565d7c8 Mon Sep 17 00:00:00 2001 From: Lloyd-Pottiger Date: Fri, 20 Sep 2024 16:25:10 +0800 Subject: [PATCH 6/6] fix Signed-off-by: Lloyd-Pottiger --- dbms/src/Storages/StorageDeltaMerge.cpp | 2 +- dbms/src/Storages/StorageDeltaMerge.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index c5822965711..9a584d454fd 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -314,7 +314,7 @@ void StorageDeltaMerge::updateTableColumnInfo() else pk_col_id = 0; - // TODO: Handle with PK change? + // TODO: Handle with PK change: drop old PK column cache rather than let LRU evict it. } LOG_INFO( diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 4bc83b24374..fd88b9fbcf2 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -277,7 +277,7 @@ class StorageDeltaMerge size_t rowkey_column_size = 0; /// The user-defined PK column. If multi-column PK, or no PK, it is 0. /// Note that user-defined PK will never be _tidb_rowid. - ColumnID pk_col_id; + ColumnID pk_col_id = 0; OrderedNameSet hidden_columns; // The table schema synced from TiDB