diff --git a/ydb/core/protos/statistics.proto b/ydb/core/protos/statistics.proto index 88efbd1b70e5..6767ca6fc251 100644 --- a/ydb/core/protos/statistics.proto +++ b/ydb/core/protos/statistics.proto @@ -44,9 +44,10 @@ message TEvConnectNode { message TEvRequestStats { optional uint32 NodeId = 1; repeated fixed64 NeedSchemeShards = 2; + optional bool Urgent = 3; } -// SA -> nodes +// SA -> nodes, node -> nodes message TEvPropagateStatistics { repeated uint32 NodeIds = 1; // hierarchical propagation message TStatsEntry { @@ -57,6 +58,10 @@ message TEvPropagateStatistics { repeated TStatsEntry Entries = 2; } +// node -> SA, node -> node +message TEvPropagateStatisticsResponse { +} + // SA -> nodes message TEvStatisticsIsDisabled { } diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 07f98b283b23..755eda40399b 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -12,6 +12,7 @@ TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TT , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) { PropagateInterval = forTests ? TDuration::Seconds(5) : TDuration::Minutes(3); + PropagateTimeout = forTests ? TDuration::Seconds(3) : TDuration::Minutes(2); auto seed = std::random_device{}(); RandomGenerator.seed(seed); @@ -124,11 +125,6 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) { return; } - if (!IsPropagateInFlight) { - Schedule(PropagateInterval, new TEvPrivate::TEvPropagate()); - IsPropagateInFlight = true; - } - if (!record.NeedSchemeShardsSize()) { return; } @@ -149,7 +145,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) { SA_LOG_D("[" << TabletID() << "] EvRequestStats" << ", node id = " << nodeId - << ", schemeshard count = " << record.NeedSchemeShardsSize()); + << ", schemeshard count = " << record.NeedSchemeShardsSize() + << ", urgent = " << record.GetUrgent()); if (!EnableStatistics) { auto disabled = std::make_unique(); @@ -157,6 +154,19 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) { return; } + for (const auto& ssId : record.GetNeedSchemeShards()) { + RequestedSchemeShards.insert(ssId); + } + + if (record.GetUrgent()) { + PendingRequests.push(std::move(ev)); + if (!ProcessUrgentInFlight) { + Send(SelfId(), new TEvPrivate::TEvProcessUrgent()); + ProcessUrgentInFlight = true; + } + return; + } + std::vector ssIds; ssIds.reserve(record.NeedSchemeShardsSize()); for (const auto& ssId : record.GetNeedSchemeShards()) { @@ -206,6 +216,60 @@ void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) { Schedule(PropagateInterval, new TEvPrivate::TEvPropagate()); } +void TStatisticsAggregator::Handle(TEvStatistics::TEvPropagateStatisticsResponse::TPtr&) { + if (!PropagationInFlight) { + return; + } + if (LastSSIndex < PropagationSchemeShards.size()) { + LastSSIndex = PropagatePart(PropagationNodes, PropagationSchemeShards, LastSSIndex, true); + } else { + PropagationInFlight = false; + PropagationNodes.clear(); + PropagationSchemeShards.clear(); + LastSSIndex = 0; + } +} + +void TStatisticsAggregator::Handle(TEvPrivate::TEvProcessUrgent::TPtr&) { + SA_LOG_D("[" << TabletID() << "] EvProcessUrgent"); + + ProcessUrgentInFlight = false; + + if (PendingRequests.empty()) { + return; + } + + TEvStatistics::TEvRequestStats::TPtr ev = std::move(PendingRequests.front()); + PendingRequests.pop(); + + if (!PendingRequests.empty()) { + Send(SelfId(), new TEvPrivate::TEvProcessUrgent()); + ProcessUrgentInFlight = true; + } + + auto record = ev->Get()->Record; + const auto nodeId = record.GetNodeId(); + + std::vector ssIds; + ssIds.reserve(record.NeedSchemeShardsSize()); + for (const auto& ssId : record.GetNeedSchemeShards()) { + ssIds.push_back(ssId); + } + + SendStatisticsToNode(nodeId, ssIds); +} + +void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagateTimeout::TPtr&) { + SA_LOG_D("[" << TabletID() << "] EvPropagateTimeout"); + + if (PropagationInFlight) { + PropagationInFlight = false; + PropagationNodes.clear(); + PropagationSchemeShards.clear(); + LastSSIndex = 0; + } +} + void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector& ssIds) { if (FastCounter > 0) { --FastCounter; @@ -217,7 +281,7 @@ void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector nodeIds; nodeIds.push_back(nodeId); - PropagateStatisticsImpl(nodeIds, ssIds); + PropagatePart(nodeIds, ssIds, 0, false); } void TStatisticsAggregator::PropagateStatistics() { @@ -255,7 +319,13 @@ void TStatisticsAggregator::PropagateStatistics() { ssIds.push_back(ssId); } - PropagateStatisticsImpl(nodeIds, ssIds); + Schedule(PropagateTimeout, new TEvPrivate::TEvPropagateTimeout); + + PropagationInFlight = true; + PropagationNodes = std::move(nodeIds); + PropagationSchemeShards = std::move(ssIds); + + LastSSIndex = PropagatePart(PropagationNodes, PropagationSchemeShards, 0, true); } void TStatisticsAggregator::PropagateFastStatistics() { @@ -280,43 +350,39 @@ void TStatisticsAggregator::PropagateFastStatistics() { ssIds.push_back(ssId); } - PropagateStatisticsImpl(nodeIds, ssIds); + PropagatePart(nodeIds, ssIds, 0, false); } -void TStatisticsAggregator::PropagateStatisticsImpl( - const std::vector& nodeIds, const std::vector& ssIds) +size_t TStatisticsAggregator::PropagatePart(const std::vector& nodeIds, const std::vector& ssIds, + size_t lastSSIndex, bool useSizeLimit) { - if (nodeIds.empty() || ssIds.empty()) { - return; - } + auto propagate = std::make_unique(); + auto* record = propagate->MutableRecord(); TNodeId leadingNodeId = nodeIds[0]; + record->MutableNodeIds()->Reserve(nodeIds.size() - 1); + for (size_t i = 1; i < nodeIds.size(); ++i) { + record->AddNodeIds(nodeIds[i]); + } - for (size_t index = 0; index < ssIds.size(); ) { - auto propagate = std::make_unique(); - auto* record = propagate->MutableRecord(); - record->MutableNodeIds()->Reserve(nodeIds.size() - 1); - for (size_t i = 1; i < nodeIds.size(); ++i) { - record->AddNodeIds(nodeIds[i]); - } - for (size_t size = 0; index < ssIds.size(); ++index) { - auto ssId = ssIds[index]; - auto* entry = record->AddEntries(); - entry->SetSchemeShardId(ssId); - auto itStats = BaseStats.find(ssId); - if (itStats != BaseStats.end()) { - entry->SetStats(itStats->second); - size += itStats->second.size(); - } else { - entry->SetStats(TString()); // stats are not sent from SA yet - } - if (size >= StatsSizeLimitBytes) { - ++index; - break; - } + size_t sizeLimit = useSizeLimit ? StatsSizeLimitBytes : std::numeric_limits::max(); + size_t index = lastSSIndex; + for (size_t size = 0; index < ssIds.size() && size < sizeLimit; ++index) { + auto ssId = ssIds[index]; + auto* entry = record->AddEntries(); + entry->SetSchemeShardId(ssId); + auto itStats = BaseStats.find(ssId); + if (itStats != BaseStats.end()) { + entry->SetStats(itStats->second); + size += itStats->second.size(); + } else { + entry->SetStats(TString()); // stats are not sent from SS yet } - Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release()); } + + Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release()); + + return index; } void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) { diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index d2ce8c500b10..609422f8f675 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -42,12 +42,16 @@ class TStatisticsAggregator : public TActor, public NTabl enum EEv { EvPropagate = EventSpaceBegin(TEvents::ES_PRIVATE), EvFastPropagateCheck, + EvProcessUrgent, + EvPropagateTimeout, EvEnd }; struct TEvPropagate : public TEventLocal {}; struct TEvFastPropagateCheck : public TEventLocal {}; + struct TEvProcessUrgent : public TEventLocal {}; + struct TEvPropagateTimeout : public TEventLocal {}; }; private: @@ -73,12 +77,16 @@ class TStatisticsAggregator : public TActor, public NTabl void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev); void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev); void Handle(TEvPrivate::TEvFastPropagateCheck::TPtr& ev); + void Handle(TEvStatistics::TEvPropagateStatisticsResponse::TPtr& ev); + void Handle(TEvPrivate::TEvProcessUrgent::TPtr& ev); + void Handle(TEvPrivate::TEvPropagateTimeout::TPtr& ev); void ProcessRequests(TNodeId nodeId, const std::vector& ssIds); void SendStatisticsToNode(TNodeId nodeId, const std::vector& ssIds); void PropagateStatistics(); void PropagateFastStatistics(); - void PropagateStatisticsImpl(const std::vector& nodeIds, const std::vector& ssIds); + size_t PropagatePart(const std::vector& nodeIds, const std::vector& ssIds, + size_t lastSSIndex, bool useSizeLimit); void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value); @@ -99,6 +107,9 @@ class TStatisticsAggregator : public TActor, public NTabl hFunc(TEvTabletPipe::TEvServerConnected, Handle); hFunc(TEvTabletPipe::TEvServerDisconnected, Handle); hFunc(TEvPrivate::TEvFastPropagateCheck, Handle); + hFunc(TEvStatistics::TEvPropagateStatisticsResponse, Handle); + hFunc(TEvPrivate::TEvProcessUrgent, Handle); + hFunc(TEvPrivate::TEvPropagateTimeout, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, @@ -118,7 +129,8 @@ class TStatisticsAggregator : public TActor, public NTabl static constexpr size_t StatsSizeLimitBytes = 2 << 20; // limit for stats size in one message TDuration PropagateInterval; - bool IsPropagateInFlight = false; // is slow propagation started + TDuration PropagateTimeout; + static constexpr TDuration FastCheckInterval = TDuration::MilliSeconds(50); std::unordered_map BaseStats; // schemeshard id -> serialized stats for all paths @@ -134,6 +146,14 @@ class TStatisticsAggregator : public TActor, public NTabl bool FastCheckInFlight = false; std::unordered_set FastNodes; // nodes for fast propagation std::unordered_set FastSchemeShards; // schemeshards for fast propagation + + bool PropagationInFlight = false; + std::vector PropagationNodes; + std::vector PropagationSchemeShards; + size_t LastSSIndex = 0; + + std::queue PendingRequests; + bool ProcessUrgentInFlight = false; }; } // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index 88f6428dedc6..9fa487dc8b7c 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -89,6 +89,8 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { Self->EnableStatistics = AppData(ctx)->FeatureFlags.GetEnableStatistics(); Self->SubscribeForConfigChanges(ctx); + Self->Schedule(Self->PropagateInterval, new TEvPrivate::TEvPropagate()); + Self->Become(&TThis::StateWork); } }; diff --git a/ydb/core/statistics/events.h b/ydb/core/statistics/events.h index a24600338a9d..cf16952f95a4 100644 --- a/ydb/core/statistics/events.h +++ b/ydb/core/statistics/events.h @@ -55,6 +55,7 @@ struct TEvStatistics { EvRequestStats, EvPropagateStatistics, EvStatisticsIsDisabled, + EvPropagateStatisticsResponse, EvEnd }; @@ -115,6 +116,12 @@ struct TEvStatistics { NKikimrStat::TEvStatisticsIsDisabled, EvStatisticsIsDisabled> {}; + + struct TEvPropagateStatisticsResponse : public TEventPB< + TEvPropagateStatisticsResponse, + NKikimrStat::TEvPropagateStatisticsResponse, + EvPropagateStatisticsResponse> + {}; }; } // NStat diff --git a/ydb/core/statistics/stat_service.cpp b/ydb/core/statistics/stat_service.cpp index ef2864e92056..cb4e3009d28a 100644 --- a/ydb/core/statistics/stat_service.cpp +++ b/ydb/core/statistics/stat_service.cpp @@ -24,6 +24,19 @@ class TStatService : public TActorBootstrapped { return NKikimrServices::TActivity::STAT_SERVICE; } + struct TEvPrivate { + enum EEv { + EvRequestTimeout = EventSpaceBegin(TEvents::ES_PRIVATE), + + EvEnd + }; + + struct TEvRequestTimeout : public TEventLocal { + std::unordered_set NeedSchemeShards; + TActorId PipeClientId; + }; + }; + void Bootstrap() { EnableStatistics = AppData()->FeatureFlags.GetEnableStatistics(); @@ -41,9 +54,11 @@ class TStatService : public TActorBootstrapped { hFunc(TEvStatistics::TEvGetStatistics, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvStatistics::TEvPropagateStatistics, Handle); + IgnoreFunc(TEvStatistics::TEvPropagateStatisticsResponse); hFunc(TEvTabletPipe::TEvClientConnected, Handle); hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); hFunc(TEvStatistics::TEvStatisticsIsDisabled, Handle); + hFunc(TEvPrivate::TEvRequestTimeout, Handle); cFunc(TEvents::TEvPoison::EventType, PassAway); default: LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, @@ -160,6 +175,11 @@ class TStatService : public TActorBootstrapped { return; } + bool isNewSS = (NeedSchemeShards.find(request.SchemeShardId) == NeedSchemeShards.end()); + if (isNewSS) { + NeedSchemeShards.insert(request.SchemeShardId); + } + auto navigateDomainKey = [this] (TPathId domainKey) { using TNavigate = NSchemeCache::TSchemeCacheNavigate; auto navigate = std::make_unique(); @@ -202,11 +222,18 @@ class TStatService : public TActorBootstrapped { if (!SAPipeClientId) { ConnectToSA(); SyncNode(); - } else { + + } else if (isNewSS) { auto requestStats = std::make_unique(); requestStats->Record.SetNodeId(SelfId().NodeId()); + requestStats->Record.SetUrgent(false); requestStats->Record.AddNeedSchemeShards(request.SchemeShardId); NTabletPipe::SendData(SelfId(), SAPipeClientId, requestStats.release()); + + auto timeout = std::make_unique(); + timeout->NeedSchemeShards.insert(request.SchemeShardId); + timeout->PipeClientId = SAPipeClientId; + Schedule(RequestTimeout, timeout.release()); } } @@ -214,9 +241,12 @@ class TStatService : public TActorBootstrapped { LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, "EvPropagateStatistics, node id = " << SelfId().NodeId()); + Send(ev->Sender, new TEvStatistics::TEvPropagateStatisticsResponse); + auto* record = ev->Get()->MutableRecord(); for (const auto& entry : record->GetEntries()) { ui64 schemeShardId = entry.GetSchemeShardId(); + NeedSchemeShards.erase(schemeShardId); auto& statisticsState = Statistics[schemeShardId]; if (entry.GetStats().empty()) { @@ -319,6 +349,32 @@ class TStatService : public TActorBootstrapped { ReplyAllFailed(); } + void Handle(TEvPrivate::TEvRequestTimeout::TPtr& ev) { + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, + "EvRequestTimeout" + << ", pipe client id = " << ev->Get()->PipeClientId + << ", schemeshard count = " << ev->Get()->NeedSchemeShards.size()); + + if (SAPipeClientId != ev->Get()->PipeClientId) { + return; + } + auto requestStats = std::make_unique(); + bool hasNeedSchemeShards = false; + for (auto& ssId : ev->Get()->NeedSchemeShards) { + if (NeedSchemeShards.find(ssId) != NeedSchemeShards.end()) { + requestStats->Record.AddNeedSchemeShards(ssId); + hasNeedSchemeShards = true; + } + } + if (!hasNeedSchemeShards) { + return; + } + requestStats->Record.SetNodeId(SelfId().NodeId()); + requestStats->Record.SetUrgent(true); + + NTabletPipe::SendData(SelfId(), SAPipeClientId, requestStats.release()); + } + void ConnectToSA() { if (SAPipeClientId || !StatisticsAggregatorId) { return; @@ -338,23 +394,25 @@ class TStatService : public TActorBootstrapped { auto connect = std::make_unique(); auto& record = connect->Record; + auto timeout = std::make_unique(); + timeout->PipeClientId = SAPipeClientId; + record.SetNodeId(SelfId().NodeId()); for (const auto& [ssId, ssState] : Statistics) { auto* entry = record.AddHaveSchemeShards(); entry->SetSchemeShardId(ssId); entry->SetTimestamp(ssState.Timestamp); } - std::unordered_set ssIds; - for (const auto& [reqId, reqState] : InFlight) { - if (reqState.SchemeShardId != 0) { - ssIds.insert(reqState.SchemeShardId); - } - } - for (const auto& ssId : ssIds) { + for (const auto& ssId : NeedSchemeShards) { record.AddNeedSchemeShards(ssId); + timeout->NeedSchemeShards.insert(ssId); } NTabletPipe::SendData(SelfId(), SAPipeClientId, connect.release()); + if (!NeedSchemeShards.empty()) { + Schedule(RequestTimeout, timeout.release()); + } + LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, "SyncNode(), pipe client id = " << SAPipeClientId); } @@ -465,6 +523,8 @@ class TStatService : public TActorBootstrapped { std::unordered_map InFlight; // request id -> state ui64 NextRequestId = 1; + std::unordered_set NeedSchemeShards; + struct TStatEntry { ui64 RowCount = 0; ui64 BytesSize = 0; @@ -486,6 +546,8 @@ class TStatService : public TActorBootstrapped { RSA_FINISHED }; EResolveSAStage ResolveSAStage = RSA_INITIAL; + + static constexpr TDuration RequestTimeout = TDuration::MilliSeconds(100); }; THolder CreateStatService() {