Skip to content

Commit

Permalink
Statistics: Pass ColumnTags and Types (#7397)
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Aug 2, 2024
1 parent e9a7b28 commit 281d32b
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 21 deletions.
10 changes: 9 additions & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,12 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {

ForceTraversalOperationId = operation.OperationId;
ForceTraversalCookie = operation.Cookie;
ForceTraversalColumnTags = operation.ColumnTags;
ForceTraversalTypes = operation.Types;
ForceTraversalReplyToActorId = operation.ReplyToActorId;

PersistForceTraversal(db);

db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
ForceTraversals.pop_front();
} else if (!ScheduleTraversalsByTime.Empty()){
Expand Down Expand Up @@ -672,9 +676,11 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer());
}

void TStatisticsAggregator::PersistTraversalOperationIdAndCookie(NIceDb::TNiceDb& db) {
void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId));
PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ToString(ForceTraversalCookie));
PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
}

void TStatisticsAggregator::PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db) {
Expand All @@ -689,6 +695,8 @@ void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
ForceTraversalOperationId = 0;
ForceTraversalCookie = 0;
TraversalTableId.PathId = TPathId();
ForceTraversalColumnTags.clear();
ForceTraversalTypes.clear();
TraversalStartTime = TInstant::MicroSeconds(0);
PersistTraversal(db);

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
void PersistTraversal(NIceDb::TNiceDb& db);
void PersistTraversalOperationIdAndCookie(NIceDb::TNiceDb& db);
void PersistForceTraversal(NIceDb::TNiceDb& db);
void PersistStartKey(NIceDb::TNiceDb& db);
void PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db);
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);
Expand Down Expand Up @@ -308,6 +308,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

ui64 ForceTraversalOperationId = 0;
ui64 ForceTraversalCookie = 0;
TString ForceTraversalColumnTags;
TString ForceTraversalTypes;
TTableId TraversalTableId;
bool TraversalIsColumnTable = false;
TSerializedCellVec TraversalStartKey;
Expand All @@ -328,6 +330,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
ui64 OperationId = 0;
ui64 Cookie = 0;
TPathId PathId;
TString ColumnTags;
TString Types;
TActorId ReplyToActorId;
};
std::list<TForceTraversal> ForceTraversals;
Expand Down
22 changes: 14 additions & 8 deletions ydb/core/statistics/aggregator/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ struct TAggregatorSchema : NIceDb::Schema {
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
struct Cookie : Column<4, NScheme::NTypeIds::Uint64> {};
struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
struct Types : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<OperationId, OwnerId, LocalPathId>;
using TColumns = TableColumns<
OperationId,
OwnerId,
LocalPathId,
Cookie
Cookie,
ColumnTags,
Types
>;
};

Expand All @@ -77,13 +81,15 @@ struct TAggregatorSchema : NIceDb::Schema {
static constexpr ui64 SysParam_Database = 1;
static constexpr ui64 SysParam_TraversalStartKey = 2;
static constexpr ui64 SysParam_ForceTraversalOperationId = 3;
static constexpr ui64 SysParam_ForceTraversalCookie = 4;
static constexpr ui64 SysParam_TraversalTableOwnerId = 5;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 6;
static constexpr ui64 SysParam_TraversalStartTime = 7;
static constexpr ui64 SysParam_NextForceTraversalOperationId = 8;
static constexpr ui64 SysParam_TraversalIsColumnTable = 9;
static constexpr ui64 SysParam_GlobalTraversalRound = 10;
static constexpr ui64 SysParam_TraversalTableOwnerId = 4;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 5;
static constexpr ui64 SysParam_ForceTraversalCookie = 6;
static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
static constexpr ui64 SysParam_ForceTraversalTypes = 8;
static constexpr ui64 SysParam_TraversalStartTime = 9;
static constexpr ui64 SysParam_NextForceTraversalOperationId = 10;
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;
static constexpr ui64 SysParam_GlobalTraversalRound = 12;
};

} // NKikimr::NStat
13 changes: 10 additions & 3 deletions ydb/core/statistics/aggregator/tx_analyze_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/tx/datashard/datashard.h>

#include <util/string/vector.h>

namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
Expand All @@ -24,12 +26,13 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
}

NIceDb::TNiceDb db(txc.DB);
Self->PersistTraversalOperationIdAndCookie(db);

const ui64 cookie = Record.GetCookie();
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");

for (const auto& table : Record.GetTables()) {
const TPathId pathId = PathIdFromPathId(table.GetPathId());
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");

// drop request with the same cookie and path from this sender
if (std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
Expand All @@ -46,16 +49,20 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
.OperationId = Self->NextForceTraversalOperationId,
.Cookie = cookie,
.PathId = pathId,
.ColumnTags = columnTags,
.Types = types,
.ReplyToActorId = ReplyToActorId
};
Self->ForceTraversals.emplace_back(operation);


db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId),
NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId),
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie));
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie),
NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags),
NIceDb::TUpdate<Schema::ForceTraversals::Types>(types)
);
}

