Skip to content

Commit

Permalink
WriteActror settings (ydb-platform#9251)
Browse files Browse the repository at this point in the history
  • Loading branch information
nikvas0 committed Sep 17, 2024
1 parent a459164 commit 207ae47
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 33 deletions.
22 changes: 22 additions & 0 deletions ydb/core/kqp/node_service/kqp_node_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/kqp/runtime/kqp_read_iterator_common.h>
#include <ydb/core/kqp/runtime/kqp_write_actor_settings.h>
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
#include <ydb/core/kqp/common/kqp_resolve.h>

Expand Down Expand Up @@ -81,6 +82,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
if (config.HasIteratorReadQuotaSettings()) {
SetIteratorReadsQuotaSettings(config.GetIteratorReadQuotaSettings());
}
if (config.HasWriteActorSettings()) {
SetWriteActorSettings(config.GetWriteActorSettings());
}

SchedulerOptions = {
.AdvanceTimeInterval = TDuration::MicroSeconds(config.GetComputeSchedulerSettings().GetAdvanceTimeIntervalUsec()),
Expand Down Expand Up @@ -418,6 +422,24 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
SetReadIteratorBackoffSettings(ptr);
}

void SetWriteActorSettings(const NKikimrConfig::TTableServiceConfig::TWriteActorSettings& settings) {
auto ptr = MakeIntrusive<NKikimr::NKqp::TWriteActorSettings>();

ptr->InFlightMemoryLimitPerActorBytes = settings.GetInFlightMemoryLimitPerActorBytes();
ptr->MemoryLimitPerMessageBytes = settings.GetMemoryLimitPerMessageBytes();
ptr->MaxBatchesPerMessage = settings.GetMaxBatchesPerMessage();

ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartRetryDelayMs());
ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxRetryDelayMs());
ptr->UnsertaintyRatio = settings.GetUnsertaintyRatio();
ptr->Multiplier = settings.GetMultiplier();

ptr->MaxWriteAttempts = settings.GetMaxWriteAttempts();
ptr->MaxResolveAttempts = settings.GetMaxResolveAttempts();

NKikimr::NKqp::SetWriteActorSettings(ptr);
}

