From 8b9d1a8a8daa8488ef32cb2dabcc50f32f86b28d Mon Sep 17 00:00:00 2001 From: azevaykin Date: Wed, 23 Oct 2024 08:54:07 +0000 Subject: [PATCH 1/2] test --- ydb/core/sys_view/ut_kqp.cpp | 52 ++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 93a08cfd1f67..97f39e229d67 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1371,6 +1371,58 @@ Y_UNIT_TEST_SUITE(SystemView) { } } + Y_UNIT_TEST(TopPartitionsFollowers) { + NDataShard::gDbStatsReportInterval = TDuration::Seconds(0); + + auto nowUs = TInstant::Now().MicroSeconds(); + + TTestEnv env(1, 4, 0, 0, true); + CreateTenantsAndTables(env); + + TTableClient client(env.GetDriver()); + size_t rowCount = 0; + for (size_t iter = 0; iter < 30 && !rowCount; ++iter) { + rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + if (!rowCount) { + Sleep(TDuration::Seconds(1)); + } + } + ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + + TStringBuilder query; + query << R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount + FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" + << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp)"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + + TYsonFieldChecker check(ysonString, 11); + check.Uint64(intervalEnd); // IntervalEnd + check.Uint64(1); // Rank + check.Uint64Greater(0); // TabletId + check.String("/Root/Tenant1/Table1"); // Path + check.Uint64GreaterOrEquals(nowUs); // PeakTime + check.DoubleGreaterOrEquals(0.); // CPUCores + check.Uint64Greater(0); // NodeId + check.Uint64Greater(0); // DataSize + check.Uint64(3); // RowCount + check.Uint64(0); // IndexSize + check.Uint64(0); // InFlightTxCount + } + Y_UNIT_TEST(Describe) { TTestEnv env; CreateRootTable(env); From 829f7baa57be7882eec3f0e9eb23ad3c0e5c4ea4 Mon Sep 17 00:00:00 2001 From: azevaykin Date: Thu, 24 Oct 2024 14:25:51 +0000 Subject: [PATCH 2/2] Follower stats in .sys/top_partitions_* --- ydb/core/sys_view/common/schema.h | 6 +- .../partition_stats/partition_stats.cpp | 51 +++-- .../partition_stats/top_partitions.cpp | 3 + .../sys_view/processor/processor_impl.cpp | 2 +- ydb/core/sys_view/processor/schema.h | 5 +- .../sys_view/processor/tx_top_partitions.cpp | 42 ++-- ydb/core/sys_view/ut_common.cpp | 14 +- ydb/core/sys_view/ut_common.h | 10 +- ydb/core/sys_view/ut_counters.cpp | 2 +- ydb/core/sys_view/ut_kqp.cpp | 214 ++++++++++++++---- ydb/core/sys_view/ut_labeled.cpp | 10 +- 11 files changed, 264 insertions(+), 95 deletions(-) diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h index ce14d46c4698..c9f598b2c8b5 100644 --- a/ydb/core/sys_view/common/schema.h +++ b/ydb/core/sys_view/common/schema.h @@ -470,8 +470,9 @@ struct Schema : NIceDb::Schema { struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {}; struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {}; struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {}; + struct FollowerId : Column<12, NScheme::NTypeIds::Uint32> {}; - using TKey = TableKey; + using TKey = TableKey; using TColumns = TableColumns< IntervalEnd, Rank, @@ -483,7 +484,8 @@ struct Schema : NIceDb::Schema { DataSize, RowCount, IndexSize, - InFlightTxCount>; + InFlightTxCount, + FollowerId>; }; struct QuerySessions : Table<13> { diff --git a/ydb/core/sys_view/partition_stats/partition_stats.cpp b/ydb/core/sys_view/partition_stats/partition_stats.cpp index 8770055157c8..64ecd7e8ea40 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats.cpp +++ b/ydb/core/sys_view/partition_stats/partition_stats.cpp @@ -83,7 +83,7 @@ class TPartitionStatsCollector : public TActorBootstrapped newPartitions; - std::unordered_set overloaded; + std::set overloaded; for (auto shardIdx : ev->Get()->ShardIndices) { auto old = oldPartitions.find(shardIdx); @@ -92,7 +92,7 @@ class TPartitionStatsCollector : public TActorBootstrappedsecond.FollowerStats) { if (IsPartitionOverloaded(followerStat.second)) - overloaded.insert(shardIdx); + overloaded.insert({shardIdx, followerStat.first}); } } } @@ -148,12 +148,13 @@ class TPartitionStatsCollector : public TActorBootstrappedsecond.erase(shardIdx); + overloadedFound->second.erase(overloadedFollower); if (overloadedFound->second.empty()) { tables.Overloaded.erase(pathId); } @@ -373,15 +374,16 @@ class TPartitionStatsCollector : public TActorBootstrapped sorted; - for (const auto& [pathId, shardIndices] : domainTables.Overloaded) { - for (const auto& shardIdx : shardIndices) { + for (const auto& [pathId, overloadedFollowers] : domainTables.Overloaded) { + for (const TOverloadedFollower& overloadedFollower : overloadedFollowers) { const auto& table = domainTables.Stats[pathId]; - const auto& partition = table.Partitions.at(shardIdx).FollowerStats.at(0); - sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()}); + const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId); + sorted.emplace_back(TPartition{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()}); } } @@ -395,7 +397,9 @@ class TPartitionStatsCollector : public TActorBootstrapped(); for (const auto& entry : sorted) { const auto& table = domainTables.Stats[entry.PathId]; - const auto& partition = table.Partitions.at(entry.ShardIdx).FollowerStats.at(0); + const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats; + const auto& partition = followerStats.at(entry.FollowerId); + const auto& leaderPartition = followerStats.at(0); auto* result = sendEvent->Record.AddPartitions(); result->SetTabletId(partition.GetTabletId()); @@ -403,10 +407,11 @@ class TPartitionStatsCollector : public TActorBootstrappedSetPeakTimeUs(nowUs); result->SetCPUCores(partition.GetCPUCores()); result->SetNodeId(partition.GetNodeId()); - result->SetDataSize(partition.GetDataSize()); - result->SetRowCount(partition.GetRowCount()); - result->SetIndexSize(partition.GetIndexSize()); + result->SetDataSize(leaderPartition.GetDataSize()); + result->SetRowCount(leaderPartition.GetRowCount()); + result->SetIndexSize(leaderPartition.GetIndexSize()); result->SetInFlightTxCount(partition.GetInFlightTxCount()); + result->SetFollowerId(partition.GetFollowerId()); if (++count == TOP_PARTITIONS_COUNT) { break; @@ -438,8 +443,7 @@ class TPartitionStatsCollector : public TActorBootstrapped= OverloadedPartitionBound - && !stats.GetFollowerId(); + return stats.GetCPUCores() >= OverloadedPartitionBound; } private: @@ -452,8 +456,10 @@ class TPartitionStatsCollector : public TActorBootstrapped FollowerStats; + std::unordered_map FollowerStats; }; struct TTableStats { @@ -462,9 +468,22 @@ class TPartitionStatsCollector : public TActorBootstrapped Stats; - std::unordered_map> Overloaded; + std::unordered_map> Overloaded; }; std::unordered_map DomainTables; diff --git a/ydb/core/sys_view/partition_stats/top_partitions.cpp b/ydb/core/sys_view/partition_stats/top_partitions.cpp index ca80d79231b7..b1705a9c3e65 100644 --- a/ydb/core/sys_view/partition_stats/top_partitions.cpp +++ b/ydb/core/sys_view/partition_stats/top_partitions.cpp @@ -58,6 +58,9 @@ struct TTopPartitionsExtractorMap : insert({S::InFlightTxCount::ColumnId, [] (const E& entry) { return TCell::Make(entry.GetInfo().GetInFlightTxCount()); }}); + insert({S::FollowerId::ColumnId, [] (const E& entry) { + return TCell::Make(entry.GetInfo().GetFollowerId()); + }}); } }; diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp index e46d4fc56558..a60da604842a 100644 --- a/ydb/core/sys_view/processor/processor_impl.cpp +++ b/ydb/core/sys_view/processor/processor_impl.cpp @@ -283,7 +283,7 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) { for (const auto& partition : top) { - db.Table().Key((ui32)type, partition->GetTabletId()).Delete(); + db.Table().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete(); } top.clear(); }; diff --git a/ydb/core/sys_view/processor/schema.h b/ydb/core/sys_view/processor/schema.h index e1684b3aaa79..23a1055e71a0 100644 --- a/ydb/core/sys_view/processor/schema.h +++ b/ydb/core/sys_view/processor/schema.h @@ -90,9 +90,10 @@ struct TProcessorSchema : NIceDb::Schema { }; struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {}; struct Data : Column<3, NScheme::NTypeIds::String> {}; + struct FollowerId : Column<4, NScheme::NTypeIds::Uint32> {}; - using TKey = TableKey; - using TColumns = TableColumns; + using TKey = TableKey; + using TColumns = TableColumns; }; #define RESULT_PARTITION_TABLE(TableName, TableID) \ diff --git a/ydb/core/sys_view/processor/tx_top_partitions.cpp b/ydb/core/sys_view/processor/tx_top_partitions.cpp index 2048e40e8372..205a0466b526 100644 --- a/ydb/core/sys_view/processor/tx_top_partitions.cpp +++ b/ydb/core/sys_view/processor/tx_top_partitions.cpp @@ -15,15 +15,18 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { void ProcessTop(NIceDb::TNiceDb& db, NKikimrSysView::EStatsType statsType, TPartitionTop& top) { + using TPartitionTopKey = std::pair; + TPartitionTop result; result.reserve(TOP_PARTITIONS_COUNT); - std::unordered_set seen; + std::unordered_set seen; size_t index = 0; auto topIt = top.begin(); auto copyNewPartition = [&] () { const auto& newPartition = Record.GetPartitions(index); - auto tabletId = newPartition.GetTabletId(); + const ui64 tabletId = newPartition.GetTabletId(); + const ui32 followerId = newPartition.GetFollowerId(); TString data; Y_PROTOBUF_SUPPRESS_NODISCARD newPartition.SerializeToString(&data); @@ -32,10 +35,10 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { partition->CopyFrom(newPartition); result.emplace_back(std::move(partition)); - db.Table().Key((ui32)statsType, tabletId).Update( + db.Table().Key((ui32)statsType, tabletId, followerId).Update( NIceDb::TUpdate(data)); - seen.insert(tabletId); + seen.insert({tabletId, followerId}); ++index; }; @@ -44,32 +47,36 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { if (index == Record.PartitionsSize()) { break; } - auto tabletId = Record.GetPartitions(index).GetTabletId(); - if (seen.find(tabletId) != seen.end()) { + const auto& partition = Record.GetPartitions(index); + const ui64 tabletId = partition.GetTabletId(); + const ui32 followerId = partition.GetFollowerId(); + if (seen.contains({tabletId, followerId})) { ++index; continue; } copyNewPartition(); } else { - auto topTabletId = (*topIt)->GetTabletId(); - if (seen.find(topTabletId) != seen.end()) { + const ui64 topTabletId = (*topIt)->GetTabletId(); + const ui32 topFollowerId = (*topIt)->GetFollowerId(); + if (seen.contains({topTabletId, topFollowerId})) { ++topIt; continue; } if (index == Record.PartitionsSize()) { result.emplace_back(std::move(*topIt++)); - seen.insert(topTabletId); + seen.insert({topTabletId, topFollowerId}); continue; } - auto& newPartition = Record.GetPartitions(index); - auto tabletId = newPartition.GetTabletId(); - if (seen.find(tabletId) != seen.end()) { + const auto& newPartition = Record.GetPartitions(index); + const ui64 tabletId = newPartition.GetTabletId(); + const ui32 followerId = newPartition.GetFollowerId(); + if (seen.contains({tabletId, followerId})) { ++index; continue; } if ((*topIt)->GetCPUCores() >= newPartition.GetCPUCores()) { result.emplace_back(std::move(*topIt++)); - seen.insert(topTabletId); + seen.insert({topTabletId, topFollowerId}); } else { copyNewPartition(); } @@ -77,11 +84,12 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { } for (; topIt != top.end(); ++topIt) { - auto topTabletId = (*topIt)->GetTabletId(); - if (seen.find(topTabletId) != seen.end()) { + const ui64 topTabletId = (*topIt)->GetTabletId(); + const ui64 topFollowerId = (*topIt)->GetFollowerId(); + if (seen.contains({topTabletId, topFollowerId})) { continue; } - db.Table().Key((ui32)statsType, topTabletId).Delete(); + db.Table().Key((ui32)statsType, topTabletId, topFollowerId).Delete(); } top.swap(result); @@ -108,6 +116,8 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev) { auto timeUs = record.GetTimeUs(); auto partitionIntervalEnd = IntervalEnd + TotalInterval; + SVLOG_T("TEvSysView::TEvSendTopPartitions: " << " record " << record.ShortDebugString()); + if (timeUs < IntervalEnd.MicroSeconds() || timeUs >= partitionIntervalEnd.MicroSeconds()) { SVLOG_W("[" << TabletID() << "] TEvSendTopPartitions, time mismath: " << ", partition interval end# " << partitionIntervalEnd diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp index 10fd9f068c3e..e88503fcc282 100644 --- a/ydb/core/sys_view/ut_common.cpp +++ b/ydb/core/sys_view/ut_common.cpp @@ -26,7 +26,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString return subdomain; } -TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 pqTabletsN, bool enableSVP) { +TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, const TTestEnvSettings& settings) { auto mbusPort = PortManager.GetPort(); auto grpcPort = PortManager.GetPort(); @@ -44,16 +44,18 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 NKikimrConfig::TFeatureFlags featureFlags; featureFlags.SetEnableBackgroundCompaction(false); featureFlags.SetEnableResourcePools(true); + featureFlags.SetEnableFollowerStats(true); Settings->SetFeatureFlags(featureFlags); - Settings->SetEnablePersistentQueryStats(enableSVP); - Settings->SetEnableDbCounters(enableSVP); + Settings->SetEnablePersistentQueryStats(settings.EnableSVP); + Settings->SetEnableDbCounters(settings.EnableSVP); + Settings->SetEnableForceFollowers(settings.EnableForceFollowers); NKikimrConfig::TAppConfig appConfig; *appConfig.MutableFeatureFlags() = Settings->FeatureFlags; Settings->SetAppConfig(appConfig); - for (ui32 i : xrange(storagePools)) { + for (ui32 i : xrange(settings.StoragePools)) { TString poolName = Sprintf("test%d", i); Settings->AddStoragePool(poolName, TString("/Root:") + poolName, 2); } @@ -74,9 +76,9 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 Client->InitRootScheme("Root"); - if (pqTabletsN) { + if (settings.PqTabletsN) { NKikimr::NPQ::FillPQConfig(Settings->PQConfig, "/Root/PQ", true); - PqTabletIds = Server->StartPQTablets(pqTabletsN); + PqTabletIds = Server->StartPQTablets(settings.PqTabletsN); } Endpoint = "localhost:" + ToString(grpcPort); diff --git a/ydb/core/sys_view/ut_common.h b/ydb/core/sys_view/ut_common.h index 1ae840655acf..3587eaca66b8 100644 --- a/ydb/core/sys_view/ut_common.h +++ b/ydb/core/sys_view/ut_common.h @@ -16,14 +16,20 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings( NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings( const TString &name, const TStoragePools &pools = {}); +struct TTestEnvSettings { + ui32 StoragePools = 0; + ui32 PqTabletsN = 0; + bool EnableSVP = false; + bool EnableForceFollowers = false; +}; + class TTestEnv { public: class TDisableSourcesTag {}; static TDisableSourcesTag DisableSourcesTag; public: - TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, ui32 storagePools = 0, - ui32 pqTabletsN = 0, bool enableSVP = false); + TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, const TTestEnvSettings& settings = {}); ~TTestEnv(); diff --git a/ydb/core/sys_view/ut_counters.cpp b/ydb/core/sys_view/ut_counters.cpp index 66623084656e..c9e05515563d 100644 --- a/ydb/core/sys_view/ut_counters.cpp +++ b/ydb/core/sys_view/ut_counters.cpp @@ -75,7 +75,7 @@ void CreateDatabasesAndTables(TTestEnv& env) { Y_UNIT_TEST_SUITE(DbCounters) { Y_UNIT_TEST(TabletsSimple) { - TTestEnv env(1, 2, 0, 0, true); + TTestEnv env(1, 2, {.EnableSVP = true}); CreateDatabasesAndTables(env); for (size_t iter = 0; iter < 30; ++iter) { diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 97f39e229d67..f6ba05bdc1aa 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1108,7 +1108,7 @@ Y_UNIT_TEST_SUITE(SystemView) { } Y_UNIT_TEST(StoragePoolsRanges) { - TTestEnv env(1, 0, 3); + TTestEnv env(1, 0, {.StoragePools = 3}); TTableClient client(env.GetDriver()); size_t rowCount = 0; @@ -1179,9 +1179,11 @@ Y_UNIT_TEST_SUITE(SystemView) { } } - size_t GetRowCount(TTableClient& client, const TString& name) { + size_t GetRowCount(TTableClient& client, const TString& tableName, const TString& condition = {}) { TStringBuilder query; - query << "SELECT * FROM `" << name << "`"; + query << "SELECT * FROM `" << tableName << "`"; + if (!condition.empty()) + query << " WHERE " << condition; auto it = client.StreamExecuteScanQuery(query).GetValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); auto ysonString = NKqp::StreamResultToYson(it); @@ -1213,7 +1215,7 @@ Y_UNIT_TEST_SUITE(SystemView) { auto nowUs = TInstant::Now().MicroSeconds(); - TTestEnv env(1, 4, 0, 0, true); + TTestEnv env(1, 4, {.EnableSVP = true}); CreateTenantsAndTables(env); TTableClient client(env.GetDriver()); @@ -1265,7 +1267,7 @@ Y_UNIT_TEST_SUITE(SystemView) { constexpr ui64 partitionCount = 5; - TTestEnv env(1, 4, 0, 0, true); + TTestEnv env(1, 4, {.EnableSVP = true}); CreateTenantsAndTables(env, true, partitionCount); TTableClient client(env.GetDriver()); @@ -1295,7 +1297,7 @@ Y_UNIT_TEST_SUITE(SystemView) { constexpr ui64 partitionCount = 5; - TTestEnv env(1, 4, 0, 0, true); + TTestEnv env(1, 4, {.EnableSVP = true}); CreateTenantsAndTables(env, true, partitionCount); TTableClient client(env.GetDriver()); @@ -1376,51 +1378,175 @@ Y_UNIT_TEST_SUITE(SystemView) { auto nowUs = TInstant::Now().MicroSeconds(); - TTestEnv env(1, 4, 0, 0, true); - CreateTenantsAndTables(env); + TTestEnv env(1, 4, {.EnableSVP = true, .EnableForceFollowers = true}); + + auto& runtime = *env.GetServer().GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::SYSTEM_VIEWS, NLog::PRI_TRACE); TTableClient client(env.GetDriver()); + auto session = client.CreateSession().GetValueSync().GetSession(); + + CreateTenant(env, "Tenant1", true); + auto desc = TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .SetPrimaryKeyColumn("Key") + .Build(); + + auto settings = TCreateTableSettings() + .ReplicationPolicy(TReplicationPolicy().ReplicasCount(3)); + + auto result = session.CreateTable("/Root/Tenant1/Table1", + std::move(desc), settings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + Cerr << "... UPSERT" << Endl; + NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"( + UPSERT INTO `Root/Tenant1/Table1` (Key) VALUES (1u), (2u), (3u); + )", TTxControl::BeginTx().CommitTx()).GetValueSync()); + + Cerr << "... SELECT from leader" << Endl; + { + auto result = session.ExecuteDataQuery(R"( + SELECT * FROM `Root/Tenant1/Table1` WHERE Key = 1; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + NKqp::AssertSuccessResult(result); + + TString actual = FormatResultSetYson(result.GetResultSet(0)); + NKqp::CompareYson(R"([ + [[1u]] + ])", actual); + } + + Cerr << "... SELECT from follower" << Endl; + { + auto result = session.ExecuteDataQuery(R"( + SELECT * FROM `Root/Tenant1/Table1` WHERE Key >= 2; + )", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync(); + NKqp::AssertSuccessResult(result); + + TString actual = FormatResultSetYson(result.GetResultSet(0)); + NKqp::CompareYson(R"([ + [[2u]]; + [[3u]] + ])", actual); + } + size_t rowCount = 0; - for (size_t iter = 0; iter < 30 && !rowCount; ++iter) { - rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); - if (!rowCount) { - Sleep(TDuration::Seconds(1)); - } + for (size_t iter = 0; iter < 30; ++iter) { + if (rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute", "FollowerId != 0")) + break; + Sleep(TDuration::Seconds(5)); } + UNIT_ASSERT_GE(rowCount, 0); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount, + FollowerId, + IF(FollowerId = 0, 'L', 'F') AS LeaderFollower + FROM `/Root/Tenant1/.sys/top_partitions_one_minute` + ORDER BY IntervalEnd, Rank; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + NKqp::AssertSuccessResult(result); + + auto rs = result.GetResultSet(0); + + TString actual = FormatResultSetYson(rs); + Cerr << "\n\n\n\n\n\n\n\n" << actual << "\n\n\n\n\n\n\n\n" << Endl; + } + ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); - TStringBuilder query; - query << R"( - SELECT - IntervalEnd, - Rank, - TabletId, - Path, - PeakTime, - CPUCores, - NodeId, - DataSize, - RowCount, - IndexSize, - InFlightTxCount - FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" - << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp)"; - auto it = client.StreamExecuteScanQuery(query).GetValueSync(); - UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); - auto ysonString = NKqp::StreamResultToYson(it); - TYsonFieldChecker check(ysonString, 11); - check.Uint64(intervalEnd); // IntervalEnd - check.Uint64(1); // Rank - check.Uint64Greater(0); // TabletId - check.String("/Root/Tenant1/Table1"); // Path - check.Uint64GreaterOrEquals(nowUs); // PeakTime - check.DoubleGreaterOrEquals(0.); // CPUCores - check.Uint64Greater(0); // NodeId - check.Uint64Greater(0); // DataSize - check.Uint64(3); // RowCount - check.Uint64(0); // IndexSize - check.Uint64(0); // InFlightTxCount + + Cerr << "... SELECT leader from .sys/top_partitions_one_minute" << Endl; + { + TStringBuilder query; + query << R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount, + FollowerId + FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" + << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) AND FollowerId = 0"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + + TYsonFieldChecker check(ysonString, 12); + check.Uint64(intervalEnd); // IntervalEnd + check.Uint64(1); // Rank + check.Uint64Greater(0); // TabletId + check.String("/Root/Tenant1/Table1"); // Path + check.Uint64GreaterOrEquals(nowUs); // PeakTime + check.DoubleGreaterOrEquals(0.); // CPUCores + check.Uint64Greater(0); // NodeId + check.Uint64Greater(0); // DataSize + check.Uint64(3); // RowCount + check.Uint64(0); // IndexSize + check.Uint64(0); // InFlightTxCount + check.Uint64(0); // FollowerId + } + + Cerr << "... SELECT follower from .sys/top_partitions_one_minute" << Endl; + { + TStringBuilder query; + query << R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount, + FollowerId + FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" + << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) AND FollowerId != 0"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + + TYsonFieldChecker check(ysonString, 12); + check.Uint64(intervalEnd); // IntervalEnd + check.Uint64(2); // Rank + check.Uint64Greater(0); // TabletId + check.String("/Root/Tenant1/Table1"); // Path + check.Uint64GreaterOrEquals(nowUs); // PeakTime + check.DoubleGreaterOrEquals(0.); // CPUCores + check.Uint64Greater(0); // NodeId + check.Uint64Greater(0); // DataSize + check.Uint64(3); // RowCount + check.Uint64(0); // IndexSize + check.Uint64(0); // InFlightTxCount + check.Uint64Greater(0); // FollowerId + } } Y_UNIT_TEST(Describe) { diff --git a/ydb/core/sys_view/ut_labeled.cpp b/ydb/core/sys_view/ut_labeled.cpp index d27ca0065a91..42f442580aff 100644 --- a/ydb/core/sys_view/ut_labeled.cpp +++ b/ydb/core/sys_view/ut_labeled.cpp @@ -103,7 +103,7 @@ void GetCounters(TTestEnv& env, const TString& databaseName, const TString& data Y_UNIT_TEST_SUITE(LabeledDbCounters) { Y_UNIT_TEST(OneTablet) { - TTestEnv env(1, 2, 0, 1, true); + TTestEnv env(1, 2, {.PqTabletsN = 1, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); @@ -125,7 +125,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { Y_UNIT_TEST(OneTabletRemoveCounters) { - TTestEnv env(1, 2, 0, 1, true); + TTestEnv env(1, 2, {.PqTabletsN = 1, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); @@ -159,7 +159,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { Y_UNIT_TEST(OneTabletRestart) { - TTestEnv env(1, 2, 0, 1, true); + TTestEnv env(1, 2, {.PqTabletsN = 1, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); @@ -204,7 +204,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { } Y_UNIT_TEST(TwoTablets) { - TTestEnv env(1, 2, 0, 2, true); + TTestEnv env(1, 2, {.PqTabletsN = 2, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { @@ -227,7 +227,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { } Y_UNIT_TEST(TwoTabletsKillOneTablet) { - TTestEnv env(1, 2, 0, 2, true); + TTestEnv env(1, 2, {.PqTabletsN = 2, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor();