Skip to content

Commit

Permalink
Merge 4cdf536 into d34dcf3
Browse files Browse the repository at this point in the history
  • Loading branch information
azevaykin authored Aug 2, 2024
2 parents d34dcf3 + 4cdf536 commit 2a6b0d0
Show file tree
Hide file tree
Showing 16 changed files with 326 additions and 186 deletions.
9 changes: 4 additions & 5 deletions ydb/core/protos/counters_statistics_aggregator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ enum ETxTypes {
TXTYPE_NAVIGATE = 5 [(TxTypeOpts) = {Name: "TxNavigate"}];
TXTYPE_RESOLVE = 6 [(TxTypeOpts) = {Name: "TxResolve"}];
TXTYPE_SCAN_RESPONSE = 7 [(TxTypeOpts) = {Name: "TxScanResponse"}];
TXTYPE_SAVE_QUERY_RESPONSE = 8 [(TxTypeOpts) = {Name: "TxSaveQueryResponse"}];
TXTYPE_FINISH_TRAVERSAL = 8 [(TxTypeOpts) = {Name: "TxFinishTraversak"}];
TXTYPE_SCHEDULE_TRAVERSAL = 9 [(TxTypeOpts) = {Name: "TxScheduleTraversal"}];
TXTYPE_DELETE_QUERY_RESPONSE = 10 [(TxTypeOpts) = {Name: "TxDeleteQueryResponse"}];
TXTYPE_AGGR_STAT_RESPONSE = 11 [(TxTypeOpts) = {Name: "TxAggregateStatisticsResponse"}];
TXTYPE_RESPONSE_TABLET_DISTRIBUTION = 12 [(TxTypeOpts) = {Name: "TxResponseTabletDistribution"}];
TXTYPE_ACK_TIMEOUT = 13 [(TxTypeOpts) = {Name: "TxAckTimeout"}];
TXTYPE_AGGR_STAT_RESPONSE = 10 [(TxTypeOpts) = {Name: "TxAggregateStatisticsResponse"}];
TXTYPE_RESPONSE_TABLET_DISTRIBUTION = 11 [(TxTypeOpts) = {Name: "TxResponseTabletDistribution"}];
TXTYPE_ACK_TIMEOUT = 12 [(TxTypeOpts) = {Name: "TxAckTimeout"}];
}
54 changes: 39 additions & 15 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
if (TraversalTableId.PathId == pathId) {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
} else {
auto it = ForceTraversalsByPathId.find(pathId);
if (it != ForceTraversalsByPathId.end()) {
if (std::any_of(ForceTraversals.begin(), ForceTraversals.end(),
[&pathId](const TForceTraversal& elem) { return elem.PathId == pathId;})) {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
} else {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
Expand Down Expand Up @@ -579,17 +579,25 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {

TPathId pathId;

if (!ForceTraversals.Empty() && !LastTraversalWasForce) {
auto* operation = ForceTraversals.Front();
ReplyToActorIds.swap(operation->ReplyToActorIds);
pathId = operation->PathId;
if (!ForceTraversals.empty() && !LastTraversalWasForce) {
LastTraversalWasForce = true;

db.Table<Schema::ForceTraversals>().Key(operation->OperationId).Delete();
ForceTraversals.PopFront();
ForceTraversalsByPathId.erase(pathId);
TForceTraversal& operation = ForceTraversals.front();
pathId = operation.PathId;

LastTraversalWasForce = true;
ForceTraversalOperationId = operation.OperationId;
ForceTraversalCookie = operation.Cookie;
ForceTraversalColumnTags = operation.ColumnTags;
ForceTraversalTypes = operation.Types;
ForceTraversalReplyToActorId = operation.ReplyToActorId;

PersistForceTraversal(db);

db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
ForceTraversals.pop_front();
} else if (!ScheduleTraversalsByTime.Empty()){
LastTraversalWasForce = false;

auto* oldestTable = ScheduleTraversalsByTime.Top();
if (TInstant::Now() < oldestTable->LastUpdateTime + ScheduleTraversalPeriod) {
SA_LOG_T("[" << TabletID() << "] A schedule traversal is skiped. "
Expand All @@ -598,7 +606,6 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
}

pathId = oldestTable->PathId;
LastTraversalWasForce = false;
} else {
SA_LOG_E("[" << TabletID() << "] No schedule traversal from schemeshard.");
return;
Expand All @@ -615,7 +622,7 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
TraversalTableId.PathId = pathId;

SA_LOG_D("[" << TabletID() << "] Start "
<< ( LastTraversalWasForce ? "force" : "schedule" )
<< LastTraversalWasForceString()
<< " traversal for path " << pathId);

StartTraversal(db);
Expand Down Expand Up @@ -649,6 +656,10 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
ResetTraversalState(db);
}

// Returns a short human-readable label for the value of LastTraversalWasForce:
// "force" when the flag is set, "schedule" otherwise.
TString TStatisticsAggregator::LastTraversalWasForceString() const {
    if (LastTraversalWasForce) {
        return "force";
    }
    return "schedule";
}

void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
db.Table<Schema::SysParams>().Key(id).Update(
NIceDb::TUpdate<Schema::SysParams::Value>(value));
Expand All @@ -659,29 +670,42 @@ void TStatisticsAggregator::PersistTraversal(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_TraversalTableLocalPathId, ToString(TraversalTableId.PathId.LocalPathId));
PersistSysParam(db, Schema::SysParam_TraversalStartTime, ToString(TraversalStartTime.MicroSeconds()));
PersistSysParam(db, Schema::SysParam_TraversalIsColumnTable, ToString(TraversalIsColumnTable));
PersistSysParam(db, Schema::SysParam_TraversalIsColumnTable, ToString(TraversalIsColumnTable));
PersistSysParam(db, Schema::SysParam_TraversalIsColumnTable, ToString(TraversalIsColumnTable));
}

// Stores the serialized buffer of TraversalStartKey in the system parameter
// identified by Schema::SysParam_TraversalStartKey.
void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
    const auto& startKeyBuffer = TraversalStartKey.GetBuffer();
    PersistSysParam(db, Schema::SysParam_TraversalStartKey, startKeyBuffer);
}

void TStatisticsAggregator::PersistLastForceTraversalOperationId(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_LastForceTraversalOperationId, ToString(LastForceTraversalOperationId));
void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId));
PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ToString(ForceTraversalCookie));
PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
}

