Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve base stats propagation logic (#1741) #1857

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ydb/core/protos/statistics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ message TEvConnectNode {
// Request for the base statistics of a set of schemeshards.
message TEvRequestStats {
// Id of the node whose statistics service receives the reply.
optional uint32 NodeId = 1;
// Ids of the schemeshards whose statistics are requested.
repeated fixed64 NeedSchemeShards = 2;
// When set, the aggregator queues the request and answers it from its
// urgent-processing handler instead of the regular request path.
optional bool Urgent = 3;
}

// SA -> nodes
// SA -> nodes, node -> nodes
message TEvPropagateStatistics {
repeated uint32 NodeIds = 1; // hierarchical propagation
message TStatsEntry {
Expand All @@ -57,6 +58,10 @@ message TEvPropagateStatistics {
repeated TStatsEntry Entries = 2;
}

// node -> SA, node -> node
// Response to TEvPropagateStatistics. Carries no fields: receiving it is the
// whole signal (the aggregator uses it to send the next part of a propagation).
message TEvPropagateStatisticsResponse {
}

// SA -> nodes
// Field-less notification; the aggregator replies with it to TEvRequestStats
// when statistics are disabled on its side.
message TEvStatisticsIsDisabled {
}
142 changes: 104 additions & 38 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TT
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
{
PropagateInterval = forTests ? TDuration::Seconds(5) : TDuration::Minutes(3);
PropagateTimeout = forTests ? TDuration::Seconds(3) : TDuration::Minutes(2);

auto seed = std::random_device{}();
RandomGenerator.seed(seed);
Expand Down Expand Up @@ -124,11 +125,6 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) {
return;
}

if (!IsPropagateInFlight) {
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
IsPropagateInFlight = true;
}

if (!record.NeedSchemeShardsSize()) {
return;
}
Expand All @@ -149,14 +145,28 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) {

SA_LOG_D("[" << TabletID() << "] EvRequestStats"
<< ", node id = " << nodeId
<< ", schemeshard count = " << record.NeedSchemeShardsSize());
<< ", schemeshard count = " << record.NeedSchemeShardsSize()
<< ", urgent = " << record.GetUrgent());

if (!EnableStatistics) {
auto disabled = std::make_unique<TEvStatistics::TEvStatisticsIsDisabled>();
Send(NStat::MakeStatServiceID(nodeId), disabled.release());
return;
}

for (const auto& ssId : record.GetNeedSchemeShards()) {
RequestedSchemeShards.insert(ssId);
}

if (record.GetUrgent()) {
PendingRequests.push(std::move(ev));
if (!ProcessUrgentInFlight) {
Send(SelfId(), new TEvPrivate::TEvProcessUrgent());
ProcessUrgentInFlight = true;
}
return;
}

std::vector<TSSId> ssIds;
ssIds.reserve(record.NeedSchemeShardsSize());
for (const auto& ssId : record.GetNeedSchemeShards()) {
Expand Down Expand Up @@ -206,6 +216,60 @@ void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) {
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
}

// Reacts to a propagation response: sends the next part while schemeshards
// remain to be propagated, otherwise finishes the current propagation round.
void TStatisticsAggregator::Handle(TEvStatistics::TEvPropagateStatisticsResponse::TPtr&) {
    // Responses that arrive while no round is active are ignored.
    if (!PropagationInFlight) {
        return;
    }

    const bool roundFinished = LastSSIndex >= PropagationSchemeShards.size();
    if (!roundFinished) {
        // Continue from where the previous part stopped.
        LastSSIndex = PropagatePart(PropagationNodes, PropagationSchemeShards, LastSSIndex, true);
        return;
    }

    // Everything has been sent: reset the round state.
    LastSSIndex = 0;
    PropagationSchemeShards.clear();
    PropagationNodes.clear();
    PropagationInFlight = false;
}

// Processes queued urgent statistics requests, one request per event.
//
// Pops the oldest pending TEvRequestStats, re-sends TEvProcessUrgent to itself
// if more requests are still queued, and then sends the requested statistics
// to the requesting node.
void TStatisticsAggregator::Handle(TEvPrivate::TEvProcessUrgent::TPtr&) {
    SA_LOG_D("[" << TabletID() << "] EvProcessUrgent");

    ProcessUrgentInFlight = false;

    if (PendingRequests.empty()) {
        return;
    }

    TEvStatistics::TEvRequestStats::TPtr ev = std::move(PendingRequests.front());
    PendingRequests.pop();

    // Keep draining the queue: one self-sent event per remaining request.
    if (!PendingRequests.empty()) {
        Send(SelfId(), new TEvPrivate::TEvProcessUrgent());
        ProcessUrgentInFlight = true;
    }

    // Bind by const reference instead of copying the whole protobuf record:
    // `ev` keeps the event alive until the end of this function and the record
    // is only read here.
    const auto& record = ev->Get()->Record;
    const auto nodeId = record.GetNodeId();

    std::vector<TSSId> ssIds;
    ssIds.reserve(record.NeedSchemeShardsSize());
    for (const auto& ssId : record.GetNeedSchemeShards()) {
        ssIds.push_back(ssId);
    }

    SendStatisticsToNode(nodeId, ssIds);
}

// Handles the propagation timeout: if a propagation round is still in flight,
// abandons it and resets the round state.
void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagateTimeout::TPtr&) {
    SA_LOG_D("[" << TabletID() << "] EvPropagateTimeout");

    // Nothing to abandon when no round is active.
    if (!PropagationInFlight) {
        return;
    }

    LastSSIndex = 0;
    PropagationSchemeShards.clear();
    PropagationNodes.clear();
    PropagationInFlight = false;
}

void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TSSId>& ssIds) {
if (FastCounter > 0) {
--FastCounter;
Expand All @@ -217,7 +281,7 @@ void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TS
}
}
if (!FastCheckInFlight) {
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
Schedule(FastCheckInterval, new TEvPrivate::TEvFastPropagateCheck());
FastCheckInFlight = true;
}
}
Expand All @@ -230,7 +294,7 @@ void TStatisticsAggregator::SendStatisticsToNode(TNodeId nodeId, const std::vect
std::vector<TNodeId> nodeIds;
nodeIds.push_back(nodeId);

PropagateStatisticsImpl(nodeIds, ssIds);
PropagatePart(nodeIds, ssIds, 0, false);
}

