Skip to content

Commit

Permalink
Storages: Fix read ranges of FastScan (#9119) (#9122)
Browse files Browse the repository at this point in the history
close #9118

Shrinking the read ranges of FastScan.

Co-authored-by: jinhelin <[email protected]>
  • Loading branch information
ti-chi-bot and JinheLin authored Jun 5, 2024
1 parent 4951e18 commit 72742dc
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 12 deletions.
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -878,13 +878,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal(
auto read_tag = need_row_id ? ReadTag::MVCC : ReadTag::Query;
auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, read_tag, max_version);

RowKeyRanges real_ranges;
for (const auto & read_range : read_ranges)
{
auto real_range = rowkey_range.shrink(read_range);
if (!real_range.none())
real_ranges.emplace_back(std::move(real_range));
}
auto real_ranges = shrinkRowKeyRanges(read_ranges);
if (real_ranges.empty())
return std::make_shared<EmptyBlockInputStream>(toEmptyBlock(*read_info.read_columns));

Expand Down Expand Up @@ -1024,10 +1018,16 @@ BlockInputStreamPtr Segment::getInputStreamModeFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
auto real_ranges = shrinkRowKeyRanges(read_ranges);
if (real_ranges.empty())
{
return std::make_shared<EmptyBlockInputStream>(toEmptyBlock(columns_to_read));
}

auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column.
Expand Down Expand Up @@ -1071,7 +1071,7 @@ BlockInputStreamPtr Segment::getInputStreamModeFast(
BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
real_ranges,
filter,
std::numeric_limits<UInt64>::max(),
expected_block_size,
Expand All @@ -1088,8 +1088,8 @@ BlockInputStreamPtr Segment::getInputStreamModeFast(
ReadTag::Query);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, real_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, real_ranges, 0);

// Filter the unneeded column and filter out the rows whose del_mark is true.
delta_stream
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ class Segment
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,118 @@ try
}
}
CATCH


TEST_P(DeltaMergeStoreRWTest, TestFastScanWithLogicalSplit)
try
{
constexpr auto num_write_rows = 32;
auto table_column_defines = DMTestEnv::getDefaultColumns();
store = reload(table_column_defines);

//Test write multi blocks without overlap and do not compact
{
auto block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false);
auto block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false);
auto block3 = DMTestEnv::prepareSimpleWriteBlock(2 * num_write_rows, 3 * num_write_rows, false);
switch (mode)
{
case TestMode::V1_BlockOnly:
case TestMode::V2_BlockOnly:
case TestMode::V3_BlockOnly:
{
store->write(*db_context, db_context->getSettingsRef(), block1);
store->write(*db_context, db_context->getSettingsRef(), block2);
store->write(*db_context, db_context->getSettingsRef(), block3);
break;
}
case TestMode::V2_FileOnly:
case TestMode::V3_FileOnly:
{
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
auto [range2, file_ids2] = genDMFile(*dm_context, block2);
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
auto range = range1.merge(range2).merge(range3);
auto file_ids = file_ids1;
file_ids.insert(file_ids.cend(), file_ids2.begin(), file_ids2.end());
file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end());
store->ingestFiles(dm_context, range, file_ids, false);
break;
}
case TestMode::V2_Mix:
case TestMode::V3_Mix:
{
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
auto range = range1.merge(range3);
auto file_ids = file_ids1;
file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end());
store->ingestFiles(dm_context, range, file_ids, false);
store->write(*db_context, db_context->getSettingsRef(), block2);

break;
}
}

store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
}

store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

store->mergeDeltaAll(*db_context);

auto fastscan_rows = [&]() {
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(
*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
std::vector<RuntimeFilterPtr>{},
0,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ true,
/* expected_block_size= */ 1024)[0];
size_t rows = 0;
in->readPrefix();
while (true)
{
auto b = in->read();
if (!b)
{
break;
}
rows += b.rows();
}
return rows;
};

auto before_split = fastscan_rows();

ASSERT_EQ(store->segments.size(), 1);
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto old = store->segments.begin()->second;
auto [left, right] = store->segmentSplit(
*dm_context,
old,
DeltaMergeStore::SegmentSplitReason::ForegroundWrite,
std::nullopt,
DeltaMergeStore::SegmentSplitMode::Logical);
ASSERT_NE(left, nullptr);
ASSERT_NE(right, nullptr);
ASSERT_EQ(store->segments.size(), 2);

auto after_split = fastscan_rows();

ASSERT_EQ(before_split, after_split);
}
CATCH
} // namespace tests
} // namespace DM
} // namespace DB

0 comments on commit 72742dc

Please sign in to comment.