Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resharding, add logging to writer #10864

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/public/lib/ydb_cli/dump/dump.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TClient::TImpl {
}

TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings) {
auto client = TRestoreClient(Driver, *Log);
auto client = TRestoreClient(Driver, Log);
return client.Restore(fsPath, dbPath, settings);
}

Expand Down
30 changes: 9 additions & 21 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
#include <ydb/public/lib/ydb_cli/common/retry_func.h>
#include <ydb/public/lib/ydb_cli/dump/util/log.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>

#include <library/cpp/logger/log.h>

#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
Expand All @@ -19,17 +18,6 @@
#include <util/string/builder.h>
#include <util/string/join.h>

#define LOG_IMPL(log, level, message) \
if (log.FiltrationLevel() >= level) { \
log.Write(level, TStringBuilder() << message); \
} \
Y_SEMICOLON_GUARD

#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)

namespace NYdb {
namespace NDump {

Expand All @@ -48,7 +36,7 @@ bool IsFileExists(const TFsPath& path) {
return path.Exists() && path.IsFile();
}

Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, TLog& log) {
Ydb::Table::CreateTableRequest ReadTableScheme(const TString& fsPath, const TLog* log) {
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read scheme from " << fsPath.Quote());
Ydb::Table::CreateTableRequest proto;
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
Expand All @@ -64,7 +52,7 @@ TTableDescription TableDescriptionWithoutIndexesFromProto(Ydb::Table::CreateTabl
return TableDescriptionFromProto(proto);
}

Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, TLog& log) {
Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TString& fsPath, const TLog* log) {
LOG_IMPL(log, ELogPriority::TLOG_DEBUG, "Read ACL from " << fsPath.Quote());
Ydb::Scheme::ModifyPermissionsRequest proto;
Y_ENSURE(google::protobuf::TextFormat::ParseFromString(TFileInput(fsPath).ReadAll(), &proto));
Expand Down Expand Up @@ -95,12 +83,12 @@ bool IsOperationStarted(TStatus operationStatus) {

} // anonymous

TRestoreClient::TRestoreClient(const TDriver& driver, TLog& log)
: Log(log)
, ImportClient(driver)
TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log)
: ImportClient(driver)
, OperationClient(driver)
, SchemeClient(driver)
, TableClient(driver)
, Log(log)
{
}

Expand Down Expand Up @@ -257,7 +245,7 @@ TRestoreResult TRestoreClient::RestoreTable(const TFsPath& fsPath, const TString
TStringBuilder() << "There is incomplete file in folder: " << fsPath.GetPath());
}

auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log);
auto scheme = ReadTableScheme(fsPath.Child(SCHEME_FILE_NAME), Log.get());
auto dumpedDesc = TableDescriptionFromProto(scheme);

if (dumpedDesc.GetAttributes().contains(DOC_API_TABLE_VERSION_ATTR) && settings.SkipDocumentTables_) {
Expand Down Expand Up @@ -384,7 +372,7 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&
}

accumulator.Reset(CreateImportDataAccumulator(desc, *actualDesc, settings));
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings));
writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings, Log));

break;
}
Expand Down Expand Up @@ -504,7 +492,7 @@ TRestoreResult TRestoreClient::RestorePermissions(const TFsPath& fsPath, const T

LOG_D("Restore ACL " << fsPath.GetPath().Quote() << " to " << dbPath.Quote());

auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log);
auto permissions = ReadPermissions(fsPath.Child(PERMISSIONS_FILE_NAME), Log.get());
return ModifyPermissions(SchemeClient, dbPath, TModifyPermissionsSettings(permissions));
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/public/lib/ydb_cli/dump/restore_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ class TRestoreClient {
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);

public:
explicit TRestoreClient(const TDriver& driver, TLog& log);
explicit TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log);

TRestoreResult Restore(const TString& fsPath, const TString& dbPath, const TRestoreSettings& settings = {});

private:
TLog& Log;
NImport::TImportClient ImportClient;
NOperation::TOperationClient OperationClient;
NScheme::TSchemeClient SchemeClient;
NTable::TTableClient TableClient;
std::shared_ptr<TLog> Log;

}; // TRestoreClient

Expand Down
53 changes: 31 additions & 22 deletions ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "restore_import_data.h"

#include <ydb/public/lib/ydb_cli/common/retry_func.h>
#include <ydb/public/lib/ydb_cli/dump/util/log.h>
#include <ydb/public/lib/ydb_cli/dump/util/util.h>

#include <library/cpp/string_utils/quote/quote.h>
Expand Down Expand Up @@ -662,29 +663,29 @@ class TTableRows {
return false;
}

TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) {
TString GetData(ui64 memLimit, ui64 batchSize, bool force = false) {
Y_ENSURE(HasData(memLimit, batchSize, force));
Y_ENSURE(!ByMemSize.empty());
Y_ENSURE(!ByRecordsSize.empty());
Y_ENSURE(!ByMemSize.begin()->second.empty());
Y_ENSURE(!ByRecordsSize.begin()->second.empty());

auto get = [this, batchSize](TRowsBy& from) {
auto it = from.begin()->second.begin();
auto& rows = (*it)->second;
auto it = *from.begin()->second.begin();
auto& rows = it->second;

RemoveFromSizeTracker(ByMemSize, rows.MemSize(), *it);
RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), *it);
RemoveFromSizeTracker(ByMemSize, rows.MemSize(), it);
RemoveFromSizeTracker(ByRecordsSize, rows.RecordsSize(), it);

MemSize -= rows.MemSize();
auto ret = rows.Serialize(batchSize);
MemSize += rows.MemSize();

if (rows.MemSize()) {
Y_ENSURE(ByMemSize[rows.MemSize()].insert(*it).second);
Y_ENSURE(ByMemSize[rows.MemSize()].insert(it).second);
}
if (rows.RecordsSize()) {
Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(*it).second);
Y_ENSURE(ByRecordsSize[rows.RecordsSize()].insert(it).second);
}

