Skip to content

Commit

Permalink
Merge 8758b33 into 27ea4c8
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Apr 26, 2024
2 parents 27ea4c8 + 8758b33 commit 2884015
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 2 deletions.
18 changes: 17 additions & 1 deletion ydb/core/tx/datashard/datashard__stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,6 @@ class TAsyncTableStatsBuilder : public TActorBootstrapped<TAsyncTableStatsBuilde
ctx.Send(ReplyTo, ev.Release());

FinishTask(ctx);

return Die(ctx);
}

Expand All @@ -206,6 +205,8 @@ class TAsyncTableStatsBuilder : public TActorBootstrapped<TAsyncTableStatsBuilde
if (msg.Status != NKikimrProto::OK) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Stats build failed at datashard " << TabletId << ", for tableId " << TableId
<< " requested pages but got " << msg.Status);
Send(ReplyTo, new TDataShard::TEvPrivate::TEvTableStatsError(TableId, TDataShard::TEvPrivate::TEvTableStatsError::ECode::FETCH_PAGE_FAILED));
FinishTask(ctx);
return Die(ctx);
}

Expand Down Expand Up @@ -409,6 +410,21 @@ void TDataShard::Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorCo
}
}

void TDataShard::Handle(TEvPrivate::TEvTableStatsError::TPtr& ev, const TActorContext& ctx) {
Actors.erase(ev->Sender);

auto msg = ev->Get();

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Stats rebuilt error '" << msg->Message
<< "', code: " << ui32(msg->Code) << ", datashard " << TabletID() << ", tableId " << msg->TableId);

auto it = TableInfos.find(msg->TableId);
if (it != TableInfos.end()) {
it->second->StatsUpdateInProgress = false;
// if we have got an error, a compaction should have happened so restart build stats anyway
it->second->StatsNeedUpdate = true;
}
}

class TDataShard::TTxInitiateStatsUpdate : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
private:
Expand Down
26 changes: 26 additions & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ class TDataShard
EvConfirmReadonlyLease,
EvReadonlyLeaseConfirmation,
EvPlanPredictedTxs,
EvTableStatsError,
EvEnd
};

Expand Down Expand Up @@ -400,6 +401,29 @@ class TDataShard
ui64 SearchHeight = 0;
};

struct TEvTableStatsError : public TEventLocal<TEvTableStatsError, EvTableStatsError> {
enum class ECode {
FETCH_PAGE_FAILED,
RESOURCE_ALLOCATION_FAILED,
ACTOR_DIED,
UNKNOWN
};

TEvTableStatsError(ui64 tableId, ECode code, const TString& msg)
: TableId(tableId)
, Code(code)
, Message(msg)
{}

TEvTableStatsError(ui64 tableId, ECode code)
: TEvTableStatsError(tableId, code, "")
{}

const ui64 TableId;
const ECode Code;
const TString Message;
};

struct TEvRemoveOldInReadSets : public TEventLocal<TEvRemoveOldInReadSets, EvRemoveOldInReadSets> {};

struct TEvRegisterScanActor : public TEventLocal<TEvRegisterScanActor, EvRegisterScanActor> {
Expand Down Expand Up @@ -1247,6 +1271,7 @@ class TDataShard
void Handle(TEvDataShard::TEvSplitPartitioningChanged::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvGetTableStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAsyncTableStats::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvTableStatsError::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx);
void HandleSafe(TEvDataShard::TEvKqpScan::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -2936,6 +2961,7 @@ class TDataShard
HFunc(TEvDataShard::TEvSplitPartitioningChanged, Handle);
HFunc(TEvDataShard::TEvGetTableStats, Handle);
HFunc(TEvPrivate::TEvAsyncTableStats, Handle);
HFunc(TEvPrivate::TEvTableStatsError, Handle);
HFunc(TEvDataShard::TEvKqpScan, Handle);
HFunc(TEvDataShard::TEvUploadRowsRequest, Handle);
HFunc(TEvDataShard::TEvEraseRowsRequest, Handle);
Expand Down
54 changes: 53 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_stats.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
#include "datashard_ut_common_kqp.h"
#include "ydb/core/tablet_flat/shared_sausagecache.h"

namespace NKikimr {
Expand Down Expand Up @@ -378,6 +377,59 @@ Y_UNIT_TEST_SUITE(DataShardStats) {
UNIT_ASSERT_LE(counters->ActiveBytes->Val(), 800*1024); // one index
}

Y_UNIT_TEST(NoData) {
TPortManager pm;
TServerSettings serverSettings(pm.GetPort(2134));
serverSettings.SetDomainName("Root")
.SetUseRealThreads(false);

TServer::TPtr server = new TServer(serverSettings);
auto& runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();

runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
runtime.SetLogPriority(NKikimrServices::TABLET_SAUSAGECACHE, NLog::PRI_TRACE);

InitRoot(server, sender);

auto [shards, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", 1);
const auto shard1 = GetTableShards(server, sender, "/Root/table-1").at(0);

ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)");

bool captured = false;
auto observer = runtime.AddObserver<NSharedCache::TEvResult>([&](NSharedCache::TEvResult::TPtr& event) {
IActor *actor = runtime.FindActor(event->Recipient);

Cerr << "Got SchemeShard NSharedCache::TEvResult from " << event->Sender << " to " << event->Recipient << "(" << actor->GetActivityType() << ")"<< Endl;

if (actor && actor->GetActivityType() == 288) {
auto& message = *event->Get();
event.Reset(static_cast<TEventHandle<NSharedCache::TEvResult> *>(
new IEventHandle(event->Recipient, event->Sender,
new NSharedCache::TEvResult(message.Origin, message.Cookie, NKikimrProto::NODATA))));
captured = true;
}
});

CompactTable(runtime, shard1, tableId1, false);

for (int i = 0; i < 5 && !captured; ++i) {
TDispatchOptions options;
options.CustomFinalCondition = [&]() { return captured; };
runtime.DispatchEvents(options, TDuration::Seconds(5));
}
observer.Remove();

{
Cerr << "Waiting stats.." << Endl;
auto stats = WaitTableStats(runtime, 1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetDatashardId(), shard1);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetRowCount(), 3);
UNIT_ASSERT_VALUES_EQUAL(stats.GetTableStats().GetPartCount(), 1);
}
}

} // Y_UNIT_TEST_SUITE(DataShardStats)

} // namespace NKikimr

0 comments on commit 2884015

Please sign in to comment.