From 281d32bba624beead82795787572e76ad4442bff Mon Sep 17 00:00:00 2001 From: azevaykin <145343289+azevaykin@users.noreply.github.com> Date: Fri, 2 Aug 2024 18:52:47 +0300 Subject: [PATCH] Statistics: Pass ColumnTags and Types (#7397) --- .../statistics/aggregator/aggregator_impl.cpp | 10 ++++++- .../statistics/aggregator/aggregator_impl.h | 6 +++- ydb/core/statistics/aggregator/schema.h | 22 +++++++++------ .../aggregator/tx_analyze_table.cpp | 13 +++++++-- ydb/core/statistics/aggregator/tx_init.cpp | 14 ++++++++++ .../tx_response_tablet_distribution.cpp | 5 ++++ .../aggregator/ut/ut_analyze_columnshard.cpp | 8 ++++++ ydb/core/statistics/ut_common/ut_common.cpp | 28 +++++++++++++++---- ydb/core/statistics/ut_common/ut_common.h | 17 +++++++++-- 9 files changed, 102 insertions(+), 21 deletions(-) diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 9e1e9ee6ae96..263c83ae66f7 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -587,8 +587,12 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) { ForceTraversalOperationId = operation.OperationId; ForceTraversalCookie = operation.Cookie; + ForceTraversalColumnTags = operation.ColumnTags; + ForceTraversalTypes = operation.Types; ForceTraversalReplyToActorId = operation.ReplyToActorId; + PersistForceTraversal(db); + db.Table().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete(); ForceTraversals.pop_front(); } else if (!ScheduleTraversalsByTime.Empty()){ @@ -672,9 +676,11 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) { PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer()); } -void TStatisticsAggregator::PersistTraversalOperationIdAndCookie(NIceDb::TNiceDb& db) { +void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) { PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId)); PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ToString(ForceTraversalCookie)); + PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags)); + PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes)); } void TStatisticsAggregator::PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db) { @@ -689,6 +695,8 @@ void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) { ForceTraversalOperationId = 0; ForceTraversalCookie = 0; TraversalTableId.PathId = TPathId(); + ForceTraversalColumnTags.clear(); + ForceTraversalTypes.clear(); TraversalStartTime = TInstant::MicroSeconds(0); PersistTraversal(db); diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index 42a5d1572f66..ca8872dcb4e5 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -145,7 +145,7 @@ class TStatisticsAggregator : public TActor, public NTabl void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value); void PersistTraversal(NIceDb::TNiceDb& db); - void PersistTraversalOperationIdAndCookie(NIceDb::TNiceDb& db); + void PersistForceTraversal(NIceDb::TNiceDb& db); void PersistStartKey(NIceDb::TNiceDb& db); void PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db); void PersistGlobalTraversalRound(NIceDb::TNiceDb& db); @@ -308,6 +308,8 @@ class TStatisticsAggregator : public TActor, public NTabl ui64 ForceTraversalOperationId = 0; ui64 ForceTraversalCookie = 0; + TString ForceTraversalColumnTags; + TString ForceTraversalTypes; TTableId TraversalTableId; bool TraversalIsColumnTable = false; TSerializedCellVec TraversalStartKey; @@ -328,6 +330,8 @@ class TStatisticsAggregator : public TActor, public NTabl ui64 OperationId = 0; ui64 Cookie = 0; TPathId PathId; + TString ColumnTags; + TString Types; TActorId ReplyToActorId; }; std::list ForceTraversals; diff --git a/ydb/core/statistics/aggregator/schema.h b/ydb/core/statistics/aggregator/schema.h index f7f9d4ac3016..21eb7d4c643a 100644 --- a/ydb/core/statistics/aggregator/schema.h +++ b/ydb/core/statistics/aggregator/schema.h @@ -51,13 +51,17 @@ struct TAggregatorSchema : NIceDb::Schema { struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {}; struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {}; struct Cookie : Column<4, NScheme::NTypeIds::Uint64> {}; + struct ColumnTags : Column<5, NScheme::NTypeIds::String> {}; + struct Types : Column<6, NScheme::NTypeIds::String> {}; using TKey = TableKey; using TColumns = TableColumns< OperationId, OwnerId, LocalPathId, - Cookie + Cookie, + ColumnTags, + Types >; }; @@ -77,13 +81,15 @@ struct TAggregatorSchema : NIceDb::Schema { static constexpr ui64 SysParam_Database = 1; static constexpr ui64 SysParam_TraversalStartKey = 2; static constexpr ui64 SysParam_ForceTraversalOperationId = 3; - static constexpr ui64 SysParam_ForceTraversalCookie = 4; - static constexpr ui64 SysParam_TraversalTableOwnerId = 5; - static constexpr ui64 SysParam_TraversalTableLocalPathId = 6; - static constexpr ui64 SysParam_TraversalStartTime = 7; - static constexpr ui64 SysParam_NextForceTraversalOperationId = 8; - static constexpr ui64 SysParam_TraversalIsColumnTable = 9; - static constexpr ui64 SysParam_GlobalTraversalRound = 10; + static constexpr ui64 SysParam_TraversalTableOwnerId = 4; + static constexpr ui64 SysParam_TraversalTableLocalPathId = 5; + static constexpr ui64 SysParam_ForceTraversalCookie = 6; + static constexpr ui64 SysParam_ForceTraversalColumnTags = 7; + static constexpr ui64 SysParam_ForceTraversalTypes = 8; + static constexpr ui64 SysParam_TraversalStartTime = 9; + static constexpr ui64 SysParam_NextForceTraversalOperationId = 10; + static constexpr ui64 SysParam_TraversalIsColumnTable = 11; + static constexpr ui64 SysParam_GlobalTraversalRound = 12; }; } // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_analyze_table.cpp b/ydb/core/statistics/aggregator/tx_analyze_table.cpp index 839f7fd52a4b..81a41a60b8d7 100644 --- a/ydb/core/statistics/aggregator/tx_analyze_table.cpp +++ b/ydb/core/statistics/aggregator/tx_analyze_table.cpp @@ -2,6 +2,8 @@ #include +#include + namespace NKikimr::NStat { struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase { @@ -24,12 +26,13 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase { } NIceDb::TNiceDb db(txc.DB); - Self->PersistTraversalOperationIdAndCookie(db); const ui64 cookie = Record.GetCookie(); + const TString types = JoinVectorIntoString(TVector(Record.GetTypes().begin(), Record.GetTypes().end()), ","); for (const auto& table : Record.GetTables()) { const TPathId pathId = PathIdFromPathId(table.GetPathId()); + const TString columnTags = JoinVectorIntoString(TVector{table.GetColumnTags().begin(),table.GetColumnTags().end()},","); // drop request with the same cookie and path from this sender if (std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(), @@ -46,16 +49,20 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase { .OperationId = Self->NextForceTraversalOperationId, .Cookie = cookie, .PathId = pathId, + .ColumnTags = columnTags, + .Types = types, .ReplyToActorId = ReplyToActorId }; Self->ForceTraversals.emplace_back(operation); - db.Table().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update( NIceDb::TUpdate(Self->NextForceTraversalOperationId), NIceDb::TUpdate(pathId.OwnerId), NIceDb::TUpdate(pathId.LocalPathId), - NIceDb::TUpdate(cookie)); + NIceDb::TUpdate(cookie), + NIceDb::TUpdate(columnTags), + NIceDb::TUpdate(types) + ); } Self->PersistNextForceTraversalOperationId(db); diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index 1d68ae8054c3..b76935326bf8 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -74,6 +74,16 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table local path id: " << Self->TraversalTableId.PathId.LocalPathId); break; + case Schema::SysParam_ForceTraversalColumnTags: { + Self->ForceTraversalColumnTags = value; + SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal columns tags: " << value); + break; + } + case Schema::SysParam_ForceTraversalTypes: { + Self->ForceTraversalTypes = value; + SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal types: " << value); + break; + } case Schema::SysParam_TraversalStartTime: { auto us = FromString(value); Self->TraversalStartTime = TInstant::MicroSeconds(us); @@ -207,6 +217,8 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { ui64 ownerId = rowset.GetValue(); ui64 localPathId = rowset.GetValue(); ui64 cookie = rowset.GetValue(); + TString columnTags = rowset.GetValue(); + TString types = rowset.GetValue(); auto pathId = TPathId(ownerId, localPathId); @@ -214,6 +226,8 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { .OperationId = operationId, .Cookie = cookie, .PathId = pathId, + .ColumnTags = columnTags, + .Types = types, .ReplyToActorId = {} }; Self->ForceTraversals.emplace_back(operation); diff --git a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp index 71b1159ceb18..341b31964907 100644 --- a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp +++ b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp @@ -3,6 +3,8 @@ #include #include +#include + namespace NKikimr::NStat { struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { @@ -34,6 +36,9 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { auto& outRecord = Request->Record; PathIdFromPathId(Self->TraversalTableId.PathId, outRecord.MutablePathId()); + + TVector columnTags = Scan(SplitString(Self->ForceTraversalColumnTags, ",")); + outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end()); auto distribution = Self->TabletsForReqDistribution; for (auto& inNode : Record.GetNodes()) { diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index 9bb5bb15b6aa..85107784bc83 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -51,6 +51,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { Analyze(runtime, {tableInfo.PathId}, tableInfo.SaTabletId); } + Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto tableInfo = CreateDatabaseTables(env, 1, 1)[0]; + + Analyze(runtime, {{tableInfo.PathId, {1, 2}}}, tableInfo.SaTabletId); + } + Y_UNIT_TEST(AnalyzeTwoColumnTables) { TTestEnv env(1, 1); auto& runtime = *env.GetServer().GetRuntime(); diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index 9639d17cda5d..8005f02a00da 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -323,13 +323,29 @@ void ValidateCountMinAbsense(TTestActorRuntime& runtime, TPathId pathId) { UNIT_ASSERT(!rsp.Success); } -void Analyze(TTestActorRuntime& runtime, const std::vector& pathIds, ui64 saTabletId) { +TAnalyzedTable::TAnalyzedTable(const TPathId& pathId) + : PathId(pathId) +{} + +TAnalyzedTable::TAnalyzedTable(const TPathId& pathId, const std::vector& columnTags) + : PathId(pathId) + , ColumnTags(columnTags) +{} + +void TAnalyzedTable::ToProto(NKikimrStat::TTable& tableProto) const { + PathIdFromPathId(PathId, tableProto.MutablePathId()); + tableProto.MutableColumnTags()->Add(ColumnTags.begin(), ColumnTags.end()); +} + + +void Analyze(TTestActorRuntime& runtime, const std::vector& tables, ui64 saTabletId) { const ui64 cookie = 555; auto ev = std::make_unique(); - auto& record = ev->Record; + NKikimrStat::TEvAnalyze& record = ev->Record; record.SetCookie(cookie); - for (const TPathId& pathId : pathIds) - PathIdFromPathId(pathId, record.AddTables()->MutablePathId()); + record.AddTypes(NKikimrStat::EColumnStatisticType::TYPE_COUNT_MIN_SKETCH); + for (const TAnalyzedTable& table : tables) + table.ToProto(*record.AddTables()); auto sender = runtime.AllocateEdgeActor(); runtime.SendToPipe(saTabletId, sender, ev.release()); @@ -338,10 +354,10 @@ void Analyze(TTestActorRuntime& runtime, const std::vector& pathIds, ui UNIT_ASSERT_VALUES_EQUAL(evResponse->Get()->Record.GetCookie(), cookie); } -void AnalyzeTable(TTestActorRuntime& runtime, const TPathId& pathId, ui64 shardTabletId) { +void AnalyzeTable(TTestActorRuntime& runtime, const TAnalyzedTable& table, ui64 shardTabletId) { auto ev = std::make_unique(); auto& record = ev->Record; - PathIdFromPathId(pathId, record.MutableTable()->MutablePathId()); + table.ToProto(*record.MutableTable()); record.AddTypes(NKikimrStat::EColumnStatisticType::TYPE_COUNT_MIN_SKETCH); auto sender = runtime.AllocateEdgeActor(); diff --git a/ydb/core/statistics/ut_common/ut_common.h b/ydb/core/statistics/ut_common/ut_common.h index f03bb695d30e..532ce0250e10 100644 --- a/ydb/core/statistics/ut_common/ut_common.h +++ b/ydb/core/statistics/ut_common/ut_common.h @@ -3,6 +3,10 @@ #include #include +namespace NKikimrStat { + class TTable; +} + namespace NKikimr { namespace NStat { @@ -74,8 +78,17 @@ std::shared_ptr ExtractCountMin(TTestActorRuntime& runtime, TPa void ValidateCountMin(TTestActorRuntime& runtime, TPathId pathId); void ValidateCountMinAbsense(TTestActorRuntime& runtime, TPathId pathId); -void Analyze(TTestActorRuntime& runtime, const std::vector& pathIds, ui64 saTabletId); -void AnalyzeTable(TTestActorRuntime& runtime, const TPathId& pathId, ui64 shardTabletId); +struct TAnalyzedTable { + TPathId PathId; + std::vector ColumnTags; + + TAnalyzedTable(const TPathId& pathId); + TAnalyzedTable(const TPathId& pathId, const std::vector& columnTags); + void ToProto(NKikimrStat::TTable& tableProto) const; +}; + +void Analyze(TTestActorRuntime& runtime, const std::vector& table, ui64 saTabletId); +void AnalyzeTable(TTestActorRuntime& runtime, const TAnalyzedTable& table, ui64 shardTabletId); } // namespace NStat } // namespace NKikimr