From 7bc7f4be39bd30aa802f92a653bfe5d580d6af6b Mon Sep 17 00:00:00 2001
From: Ilnaz Nizametdinov
Date: Thu, 24 Oct 2024 21:06:20 +0300
Subject: [PATCH 1/2] Fix resharding, add logging to writer

---
 ydb/public/lib/ydb_cli/dump/util/log.h | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)
 create mode 100644 ydb/public/lib/ydb_cli/dump/util/log.h

diff --git a/ydb/public/lib/ydb_cli/dump/util/log.h b/ydb/public/lib/ydb_cli/dump/util/log.h
new file mode 100644
index 000000000000..df7bbef7cf3f
--- /dev/null
+++ b/ydb/public/lib/ydb_cli/dump/util/log.h
@@ -0,0 +1,29 @@
+#pragma once
+
+#include <util/string/builder.h>
+
+// Logging helpers shared by the dump/restore sources.
+//
+// LOG_IMPL(log, level, message) writes one record to a logger:
+//   log     - pointer-like logger handle (raw or smart pointer); a null
+//             handle is skipped instead of being dereferenced.
+//   level   - ELogPriority of the record; the record is written only when
+//             log->FiltrationLevel() >= level.
+//   message - operands streamed into a TStringBuilder; they are evaluated
+//             only when the record passes the filter.
+//
+// The body is wrapped in do { ... } while (false), so an invocation followed
+// by ';' is exactly one statement and is safe in an unbraced if/else branch.
+#define LOG_IMPL(log, level, message) \
+    do { \
+        if ((log) && (log)->FiltrationLevel() >= (level)) { \
+            (log)->Write((level), TStringBuilder() << message); \
+        } \
+    } while (false)
+
+// Per-priority shorthands; each expects a logger handle named Log to be
+// visible at the point of use.
+#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
+#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
+#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
+#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)
From d41ef38ab47519cfdbf44c18a22f793c84c92c34 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Thu, 24 Oct 2024 21:09:46 +0300 Subject: [PATCH 2/2] Fix resharding, add logging to writer --- ydb/public/lib/ydb_cli/dump/dump.cpp | 2 +- ydb/public/lib/ydb_cli/dump/restore_impl.cpp | 30 ++++------- ydb/public/lib/ydb_cli/dump/restore_impl.h | 4 +- .../lib/ydb_cli/dump/restore_import_data.cpp | 53 +++++++++++-------- .../lib/ydb_cli/dump/restore_import_data.h | 5 +- 5 files changed, 47 insertions(+), 47 deletions(-) diff --git a/ydb/public/lib/ydb_cli/dump/dump.cpp b/ydb/public/lib/ydb_cli/dump/dump.cpp index d9ba591caec2..8c7c8279c003 100644 --- a/ydb/public/lib/ydb_cli/dump/dump.cpp +++ b/ydb/public/lib/ydb_cli/dump/dump.cpp @@ -34,7 +34,7 @@ class TClient::TImpl { } TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings) { - auto client = TRestoreClient(Driver, *Log); + auto client
= TRestoreClient(Driver, Log); return client.Restore(fsPath, dbPath, settings); } diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp index 4c3a10f2ebff..f610afc12360 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp @@ -7,10 +7,9 @@ #include #include #include +#include #include -#include - #include #include #include @@ -19,17 +18,6 @@ #include #include -#define LOG_IMPL(log, level, message) \ - if (log.FiltrationLevel() >= level) { \ - log.Write(level, TStringBuilder() << message); \ - } \ - Y_SEMICOLON_GUARD - -#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message) -#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message) -#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message) -#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message) - namespace NYdb { namespace NDump { @@ -48,7 +36,7 @@ bool IsFileExists(const TFsPath& path) { return path.Exists() && path.IsFile(); } -Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, TLog& log) { +Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, const TLog* log) { LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read scheme from " << fsPath.Quote()); Ydb::Table::CreateTableRequest proto; Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto)); @@ -64,7 +52,7 @@ TTableDescription TableDescriptionWithoutIndexesFromProto(Ydb::Table::CreateTabl return TableDescriptionFromProto(proto); } -Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, TLog& log) { +Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, const TLog* log) { LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read ACL from " << fsPath.Quote()); Ydb::Scheme::ModifyPermissionsRequest proto; Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto)); @@ 
-95,12 +83,12 @@ bool IsOperationStarted(TStatus operationStatus) { } // anonymous -TRestoreClient::TRestoreClient(const TDriver& driver, TLog& log) - : Log(log) - , ImportClient(driver) +TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr& log) + : ImportClient(driver) , OperationClient(driver) , SchemeClient(driver) , TableClient(driver) + , Log(log) { } @@ -257,7 +245,7 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath()); } - auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log); + auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log.get()); auto dumpedDesc = TableDescriptionFromProto(scheme); if (dumpedDesc.GetAttributes().contains(DOC_API_TABLE_VERSION_ATTR) && settings.SkipDocumentTables_) { @@ -384,7 +372,7 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString& } accumulator.Reset(CreateImportDataAccumulator(desc, *actualDesc, settings)); - writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings)); + writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings, Log)); break; } @@ -504,7 +492,7 @@ TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const T LOG_D("Restore ACL " << fsPath.GetPath().Quote() << " to " << dbPath.Quote()); - auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log); + auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log.get()); return ModifyPermissions(SchemeClient, dbPath, TModifyPermissionsSettings(permissions)); } diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.h b/ydb/public/lib/ydb_cli/dump/restore_impl.h index 0d3b100793f8..1c16e781e68d 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_impl.h +++ b/ydb/public/lib/ydb_cli/dump/restore_impl.h @@ -47,16 +47,16 @@ class TRestoreClient 
{ TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet& oldEntries); public: - explicit TRestoreClient(const TDriver& driver, TLog& log); + explicit TRestoreClient(const TDriver& driver, const std::shared_ptr& log); TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings = {}); private: - TLog& Log; NImport::TImportClient ImportClient; NOperation::TOperationClient OperationClient; NScheme::TSchemeClient SchemeClient; NTable::TTableClient TableClient; + std::shared_ptr Log; }; // TRestoreClient diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp index 3c5835f71603..6605b8e1265e 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp +++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp @@ -1,6 +1,7 @@ #include "restore_import_data.h" #include +#include #include #include @@ -662,7 +663,7 @@ class TTableRows { return false; } - TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) { + TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) { Y_ENSURE(HasData(memLimit, batchSize, force)); Y_ENSURE(!ByMemSize.empty()); Y_ENSURE(!ByRecordsSize.empty()); @@ -670,21 +671,21 @@ class TTableRows { Y_ENSURE(!ByRecordsSize.begin()->second.empty()); auto get = [this, batchSize](TRowsBy& from) { - auto it = from.begin()->second.begin(); - auto& rows = (*it)->second; + auto it = *from.begin()->second.begin(); + auto& rows = it->second; - RemoveFromSizeTracker(ByMemSize, rows.MemSize(), *it); - RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), *it); + RemoveFromSizeTracker(ByMemSize, rows.MemSize(), it); + RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), it); MemSize -= rows.MemSize(); auto ret = rows.Serialize(batchSize); MemSize += rows.MemSize(); if (rows.MemSize()) { - Y_ENSURE(ByMemSize[rows.MemSize()].insert(*it).second); + 
Y_ENSURE(ByMemSize[rows.MemSize()].insert(it).second); } if (rows.RecordsSize()) { - Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(*it).second); + Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(it).second); } return ret; @@ -739,9 +740,16 @@ class TDataAccumulator: public NPrivate::IDataAccumulator { return Rows.GetData(MemLimit, BatchSize, force); } - void Reshard(const TVector& keyRanges) { + void Reshard(const TVector& keyRanges, const TString& data) { TGuard lock(Mutex); Rows.Reshard(keyRanges); + + TStringInput input(data); + TString line; + + while (input.ReadLine(line)) { + Rows.Add(KeyBuilder.Build(line), std::move(line)); + } } private: @@ -792,6 +800,7 @@ class TDataWriter: public NPrivate::IDataWriter { } if (retryNumber == maxRetries) { + LOG_E("There is no retries left, last result: " << importResult.GetIssues().ToOneLineString()); return false; } @@ -801,19 +810,12 @@ class TDataWriter: public NPrivate::IDataWriter { TMaybe desc; auto descResult = DescribeTable(TableClient, Path, desc); if (!descResult.IsSuccess()) { + LOG_E("Describe table " << Path.Quote() << " failed: " << descResult.GetIssues().ToOneLineString()); return false; } - Accumulator->Reshard(desc->GetKeyRanges()); - - TStringInput input(data); - TString line; - - while (input.ReadLine(line)) { - Accumulator->Feed(std::move(line)); - } - - break; + Accumulator->Reshard(desc->GetKeyRanges(), data); + return true; } case EStatus::ABORTED: @@ -855,12 +857,14 @@ class TDataWriter: public NPrivate::IDataWriter { const TRestoreSettings& settings, TImportClient& importClient, TTableClient& tableClient, - NPrivate::IDataAccumulator* accumulator) + NPrivate::IDataAccumulator* accumulator, + const std::shared_ptr& log) : Path(path) , Settings(MakeSettings(settings, desc)) , ImportClient(importClient) , TableClient(tableClient) , Accumulator(dynamic_cast(accumulator)) + , Log(log) , RateLimiterSettings(settings.RateLimiterSettings_) , RequestLimiter(RateLimiterSettings.GetRps(), 
RateLimiterSettings.GetRps()) { @@ -871,7 +875,10 @@ class TDataWriter: public NPrivate::IDataWriter { } bool Push(TString&& data) override { - Y_ENSURE(data.size() < TRestoreSettings::MaxBytesPerRequest, "Data is too long"); + if (data.size() >= TRestoreSettings::MaxBytesPerRequest) { + LOG_E("Data is too long"); + return false; + } if (IsStopped()) { return false; @@ -896,6 +903,7 @@ class TDataWriter: public NPrivate::IDataWriter { TImportClient& ImportClient; TTableClient& TableClient; TDataAccumulator* Accumulator; + const std::shared_ptr Log; const TRateLimiterSettings RateLimiterSettings; @@ -922,8 +930,9 @@ NPrivate::IDataWriter* CreateImportDataWriter( TImportClient& importClient, TTableClient& tableClient, NPrivate::IDataAccumulator* accumulator, - const TRestoreSettings& settings) { - return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator); + const TRestoreSettings& settings, + const std::shared_ptr& log) { + return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator, log); } } // NDump diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.h b/ydb/public/lib/ydb_cli/dump/restore_import_data.h index 93ff09bf2217..0d4b0403d45f 100644 --- a/ydb/public/lib/ydb_cli/dump/restore_import_data.h +++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.h @@ -2,6 +2,8 @@ #include "restore_impl.h" +class TLog; + namespace NYdb { namespace NDump { @@ -16,7 +18,8 @@ NPrivate::IDataWriter* CreateImportDataWriter( NImport::TImportClient& importClient, NTable::TTableClient& tableClient, NPrivate::IDataAccumulator* accumulator, - const TRestoreSettings& settings); + const TRestoreSettings& settings, + const std::shared_ptr& log); } // NDump } // NYdb