Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Datashard follower stats in sys_view top_partitions #10849

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@ struct Schema : NIceDb::Schema {
struct RowCount : Column<9, NScheme::NTypeIds::Uint64> {};
struct IndexSize : Column<10, NScheme::NTypeIds::Uint64> {};
struct InFlightTxCount : Column<11, NScheme::NTypeIds::Uint32> {};
struct FollowerId : Column<12, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<IntervalEnd, Rank>;
using TColumns = TableColumns<
Expand All @@ -483,7 +484,8 @@ struct Schema : NIceDb::Schema {
DataSize,
RowCount,
IndexSize,
InFlightTxCount>;
InFlightTxCount,
FollowerId>;
};

struct QuerySessions : Table<13> {
Expand Down
51 changes: 35 additions & 16 deletions ydb/core/sys_view/partition_stats/partition_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& oldPartitions = table.Partitions;
std::unordered_map<TShardIdx, TPartitionStats> newPartitions;
std::unordered_set<TShardIdx> overloaded;
std::set<TOverloadedFollower> overloaded;

for (auto shardIdx : ev->Get()->ShardIndices) {
auto old = oldPartitions.find(shardIdx);
Expand All @@ -92,7 +92,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

for (const auto& followerStat: old->second.FollowerStats) {
if (IsPartitionOverloaded(followerStat.second))
overloaded.insert(shardIdx);
overloaded.insert({shardIdx, followerStat.first});
}
}
}
Expand Down Expand Up @@ -148,12 +148,13 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec

auto& followerStats = partitionStats.FollowerStats[followerId];

TOverloadedFollower overloadedFollower = {shardIdx, followerId};
if (IsPartitionOverloaded(newStats)) {
tables.Overloaded[pathId].insert(shardIdx);
tables.Overloaded[pathId].insert(overloadedFollower);
} else {
auto overloadedFound = tables.Overloaded.find(pathId);
if (overloadedFound != tables.Overloaded.end()) {
overloadedFound->second.erase(shardIdx);
overloadedFound->second.erase(overloadedFollower);
if (overloadedFound->second.empty()) {
tables.Overloaded.erase(pathId);
}
Expand Down Expand Up @@ -373,15 +374,16 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
struct TPartition {
TPathId PathId;
TShardIdx ShardIdx;
ui32 FollowerId;
double CPUCores;
};
std::vector<TPartition> sorted;

for (const auto& [pathId, shardIndices] : domainTables.Overloaded) {
for (const auto& shardIdx : shardIndices) {
for (const auto& [pathId, overloadedFollowers] : domainTables.Overloaded) {
for (const TOverloadedFollower& overloadedFollower : overloadedFollowers) {
const auto& table = domainTables.Stats[pathId];
const auto& partition = table.Partitions.at(shardIdx).FollowerStats.at(0);
sorted.emplace_back(TPartition{pathId, shardIdx, partition.GetCPUCores()});
const auto& partition = table.Partitions.at(overloadedFollower.ShardIdx).FollowerStats.at(overloadedFollower.FollowerId);
sorted.emplace_back(TPartition{pathId, overloadedFollower.ShardIdx, overloadedFollower.FollowerId, partition.GetCPUCores()});
}
}

Expand All @@ -395,18 +397,21 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
auto sendEvent = MakeHolder<TEvSysView::TEvSendTopPartitions>();
for (const auto& entry : sorted) {
const auto& table = domainTables.Stats[entry.PathId];
const auto& partition = table.Partitions.at(entry.ShardIdx).FollowerStats.at(0);
const auto& followerStats = table.Partitions.at(entry.ShardIdx).FollowerStats;
const auto& partition = followerStats.at(entry.FollowerId);
const auto& leaderPartition = followerStats.at(0);

auto* result = sendEvent->Record.AddPartitions();
result->SetTabletId(partition.GetTabletId());
result->SetPath(table.Path);
result->SetPeakTimeUs(nowUs);
result->SetCPUCores(partition.GetCPUCores());
result->SetNodeId(partition.GetNodeId());
result->SetDataSize(partition.GetDataSize());
result->SetRowCount(partition.GetRowCount());
result->SetIndexSize(partition.GetIndexSize());
result->SetDataSize(leaderPartition.GetDataSize());
result->SetRowCount(leaderPartition.GetRowCount());
result->SetIndexSize(leaderPartition.GetIndexSize());
result->SetInFlightTxCount(partition.GetInFlightTxCount());
result->SetFollowerId(partition.GetFollowerId());

if (++count == TOP_PARTITIONS_COUNT) {
break;
Expand Down Expand Up @@ -438,8 +443,7 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
}

bool IsPartitionOverloaded(const NKikimrSysView::TPartitionStats& stats) const {
return stats.GetCPUCores() >= OverloadedPartitionBound
&& !stats.GetFollowerId();
return stats.GetCPUCores() >= OverloadedPartitionBound;
}

private:
Expand All @@ -452,8 +456,10 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
double OverloadedPartitionBound = 0.7;
TDuration ProcessOverloadedInterval = TDuration::Seconds(15);

typedef ui32 TFollowerId;

struct TPartitionStats {
std::unordered_map<ui32, NKikimrSysView::TPartitionStats> FollowerStats;
std::unordered_map<TFollowerId, NKikimrSysView::TPartitionStats> FollowerStats;
};

struct TTableStats {
Expand All @@ -462,9 +468,22 @@ class TPartitionStatsCollector : public TActorBootstrapped<TPartitionStatsCollec
TString Path;
};

struct TOverloadedFollower {
TShardIdx ShardIdx;
TFollowerId FollowerId;

bool operator<(const TOverloadedFollower &other) const {
return std::tie(ShardIdx, FollowerId) < std::tie(other.ShardIdx, other.FollowerId);
}

bool operator==(const TOverloadedFollower &other) const {
return std::tie(ShardIdx, FollowerId) == std::tie(other.ShardIdx, other.FollowerId);
}
};

struct TDomainTables {
std::map<TPathId, TTableStats> Stats;
std::unordered_map<TPathId, std::unordered_set<TShardIdx>> Overloaded;
std::unordered_map<TPathId, std::set<TOverloadedFollower>> Overloaded;
};
std::unordered_map<TPathId, TDomainTables> DomainTables;

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/sys_view/partition_stats/top_partitions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ struct TTopPartitionsExtractorMap :
insert({S::InFlightTxCount::ColumnId, [] (const E& entry) {
return TCell::Make<ui32>(entry.GetInfo().GetInFlightTxCount());
}});
insert({S::FollowerId::ColumnId, [] (const E& entry) {
return TCell::Make<ui32>(entry.GetInfo().GetFollowerId());
}});
}
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/processor/processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void TSysViewProcessor::Reset(NIceDb::TNiceDb& db, const TActorContext& ctx) {

auto clearPartitionTop = [&] (NKikimrSysView::EStatsType type, TPartitionTop& top) {
for (const auto& partition : top) {
db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId()).Delete();
db.Table<Schema::IntervalPartitionTops>().Key((ui32)type, partition->GetTabletId(), partition->GetFollowerId()).Delete();
}
top.clear();
};
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/sys_view/processor/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ struct TProcessorSchema : NIceDb::Schema {
};
struct TabletId : Column<2, NScheme::NTypeIds::Uint64> {};
struct Data : Column<3, NScheme::NTypeIds::String> {};
struct FollowerId : Column<4, NScheme::NTypeIds::Uint32> {};

using TKey = TableKey<TypeCol, TabletId>;
using TColumns = TableColumns<TypeCol, TabletId, Data>;
using TKey = TableKey<TypeCol, TabletId, FollowerId>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На всякий случай упомяну, что это схема таблетки. Технически добавить новую колонку в таблицу можно, но с точки зрения совместимости может быть всё сложно, ведь новая колонка добавится с Null в качестве значения по-умолчанию. Учитывая насколько сложно потом работать с такой таблицей поддерживая совместимость с предыдущими версиями, я бы настоятельно рекомендовал завести для фолловеров отдельную таблицу.

using TColumns = TableColumns<TypeCol, TabletId, FollowerId, Data>;
};

#define RESULT_PARTITION_TABLE(TableName, TableID) \
Expand Down
42 changes: 26 additions & 16 deletions ydb/core/sys_view/processor/tx_top_partitions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
void ProcessTop(NIceDb::TNiceDb& db, NKikimrSysView::EStatsType statsType,
TPartitionTop& top)
{
using TPartitionTopKey = std::pair<ui64, ui32>;

TPartitionTop result;
result.reserve(TOP_PARTITIONS_COUNT);
std::unordered_set<ui64> seen;
std::unordered_set<TPartitionTopKey> seen;
size_t index = 0;
auto topIt = top.begin();

auto copyNewPartition = [&] () {
const auto& newPartition = Record.GetPartitions(index);
auto tabletId = newPartition.GetTabletId();
const ui64 tabletId = newPartition.GetTabletId();
const ui32 followerId = newPartition.GetFollowerId();

TString data;
Y_PROTOBUF_SUPPRESS_NODISCARD newPartition.SerializeToString(&data);
Expand All @@ -32,10 +35,10 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
partition->CopyFrom(newPartition);
result.emplace_back(std::move(partition));

db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, tabletId).Update(
db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, tabletId, followerId).Update(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Для совместимости со старыми версиями для followerId == 0 здесь должен быть NULL. Наша nice db кажется не позволяет указывать NULL в качестве значения ключевых колонок.

NIceDb::TUpdate<Schema::IntervalPartitionTops::Data>(data));

seen.insert(tabletId);
seen.insert({tabletId, followerId});
++index;
};

Expand All @@ -44,44 +47,49 @@ struct TSysViewProcessor::TTxTopPartitions : public TTxBase {
if (index == Record.PartitionsSize()) {
break;
}
auto tabletId = Record.GetPartitions(index).GetTabletId();
if (seen.find(tabletId) != seen.end()) {
const auto& partition = Record.GetPartitions(index);
const ui64 tabletId = partition.GetTabletId();
const ui32 followerId = partition.GetFollowerId();
if (seen.contains({tabletId, followerId})) {
++index;
continue;
}
copyNewPartition();
} else {
auto topTabletId = (*topIt)->GetTabletId();
if (seen.find(topTabletId) != seen.end()) {
const ui64 topTabletId = (*topIt)->GetTabletId();
const ui32 topFollowerId = (*topIt)->GetFollowerId();
if (seen.contains({topTabletId, topFollowerId})) {
++topIt;
continue;
}
if (index == Record.PartitionsSize()) {
result.emplace_back(std::move(*topIt++));
seen.insert(topTabletId);
seen.insert({topTabletId, topFollowerId});
continue;
}
auto& newPartition = Record.GetPartitions(index);
auto tabletId = newPartition.GetTabletId();
if (seen.find(tabletId) != seen.end()) {
const auto& newPartition = Record.GetPartitions(index);
const ui64 tabletId = newPartition.GetTabletId();
const ui32 followerId = newPartition.GetFollowerId();
if (seen.contains({tabletId, followerId})) {
++index;
continue;
}
if ((*topIt)->GetCPUCores() >= newPartition.GetCPUCores()) {
result.emplace_back(std::move(*topIt++));
seen.insert(topTabletId);
seen.insert({topTabletId, topFollowerId});
} else {
copyNewPartition();
}
}
}

for (; topIt != top.end(); ++topIt) {
auto topTabletId = (*topIt)->GetTabletId();
if (seen.find(topTabletId) != seen.end()) {
const ui64 topTabletId = (*topIt)->GetTabletId();
const ui64 topFollowerId = (*topIt)->GetFollowerId();
if (seen.contains({topTabletId, topFollowerId})) {
continue;
}
db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, topTabletId).Delete();
db.Table<Schema::IntervalPartitionTops>().Key((ui32)statsType, topTabletId, topFollowerId).Delete();
}

top.swap(result);
Expand All @@ -108,6 +116,8 @@ void TSysViewProcessor::Handle(TEvSysView::TEvSendTopPartitions::TPtr& ev) {
auto timeUs = record.GetTimeUs();
auto partitionIntervalEnd = IntervalEnd + TotalInterval;

SVLOG_T("TEvSysView::TEvSendTopPartitions: " << " record " << record.ShortDebugString());

if (timeUs < IntervalEnd.MicroSeconds() || timeUs >= partitionIntervalEnd.MicroSeconds()) {
SVLOG_W("[" << TabletID() << "] TEvSendTopPartitions, time mismath: "
<< ", partition interval end# " << partitionIntervalEnd
Expand Down
14 changes: 8 additions & 6 deletions ydb/core/sys_view/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(const TString
return subdomain;
}

TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32 pqTabletsN, bool enableSVP) {
TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, const TTestEnvSettings& settings) {
auto mbusPort = PortManager.GetPort();
auto grpcPort = PortManager.GetPort();

Expand All @@ -44,16 +44,18 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32
NKikimrConfig::TFeatureFlags featureFlags;
featureFlags.SetEnableBackgroundCompaction(false);
featureFlags.SetEnableResourcePools(true);
featureFlags.SetEnableFollowerStats(true);
Settings->SetFeatureFlags(featureFlags);

Settings->SetEnablePersistentQueryStats(enableSVP);
Settings->SetEnableDbCounters(enableSVP);
Settings->SetEnablePersistentQueryStats(settings.EnableSVP);
Settings->SetEnableDbCounters(settings.EnableSVP);
Settings->SetEnableForceFollowers(settings.EnableForceFollowers);

NKikimrConfig::TAppConfig appConfig;
*appConfig.MutableFeatureFlags() = Settings->FeatureFlags;
Settings->SetAppConfig(appConfig);

for (ui32 i : xrange(storagePools)) {
for (ui32 i : xrange(settings.StoragePools)) {
TString poolName = Sprintf("test%d", i);
Settings->AddStoragePool(poolName, TString("/Root:") + poolName, 2);
}
Expand All @@ -74,9 +76,9 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, ui32

Client->InitRootScheme("Root");

if (pqTabletsN) {
if (settings.PqTabletsN) {
NKikimr::NPQ::FillPQConfig(Settings->PQConfig, "/Root/PQ", true);
PqTabletIds = Server->StartPQTablets(pqTabletsN);
PqTabletIds = Server->StartPQTablets(settings.PqTabletsN);
}

Endpoint = "localhost:" + ToString(grpcPort);
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/sys_view/ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ NKikimrSubDomains::TSubDomainSettings GetSubDomainDeclareSettings(
NKikimrSubDomains::TSubDomainSettings GetSubDomainDefaultSettings(
const TString &name, const TStoragePools &pools = {});

struct TTestEnvSettings {
ui32 StoragePools = 0;
ui32 PqTabletsN = 0;
bool EnableSVP = false;
bool EnableForceFollowers = false;
};

class TTestEnv {
public:
class TDisableSourcesTag {};
static TDisableSourcesTag DisableSourcesTag;

public:
TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, ui32 storagePools = 0,
ui32 pqTabletsN = 0, bool enableSVP = false);
TTestEnv(ui32 staticNodes = 1, ui32 dynamicNodes = 4, const TTestEnvSettings& settings = {});

~TTestEnv();

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/ut_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void CreateDatabasesAndTables(TTestEnv& env) {
Y_UNIT_TEST_SUITE(DbCounters) {

Y_UNIT_TEST(TabletsSimple) {
TTestEnv env(1, 2, 0, 0, true);
TTestEnv env(1, 2, {.EnableSVP = true});
CreateDatabasesAndTables(env);

for (size_t iter = 0; iter < 30; ++iter) {
Expand Down
Loading
Loading