From 6d62b967e16ac31cdf3edf78858931af6fdf2f00 Mon Sep 17 00:00:00 2001 From: azevaykin Date: Fri, 25 Oct 2024 06:12:48 +0000 Subject: [PATCH 1/2] Follower stats in .sys/top_partitions --- ydb/core/sys_view/common/schema.h | 4 +- .../partition_stats/partition_stats.cpp | 51 +++-- .../partition_stats/top_partitions.cpp | 3 + .../sys_view/processor/processor_impl.cpp | 2 +- ydb/core/sys_view/processor/schema.h | 5 +- .../sys_view/processor/tx_top_partitions.cpp | 42 ++-- ydb/core/sys_view/ut_common.cpp | 14 +- ydb/core/sys_view/ut_common.h | 10 +- ydb/core/sys_view/ut_counters.cpp | 2 +- ydb/core/sys_view/ut_kqp.cpp | 190 +++++++++++++++++- ydb/core/sys_view/ut_labeled.cpp | 10 +- 11 files changed, 277 insertions(+), 56 deletions(-) diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h index ce14d46c4698..619370f220ec 100644 --- a/ydb/core/sys_view/common/schema.h +++ b/ydb/core/sys_view/common/schema.h @@ -470,6 +470,7 @@ struct Schema : NIceDb::Schema { struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {}; struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {}; struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {}; + struct FollowerId : Column<12, NScheme::NTypeIds::Uint32> {}; using TKey = TableKey; using TColumns = TableColumns< @@ -483,7 +484,8 @@ struct Schema : NIceDb::Schema { DataSize, RowCount, IndexSize, - InFlightTxCount>; + InFlightTxCount, + FollowerId>; }; struct QuerySessions : Table<13> { diff --git a/ydb/core/sys_view/partition_stats/partition_stats.cpp b/ydb/core/sys_view/partition_stats/partition_stats.cpp index 8770055157c8..64ecd7e8ea40 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats.cpp +++ b/ydb/core/sys_view/partition_stats/partition_stats.cpp @@ -83,7 +83,7 @@ class TPartitionStatsCollector : public TActorBootstrapped newPartitions; - std::unordered_set overloaded; + std::set overloaded; for (auto shardIdx : ev->Get()->ShardIndices) { auto old = oldPartitions.find(shardIdx); @@ -92,7 +92,7 @@ class TPartitionStatsCollector : public TActorBootstrappedsecond.FollowerStats) { if (IsPartitionOverloaded(followerStat.second)) - overloaded.insert(shardIdx); + overloaded.insert({shardIdx, followerStat.first}); } } } @@ -148,12 +148,13 @@ class TPartitionStatsCollector : public TActorBootstrappedsecond.erase(shardIdx); + overloadedFound->second.erase(overloadedFollower); if (overloadedFound->second.empty()) { tables.Overloaded.erase(pathId); } @@ -373,15 +374,16 @@ class TPartitionStatsCollector : public TActorBootstrapped sorted; - for (const auto& [pathId, shardIndices] : domainTables.Overloaded) { - for (const auto& shardIdx : shardIndices) { + for (const auto& [pathId, overloadedFollowers] : domainTables.Overloaded) { + for (const TOverloadedFollower& overloadedFollower : overloadedFollowers) { const auto& table = domainTables.Stats[pathId]; - const auto& partition = table.Partitions.at(shardIdx).FollowerStats.at(0); - sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()}); + const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId); + sorted.emplace_back(TPartition{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()}); } } @@ -395,7 +397,9 @@ class TPartitionStatsCollector : public TActorBootstrapped(); for (const auto& entry : sorted) { const auto& table = domainTables.Stats[entry.PathId]; - const auto& partition = table.Partitions.at(entry.ShardIdx).FollowerStats.at(0); + const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats; + const auto& partition = followerStats.at(entry.FollowerId); + const auto& leaderPartition = followerStats.at(0); auto* result = sendEvent->Record.AddPartitions(); result->SetTabletId(partition.GetTabletId()); @@ -403,10 +407,11 @@ class TPartitionStatsCollector : public TActorBootstrappedSetPeakTimeUs(nowUs); result->SetCPUCores(partition.GetCPUCores()); result->SetNodeId(partition.GetNodeId()); - result->SetDataSize(partition.GetDataSize()); - result->SetRowCount(partition.GetRowCount()); - result->SetIndexSize(partition.GetIndexSize()); + result->SetDataSize(leaderPartition.GetDataSize()); + result->SetRowCount(leaderPartition.GetRowCount()); + result->SetIndexSize(leaderPartition.GetIndexSize()); result->SetInFlightTxCount(partition.GetInFlightTxCount()); + result->SetFollowerId(partition.GetFollowerId()); if (++count == TOP_PARTITIONS_COUNT) { break; @@ -438,8 +443,7 @@ class TPartitionStatsCollector : public TActorBootstrapped= OverloadedPartitionBound - && !stats.GetFollowerId(); + return stats.GetCPUCores() >= OverloadedPartitionBound; } private: @@ -452,8 +456,10 @@ class TPartitionStatsCollector : public TActorBootstrapped FollowerStats; + std::unordered_map FollowerStats; }; struct TTableStats { @@ -462,9 +468,22 @@ class TPartitionStatsCollector : public TActorBootstrapped Stats; - std::unordered_map> Overloaded; + std::unordered_map> Overloaded; }; std::unordered_map DomainTables; diff --git a/ydb/core/sys_view/partition_stats/top_partitions.cpp b/ydb/core/sys_view/partition_stats/top_partitions.cpp index ca80d79231b7..b1705a9c3e65 100644 --- a/ydb/core/sys_view/partition_stats/top_partitions.cpp +++ b/ydb/core/sys_view/partition_stats/top_partitions.cpp @@ -58,6 +58,9 @@ struct TTopPartitionsExtractorMap : insert({S::InFlightTxCount::ColumnId, [] (const E& entry) { return TCell::Make(entry.GetInfo().GetInFlightTxCount()); }}); + insert({S::FollowerId::ColumnId, [] (const E& entry) { + return TCell::Make(entry.GetInfo().GetFollowerId()); + }}); } }; diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp index e46d4fc56558..a60da604842a 100644 --- a/ydb/core/sys_view/processor/processor_impl.cpp +++ b/ydb/core/sys_view/processor/processor_impl.cpp @@ -283,7 +283,7 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) { for (const auto& partition : top) { - db.Table().Key((ui32)type, partition->GetTabletId()).Delete(); + db.Table().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete(); } top.clear(); }; diff --git a/ydb/core/sys_view/processor/schema.h b/ydb/core/sys_view/processor/schema.h index e1684b3aaa79..23a1055e71a0 100644 --- a/ydb/core/sys_view/processor/schema.h +++ b/ydb/core/sys_view/processor/schema.h @@ -90,9 +90,10 @@ struct TProcessorSchema : NIceDb::Schema { }; struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {}; struct Data : Column<3, NScheme::NTypeIds::String> {}; + struct FollowerId : Column<4, NScheme::NTypeIds::Uint32> {}; - using TKey = TableKey; - using TColumns = TableColumns; + using TKey = TableKey; + using TColumns = TableColumns; }; #define RESULT_PARTITION_TABLE(TableName, TableID) \ diff --git a/ydb/core/sys_view/processor/tx_top_partitions.cpp b/ydb/core/sys_view/processor/tx_top_partitions.cpp index 2048e40e8372..205a0466b526 100644 --- a/ydb/core/sys_view/processor/tx_top_partitions.cpp +++ b/ydb/core/sys_view/processor/tx_top_partitions.cpp @@ -15,15 +15,18 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { void ProcessTop(NIceDb::TNiceDb& db, NKikimrSysView::EStatsType statsType, TPartitionTop& top) { + using TPartitionTopKey = std::pair; + TPartitionTop result; result.reserve(TOP_PARTITIONS_COUNT); - std::unordered_set seen; + std::unordered_set seen; size_t index = 0; auto topIt = top.begin(); auto copyNewPartition = [&] () { const auto& newPartition = Record.GetPartitions(index); - auto tabletId = newPartition.GetTabletId(); + const ui64 tabletId = newPartition.GetTabletId(); + const ui32 followerId = newPartition.GetFollowerId(); TString data; Y_PROTOBUF_SUPPRESS_NODISCARD newPartition.SerializeToString(&data); @@ -32,10 +35,10 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { partition->CopyFrom(newPartition); result.emplace_back(std::move(partition)); - db.Table().Key((ui32)statsType, tabletId).Update( + db.Table().Key((ui32)statsType, tabletId, followerId).Update( NIceDb::TUpdate(data)); - seen.insert(tabletId); + seen.insert({tabletId, followerId}); ++index; }; @@ -44,32 +47,36 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { if (index == Record.PartitionsSize()) { break; } - auto tabletId = Record.GetPartitions(index).GetTabletId(); - if (seen.find(tabletId) != seen.end()) { + const auto& partition = Record.GetPartitions(index); + const ui64 tabletId = partition.GetTabletId(); + const ui32 followerId = partition.GetFollowerId(); + if (seen.contains({tabletId, followerId})) { ++index; continue; } copyNewPartition(); } else { - auto topTabletId = (*topIt)->GetTabletId(); - if (seen.find(topTabletId) != seen.end()) { + const ui64 topTabletId = (*topIt)->GetTabletId(); + const ui32 topFollowerId = (*topIt)->GetFollowerId(); + if (seen.contains({topTabletId, topFollowerId})) { ++topIt; continue; } if (index == Record.PartitionsSize()) { result.emplace_back(std::move(*topIt++)); - seen.insert(topTabletId); + seen.insert({topTabletId, topFollowerId}); continue; } - auto& newPartition = Record.GetPartitions(index); - auto tabletId = newPartition.GetTabletId(); - if (seen.find(tabletId) != seen.end()) { + const auto& newPartition = Record.GetPartitions(index); + const ui64 tabletId = newPartition.GetTabletId(); + const ui32 followerId = newPartition.GetFollowerId(); + if (seen.contains({tabletId, followerId})) { ++index; continue; } if ((*topIt)->GetCPUCores() >= newPartition.GetCPUCores()) { result.emplace_back(std::move(*topIt++)); - seen.insert(topTabletId); + seen.insert({topTabletId, topFollowerId}); } else { copyNewPartition(); } @@ -77,11 +84,12 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { } for (; topIt != top.end(); ++topIt) { - auto topTabletId = (*topIt)->GetTabletId(); - if (seen.find(topTabletId) != seen.end()) { + const ui64 topTabletId = (*topIt)->GetTabletId(); + const ui64 topFollowerId = (*topIt)->GetFollowerId(); + if (seen.contains({topTabletId, topFollowerId})) { continue; } - db.Table().Key((ui32)statsType, topTabletId).Delete(); + db.Table().Key((ui32)statsType, topTabletId, topFollowerId).Delete(); } top.swap(result); @@ -108,6 +116,8 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev) { auto timeUs = record.GetTimeUs(); auto partitionIntervalEnd = IntervalEnd + TotalInterval; + SVLOG_T("TEvSysView::TEvSendTopPartitions: " << " record " << record.ShortDebugString()); + if (timeUs < IntervalEnd.MicroSeconds() || timeUs >= partitionIntervalEnd.MicroSeconds()) { SVLOG_W("[" << TabletID() << "] TEvSendTopPartitions, time mismath: " << ", partition interval end# " << partitionIntervalEnd diff --git a/ydb/core/sys_view/ut_common.cpp b/ydb/core/sys_view/ut_common.cpp index 10fd9f068c3e..e88503fcc282 100644 --- a/ydb/core/sys_view/ut_common.cpp +++ b/ydb/core/sys_view/ut_common.cpp @@ -26,7 +26,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString return subdomain; } -TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 pqTabletsN, bool enableSVP) { +TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, const TTestEnvSettings& settings) { auto mbusPort = PortManager.GetPort(); auto grpcPort = PortManager.GetPort(); @@ -44,16 +44,18 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 NKikimrConfig::TFeatureFlags featureFlags; featureFlags.SetEnableBackgroundCompaction(false); featureFlags.SetEnableResourcePools(true); + featureFlags.SetEnableFollowerStats(true); Settings->SetFeatureFlags(featureFlags); - Settings->SetEnablePersistentQueryStats(enableSVP); - Settings->SetEnableDbCounters(enableSVP); + Settings->SetEnablePersistentQueryStats(settings.EnableSVP); + Settings->SetEnableDbCounters(settings.EnableSVP); + Settings->SetEnableForceFollowers(settings.EnableForceFollowers); NKikimrConfig::TAppConfig appConfig; *appConfig.MutableFeatureFlags() = Settings->FeatureFlags; Settings->SetAppConfig(appConfig); - for (ui32 i : xrange(storagePools)) { + for (ui32 i : xrange(settings.StoragePools)) { TString poolName = Sprintf("test%d", i); Settings->AddStoragePool(poolName, TString("/Root:") + poolName, 2); } @@ -74,9 +76,9 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 Client->InitRootScheme("Root"); - if (pqTabletsN) { + if (settings.PqTabletsN) { NKikimr::NPQ::FillPQConfig(Settings->PQConfig, "/Root/PQ", true); - PqTabletIds = Server->StartPQTablets(pqTabletsN); + PqTabletIds = Server->StartPQTablets(settings.PqTabletsN); } Endpoint = "localhost:" + ToString(grpcPort); diff --git a/ydb/core/sys_view/ut_common.h b/ydb/core/sys_view/ut_common.h index 1ae840655acf..3587eaca66b8 100644 --- a/ydb/core/sys_view/ut_common.h +++ b/ydb/core/sys_view/ut_common.h @@ -16,14 +16,20 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings( NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings( const TString &name, const TStoragePools &pools = {}); +struct TTestEnvSettings { + ui32 StoragePools = 0; + ui32 PqTabletsN = 0; + bool EnableSVP = false; + bool EnableForceFollowers = false; +}; + class TTestEnv { public: class TDisableSourcesTag {}; static TDisableSourcesTag DisableSourcesTag; public: - TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, ui32 storagePools = 0, - ui32 pqTabletsN = 0, bool enableSVP = false); + TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, const TTestEnvSettings& settings = {}); ~TTestEnv(); diff --git a/ydb/core/sys_view/ut_counters.cpp b/ydb/core/sys_view/ut_counters.cpp index 66623084656e..c9e05515563d 100644 --- a/ydb/core/sys_view/ut_counters.cpp +++ b/ydb/core/sys_view/ut_counters.cpp @@ -75,7 +75,7 @@ void CreateDatabasesAndTables(TTestEnv& env) { Y_UNIT_TEST_SUITE(DbCounters) { Y_UNIT_TEST(TabletsSimple) { - TTestEnv env(1, 2, 0, 0, true); + TTestEnv env(1, 2, {.EnableSVP = true}); CreateDatabasesAndTables(env); for (size_t iter = 0; iter < 30; ++iter) { diff --git a/ydb/core/sys_view/ut_kqp.cpp b/ydb/core/sys_view/ut_kqp.cpp index 93a08cfd1f67..f6ba05bdc1aa 100644 --- a/ydb/core/sys_view/ut_kqp.cpp +++ b/ydb/core/sys_view/ut_kqp.cpp @@ -1108,7 +1108,7 @@ Y_UNIT_TEST_SUITE(SystemView) { } Y_UNIT_TEST(StoragePoolsRanges) { - TTestEnv env(1, 0, 3); + TTestEnv env(1, 0, {.StoragePools = 3}); TTableClient client(env.GetDriver()); size_t rowCount = 0; @@ -1179,9 +1179,11 @@ Y_UNIT_TEST_SUITE(SystemView) { } } - size_t GetRowCount(TTableClient& client, const TString& name) { + size_t GetRowCount(TTableClient& client, const TString& tableName, const TString& condition = {}) { TStringBuilder query; - query << "SELECT * FROM `" << name << "`"; + query << "SELECT * FROM `" << tableName << "`"; + if (!condition.empty()) + query << " WHERE " << condition; auto it = client.StreamExecuteScanQuery(query).GetValueSync(); UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); auto ysonString = NKqp::StreamResultToYson(it); @@ -1213,7 +1215,7 @@ Y_UNIT_TEST_SUITE(SystemView) { auto nowUs = TInstant::Now().MicroSeconds(); - TTestEnv env(1, 4, 0, 0, true); + TTestEnv env(1, 4, {.EnableSVP = true}); CreateTenantsAndTables(env); TTableClient client(env.GetDriver()); @@ -1265,7 +1267,7 @@ Y_UNIT_TEST_SUITE(SystemView) { constexpr ui64 partitionCount = 5; - TTestEnv env(1, 4, 0, 0, true); + TTestEnv env(1, 4, {.EnableSVP = true}); CreateTenantsAndTables(env, true, partitionCount); TTableClient client(env.GetDriver()); @@ -1295,7 +1297,7 @@ Y_UNIT_TEST_SUITE(SystemView) { constexpr ui64 partitionCount = 5; - TTestEnv env(1, 4, 0, 0, true); + TTestEnv env(1, 4, {.EnableSVP = true}); CreateTenantsAndTables(env, true, partitionCount); TTableClient client(env.GetDriver()); @@ -1371,6 +1373,182 @@ Y_UNIT_TEST_SUITE(SystemView) { } } + Y_UNIT_TEST(TopPartitionsFollowers) { + NDataShard::gDbStatsReportInterval = TDuration::Seconds(0); + + auto nowUs = TInstant::Now().MicroSeconds(); + + TTestEnv env(1, 4, {.EnableSVP = true, .EnableForceFollowers = true}); + + auto& runtime = *env.GetServer().GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::SYSTEM_VIEWS, NLog::PRI_TRACE); + + TTableClient client(env.GetDriver()); + auto session = client.CreateSession().GetValueSync().GetSession(); + + CreateTenant(env, "Tenant1", true); + auto desc = TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .SetPrimaryKeyColumn("Key") + .Build(); + + auto settings = TCreateTableSettings() + .ReplicationPolicy(TReplicationPolicy().ReplicasCount(3)); + + auto result = session.CreateTable("/Root/Tenant1/Table1", + std::move(desc), settings).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + Cerr << "... UPSERT" << Endl; + NKqp::AssertSuccessResult(session.ExecuteDataQuery(R"( + UPSERT INTO `Root/Tenant1/Table1` (Key) VALUES (1u), (2u), (3u); + )", TTxControl::BeginTx().CommitTx()).GetValueSync()); + + Cerr << "... SELECT from leader" << Endl; + { + auto result = session.ExecuteDataQuery(R"( + SELECT * FROM `Root/Tenant1/Table1` WHERE Key = 1; + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + NKqp::AssertSuccessResult(result); + + TString actual = FormatResultSetYson(result.GetResultSet(0)); + NKqp::CompareYson(R"([ + [[1u]] + ])", actual); + } + + Cerr << "... SELECT from follower" << Endl; + { + auto result = session.ExecuteDataQuery(R"( + SELECT * FROM `Root/Tenant1/Table1` WHERE Key >= 2; + )", TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync(); + NKqp::AssertSuccessResult(result); + + TString actual = FormatResultSetYson(result.GetResultSet(0)); + NKqp::CompareYson(R"([ + [[2u]]; + [[3u]] + ])", actual); + } + + size_t rowCount = 0; + for (size_t iter = 0; iter < 30; ++iter) { + if (rowCount = GetRowCount(client, "/Root/Tenant1/.sys/top_partitions_one_minute", "FollowerId != 0")) + break; + Sleep(TDuration::Seconds(5)); + } + UNIT_ASSERT_GE(rowCount, 0); + + { + auto result = session.ExecuteDataQuery(R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount, + FollowerId, + IF(FollowerId = 0, 'L', 'F') AS LeaderFollower + FROM `/Root/Tenant1/.sys/top_partitions_one_minute` + ORDER BY IntervalEnd, Rank; + )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).ExtractValueSync(); + NKqp::AssertSuccessResult(result); + + auto rs = result.GetResultSet(0); + + TString actual = FormatResultSetYson(rs); + Cerr << "\n\n\n\n\n\n\n\n" << actual << "\n\n\n\n\n\n\n\n" << Endl; + } + + ui64 intervalEnd = GetIntervalEnd(client, "/Root/Tenant1/.sys/top_partitions_one_minute"); + + + + Cerr << "... SELECT leader from .sys/top_partitions_one_minute" << Endl; + { + TStringBuilder query; + query << R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount, + FollowerId + FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" + << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) AND FollowerId = 0"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + + TYsonFieldChecker check(ysonString, 12); + check.Uint64(intervalEnd); // IntervalEnd + check.Uint64(1); // Rank + check.Uint64Greater(0); // TabletId + check.String("/Root/Tenant1/Table1"); // Path + check.Uint64GreaterOrEquals(nowUs); // PeakTime + check.DoubleGreaterOrEquals(0.); // CPUCores + check.Uint64Greater(0); // NodeId + check.Uint64Greater(0); // DataSize + check.Uint64(3); // RowCount + check.Uint64(0); // IndexSize + check.Uint64(0); // InFlightTxCount + check.Uint64(0); // FollowerId + } + + Cerr << "... SELECT follower from .sys/top_partitions_one_minute" << Endl; + { + TStringBuilder query; + query << R"( + SELECT + IntervalEnd, + Rank, + TabletId, + Path, + PeakTime, + CPUCores, + NodeId, + DataSize, + RowCount, + IndexSize, + InFlightTxCount, + FollowerId + FROM `/Root/Tenant1/.sys/top_partitions_one_minute`)" + << "WHERE IntervalEnd = CAST(" << intervalEnd << "ul as Timestamp) AND FollowerId != 0"; + auto it = client.StreamExecuteScanQuery(query).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString()); + auto ysonString = NKqp::StreamResultToYson(it); + + TYsonFieldChecker check(ysonString, 12); + check.Uint64(intervalEnd); // IntervalEnd + check.Uint64(2); // Rank + check.Uint64Greater(0); // TabletId + check.String("/Root/Tenant1/Table1"); // Path + check.Uint64GreaterOrEquals(nowUs); // PeakTime + check.DoubleGreaterOrEquals(0.); // CPUCores + check.Uint64Greater(0); // NodeId + check.Uint64Greater(0); // DataSize + check.Uint64(3); // RowCount + check.Uint64(0); // IndexSize + check.Uint64(0); // InFlightTxCount + check.Uint64Greater(0); // FollowerId + } + } + Y_UNIT_TEST(Describe) { TTestEnv env; CreateRootTable(env); diff --git a/ydb/core/sys_view/ut_labeled.cpp b/ydb/core/sys_view/ut_labeled.cpp index d27ca0065a91..42f442580aff 100644 --- a/ydb/core/sys_view/ut_labeled.cpp +++ b/ydb/core/sys_view/ut_labeled.cpp @@ -103,7 +103,7 @@ void GetCounters(TTestEnv& env, const TString& databaseName, const TString& data Y_UNIT_TEST_SUITE(LabeledDbCounters) { Y_UNIT_TEST(OneTablet) { - TTestEnv env(1, 2, 0, 1, true); + TTestEnv env(1, 2, {.PqTabletsN = 1, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); @@ -125,7 +125,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { Y_UNIT_TEST(OneTabletRemoveCounters) { - TTestEnv env(1, 2, 0, 1, true); + TTestEnv env(1, 2, {.PqTabletsN = 1, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); @@ -159,7 +159,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { Y_UNIT_TEST(OneTabletRestart) { - TTestEnv env(1, 2, 0, 1, true); + TTestEnv env(1, 2, {.PqTabletsN = 1, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); @@ -204,7 +204,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { } Y_UNIT_TEST(TwoTablets) { - TTestEnv env(1, 2, 0, 2, true); + TTestEnv env(1, 2, {.PqTabletsN = 2, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto check = [](::NMonitoring::TDynamicCounterPtr topicGroup) { @@ -227,7 +227,7 @@ Y_UNIT_TEST_SUITE(LabeledDbCounters) { } Y_UNIT_TEST(TwoTabletsKillOneTablet) { - TTestEnv env(1, 2, 0, 2, true); + TTestEnv env(1, 2, {.PqTabletsN = 2, .EnableSVP = true}); const TString databaseName = NPQ::TTabletPreparationParameters().databaseId; const TString databasePath = NPQ::TTabletPreparationParameters().databasePath; auto edge = env.GetServer().GetRuntime()->AllocateEdgeActor(); From bf1d95688cfe79cab3d48ae12bc3148b6db6aea2 Mon Sep 17 00:00:00 2001 From: azevaykin Date: Fri, 25 Oct 2024 09:00:29 +0000 Subject: [PATCH 2/2] New table --- .../partition_stats/partition_stats.cpp | 5 ++ .../sys_view/processor/processor_impl.cpp | 5 +- ydb/core/sys_view/processor/schema.h | 16 +++- ydb/core/sys_view/processor/tx_init.cpp | 79 +++++++++++-------- .../sys_view/processor/tx_top_partitions.cpp | 16 +++- 5 files changed, 82 insertions(+), 39 deletions(-) diff --git a/ydb/core/sys_view/partition_stats/partition_stats.cpp b/ydb/core/sys_view/partition_stats/partition_stats.cpp index 64ecd7e8ea40..1a2f0fe9fe72 100644 --- a/ydb/core/sys_view/partition_stats/partition_stats.cpp +++ b/ydb/core/sys_view/partition_stats/partition_stats.cpp @@ -76,6 +76,11 @@ class TPartitionStatsCollector : public TActorBootstrappedGet()->DomainKey; const auto& pathId = ev->Get()->PathId; + SVLOG_T("TEvSysView::TEvSetPartitioning: domainKey " << domainKey + << " pathId " << pathId + << " path " << ev->Get()->Path + << " ShardIndices size " << ev->Get()->ShardIndices.size()); + auto& tables = DomainTables[domainKey]; auto tableFound = tables.Stats.find(pathId); if (tableFound != tables.Stats.end()) { diff --git a/ydb/core/sys_view/processor/processor_impl.cpp b/ydb/core/sys_view/processor/processor_impl.cpp index a60da604842a..d7615d0f1d58 100644 --- a/ydb/core/sys_view/processor/processor_impl.cpp +++ b/ydb/core/sys_view/processor/processor_impl.cpp @@ -283,7 +283,10 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) { auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) { for (const auto& partition : top) { - db.Table().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete(); + if (partition->GetFollowerId() == 0) + db.Table().Key((ui32)type, partition->GetTabletId()).Delete(); + else + db.Table().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete(); } top.clear(); }; diff --git a/ydb/core/sys_view/processor/schema.h b/ydb/core/sys_view/processor/schema.h index 23a1055e71a0..bb9ff08a500e 100644 --- a/ydb/core/sys_view/processor/schema.h +++ b/ydb/core/sys_view/processor/schema.h @@ -90,11 +90,22 @@ struct TProcessorSchema : NIceDb::Schema { }; struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {}; struct Data : Column<3, NScheme::NTypeIds::String> {}; - struct FollowerId : Column<4, NScheme::NTypeIds::Uint32> {}; + + using TKey = TableKey; + using TColumns = TableColumns; + }; + + struct IntervalPartitionFollowerTops : Table<19> { + struct TypeCol : Column<1, NScheme::NTypeIds::Uint32> { + static TString GetColumnName(const TString&) { return "Type"; } + }; + struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {}; + struct FollowerId : Column<3, NScheme::NTypeIds::Uint32> {}; + struct Data : Column<4, NScheme::NTypeIds::String> {}; using TKey = TableKey; using TColumns = TableColumns; - }; + }; #define RESULT_PARTITION_TABLE(TableName, TableID) \ struct TableName : Table { \ @@ -128,6 +139,7 @@ struct TProcessorSchema : NIceDb::Schema { TopByRequestUnitsOneMinute, TopByRequestUnitsOneHour, IntervalPartitionTops, + IntervalPartitionFollowerTops, TopPartitionsOneMinute, TopPartitionsOneHour >; diff --git a/ydb/core/sys_view/processor/tx_init.cpp b/ydb/core/sys_view/processor/tx_init.cpp index dbb48e4f78c0..98a78e1b6904 100644 --- a/ydb/core/sys_view/processor/tx_init.cpp +++ b/ydb/core/sys_view/processor/tx_init.cpp @@ -84,6 +84,47 @@ struct TSysViewProcessor::TTxInit : public TTxBase { return true; }; + template + bool LoadIntervalPartitionTops(NIceDb::TNiceDb& db) { + auto rowset = db.Table().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + size_t partCount = 0; + while (!rowset.EndOfSet()) { + ui32 type = rowset.template GetValue(); + TString data = rowset.template GetValue(); + + if (data) { + auto partition = MakeHolder(); + Y_PROTOBUF_SUPPRESS_NODISCARD partition->ParseFromString(data); + + switch ((NKikimrSysView::EStatsType)type) { + case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE: + Self->PartitionTopMinute.emplace_back(std::move(partition)); + break; + case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR: + Self->PartitionTopHour.emplace_back(std::move(partition)); + break; + default: + SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected partition stats type: " << type); + } + ++partCount; + } + + if (!rowset.Next()) { + return false; + } + } + + SVLOG_D("[" << Self->TabletID() << "] Loading results: " + << "table# " << S::TableId + << ", partCount count# " << partCount); + + return true; + } + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { SVLOG_D("[" << Self->TabletID() << "] TTxInit::Execute"); @@ -107,6 +148,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase { auto reqUnitsOneMinuteRowset = db.Table().Range().Select(); auto reqUnitsOneHourRowset = db.Table().Range().Select(); auto intervalPartitionTopsRowset = db.Table().Range().Select(); + auto intervalPartitionFollowerTopsRowset = db.Table().Range().Select(); auto topPartitionsOneMinuteRowset = db.Table().Range().Select(); auto topPartitionsOneHourRowset = db.Table().Range().Select(); @@ -126,6 +168,7 @@ struct TSysViewProcessor::TTxInit : public TTxBase { !reqUnitsOneMinuteRowset.IsReady() || !reqUnitsOneHourRowset.IsReady() || !intervalPartitionTopsRowset.IsReady() || + !intervalPartitionFollowerTopsRowset.IsReady() || !topPartitionsOneMinuteRowset.IsReady() || !topPartitionsOneHourRowset.IsReady()) { @@ -390,37 +433,10 @@ struct TSysViewProcessor::TTxInit : public TTxBase { Self->PartitionTopHour.clear(); Self->PartitionTopHour.reserve(TOP_PARTITIONS_COUNT); - auto rowset = db.Table().Range().Select(); - if (!rowset.IsReady()) { + if (!LoadIntervalPartitionTops(db)) + return false; + if (!LoadIntervalPartitionTops(db)) return false; - } - - size_t partCount = 0; - while (!rowset.EndOfSet()) { - ui32 type = rowset.GetValue(); - TString data = rowset.GetValue(); - - if (data) { - auto partition = MakeHolder(); - Y_PROTOBUF_SUPPRESS_NODISCARD partition->ParseFromString(data); - - switch ((NKikimrSysView::EStatsType)type) { - case NKikimrSysView::TOP_PARTITIONS_ONE_MINUTE: - Self->PartitionTopMinute.emplace_back(std::move(partition)); - break; - case NKikimrSysView::TOP_PARTITIONS_ONE_HOUR: - Self->PartitionTopHour.emplace_back(std::move(partition)); - break; - default: - SVLOG_CRIT("[" << Self->TabletID() << "] ignoring unexpected partition stats type: " << type); - } - ++partCount; - } - - if (!rowset.Next()) { - return false; - } - } auto compare = [] (const auto& l, const auto& r) { return l->GetCPUCores() == r->GetCPUCores() ? @@ -429,9 +445,6 @@ struct TSysViewProcessor::TTxInit : public TTxBase { std::sort(Self->PartitionTopMinute.begin(), Self->PartitionTopMinute.end(), compare); std::sort(Self->PartitionTopHour.begin(), Self->PartitionTopHour.end(), compare); - - SVLOG_D("[" << Self->TabletID() << "] Loading interval partition tops: " - << "partition count# " << partCount); } // TopPartitions... diff --git a/ydb/core/sys_view/processor/tx_top_partitions.cpp b/ydb/core/sys_view/processor/tx_top_partitions.cpp index 205a0466b526..42a0d89f4aa9 100644 --- a/ydb/core/sys_view/processor/tx_top_partitions.cpp +++ b/ydb/core/sys_view/processor/tx_top_partitions.cpp @@ -35,8 +35,13 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { partition->CopyFrom(newPartition); result.emplace_back(std::move(partition)); - db.Table().Key((ui32)statsType, tabletId, followerId).Update( - NIceDb::TUpdate(data)); + if (followerId == 0) { + db.Table().Key((ui32)statsType, tabletId).Update( + NIceDb::TUpdate(data)); + } else { + db.Table().Key((ui32)statsType, tabletId, followerId).Update( + NIceDb::TUpdate(data)); + } seen.insert({tabletId, followerId}); ++index; @@ -89,7 +94,12 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase { if (seen.contains({topTabletId, topFollowerId})) { continue; } - db.Table().Key((ui32)statsType, topTabletId, topFollowerId).Delete(); + + if (topFollowerId == 0) { + db.Table().Key((ui32)statsType, topTabletId).Delete(); + } else { + db.Table().Key((ui32)statsType, topTabletId, topFollowerId).Delete(); + } } top.swap(result);