return ret;
Expand Down Expand Up @@ -739,9 +740,16 @@ class TDataAccumulator: public NPrivate::IDataAccumulator {
return Rows.GetData(MemLimit, BatchSize, force);
}

void Reshard(const TVector<TKeyRange>& keyRanges) {
void Reshard(const TVector<TKeyRange>& keyRanges, const TString& data) {
TGuard<TMutex> lock(Mutex);
Rows.Reshard(keyRanges);

TStringInput input(data);
TString line;

while (input.ReadLine(line)) {
Rows.Add(KeyBuilder.Build(line), std::move(line));
}
}

private:
Expand Down Expand Up @@ -792,6 +800,7 @@ class TDataWriter: public NPrivate::IDataWriter {
}

if (retryNumber == maxRetries) {
LOG_E("There is no retries left, last result: " << importResult.GetIssues().ToOneLineString());
return false;
}

Expand All @@ -801,19 +810,12 @@ class TDataWriter: public NPrivate::IDataWriter {
TMaybe<TTableDescription> desc;
auto descResult = DescribeTable(TableClient, Path, desc);
if (!descResult.IsSuccess()) {
LOG_E("Describe table " << Path.Quote() << " failed: " << descResult.GetIssues().ToOneLineString());
return false;
}

Accumulator->Reshard(desc->GetKeyRanges());

TStringInput input(data);
TString line;

while (input.ReadLine(line)) {
Accumulator->Feed(std::move(line));
}

break;
Accumulator->Reshard(desc->GetKeyRanges(), data);
return true;
}

case EStatus::ABORTED:
Expand Down Expand Up @@ -855,12 +857,14 @@ class TDataWriter: public NPrivate::IDataWriter {
const TRestoreSettings& settings,
TImportClient& importClient,
TTableClient& tableClient,
NPrivate::IDataAccumulator* accumulator)
NPrivate::IDataAccumulator* accumulator,
const std::shared_ptr<TLog>& log)
: Path(path)
, Settings(MakeSettings(settings, desc))
, ImportClient(importClient)
, TableClient(tableClient)
, Accumulator(dynamic_cast<TDataAccumulator*>(accumulator))
, Log(log)
, RateLimiterSettings(settings.RateLimiterSettings_)
, RequestLimiter(RateLimiterSettings.GetRps(), RateLimiterSettings.GetRps())
{
Expand All @@ -871,7 +875,10 @@ class TDataWriter: public NPrivate::IDataWriter {
}

bool Push(TString&& data) override {
Y_ENSURE(data.size() < TRestoreSettings::MaxBytesPerRequest, "Data is too long");
if (data.size() >= TRestoreSettings::MaxBytesPerRequest) {
LOG_E("Data is too long");
return false;
}

if (IsStopped()) {
return false;
Expand All @@ -896,6 +903,7 @@ class TDataWriter: public NPrivate::IDataWriter {
TImportClient& ImportClient;
TTableClient& TableClient;
TDataAccumulator* Accumulator;
const std::shared_ptr<TLog> Log;

const TRateLimiterSettings RateLimiterSettings;

Expand All @@ -922,8 +930,9 @@ NPrivate::IDataWriter* CreateImportDataWriter(
TImportClient& importClient,
TTableClient& tableClient,
NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings) {
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator);
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log) {
return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator, log);
}

} // NDump
Expand Down
5 changes: 4 additions & 1 deletion ydb/public/lib/ydb_cli/dump/restore_import_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "restore_impl.h"

class TLog;

namespace NYdb {
namespace NDump {

Expand All @@ -16,7 +18,8 @@ NPrivate::IDataWriter* CreateImportDataWriter(
NImport::TImportClient& importClient,
NTable::TTableClient& tableClient,
NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings);
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log);

} // NDump
} // NYdb
14 changes: 14 additions & 0 deletions ydb/public/lib/ydb_cli/dump/util/log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <library/cpp/logger/log.h>

#include <util/string/builder.h>

// Logging helper macros.
//
// LOG_IMPL(log, level, message)
//   log     - pointer-like expression accessed through `->`
//             (e.g. `const TLog*` or `std::shared_ptr<TLog>`).
//             It is evaluated twice, so it should be free of side effects.
//   level   - ELogPriority of the record; also evaluated twice.
//   message - a chain of `<<`-streamable values. It is appended to a
//             TStringBuilder and is evaluated only when the record passes
//             the FiltrationLevel() check.
//
// The expansion is wrapped in `do { ... } while (false)` so that it forms
// exactly one statement: the macro can be used as the unbraced body of an
// `if` that has an `else` branch, and a trailing semicolon is required at
// the call site.
#define LOG_IMPL(log, level, message) \
    do { \
        if ((log)->FiltrationLevel() >= (level)) { \
            (log)->Write((level), TStringBuilder() << message); \
        } \
    } while (false)

// Shorthands that log through a pointer-like object named `Log`, which must be
// visible in the scope where the macro is used.
#define LOG_D(message) LOG_IMPL(Log, ELogPriority::TLOG_DEBUG, message)
#define LOG_I(message) LOG_IMPL(Log, ELogPriority::TLOG_INFO, message)
#define LOG_W(message) LOG_IMPL(Log, ELogPriority::TLOG_WARNING, message)
#define LOG_E(message) LOG_IMPL(Log, ELogPriority::TLOG_ERR, message)
Loading