diff --git a/contrib/tipb b/contrib/tipb index e21afd2e405..1bbc3bbbd36 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit e21afd2e405d7b7918870f98a8f214bb1e2cafc6 +Subproject commit 1bbc3bbbd3692386c0e63a476e523f08933c58fa diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 49ea60e1943..8bf89b58844 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -339,4 +339,16 @@ const SingleTableRegions & DAGContext::getTableRegionsInfoByTableID(Int64 table_ { return tables_regions_info.getTableRegionInfoByTableID(table_id); } + +RU DAGContext::getReadRU() const +{ + double ru = 0.0; + for (const auto & [id, sc] : scan_context_map) + { + (void)id; // Disable unused variable warnning. + ru += sc->getReadRU(); + } + return std::ceil(ru); +} + } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/DAGContext.h b/dbms/src/Flash/Coprocessor/DAGContext.h index e4de7150a84..06eafb738e4 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.h +++ b/dbms/src/Flash/Coprocessor/DAGContext.h @@ -29,13 +29,13 @@ #include #include #include +#include #include #include #include #include #include #include - namespace DB { class Context; @@ -273,6 +273,8 @@ class DAGContext KeyspaceID getKeyspaceID() const { return keyspace_id; } + RU getReadRU() const; + DAGRequest dag_request; /// Some existing code inherited from Clickhouse assume that each query must have a valid query string and query ast, /// dummy_query_string and dummy_ast is used for that diff --git a/dbms/src/Flash/Coprocessor/DAGDriver.cpp b/dbms/src/Flash/Coprocessor/DAGDriver.cpp index 39d01d16c69..f1239a474f6 100644 --- a/dbms/src/Flash/Coprocessor/DAGDriver.cpp +++ b/dbms/src/Flash/Coprocessor/DAGDriver.cpp @@ -154,16 +154,17 @@ try } } - auto ru = query_executor->collectRequestUnit(); + auto cpu_ru = query_executor->collectRequestUnit(); + auto read_ru = dag_context.getReadRU(); if constexpr (!batch) { - LOG_INFO(log, "cop finish with request unit: {}", ru); - GET_METRIC(tiflash_compute_request_unit, type_cop).Increment(ru); + LOG_INFO(log, "cop finish with request unit: cpu={} read={}", cpu_ru, read_ru); + GET_METRIC(tiflash_compute_request_unit, type_cop).Increment(cpu_ru + read_ru); } else { - LOG_INFO(log, "batch cop finish with request unit: {}", ru); - GET_METRIC(tiflash_compute_request_unit, type_batch).Increment(ru); + LOG_INFO(log, "batch cop finish with request unit: cpu={} read={}", cpu_ru, read_ru); + GET_METRIC(tiflash_compute_request_unit, type_batch).Increment(cpu_ru + read_ru); } if (auto throughput = dag_context.getTableScanThroughput(); throughput.first) diff --git a/dbms/src/Flash/Mpp/MPPTask.cpp b/dbms/src/Flash/Mpp/MPPTask.cpp index 5059b5d3d69..09d7a25c0b6 100644 --- a/dbms/src/Flash/Mpp/MPPTask.cpp +++ b/dbms/src/Flash/Mpp/MPPTask.cpp @@ -436,9 +436,10 @@ void MPPTask::runImpl() // finish receiver receiver_set->close(); } - auto ru = query_executor_holder->collectRequestUnit(); - LOG_INFO(log, "mpp finish with request unit: {}", ru); - GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(ru); + auto cpu_ru = query_executor_holder->collectRequestUnit(); + auto read_ru = dag_context->getReadRU(); + LOG_INFO(log, "mpp finish with request unit: cpu={} read={}", cpu_ru, read_ru); + GET_METRIC(tiflash_compute_request_unit, type_mpp).Increment(cpu_ru + read_ru); mpp_task_statistics.collectRuntimeStatistics(); diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp index 92cc280422c..e8a8a75b9e5 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.cpp @@ -74,11 +74,12 @@ std::pair findColumnFile(const ColumnFiles & column_files, size_ } ColumnFileSetReader::ColumnFileSetReader( - const DMContext & context, + const DMContext & context_, const ColumnFileSetSnapshotPtr & snapshot_, const ColumnDefinesPtr & col_defs_, const RowKeyRange & segment_range_) - : snapshot(snapshot_) + : context(context_) + , snapshot(snapshot_) , col_defs(col_defs_) , segment_range(segment_range_) { @@ -94,7 +95,7 @@ ColumnFileSetReader::ColumnFileSetReader( ColumnFileSetReaderPtr ColumnFileSetReader::createNewReader(const ColumnDefinesPtr & new_col_defs) { - auto * new_reader = new ColumnFileSetReader(); + auto * new_reader = new ColumnFileSetReader(context); new_reader->snapshot = snapshot; new_reader->col_defs = new_col_defs; new_reader->segment_range = segment_range; @@ -171,6 +172,10 @@ size_t ColumnFileSetReader::readRows(MutableColumns & output_columns, size_t off } } } + for (const auto & col : output_columns) + { + context.scan_context->total_user_read_bytes += col->byteSize(); + } return actual_read; } diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index a8ec39fe5e5..2cab46d01d1 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -26,6 +26,7 @@ class ColumnFileSetReader friend class ColumnFileSetInputStream; private: + const DMContext & context; ColumnFileSetSnapshotPtr snapshot; // The columns expected to read. Note that we will do reading exactly in this column order. @@ -40,7 +41,9 @@ class ColumnFileSetReader std::vector column_file_readers; private: - ColumnFileSetReader() = default; + explicit ColumnFileSetReader(const DMContext & context_) + : context(context_) + {} Block readPKVersion(size_t offset, size_t limit); diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index 4f3cbebb10f..1152eed731d 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -48,6 +48,7 @@ class ScanContext std::atomic total_remote_region_num{0}; std::atomic total_local_region_num{0}; + std::atomic total_user_read_bytes{0}; ScanContext() = default; @@ -62,6 +63,7 @@ class ScanContext total_create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000; total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num(); total_local_region_num = tiflash_scan_context_pb.total_local_region_num(); + total_user_read_bytes = tiflash_scan_context_pb.total_user_read_bytes(); } tipb::TiFlashScanContext serialize() @@ -76,6 +78,7 @@ class ScanContext tiflash_scan_context_pb.set_total_create_snapshot_time_ms(total_create_snapshot_time_ns / 1000000); tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num); tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num); + tiflash_scan_context_pb.set_total_user_read_bytes(total_user_read_bytes); return tiflash_scan_context_pb; } @@ -90,6 +93,7 @@ class ScanContext total_create_snapshot_time_ns += other.total_create_snapshot_time_ns; total_local_region_num += other.total_local_region_num; total_remote_region_num += other.total_remote_region_num; + total_user_read_bytes += other.total_user_read_bytes; } void merge(const tipb::TiFlashScanContext & other) @@ -103,6 +107,14 @@ class ScanContext total_create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000; total_local_region_num += other.total_local_region_num(); total_remote_region_num += other.total_remote_region_num(); + total_user_read_bytes += other.total_user_read_bytes(); + } + + // Reference: https://docs.pingcap.com/tidb/dev/tidb-resource-control + // For Read I/O, 1/64 RU per KB. + double getReadRU() const + { + return static_cast(total_user_read_bytes) / 1024.0 / 64.0; } }; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index a87631f6ee9..7c3e97d7049 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -546,7 +546,7 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr streams.push_back(stream); } - BlockInputStreamPtr stable_stream = std::make_shared>(streams); + BlockInputStreamPtr stable_stream = std::make_shared>(streams, dm_context.scan_context); stable_stream = std::make_shared>(stable_stream, read_ranges, 0); stable_stream->readPrefix(); while (true) @@ -2673,7 +2673,7 @@ BitmapFilterPtr Segment::buildBitmapFilterStableOnly(const DMContext & dm_contex is_common_handle, dm_context.tracing_id); bitmap_filter->set(stream); - LOG_DEBUG(log, "buildBitmapFilterStableOnly total_rows={}, cost={}ms", segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds()); + LOG_DEBUG(log, "buildBitmapFilterStableOnly read_packs={} total_rows={} cost={}ms", some_packs_sets.size(), segment_snap->stable->getDMFilesRows(), sw.elapsedMilliseconds()); return bitmap_filter; } diff --git a/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h b/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h index d1f93d3c55f..9f566fdf083 100644 --- a/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/SkippableBlockInputStream.h @@ -74,17 +74,19 @@ template class ConcatSkippableBlockInputStream : public SkippableBlockInputStream { public: - explicit ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_) + ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, const ScanContextPtr & scan_context_) : rows(inputs_.size(), 0) , precede_stream_rows(0) + , scan_context(scan_context_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); current_stream = children.begin(); } - ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector && rows_) + ConcatSkippableBlockInputStream(SkippableBlockInputStreams inputs_, std::vector && rows_, const ScanContextPtr & scan_context_) : rows(std::move(rows_)) , precede_stream_rows(0) + , scan_context(scan_context_) { children.insert(children.end(), inputs_.begin(), inputs_.end()); current_stream = children.begin(); @@ -154,6 +156,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream if (res) { res.setStartOffset(res.startOffset() + precede_stream_rows); + addReadBytes(res.bytes()); break; } else @@ -181,6 +184,7 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream { res.setSegmentRowIdCol(createSegmentRowIdCol(res.startOffset(), res.rows())); } + addReadBytes(res.bytes()); break; } else @@ -206,9 +210,17 @@ class ConcatSkippableBlockInputStream : public SkippableBlockInputStream } return seg_row_id_col; } + void addReadBytes(UInt64 bytes) + { + if (likely(scan_context != nullptr)) + { + scan_context->total_user_read_bytes += bytes; + } + } BlockInputStreams::iterator current_stream; std::vector rows; size_t precede_stream_rows; + const ScanContextPtr scan_context; }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index 1dc7af9b1e4..ce8e5858b9d 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -463,11 +463,11 @@ StableValueSpace::Snapshot::getInputStream( } if (need_row_id) { - return std::make_shared>(streams, std::move(rows)); + return std::make_shared>(streams, std::move(rows), context.scan_context); } else { - return std::make_shared>(streams, std::move(rows)); + return std::make_shared>(streams, std::move(rows), context.scan_context); } }