From 2e8feda2a1b2af1533e36bbcd05dc2d04d6e4633 Mon Sep 17 00:00:00 2001 From: hongyunyan <649330952@qq.com> Date: Fri, 20 Jan 2023 11:57:49 +0800 Subject: [PATCH] support new item in ScanContext (#6648) ref pingcap/tiflash#5926 Signed-off-by: ywqzzy <592838129@qq.com> --- contrib/tipb | 2 +- .../Flash/Coprocessor/DAGStorageInterpreter.cpp | 15 +++++++++------ .../src/Flash/Coprocessor/DAGStorageInterpreter.h | 2 +- .../tests/gtest_ti_remote_block_inputstream.cpp | 2 ++ dbms/src/Storages/DeltaMerge/ScanContext.h | 12 ++++++++++++ 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/contrib/tipb b/contrib/tipb index 2fb82891081..f8511db2f07 160000 --- a/contrib/tipb +++ b/contrib/tipb @@ -1 +1 @@ -Subproject commit 2fb8289108131ccd5faf86d3e92fb05d4ff3b326 +Subproject commit f8511db2f07273a60d3bbe3e202c28bbdf69ce50 diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 8d8a9ca5e58..34bd6ea4ed8 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -319,18 +319,19 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline) void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) { + auto scan_context = std::make_shared(); + dagContext().scan_context_map[table_scan.getTableScanExecutorID()] = scan_context; + mvcc_query_info->scan_context = scan_context; + if (!mvcc_query_info->regions_query_info.empty()) { - auto scan_context = std::make_shared(); - dagContext().scan_context_map[table_scan.getTableScanExecutorID()] = scan_context; - mvcc_query_info->scan_context = scan_context; buildLocalStreams(pipeline, settings.max_block_size); } // Should build `remote_requests` and `null_stream` under protect of `table_structure_lock`. auto null_stream_if_empty = std::make_shared(storage_for_logical_table->getSampleBlockForColumns(required_columns)); - auto remote_requests = buildRemoteRequests(); + auto remote_requests = buildRemoteRequests(scan_context); // A failpoint to test pause before alter lock released FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired); @@ -736,6 +737,7 @@ void DAGStorageInterpreter::buildLocalStreamsForPhysicalTable( } catch (RegionException & e) { + query_info.mvcc_query_info->scan_context->total_local_region_num -= e.unavailable_region.size(); /// Recover from region exception for batchCop/MPP if (dag_context.isBatchCop() || dag_context.isMPPTask()) { @@ -768,6 +770,7 @@ void DAGStorageInterpreter::buildLocalStreams(DAGPipeline & pipeline, size_t max size_t total_local_region_num = mvcc_query_info->regions_query_info.size(); if (total_local_region_num == 0) return; + mvcc_query_info->scan_context->total_local_region_num = total_local_region_num; const auto table_query_infos = generateSelectQueryInfos(); bool has_multiple_partitions = table_query_infos.size() > 1; // MultiPartitionStreamPool will be disabled in no partition mode or single-partition case @@ -1010,7 +1013,7 @@ std::tuple> DAGStorageIn } // Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions` -std::vector DAGStorageInterpreter::buildRemoteRequests() +std::vector DAGStorageInterpreter::buildRemoteRequests(const DM::ScanContextPtr & scan_context) { std::vector remote_requests; std::unordered_map region_id_to_table_id_map; @@ -1034,7 +1037,7 @@ std::vector DAGStorageInterpreter::buildRemoteRequests() const auto & retry_regions = retry_regions_map[physical_table_id]; if (retry_regions.empty()) continue; - + scan_context->total_remote_region_num += retry_regions.size(); // Append the region into DAGContext to return them to the upper layer. // The upper layer should refresh its cache about these regions. for (const auto & r : retry_regions) diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index 66e703e7fc6..a965ccf5de0 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -81,7 +81,7 @@ class DAGStorageInterpreter std::tuple> getColumnsForTableScan(Int64 max_columns_to_read); - std::vector buildRemoteRequests(); + std::vector buildRemoteRequests(const DM::ScanContextPtr & scan_context); TableLockHolders releaseAlterLocks(); diff --git a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp index 758f83c69a1..1561a60f31d 100644 --- a/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp +++ b/dbms/src/Flash/Coprocessor/tests/gtest_ti_remote_block_inputstream.cpp @@ -80,6 +80,8 @@ struct MockWriter summary.scan_context->total_dmfile_rough_set_index_load_time_ns = 10; summary.scan_context->total_dmfile_read_time_ns = 200; summary.scan_context->total_create_snapshot_time_ns = 5; + summary.scan_context->total_local_region_num = 10; + summary.scan_context->total_remote_region_num = 5; return summary; } diff --git a/dbms/src/Storages/DeltaMerge/ScanContext.h b/dbms/src/Storages/DeltaMerge/ScanContext.h index 8513ff17a35..4f3cbebb10f 100644 --- a/dbms/src/Storages/DeltaMerge/ScanContext.h +++ b/dbms/src/Storages/DeltaMerge/ScanContext.h @@ -45,6 +45,10 @@ class ScanContext std::atomic total_dmfile_read_time_ns{0}; std::atomic total_create_snapshot_time_ns{0}; + std::atomic total_remote_region_num{0}; + std::atomic total_local_region_num{0}; + + ScanContext() = default; void deserialize(const tipb::TiFlashScanContext & tiflash_scan_context_pb) @@ -56,6 +60,8 @@ class ScanContext total_dmfile_rough_set_index_load_time_ns = tiflash_scan_context_pb.total_dmfile_rough_set_index_load_time_ms() * 1000000; total_dmfile_read_time_ns = tiflash_scan_context_pb.total_dmfile_read_time_ms() * 1000000; total_create_snapshot_time_ns = tiflash_scan_context_pb.total_create_snapshot_time_ms() * 1000000; + total_remote_region_num = tiflash_scan_context_pb.total_remote_region_num(); + total_local_region_num = tiflash_scan_context_pb.total_local_region_num(); } tipb::TiFlashScanContext serialize() @@ -68,6 +74,8 @@ class ScanContext tiflash_scan_context_pb.set_total_dmfile_rough_set_index_load_time_ms(total_dmfile_rough_set_index_load_time_ns / 1000000); tiflash_scan_context_pb.set_total_dmfile_read_time_ms(total_dmfile_read_time_ns / 1000000); tiflash_scan_context_pb.set_total_create_snapshot_time_ms(total_create_snapshot_time_ns / 1000000); + tiflash_scan_context_pb.set_total_remote_region_num(total_remote_region_num); + tiflash_scan_context_pb.set_total_local_region_num(total_local_region_num); return tiflash_scan_context_pb; } @@ -80,6 +88,8 @@ class ScanContext total_dmfile_rough_set_index_load_time_ns += other.total_dmfile_rough_set_index_load_time_ns; total_dmfile_read_time_ns += other.total_dmfile_read_time_ns; total_create_snapshot_time_ns += other.total_create_snapshot_time_ns; + total_local_region_num += other.total_local_region_num; + total_remote_region_num += other.total_remote_region_num; } void merge(const tipb::TiFlashScanContext & other) @@ -91,6 +101,8 @@ class ScanContext total_dmfile_rough_set_index_load_time_ns += other.total_dmfile_rough_set_index_load_time_ms() * 1000000; total_dmfile_read_time_ns += other.total_dmfile_read_time_ms() * 1000000; total_create_snapshot_time_ns += other.total_create_snapshot_time_ms() * 1000000; + total_local_region_num += other.total_local_region_num(); + total_remote_region_num += other.total_remote_region_num(); } };