Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Statistics: Pass ColumnTags and Types #7397

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,12 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {

ForceTraversalOperationId = operation.OperationId;
ForceTraversalCookie = operation.Cookie;
ForceTraversalColumnTags = operation.ColumnTags;
ForceTraversalTypes = operation.Types;
ForceTraversalReplyToActorId = operation.ReplyToActorId;

PersistForceTraversal(db);

db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
ForceTraversals.pop_front();
} else if (!ScheduleTraversalsByTime.Empty()){
Expand Down Expand Up @@ -672,9 +676,11 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer());
}

void TStatisticsAggregator::PersistTraversalOperationIdAndCookie(NIceDb::TNiceDb& db) {
void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId));
PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ToString(ForceTraversalCookie));
PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
}

void TStatisticsAggregator::PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db) {
Expand All @@ -689,6 +695,8 @@ void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
ForceTraversalOperationId = 0;
ForceTraversalCookie = 0;
TraversalTableId.PathId = TPathId();
ForceTraversalColumnTags.clear();
ForceTraversalTypes.clear();
TraversalStartTime = TInstant::MicroSeconds(0);
PersistTraversal(db);

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
void PersistTraversal(NIceDb::TNiceDb& db);
void PersistTraversalOperationIdAndCookie(NIceDb::TNiceDb& db);
void PersistForceTraversal(NIceDb::TNiceDb& db);
void PersistStartKey(NIceDb::TNiceDb& db);
void PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db);
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);
Expand Down Expand Up @@ -308,6 +308,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

ui64 ForceTraversalOperationId = 0;
ui64 ForceTraversalCookie = 0;
TString ForceTraversalColumnTags;
TString ForceTraversalTypes;
TTableId TraversalTableId;
bool TraversalIsColumnTable = false;
TSerializedCellVec TraversalStartKey;
Expand All @@ -328,6 +330,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
ui64 OperationId = 0;
ui64 Cookie = 0;
TPathId PathId;
TString ColumnTags;
TString Types;
TActorId ReplyToActorId;
};
std::list<TForceTraversal> ForceTraversals;
Expand Down
22 changes: 14 additions & 8 deletions ydb/core/statistics/aggregator/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ struct TAggregatorSchema : NIceDb::Schema {
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
struct Cookie : Column<4, NScheme::NTypeIds::Uint64> {};
struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
struct Types : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<OperationId, OwnerId, LocalPathId>;
using TColumns = TableColumns<
OperationId,
OwnerId,
LocalPathId,
Cookie
Cookie,
ColumnTags,
Types
>;
};

Expand All @@ -77,13 +81,15 @@ struct TAggregatorSchema : NIceDb::Schema {
static constexpr ui64 SysParam_Database = 1;
static constexpr ui64 SysParam_TraversalStartKey = 2;
static constexpr ui64 SysParam_ForceTraversalOperationId = 3;
static constexpr ui64 SysParam_ForceTraversalCookie = 4;
static constexpr ui64 SysParam_TraversalTableOwnerId = 5;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 6;
static constexpr ui64 SysParam_TraversalStartTime = 7;
static constexpr ui64 SysParam_NextForceTraversalOperationId = 8;
static constexpr ui64 SysParam_TraversalIsColumnTable = 9;
static constexpr ui64 SysParam_GlobalTraversalRound = 10;
static constexpr ui64 SysParam_TraversalTableOwnerId = 4;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 5;
static constexpr ui64 SysParam_ForceTraversalCookie = 6;
static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
static constexpr ui64 SysParam_ForceTraversalTypes = 8;
static constexpr ui64 SysParam_TraversalStartTime = 9;
static constexpr ui64 SysParam_NextForceTraversalOperationId = 10;
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;
static constexpr ui64 SysParam_GlobalTraversalRound = 12;
};

} // NKikimr::NStat
13 changes: 10 additions & 3 deletions ydb/core/statistics/aggregator/tx_analyze_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/tx/datashard/datashard.h>

