Skip to content

Commit

Permalink
Merge 829f7ba into 365e985
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Oct 24, 2024
2 parents 365e985 + 829f7ba commit 6514a56
Show file tree
Hide file tree
Showing 11 changed files with 278 additions and 57 deletions.
6 changes: 4 additions & 2 deletions ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,9 @@ struct Schema : NIceDb::Schema {
struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {};
struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {};
struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {};
struct FollowerId : Column<12, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<IntervalEnd, Rank>;
using TKey = TableKey<IntervalEnd, Rank, FollowerId>;
using TColumns = TableColumns<
IntervalEnd,
Rank,
Expand All @@ -483,7 +484,8 @@ struct Schema : NIceDb::Schema {
DataSize,
RowCount,
IndexSize,
InFlightTxCount>;
InFlightTxCount,
FollowerId>;
};

struct QuerySessions : Table<13> {
Expand Down
51 changes: 35 additions & 16 deletions ydb/core/sys_view/partition_stats/partition_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& oldPartitions = table.Partitions;
std::unordered_map<TShardIdx, TPartitionStats> newPartitions;
std::unordered_set<TShardIdx> overloaded;
std::set<TOverloadedFollower> overloaded;

for (auto shardIdx : ev->Get()->ShardIndices) {
auto old = oldPartitions.find(shardIdx);
Expand All @@ -92,7 +92,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

for (const auto& followerStat: old->second.FollowerStats) {
if (IsPartitionOverloaded(followerStat.second))
overloaded.insert(shardIdx);
overloaded.insert({shardIdx, followerStat.first});
}
}
}
Expand Down Expand Up @@ -148,12 +148,13 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& followerStats = partitionStats.FollowerStats[followerId];

TOverloadedFollower overloadedFollower = {shardIdx, followerId};
if (IsPartitionOverloaded(newStats)) {
tables.Overloaded[pathId].insert(shardIdx);
tables.Overloaded[pathId].insert(overloadedFollower);
} else {
auto overloadedFound = tables.Overloaded.find(pathId);
if (overloadedFound != tables.Overloaded.end()) {
overloadedFound->second.erase(shardIdx);
overloadedFound->second.erase(overloadedFollower);
if (overloadedFound->second.empty()) {
tables.Overloaded.erase(pathId);
}
Expand Down Expand Up @@ -373,15 +374,16 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
struct TPartition {
TPathId PathId;
TShardIdx ShardIdx;
ui32 FollowerId;
double CPUCores;
};
std::vector<TPartition> sorted;

for (const auto& [pathId, shardIndices] : domainTables.Overloaded) {
for (const auto& shardIdx : shardIndices) {
for (const auto& [pathId, overloadedFollowers] : domainTables.Overloaded) {
for (const TOverloadedFollower& overloadedFollower : overloadedFollowers) {
const auto& table = domainTables.Stats[pathId];
const auto& partition = table.Partitions.at(shardIdx).FollowerStats.at(0);
sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()});
const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId);
sorted.emplace_back(TPartition{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()});
}
}

Expand All @@ -395,18 +397,21 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
auto sendEvent = MakeHolder<TEvSysView::TEvSendTopPartitions>();
for (const auto& entry : sorted) {
const auto& table = domainTables.Stats[entry.PathId];
const auto& partition = table.Partitions.at(entry.ShardIdx).FollowerStats.at(0);
const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats;
const auto& partition = followerStats.at(entry.FollowerId);
const auto& leaderPartition = followerStats.at(0);

auto* result = sendEvent->Record.AddPartitions();
result->SetTabletId(partition.GetTabletId());
result->SetPath(table.Path);
result->SetPeakTimeUs(nowUs);
result->SetCPUCores(partition.GetCPUCores());
result->SetNodeId(partition.GetNodeId());
result->SetDataSize(partition.GetDataSize());
result->SetRowCount(partition.GetRowCount());
result->SetIndexSize(partition.GetIndexSize());
result->SetDataSize(leaderPartition.GetDataSize());
result->SetRowCount(leaderPartition.GetRowCount());
result->SetIndexSize(leaderPartition.GetIndexSize());
result->SetInFlightTxCount(partition.GetInFlightTxCount());
result->SetFollowerId(partition.GetFollowerId());

if (++count == TOP_PARTITIONS_COUNT) {
break;
Expand Down Expand Up @@ -438,8 +443,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
}

bool IsPartitionOverloaded(const NKikimrSysView::TPartitionStats& stats) const {
return stats.GetCPUCores() >= OverloadedPartitionBound
&& !stats.GetFollowerId();
return stats.GetCPUCores() >= OverloadedPartitionBound;
}

private:
Expand All @@ -452,8 +456,10 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
double OverloadedPartitionBound = 0.7;
TDuration ProcessOverloadedInterval = TDuration::Seconds(15);

typedef ui32 TFollowerId;

struct TPartitionStats {
std::unordered_map<ui32, NKikimrSysView::TPartitionStats> FollowerStats;
std::unordered_map<TFollowerId, NKikimrSysView::TPartitionStats> FollowerStats;
};

struct TTableStats {
Expand All @@ -462,9 +468,22 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
TString Path;
};

// Identifies a single overloaded replica of a partition: the shard together
// with the follower id inside that shard. Values of this type are stored in
// std::set containers, hence the ordering operator below.
struct TOverloadedFollower {
    TShardIdx ShardIdx;
    TFollowerId FollowerId;

    // Lexicographic order: compare ShardIdx first, FollowerId breaks ties.
    bool operator<(const TOverloadedFollower& rhs) const {
        if (ShardIdx < rhs.ShardIdx) {
            return true;
        }
        if (rhs.ShardIdx < ShardIdx) {
            return false;
        }
        return FollowerId < rhs.FollowerId;
    }

    // Two entries are equal only when both the shard and the follower id match.
    bool operator==(const TOverloadedFollower& rhs) const {
        return ShardIdx == rhs.ShardIdx && FollowerId == rhs.FollowerId;
    }
};

struct TDomainTables {
std::map<TPathId, TTableStats> Stats;
std::unordered_map<TPathId, std::unordered_set<TShardIdx>> Overloaded;
std::unordered_map<TPathId, std::set<TOverloadedFollower>> Overloaded;
};
std::unordered_map<TPathId, TDomainTables> DomainTables;

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/sys_view/partition_stats/top_partitions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ struct TTopPartitionsExtractorMap :
insert({S::InFlightTxCount::ColumnId, [] (const E& entry) {
return TCell::Make<ui32>(entry.GetInfo().GetInFlightTxCount());
}});
insert({S::FollowerId::ColumnId, [] (const E& entry) {
return TCell::Make<ui32>(entry.GetInfo().GetFollowerId());
}});
}
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/processor/processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {

auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) {
for (const auto& partition : top) {
db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId()).Delete();
db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete();
}
top.clear();
};
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/sys_view/processor/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ struct TProcessorSchema : NIceDb::Schema {
};
struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {};
struct Data : Column<3, NScheme::NTypeIds::String> {};
struct FollowerId : Column<4, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<TypeCol, TabletId>;
using TColumns = TableColumns<TypeCol, TabletId, Data>;
using TKey = TableKey<TypeCol, TabletId, FollowerId>;
using TColumns = TableColumns<TypeCol, TabletId, FollowerId, Data>;
};

