Skip to content

Commit

Permalink
Fix tsan error (#5441)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Jun 11, 2024
1 parent 7a17f96 commit f2d1054
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 101 deletions.
122 changes: 122 additions & 0 deletions ydb/library/actors/core/mon_stats.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#include "mon_stats.h"


namespace NActors {

TLogHistogram::TLogHistogram() {
memset(Buckets, 0, sizeof(Buckets));
}

void TLogHistogram::Aggregate(const TLogHistogram& other) {
const ui64 inc = RelaxedLoad(&other.TotalSamples);
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) {
Buckets[i] += RelaxedLoad(&other.Buckets[i]);
}
}

ui32 TLogHistogram::Count() const {
return Y_ARRAY_SIZE(Buckets);
}

NMonitoring::TBucketBound TLogHistogram::UpperBound(ui32 index) const {
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
if (index == 0) {
return 1;
}
return NMonitoring::TBucketBound(1ull << (index - 1)) * 2.0;
}

NMonitoring::TBucketValue TLogHistogram::Value(ui32 index) const {
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
return Buckets[index];
}

TExecutorThreadStats::TExecutorThreadStats() // must be not empty as 0 used as default
: ElapsedTicksByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ReceivedEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
{}

namespace {
template <typename T>
void AggregateOne(TVector<T>& self, const TVector<T>& other) {
const size_t selfSize = self.size();
const size_t otherSize = other.size();
if (selfSize < otherSize)
self.resize(otherSize);
for (size_t at = 0; at < otherSize; ++at)
self[at] += RelaxedLoad(&other[at]);
}
}

void TExecutorThreadStats::Aggregate(const TExecutorThreadStats& other) {
SentEvents += RelaxedLoad(&other.SentEvents);
ReceivedEvents += RelaxedLoad(&other.ReceivedEvents);
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
CpuUs += RelaxedLoad(&other.CpuUs);
SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
RelaxedStore(
&WorstActivationTimeUs,
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending);
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions);

ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram);
EventProcessingTimeHistogram.Aggregate(other.EventProcessingTimeHistogram);

AggregateOne(ElapsedTicksByActivity, other.ElapsedTicksByActivity);
AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity);
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);

// AggregatedCurrentActivationTime is readed and modified only from one thread
auto timeUs = RelaxedLoad(&other.CurrentActivationTime.TimeUs);
if (timeUs) {
AggregatedCurrentActivationTime.push_back(TActivationTime{
.TimeUs = timeUs,
.LastActivity = RelaxedLoad(&other.CurrentActivationTime.LastActivity)});
}
if (other.AggregatedCurrentActivationTime.size()) {
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
}

if (UsageByActivity.size() < other.UsageByActivity.size()) {
UsageByActivity.resize(other.UsageByActivity.size());
}
for (size_t i = 0; i < UsageByActivity.size(); ++i) {
for (size_t j = 0; j < 10; ++j) {
UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]);
}
}

RelaxedStore(
&PoolActorRegistrations,
std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations)));
RelaxedStore(
&PoolDestroyedActors,
std::max(RelaxedLoad(&PoolDestroyedActors), RelaxedLoad(&other.PoolDestroyedActors)));
RelaxedStore(
&PoolAllocatedMailboxes,
std::max(RelaxedLoad(&PoolAllocatedMailboxes), RelaxedLoad(&other.PoolAllocatedMailboxes)));
}

size_t TExecutorThreadStats::MaxActivityType() const {
return ActorsAliveByActivity.size();
}

}
109 changes: 8 additions & 101 deletions ydb/library/actors/core/mon_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