Self->PersistNextForceTraversalOperationId(db);
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table local path id: "
<< Self->TraversalTableId.PathId.LocalPathId);
break;
case Schema::SysParam_ForceTraversalColumnTags: {
Self->ForceTraversalColumnTags = value;
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal columns tags: " << value);
break;
}
case Schema::SysParam_ForceTraversalTypes: {
Self->ForceTraversalTypes = value;
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal types: " << value);
break;
}
case Schema::SysParam_TraversalStartTime: {
auto us = FromString<ui64>(value);
Self->TraversalStartTime = TInstant::MicroSeconds(us);
Expand Down Expand Up @@ -207,13 +217,17 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
ui64 ownerId = rowset.GetValue<Schema::ForceTraversals::OwnerId>();
ui64 localPathId = rowset.GetValue<Schema::ForceTraversals::LocalPathId>();
ui64 cookie = rowset.GetValue<Schema::ForceTraversals::Cookie>();
TString columnTags = rowset.GetValue<Schema::ForceTraversals::ColumnTags>();
TString types = rowset.GetValue<Schema::ForceTraversals::Types>();

auto pathId = TPathId(ownerId, localPathId);

TForceTraversal operation {
.OperationId = operationId,
.Cookie = cookie,
.PathId = pathId,
.ColumnTags = columnTags,
.Types = types,
.ReplyToActorId = {}
};
Self->ForceTraversals.emplace_back(operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <ydb/core/protos/hive.pb.h>
#include <ydb/core/statistics/service/service.h>

#include <util/string/vector.h>

namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
Expand Down Expand Up @@ -34,6 +36,9 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
auto& outRecord = Request->Record;

PathIdFromPathId(Self->TraversalTableId.PathId, outRecord.MutablePathId());

TVector<ui32> columnTags = Scan<ui32>(SplitString(Self->ForceTraversalColumnTags, ","));
outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end());

auto distribution = Self->TabletsForReqDistribution;
for (auto& inNode : Record.GetNodes()) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
Analyze(runtime, {tableInfo.PathId}, tableInfo.SaTabletId);
}

Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
auto tableInfo = CreateDatabaseTables(env, 1, 1)[0];

Analyze(runtime, {{tableInfo.PathId, {1, 2}}}, tableInfo.SaTabletId);
}

Y_UNIT_TEST(AnalyzeTwoColumnTables) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
Expand Down
28 changes: 22 additions & 6 deletions ydb/core/statistics/ut_common/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,29 @@ void ValidateCountMinAbsense(TTestActorRuntime& runtime, TPathId pathId) {
UNIT_ASSERT(!rsp.Success);
}

void Analyze(TTestActorRuntime& runtime, const std::vector<TPathId>& pathIds, ui64 saTabletId) {
TAnalyzedTable::TAnalyzedTable(const TPathId& pathId)
: PathId(pathId)
{}

TAnalyzedTable::TAnalyzedTable(const TPathId& pathId, const std::vector<ui32>& columnTags)
: PathId(pathId)
, ColumnTags(columnTags)
{}

void TAnalyzedTable::ToProto(NKikimrStat::TTable& tableProto) const {
PathIdFromPathId(PathId, tableProto.MutablePathId());
tableProto.MutableColumnTags()->Add(ColumnTags.begin(), ColumnTags.end());
}


void Analyze(TTestActorRuntime& runtime, const std::vector<TAnalyzedTable>& tables, ui64 saTabletId) {
const ui64 cookie = 555;
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
auto& record = ev->Record;
NKikimrStat::TEvAnalyze& record = ev->Record;
record.SetCookie(cookie);
for (const TPathId& pathId : pathIds)
PathIdFromPathId(pathId, record.AddTables()->MutablePathId());
record.AddTypes(NKikimrStat::EColumnStatisticType::TYPE_COUNT_MIN_SKETCH);
for (const TAnalyzedTable& table : tables)
table.ToProto(*record.AddTables());

auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(saTabletId, sender, ev.release());
Expand All @@ -338,10 +354,10 @@ void Analyze(TTestActorRuntime& runtime, const std::vector<TPathId>& pathIds, ui
UNIT_ASSERT_VALUES_EQUAL(evResponse->Get()->Record.GetCookie(), cookie);
}

void AnalyzeTable(TTestActorRuntime& runtime, const TPathId& pathId, ui64 shardTabletId) {
void AnalyzeTable(TTestActorRuntime& runtime, const TAnalyzedTable& table, ui64 shardTabletId) {
auto ev = std::make_unique<TEvStatistics::TEvAnalyzeTable>();
auto& record = ev->Record;
PathIdFromPathId(pathId, record.MutableTable()->MutablePathId());
table.ToProto(*record.MutableTable());
record.AddTypes(NKikimrStat::EColumnStatisticType::TYPE_COUNT_MIN_SKETCH);

auto sender = runtime.AllocateEdgeActor();
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/statistics/ut_common/ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#include <ydb/core/testlib/test_client.h>
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimrStat {
class TTable;
}

namespace NKikimr {
namespace NStat {

Expand Down Expand Up @@ -74,8 +78,17 @@ std::shared_ptr<TCountMinSketch> ExtractCountMin(TTestActorRuntime& runtime, TPa
void ValidateCountMin(TTestActorRuntime& runtime, TPathId pathId);
void ValidateCountMinAbsense(TTestActorRuntime& runtime, TPathId pathId);

void Analyze(TTestActorRuntime& runtime, const std::vector<TPathId>& pathIds, ui64 saTabletId);
void AnalyzeTable(TTestActorRuntime& runtime, const TPathId& pathId, ui64 shardTabletId);
struct TAnalyzedTable {
TPathId PathId;
std::vector<ui32> ColumnTags;

TAnalyzedTable(const TPathId& pathId);
TAnalyzedTable(const TPathId& pathId, const std::vector<ui32>& columnTags);
void ToProto(NKikimrStat::TTable& tableProto) const;
};

void Analyze(TTestActorRuntime& runtime, const std::vector<TAnalyzedTable>& table, ui64 saTabletId);
void AnalyzeTable(TTestActorRuntime& runtime, const TAnalyzedTable& table, ui64 shardTabletId);

} // namespace NStat
} // namespace NKikimr

0 comments on commit 281d32b

Please sign in to comment.