void TStatisticsAggregator::PropagateStatistics() {
Expand All @@ -255,7 +319,13 @@ void TStatisticsAggregator::PropagateStatistics() {
ssIds.push_back(ssId);
}

PropagateStatisticsImpl(nodeIds, ssIds);
Schedule(PropagateTimeout, new TEvPrivate::TEvPropagateTimeout);

PropagationInFlight = true;
PropagationNodes = std::move(nodeIds);
PropagationSchemeShards = std::move(ssIds);

LastSSIndex = PropagatePart(PropagationNodes, PropagationSchemeShards, 0, true);
}

void TStatisticsAggregator::PropagateFastStatistics() {
Expand All @@ -280,43 +350,39 @@ void TStatisticsAggregator::PropagateFastStatistics() {
ssIds.push_back(ssId);
}

PropagateStatisticsImpl(nodeIds, ssIds);
PropagatePart(nodeIds, ssIds, 0, false);
}

void TStatisticsAggregator::PropagateStatisticsImpl(
const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds)
size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds,
size_t lastSSIndex, bool useSizeLimit)
{
if (nodeIds.empty() || ssIds.empty()) {
return;
}
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
auto* record = propagate->MutableRecord();

TNodeId leadingNodeId = nodeIds[0];
record->MutableNodeIds()->Reserve(nodeIds.size() - 1);
for (size_t i = 1; i < nodeIds.size(); ++i) {
record->AddNodeIds(nodeIds[i]);
}

for (size_t index = 0; index < ssIds.size(); ) {
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
auto* record = propagate->MutableRecord();
record->MutableNodeIds()->Reserve(nodeIds.size() - 1);
for (size_t i = 1; i < nodeIds.size(); ++i) {
record->AddNodeIds(nodeIds[i]);
}
for (size_t size = 0; index < ssIds.size(); ++index) {
auto ssId = ssIds[index];
auto* entry = record->AddEntries();
entry->SetSchemeShardId(ssId);
auto itStats = BaseStats.find(ssId);
if (itStats != BaseStats.end()) {
entry->SetStats(itStats->second);
size += itStats->second.size();
} else {
entry->SetStats(TString()); // stats are not sent from SA yet
}
if (size >= StatsSizeLimitBytes) {
++index;
break;
}
size_t sizeLimit = useSizeLimit ? StatsSizeLimitBytes : std::numeric_limits<size_t>::max();
size_t index = lastSSIndex;
for (size_t size = 0; index < ssIds.size() && size < sizeLimit; ++index) {
auto ssId = ssIds[index];
auto* entry = record->AddEntries();
entry->SetSchemeShardId(ssId);
auto itStats = BaseStats.find(ssId);
if (itStats != BaseStats.end()) {
entry->SetStats(itStats->second);
size += itStats->second.size();
} else {
entry->SetStats(TString()); // stats are not sent from SS yet
}
Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());
}

Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());

return index;
}