void HandleWork(TEvents::TEvUndelivered::TPtr& ev) {
switch (ev->Get()->SourceType) {
case TEvKqpNode::TEvStartKqpTasksResponse::EventType: {
Expand Down
50 changes: 18 additions & 32 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "kqp_write_actor.h"

#include "kqp_write_table.h"
#include "kqp_write_actor_settings.h"

#include <util/generic/singleton.h>
#include <ydb/core/actorlib_impl/long_timer.h>
Expand All @@ -23,32 +24,14 @@


namespace {
constexpr i64 kInFlightMemoryLimitPerActor = 64_MB;
constexpr i64 kMemoryLimitPerMessage = 64_MB;
constexpr i64 kMaxBatchesPerMessage = 8;

struct TWriteActorBackoffSettings {
TDuration StartRetryDelay = TDuration::MilliSeconds(250);
TDuration MaxRetryDelay = TDuration::Seconds(5);
double UnsertaintyRatio = 0.5;
double Multiplier = 2.0;

ui64 MaxWriteAttempts = 32;
ui64 MaxResolveAttempts = 5;
};

const TWriteActorBackoffSettings* BackoffSettings() {
return Singleton<TWriteActorBackoffSettings>();
}

TDuration CalculateNextAttemptDelay(ui64 attempt) {
auto delay = BackoffSettings()->StartRetryDelay;
TDuration CalculateNextAttemptDelay(const NKikimr::NKqp::TWriteActorSettings& settings, ui64 attempt) {
auto delay = settings.StartRetryDelay;
for (ui64 index = 0; index < attempt; ++index) {
delay *= BackoffSettings()->Multiplier;
delay *= settings.Multiplier;
}

delay *= 1 + BackoffSettings()->UnsertaintyRatio * (1 - 2 * RandomNumber<double>());
delay = Min(delay, BackoffSettings()->MaxRetryDelay);
delay *= 1 + settings.UnsertaintyRatio * (1 - 2 * RandomNumber<double>());
delay = Min(delay, settings.MaxRetryDelay);

return delay;
}
Expand Down Expand Up @@ -133,6 +116,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
TIntrusivePtr<TKqpCounters> counters)
: LogPrefix(TStringBuilder() << "TxId: " << args.TxId << ", task: " << args.TaskId << ". ")
, Settings(std::move(settings))
, MessageSettings(GetWriteActorSettings())
, OutputIndex(args.OutputIndex)
, Callbacks(args.Callback)
, Counters(counters)
Expand All @@ -149,6 +133,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
Settings.GetImmediateTx())
, InconsistentTx(
Settings.GetInconsistentTx())
, MemoryLimit(MessageSettings.InFlightMemoryLimitPerActorBytes)
{
YQL_ENSURE(std::holds_alternative<ui64>(TxId));
YQL_ENSURE(!ImmediateTx);
Expand Down Expand Up @@ -248,9 +233,9 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
}

void PlanResolveTable() {
CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(ResolveAttempts));
CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(MessageSettings, ResolveAttempts));
TlsActivationContext->Schedule(
CalculateNextAttemptDelay(ResolveAttempts),
CalculateNextAttemptDelay(MessageSettings, ResolveAttempts),
new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
}

Expand All @@ -262,7 +247,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
SchemeEntry.reset();
SchemeRequest.reset();

if (ResolveAttempts++ >= BackoffSettings()->MaxResolveAttempts) {
if (ResolveAttempts++ >= MessageSettings.MaxResolveAttempts) {
CA_LOG_E(TStringBuilder()
<< "Too many table resolve attempts for table " << TableId << ".");
RuntimeError(
Expand Down Expand Up @@ -581,7 +566,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
void SendDataToShard(const ui64 shardId) {
const auto metadata = ShardedWriteController->GetMessageMetadata(shardId);
YQL_ENSURE(metadata);
if (metadata->SendAttempts >= BackoffSettings()->MaxWriteAttempts) {
if (metadata->SendAttempts >= MessageSettings.MaxWriteAttempts) {
CA_LOG_E("ShardId=" << shardId
<< " for table '" << Settings.GetTable().GetPath()
<< "': retry limit exceeded."
Expand Down Expand Up @@ -651,7 +636,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu

if (InconsistentTx) {
TlsActivationContext->Schedule(
CalculateNextAttemptDelay(metadata->SendAttempts),
CalculateNextAttemptDelay(MessageSettings, metadata->SendAttempts),
new IEventHandle(
SelfId(),
SelfId(),
Expand Down Expand Up @@ -745,11 +730,11 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
try {
ShardedWriteController = CreateShardedWriteController(
TShardedWriteControllerSettings {
.MemoryLimitTotal = kInFlightMemoryLimitPerActor,
.MemoryLimitPerMessage = kMemoryLimitPerMessage,
.MemoryLimitTotal = MessageSettings.InFlightMemoryLimitPerActorBytes,
.MemoryLimitPerMessage = MessageSettings.MemoryLimitPerMessageBytes,
.MaxBatchesPerMessage = (SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable
? 1
: kMaxBatchesPerMessage),
: MessageSettings.MaxBatchesPerMessage),
},
std::move(columnsMetadata),
TypeEnv,
Expand Down Expand Up @@ -785,6 +770,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu

TString LogPrefix;
const NKikimrKqp::TKqpTableSinkSettings Settings;
TWriteActorSettings MessageSettings;
const ui64 OutputIndex;
NYql::NDq::TDqAsyncStats EgressStats;
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr;
Expand All @@ -805,7 +791,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
THashMap<ui64, TLockInfo> LocksInfo;
bool Finished = false;

const i64 MemoryLimit = kInFlightMemoryLimitPerActor;
const i64 MemoryLimit;

IShardedWriteControllerPtr ShardedWriteController = nullptr;
};
Expand Down
28 changes: 28 additions & 0 deletions ydb/core/kqp/runtime/kqp_write_actor_settings.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include "kqp_write_actor_settings.h"

#include <library/cpp/threading/hot_swap/hot_swap.h>
#include <util/generic/singleton.h>


namespace NKikimr {
namespace NKqp {

struct TWriteActorDefaultSettings {
THotSwap<TWriteActorSettings> SettingsPtr;

TWriteActorDefaultSettings() {
SettingsPtr.AtomicStore(new TWriteActorSettings());
}

};

TWriteActorSettings GetWriteActorSettings() {
return *Singleton<TWriteActorDefaultSettings>()->SettingsPtr.AtomicLoad();
}

void SetWriteActorSettings(TIntrusivePtr<TWriteActorSettings> ptr) {
Singleton<TWriteActorDefaultSettings>()->SettingsPtr.AtomicStore(ptr);
}

}
}
28 changes: 28 additions & 0 deletions ydb/core/kqp/runtime/kqp_write_actor_settings.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <util/generic/ptr.h>
#include <util/datetime/base.h>
#include <util/generic/size_literals.h>

namespace NKikimr {
namespace NKqp {

struct TWriteActorSettings : TAtomicRefCount<TWriteActorSettings> {
i64 InFlightMemoryLimitPerActorBytes = 64_MB;
i64 MemoryLimitPerMessageBytes = 64_MB;
i64 MaxBatchesPerMessage = 1000;

TDuration StartRetryDelay = TDuration::Seconds(1);
TDuration MaxRetryDelay = TDuration::Seconds(10);
double UnsertaintyRatio = 0.5;
double Multiplier = 2.0;

ui64 MaxWriteAttempts = 100;
ui64 MaxResolveAttempts = 5;
};

TWriteActorSettings GetWriteActorSettings();
void SetWriteActorSettings(TIntrusivePtr<TWriteActorSettings> ptr);

}
}
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_write_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace NKqp {
namespace {

constexpr ui64 DataShardMaxOperationBytes = 8_MB;
constexpr ui64 ColumnShardMaxOperationBytes = 8_MB;
constexpr ui64 ColumnShardMaxOperationBytes = 64_MB;
constexpr ui64 MaxUnshardedBatchBytes = 0_MB;

class IPayloadSerializer : public TThrRefBase {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/runtime/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ SRCS(
kqp_stream_lookup_worker.h
kqp_tasks_runner.cpp
kqp_transport.cpp
kqp_write_actor_settings.cpp
kqp_write_actor.cpp
kqp_write_table.cpp
)
Expand Down
16 changes: 16 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,20 @@ message TTableServiceConfig {
optional bool EnableRowsDuplicationCheck = 69 [ default = false ];

optional bool EnableHtapTx = 71 [default = false];

message TWriteActorSettings {
optional uint64 InFlightMemoryLimitPerActorBytes = 1 [ default = 67108864 ];
optional uint64 MemoryLimitPerMessageBytes = 2 [ default = 67108864 ];
optional uint64 MaxBatchesPerMessage = 3 [ default = 1000 ];

optional uint64 StartRetryDelayMs = 4 [ default = 1000 ];
optional uint64 MaxRetryDelayMs = 5 [ default = 10000 ];
optional double UnsertaintyRatio = 6 [ default = 0.5 ];
optional double Multiplier = 7 [ default = 2.0 ];

optional uint64 MaxWriteAttempts = 8 [ default = 100 ];
optional uint64 MaxResolveAttempts = 9 [ default = 5 ];
}

optional TWriteActorSettings WriteActorSettings = 72;
};

0 comments on commit 207ae47

Please sign in to comment.