From 2fb0c5b8b77619efeadeb7e31d7cfeb5798e086d Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 22 May 2022 17:20:42 +0800 Subject: [PATCH 1/6] flush cache before segment merge --- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 7 +++++ dbms/src/Storages/DeltaMerge/Segment.cpp | 17 +++++++++-- dbms/src/Storages/DeltaMerge/Segment.h | 3 +- .../DeltaMerge/tests/gtest_dm_segment.cpp | 28 +++++++++++++------ 4 files changed, 43 insertions(+), 12 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 997db601d1e..0a14974bbac 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1971,6 +1971,13 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le right->info(), dm_context.min_version); + /// This segment may contain some rows that not belong to this segment range which is left by previous split operation. + /// And only saved data in this segment will be filtered by the segment range in the merge process, + /// unsaved data will be directly copied to the new segment. + /// So we flush here to make sure that all potential data left by previous split operation is saved. + left->flushCache(dm_context); + right->flushCache(dm_context); + SegmentSnapshotPtr left_snap; SegmentSnapshotPtr right_snap; ColumnDefinesPtr schema_snap; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 9e195b2b25d..40a684693e6 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -306,7 +306,7 @@ bool Segment::writeToCache(DMContext & dm_context, const Block & block, size_t o return delta->appendToCache(dm_context, block, offset, limit); } -bool Segment::write(DMContext & dm_context, const Block & block) +bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cache) { LOG_FMT_TRACE(log, "Segment [{}] write to disk rows: {}", segment_id, block.rows()); WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); @@ -316,7 +316,10 @@ bool Segment::write(DMContext & dm_context, const Block & block) if (delta->appendColumnFile(dm_context, column_file)) { - flushCache(dm_context); + if (flush_cache) + { + flushCache(dm_context); + } return true; } else @@ -1129,6 +1132,12 @@ SegmentPair Segment::applySplit(DMContext & dm_context, // SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, const SegmentPtr & right) { WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); + /// This segment may contain some rows that not belong to this segment range which is left by previous split operation. + /// And only saved data in this segment will be filtered by the segment range in the merge process, + /// unsaved data will be directly copied to the new segment. + /// So we flush here to make sure that all potential data left by previous split operation is saved. + left->flushCache(dm_context); + right->flushCache(dm_context); auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); @@ -1149,6 +1158,10 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem return merged; } +/// Segments may contain some rows that not belong to its range which is left by previous split operation. +/// And only saved data in the segment will be filtered by the segment range in the merge process, +/// unsaved data will be directly copied to the new segment. +/// So remember to do a flush for the segments before merge. StableValueSpacePtr Segment::prepareMerge(DMContext & dm_context, // const ColumnDefinesPtr & schema_snap, const SegmentPtr & left, diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 3ad29ee14a5..cccfc5091b9 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -144,7 +144,8 @@ class Segment : private boost::noncopyable bool writeToCache(DMContext & dm_context, const Block & block, size_t offset, size_t limit); /// For test only. - bool write(DMContext & dm_context, const Block & block); + bool write(DMContext & dm_context, const Block & block, bool flush_cache = true); + bool write(DMContext & dm_context, const RowKeyRange & delete_range); bool ingestColumnFiles(DMContext & dm_context, const RowKeyRange & range, const ColumnFiles & column_files, bool clear_data_in_range); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 6bf33465366..e19dde73a05 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -949,11 +949,17 @@ CATCH TEST_F(Segment_test, Split) try { - const size_t num_rows_write = 100; + const size_t num_rows_write_per_batch = 100; + const size_t num_rows_write = num_rows_write_per_batch * 2; { - // write to segment - Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write, false); - segment->write(dmContext(), std::move(block)); + // write to segment and flush + Block block = DMTestEnv::prepareSimpleWriteBlock(0, num_rows_write_per_batch, false); + segment->write(dmContext(), std::move(block), true); + } + { + // write to segment and don't flush + Block block = DMTestEnv::prepareSimpleWriteBlock(num_rows_write_per_batch, 2 * num_rows_write_per_batch, false); + segment->write(dmContext(), std::move(block), false); } { @@ -989,7 +995,7 @@ try size_t num_rows_seg2 = 0; { { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = segment->getInputStream(dmContext(), *tableColumns(), {segment->getRowKeyRange()}); in->readPrefix(); while (Block block = in->read()) { @@ -998,7 +1004,7 @@ try in->readSuffix(); } { - auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + auto in = new_segment->getInputStream(dmContext(), *tableColumns(), {new_segment->getRowKeyRange()}); in->readPrefix(); while (Block block = in->read()) { @@ -1009,9 +1015,13 @@ try ASSERT_EQ(num_rows_seg1 + num_rows_seg2, num_rows_write); } + // delete rows in the right segment + { + new_segment->write(dmContext(), new_segment->getRowKeyRange()); + new_segment->flushCache(dmContext()); + } + // merge segments - // TODO: enable merge test! - if (false) { segment = Segment::merge(dmContext(), tableColumns(), segment, new_segment); { @@ -1030,7 +1040,7 @@ try num_rows_read += block.rows(); } in->readSuffix(); - EXPECT_EQ(num_rows_read, num_rows_write); + EXPECT_EQ(num_rows_read, num_rows_seg1); } } } From 1603fe1ffa032dc527f912164e47bcef86d81d7c Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 22 May 2022 17:51:17 +0800 Subject: [PATCH 2/6] keep flush until success --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 10 ++++++++-- dbms/src/Storages/DeltaMerge/Segment.cpp | 11 +++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 0a14974bbac..f01055a03ec 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1975,8 +1975,14 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le /// And only saved data in this segment will be filtered by the segment range in the merge process, /// unsaved data will be directly copied to the new segment. /// So we flush here to make sure that all potential data left by previous split operation is saved. - left->flushCache(dm_context); - right->flushCache(dm_context); + while (!left->flushCache(dm_context)) + { + // keep flush until success + } + while (!right->flushCache(dm_context)) + { + // keep flush until success + } SegmentSnapshotPtr left_snap; SegmentSnapshotPtr right_snap; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 40a684693e6..988599ec698 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1136,8 +1136,15 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem /// And only saved data in this segment will be filtered by the segment range in the merge process, /// unsaved data will be directly copied to the new segment. /// So we flush here to make sure that all potential data left by previous split operation is saved. - left->flushCache(dm_context); - right->flushCache(dm_context); + while (!left->flushCache(dm_context)) + { + // keep flush until success + } + while (!right->flushCache(dm_context)) + { + // keep flush until success + } + auto left_snap = left->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); auto right_snap = right->createSnapshot(dm_context, true, CurrentMetrics::DT_SnapshotOfSegmentMerge); From 9598c4a7102f6ca974f274642f2b5b1357befb76 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 22 May 2022 19:21:18 +0800 Subject: [PATCH 3/6] check whether segment is valid if flush failed --- dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index f01055a03ec..27de092c26a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -1977,11 +1977,21 @@ void DeltaMergeStore::segmentMerge(DMContext & dm_context, const SegmentPtr & le /// So we flush here to make sure that all potential data left by previous split operation is saved. while (!left->flushCache(dm_context)) { - // keep flush until success + // keep flush until success if not abandoned + if (left->hasAbandoned()) + { + LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return; + } } while (!right->flushCache(dm_context)) { - // keep flush until success + // keep flush until success if not abandoned + if (right->hasAbandoned()) + { + LOG_FMT_DEBUG(log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return; + } } SegmentSnapshotPtr left_snap; From ba6be8f18609b7728f61a11c42844adf279be55c Mon Sep 17 00:00:00 2001 From: lidezhu <47731263+lidezhu@users.noreply.github.com> Date: Sun, 22 May 2022 19:21:52 +0800 Subject: [PATCH 4/6] Update dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp Co-authored-by: JaySon --- dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index e19dde73a05..5726cfa132d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -1017,7 +1017,7 @@ try // delete rows in the right segment { - new_segment->write(dmContext(), new_segment->getRowKeyRange()); + new_segment->write(dmContext(), /*delete_range*/ new_segment->getRowKeyRange()); new_segment->flushCache(dmContext()); } From d631d4c8dd19042c1d2c6f46a24ea21c569de0e4 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Sun, 22 May 2022 19:28:26 +0800 Subject: [PATCH 5/6] add more fix --- dbms/src/Storages/DeltaMerge/Segment.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 988599ec698..82d73ab1764 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -1138,11 +1138,21 @@ SegmentPtr Segment::merge(DMContext & dm_context, const ColumnDefinesPtr & schem /// So we flush here to make sure that all potential data left by previous split operation is saved. while (!left->flushCache(dm_context)) { - // keep flush until success + // keep flush until success if not abandoned + if (left->hasAbandoned()) + { + LOG_FMT_DEBUG(left->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return {}; + } } while (!right->flushCache(dm_context)) { - // keep flush until success + // keep flush until success if not abandoned + if (right->hasAbandoned()) + { + LOG_FMT_DEBUG(right->log, "Give up merge segments left [{}], right [{}]", left->segmentId(), right->segmentId()); + return {}; + } } From 4d7de4e0d9566e62c59b8b3ba4b804e2e7bfb437 Mon Sep 17 00:00:00 2001 From: lidezhu Date: Mon, 23 May 2022 11:30:25 +0800 Subject: [PATCH 6/6] check flush result in segment::write --- dbms/src/Storages/DeltaMerge/Segment.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 82d73ab1764..8398fdcee40 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -318,7 +318,11 @@ bool Segment::write(DMContext & dm_context, const Block & block, bool flush_cach { if (flush_cache) { - flushCache(dm_context); + while (!flushCache(dm_context)) + { + if (hasAbandoned()) + return false; + } } return true; }