Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storages: Fix read ranges of FastScan (#9119) #9123

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -737,13 +737,7 @@ BlockInputStreamPtr Segment::getInputStreamModeNormal(const DMContext & dm_conte

auto read_info = getReadInfo(dm_context, columns_to_read, segment_snap, read_ranges, max_version);

RowKeyRanges real_ranges;
for (const auto & read_range : read_ranges)
{
auto real_range = rowkey_range.shrink(read_range);
if (!real_range.none())
real_ranges.emplace_back(std::move(real_range));
}
auto real_ranges = shrinkRowKeyRanges(read_ranges);
if (real_ranges.empty())
return std::make_shared<EmptyBlockInputStream>(toEmptyBlock(*read_info.read_columns));

Expand Down Expand Up @@ -864,10 +858,16 @@ BlockInputStreamPtr Segment::getInputStreamModeFast(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size)
{
auto real_ranges = shrinkRowKeyRanges(read_ranges);
if (real_ranges.empty())
{
return std::make_shared<EmptyBlockInputStream>(toEmptyBlock(columns_to_read));
}

auto new_columns_to_read = std::make_shared<ColumnDefines>();

// new_columns_to_read need at most columns_to_read.size() + 2, due to may extra insert into the handle column and del_mark column.
Expand Down Expand Up @@ -911,7 +911,7 @@ BlockInputStreamPtr Segment::getInputStreamModeFast(
BlockInputStreamPtr stable_stream = segment_snap->stable->getInputStream(
dm_context,
*new_columns_to_read,
data_ranges,
real_ranges,
filter,
std::numeric_limits<UInt64>::max(),
expected_block_size,
Expand All @@ -922,8 +922,8 @@ BlockInputStreamPtr Segment::getInputStreamModeFast(
BlockInputStreamPtr delta_stream = std::make_shared<DeltaValueInputStream>(dm_context, segment_snap->delta, new_columns_to_read, this->rowkey_range);

// Do row key filtering based on data_ranges.
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, data_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, data_ranges, 0);
delta_stream = std::make_shared<DMRowKeyFilterBlockInputStream<false>>(delta_stream, real_ranges, 0);
stable_stream = std::make_shared<DMRowKeyFilterBlockInputStream<true>>(stable_stream, real_ranges, 0);

// Filter the unneeded column and filter out the rows whose del_mark is true.
delta_stream = std::make_shared<DMDeleteFilterBlockInputStream>(delta_stream, columns_to_read, dm_context.tracing_id);
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class Segment
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & data_ranges,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1228,6 +1228,113 @@ try
}
}
CATCH


TEST_P(DeltaMergeStoreRWTest, TestFastScanWithLogicalSplit)
try
{
constexpr auto num_write_rows = 32;
auto table_column_defines = DMTestEnv::getDefaultColumns();
store = reload(table_column_defines);

//Test write multi blocks without overlap and do not compact
{
auto block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false);
auto block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false);
auto block3 = DMTestEnv::prepareSimpleWriteBlock(2 * num_write_rows, 3 * num_write_rows, false);
switch (mode)
{
case TestMode::V1_BlockOnly:
case TestMode::V2_BlockOnly:
{
store->write(*db_context, db_context->getSettingsRef(), block1);
store->write(*db_context, db_context->getSettingsRef(), block2);
store->write(*db_context, db_context->getSettingsRef(), block3);
break;
}
case TestMode::V2_FileOnly:
{
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
auto [range2, file_ids2] = genDMFile(*dm_context, block2);
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
auto range = range1.merge(range2).merge(range3);
auto file_ids = file_ids1;
file_ids.insert(file_ids.cend(), file_ids2.begin(), file_ids2.end());
file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end());
store->ingestFiles(dm_context, range, file_ids, false);
break;
}
case TestMode::V2_Mix:
{
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
auto range = range1.merge(range3);
auto file_ids = file_ids1;
file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end());
store->ingestFiles(dm_context, range, file_ids, false);
store->write(*db_context, db_context->getSettingsRef(), block2);

break;
}
}

store->flushCache(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));
}

store->compact(*db_context, RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()));

store->mergeDeltaAll(*db_context);

auto fastscan_rows = [&]() {
const auto & columns = store->getTableColumns();
BlockInputStreamPtr in = store->read(
*db_context,
db_context->getSettingsRef(),
columns,
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
/* num_streams= */ 1,
/* start_ts= */ std::numeric_limits<UInt64>::max(),
EMPTY_FILTER,
TRACING_NAME,
/* keep_order= */ false,
/* is_fast_scan= */ true,
/* expected_block_size= */ 1024)[0];
size_t rows = 0;
in->readPrefix();
while (true)
{
auto b = in->read();
if (!b)
{
break;
}
rows += b.rows();
}
return rows;
};

auto before_split = fastscan_rows();

ASSERT_EQ(store->segments.size(), 1);
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
auto old = store->segments.begin()->second;
auto [left, right] = store->segmentSplit(
*dm_context,
old,
DeltaMergeStore::SegmentSplitReason::ForegroundWrite,
std::nullopt,
DeltaMergeStore::SegmentSplitMode::Logical);
ASSERT_NE(left, nullptr);
ASSERT_NE(right, nullptr);
ASSERT_EQ(store->segments.size(), 2);

auto after_split = fastscan_rows();

ASSERT_EQ(before_split, after_split);
}
CATCH
} // namespace tests
} // namespace DM
} // namespace DB