Skip to content

Commit

Permalink
storage: Introduce replaceData in the segment API (#5904)
Browse files Browse the repository at this point in the history
ref #5237
  • Loading branch information
breezewish authored Sep 16, 2022
1 parent aae88b1 commit 892f852
Show file tree
Hide file tree
Showing 10 changed files with 1,012 additions and 132 deletions.
20 changes: 20 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ using LoggerPtr = std::shared_ptr<Logger>;

namespace DM
{
// Forward declaration of DMFile and its shared-pointer alias.
class DMFile;
using DMFilePtr = std::shared_ptr<DMFile>;
// Forward declaration of Segment and its shared-pointer alias.
class Segment;
using SegmentPtr = std::shared_ptr<Segment>;
// A pair of segment pointers.
using SegmentPair = std::pair<SegmentPtr, SegmentPtr>;
Expand Down Expand Up @@ -495,6 +497,24 @@ class DeltaMergeStore : private boost::noncopyable
MergeDeltaReason reason,
SegmentSnapshotPtr segment_snap = nullptr);

/**
* Discard all data in the segment, and use the specified DMFile (`data_file`) as the stable instead.
* The specified DMFile is safe to be shared by multiple segments.
*
* Returns the new segment that takes the place of `segment`, or an empty pointer when `segment`
* is no longer valid and the replacement is given up.
*
* Note 1: This function will not enable GC for `data_file` for you, because you may want to share the same
* stable file among multiple segments. It is your own duty to enable GC later.
*
* Note 2: You must ensure that `data_file` has been managed by the storage pool, and has been written
* to the PageStorage's data. Otherwise there will be exceptions.
*
* Note 3: This API is subject to change in the future, as it relies on the knowledge that all current data
* in this segment is useless, which is a pretty tough requirement.
*/
SegmentPtr segmentDangerouslyReplaceData(
DMContext & dm_context,
const SegmentPtr & segment,
const DMFilePtr & data_file);

// isSegmentValid should be protected by lock on `read_write_mutex`
inline bool isSegmentValid(const std::shared_lock<std::shared_mutex> &, const SegmentPtr & segment)
{
Expand Down
42 changes: 42 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,48 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(
return new_segment;
}

/// Replace all data of `segment` with `data_file` and install the resulting segment in this store.
///
/// Returns the new segment, or an empty pointer when `segment` is no longer valid
/// (in which case nothing is changed).
SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData(
DMContext & dm_context,
const SegmentPtr & segment,
const DMFilePtr & data_file)
{
LOG_FMT_INFO(log, "ReplaceData - Begin, segment={} data_file={}", segment->info(), data_file->path());

// Collects every page write / removal produced by the replacement.
WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());

SegmentPtr new_segment;
{
// Everything in this scope runs while holding `read_write_mutex` exclusively.
std::unique_lock lock(read_write_mutex);
if (!isSegmentValid(lock, segment))
{
LOG_FMT_DEBUG(log, "ReplaceData - Give up segment replace data because segment not valid, segment={} data_file={}", segment->simpleInfo(), data_file->path());
return {};
}

// The segment update lock is held until the end of this scope.
auto segment_lock = segment->mustGetUpdateLock();
new_segment = segment->dangerouslyReplaceData(segment_lock, dm_context, data_file, wbs);

// The new segment is stored below under the old segment's range end and segment id,
// so both must be unchanged.
RUNTIME_CHECK(compare(segment->getRowKeyRange().getEnd(), new_segment->getRowKeyRange().getEnd()) == 0, segment->info(), new_segment->info());
RUNTIME_CHECK(segment->segmentId() == new_segment->segmentId(), segment->info(), new_segment->info());

// Write out log/data and then meta before the in-memory maps are switched to the new segment.
wbs.writeLogAndData();
wbs.writeMeta();

// Retire the old segment and make both lookup maps point to the new one.
segment->abandon(dm_context);
segments[segment->getRowKeyRange().getEnd()] = new_segment;
id_to_segment[segment->segmentId()] = new_segment;

LOG_FMT_INFO(log, "ReplaceData - Finish, old_segment={} new_segment={}", segment->info(), new_segment->info());
}

// The removals are written only after the locks above have been released.
wbs.writeRemoves();

// `check` is only compiled in when DM_RUN_CHECK is true.
if constexpr (DM_RUN_CHECK)
check(dm_context.db_context);

return new_segment;
}

bool DeltaMergeStore::doIsSegmentValid(const SegmentPtr & segment)
{
if (segment->hasAbandoned())
Expand Down
15 changes: 10 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,11 +268,16 @@ class DMFile : private boost::noncopyable
// Returns true when `col_id` has an entry in `column_stats`.
bool isColumnExist(ColId col_id) const { return column_stats.find(col_id) != column_stats.end(); }
// Returns true when `mode` is Mode::SINGLE_FILE.
bool isSingleFileMode() const { return mode == Mode::SINGLE_FILE; }

String toString() const
{
    // Assemble the summary piece by piece. The resulting text has the form
    // "{DMFile, packs: <packs>, rows: <rows>, bytes: <bytes>, file size: <bytes on disk>}".
    String summary = "{DMFile, packs: ";
    summary += DB::toString(getPacks());
    summary += ", rows: ";
    summary += DB::toString(getRows());
    summary += ", bytes: ";
    summary += DB::toString(getBytes());
    summary += ", file size: ";
    summary += DB::toString(getBytesOnDisk());
    summary += "}";
    return summary;
}
/*
* TODO: This function is currently unused. We could use it when:
* 1. The content is polished (e.g. including at least file ID, and use a format easy for grep).
* 2. Unify the place where we are currently printing out DMFile's `path` or `file_id`.
*/
// String toString() const
// {
// return "{DMFile, packs: " + DB::toString(getPacks()) + ", rows: " + DB::toString(getRows()) + ", bytes: " + DB::toString(getBytes())
// + ", file size: " + DB::toString(getBytesOnDisk()) + "}";
// }

// Returns a mutable reference to `configuration`.
DMConfigurationOpt & getConfiguration() { return configuration; }

Expand Down
59 changes: 59 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,65 @@ SegmentPtr Segment::applyMergeDelta(const Segment::Lock &, //
return new_me;
}

SegmentPtr Segment::dangerouslyReplaceDataForTest(DMContext & dm_context, //
                                                  const DMFilePtr & data_file) const
{
    // Test shortcut: this function owns the write batches and the update lock itself,
    // so the caller only has to supply the file.
    WriteBatches batches(dm_context.storage_pool, dm_context.getWriteLimiter());

    auto update_lock = mustGetUpdateLock();
    SegmentPtr replaced = dangerouslyReplaceData(update_lock, dm_context, data_file, batches);

    // Write out everything that was collected, still holding the update lock.
    batches.writeAll();
    return replaced;
}

SegmentPtr Segment::dangerouslyReplaceData(const Segment::Lock &, //
                                           DMContext & dm_context,
                                           const DMFilePtr & data_file,
                                           WriteBatches & wbs) const
{
    LOG_FMT_DEBUG(log, "ReplaceData - Begin, data_file={}", data_file->path());

    auto delegate = dm_context.path_pool.getStableDiskDelegator();

    // The path resolved from the file id must be the directory the given file lives in.
    RUNTIME_CHECK(delegate.getDTFilePath(data_file->fileId()) == data_file->parentPath());

    // The segment never uses `data_file` directly: it always goes through a new ref page,
    // which is what allows the same `data_file` to be shared.
    // TODO: We could allow assigning multiple DMFiles in future.
    auto ref_page_id = dm_context.storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__);
    auto file_ref = DMFile::restore(
        dm_context.db_context.getFileProvider(),
        data_file->fileId(),
        ref_page_id,
        data_file->parentPath(),
        DMFile::ReadMetaMode::all());
    wbs.data.putRefPage(ref_page_id, data_file->pageId());

    // New stable layer: keeps the current stable id, contains only the referenced file.
    auto fresh_stable = std::make_shared<StableValueSpace>(stable->getId());
    fresh_stable->setFiles({file_ref}, rowkey_range, &dm_context);
    fresh_stable->saveMeta(wbs.meta);

    // New delta layer: keeps the current delta id, contains nothing.
    auto empty_delta = std::make_shared<DeltaValueSpace>(delta->getId());
    empty_delta->saveMeta(wbs);

    // The successor has the same range and ids as this segment, with the epoch advanced by one.
    auto successor = std::make_shared<Segment>(
        epoch + 1,
        rowkey_range,
        segment_id,
        next_segment_id,
        empty_delta,
        fresh_stable);
    successor->serialize(wbs.meta);

    // Record the pages of the current delta and stable layers for removal.
    delta->recordRemoveColumnFilesPages(wbs);
    stable->recordRemovePacksPages(wbs);

    LOG_FMT_DEBUG(log, "ReplaceData - Finish, old_me={} new_me={}", info(), successor->info());

    return successor;
}

SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, std::optional<RowKeyValue> opt_split_at, SplitMode opt_split_mode) const
{
WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter());
Expand Down
27 changes: 26 additions & 1 deletion dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,31 @@ class Segment : private boost::noncopyable
WriteBatches & wbs,
const StableValueSpacePtr & new_stable) const;

