diff --git a/ydb/core/tx/coordinator/coordinator_volatile_ut.cpp b/ydb/core/tx/coordinator/coordinator_volatile_ut.cpp index bc148f7d8ba2..8d75974813aa 100644 --- a/ydb/core/tx/coordinator/coordinator_volatile_ut.cpp +++ b/ydb/core/tx/coordinator/coordinator_volatile_ut.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -258,6 +259,104 @@ namespace NKikimr::NFlatTxCoordinator::NTest { observedSteps.clear(); } + Y_UNIT_TEST(MediatorReconnectPlanRace) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetNodeCount(1) + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + + auto &runtime = *server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_COORDINATOR, NActors::NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::BOOTSTRAPPER, NActors::NLog::PRI_DEBUG); + + auto sender = runtime.AllocateEdgeActor(); + ui64 coordinatorId = ChangeStateStorage(Coordinator, server->GetSettings().Domain); + ui64 mediatorId = ChangeStateStorage(Mediator, server->GetSettings().Domain); + ui64 tabletId = ChangeStateStorage(TTestTxConfig::TxTablet0, server->GetSettings().Domain); + + CreateTestBootstrapper(runtime, + CreateTestTabletInfo(tabletId, TTabletTypes::Dummy), + [](const TActorId& tablet, TTabletStorageInfo* info) { + return new TPlanTargetTablet(tablet, info); + }); + + { + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot, 1)); + runtime.DispatchEvents(options); + } + + TActorId mediatorQueue; + std::vector> mediatorQueueSteps; + auto blockMediatorQueueSteps = runtime.AddObserver([&](TEvMediatorQueueStep::TPtr& ev) { + mediatorQueue = ev->GetRecipientRewrite(); + mediatorQueueSteps.emplace_back(ev.Release()); + Cerr << "... blocked TEvMediatorQueueStep for " << mediatorQueue << Endl; + }); + + std::vector observedSteps; + auto stepsObserver = runtime.AddObserver([&](TEvTxProcessing::TEvPlanStep::TPtr& ev) { + auto* msg = ev->Get(); + observedSteps.push_back(msg->Record.GetStep()); + }); + + auto waitFor = [&](const auto& condition, const TString& description) { + for (int i = 0; i < 5 && !condition(); ++i) { + Cerr << "... waiting for " << description << Endl; + TDispatchOptions options; + options.CustomFinalCondition = [&]() { + return condition(); + }; + runtime.DispatchEvents(options); + } + UNIT_ASSERT_C(condition(), "... failed to wait for " << description); + }; + + ui64 txId = 12345678; + if (auto propose = std::make_unique(coordinatorId, txId, 0, Min(), Max())) { + auto* tx = propose->Record.MutableTransaction(); + // Not necessary, but we test volatile transactions here + tx->SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile); + auto* affected = tx->AddAffectedSet(); + affected->SetTabletId(tabletId); + affected->SetFlags(TEvTxProxy::TEvProposeTransaction::AffectedWrite); + + runtime.SendToPipe(coordinatorId, sender, propose.release()); + } + + waitFor([&]{ return mediatorQueueSteps.size() > 0; }, "TEvMediatorQueueStep"); + UNIT_ASSERT_VALUES_EQUAL(mediatorQueueSteps.size(), 1u); + + // We shouldn't see any steps yet + UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 0u); + + auto injectMediatorQueueStep = runtime.AddObserver([&](TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { + if (ev->GetRecipientRewrite() == mediatorQueue) { + Cerr << "... found pipe disconnect at " << mediatorQueue << Endl; + // Stop blocking mediator queue steps + // This seems to be safe, since we remove someone else from std::list + blockMediatorQueueSteps.Remove(); + // Inject blocked mediator steps into queue mailbox, they will be handled after the disconnect + for (auto& ev : mediatorQueueSteps) { + runtime.Send(ev.release(), 0, true); + } + mediatorQueueSteps.clear(); + } + }); + + Cerr << "... rebooting mediator" << Endl; + RebootTablet(runtime, mediatorId, sender); + + waitFor([&]{ return mediatorQueueSteps.empty(); }, "injected mediator steps"); + + // We must observe the plan step soon + runtime.SimulateSleep(TDuration::Seconds(2)); + UNIT_ASSERT_VALUES_EQUAL(observedSteps.size(), 1u); + } + } // Y_UNIT_TEST_SUITE(CoordinatorVolatile) } // namespace NKikimr::NFlatTxCoordinator::NTest diff --git a/ydb/core/tx/coordinator/mediator_queue.cpp b/ydb/core/tx/coordinator/mediator_queue.cpp index c7a4eecbd255..8cb7557d80aa 100644 --- a/ydb/core/tx/coordinator/mediator_queue.cpp +++ b/ydb/core/tx/coordinator/mediator_queue.cpp @@ -284,6 +284,7 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrappedGetTypeRewrite()) { + HFunc(TEvMediatorQueueStep, Handle); HFunc(TEvTxProcessing::TEvPlanStepAck, Handle); HFunc(TEvTxCoordinator::TEvCoordinatorSyncResult, Handle); HFunc(TEvTabletPipe::TEvClientConnected, Handle); @@ -294,8 +295,8 @@ class TTxCoordinatorMediatorQueue : public TActorBootstrappedGetTypeRewrite()) { - HFunc(TEvTxProcessing::TEvPlanStepAck, Handle); HFunc(TEvMediatorQueueStep, Handle); + HFunc(TEvTxProcessing::TEvPlanStepAck, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); CFunc(TEvents::TSystem::PoisonPill, Die) }