Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DMFile: Support modify DMFile meta #9365

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion dbms/src/Server/DTTool/DTToolInspect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,13 @@ int inspectServiceMain(DB::Context & context, const InspectArgs & args)

// Open the DMFile at `workdir/dmf_<file-id>`
auto fp = context.getFileProvider();
auto dmfile = DB::DM::DMFile::restore(fp, args.file_id, 0, args.workdir, DB::DM::DMFileMeta::ReadMode::all());
auto dmfile = DB::DM::DMFile::restore(
fp,
args.file_id,
0,
args.workdir,
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);

LOG_INFO(logger, "bytes on disk: {}", dmfile->getBytesOnDisk());

Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Server/DTTool/DTToolMigrate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool isRecognizable(const DB::DM::DMFile & file, const std::string & target)
{
return DB::DM::DMFileMeta::metaFileName() == target || DB::DM::DMFileMeta::configurationFileName() == target
|| DB::DM::DMFileMeta::packPropertyFileName() == target || needFrameMigration(file, target)
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::metaFileName() == target;
|| isIgnoredInMigration(file, target) || DB::DM::DMFileMetaV2::isMetaFileName(target);
}

namespace bpo = boost::program_options;
Expand Down Expand Up @@ -193,7 +193,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
0,
args.workdir,
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
auto source_version = 0;
if (src_file->useMetaV2())
{
Expand Down Expand Up @@ -270,7 +271,8 @@ int migrateServiceMain(DB::Context & context, const MigrateArgs & args)
args.file_id,
1,
keeper.migration_temp_dir.path(),
DB::DM::DMFileMeta::ReadMode::all());
DB::DM::DMFileMeta::ReadMode::all(),
0 /* FIXME: Support other meta version */);
}
}
LOG_INFO(logger, "migration finished");
Expand Down
20 changes: 14 additions & 6 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ void ColumnFileBig::serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool /*
big_pb->set_id(file->pageId());
big_pb->set_valid_rows(valid_rows);
big_pb->set_valid_bytes(valid_bytes);
big_pb->set_meta_version(file->metaVersion());
}

ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
Expand All @@ -100,8 +101,10 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
readIntBinary(valid_bytes, buf);

auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
auto dmfile = remote_data_store ? restoreDMFileFromRemoteDataSource(dm_context, remote_data_store, file_page_id)
: restoreDMFileFromLocal(dm_context, file_page_id);
// In this version, ColumnFileBig's meta_version is always 0.
auto dmfile = remote_data_store
? restoreDMFileFromRemoteDataSource(dm_context, remote_data_store, file_page_id, /* meta_version */ 0)
: restoreDMFileFromLocal(dm_context, file_page_id, /* meta_version */ 0);
auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, segment_range);
return std::shared_ptr<ColumnFileBig>(dp_file);
}
Expand All @@ -112,8 +115,9 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata(
const dtpb::ColumnFileBig & cf_pb)
{
auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
auto dmfile = remote_data_store ? restoreDMFileFromRemoteDataSource(dm_context, remote_data_store, cf_pb.id())
: restoreDMFileFromLocal(dm_context, cf_pb.id());
auto dmfile = remote_data_store
? restoreDMFileFromRemoteDataSource(dm_context, remote_data_store, cf_pb.id(), cf_pb.meta_version())
: restoreDMFileFromLocal(dm_context, cf_pb.id(), cf_pb.meta_version());
auto * dp_file = new ColumnFileBig(dmfile, cf_pb.valid_rows(), cf_pb.valid_bytes(), segment_range);
return std::shared_ptr<ColumnFileBig>(dp_file);
}
Expand All @@ -132,8 +136,11 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
readIntBinary(valid_rows, buf);
readIntBinary(valid_bytes, buf);

// In this version, ColumnFileBig's meta_version is always 0.
UInt64 meta_version = 0;

auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
auto dmfile = restoreDMFileFromCheckpoint(dm_context, remote_data_store, temp_ps, wbs, file_page_id);
auto dmfile = restoreDMFileFromCheckpoint(dm_context, remote_data_store, temp_ps, wbs, file_page_id, meta_version);
auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, target_range);
return std::shared_ptr<ColumnFileBig>(dp_file);
}
Expand All @@ -148,9 +155,10 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint(
UInt64 file_page_id = cf_pb.id();
size_t valid_rows = cf_pb.valid_rows();
size_t valid_bytes = cf_pb.valid_bytes();
size_t meta_version = cf_pb.meta_version();

auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store;
auto dmfile = restoreDMFileFromCheckpoint(dm_context, remote_data_store, temp_ps, wbs, file_page_id);
auto dmfile = restoreDMFileFromCheckpoint(dm_context, remote_data_store, temp_ps, wbs, file_page_id, meta_version);
auto * dp_file = new ColumnFileBig(dmfile, valid_rows, valid_bytes, target_range);
return std::shared_ptr<ColumnFileBig>(dp_file);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DMContext_fwd.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 PingCAP, Inc.
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ std::vector<ColumnFilePtrT> CloneColumnFilesHelper<ColumnFilePtrT>::clone(
/* page_id= */ new_page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
old_dmfile->metaVersion(),
dm_context.keyspace_id);

auto new_column_file = f->cloneWith(dm_context, new_file, target_range);
cloned.push_back(new_column_file);
}
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void DeltaMergeStore::cleanPreIngestFiles(
f.id,
file_parent_path,
DM::DMFileMeta::ReadMode::memoryAndDiskSize(),
0 /* a meta version that must exist */,
keyspace_id);
removePreIngestFile(f.id, false);
file->remove(file_provider);
Expand Down Expand Up @@ -189,6 +190,7 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
page_id,
file_parent_path,
DMFileMeta::ReadMode::all(),
file->metaVersion(),
keyspace_id);
data_files.emplace_back(std::move(ref_file));
wbs.data.putRefPage(page_id, file->pageId());
Expand Down Expand Up @@ -472,6 +474,7 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit(
new_page_id,
file->parentPath(),
DMFileMeta::ReadMode::all(),
file->metaVersion(),
keyspace_id);
wbs.data.putRefPage(new_page_id, file->pageId());

Expand Down Expand Up @@ -661,6 +664,7 @@ UInt64 DeltaMergeStore::ingestFiles(
external_file.id,
file_parent_path,
DMFileMeta::ReadMode::memoryAndDiskSize(),
0 /* FIXME: Support other meta version */,
keyspace_id);
}
else
Expand All @@ -671,7 +675,7 @@ UInt64 DeltaMergeStore::ingestFiles(
.table_id = dm_context->physical_table_id,
.file_id = external_file.id};
file = remote_data_store->prepareDMFile(oid, external_file.id)
->restore(DMFileMeta::ReadMode::memoryAndDiskSize());
->restore(DMFileMeta::ReadMode::memoryAndDiskSize(), 0 /* FIXME: Support other meta version */);
}
rows += file->getRows();
bytes += file->getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class LocalDMFileGcRemover final
/* page_id= */ 0,
path,
DMFileMeta::ReadMode::none(),
0 /* a meta version that must exist */,
path_pool->getKeyspaceID());
if (unlikely(!dmfile))
{
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/ColumnStat.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ struct ColumnStat

std::optional<dtpb::VectorIndexFileProps> vector_index = std::nullopt;

#ifndef NDEBUG
// This field is only used for testing
String additional_data_for_test{};
JaySon-Huang marked this conversation as resolved.
Show resolved Hide resolved
#endif

dtpb::ColumnStat toProto() const
{
dtpb::ColumnStat stat;
Expand All @@ -61,6 +66,10 @@ struct ColumnStat
if (vector_index.has_value())
stat.mutable_vector_index()->CopyFrom(vector_index.value());

#ifndef NDEBUG
stat.set_additional_data_for_test(additional_data_for_test);
#endif

return stat;
}

Expand All @@ -80,6 +89,9 @@ struct ColumnStat

if (proto.has_vector_index())
vector_index = proto.vector_index();
#ifndef NDEBUG
additional_data_for_test = proto.additional_data_for_test();
#endif
}