/**
* Only used in tests as a shortcut: it acquires the update lock by itself (`mustGetUpdateLock`),
* calls `dangerouslyReplaceData` and then writes out all the write batches.
* Normally you should use `dangerouslyReplaceData`.
*/
[[nodiscard]] SegmentPtr dangerouslyReplaceDataForTest(DMContext & dm_context, const DMFilePtr & data_file) const;

/**
* Discard all data in the current delta and stable layer, and use the specified DMFile (`data_file`) as the
* stable instead. This API does not have a prepare & apply pair, as it should be quick enough. The specified
* DMFile is safe to be shared by multiple segments.
*
* Returns the new segment. The changes are only recorded into `wbs`; writing `wbs` out is up to the caller.
*
* Note 1: Should be protected by the Segment update lock to ensure no new data will be appended to this
* segment during the function call. Otherwise the new data will be lost in the new segment.
*
* Note 2: This function will not enable GC for `data_file` for you, because you may want to share the same
* stable file among multiple segments. It is your own duty to enable GC later.
*
* Note 3: You must ensure that `data_file` has been managed by the storage pool, and has been written
* to the PageStorage's data. Otherwise there will be exceptions.
*
* Note 4: This API is subject to change in the future, as it relies on the knowledge that all current data
* in this segment is useless, which is a pretty tough requirement.
*/
[[nodiscard]] SegmentPtr dangerouslyReplaceData(const Lock &, DMContext & dm_context, const DMFilePtr & data_file, WriteBatches & wbs) const;