// Stores the current value of NextForceTraversalOperationId, converted to a string,
// in the system parameter Schema::SysParam_NextForceTraversalOperationId.
void TStatisticsAggregator::PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db) {
    const TString serializedId = ToString(NextForceTraversalOperationId);
    PersistSysParam(db, Schema::SysParam_NextForceTraversalOperationId, serializedId);
}

// Stores the current value of GlobalTraversalRound, converted to a string,
// in the system parameter Schema::SysParam_GlobalTraversalRound.
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
    const TString serializedRound = ToString(GlobalTraversalRound);
    PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, serializedRound);
}

void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
ForceTraversalOperationId = 0;
ForceTraversalCookie = 0;
TraversalTableId.PathId = TPathId();
ForceTraversalColumnTags.clear();
ForceTraversalTypes.clear();
TraversalStartTime = TInstant::MicroSeconds(0);
PersistTraversal(db);

TraversalStartKey = TSerializedCellVec();
PersistStartKey(db);

ReplyToActorIds.clear();
ForceTraversalReplyToActorId = {};

for (auto& [tag, _] : CountMinSketches) {
db.Table<Schema::ColumnStatistics>().Key(tag).Delete();
Expand Down
26 changes: 17 additions & 9 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
struct TTxNavigate;
struct TTxResolve;
struct TTxDatashardScanResponse;
struct TTxSaveQueryResponse;
struct TTxFinishTraversal;
struct TTxScheduleTrasersal;
struct TTxDeleteQueryResponse;
struct TTxAggregateStatisticsResponse;
struct TTxResponseTabletDistribution;
struct TTxAckTimeout;
Expand Down Expand Up @@ -146,15 +145,18 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
void PersistTraversal(NIceDb::TNiceDb& db);
void PersistForceTraversal(NIceDb::TNiceDb& db);
void PersistStartKey(NIceDb::TNiceDb& db);
void PersistLastForceTraversalOperationId(NIceDb::TNiceDb& db);
void PersistNextForceTraversalOperationId(NIceDb::TNiceDb& db);
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);

