Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new items in scan context #7841

Merged
merged 17 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
21 changes: 10 additions & 11 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/FmtUtils.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashException.h>
#include <Common/TiFlashMetrics.h>
#include <DataStreams/ExpressionBlockInputStream.h>
Expand Down Expand Up @@ -308,10 +309,6 @@ void DAGStorageInterpreter::executeImpl(PipelineExecutorContext & exec_context,
{
auto & dag_context = dagContext();

auto scan_context = std::make_shared<DM::ScanContext>();
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

if (!mvcc_query_info->regions_query_info.empty())
{
buildLocalExec(exec_context, group_builder, context.getSettingsRef().max_block_size);
Expand All @@ -322,7 +319,7 @@ void DAGStorageInterpreter::executeImpl(PipelineExecutorContext & exec_context,
// Note that `buildRemoteRequests` must be called after `buildLocalExec` because
// `buildLocalExec` will set up `region_retry_from_local_region`, and we must
// retry those regions or data will be lost.
auto remote_requests = buildRemoteRequests(scan_context);
auto remote_requests = buildRemoteRequests(dag_context.scan_context_map[table_scan.getTableScanExecutorID()]);
if (dag_context.is_disaggregated_task && !remote_requests.empty())
{
// This means RN is sending requests with stale region info, we simply reject the request
Expand Down Expand Up @@ -400,10 +397,6 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
auto & dag_context = dagContext();

auto scan_context = std::make_shared<DM::ScanContext>();
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

if (!mvcc_query_info->regions_query_info.empty())
{
buildLocalStreams(pipeline, context.getSettingsRef().max_block_size);
Expand All @@ -415,7 +408,7 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
// Note that `buildRemoteRequests` must be called after `buildLocalStreams` because
// `buildLocalStreams` will set up `region_retry_from_local_region`, and we must
// retry those regions or data will be lost.
auto remote_requests = buildRemoteRequests(scan_context);
auto remote_requests = buildRemoteRequests(dag_context.scan_context_map[table_scan.getTableScanExecutorID()]);
if (dag_context.is_disaggregated_task && !remote_requests.empty())
{
// This means RN is sending requests with stale region info, we simply reject the request
Expand Down Expand Up @@ -552,11 +545,17 @@ void DAGStorageInterpreter::prepare()
// and `TiDB::TableInfo`) we may get this process more simplified. (tiflash/issues/1853)

// Do learner read
const DAGContext & dag_context = *context.getDAGContext();
DAGContext & dag_context = *context.getDAGContext();
auto scan_context = std::make_shared<DM::ScanContext>();
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

Stopwatch watch;
if (dag_context.isBatchCop() || dag_context.isMPPTask() || dag_context.is_disaggregated_task)
learner_read_snapshot = doBatchCopLearnerRead();
else
learner_read_snapshot = doCopLearnerRead();
scan_context->total_learner_read_ns += watch.elapsed();

// Acquire read lock on `alter lock` and build the requested inputstreams
storages_with_structure_lock = getAndLockStorages(context.getSettingsRef().schema_version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ struct MockWriter
summary.scan_context->total_dmfile_skipped_packs = 2;
summary.scan_context->total_dmfile_scanned_rows = 8000;
summary.scan_context->total_dmfile_skipped_rows = 15000;
summary.scan_context->total_dmfile_rough_set_index_load_time_ns = 10;
summary.scan_context->total_dmfile_rough_set_index_check_time_ns = 10;
summary.scan_context->total_dmfile_read_time_ns = 200;
summary.scan_context->total_create_snapshot_time_ns = 5;
summary.scan_context->total_local_region_num = 10;
Expand Down
13 changes: 5 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1191,23 +1191,21 @@ UInt64 DMFile::getFileSize(ColId col_id, const String & filename) const
}
}

S3::S3RandomAccessFile::ReadFileInfo DMFile::getReadFileInfo(ColId col_id, const String & filename) const
UInt64 DMFile::getReadFileSize(ColId col_id, const String & filename) const
{
auto itr = merged_sub_file_infos.find(filename);
if (itr != merged_sub_file_infos.end())
{
return getMergedFileInfoOfColumn(itr->second);
return getMergedFileSizeOfColumn(itr->second);
}
else
{
return S3::S3RandomAccessFile::ReadFileInfo{.size = getFileSize(col_id, filename)};
return getFileSize(col_id, filename);
}
}

S3::S3RandomAccessFile::ReadFileInfo DMFile::getMergedFileInfoOfColumn(const MergedSubFileInfo & file_info) const
UInt64 DMFile::getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const
{
S3::S3RandomAccessFile::ReadFileInfo read_file_info;

// Get filesize of merged file.
auto itr = std::find_if(
merged_files.begin(),
Expand All @@ -1216,8 +1214,7 @@ S3::S3RandomAccessFile::ReadFileInfo DMFile::getMergedFileInfoOfColumn(const Mer
return merged_file.number == file_info.number;
});
RUNTIME_CHECK(itr != merged_files.end());
read_file_info.size = itr->size;
return read_file_info;
return itr->size;
}
} // namespace DM
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,8 @@ class DMFile : private boost::noncopyable
bool useMetaV2() const { return version == DMFileFormat::V3; }

UInt64 getFileSize(ColId col_id, const String & filename) const;
S3::S3RandomAccessFile::ReadFileInfo getReadFileInfo(ColId col_id, const String & filename) const;
S3::S3RandomAccessFile::ReadFileInfo getMergedFileInfoOfColumn(const MergedSubFileInfo & file_info) const;
UInt64 getReadFileSize(ColId col_id, const String & filename) const;
UInt64 getMergedFileSizeOfColumn(const MergedSubFileInfo & file_info) const;

// The id to construct the file path on disk.
UInt64 file_id;
Expand Down
11 changes: 7 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ class DMFilePackFilter
tryLoadIndex(attr.col_id);
}

Stopwatch watch;
const auto check_results = filter->roughCheck(0, pack_count, param);
std::transform(use_packs.begin(), use_packs.end(), check_results.begin(), use_packs.begin(), [](UInt8 a, RSResult b) { return (static_cast<bool>(a)) && (b != None); });
scan_context->total_dmfile_rough_set_index_check_time_ns += watch.elapsed();
}

for (auto u : use_packs)
Expand Down Expand Up @@ -225,7 +227,8 @@ class DMFilePackFilter
const MinMaxIndexCachePtr & index_cache,
bool set_cache_if_miss,
ColId col_id,
const ReadLimiterPtr & read_limiter)
const ReadLimiterPtr & read_limiter,
const ScanContextPtr & scan_context)
{
const auto & type = dmfile->getColumnStat(col_id).type;
const auto file_name_base = DMFile::getFileNameBase(col_id);
Expand All @@ -234,7 +237,7 @@ class DMFilePackFilter
auto index_file_size = dmfile->colIndexSize(col_id);
if (index_file_size == 0)
return std::make_shared<MinMaxIndex>(*type);
auto index_guard = S3::S3RandomAccessFile::setReadFileInfo(dmfile->getReadFileInfo(col_id, dmfile->colIndexFileName(file_name_base)));
auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({dmfile->getReadFileSize(col_id, dmfile->colIndexFileName(file_name_base)), scan_context});
if (!dmfile->configuration) // v1
{
auto index_buf = ReadBufferFromFileProvider(
Expand Down Expand Up @@ -323,9 +326,9 @@ class DMFilePackFilter
return;

Stopwatch watch;
loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter);
loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context);

scan_context->total_dmfile_rough_set_index_load_time_ns += watch.elapsed();
scan_context->total_dmfile_rough_set_index_check_time_ns += watch.elapsed();
}

private:
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ DMFileReader::Stream::Stream(
if (res->empty()) // 0 rows.
return res;
size_t size = sizeof(MarkInCompressedFile) * reader.dmfile->getPacks();
auto mark_guard = S3::S3RandomAccessFile::setReadFileInfo(reader.dmfile->getReadFileInfo(col_id, reader.dmfile->colMarkFileName(file_name_base)));
auto mark_guard = S3::S3RandomAccessFile::setReadFileInfo({reader.dmfile->getReadFileSize(col_id, reader.dmfile->colMarkFileName(file_name_base)), reader.scan_context});
if (reader.dmfile->configuration)
{
if (reader.dmfile->useMetaV2()) // metav2
Expand Down Expand Up @@ -192,7 +192,7 @@ DMFileReader::Stream::Stream(
buffer_size,
aio_threshold,
max_read_buffer_size);
auto data_guard = S3::S3RandomAccessFile::setReadFileInfo(reader.dmfile->getReadFileInfo(col_id, reader.dmfile->colDataFileName(file_name_base)));
auto data_guard = S3::S3RandomAccessFile::setReadFileInfo({reader.dmfile->getReadFileSize(col_id, reader.dmfile->colDataFileName(file_name_base)), reader.scan_context});
if (!reader.dmfile->configuration)
{
buf = std::make_unique<CompressedReadBufferFromFileProvider<true>>(reader.file_provider,
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNLocalPageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void RNLocalPageCache::unguard(const std::vector<UniversalPageId> & keys, uint64
cv.notify_all();
}

RNLocalPageCache::OccupySpaceResult RNLocalPageCache::occupySpace(const std::vector<PageOID> & pages, const std::vector<size_t> & page_sizes)
RNLocalPageCache::OccupySpaceResult RNLocalPageCache::occupySpace(const std::vector<PageOID> & pages, const std::vector<size_t> & page_sizes, ScanContextPtr scan_context)
{
RUNTIME_CHECK(pages.size() == page_sizes.size(), pages.size(), page_sizes.size());
const size_t n = pages.size();
Expand Down Expand Up @@ -376,8 +376,12 @@ RNLocalPageCache::OccupySpaceResult RNLocalPageCache::occupySpace(const std::vec
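// Pages may be occupied but not written yet, so we always return missing pages according
// to the storage.
// NOTE(review): `scan_context` is dereferenced unconditionally in this loop, while the
// declaration of `occupySpace` defaults it to nullptr. A caller relying on the default
// would crash here; confirm every caller passes a context, or guard with a null check.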
if (const auto & page_entry = storage->getEntry(keys[i], snapshot); page_entry.isValid())
{
scan_context->total_disagg_read_cache_hit_size += page_sizes[i];
continue;
}
missing_ids.push_back(pages[i]);
scan_context->total_disagg_read_cache_miss_size += page_sizes[i];
}
GET_METRIC(tiflash_storage_remote_cache, type_page_miss).Increment(missing_ids.size());
GET_METRIC(tiflash_storage_remote_cache, type_page_hit).Increment(pages.size() - missing_ids.size());
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNLocalPageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Interpreters/Context_fwd.h>
#include <Storages/DeltaMerge/Remote/ObjectId.h>
#include <Storages/DeltaMerge/Remote/RNLocalPageCache_fwd.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageStorage_fwd.h>
#include <Storages/Page/V3/Universal/UniversalPageId.h>
Expand Down Expand Up @@ -179,7 +180,7 @@ class RNLocalPageCache
*
* This function also returns pages not in the cache.
*/
OccupySpaceResult occupySpace(const std::vector<PageOID> & pages, const std::vector<size_t> & page_sizes);
OccupySpaceResult occupySpace(const std::vector<PageOID> & pages, const std::vector<size_t> & page_sizes, ScanContextPtr scan_context = nullptr);

/**
* Put a page into the cache.
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Remote/RNWorkerFetchPages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ RNLocalPageCache::OccupySpaceResult blockingOccupySpaceForTask(const RNReadSegme
// FetchPagesRequest into multiples in future, then we need to change
// the moment of calling `occupySpace`.
auto page_cache = seg_task->meta.dm_context->db_context.getSharedContextDisagg()->rn_page_cache;
auto scan_context = seg_task->meta.dm_context->scan_context;

Stopwatch w_occupy;
auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->meta.delta_tinycf_page_sizes);
auto occupy_result = page_cache->occupySpace(cf_tiny_oids, seg_task->meta.delta_tinycf_page_sizes, scan_context);
// This metric is per-segment.
GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy).Observe(w_occupy.elapsedSeconds());

Expand Down
28 changes: 23 additions & 5 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <atomic>


namespace DB::DM
{
/// ScanContext is used to record statistical information in table scan for current query.
Expand All @@ -41,14 +42,18 @@ class ScanContext
/// sum of skipped rows in dmfiles(both stable and ColumnFileBig) among this query
std::atomic<uint64_t> total_dmfile_skipped_rows{0};

std::atomic<uint64_t> total_dmfile_rough_set_index_load_time_ns{0};
std::atomic<uint64_t> total_dmfile_rough_set_index_check_time_ns{0};
std::atomic<uint64_t> total_dmfile_read_time_ns{0};
std::atomic<uint64_t> total_create_snapshot_time_ns{0};

std::atomic<uint64_t> total_remote_region_num{0};
std::atomic<uint64_t> total_local_region_num{0};

std::atomic<uint64_t> total_user_read_bytes{0};
std::atomic<uint64_t> total_learner_read_ns{0};
std::atomic<uint64_t> total_disagg_read_cache_hit_size{0};
std::atomic<uint64_t> total_disagg_read_cache_miss_size{0};


ScanContext() = default;

Expand All @@ -58,12 +63,15 @@ class ScanContext
total_dmfile_skipped_packs = tiflash_scan_context_pb.total_dmfile_skipped_packs();
total_dmfile_scanned_rows = tiflash_scan_context_pb.total_dmfile_scanned_rows();
total_dmfile_skipped_rows = tiflash_scan_context_pb.total_dmfile_skipped_rows();
total_dmfile_rough_set_index_load_time_ns = tiflash_scan_context_pb.total_dmfile_rough_set_index_load_time_ms() * 1000000;
total_dmfile_rough_set_index_check_time_ns = tiflash_scan_context_pb.total_dmfile_rough_set_index_check_time_ms() * 1000000;
total_dmfile_read_time_ns = tiflash_scan_context_pb.total_dmfile_read_time_ms() * 1000000;
total_create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000;
total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num();
total_local_region_num = tiflash_scan_context_pb.total_local_region_num();
total_user_read_bytes = tiflash_scan_context_pb.total_user_read_bytes();
total_learner_read_ns = tiflash_scan_context_pb.total_learner_read_ms() * 1000000;
total_disagg_read_cache_hit_size = tiflash_scan_context_pb.total_disagg_read_cache_hit_size();
total_disagg_read_cache_miss_size = tiflash_scan_context_pb.total_disagg_read_cache_miss_size();
}

tipb::TiFlashScanContext serialize()
Expand All @@ -73,12 +81,16 @@ class ScanContext
tiflash_scan_context_pb.set_total_dmfile_skipped_packs(total_dmfile_skipped_packs);
tiflash_scan_context_pb.set_total_dmfile_scanned_rows(total_dmfile_scanned_rows);
tiflash_scan_context_pb.set_total_dmfile_skipped_rows(total_dmfile_skipped_rows);
tiflash_scan_context_pb.set_total_dmfile_rough_set_index_load_time_ms(total_dmfile_rough_set_index_load_time_ns / 1000000);
tiflash_scan_context_pb.set_total_dmfile_rough_set_index_check_time_ms(total_dmfile_rough_set_index_check_time_ns / 1000000);
tiflash_scan_context_pb.set_total_dmfile_read_time_ms(total_dmfile_read_time_ns / 1000000);
tiflash_scan_context_pb.set_total_create_snapshot_time_ms(total_create_snapshot_time_ns / 1000000);
tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num);
tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num);
tiflash_scan_context_pb.set_total_user_read_bytes(total_user_read_bytes);
tiflash_scan_context_pb.set_total_learner_read_ms(total_learner_read_ns / 1000000);
tiflash_scan_context_pb.set_total_disagg_read_cache_hit_size(total_disagg_read_cache_hit_size);
tiflash_scan_context_pb.set_total_disagg_read_cache_miss_size(total_disagg_read_cache_miss_size);

return tiflash_scan_context_pb;
}

Expand All @@ -88,12 +100,15 @@ class ScanContext
total_dmfile_skipped_packs += other.total_dmfile_skipped_packs;
total_dmfile_scanned_rows += other.total_dmfile_scanned_rows;
total_dmfile_skipped_rows += other.total_dmfile_skipped_rows;
total_dmfile_rough_set_index_load_time_ns += other.total_dmfile_rough_set_index_load_time_ns;
total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ns;
total_dmfile_read_time_ns += other.total_dmfile_read_time_ns;
total_create_snapshot_time_ns += other.total_create_snapshot_time_ns;
total_local_region_num += other.total_local_region_num;
total_remote_region_num += other.total_remote_region_num;
total_user_read_bytes += other.total_user_read_bytes;
total_learner_read_ns += other.total_learner_read_ns;
total_disagg_read_cache_hit_size += other.total_disagg_read_cache_hit_size;
total_disagg_read_cache_miss_size += other.total_disagg_read_cache_miss_size;
}

void merge(const tipb::TiFlashScanContext & other)
Expand All @@ -102,12 +117,15 @@ class ScanContext
total_dmfile_skipped_packs += other.total_dmfile_skipped_packs();
total_dmfile_scanned_rows += other.total_dmfile_scanned_rows();
total_dmfile_skipped_rows += other.total_dmfile_skipped_rows();
total_dmfile_rough_set_index_load_time_ns += other.total_dmfile_rough_set_index_load_time_ms() * 1000000;
total_dmfile_rough_set_index_check_time_ns += other.total_dmfile_rough_set_index_check_time_ms() * 1000000;
total_dmfile_read_time_ns += other.total_dmfile_read_time_ms() * 1000000;
total_create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000;
total_local_region_num += other.total_local_region_num();
total_remote_region_num += other.total_remote_region_num();
total_user_read_bytes += other.total_user_read_bytes();
total_learner_read_ns += other.total_learner_read_ms() * 1000000;
total_disagg_read_cache_hit_size += other.total_disagg_read_cache_hit_size();
total_disagg_read_cache_miss_size += other.total_disagg_read_cache_miss_size();
}

// Reference: https://docs.pingcap.com/tidb/dev/tidb-resource-control
Expand Down
11 changes: 9 additions & 2 deletions dbms/src/Storages/S3/S3RandomAccessFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,26 @@ inline static RandomAccessFilePtr tryOpenCachedFile(const String & remote_fname,
}
}

inline static RandomAccessFilePtr createFromNormalFile(const String & remote_fname, std::optional<UInt64> filesize)
inline static RandomAccessFilePtr createFromNormalFile(const String & remote_fname, std::optional<UInt64> filesize, std::optional<DM::ScanContextPtr> scan_context)
{
auto file = tryOpenCachedFile(remote_fname, filesize);
if (file != nullptr)
{
if (scan_context.has_value())
scan_context.value()->total_disagg_read_cache_hit_size += filesize.value();
return file;
}
if (scan_context.has_value())
scan_context.value()->total_disagg_read_cache_miss_size += filesize.value();
auto & ins = S3::ClientFactory::instance();
return std::make_shared<S3RandomAccessFile>(ins.sharedTiFlashClient(), remote_fname);
}

RandomAccessFilePtr S3RandomAccessFile::create(const String & remote_fname)
{
return createFromNormalFile(remote_fname, read_file_info ? std::optional<UInt64>(read_file_info->size) : std::nullopt);
if (read_file_info)
return createFromNormalFile(remote_fname, std::optional<UInt64>(read_file_info->size), read_file_info->scan_context != nullptr ? std::optional<DM::ScanContextPtr>(read_file_info->scan_context) : std::nullopt);
else
return createFromNormalFile(remote_fname, std::nullopt, std::nullopt);
}
} // namespace DB::S3
Loading