[[nodiscard]] SegmentPtr dropNextSegment(WriteBatches & wbs, const RowKeyRange & next_segment_range);

/// Flush delta's cache packs.
Expand Down Expand Up @@ -468,7 +493,7 @@ class Segment : private boost::noncopyable
bool relevant_place) const;

private:
/// The version of this segment. After split / merge / merge delta, epoch got increased by 1.
/// The version of this segment. After split / merge / mergeDelta / dangerouslyReplaceData, epoch got increased by 1.
const UInt64 epoch;

RowKeyRange rowkey_range;
Expand Down
116 changes: 116 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <Common/Logger.h>
#include <Common/PODArray.h>
#include <Common/SyncPoint/Ctl.h>
#include <DataStreams/OneBlockInputStream.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
Expand Down Expand Up @@ -46,6 +47,121 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte
}
namespace tests
{

// Fixture for tests that exercise the segment test helpers themselves
// (e.g. prepareWriteBlocksInSegmentRange). It adds nothing on top of SegmentTestBasic.
class SegmentFrameworkTest : public SegmentTestBasic
{
};

// Verifies the handles generated by prepareWriteBlocksInSegmentRange for a segment with range [10, 20).
TEST_F(SegmentFrameworkTest, PrepareWriteBlock)
try
{
reloadWithOptions(SegmentTestOptions{.is_common_handle = false});

// Split twice, at 10 and at 20, so that s1 is bounded on both sides.
auto s1_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 10);
ASSERT_TRUE(s1_id.has_value());
auto s2_id = splitSegmentAt(*s1_id, 20);
ASSERT_TRUE(s2_id.has_value());

// s1 has range [10, 20)
{
auto [begin, end] = getSegmentKeyRange(*s1_id);
ASSERT_EQ(10, begin);
ASSERT_EQ(20, end);
}

{
// write_rows == segment_rows, start_key not specified
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 10);
ASSERT_EQ(1, blocks.size());
auto handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), handle_data);
}
{
// write_rows > segment_rows, start_key not specified:
// the rows that do not fit go into a second block that starts from the segment begin again.
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 13);
ASSERT_EQ(2, blocks.size());
{
auto handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), handle_data);
}
{
auto handle_column = blocks[1].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({10, 11, 12}), handle_data);
}
}
{
// start_key specified, end_key - start_key > write_rows: a single block that ends before the segment end
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 2, /* at */ 16);
ASSERT_EQ(1, blocks.size());
const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16, 17}), handle_data);
}
{
// start_key specified, end_key - start_key == write_rows: a single block that ends exactly at the segment end
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 4, /* at */ 16);
ASSERT_EQ(1, blocks.size());
const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16, 17, 18, 19}), handle_data);
}
{
// start_key specified, end_key - start_key < write_rows:
// the extra row goes into a second block that starts from start_key again.
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 5, /* at */ 16);
ASSERT_EQ(2, blocks.size());
{
const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16, 17, 18, 19}), handle_data);
}
{
const auto & handle_column = blocks[1].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16}), handle_data);
}
}
{
// start_key specified, write_rows is more than twice end_key - start_key:
// three blocks are produced, each of them starting from start_key.
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 10, /* at */ 16);
ASSERT_EQ(3, blocks.size());
{
const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16, 17, 18, 19}), handle_data);
}
{
const auto & handle_column = blocks[1].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16, 17, 18, 19}), handle_data);
}
{
const auto & handle_column = blocks[2].getByName(EXTRA_HANDLE_COLUMN_NAME).column;
const auto & handle_data = typeid_cast<const ColumnVector<Handle> &>(*handle_column).getData();
ASSERT_EQ(PaddedPODArray<Handle>({16, 17}), handle_data);
}
}
{
// write rows < segment rows, start key not specified, should choose a random start.
// Only the shape of the result (one block with 3 rows) is checked here.
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 3);
ASSERT_EQ(1, blocks.size());
ASSERT_EQ(3, blocks[0].rows());
}
{
// Over 100 runs, the first generated handle should be 12 at least once, but in fewer than 50 runs.
auto start_from_12 = 0;
for (size_t i = 0; i < 100; i++)
{
auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 3);
if (blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column->getInt(0) == 12)
start_from_12++;
}
ASSERT_TRUE(start_from_12 > 0); // We should hit at least 1 times in 100 iters.
ASSERT_TRUE(start_from_12 < 50); // We should not hit 50 times in 100 iters :)
}
}
CATCH


class SegmentOperationTest : public SegmentTestBasic
{
protected:
Expand Down
Loading

0 comments on commit 892f852

Please sign in to comment.