#define RESULT_PARTITION_TABLE(TableName, TableID) \
Expand Down
42 changes: 26 additions & 16 deletions ydb/core/sys_view/processor/tx_top_partitions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
void ProcessTop(NIceDb::TNiceDb& db, NKikimrSysView::EStatsType statsType,
TPartitionTop& top)
{
using TPartitionTopKey = std::pair<ui64, ui32>;

TPartitionTop result;
result.reserve(TOP_PARTITIONS_COUNT);
std::unordered_set<ui64> seen;
std::unordered_set<TPartitionTopKey> seen;
size_t index = 0;
auto topIt = top.begin();

auto copyNewPartition = [&] () {
const auto& newPartition = Record.GetPartitions(index);
auto tabletId = newPartition.GetTabletId();
const ui64 tabletId = newPartition.GetTabletId();
const ui32 followerId = newPartition.GetFollowerId();

TString data;
Y_PROTOBUF_SUPPRESS_NODISCARD newPartition.SerializeToString(&data);
Expand All @@ -32,10 +35,10 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
partition->CopyFrom(newPartition);
result.emplace_back(std::move(partition));

db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, tabletId).Update(
db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, tabletId, followerId).Update(
NIceDb::TUpdate<Schema::IntervalPartitionTops::Data>(data));

seen.insert(tabletId);
seen.insert({tabletId, followerId});
++index;
};