#include <util/string/vector.h>

namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
Expand All @@ -24,12 +26,13 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
}

NIceDb::TNiceDb db(txc.DB);
Self->PersistTraversalOperationIdAndCookie(db);

const ui64 cookie = Record.GetCookie();
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");

for (const auto& table : Record.GetTables()) {
const TPathId pathId = PathIdFromPathId(table.GetPathId());
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");

// drop request with the same cookie and path from this sender
if (std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
Expand All @@ -46,16 +49,20 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
.OperationId = Self->NextForceTraversalOperationId,
.Cookie = cookie,
.PathId = pathId,
.ColumnTags = columnTags,
.Types = types,
.ReplyToActorId = ReplyToActorId
};
Self->ForceTraversals.emplace_back(operation);


db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId),
NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId),
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie));
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie),
NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags),
NIceDb::TUpdate<Schema::ForceTraversals::Types>(types)
);
}

Self->PersistNextForceTraversalOperationId(db);
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table local path id: "
<< Self->TraversalTableId.PathId.LocalPathId);
break;
case Schema::SysParam_ForceTraversalColumnTags: {
Self->ForceTraversalColumnTags = value;
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal columns tags: " << value);
break;
}
case Schema::SysParam_ForceTraversalTypes: {
Self->ForceTraversalTypes = value;
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal types: " << value);
break;
}
case Schema::SysParam_TraversalStartTime: {
auto us = FromString<ui64>(value);
Self->TraversalStartTime = TInstant::MicroSeconds(us);
Expand Down Expand Up @@ -207,13 +217,17 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
ui64 ownerId = rowset.GetValue<Schema::ForceTraversals::OwnerId>();
ui64 localPathId = rowset.GetValue<Schema::ForceTraversals::LocalPathId>();
ui64 cookie = rowset.GetValue<Schema::ForceTraversals::Cookie>();
TString columnTags = rowset.GetValue<Schema::ForceTraversals::ColumnTags>();
TString types = rowset.GetValue<Schema::ForceTraversals::Types>();

auto pathId = TPathId(ownerId, localPathId);

TForceTraversal operation {
.OperationId = operationId,
.Cookie = cookie,
.PathId = pathId,
.ColumnTags = columnTags,
.Types = types,
.ReplyToActorId = {}
};
Self->ForceTraversals.emplace_back(operation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <ydb/core/protos/hive.pb.h>
#include <ydb/core/statistics/service/service.h>

#include <util/string/vector.h>

namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
Expand Down Expand Up @@ -34,6 +36,9 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
auto& outRecord = Request->Record;

PathIdFromPathId(Self->TraversalTableId.PathId, outRecord.MutablePathId());

TVector<ui32> columnTags = Scan<ui32>(SplitString(Self->ForceTraversalColumnTags, ","));
outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end());

auto distribution = Self->TabletsForReqDistribution;
for (auto& inNode : Record.GetNodes()) {
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
Analyze(runtime, {tableInfo.PathId}, tableInfo.SaTabletId);
}

Y_UNIT_TEST(AnalyzeAnalyzeOneColumnTableSpecificColumns) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
auto tableInfo = CreateDatabaseTables(env, 1, 1)[0];

Analyze(runtime, {{tableInfo.PathId, {1, 2}}}, tableInfo.SaTabletId);
}

