From 040f72fa701ee29e3301c9e55a9d92e3f6ad5251 Mon Sep 17 00:00:00 2001 From: azevaykin <145343289+azevaykin@users.noreply.github.com> Date: Thu, 29 Aug 2024 05:33:40 +0300 Subject: [PATCH] Statistics: several analyzes inflight (#8394) --- .../statistics/aggregator/aggregator_impl.cpp | 1 + .../aggregator/ut/ut_analyze_columnshard.cpp | 44 ++++++++++++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 246f73ffb3dc..b49ceb245893 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -672,6 +672,7 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) { } ForceTraversalOperationId = operation.OperationId; + break; } if (!pathId) { diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index 4931963f4ab6..ac5fd563fdd8 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -73,12 +73,21 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { auto sender = runtime.AllocateEdgeActor(); const TString operationId = "operationId"; + TBlockEvents block(runtime); + + auto tabletPipe = runtime.ConnectToPipe(tableInfo.SaTabletId, sender, 0, {}); + auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId}, operationId); - runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release()); + runtime.SendToPipe(tabletPipe, sender, analyzeRequest1.release()); + + runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return block.size(); }); auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId}, operationId); - runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest2.release()); + runtime.SendToPipe(tabletPipe, sender, analyzeRequest2.release()); + block.Unblock(); + block.Stop(); + auto response1 = runtime.GrabEdgeEventRethrow(sender); UNIT_ASSERT(response1); UNIT_ASSERT_VALUES_EQUAL(response1->Get()->Record.GetOperationId(), operationId); @@ -87,6 +96,37 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { UNIT_ASSERT(!response2); } + Y_UNIT_TEST(AnalyzeMultiOperationId) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + auto sender = runtime.AllocateEdgeActor(); + + auto GetOperationId = [] (size_t i) { return TStringBuilder() << "operationId" << i; }; + + TBlockEvents block(runtime); + + const size_t numEvents = 10; + + auto tabletPipe = runtime.ConnectToPipe(tableInfo.SaTabletId, sender, 0, {}); + + for (size_t i = 0; i < numEvents; ++i) { + auto analyzeRequest = MakeAnalyzeRequest({tableInfo.PathId}, GetOperationId(i)); + runtime.SendToPipe(tabletPipe, sender, analyzeRequest.release()); + } + + runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return block.size() == numEvents; }); + + block.Unblock(); + block.Stop(); + + for (size_t i = 0; i < numEvents; ++i) { + auto response = runtime.GrabEdgeEventRethrow(sender); + UNIT_ASSERT(response); + UNIT_ASSERT_VALUES_EQUAL(response->Get()->Record.GetOperationId(), GetOperationId(i)); + } + } + Y_UNIT_TEST(AnalyzeRebootSaBeforeAnalyzeTableResponse) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime();