Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix walking back start of elapsing time #4578

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ namespace NActors {
ThreadQueue.Push(workerId + 1, revolvingCounter);

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);

if (threadCtx.WaitingPad.Park())
return 0;

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
wctx.AddParkedCycles(hpnow - hpprev);
}
Expand Down
22 changes: 12 additions & 10 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,16 @@ namespace NActors {
bool firstEvent = true;
bool preempted = false;
bool wasWorking = false;
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
hpprev = Ctx.HPStart;
NHPTimer::STime eventStart = Ctx.HPStart;

for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
mailbox->ProcessEvents(mailbox);
recipient = evExt->GetRecipientRewrite();
TActorContext ctx(*mailbox, *this, hpprev, recipient);
TActorContext ctx(*mailbox, *this, eventStart, recipient);
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
// move for destruct before ctx;
auto ev = std::move(evExt);
Expand Down Expand Up @@ -250,7 +250,7 @@ namespace NActors {
actor->Receive(ev);

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();
Expand All @@ -265,8 +265,9 @@ namespace NActors {

if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
reclaimAsFree = true;

NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);

Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
if (elapsed > 1000000) {
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
}
Expand All @@ -286,9 +287,10 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
}
eventStart = hpnow;

if (TlsThreadContext->CapturedType == ESendingType::Tail) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Expand Down Expand Up @@ -778,7 +780,7 @@ namespace NActors {
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Expand All @@ -790,7 +792,7 @@ namespace NActors {
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ namespace NActors {
}

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
do {
if (WaitingPad.Park()) // interrupted
return true;
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
Expand Down
16 changes: 15 additions & 1 deletion ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "defs.h"

#include <atomic>
#include <ydb/library/actors/util/datetime.h>
#include <ydb/library/actors/util/mpmc_ring_queue.h>

Expand Down Expand Up @@ -29,10 +30,23 @@ namespace NActors {
bool IsCurrentRecipientAService = false;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;

std::atomic<ui64> StartOfElapsingTime = 0;
std::atomic<i64> StartOfElapsingTime = 0;
std::atomic<ui64> ElapsingActorActivity = 0;
TWorkerContext *WorkerCtx = nullptr;
ui32 ActorSystemIndex = 0;

ui64 UpdateStartOfElapsingTime(i64 newValue) {
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
for (;;) {
if (newValue - oldValue <= 0) {
break;
}
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
break;
}
}
return oldValue;
}
};

extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
Expand Down
13 changes: 8 additions & 5 deletions ydb/library/actors/core/worker_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ namespace NActors {
}

void AddElapsedCycles(ui32 activityType, i64 elapsed) {
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
if (Y_LIKELY(elapsed > 0)) {
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
}
}

void AddParkedCycles(i64 elapsed) {
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
if (Y_LIKELY(elapsed > 0)) {
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
}
}

void AddBlockedCycles(i64 elapsed) {
Expand Down Expand Up @@ -126,7 +130,6 @@ namespace NActors {
RelaxedStore(&Stats->ReceivedEvents, RelaxedLoad(&Stats->ReceivedEvents) + 1);
RelaxedStore(&Stats->ReceivedEventsByActivity[activityType], RelaxedLoad(&Stats->ReceivedEventsByActivity[activityType]) + 1);
RelaxedStore(&Stats->ScheduledEventsByActivity[activityType], RelaxedLoad(&Stats->ScheduledEventsByActivity[activityType]) + scheduled);
AddElapsedCycles(activityType, elapsed);
return elapsed;
}

Expand Down
Loading