Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add total_user_read_bytes in ScanContext and report read RU #7534

Merged
merged 5 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/tipb
11 changes: 8 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ std::pair<size_t, size_t> findColumnFile(const ColumnFiles & column_files, size_
}

ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context,
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: snapshot(snapshot_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
, segment_range(segment_range_)
{
Expand All @@ -94,7 +95,7 @@ ColumnFileSetReader::ColumnFileSetReader(

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
auto * new_reader = new ColumnFileSetReader();
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;
Expand Down Expand Up @@ -171,6 +172,10 @@ size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t off
}
}
}
for (const auto & col : output_columns)
{
context.scan_context->total_user_read_bytes += col->byteSize();
}
return actual_read;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ColumnFileSetReader
friend class ColumnFileSetInputStream;

private:
const DMContext & context;
ColumnFileSetSnapshotPtr snapshot;

// The columns expected to read. Note that we will do reading exactly in this column order.
Expand All @@ -40,7 +41,9 @@ class ColumnFileSetReader
std::vector<ColumnFileReaderPtr> column_file_readers;

private:
ColumnFileSetReader() = default;
explicit ColumnFileSetReader(const DMContext & context_)
: context(context_)
{}

Block readPKVersion(size_t offset, size_t limit);

Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ScanContext
std::atomic<uint64_t> total_remote_region_num{0};
std::atomic<uint64_t> total_local_region_num{0};

std::atomic<uint64_t> total_user_read_bytes{0};

ScanContext() = default;

Expand All @@ -62,6 +63,7 @@ class ScanContext
total_create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000;
total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num();
total_local_region_num = tiflash_scan_context_pb.total_local_region_num();
total_user_read_bytes = tiflash_scan_context_pb.total_user_read_bytes();
}

tipb::TiFlashScanContext serialize()
Expand All @@ -76,6 +78,7 @@ class ScanContext
tiflash_scan_context_pb.set_total_create_snapshot_time_ms(total_create_snapshot_time_ns / 1000000);
tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num);
tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num);
tiflash_scan_context_pb.set_total_user_read_bytes(total_user_read_bytes);
return tiflash_scan_context_pb;
}

Expand All @@ -90,6 +93,7 @@ class ScanContext
total_create_snapshot_time_ns += other.total_create_snapshot_time_ns;
total_local_region_num += other.total_local_region_num;
total_remote_region_num += other.total_remote_region_num;
total_user_read_bytes += other.total_user_read_bytes;
}

void merge(const tipb::TiFlashScanContext & other)
Expand All @@ -103,6 +107,7 @@ class ScanContext
total_create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000;
total_local_region_num += other.total_local_region_num();
total_remote_region_num += other.total_remote_region_num();
total_user_read_bytes += other.total_user_read_bytes();
}
};

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr
streams.push_back(stream);
}

BlockInputStreamPtr stable_stream = std::make_shared<ConcatSkippableBlockInputStream<>>(streams);
BlockInputStreamPtr stable_stream = std::make_shared<ConcatSkippableBlockInputStream<>>(streams, dm_context.scan_context);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, read_ranges, 0);
stable_stream->readPrefix();
while (true)
Expand Down Expand Up @@ -2673,7 +2673,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_contex
is_common_handle,
dm_context.tracing_id);
bitmap_filter->set(stream);
LOG_DEBUG(log, "buildBitmapFilterStableOnly total_rows={}, cost={}ms", segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
LOG_DEBUG(log, "buildBitmapFilterStableOnly read_packs={} total_rows={} cost={}ms", some_packs_sets.size(), segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
return bitmap_filter;
}

Expand Down
16 changes: 14 additions & 2 deletions dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,19 @@ template <bool need_row_id = false>
class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
{
public:
explicit ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_)
ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, const ScanContextPtr & scan_context_)
: rows(inputs_.size(), 0)
, precede_stream_rows(0)
, scan_context(scan_context_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
}

ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector<size_t> && rows_)
ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector<size_t> && rows_, const ScanContextPtr & scan_context_)
: rows(std::move(rows_))
, precede_stream_rows(0)
, scan_context(scan_context_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
Expand Down Expand Up @@ -154,6 +156,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
addReadBytes(res.bytes());
break;
}
else
Expand Down Expand Up @@ -181,6 +184,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
{
res.setSegmentRowIdCol(createSegmentRowIdCol(res.startOffset(), res.rows()));
}
addReadBytes(res.bytes());
break;
}
else
Expand All @@ -206,9 +210,17 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
}
return seg_row_id_col;
}
void addReadBytes(UInt64 bytes)
{
if (likely(scan_context != nullptr))
{
scan_context->total_user_read_bytes += bytes;
}
}
BlockInputStreams::iterator current_stream;
std::vector<size_t> rows;
size_t precede_stream_rows;
const ScanContextPtr scan_context;
};

} // namespace DM
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,11 @@ StableValueSpace::Snapshot::getInputStream(
}
if (need_row_id)
{
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ true>>(streams, std::move(rows));
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ true>>(streams, std::move(rows), context.scan_context);
}
else
{
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ false>>(streams, std::move(rows));
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ false>>(streams, std::move(rows), context.scan_context);
}
}

Expand Down