Skip to content

Commit

Permalink
StartCollecting: handle race condition KIKIMR-19336
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Sep 16, 2023
1 parent b31c5e5 commit 5e9a163
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 10 deletions.
13 changes: 7 additions & 6 deletions ydb/core/cms/cms.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,10 @@ void TCms::EnqueueRequest(TAutoPtr<IEventHandle> ev, const TActorContext &ctx)

void TCms::StartCollecting()
{
Y_VERIFY(Queue.empty());
if (!Queue.empty()) {
return;
}

std::swap(NextQueue, Queue);

InfoCollectorStartTime = TActivationContext::Now();
Expand Down Expand Up @@ -1320,13 +1323,11 @@ void TCms::ProcessQueue()
Queue.pop();
}

// Process events received while collecting and processing queue
if (Queue.empty() && !NextQueue.empty()) {
StartCollecting();
}

if (!Queue.empty()) {
Send(SelfId(), new TEvPrivate::TEvProcessQueue);
} else if (!NextQueue.empty()) {
// Process events received while collecting and processing queue
StartCollecting();
}
}

Expand Down
142 changes: 138 additions & 4 deletions ydb/core/cms/cms_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@

#include <util/system/hostname.h>

namespace NKikimr {
namespace NCmsTest {
namespace NKikimr::NCmsTest {

using namespace NCms;
using namespace NNodeWhiteboard;
Expand Down Expand Up @@ -1508,6 +1507,141 @@ Y_UNIT_TEST_SUITE(TCmsTest) {
UNIT_ASSERT_VALUES_EQUAL(rec.status().code(), TStatus::ALLOW);
}
}

// Drives the CMS actor through a hand-crafted ordering of EvClusterInfo,
// EvProcessQueue, EvStartCollecting and permission-request events by
// intercepting (dropping and later re-sending) selected events with observers.
// NOTE(review): the body contains no explicit assertions; presumably the test
// succeeds simply by running to completion without the CMS actor aborting —
// confirm against the TCms::StartCollecting implementation.
Y_UNIT_TEST(RacyStartCollecting)
{
TCmsTestEnv env(8);
env.CreateDefaultCmsPipe();

// Builds a permission request containing a RESTART_SERVICES action for node 0
// and sends it to CMS. The response is not awaited here.
auto makeRequest = [&env]() {
auto ev = MakePermissionRequest("user", false, true, false,
MakeAction(TAction::RESTART_SERVICES, env.GetNodeId(0), 60000000, "storage")
);
ev->Record.SetDuration(60000000);
ev->Record.SetAvailabilityMode(MODE_MAX_AVAILABILITY);
env.SendToCms(ev.Release());
};

// Step 1: perform one full request/response round trip and remember who sent
// the response; that actor id is used below to filter CMS-related events.
TActorId cmsActorId;
{
makeRequest();
auto ev = env.GrabEdgeEvent<TEvCms::TEvPermissionResponse>(env.GetSender());
cmsActorId = ev->Sender;
}

// Step 2: intercept the next EvClusterInfo addressed to CMS — take ownership
// of the event handle and drop it from the runtime so it can be re-sent later.
THolder<IEventHandle> delayedClusterInfo;
env.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TCms::TEvPrivate::EvClusterInfo) {
if (ev->Recipient == cmsActorId) {
delayedClusterInfo.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
});

// Issue a request and dispatch events until the cluster info has been captured.
makeRequest();
if (!delayedClusterInfo) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&delayedClusterInfo](IEventHandle&) {
return bool(delayedClusterInfo);
});
env.DispatchEvents(opts);
}

// Step 3: with the cluster info still held back, send another request and
// only wait until the observer has seen the EvPermissionRequest event.
bool requestCaptured = false;
env.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvCms::EvPermissionRequest) {
requestCaptured = true;
}
return TTestActorRuntime::EEventAction::PROCESS;
});

makeRequest();
if (!requestCaptured) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&requestCaptured](IEventHandle&) {
return requestCaptured;
});
env.DispatchEvents(opts);
}

// Step 4: re-send the held-back cluster info, then watch for EvProcessQueue
// involving CMS. Any EvClusterInfo addressed to CMS is flagged; if it shows up
// after an EvProcessQueue has been seen, it is captured and dropped again.
env.Send(delayedClusterInfo.Release(), 0, true);
bool processQueueCaptured = false;
bool clusterInfoCaptured = false;
env.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TCms::TEvPrivate::EvProcessQueue) {
if (ev->Sender == cmsActorId || ev->Recipient == cmsActorId) {
processQueueCaptured = true;
}
} else if (ev->GetTypeRewrite() == TCms::TEvPrivate::EvClusterInfo) {
if (ev->Recipient == cmsActorId) {
clusterInfoCaptured = true;
if (processQueueCaptured) {
delayedClusterInfo.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
}
}
return TTestActorRuntime::EEventAction::PROCESS;
});

// Dispatch unless both conditions already hold; the dispatch itself stops as
// soon as either a cluster info is held back or one has been observed.
if (!delayedClusterInfo || !clusterInfoCaptured) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&delayedClusterInfo, &clusterInfoCaptured](IEventHandle&) {
return delayedClusterInfo || clusterInfoCaptured;
});
env.DispatchEvents(opts);
}

// Step 5: send one more request and hold back the EvStartCollecting event
// that involves CMS (as sender or recipient), while also noting that the
// permission request itself has been seen.
requestCaptured = false;
THolder<IEventHandle> delayedStartCollecting;
env.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TEvCms::EvPermissionRequest) {
requestCaptured = true;
} else if (ev->GetTypeRewrite() == TCms::TEvPrivate::EvStartCollecting) {
if (ev->Sender == cmsActorId || ev->Recipient == cmsActorId) {
delayedStartCollecting.Reset(ev.Release());
return TTestActorRuntime::EEventAction::DROP;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
});

// Wait until both the request was seen and EvStartCollecting was captured.
makeRequest();
if (!requestCaptured || !delayedStartCollecting) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&requestCaptured, &delayedStartCollecting](IEventHandle&) {
return requestCaptured && delayedStartCollecting;
});
env.DispatchEvents(opts);
}

// Step 6: stop dropping events; just record when an EvStartCollecting
// involving CMS passes through the runtime.
bool startCollecting = false;
env.SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
if (ev->GetTypeRewrite() == TCms::TEvPrivate::EvStartCollecting) {
if (ev->Sender == cmsActorId || ev->Recipient == cmsActorId) {
startCollecting = true;
}
}
return TTestActorRuntime::EEventAction::PROCESS;
});

// Release the held-back events: the cluster info first (only if step 4
// captured one), then the delayed EvStartCollecting.
if (delayedClusterInfo) {
env.Send(delayedClusterInfo.Release(), 0, true);
}
env.Send(delayedStartCollecting.Release(), 0, true);

// Dispatch until the re-sent EvStartCollecting has been observed.
if (!startCollecting) {
TDispatchOptions opts;
opts.FinalEvents.emplace_back([&startCollecting](IEventHandle&) {
return startCollecting;
});
env.DispatchEvents(opts);
}

env.DestroyDefaultCmsPipe();
}
}

}
} // NCmsTest
} // NKikimr

0 comments on commit 5e9a163

Please sign in to comment.