void ResetTraversalState(NIceDb::TNiceDb& db);
void ScheduleNextTraversal(NIceDb::TNiceDb& db);
void StartTraversal(NIceDb::TNiceDb& db);
void FinishTraversal(NIceDb::TNiceDb& db);

TString LastTraversalWasForceString() const;

STFUNC(StateInit) {
StateInitImpl(ev, SelfId());
}
Expand Down Expand Up @@ -239,7 +241,7 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests;
bool ProcessUrgentInFlight = false;

std::unordered_set<TActorId> ReplyToActorIds;
TActorId ForceTraversalReplyToActorId = {};

bool IsSchemeshardSeen = false;
bool IsStatisticsTableCreated = false;
Expand Down Expand Up @@ -304,11 +306,15 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl

private: // stored in local db

ui64 ForceTraversalOperationId = 0;
ui64 ForceTraversalCookie = 0;
TString ForceTraversalColumnTags;
TString ForceTraversalTypes;
TTableId TraversalTableId;
bool TraversalIsColumnTable = false;
TSerializedCellVec TraversalStartKey;
TInstant TraversalStartTime;
ui64 LastForceTraversalOperationId = 0;
ui64 NextForceTraversalOperationId = 0;

size_t GlobalTraversalRound = 1;

Expand All @@ -320,13 +326,15 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
TTraversalsByTime;
TTraversalsByTime ScheduleTraversalsByTime;

struct TForceTraversal : public TIntrusiveListItem<TForceTraversal> {
struct TForceTraversal {
ui64 OperationId = 0;
ui64 Cookie = 0;
TPathId PathId;
std::unordered_set<TActorId> ReplyToActorIds;
TString ColumnTags;
TString Types;
TActorId ReplyToActorId;
};
TIntrusiveList<TForceTraversal> ForceTraversals;
std::unordered_map<TPathId, TForceTraversal> ForceTraversalsByPathId;
std::list<TForceTraversal> ForceTraversals;
};

} // NKikimr::NStat
26 changes: 18 additions & 8 deletions ydb/core/statistics/aggregator/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,18 @@ struct TAggregatorSchema : NIceDb::Schema {
struct OperationId : Column<1, NScheme::NTypeIds::Uint64> {};
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
struct Cookie : Column<4, NScheme::NTypeIds::Uint64> {};
struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
struct Types : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<OperationId>;
using TKey = TableKey<OperationId, OwnerId, LocalPathId>;
using TColumns = TableColumns<
OperationId,
OwnerId,
LocalPathId
LocalPathId,
Cookie,
ColumnTags,
Types
>;
};