Expand All @@ -44,44 +47,49 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
if (index == Record.PartitionsSize()) {
break;
}
auto tabletId = Record.GetPartitions(index).GetTabletId();
if (seen.find(tabletId) != seen.end()) {
const auto& partition = Record.GetPartitions(index);
const ui64 tabletId = partition.GetTabletId();
const ui32 followerId = partition.GetFollowerId();
if (seen.contains({tabletId, followerId})) {
++index;
continue;
}
copyNewPartition();
} else {
auto topTabletId = (*topIt)->GetTabletId();
if (seen.find(topTabletId) != seen.end()) {
const ui64 topTabletId = (*topIt)->GetTabletId();
const ui32 topFollowerId = (*topIt)->GetFollowerId();
if (seen.contains({topTabletId, topFollowerId})) {
++topIt;
continue;
}
if (index == Record.PartitionsSize()) {
result.emplace_back(std::move(*topIt++));
seen.insert(topTabletId);
seen.insert({topTabletId, topFollowerId});
continue;
}
auto& newPartition = Record.GetPartitions(index);
auto tabletId = newPartition.GetTabletId();
if (seen.find(tabletId) != seen.end()) {
const auto& newPartition = Record.GetPartitions(index);
const ui64 tabletId = newPartition.GetTabletId();
const ui32 followerId = newPartition.GetFollowerId();
if (seen.contains({tabletId, followerId})) {
++index;
continue;
}
if ((*topIt)->GetCPUCores() >= newPartition.GetCPUCores()) {
result.emplace_back(std::move(*topIt++));
seen.insert(topTabletId);
seen.insert({topTabletId, topFollowerId});
} else {
copyNewPartition();
}
}
}

for (; topIt != top.end(); ++topIt) {
auto topTabletId = (*topIt)->GetTabletId();
if (seen.find(topTabletId) != seen.end()) {
const ui64 topTabletId = (*topIt)->GetTabletId();
const ui64 topFollowerId = (*topIt)->GetFollowerId();
if (seen.contains({topTabletId, topFollowerId})) {
continue;
}
db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, topTabletId).Delete();
db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, topTabletId, topFollowerId).Delete();
}

top.swap(result);
Expand All @@ -108,6 +116,8 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev) {
auto timeUs = record.GetTimeUs();
auto partitionIntervalEnd = IntervalEnd + TotalInterval;

SVLOG_T("TEvSysView::TEvSendTopPartitions: " << " record " << record.ShortDebugString());

if (timeUs < IntervalEnd.MicroSeconds() || timeUs >= partitionIntervalEnd.MicroSeconds()) {
SVLOG_W("[" << TabletID() << "] TEvSendTopPartitions, time mismath: "
<< ", partition interval end# " << partitionIntervalEnd
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/sys_view/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString
return subdomain;
}

TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 pqTabletsN, bool enableSVP) {
TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, const TTestEnvSettings& settings) {
auto mbusPort = PortManager.GetPort();
auto grpcPort = PortManager.GetPort();

Expand All @@ -44,16 +44,18 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableBackgroundCompaction(false);
featureFlags.SetEnableResourcePools(true);
featureFlags.SetEnableFollowerStats(true);
Settings->SetFeatureFlags(featureFlags);

Settings->SetEnablePersistentQueryStats(enableSVP);
Settings->SetEnableDbCounters(enableSVP);
Settings->SetEnablePersistentQueryStats(settings.EnableSVP);
Settings->SetEnableDbCounters(settings.EnableSVP);
Settings->SetEnableForceFollowers(settings.EnableForceFollowers);

NKikimrConfig::TAppConfig appConfig;
*appConfig.MutableFeatureFlags() = Settings->FeatureFlags;
Settings->SetAppConfig(appConfig);

for (ui32 i : xrange(storagePools)) {
for (ui32 i : xrange(settings.StoragePools)) {
TString poolName = Sprintf("test%d", i);
Settings->AddStoragePool(poolName, TString("/Root:") + poolName, 2);
}
Expand All @@ -74,9 +76,9 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32

Client->InitRootScheme("Root");

if (pqTabletsN) {
if (settings.PqTabletsN) {
NKikimr::NPQ::FillPQConfig(Settings->PQConfig, "/Root/PQ", true);
PqTabletIds = Server->StartPQTablets(pqTabletsN);
PqTabletIds = Server->StartPQTablets(settings.PqTabletsN);
}

Endpoint = "localhost:" + ToString(grpcPort);
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/sys_view/ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings(
NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(
const TString &name, const TStoragePools &pools = {});

// Optional knobs for constructing a TTestEnv. Every field has a default, so
// callers set only what they need (e.g. `{.EnableSVP = true}`).
struct TTestEnvSettings {
    // Number of storage pools the environment registers.
    ui32 StoragePools{0};
    // Number of PQ tablets to start; zero starts none.
    ui32 PqTabletsN{0};
    // Forwarded to the persistent-query-stats and db-counters server settings.
    bool EnableSVP{false};
    // Forwarded to the server's force-followers setting.
    bool EnableForceFollowers{false};
};

class TTestEnv {
public:
class TDisableSourcesTag {};
static TDisableSourcesTag DisableSourcesTag;

public:
TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, ui32 storagePools = 0,
ui32 pqTabletsN = 0, bool enableSVP = false);
TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, const TTestEnvSettings& settings = {});

~TTestEnv();

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/ut_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void CreateDatabasesAndTables(TTestEnv& env) {
Y_UNIT_TEST_SUITE(DbCounters) {

Y_UNIT_TEST(TabletsSimple) {
TTestEnv env(1, 2, 0, 0, true);
TTestEnv env(1, 2, {.EnableSVP = true});
CreateDatabasesAndTables(env);

for (size_t iter = 0; iter < 30; ++iter) {
Expand Down
Loading

0 comments on commit 6514a56

Please sign in to comment.