Skip to content

Commit

Permalink
Merge d41ef38 into 8a06f46
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Oct 24, 2024
2 parents 8a06f46 + d41ef38 commit 8061cee
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 47 deletions.
2 changes: 1 addition & 1 deletion ydb/public/lib/ydb_cli/dump/dump.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TClient::TImpl {
}

TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings) {
auto client = TRestoreClient(Driver, *Log);
auto client = TRestoreClient(Driver, Log);
return client.Restore(fsPath, dbPath, settings);
}

Expand Down
30 changes: 9 additions & 21 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
#include <ydb/public/lib/ydb_cli/dump/util/log.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>

#include <library/cpp/logger/log.h>

#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
Expand All @@ -19,17 +18,6 @@
#include <util/string/builder.h>
#include <util/string/join.h>

#define LOG_IMPL(log, level, message) \
if (log.FiltrationLevel() >= level) { \
log.Write(level, TStringBuilder() << message); \
} \
Y_SEMICOLON_GUARD

#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)

namespace NYdb {
namespace NDump {

Expand All @@ -48,7 +36,7 @@ bool IsFileExists(const TFsPath& path) {
return path.Exists() && path.IsFile();
}

Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, TLog& log) {
Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, const TLog* log) {
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read scheme from " << fsPath.Quote());
Ydb::Table::CreateTableRequest proto;
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
Expand All @@ -64,7 +52,7 @@ TTableDescription TableDescriptionWithoutIndexesFromProto(Ydb::Table::CreateTabl
return TableDescriptionFromProto(proto);
}

Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, TLog& log) {
Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, const TLog* log) {
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read ACL from " << fsPath.Quote());
Ydb::Scheme::ModifyPermissionsRequest proto;
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
Expand Down Expand Up @@ -95,12 +83,12 @@ bool IsOperationStarted(TStatus operationStatus) {

} // anonymous

TRestoreClient::TRestoreClient(const TDriver& driver, TLog& log)
: Log(log)
, ImportClient(driver)
TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log)
: ImportClient(driver)
, OperationClient(driver)
, SchemeClient(driver)
, TableClient(driver)
, Log(log)
{
}

Expand Down Expand Up @@ -257,7 +245,7 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
}

auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log);
auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log.get());
auto dumpedDesc = TableDescriptionFromProto(scheme);

if (dumpedDesc.GetAttributes().contains(DOC_API_TABLE_VERSION_ATTR) && settings.SkipDocumentTables_) {
Expand Down Expand Up @@ -384,7 +372,7 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&
}

accumulator.Reset(CreateImportDataAccumulator(desc, *actualDesc, settings));
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings));
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings, Log));

break;
}
Expand Down Expand Up @@ -504,7 +492,7 @@ TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const T

LOG_D("Restore ACL " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());

auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log);
auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log.get());
return ModifyPermissions(SchemeClient, dbPath, TModifyPermissionsSettings(permissions));
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/public/lib/ydb_cli/dump/restore_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ class TRestoreClient {
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);

public:
explicit TRestoreClient(const TDriver& driver, TLog& log);
explicit TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log);

TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings = {});

private:
TLog& Log;
NImport::TImportClient ImportClient;
NOperation::TOperationClient OperationClient;
NScheme::TSchemeClient SchemeClient;
NTable::TTableClient TableClient;
std::shared_ptr<TLog> Log;

}; // TRestoreClient

Expand Down
53 changes: 31 additions & 22 deletions ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "restore_import_data.h"

#include <ydb/public/lib/ydb_cli/common/retry_func.h>
#include <ydb/public/lib/ydb_cli/dump/util/log.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>

#include <library/cpp/string_utils/quote/quote.h>
Expand Down Expand Up @@ -662,29 +663,29 @@ class TTableRows {
return false;
}

TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) {
TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) {
Y_ENSURE(HasData(memLimit, batchSize, force));
Y_ENSURE(!ByMemSize.empty());
Y_ENSURE(!ByRecordsSize.empty());
Y_ENSURE(!ByMemSize.begin()->second.empty());
Y_ENSURE(!ByRecordsSize.begin()->second.empty());

auto get = [this, batchSize](TRowsBy& from) {
auto it = from.begin()->second.begin();
auto& rows = (*it)->second;
auto it = *from.begin()->second.begin();
auto& rows = it->second;

RemoveFromSizeTracker(ByMemSize, rows.MemSize(), *it);
RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), *it);
RemoveFromSizeTracker(ByMemSize, rows.MemSize(), it);
RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), it);

MemSize -= rows.MemSize();
auto ret = rows.Serialize(batchSize);
MemSize += rows.MemSize();

if (rows.MemSize()) {
Y_ENSURE(ByMemSize[rows.MemSize()].insert(*it).second);
Y_ENSURE(ByMemSize[rows.MemSize()].insert(it).second);
}
if (rows.RecordsSize()) {
Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(*it).second);
Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(it).second);
}

return ret;
Expand Down Expand Up @@ -739,9 +740,16 @@ class TDataAccumulator: public NPrivate::IDataAccumulator {
return Rows.GetData(MemLimit, BatchSize, force);
}

void Reshard(const TVector<TKeyRange>& keyRanges) {
void Reshard(const TVector<TKeyRange>& keyRanges, const TString& data) {
TGuard<TMutex> lock(Mutex);
Rows.Reshard(keyRanges);

TStringInput input(data);
TString line;

while (input.ReadLine(line)) {
Rows.Add(KeyBuilder.Build(line), std::move(line));
}
}

private:
Expand Down Expand Up @@ -792,6 +800,7 @@ class TDataWriter: public NPrivate::IDataWriter {
}

if (retryNumber == maxRetries) {
LOG_E("There is no retries left, last result: " << importResult.GetIssues().ToOneLineString());
return false;
}

Expand All @@ -801,19 +810,12 @@ class TDataWriter: public NPrivate::IDataWriter {
TMaybe<TTableDescription> desc;
auto descResult = DescribeTable(TableClient, Path, desc);
if (!descResult.IsSuccess()) {
LOG_E("Describe table " << Path.Quote() << " failed: " << descResult.GetIssues().ToOneLineString());
return false;
}

Accumulator->Reshard(desc->GetKeyRanges());

TStringInput input(data);
TString line;

while (input.ReadLine(line)) {
Accumulator->Feed(std::move(line));
}

break;
Accumulator->Reshard(desc->GetKeyRanges(), data);
return true;
}

case EStatus::ABORTED:
Expand Down Expand Up @@ -855,12 +857,14 @@ class TDataWriter: public NPrivate::IDataWriter {
const TRestoreSettings& settings,
TImportClient& importClient,
TTableClient& tableClient,
NPrivate::IDataAccumulator* accumulator)
NPrivate::IDataAccumulator* accumulator,
const std::shared_ptr<TLog>& log)
: Path(path)
, Settings(MakeSettings(settings, desc))
, ImportClient(importClient)
, TableClient(tableClient)
, Accumulator(dynamic_cast<TDataAccumulator*>(accumulator))
, Log(log)
, RateLimiterSettings(settings.RateLimiterSettings_)
, RequestLimiter(RateLimiterSettings.GetRps(), RateLimiterSettings.GetRps())
{
Expand All @@ -871,7 +875,10 @@ class TDataWriter: public NPrivate::IDataWriter {
}

bool Push(TString&& data) override {
Y_ENSURE(data.size() < TRestoreSettings::MaxBytesPerRequest, "Data is too long");
if (data.size() >= TRestoreSettings::MaxBytesPerRequest) {
LOG_E("Data is too long");
return false;
}

if (IsStopped()) {
return false;
Expand All @@ -896,6 +903,7 @@ class TDataWriter: public NPrivate::IDataWriter {
TImportClient& ImportClient;
TTableClient& TableClient;
TDataAccumulator* Accumulator;
const std::shared_ptr<TLog> Log;

const TRateLimiterSettings RateLimiterSettings;

Expand All @@ -922,8 +930,9 @@ NPrivate::IDataWriter* CreateImportDataWriter(
TImportClient& importClient,
TTableClient& tableClient,
NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings) {
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator);
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log) {
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator, log);
}

} // NDump
Expand Down
5 changes: 4 additions & 1 deletion ydb/public/lib/ydb_cli/dump/restore_import_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "restore_impl.h"

class TLog;

namespace NYdb {
namespace NDump {

Expand All @@ -16,7 +18,8 @@ NPrivate::IDataWriter* CreateImportDataWriter(
NImport::TImportClient& importClient,
NTable::TTableClient& tableClient,
NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings);
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log);

} // NDump
} // NYdb
14 changes: 14 additions & 0 deletions ydb/public/lib/ydb_cli/dump/util/log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <library/cpp/logger/log.h>

#define LOG_IMPL(log, level, message) \
if (log->FiltrationLevel() >= level) { \
log->Write(level, TStringBuilder() << message); \
} \
Y_SEMICOLON_GUARD

#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)

0 comments on commit 8061cee

Please sign in to comment.