Expand All @@ -74,12 +80,16 @@ struct TAggregatorSchema : NIceDb::Schema {

static constexpr ui64 SysParam_Database = 1;
static constexpr ui64 SysParam_TraversalStartKey = 2;
static constexpr ui64 SysParam_TraversalTableOwnerId = 3;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 4;
static constexpr ui64 SysParam_TraversalStartTime = 5;
static constexpr ui64 SysParam_LastForceTraversalOperationId = 6;
static constexpr ui64 SysParam_TraversalIsColumnTable = 7;
static constexpr ui64 SysParam_GlobalTraversalRound = 8;
static constexpr ui64 SysParam_ForceTraversalOperationId = 3;
static constexpr ui64 SysParam_TraversalTableOwnerId = 4;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 5;
static constexpr ui64 SysParam_ForceTraversalCookie = 6;
static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
static constexpr ui64 SysParam_ForceTraversalTypes = 8;
static constexpr ui64 SysParam_TraversalStartTime = 9;
static constexpr ui64 SysParam_NextForceTraversalOperationId = 10;
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;
static constexpr ui64 SysParam_GlobalTraversalRound = 12;
};

} // NKikimr::NStat
76 changes: 46 additions & 30 deletions ydb/core/statistics/aggregator/tx_analyze_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@

#include <ydb/core/tx/datashard/datashard.h>

#include <util/string/vector.h>

namespace NKikimr::NStat {

struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
TPathId PathId;
TActorId ReplyToActorId;
ui64 OperationId = 0;
const NKikimrStat::TEvAnalyze& Record;
TActorId ReplyToActorId;

TTxAnalyzeTable(TSelf* self, const TPathId& pathId, TActorId replyToActorId)
TTxAnalyzeTable(TSelf* self, const NKikimrStat::TEvAnalyze& record, TActorId replyToActorId)
: TTxBase(self)
, PathId(pathId)
, Record(record)
, ReplyToActorId(replyToActorId)
{}

Expand All @@ -24,28 +25,47 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
return true;
}

auto itOp = Self->ForceTraversalsByPathId.find(PathId);
if (itOp != Self->ForceTraversalsByPathId.end()) {
itOp->second.ReplyToActorIds.insert(ReplyToActorId);
OperationId = itOp->second.OperationId;
return true;
}

NIceDb::TNiceDb db(txc.DB);

TForceTraversal& operation = Self->ForceTraversalsByPathId[PathId];
operation.PathId = PathId;
operation.OperationId = ++Self->LastForceTraversalOperationId;
operation.ReplyToActorIds.insert(ReplyToActorId);
Self->ForceTraversals.PushBack(&operation);

Self->PersistLastForceTraversalOperationId(db);

db.Table<Schema::ForceTraversals>().Key(operation.OperationId).Update(
NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(PathId.OwnerId),
NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(PathId.LocalPathId));
const ui64 cookie = Record.GetCookie();
const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");

for (const auto& table : Record.GetTables()) {
const TPathId pathId = PathIdFromPathId(table.GetPathId());
const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");

// drop request with the same cookie and path from this sender
if (std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
[this, &pathId, &cookie](const TForceTraversal& elem) {
return elem.PathId == pathId
&& elem.Cookie == cookie
&& elem.ReplyToActorId == ReplyToActorId
;})) {
return true;
}

// create new force traversal
TForceTraversal operation {
.OperationId = Self->NextForceTraversalOperationId,
.Cookie = cookie,
.PathId = pathId,
.ColumnTags = columnTags,
.Types = types,
.ReplyToActorId = ReplyToActorId
};
Self->ForceTraversals.emplace_back(operation);

db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update(
NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId),
NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId),
NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId),
NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie),
NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags),
NIceDb::TUpdate<Schema::ForceTraversals::Types>(types)
);
}

OperationId = operation.OperationId;
Self->PersistNextForceTraversalOperationId(db);

return true;
}
Expand All @@ -56,13 +76,9 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
};

void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyze::TPtr& ev) {
const auto& record = ev->Get()->Record;

// TODO: replace by queue
for (const auto& table : record.GetTables()) {
Execute(new TTxAnalyzeTable(this, PathIdFromPathId(table.GetPathId()), ev->Sender), TActivationContext::AsActorContext());
}
++NextForceTraversalOperationId;

Execute(new TTxAnalyzeTable(this, ev->Get()->Record, ev->Sender), TActivationContext::AsActorContext());
}

} // NKikimr::NStat
Loading

0 comments on commit 2a6b0d0

Please sign in to comment.