// @deprecated. New fields should be added via protobuf. Use `toProto` instead
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ DMFilePtr DMFile::restore(
UInt64 page_id,
const String & parent_path,
const DMFileMeta::ReadMode & read_meta_mode,
UInt64 meta_version,
KeyspaceID keyspace_id)
{
auto is_s3_file = S3::S3FilenameView::fromKeyWithPrefix(parent_path).isDataFile();
Expand All @@ -137,8 +138,12 @@ DMFilePtr DMFile::restore(
/*configuration_*/ std::nullopt,
/*version_*/ STORAGE_FORMAT_CURRENT.dm_file,
/*keyspace_id_*/ keyspace_id));
if (is_s3_file || Poco::File(dmfile->metav2Path()).exists())
if (is_s3_file || Poco::File(dmfile->metav2Path(/* meta_version= */ 0)).exists())
{
// Always use meta_version=0 when checking whether we should treat it as metav2.
// However, when reading the actual metadata, we read according to the specified
// meta version.

dmfile->meta = std::make_unique<DMFileMetaV2>(
file_id,
parent_path,
Expand All @@ -147,11 +152,14 @@ DMFilePtr DMFile::restore(
16 * 1024 * 1024,
keyspace_id,
std::nullopt,
STORAGE_FORMAT_CURRENT.dm_file);
STORAGE_FORMAT_CURRENT.dm_file,
meta_version);
dmfile->meta->read(file_provider, read_meta_mode);
}
else if (!read_meta_mode.isNone())
{
RUNTIME_CHECK_MSG(meta_version == 0, "Only support meta_version=0 for MetaV2, meta_version={}", meta_version);
Lloyd-Pottiger marked this conversation as resolved.
Show resolved Hide resolved

dmfile->meta = std::make_unique<DMFileMeta>(
file_id,
parent_path,
Expand Down
28 changes: 20 additions & 8 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Poco/File.h>
#include <Storages/DeltaMerge/DeltaMergeDefines.h>
#include <Storages/DeltaMerge/File/DMFileMetaV2.h>
#include <Storages/DeltaMerge/File/DMFileV3IncrementWriter_fwd.h>
#include <Storages/DeltaMerge/File/DMFile_fwd.h>
#include <Storages/FormatVersion.h>
#include <Storages/S3/S3Filename.h>
Expand Down Expand Up @@ -61,6 +62,7 @@ class DMFile : private boost::noncopyable
UInt64 page_id,
const String & parent_path,
const DMFileMeta::ReadMode & read_meta_mode,
UInt64 meta_version = 0,
KeyspaceID keyspace_id = NullspaceID);

struct ListOptions
Expand Down Expand Up @@ -89,7 +91,7 @@ class DMFile : private boost::noncopyable
// keyspaceID
KeyspaceID keyspaceId() const { return meta->keyspace_id; }

DMFileFormat::Version version() const { return meta->version; }
DMFileFormat::Version version() const { return meta->format_version; }

String path() const;

Expand Down Expand Up @@ -128,7 +130,7 @@ class DMFile : private boost::noncopyable
const std::unordered_set<ColId> & getColumnIndices() const { return meta->column_indices; }

// only used in gtest
void clearPackProperties() { meta->pack_properties.clear_property(); }
void clearPackProperties() const { meta->pack_properties.clear_property(); }

const ColumnStat & getColumnStat(ColId col_id) const
{
Expand Down Expand Up @@ -158,7 +160,7 @@ class DMFile : private boost::noncopyable
* Note that only the column id and type are valid.
* @return All columns
*/
ColumnDefines getColumnDefines(bool sort_by_id = true)
ColumnDefines getColumnDefines(bool sort_by_id = true) const
{
ColumnDefines results{};
results.reserve(this->meta->column_stats.size());
Expand All @@ -173,10 +175,12 @@ class DMFile : private boost::noncopyable
return results;
}

bool useMetaV2() const { return meta->version == DMFileFormat::V3; }
bool useMetaV2() const { return meta->format_version == DMFileFormat::V3; }
std::vector<String> listFilesForUpload() const;
void switchToRemote(const S3::DMFileOID & oid);

UInt32 metaVersion() const { return meta->metaVersion(); }

private:
DMFile(
UInt64 file_id_,
Expand All @@ -201,7 +205,8 @@ class DMFile : private boost::noncopyable
merged_file_max_size_,
keyspace_id_,
configuration_,
version_);
version_,
/* meta_version= */ 0);
}
else
{
Expand All @@ -218,7 +223,7 @@ class DMFile : private boost::noncopyable
// Do not gc me.
String ngcPath() const;

String metav2Path() const { return subFilePath(DMFileMetaV2::metaFileName()); }
String metav2Path(UInt64 meta_version) const { return subFilePath(DMFileMetaV2::metaFileName(meta_version)); }
UInt64 getReadFileSize(ColId col_id, const String & filename) const
{
return meta->getReadFileSize(col_id, filename);
Expand Down Expand Up @@ -270,10 +275,10 @@ class DMFile : private boost::noncopyable
return IDataType::getFileNameForStream(DB::toString(col_id), substream);
}

void addPack(const DMFileMeta::PackStat & pack_stat) { meta->pack_stats.push_back(pack_stat); }
void addPack(const DMFileMeta::PackStat & pack_stat) const { meta->pack_stats.push_back(pack_stat); }

DMFileStatus getStatus() const { return meta->status; }
void setStatus(DMFileStatus status_) { meta->status = status_; }
void setStatus(DMFileStatus status_) const { meta->status = status_; }

void finalize();

Expand All @@ -283,8 +288,15 @@ class DMFile : private boost::noncopyable
const UInt64 page_id;

LoggerPtr log;

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
DMFileMetaPtr meta;

friend class DMFileV3IncrementWriter;
friend class DMFileWriter;
friend class DMFileWriterRemote;
friend class DMFileReader;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFileMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ void DMFileMeta::readConfiguration(const FileProviderPtr & file_provider)
= openForRead(file_provider, configurationPath(), encryptionConfigurationPath(), DBMS_DEFAULT_BUFFER_SIZE);
auto stream = InputStreamWrapper{buf};
configuration.emplace(stream);
version = DMFileFormat::V2;
format_version = DMFileFormat::V2;
}
else
{
configuration.reset();
version = DMFileFormat::V1;
format_version = DMFileFormat::V1;
}
}

Expand Down
Loading