void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
Expand Down
24 changes: 22 additions & 2 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
// Ids of the aggregator's private events, allocated from the ES_PRIVATE event space.
enum EEv {
// Periodic propagation tick; re-scheduled every PropagateInterval.
EvPropagate = EventSpaceBegin(TEvents::ES_PRIVATE),
// Delayed check scheduled by ProcessRequests for fast propagation.
EvFastPropagateCheck,
// Self-sent to process queued urgent TEvRequestStats, one per event.
EvProcessUrgent,
// Scheduled with PropagateTimeout when a propagation round starts.
EvPropagateTimeout,

EvEnd
};

// Local events without payload, one per EEv id; each struct only binds its type to the id.
struct TEvPropagate : public TEventLocal<TEvPropagate, EvPropagate> {};
struct TEvFastPropagateCheck : public TEventLocal<TEvFastPropagateCheck, EvFastPropagateCheck> {};
struct TEvProcessUrgent : public TEventLocal<TEvProcessUrgent, EvProcessUrgent> {};
struct TEvPropagateTimeout : public TEventLocal<TEvPropagateTimeout, EvPropagateTimeout> {};
};

private:
Expand All @@ -73,12 +77,16 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev);
void Handle(TEvPrivate::TEvFastPropagateCheck::TPtr& ev);
void Handle(TEvStatistics::TEvPropagateStatisticsResponse::TPtr& ev);
void Handle(TEvPrivate::TEvProcessUrgent::TPtr& ev);
void Handle(TEvPrivate::TEvPropagateTimeout::TPtr& ev);

void ProcessRequests(TNodeId nodeId, const std::vector<TSSId>& ssIds);
void SendStatisticsToNode(TNodeId nodeId, const std::vector<TSSId>& ssIds);
void PropagateStatistics();
void PropagateFastStatistics();
void PropagateStatisticsImpl(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds);
size_t PropagatePart(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds,
size_t lastSSIndex, bool useSizeLimit);

void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);

Expand All @@ -99,6 +107,9 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
hFunc(TEvTabletPipe::TEvServerConnected, Handle);
hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
hFunc(TEvPrivate::TEvFastPropagateCheck, Handle);
hFunc(TEvStatistics::TEvPropagateStatisticsResponse, Handle);
hFunc(TEvPrivate::TEvProcessUrgent, Handle);
hFunc(TEvPrivate::TEvPropagateTimeout, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
Expand All @@ -118,7 +129,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
static constexpr size_t StatsSizeLimitBytes = 2 << 20; // limit for stats size in one message

TDuration PropagateInterval;
bool IsPropagateInFlight = false; // is slow propagation started
TDuration PropagateTimeout;
static constexpr TDuration FastCheckInterval = TDuration::MilliSeconds(50);

std::unordered_map<TSSId, TString> BaseStats; // schemeshard id -> serialized stats for all paths

Expand All @@ -134,6 +146,14 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
bool FastCheckInFlight = false;
std::unordered_set<TNodeId> FastNodes; // nodes for fast propagation
std::unordered_set<TSSId> FastSchemeShards; // schemeshards for fast propagation

bool PropagationInFlight = false;
std::vector<TNodeId> PropagationNodes;
std::vector<TSSId> PropagationSchemeShards;
size_t LastSSIndex = 0;

std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests;
bool ProcessUrgentInFlight = false;
};

} // NKikimr::NStat
2 changes: 2 additions & 0 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
Self->EnableStatistics = AppData(ctx)->FeatureFlags.GetEnableStatistics();
Self->SubscribeForConfigChanges(ctx);

Self->Schedule(Self->PropagateInterval, new TEvPrivate::TEvPropagate());

Self->Become(&TThis::StateWork);
}
};
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/statistics/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct TEvStatistics {
EvRequestStats,
EvPropagateStatistics,
EvStatisticsIsDisabled,
EvPropagateStatisticsResponse,

EvEnd
};
Expand Down Expand Up @@ -115,6 +116,12 @@ struct TEvStatistics {
NKikimrStat::TEvStatisticsIsDisabled,
EvStatisticsIsDisabled>
{};

// Protobuf-backed event wrapping NKikimrStat::TEvPropagateStatisticsResponse,
// registered under the EvPropagateStatisticsResponse id.
struct TEvPropagateStatisticsResponse : public TEventPB<
TEvPropagateStatisticsResponse,
NKikimrStat::TEvPropagateStatisticsResponse,
EvPropagateStatisticsResponse>
{};
};

} // NStat
Expand Down
Loading
Loading