Skip to content

Commit

Permalink
Add support for delaying interconnect events (#5954)
Browse files Browse the repository at this point in the history
  • Loading branch information
snaury authored Jun 26, 2024
1 parent 68df614 commit afab5f3
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 6 deletions.
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ static TInterconnectSettings GetInterconnectSettings(const NKikimrConfig::TInter
result.ErrorSleepRetryMultiplier = config.GetErrorSleepRetryMultiplier();
}

if (config.HasEventDelayMicrosec()) {
result.EventDelay = TDuration::MicroSeconds(config.GetEventDelayMicrosec());
}

return result;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ message TInterconnectConfig {
optional double ErrorSleepRetryMultiplier = 48;

optional uint32 OutgoingHandshakeInflightLimit = 43;

optional uint64 EventDelayMicrosec = 49;
}

message TChannelProfileConfig {
Expand Down
3 changes: 3 additions & 0 deletions ydb/library/actors/core/interconnect.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ namespace NActors {
EvCloseInputSession,
EvPoisonSession,
EvTerminate,
EvForwardDelayed,
EvEnd
};

Expand Down Expand Up @@ -264,5 +265,7 @@ namespace NActors {
struct TEvPoisonSession : TEventLocal<TEvPoisonSession, EvPoisonSession> {};

struct TEvTerminate : TEventLocal<TEvTerminate, EvTerminate> {};

struct TEvForwardDelayed : TEventLocal<TEvForwardDelayed, EvForwardDelayed> {};
};
}
1 change: 1 addition & 0 deletions ydb/library/actors/interconnect/interconnect_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ namespace NActors {
TDuration FirstErrorSleep = TDuration::MilliSeconds(10);
TDuration MaxErrorSleep = TDuration::Seconds(1);
double ErrorSleepRetryMultiplier = 4.0;
TDuration EventDelay = TDuration::Zero();

ui32 GetSendBufferSize() const {
ui32 res = 512 * 1024; // 512 kb is the default value for send buffer
Expand Down
43 changes: 37 additions & 6 deletions ydb/library/actors/interconnect/interconnect_tcp_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ namespace NActors {
Proxy->Metrics->SubSubscribersCount(Subscribers.size());
Subscribers.clear();

for (auto& d : DelayedEvents) {
d.Span.EndError("nondelivery");
TActivationContext::Send(IEventHandle::ForwardOnNondelivery(d.Event, TEvents::TEvUndelivered::Disconnected));
}
DelayedEvents.clear();

ChannelScheduler->ForEach([&](TEventOutputChannel& channel) {
channel.NotifyUndelivered();
});
Expand All @@ -115,16 +121,12 @@ namespace NActors {
Y_ABORT("TInterconnectSessionTCP::PassAway() can't be called directly");
}

void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
Proxy->ValidateEvent(ev, "Forward");
void TInterconnectSessionTCP::Enqueue(STATEFN_SIG) {
Proxy->ValidateEvent(ev, "Enqueue");

LOG_DEBUG_IC_SESSION("ICS02", "send event from: %s to: %s", ev->Sender.ToString().data(), ev->Recipient.ToString().data());
++MessagesGot;

if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Subscribe(ev);
}

ui16 evChannel = ev->GetChannel();
auto& oChannel = ChannelScheduler->GetOutputChannel(evChannel);
const bool wasWorking = oChannel.IsWorking();
Expand Down Expand Up @@ -164,6 +166,35 @@ namespace NActors {
IssueRam(true);
}

void TInterconnectSessionTCP::Forward(STATEFN_SIG) {
Proxy->ValidateEvent(ev, "Forward");

if (ev->Flags & IEventHandle::FlagSubscribeOnSession) {
Subscribe(ev);
}

if (Y_UNLIKELY(Proxy->Common->Settings.EventDelay)) {
auto& d = DelayedEvents.emplace_back();
d.Event = std::move(ev);
if (Y_UNLIKELY(d.Event->TraceId)) {
d.Span = NWilson::TSpan(15 /*max verbosity*/, std::move(d.Event->TraceId), "Interconnect.Delay");
// Reparent event to the delay span
d.Event->TraceId = d.Span.GetTraceId();
}
Schedule(Proxy->Common->Settings.EventDelay, new TEvInterconnect::TEvForwardDelayed);
} else {
Enqueue(ev);
}
}

void TInterconnectSessionTCP::ForwardDelayed() {
Y_ABORT_UNLESS(!DelayedEvents.empty());
auto d = std::move(DelayedEvents.front());
DelayedEvents.pop_front();
d.Span.End();
Enqueue(d.Event);
}

void TInterconnectSessionTCP::Subscribe(STATEFN_SIG) {
LOG_DEBUG_IC_SESSION("ICS04", "subscribe for session state for %s", ev->Sender.ToString().data());
const auto [it, inserted] = Subscribers.emplace(ev->Sender, ev->Cookie);
Expand Down
9 changes: 9 additions & 0 deletions ydb/library/actors/interconnect/interconnect_tcp_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,9 @@ namespace NActors {
void Terminate(TDisconnectReason reason);
void PassAway() override;

void Enqueue(STATEFN_SIG);
void Forward(STATEFN_SIG);
void ForwardDelayed();
void Subscribe(STATEFN_SIG);
void Unsubscribe(STATEFN_SIG);

Expand All @@ -456,6 +458,7 @@ namespace NActors {
TimeLimit.emplace(GetMaxCyclesPerEvent());
STRICT_STFUNC_BODY(
fFunc(TEvInterconnect::EvForward, Forward)
cFunc(TEvInterconnect::EvForwardDelayed, ForwardDelayed)
cFunc(TEvents::TEvPoisonPill::EventType, HandlePoison)
fFunc(TEvInterconnect::TEvConnectNode::EventType, Subscribe)
fFunc(TEvents::TEvSubscribe::EventType, Subscribe)
Expand Down Expand Up @@ -611,6 +614,12 @@ namespace NActors {

std::unordered_map<TActorId, ui64, TActorId::THash> Subscribers;

struct TDelayedEvent {
TAutoPtr<IEventHandle> Event;
NWilson::TSpan Span;
};
std::deque<TDelayedEvent> DelayedEvents;

// time at which we want to send confirmation packet even if there was no outgoing data
ui64 UnconfirmedBytes = 0;
TMonotonic ForcePacketTimestamp = TMonotonic::Max();
Expand Down

0 comments on commit afab5f3

Please sign in to comment.