Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#4955
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
lidezhu authored and ti-chi-bot committed May 23, 2022
1 parent 25545c4 commit 8b892a2
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 11 deletions.
23 changes: 23 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,29 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le
right->info(),
dm_context.min_version);

/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return;
}
}

SegmentSnapshotPtr left_snap;
SegmentSnapshotPtr right_snap;
ColumnDefinesPtr schema_snap;
Expand Down
38 changes: 36 additions & 2 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o
return delta->appendToCache(dm_context, block, offset, limit);
}

bool Segment::write(DMContext & dm_context, const Block & block)
bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache)
{
LOG_FMT_TRACE(log, "Segment [{}] write to disk rows: {}", segment_id, block.rows());
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
Expand All @@ -315,7 +315,14 @@ bool Segment::write(DMContext & dm_context, const Block & block)

if (delta->appendColumnFile(dm_context, column_file))
{
flushCache(dm_context);
if (flush_cache)
{
while (!flushCache(dm_context))
{
if (hasAbandoned())
return false;
}
}
return true;
}
else
Expand Down Expand Up @@ -1130,6 +1137,29 @@ SegmentPair Segment::applySplit(DMContext & dm_context, //
SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right)
{
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
/// This segment may contain some rows that not belong to this segment range which is left by previous split operation.
/// And only saved data in this segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So we flush here to make sure that all potential data left by previous split operation is saved.
while (!left->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (left->hasAbandoned())
{
LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}
while (!right->flushCache(dm_context))
{
// keep flush until success if not abandoned
if (right->hasAbandoned())
{
LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId());
return {};
}
}


auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge);
Expand All @@ -1150,6 +1180,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem
return merged;
}

/// Segments may contain some rows that not belong to its range which is left by previous split operation.
/// And only saved data in the segment will be filtered by the segment range in the merge process,
/// unsaved data will be directly copied to the new segment.
/// So remember to do a flush for the segments before merge.
StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, //
const ColumnDefinesPtr & schema_snap,
const SegmentPtr & left,
Expand Down
7 changes: 7 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,14 @@ class Segment : private boost::noncopyable

bool writeToDisk(DMContext & dm_context, const ColumnFilePtr & column_file);
bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit);
<<<<<<< HEAD
bool write(DMContext & dm_context, const Block & block); // For test only
=======

/// For test only.
bool write(DMContext & dm_context, const Block & block, bool flush_cache = true);

>>>>>>> 94afb714ed (flush cache before segment merge (#4955))
bool write(DMContext & dm_context, const RowKeyRange & delete_range);
bool ingestColumnFiles(DMContext & dm_context, const RowKeyRange & range, const ColumnFiles & column_files, bool clear_data_in_range);

Expand Down
28 changes: 19 additions & 9 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,11 +916,17 @@ CATCH
TEST_F(Segment_test, Split)
try
{
const size_t num_rows_write = 100;
const size_t num_rows_write_per_batch = 100;
const size_t num_rows_write = num_rows_write_per_batch * 2;
{
// write to segment
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false);
segment->write(dmContext(), std::move(block));
// write to segment and flush
Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), true);
}
{
// write to segment and don't flush
Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false);
segment->write(dmContext(), std::move(block), false);
}

{
Expand Down Expand Up @@ -956,7 +962,7 @@ try
size_t num_rows_seg2 = 0;
{
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -965,7 +971,7 @@ try
in->readSuffix();
}
{
auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()});
in->readPrefix();
while (Block block = in->read())
{
Expand All @@ -976,9 +982,13 @@ try
ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write);
}

// delete rows in the right segment
{
new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange());
new_segment->flushCache(dmContext());
}

// merge segments
// TODO: enable merge test!
if (false)
{
segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment);
{
Expand All @@ -997,7 +1007,7 @@ try
num_rows_read += block.rows();
}
in->readSuffix();
EXPECT_EQ(num_rows_read, num_rows_write);
EXPECT_EQ(num_rows_read, num_rows_seg1);
}
}
}
Expand Down

0 comments on commit 8b892a2

Please sign in to comment.