namespace NActors {
struct TLogHistogram : public NMonitoring::IHistogramSnapshot {
TLogHistogram() {
memset(Buckets, 0, sizeof(Buckets));
}
TLogHistogram();

inline void Add(ui64 val, ui64 inc = 1) {
size_t ind = 0;
Expand All @@ -29,31 +27,14 @@ namespace NActors {
RelaxedStore(&Buckets[ind], RelaxedLoad(&Buckets[ind]) + inc);
}

void Aggregate(const TLogHistogram& other) {
const ui64 inc = RelaxedLoad(&other.TotalSamples);
RelaxedStore(&TotalSamples, RelaxedLoad(&TotalSamples) + inc);
for (size_t i = 0; i < Y_ARRAY_SIZE(Buckets); ++i) {
Buckets[i] += RelaxedLoad(&other.Buckets[i]);
}
}
void Aggregate(const TLogHistogram& other);

// IHistogramSnapshot
ui32 Count() const override {
return Y_ARRAY_SIZE(Buckets);
}
ui32 Count() const override;

NMonitoring::TBucketBound UpperBound(ui32 index) const override {
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
if (index == 0) {
return 1;
}
return NMonitoring::TBucketBound(1ull << (index - 1)) * 2.0;
}
NMonitoring::TBucketBound UpperBound(ui32 index) const override;

NMonitoring::TBucketValue Value(ui32 index) const override {
Y_ASSERT(index < Y_ARRAY_SIZE(Buckets));
return Buckets[index];
}
NMonitoring::TBucketValue Value(ui32 index) const override;

ui64 TotalSamples = 0;
ui64 Buckets[65];
Expand Down Expand Up @@ -126,85 +107,11 @@ namespace NActors {
ui64 MailboxPushedOutByEventCount = 0;
ui64 NotEnoughCpuExecutions = 0;

TExecutorThreadStats() // must be not empty as 0 used as default
: ElapsedTicksByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ReceivedEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ActorsAliveByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, ScheduledEventsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, StuckActorsByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
, UsageByActivity(TLocalProcessKeyStateIndexLimiter::GetMaxKeysCount())
{}

template <typename T>
static void AggregateOne(TVector<T>& self, const TVector<T>& other) {
const size_t selfSize = self.size();
const size_t otherSize = other.size();
if (selfSize < otherSize)
self.resize(otherSize);
for (size_t at = 0; at < otherSize; ++at)
self[at] += RelaxedLoad(&other[at]);
}
TExecutorThreadStats();

void Aggregate(const TExecutorThreadStats& other) {
SentEvents += RelaxedLoad(&other.SentEvents);
ReceivedEvents += RelaxedLoad(&other.ReceivedEvents);
PreemptedEvents += RelaxedLoad(&other.PreemptedEvents);
NonDeliveredEvents += RelaxedLoad(&other.NonDeliveredEvents);
EmptyMailboxActivation += RelaxedLoad(&other.EmptyMailboxActivation);
CpuUs += RelaxedLoad(&other.CpuUs);
SafeElapsedTicks += RelaxedLoad(&other.SafeElapsedTicks);
RelaxedStore(
&WorstActivationTimeUs,
std::max(RelaxedLoad(&WorstActivationTimeUs), RelaxedLoad(&other.WorstActivationTimeUs)));
ElapsedTicks += RelaxedLoad(&other.ElapsedTicks);
ParkedTicks += RelaxedLoad(&other.ParkedTicks);
BlockedTicks += RelaxedLoad(&other.BlockedTicks);
MailboxPushedOutByTailSending += RelaxedLoad(&other.MailboxPushedOutByTailSending);
MailboxPushedOutBySoftPreemption += RelaxedLoad(&other.MailboxPushedOutBySoftPreemption);
MailboxPushedOutByTime += RelaxedLoad(&other.MailboxPushedOutByTime);
MailboxPushedOutByEventCount += RelaxedLoad(&other.MailboxPushedOutByEventCount);
NotEnoughCpuExecutions += RelaxedLoad(&other.NotEnoughCpuExecutions);

ActivationTimeHistogram.Aggregate(other.ActivationTimeHistogram);
EventDeliveryTimeHistogram.Aggregate(other.EventDeliveryTimeHistogram);
EventProcessingCountHistogram.Aggregate(other.EventProcessingCountHistogram);
EventProcessingTimeHistogram.Aggregate(other.EventProcessingTimeHistogram);

AggregateOne(ElapsedTicksByActivity, other.ElapsedTicksByActivity);
AggregateOne(ReceivedEventsByActivity, other.ReceivedEventsByActivity);
AggregateOne(ActorsAliveByActivity, other.ActorsAliveByActivity);
AggregateOne(ScheduledEventsByActivity, other.ScheduledEventsByActivity);
AggregateOne(StuckActorsByActivity, other.StuckActorsByActivity);
if (other.CurrentActivationTime.TimeUs) {
AggregatedCurrentActivationTime.push_back(other.CurrentActivationTime);
}
if (other.AggregatedCurrentActivationTime.size()) {
AggregatedCurrentActivationTime.insert(AggregatedCurrentActivationTime.end(), other.AggregatedCurrentActivationTime.begin(), other.AggregatedCurrentActivationTime.end());
}

if (UsageByActivity.size() < other.UsageByActivity.size()) {
UsageByActivity.resize(other.UsageByActivity.size());
}
for (size_t i = 0; i < UsageByActivity.size(); ++i) {
for (size_t j = 0; j < 10; ++j) {
UsageByActivity[i][j] += RelaxedLoad(&other.UsageByActivity[i][j]);
}
}

RelaxedStore(
&PoolActorRegistrations,
std::max(RelaxedLoad(&PoolActorRegistrations), RelaxedLoad(&other.PoolActorRegistrations)));
RelaxedStore(
&PoolDestroyedActors,
std::max(RelaxedLoad(&PoolDestroyedActors), RelaxedLoad(&other.PoolDestroyedActors)));
RelaxedStore(
&PoolAllocatedMailboxes,
std::max(RelaxedLoad(&PoolAllocatedMailboxes), RelaxedLoad(&other.PoolAllocatedMailboxes)));
}
void Aggregate(const TExecutorThreadStats& other);

size_t MaxActivityType() const {
return ActorsAliveByActivity.size();
}
size_t MaxActivityType() const;
};

}
1 change: 1 addition & 0 deletions ydb/library/actors/core/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ SRCS(
mailbox_queue_simple.h
mon.cpp
mon.h
mon_stats.cpp
mon_stats.h
monotonic.cpp
monotonic.h
Expand Down

0 comments on commit f2d1054

Please sign in to comment.