Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4955
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lidezhu committed Jul 5, 2022
1 parent 00edd82 commit 0a4a29b
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 9 deletions.
23 changes: 23 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
{
LOG_DEBUG(log, "Merge Segment [" << left->info() << "] and [" << right->info() << "], safe point:" << dm_context.min_version);

/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}

SegmentSnapshotPtr left_snap;
SegmentSnapshotPtr right_snap;

Expand Down
48 changes: 46 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o
return delta->appendToCache(dm_context, block, offset, limit);
}

bool Segment::write(DMContext & dm_context, const Block & block)
bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache)
{
LOG_TRACE(log, "Segment [" << segment_id << "] write to disk rows: " << block.rows());
WriteBatches wbs(dm_context.storage_pool);
Expand All @@ -236,7 +236,14 @@ bool Segment::write(DMContext & dm_context, const Block & block)

if (delta->appendToDisk(dm_context, pack))
{
flushCache(dm_context);
if (flush_cache)
{
while (!flushCache(dm_context))
{
if (hasAbandoned())
return false;
}
}
return true;
}
else
Expand Down Expand Up @@ -878,7 +885,34 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //

SegmentPtr Segment::merge(DMContext & dm_context, const SegmentPtr & left, const SegmentPtr & right)
{
<<<<<<< HEAD
WriteBatches wbs(dm_context.storage_pool);
=======
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}

>>>>>>> 94afb714ed (flush cache before segment merge (#4955))

auto left_snap = left->createSnapshot(dm_context, true);
auto right_snap = right->createSnapshot(dm_context, true);
Expand All @@ -899,8 +933,18 @@ SegmentPtr Segment::merge(DMContext & dm_context, const SegmentPtr & left, const
return merged;
}

<<<<<<< HEAD
StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
const SegmentPtr & left,
=======
/// Segments may contain some rows that not belong to its range which is left by previous split operation.
/// And only saved data in the segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So remember to do a flush for the segments before merge.
StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
const ColumnDefinesPtr & schema_snap,
const SegmentPtr & left,
>>>>>>> 94afb714ed (flush cache before segment merge (#4955))
const SegmentSnapshotPtr & left_snap,
const SegmentPtr & right,
const SegmentSnapshotPtr & right_snap,
Expand Down
49 changes: 49 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class Segment : private boost::noncopyable

bool writeToDisk(DMContext & dm_context, const PackPtr & pack);
bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);
<<<<<<< HEAD
bool write(DMContext & dm_context, const Block & block); // For test only
bool write(DMContext & dm_context, const HandleRange & delete_range);

Expand Down Expand Up @@ -125,6 +126,54 @@ class Segment : private boost::noncopyable
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

BlockInputStreamPtr getInputStreamRaw(const DMContext & dm_context, const ColumnDefines & columns_to_read);
=======

/// For test only.
bool write(DMContext & dm_context, const Block & block, bool flush_cache = true);

bool write(DMContext & dm_context, const RowKeyRange & delete_range);
bool ingestColumnFiles(DMContext & dm_context, const RowKeyRange & range, const ColumnFiles & column_files, bool clear_data_in_range);

SegmentSnapshotPtr createSnapshot(const DMContext & dm_context, bool for_update, CurrentMetrics::Metric metric) const;

BlockInputStreamPtr getInputStream(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter,
UInt64 max_version,
size_t expected_block_size);

BlockInputStreamPtr getInputStream(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const RowKeyRanges & read_ranges,
const RSOperatorPtr & filter = {},
UInt64 max_version = std::numeric_limits<UInt64>::max(),
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

/// Return a stream which is suitable for exporting data.
/// reorganize_block: put those rows with the same pk rows into the same block or not.
BlockInputStreamPtr getInputStreamForDataExport(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
const RowKeyRange & data_range,
size_t expected_block_size = DEFAULT_BLOCK_SIZE,
bool reorganize_block = true) const;

BlockInputStreamPtr getInputStreamRaw(
const DMContext & dm_context,
const ColumnDefines & columns_to_read,
const SegmentSnapshotPtr & segment_snap,
bool do_range_filter,
size_t expected_block_size = DEFAULT_BLOCK_SIZE);

BlockInputStreamPtr getInputStreamRaw(
const DMContext & dm_context,
const ColumnDefines & columns_to_read);
>>>>>>> 94afb714ed (flush cache before segment merge (#4955))

/// For those split, merge and mergeDelta methods, we should use prepareXXX/applyXXX combo in real production.
/// split(), merge() and mergeDelta() are only used in test cases.
Expand Down
32 changes: 25 additions & 7 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,17 @@ CATCH
TEST_F(Segment_test, Split)
try
{
const size_t num_rows_write = 100;
const size_t num_rows_write_per_batch = 100;
const size_t num_rows_write = num_rows_write_per_batch * 2;
{
// write to segment
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
segment->write(dmContext(), std::move(block));
// write to segment and flush
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), true);
}
{
// write to segment and don't flush
Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), false);
}

{
Expand Down Expand Up @@ -664,7 +670,11 @@ try
size_t num_rows_seg2 = 0;
{
{
<<<<<<< HEAD
auto in = segment->getInputStream(dmContext(), *tableColumns());
=======
auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()});
>>>>>>> 94afb714ed (flush cache before segment merge (#4955))
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -673,7 +683,11 @@ try
in->readSuffix();
}
{
<<<<<<< HEAD
auto in = segment->getInputStream(dmContext(), *tableColumns());
=======
auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()});
>>>>>>> 94afb714ed (flush cache before segment merge (#4955))
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -684,9 +698,13 @@ try
ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write);
}

// delete rows in the right segment
{
new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange());
new_segment->flushCache(dmContext());
}

// merge segments
// TODO: enable merge test!
if (false)
{
segment = Segment::merge(dmContext(), segment, new_segment);
{
Expand All @@ -705,7 +723,7 @@ try
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, num_rows_write);
EXPECT_EQ(num_rows_read, num_rows_seg1);
}
}
}
Expand Down

0 comments on commit 0a4a29b

Please sign in to comment.