Skip to content

Commit

Permalink
Add total_user_read_bytes in ScanContext and report read RU (#7534)
Browse files Browse the repository at this point in the history
ref #7491
  • Loading branch information
JinheLin authored May 24, 2023
1 parent b6629f1 commit 1a19912
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 20 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
12 changes: 12 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,16 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_
{
return tables_regions_info.getTableRegionInfoByTableID(table_id);
}

RU DAGContext::getReadRU() const
{
double ru = 0.0;
for (const auto & [id, sc] : scan_context_map)
{
(void)id; // Disable unused variable warnning.
ru += sc->getReadRU();
}
return std::ceil(ru);
}

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
#include <Flash/Coprocessor/DAGRequest.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Executor/toRU.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
#include <Parsers/makeDummyQuery.h>
#include <Storages/DeltaMerge/Remote/DisaggTaskId.h>
#include <Storages/DeltaMerge/ScanContext.h>
#include <Storages/Transaction/TiDB.h>

namespace DB
{
class Context;
Expand Down Expand Up @@ -273,6 +273,8 @@ class DAGContext

KeyspaceID getKeyspaceID() const { return keyspace_id; }

RU getReadRU() const;

DAGRequest dag_request;
/// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast,
/// dummy_query_string and dummy_ast is used for that
Expand Down
11 changes: 6 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,17 @@ try
}
}

auto ru = query_executor->collectRequestUnit();
auto cpu_ru = query_executor->collectRequestUnit();
auto read_ru = dag_context.getReadRU();
if constexpr (!batch)
{
LOG_INFO(log, "cop finish with request unit: {}", ru);
GET_METRIC(tiflash_compute_request_unit, type_cop).Increment(ru);
LOG_INFO(log, "cop finish with request unit: cpu={} read={}", cpu_ru, read_ru);
GET_METRIC(tiflash_compute_request_unit, type_cop).Increment(cpu_ru + read_ru);
}
else
{
LOG_INFO(log, "batch cop finish with request unit: {}", ru);
GET_METRIC(tiflash_compute_request_unit, type_batch).Increment(ru);
LOG_INFO(log, "batch cop finish with request unit: cpu={} read={}", cpu_ru, read_ru);
GET_METRIC(tiflash_compute_request_unit, type_batch).Increment(cpu_ru + read_ru);
}

if (auto throughput = dag_context.getTableScanThroughput(); throughput.first)
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,10 @@ void MPPTask::runImpl()
// finish receiver
receiver_set->close();
}
auto ru = query_executor_holder->collectRequestUnit();
LOG_INFO(log, "mpp finish with request unit: {}", ru);
GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(ru);
auto cpu_ru = query_executor_holder->collectRequestUnit();
auto read_ru = dag_context->getReadRU();
LOG_INFO(log, "mpp finish with request unit: cpu={} read={}", cpu_ru, read_ru);
GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru);

mpp_task_statistics.collectRuntimeStatistics();

Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ std::pair<size_t, size_t> findColumnFile(const ColumnFiles & column_files, size_
}

ColumnFileSetReader::ColumnFileSetReader(
const DMContext & context,
const DMContext & context_,
const ColumnFileSetSnapshotPtr & snapshot_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_)
: snapshot(snapshot_)
: context(context_)
, snapshot(snapshot_)
, col_defs(col_defs_)
, segment_range(segment_range_)
{
Expand All @@ -94,7 +95,7 @@ ColumnFileSetReader::ColumnFileSetReader(

ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
{
auto * new_reader = new ColumnFileSetReader();
auto * new_reader = new ColumnFileSetReader(context);
new_reader->snapshot = snapshot;
new_reader->col_defs = new_col_defs;
new_reader->segment_range = segment_range;
Expand Down Expand Up @@ -171,6 +172,10 @@ size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t off
}
}
}
for (const auto & col : output_columns)
{
context.scan_context->total_user_read_bytes += col->byteSize();
}
return actual_read;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class ColumnFileSetReader
friend class ColumnFileSetInputStream;

private:
const DMContext & context;
ColumnFileSetSnapshotPtr snapshot;

// The columns expected to read. Note that we will do reading exactly in this column order.
Expand All @@ -40,7 +41,9 @@ class ColumnFileSetReader
std::vector<ColumnFileReaderPtr> column_file_readers;

private:
ColumnFileSetReader() = default;
explicit ColumnFileSetReader(const DMContext & context_)
: context(context_)
{}