Y_UNIT_TEST(AnalyzeTwoColumnTables) {
TTestEnv env(1, 1);
auto& runtime = *env.GetServer().GetRuntime();
Expand Down
28 changes: 22 additions & 6 deletions ydb/core/statistics/ut_common/ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,29 @@ void ValidateCountMinAbsense(TTestActorRuntime& runtime, TPathId pathId) {
UNIT_ASSERT(!rsp.Success);
}

void Analyze(TTestActorRuntime& runtime, const std::vector<TPathId>& pathIds, ui64 saTabletId) {
TAnalyzedTable::TAnalyzedTable(const TPathId& pathId)
: PathId(pathId)
{}

TAnalyzedTable::TAnalyzedTable(const TPathId& pathId, const std::vector<ui32>& columnTags)
: PathId(pathId)
, ColumnTags(columnTags)
{}

void TAnalyzedTable::ToProto(NKikimrStat::TTable& tableProto) const {
PathIdFromPathId(PathId, tableProto.MutablePathId());
tableProto.MutableColumnTags()->Add(ColumnTags.begin(), ColumnTags.end());
}


void Analyze(TTestActorRuntime& runtime, const std::vector<TAnalyzedTable>& tables, ui64 saTabletId) {
const ui64 cookie = 555;
auto ev = std::make_unique<TEvStatistics::TEvAnalyze>();
auto& record = ev->Record;
NKikimrStat::TEvAnalyze& record = ev->Record;
record.SetCookie(cookie);
for (const TPathId& pathId : pathIds)
PathIdFromPathId(pathId, record.AddTables()->MutablePathId());
record.AddTypes(NKikimrStat::EColumnStatisticType::TYPE_COUNT_MIN_SKETCH);
for (const TAnalyzedTable& table : tables)
table.ToProto(*record.AddTables());

auto sender = runtime.AllocateEdgeActor();
runtime.SendToPipe(saTabletId, sender, ev.release());
Expand All @@ -338,10 +354,10 @@ void Analyze(TTestActorRuntime& runtime, const std::vector<TPathId>& pathIds, ui
UNIT_ASSERT_VALUES_EQUAL(evResponse->Get()->Record.GetCookie(), cookie);
}

void AnalyzeTable(TTestActorRuntime& runtime, const TPathId& pathId, ui64 shardTabletId) {
void AnalyzeTable(TTestActorRuntime& runtime, const TAnalyzedTable& table, ui64 shardTabletId) {
auto ev = std::make_unique<TEvStatistics::TEvAnalyzeTable>();
auto& record = ev->Record;
PathIdFromPathId(pathId, record.MutableTable()->MutablePathId());
table.ToProto(*record.MutableTable());
record.AddTypes(NKikimrStat::EColumnStatisticType::TYPE_COUNT_MIN_SKETCH);

auto sender = runtime.AllocateEdgeActor();
Expand Down
17 changes: 15 additions & 2 deletions ydb/core/statistics/ut_common/ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
#include <ydb/core/testlib/test_client.h>
#include <library/cpp/testing/unittest/registar.h>

namespace NKikimrStat {
class TTable;
}

namespace NKikimr {
namespace NStat {

Expand Down Expand Up @@ -74,8 +78,17 @@ std::shared_ptr<TCountMinSketch> ExtractCountMin(TTestActorRuntime& runtime, TPa
void ValidateCountMin(TTestActorRuntime& runtime, TPathId pathId);
void ValidateCountMinAbsense(TTestActorRuntime& runtime, TPathId pathId);

void Analyze(TTestActorRuntime& runtime, const std::vector<TPathId>& pathIds, ui64 saTabletId);
void AnalyzeTable(TTestActorRuntime& runtime, const TPathId& pathId, ui64 shardTabletId);
struct TAnalyzedTable {
TPathId PathId;
std::vector<ui32> ColumnTags;

TAnalyzedTable(const TPathId& pathId);
TAnalyzedTable(const TPathId& pathId, const std::vector<ui32>& columnTags);
void ToProto(NKikimrStat::TTable& tableProto) const;
};

void Analyze(TTestActorRuntime& runtime, const std::vector<TAnalyzedTable>& table, ui64 saTabletId);
void AnalyzeTable(TTestActorRuntime& runtime, const TAnalyzedTable& table, ui64 shardTabletId);

} // namespace NStat
} // namespace NKikimr
Loading