Skip to content

Commit

Permalink
Statistics: several analyzes inflight (ydb-platform#8394)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored and stanislav-shchetinin committed Aug 30, 2024
1 parent ba92aa3 commit 040f72f
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
1 change: 1 addition & 0 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
}

ForceTraversalOperationId = operation.OperationId;
break;
}

if (!pathId) {
Expand Down
44 changes: 42 additions & 2 deletions ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,21 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
auto sender = runtime.AllocateEdgeActor();
const TString operationId = "operationId";

TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> block(runtime);

auto tabletPipe = runtime.ConnectToPipe(tableInfo.SaTabletId, sender, 0, {});

auto analyzeRequest1 = MakeAnalyzeRequest({tableInfo.PathId}, operationId);
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest1.release());
runtime.SendToPipe(tabletPipe, sender, analyzeRequest1.release());

runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return block.size(); });

auto analyzeRequest2 = MakeAnalyzeRequest({tableInfo.PathId}, operationId);
runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest2.release());
runtime.SendToPipe(tabletPipe, sender, analyzeRequest2.release());

block.Unblock();
block.Stop();

auto response1 = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
UNIT_ASSERT(response1);
UNIT_ASSERT_VALUES_EQUAL(response1->Get()->Record.GetOperationId(), operationId);
Expand All @@ -87,6 +96,37 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
UNIT_ASSERT(!response2);
}

Y_UNIT_TEST(AnalyzeMultiOperationId) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0];
auto sender = runtime.AllocateEdgeActor();

auto GetOperationId = [] (size_t i) { return TStringBuilder() << "operationId" << i; };

TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> block(runtime);

const size_t numEvents = 10;

auto tabletPipe = runtime.ConnectToPipe(tableInfo.SaTabletId, sender, 0, {});

for (size_t i = 0; i < numEvents; ++i) {
auto analyzeRequest = MakeAnalyzeRequest({tableInfo.PathId}, GetOperationId(i));
runtime.SendToPipe(tabletPipe, sender, analyzeRequest.release());
}

runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return block.size() == numEvents; });

block.Unblock();
block.Stop();

for (size_t i = 0; i < numEvents; ++i) {
auto response = runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
UNIT_ASSERT(response);
UNIT_ASSERT_VALUES_EQUAL(response->Get()->Record.GetOperationId(), GetOperationId(i));
}
}

Y_UNIT_TEST(AnalyzeRebootSaBeforeAnalyzeTableResponse) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
Expand Down

0 comments on commit 040f72f

Please sign in to comment.