Block readPKVersion(size_t offset, size_t limit);

Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/ScanContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ScanContext
std::atomic<uint64_t> total_remote_region_num{0};
std::atomic<uint64_t> total_local_region_num{0};

std::atomic<uint64_t> total_user_read_bytes{0};

ScanContext() = default;

Expand All @@ -62,6 +63,7 @@ class ScanContext
total_create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000;
total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num();
total_local_region_num = tiflash_scan_context_pb.total_local_region_num();
total_user_read_bytes = tiflash_scan_context_pb.total_user_read_bytes();
}

tipb::TiFlashScanContext serialize()
Expand All @@ -76,6 +78,7 @@ class ScanContext
tiflash_scan_context_pb.set_total_create_snapshot_time_ms(total_create_snapshot_time_ns / 1000000);
tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num);
tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num);
tiflash_scan_context_pb.set_total_user_read_bytes(total_user_read_bytes);
return tiflash_scan_context_pb;
}

Expand All @@ -90,6 +93,7 @@ class ScanContext
total_create_snapshot_time_ns += other.total_create_snapshot_time_ns;
total_local_region_num += other.total_local_region_num;
total_remote_region_num += other.total_remote_region_num;
total_user_read_bytes += other.total_user_read_bytes;
}

void merge(const tipb::TiFlashScanContext & other)
Expand All @@ -103,6 +107,14 @@ class ScanContext
total_create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000;
total_local_region_num += other.total_local_region_num();
total_remote_region_num += other.total_remote_region_num();
total_user_read_bytes += other.total_user_read_bytes();
}

// Reference: https://docs.pingcap.com/tidb/dev/tidb-resource-control
// For Read I/O, 1/64 RU per KB.
double getReadRU() const
{
return static_cast<double>(total_user_read_bytes) / 1024.0 / 64.0;
}
};

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr
streams.push_back(stream);
}

BlockInputStreamPtr stable_stream = std::make_shared<ConcatSkippableBlockInputStream<>>(streams);
BlockInputStreamPtr stable_stream = std::make_shared<ConcatSkippableBlockInputStream<>>(streams, dm_context.scan_context);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, read_ranges, 0);
stable_stream->readPrefix();
while (true)
Expand Down Expand Up @@ -2673,7 +2673,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_contex
is_common_handle,
dm_context.tracing_id);
bitmap_filter->set(stream);
LOG_DEBUG(log, "buildBitmapFilterStableOnly total_rows={}, cost={}ms", segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
LOG_DEBUG(log, "buildBitmapFilterStableOnly read_packs={} total_rows={} cost={}ms", some_packs_sets.size(), segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds());
return bitmap_filter;
}

Expand Down
16 changes: 14 additions & 2 deletions dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,19 @@ template <bool need_row_id = false>
class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
{
public:
explicit ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_)
ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, const ScanContextPtr & scan_context_)
: rows(inputs_.size(), 0)
, precede_stream_rows(0)
, scan_context(scan_context_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
}

ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector<size_t> && rows_)
ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector<size_t> && rows_, const ScanContextPtr & scan_context_)
: rows(std::move(rows_))
, precede_stream_rows(0)
, scan_context(scan_context_)
{
children.insert(children.end(), inputs_.begin(), inputs_.end());
current_stream = children.begin();
Expand Down Expand Up @@ -154,6 +156,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
if (res)
{
res.setStartOffset(res.startOffset() + precede_stream_rows);
addReadBytes(res.bytes());
break;
}
else
Expand Down Expand Up @@ -181,6 +184,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
{
res.setSegmentRowIdCol(createSegmentRowIdCol(res.startOffset(), res.rows()));
}
addReadBytes(res.bytes());
break;
}
else
Expand All @@ -206,9 +210,17 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream
}
return seg_row_id_col;
}
void addReadBytes(UInt64 bytes)
{
if (likely(scan_context != nullptr))
{
scan_context->total_user_read_bytes += bytes;
}
}
BlockInputStreams::iterator current_stream;
std::vector<size_t> rows;
size_t precede_stream_rows;
const ScanContextPtr scan_context;
};

} // namespace DM
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,11 +463,11 @@ StableValueSpace::Snapshot::getInputStream(
}
if (need_row_id)
{
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ true>>(streams, std::move(rows));
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ true>>(streams, std::move(rows), context.scan_context);
}
else
{
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ false>>(streams, std::move(rows));
return std::make_shared<ConcatSkippableBlockInputStream</*need_row_id*/ false>>(streams, std::move(rows), context.scan_context);
}
}

Expand Down

0 comments on commit 1a19912

Please sign in to comment.