Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

control inflight pings in hive #6916

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions ydb/core/mind/hive/hive_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,7 @@ void THive::Handle(TEvPrivate::TEvBootTablets::TPtr&) {
for (auto* node : unimportantNodes) {
node->Ping();
}
ProcessNodePingQueue();
TVector<TTabletId> tabletsToReleaseFromParent;
TSideEffects sideEffects;
sideEffects.Reset(SelfId());
Expand Down Expand Up @@ -687,11 +688,13 @@ void THive::Cleanup() {

void THive::Handle(TEvLocal::TEvStatus::TPtr& ev) {
BLOG_D("Handle TEvLocal::TEvStatus for Node " << ev->Sender.NodeId() << ": " << ev->Get()->Record.ShortDebugString());
RemoveFromPingInProgress(ev->Sender.NodeId());
Execute(CreateStatus(ev->Sender, ev->Get()->Record));
}

void THive::Handle(TEvLocal::TEvSyncTablets::TPtr& ev) {
BLOG_D("THive::Handle::TEvSyncTablets");
RemoveFromPingInProgress(ev->Sender.NodeId());
Execute(CreateSyncTablets(ev->Sender, ev->Get()->Record));
}

Expand Down Expand Up @@ -745,6 +748,7 @@ void THive::Handle(TEvInterconnect::TEvNodeConnected::TPtr &ev) {
void THive::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
TNodeId nodeId = ev->Get()->NodeId;
BLOG_W("Handle TEvInterconnect::TEvNodeDisconnected, NodeId " << nodeId);
RemoveFromPingInProgress(nodeId);
if (ConnectedNodes.erase(nodeId)) {
UpdateCounterNodesConnected(-1);
}
Expand Down Expand Up @@ -917,6 +921,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
case TEvLocal::EvPing: {
TNodeId nodeId = ev->Cookie;
TNodeInfo* node = FindNode(nodeId);
NodePingsInProgress.erase(nodeId);
if (node != nullptr && ev->Sender == node->Local) {
if (node->IsDisconnecting()) {
// ping continiousily until we fully disconnected from the node
Expand All @@ -925,6 +930,7 @@ void THive::Handle(TEvents::TEvUndelivered::TPtr &ev) {
KillNode(node->Id, node->Local);
}
}
ProcessNodePingQueue();
break;
}
};
Expand Down Expand Up @@ -1696,6 +1702,13 @@ void THive::UpdateCounterTabletsStarting(i64 tabletsStartingDiff) {
}
}

void THive::UpdateCounterPingQueueSize() {
if (TabletCounters != nullptr) {
auto& counter = TabletCounters->Simple()[NHive::COUNTER_PINGQUEUE_SIZE];
counter.Set(NodePingQueue.size());
}
}

void THive::RecordTabletMove(const TTabletMoveInfo& moveInfo) {
TabletMoveHistory.PushBack(moveInfo);
TabletCounters->Cumulative()[NHive::COUNTER_TABLETS_MOVED].Increment(1);
Expand Down Expand Up @@ -2672,6 +2685,25 @@ void THive::ExecuteStartTablet(TFullTabletId tabletId, const TActorId& local, ui
Execute(CreateStartTablet(tabletId, local, cookie, external));
}

void THive::QueuePing(const TActorId& local) {
NodePingQueue.push(local);
}

void THive::ProcessNodePingQueue() {
while (!NodePingQueue.empty() && NodePingsInProgress.size() < GetMaxPingsInFlight()) {
TActorId local = NodePingQueue.front();
TNodeId node = local.NodeId();
NodePingQueue.pop();
NodePingsInProgress.insert(node);
SendPing(local, node);
}
}

void THive::RemoveFromPingInProgress(TNodeId node) {
NodePingsInProgress.erase(node);
ProcessNodePingQueue();
}

void THive::SendPing(const TActorId& local, TNodeId id) {
Send(local,
new TEvLocal::TEvPing(HiveId,
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/mind/hive/hive_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ class THive : public TActor<THive>, public TTabletExecutedFlat, public THiveShar
TEventPriorityQueue<THive> EventQueue{*this};
ui64 OperationsLogIndex = 0;
std::vector<TActorId> ActorsWaitingToMoveTablets;
std::queue<TActorId> NodePingQueue;
std::unordered_set<TNodeId> NodePingsInProgress;

struct TPendingCreateTablet {
NKikimrHive::TEvCreateTablet CreateTablet;
Expand Down Expand Up @@ -650,6 +652,7 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void UpdateCounterEventQueueSize(i64 eventQueueSizeDiff);
void UpdateCounterNodesConnected(i64 nodesConnectedDiff);
void UpdateCounterTabletsStarting(i64 tabletsStartingDiff);
void UpdateCounterPingQueueSize();
void RecordTabletMove(const TTabletMoveInfo& info);
bool DomainHasNodes(const TSubDomainKey &domainKey) const;
void ProcessBootQueue();
Expand Down Expand Up @@ -678,7 +681,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
void UpdateRegisteredDataCenters();
void AddRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void RemoveRegisteredDataCentersNode(TDataCenterId dataCenterId, TNodeId nodeId);
void QueuePing(const TActorId& local);
void SendPing(const TActorId& local, TNodeId id);
void RemoveFromPingInProgress(TNodeId node);
void ProcessNodePingQueue();
void SendReconnect(const TActorId& local);
static THolder<TGroupFilter> BuildGroupParametersForChannel(const TLeaderTabletInfo& tablet, ui32 channelId);
void KickTablet(const TTabletInfo& tablet);
Expand Down Expand Up @@ -943,6 +949,10 @@ TTabletInfo* FindTabletEvenInDeleting(TTabletId tabletId, TFollowerId followerId
return CurrentConfig.GetLessSystemTabletsMoves();
}

ui64 GetMaxPingsInFlight() const {
return CurrentConfig.GetMaxPingsInFlight();
}

static void ActualizeRestartStatistics(google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
static ui64 GetRestartsPerPeriod(const google::protobuf::RepeatedField<google::protobuf::uint64>& restartTimestamps, ui64 barrier);
static bool IsSystemTablet(TTabletTypes::EType type);
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/mind/hive/node_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ void TNodeInfo::DeregisterInDomains() {
void TNodeInfo::Ping() {
Y_ABORT_UNLESS((bool)Local);
BLOG_D("Node(" << Id << ") Ping(" << Local << ")");
Hive.SendPing(Local, Id);
Hive.QueuePing(Local);
}

void TNodeInfo::SendReconnect(const TActorId& local) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/mind/hive/tx__register_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ class TTxRegisterNode : public TTransactionBase<THive> {
BLOG_D("THive::TTxRegisterNode(" << Local.NodeId() << ")::Complete");
TNodeInfo* node = Self->FindNode(Local.NodeId());
if (node != nullptr && node->Local) { // we send ping on every RegisterNode because we want to re-sync tablets upon every reconnection
Self->NodePingsInProgress.erase(node->Id);
node->Ping();
Self->ProcessNodePingQueue();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1472,6 +1472,7 @@ message THiveConfig {
optional bool EnableDestroyOperations = 74 [default = false];
optional double NodeUsageRangeToKick = 75 [default = 0.2];
optional bool LessSystemTabletsMoves = 77 [default = true];
optional uint64 MaxPingsInFlight = 78 [default = 1000];
}

message TBlobCacheConfig {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_hive.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ enum ESimpleCounters {
COUNTER_WORST_OBJECT_VARIANCE = 20 [(CounterOpts) = {Name: "WorstObjectVariance"}];
COUNTER_STORAGE_SCATTER = 21 [(CounterOpts) = {Name: "StorageScatter"}];
COUNTER_TABLETS_STARTING = 22 [(CounterOpts) = {Name: "TabletsStarting"}];
COUNTER_PINGQUEUE_SIZE = 23 [(CounterOpts) = {Name: "PingQueueSize"}];
}

enum ECumulativeCounters {
Expand Down
Loading