From f2d10545fbfa51889faa3586197654567607b197 Mon Sep 17 00:00:00 2001 From: kruall Date: Tue, 11 Jun 2024 17:23:06 +0300 Subject: [PATCH] Fix tsan error (#5441) --- ydb/library/actors/core/mon_stats.cpp | 122 ++++++++++++++++++++++++++ ydb/library/actors/core/mon_stats.h | 109 ++--------------------- ydb/library/actors/core/ya.make | 1 + 3 files changed, 131 insertions(+), 101 deletions(-) create mode 100644 ydb/library/actors/core/mon_stats.cpp diff --git a/ydb/library/actors/core/mon_stats.cpp b/ydb/library/actors/core/mon_stats.cpp new file mode 100644 index 000000000000..6524e1c5ea75 --- /dev/null +++ b/ydb/library/actors/core/mon_stats.cpp @@ -0,0 +1,122 @@ +#include "mon_stats.h" + + +namespace NActors { + + TLogHistogram::TLogHistogram() { + memset(Buckets, 0, sizeof(Buckets)); + } + + void TLogHistogram::Aggregate(const TLogHistogram& other) { + const ui64 inc = RelaxedLoad(&other.TotalSamples); + RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc); + for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) { + Buckets[i] += RelaxedLoad(&other.Buckets[i]); + } + } + + ui32 TLogHistogram::Count() const { + return Y_ARRAY_SIZE(Buckets); + } + + NMonitoring::TBucketBound TLogHistogram::UpperBound(ui32 index) const { + Y_ASSERT(index < Y_ARRAY_SIZE(Buckets)); + if (index == 0) { + return 1; + } + return NMonitoring::TBucketBound(1ull << (index - 1)) * 2.0; + } + + NMonitoring::TBucketValue TLogHistogram::Value(ui32 index) const { + Y_ASSERT(index < Y_ARRAY_SIZE(Buckets)); + return Buckets[index]; + } + + TExecutorThreadStats::TExecutorThreadStats() // must be not empty as 0 used as default + : ElapsedTicksByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + , ReceivedEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + , ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + , ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + , StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + , UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) + {} + + namespace { + template + void AggregateOne(TVector& self, const TVector& other) { + const size_t selfSize = self.size(); + const size_t otherSize = other.size(); + if (selfSize < otherSize) + self.resize(otherSize); + for (size_t at = 0; at < otherSize; ++at) + self[at] += RelaxedLoad(&other[at]); + } + } + + void TExecutorThreadStats::Aggregate(const TExecutorThreadStats& other) { + SentEvents += RelaxedLoad(&other.SentEvents); + ReceivedEvents += RelaxedLoad(&other.ReceivedEvents); + PreemptedEvents += RelaxedLoad(&other.PreemptedEvents); + NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); + EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); + CpuUs += RelaxedLoad(&other.CpuUs); + SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks); + RelaxedStore( + &WorstActivationTimeUs, + std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); + ElapsedTicks += RelaxedLoad(&other.ElapsedTicks); + ParkedTicks += RelaxedLoad(&other.ParkedTicks); + BlockedTicks += RelaxedLoad(&other.BlockedTicks); + MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending); + MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); + MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); + MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); + NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions); + + ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram); + EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram); + EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram); + EventProcessingTimeHistogram.Aggregate(other.EventProcessingTimeHistogram); + + AggregateOne(ElapsedTicksByActivity, other.ElapsedTicksByActivity); + AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity); + AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity); + AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity); + AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity); + + // AggregatedCurrentActivationTime is readed and modified only from one thread + auto timeUs = RelaxedLoad(&other.CurrentActivationTime.TimeUs); + if (timeUs) { + AggregatedCurrentActivationTime.push_back(TActivationTime{ + .TimeUs = timeUs, + .LastActivity = RelaxedLoad(&other.CurrentActivationTime.LastActivity)}); + } + if (other.AggregatedCurrentActivationTime.size()) { + AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end()); + } + + if (UsageByActivity.size() < other.UsageByActivity.size()) { + UsageByActivity.resize(other.UsageByActivity.size()); + } + for (size_t i = 0; i < UsageByActivity.size(); ++i) { + for (size_t j = 0; j < 10; ++j) { + UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]); + } + } + + RelaxedStore( + &PoolActorRegistrations, + std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations))); + RelaxedStore( + &PoolDestroyedActors, + std::max(RelaxedLoad(&PoolDestroyedActors), RelaxedLoad(&other.PoolDestroyedActors))); + RelaxedStore( + &PoolAllocatedMailboxes, + std::max(RelaxedLoad(&PoolAllocatedMailboxes), RelaxedLoad(&other.PoolAllocatedMailboxes))); + } + + size_t TExecutorThreadStats::MaxActivityType() const { + return ActorsAliveByActivity.size(); + } + +} \ No newline at end of file diff --git a/ydb/library/actors/core/mon_stats.h b/ydb/library/actors/core/mon_stats.h index d9d856342d80..9bb0df1b065e 100644 --- a/ydb/library/actors/core/mon_stats.h +++ b/ydb/library/actors/core/mon_stats.h @@ -8,9 +8,7 @@ namespace NActors { struct TLogHistogram : public NMonitoring::IHistogramSnapshot { - TLogHistogram() { - memset(Buckets, 0, sizeof(Buckets)); - } + TLogHistogram(); inline void Add(ui64 val, ui64 inc = 1) { size_t ind = 0; @@ -29,31 +27,14 @@ namespace NActors { RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc); } - void Aggregate(const TLogHistogram& other) { - const ui64 inc = RelaxedLoad(&other.TotalSamples); - RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc); - for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) { - Buckets[i] += RelaxedLoad(&other.Buckets[i]); - } - } + void Aggregate(const TLogHistogram& other); // IHistogramSnapshot - ui32 Count() const override { - return Y_ARRAY_SIZE(Buckets); - } + ui32 Count() const override; - NMonitoring::TBucketBound UpperBound(ui32 index) const override { - Y_ASSERT(index < Y_ARRAY_SIZE(Buckets)); - if (index == 0) { - return 1; - } - return NMonitoring::TBucketBound(1ull << (index - 1)) * 2.0; - } + NMonitoring::TBucketBound UpperBound(ui32 index) const override; - NMonitoring::TBucketValue Value(ui32 index) const override { - Y_ASSERT(index < Y_ARRAY_SIZE(Buckets)); - return Buckets[index]; - } + NMonitoring::TBucketValue Value(ui32 index) const override; ui64 TotalSamples = 0; ui64 Buckets[65]; @@ -126,85 +107,11 @@ namespace NActors { ui64 MailboxPushedOutByEventCount = 0; ui64 NotEnoughCpuExecutions = 0; - TExecutorThreadStats() // must be not empty as 0 used as default - : ElapsedTicksByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - , ReceivedEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - , ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - , ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - , StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - , UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount()) - {} - - template - static void AggregateOne(TVector& self, const TVector& other) { - const size_t selfSize = self.size(); - const size_t otherSize = other.size(); - if (selfSize < otherSize) - self.resize(otherSize); - for (size_t at = 0; at < otherSize; ++at) - self[at] += RelaxedLoad(&other[at]); - } + TExecutorThreadStats(); - void Aggregate(const TExecutorThreadStats& other) { - SentEvents += RelaxedLoad(&other.SentEvents); - ReceivedEvents += RelaxedLoad(&other.ReceivedEvents); - PreemptedEvents += RelaxedLoad(&other.PreemptedEvents); - NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents); - EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation); - CpuUs += RelaxedLoad(&other.CpuUs); - SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks); - RelaxedStore( - &WorstActivationTimeUs, - std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs))); - ElapsedTicks += RelaxedLoad(&other.ElapsedTicks); - ParkedTicks += RelaxedLoad(&other.ParkedTicks); - BlockedTicks += RelaxedLoad(&other.BlockedTicks); - MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending); - MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption); - MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime); - MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount); - NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions); - - ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram); - EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram); - EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram); - EventProcessingTimeHistogram.Aggregate(other.EventProcessingTimeHistogram); - - AggregateOne(ElapsedTicksByActivity, other.ElapsedTicksByActivity); - AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity); - AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity); - AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity); - AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity); - if (other.CurrentActivationTime.TimeUs) { - AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime); - } - if (other.AggregatedCurrentActivationTime.size()) { - AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end()); - } - - if (UsageByActivity.size() < other.UsageByActivity.size()) { - UsageByActivity.resize(other.UsageByActivity.size()); - } - for (size_t i = 0; i < UsageByActivity.size(); ++i) { - for (size_t j = 0; j < 10; ++j) { - UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]); - } - } - - RelaxedStore( - &PoolActorRegistrations, - std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations))); - RelaxedStore( - &PoolDestroyedActors, - std::max(RelaxedLoad(&PoolDestroyedActors), RelaxedLoad(&other.PoolDestroyedActors))); - RelaxedStore( - &PoolAllocatedMailboxes, - std::max(RelaxedLoad(&PoolAllocatedMailboxes), RelaxedLoad(&other.PoolAllocatedMailboxes))); - } + void Aggregate(const TExecutorThreadStats& other); - size_t MaxActivityType() const { - return ActorsAliveByActivity.size(); - } + size_t MaxActivityType() const; }; } diff --git a/ydb/library/actors/core/ya.make b/ydb/library/actors/core/ya.make index 21e35845b0ee..44678b18b2a2 100644 --- a/ydb/library/actors/core/ya.make +++ b/ydb/library/actors/core/ya.make @@ -78,6 +78,7 @@ SRCS( mailbox_queue_simple.h mon.cpp mon.h + mon_stats.cpp mon_stats.h monotonic.cpp monotonic.h