Skip to content

Commit

Permalink
Improve BlobDepot assimilator metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed Apr 23, 2024
1 parent 99fcb09 commit 92161f3
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 47 deletions.
137 changes: 112 additions & 25 deletions ydb/core/blob_depot/assimilator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ namespace NKikimr::NBlobDepot {

Become(&TThis::StateFunc);
Action();
UpdateBytesCopiedQ();
}

void TAssimilator::PassAway() {
Expand All @@ -116,8 +117,10 @@ namespace NKikimr::NBlobDepot {
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle);
cFunc(TEvPrivate::EvResume, Action);
cFunc(TEvPrivate::EvResumeScanDataForPlanning, HandleResumeScanDataForPlanning);
cFunc(TEvPrivate::EvResumeScanDataForCopying, HandleResumeScanDataForCopying);
fFunc(TEvPrivate::EvTxComplete, HandleTxComplete);
cFunc(TEvPrivate::EvUpdateBytesCopiedQ, UpdateBytesCopiedQ);
cFunc(TEvents::TSystem::Poison, PassAway);

default:
Expand All @@ -134,7 +137,11 @@ namespace NKikimr::NBlobDepot {
if (Self->DecommitState < EDecommitState::BlobsFinished) {
SendAssimilateRequest();
} else if (Self->DecommitState < EDecommitState::BlobsCopied) {
ScanDataForCopying();
if (PlanningComplete) {
ScanDataForCopying();
} else {
ScanDataForPlanning();
}
} else if (Self->DecommitState == EDecommitState::BlobsCopied) {
Y_ABORT_UNLESS(!PipeId);
CreatePipe();
Expand Down Expand Up @@ -283,7 +290,61 @@ namespace NKikimr::NBlobDepot {
}
}

// Planning pass: walks the data table from the last planned key towards TKey::Max() and sums the sizes
// of blobs flagged GoingToAssimilate into Self->Assimilator.BytesToCopy. The scan is time-sliced: once
// every 1000 visited items the elapsed time is checked, and when it has reached 1 ms the scan stops and
// an EvResumeScanDataForPlanning event is sent to this actor to continue later. When the scan runs to
// completion, planning is marked complete and Action() is invoked again.
void TAssimilator::ScanDataForPlanning() {
    // A resume event is already queued; its handler will continue the scan.
    if (ResumeScanDataForPlanningInFlight) {
        return;
    }

    THPTimer stopwatch;
    ui32 itemsSinceClockCheck = 0;
    bool outOfTime = false;

    // No saved position means a fresh pass: start a new copy iteration and restart the byte estimate.
    if (!LastPlanScannedKey.has_value()) {
        Self->Assimilator.BytesToCopy = 0;
        ++Self->Assimilator.CopyIteration;
    }

    TData::TScanRange range{
        LastPlanScannedKey.has_value() ? TData::TKey(*LastPlanScannedKey) : TData::TKey::Min(),
        TData::TKey::Max(),
    };

    // Per-item visitor; returning false aborts the scan.
    auto visitItem = [&](const TData::TKey& key, const TData::TValue& value) {
        const bool timeToCheckClock = ++itemsSinceClockCheck == 1000;
        if (timeToCheckClock) {
            itemsSinceClockCheck = 0;
            outOfTime = TDuration::Seconds(stopwatch.Passed()) >= TDuration::MilliSeconds(1);
            if (outOfTime) {
                // The current item is left unaccounted; LastPlanScannedKey still names the previous one.
                return false;
            }
        }
        if (value.GoingToAssimilate) {
            Self->Assimilator.BytesToCopy += key.GetBlobId().BlobSize();
        }
        LastPlanScannedKey.emplace(key.GetBlobId());
        return true;
    };
    Self->Data->ScanRange(range, nullptr, nullptr, visitItem);

    if (!outOfTime) {
        // The whole range has been visited: planning is done, hand control back to the state machine.
        ActionInProgress = false;
        PlanningComplete = true;
        Action();
        return;
    }

    // Time slice exhausted: yield and continue from LastPlanScannedKey on the resume event.
    ResumeScanDataForPlanningInFlight = true;
    TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForPlanning, 0, SelfId(), {}, nullptr, 0));
}

// Handler for EvResumeScanDataForPlanning: continues a planning scan that yielded on its time slice.
void TAssimilator::HandleResumeScanDataForPlanning() {
    // This event is only sent by ScanDataForPlanning right after it raises the in-flight flag,
    // so the flag must still be set when the event arrives.
    Y_ABORT_UNLESS(ResumeScanDataForPlanningInFlight);

    // Drop the flag first; otherwise ScanDataForPlanning would return immediately.
    ResumeScanDataForPlanningInFlight = false;
    ScanDataForPlanning();
}

void TAssimilator::ScanDataForCopying() {
if (ResumeScanDataForCopyingInFlight) {
return;
}

STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()),
(LastScannedKey, LastScannedKey), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size()));

Expand Down Expand Up @@ -324,11 +385,8 @@ namespace NKikimr::NBlobDepot {
(EntriesToProcess, EntriesToProcess), (Timeout, timeout), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size()));

if (timeout) { // timeout hit, reschedule work
if (!ResumeScanDataForCopyingInFlight) {
TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForCopying, 0, SelfId(), {}, nullptr, 0));
ResumeScanDataForCopyingInFlight = true;
}
break;
TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForCopying, 0, SelfId(), {}, nullptr, 0));
ResumeScanDataForCopyingInFlight = true;
} else if (!ScanQ.empty()) {
using TQuery = TEvBlobStorage::TEvGet::TQuery;
const ui32 sz = ScanQ.size();
Expand All @@ -345,15 +403,18 @@ namespace NKikimr::NBlobDepot {
GetIdToUnprocessedPuts.try_emplace(getId);
ScanQ.clear();
TotalSize = 0;
} else if (!GetIdToUnprocessedPuts.empty()) { // there are some unprocessed get queries, still have to wait
break;
continue;
} else if (!GetIdToUnprocessedPuts.empty()) {
// there are some unprocessed get queries, still have to wait
} else if (!EntriesToProcess) { // we have finished scanning the whole table without any entries, copying is done
OnCopyDone();
break;
} else { // we have finished scanning, but we have replicated some data, restart scanning to ensure that nothing left
LastScannedKey.reset();
EntriesToProcess = false;
LastPlanScannedKey.reset();
EntriesToProcess = PlanningComplete = ActionInProgress = false;
Action();
}
break;
}
}

Expand All @@ -365,7 +426,7 @@ namespace NKikimr::NBlobDepot {

void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) {
auto& msg = *ev->Get();
(msg.Status == NKikimrProto::OK ? Self->AssimilatorLatestOkGet : Self->AssimilatorLatestOkPut) = TInstant::Now();
(msg.Status == NKikimrProto::OK ? Self->Assimilator.LatestOkGet : Self->Assimilator.LatestErrorGet) = TInstant::Now();
const auto it = GetIdToUnprocessedPuts.find(ev->Cookie);
Y_ABORT_UNLESS(it != GetIdToUnprocessedPuts.end());
ui32 getBytes = 0;
Expand All @@ -389,26 +450,25 @@ namespace NKikimr::NBlobDepot {
++it->second;
}
getBytes += resp.Id.BlobSize();
++Self->AssimilatorBlobsReadOk;
++Self->Assimilator.BlobsReadOk;
} else if (resp.Status == NKikimrProto::NODATA) {
Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(resp.Id),
TEvPrivate::EvTxComplete, SelfId(), it->first);
++it->second;
++Self->AssimilatorBlobsReadNoData;
++Self->Assimilator.BlobsReadNoData;
Self->Assimilator.BytesToCopy -= resp.Id.BlobSize();
} else {
++Self->AssimilatorBlobsReadError;
++Self->Assimilator.BlobsReadError;
continue;
}
Self->AssimilatorLastReadBlobId = resp.Id;
Self->Assimilator.LastReadBlobId = resp.Id;
}
if (getBytes) {
Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_DECOMMIT_GET_BYTES] += getBytes;
}
if (!it->second) {
GetIdToUnprocessedPuts.erase(it);
if (!ResumeScanDataForCopyingInFlight) {
ScanDataForCopying();
}
ScanDataForCopying();
}
}

Expand All @@ -417,20 +477,20 @@ namespace NKikimr::NBlobDepot {
Y_ABORT_UNLESS(it != GetIdToUnprocessedPuts.end());
if (!--it->second) {
GetIdToUnprocessedPuts.erase(it);
if (!ResumeScanDataForCopyingInFlight) {
ScanDataForCopying();
}
ScanDataForCopying();
}
}

void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) {
auto& msg = *ev->Get();
(msg.Status == NKikimrProto::OK ? Self->AssimilatorLatestOkPut : Self->AssimilatorLatestErrorPut) = TInstant::Now();
(msg.Status == NKikimrProto::OK ? Self->Assimilator.LatestOkPut : Self->Assimilator.LatestErrorPut) = TInstant::Now();
if (msg.Status == NKikimrProto::OK) {
Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_DECOMMIT_PUT_OK_BYTES] += msg.Id.BlobSize();
++Self->AssimilatorBlobsPutOk;
++Self->Assimilator.BlobsPutOk;
Self->Assimilator.BytesToCopy -= msg.Id.BlobSize();
Self->Assimilator.BytesCopied += msg.Id.BlobSize();
} else {
++Self->AssimilatorBlobsPutError;
++Self->Assimilator.BlobsPutError;
}
const auto it = PutIdToKey.find(ev->Cookie);
Y_ABORT_UNLESS(it != PutIdToKey.end());
Expand Down Expand Up @@ -554,14 +614,41 @@ namespace NKikimr::NBlobDepot {
}

void TAssimilator::UpdateAssimilatorPosition() const {
Self->AssimilatorPosition = TStringBuilder()
Self->Assimilator.Position = TStringBuilder()
<< "SkipBlocksUpTo# " << (SkipBlocksUpTo ? ToString(*SkipBlocksUpTo) : "<none>") << Endl
<< "SkipBarriersUpTo# " << (SkipBarriersUpTo
? TString(TStringBuilder() << std::get<0>(*SkipBarriersUpTo) << ':' << (int)std::get<1>(*SkipBarriersUpTo))
: "<none>") << Endl
<< "SkipBlobsUpTo# " << (SkipBlobsUpTo ? SkipBlobsUpTo->ToString() : "<none>");
}

// Periodic metrics refresh: records a (monotonic time, BytesCopied) sample, recomputes the copy speed
// and the estimated remaining copy time from the oldest and newest retained samples, and reschedules
// itself to run again in one second via EvUpdateBytesCopiedQ.
void TAssimilator::UpdateBytesCopiedQ() {
    // Keep a short sliding window: at most three samples after the new one is appended.
    while (BytesCopiedQ.size() >= 3) {
        BytesCopiedQ.pop_front();
    }
    BytesCopiedQ.emplace_back(TActivationContext::Monotonic(), Self->Assimilator.BytesCopied);

    // Defaults that stay in place when there is not enough data or no progress in the window.
    Self->Assimilator.CopySpeed = 0;
    Self->Assimilator.CopyTimeRemaining = TDuration::Max();

    if (BytesCopiedQ.size() > 1) {
        const auto& [frontTs, frontBytes] = BytesCopiedQ.front();
        const auto& [backTs, backBytes] = BytesCopiedQ.back();
        const TDuration deltaTs = backTs - frontTs;
        const ui64 deltaBytes = backBytes - frontBytes;
        if (deltaTs != TDuration::Zero()) {
            // bytes per second over the window
            Self->Assimilator.CopySpeed = deltaBytes * 1'000'000 / deltaTs.MicroSeconds();
        }
        if (deltaBytes) {
            // The estimate is BytesToCopy * deltaTs / deltaBytes. Evaluate it in floating point: the
            // integer product BytesToCopy * deltaTs.MicroSeconds() wraps around ui64 once BytesToCopy
            // reaches the multi-terabyte range (the window is on the order of seconds, i.e. ~1e6 us),
            // which would yield a meaningless remaining time. Exactness is not needed for an estimate.
            const double remainingUs = static_cast<double>(Self->Assimilator.BytesToCopy)
                * static_cast<double>(deltaTs.MicroSeconds())
                / static_cast<double>(deltaBytes);
            // Only store values that fit into a TDuration; otherwise keep TDuration::Max() set above.
            if (remainingUs < static_cast<double>(TDuration::Max().MicroSeconds())) {
                Self->Assimilator.CopyTimeRemaining = TDuration::MicroSeconds(static_cast<ui64>(remainingUs));
            }
        }
    }

    TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateBytesCopiedQ, 0,
        SelfId(), {}, nullptr, 0));
}

void TBlobDepot::TData::ExecuteTxCommitAssimilatedBlob(NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId,
TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep) {
Self->Execute(std::make_unique<TTxCommitAssimilatedBlob>(Self, status, blobSeqId, std::move(key),
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/blob_depot/assimilator.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ namespace NKikimr::NBlobDepot {
// Actor-private event identifiers, numbered from the start of the ES_PRIVATE event space.
// The assimilator's state function dispatches each of them to a dedicated member function.
struct TEvPrivate {
enum {
// dispatched to Action()
EvResume = EventSpaceBegin(TEvents::ES_PRIVATE),
// dispatched to HandleResumeScanDataForPlanning(); sent when the planning scan yields on timeout
EvResumeScanDataForPlanning,
// dispatched to HandleResumeScanDataForCopying(); sent when the copying scan yields on timeout
EvResumeScanDataForCopying,
// dispatched to HandleTxComplete(); passed as the notification event type when committing assimilated blobs
EvTxComplete,
// dispatched to UpdateBytesCopiedQ(); rescheduled by that function every second
EvUpdateBytesCopiedQ,
};
};

Expand Down Expand Up @@ -42,6 +44,12 @@ namespace NKikimr::NBlobDepot {
bool ActionInProgress = false;
bool ResumeScanDataForCopyingInFlight = false;

std::optional<TLogoBlobID> LastPlanScannedKey;
bool PlanningComplete = false;
bool ResumeScanDataForPlanningInFlight = false;

std::deque<std::tuple<TMonotonic, ui64>> BytesCopiedQ;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BLOB_DEPOT_ASSIMILATOR_ACTOR;
Expand All @@ -62,6 +70,8 @@ namespace NKikimr::NBlobDepot {
void Action();
void SendAssimilateRequest();
void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev);
void ScanDataForPlanning();
void HandleResumeScanDataForPlanning();
void ScanDataForCopying();
void HandleResumeScanDataForCopying();
void Handle(TEvBlobStorage::TEvGetResult::TPtr ev);
Expand All @@ -74,6 +84,7 @@ namespace NKikimr::NBlobDepot {
void Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev);
TString SerializeAssimilatorState() const;
void UpdateAssimilatorPosition() const;
void UpdateBytesCopiedQ();
};

} // NKikimr::NBlobDepot
29 changes: 18 additions & 11 deletions ydb/core/blob_depot/blob_depot_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,17 +290,24 @@ namespace NKikimr::NBlobDepot {
TActorId GroupAssimilatorId;
EDecommitState DecommitState = EDecommitState::Default;
std::optional<TString> AssimilatorState;
TString AssimilatorPosition;
TInstant AssimilatorLatestErrorGet;
TInstant AssimilatorLatestOkGet;
TInstant AssimilatorLatestErrorPut;
TInstant AssimilatorLatestOkPut;
TLogoBlobID AssimilatorLastReadBlobId;
ui64 AssimilatorBlobsReadOk = 0;
ui64 AssimilatorBlobsReadNoData = 0;
ui64 AssimilatorBlobsReadError = 0;
ui64 AssimilatorBlobsPutOk = 0;
ui64 AssimilatorBlobsPutError = 0;
// Assimilator progress and statistics, written by TAssimilator and shown on the monitoring page.
struct {
// human-readable SkipBlocksUpTo / SkipBarriersUpTo / SkipBlobsUpTo summary
TString Position;
// wall-clock time of the latest TEvGetResult with non-OK / OK status
TInstant LatestErrorGet;
TInstant LatestOkGet;
// wall-clock time of the latest TEvPutResult with non-OK / OK status
TInstant LatestErrorPut;
TInstant LatestOkPut;
// id of the most recent blob response processed by the get result handler (error responses excluded)
TLogoBlobID LastReadBlobId;
// planned bytes left: reset and summed by the planning scan, decreased on NODATA reads and OK puts
ui64 BytesToCopy = 0;
// total size of blobs put with OK status
ui64 BytesCopied = 0;
// bytes per second, recomputed by UpdateBytesCopiedQ (0 when it cannot be computed)
ui64 CopySpeed = 0;
// estimate recomputed by UpdateBytesCopiedQ; TDuration::Max() when no progress was observed
TDuration CopyTimeRemaining = TDuration::Max();
// per-blob read outcome counters
ui64 BlobsReadOk = 0;
ui64 BlobsReadNoData = 0;
ui64 BlobsReadError = 0;
// per-blob put outcome counters
ui64 BlobsPutOk = 0;
ui64 BlobsPutError = 0;
// incremented each time the planning scan starts without a saved position
ui32 CopyIteration = 0;
} Assimilator;

class TGroupAssimilator;

Expand Down
27 changes: 16 additions & 11 deletions ydb/core/blob_depot/mon_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,19 +456,24 @@ namespace NKikimr::NBlobDepot {
KEYVALUE_P("Now", TInstant::Now());
KEYVALUE_P("Decommit state", DecommitState);
KEYVALUE_P("Assimilator state", GroupAssimilatorId ? "running" : "stopped");
KEYVALUE_P("Assimilator position", TStringBuilder() << "<pre>" << AssimilatorPosition << "<pre/>");
KEYVALUE_P("Assimilator position", TStringBuilder() << "<pre>" << Assimilator.Position << "<pre/>");
KEYVALUE_P("Last assimilated blob id", Data->LastAssimilatedBlobId ?
Data->LastAssimilatedBlobId->ToString() : "<null>");
KEYVALUE_P("Last read blob id", AssimilatorLastReadBlobId);
KEYVALUE_P("Latest successful get", AssimilatorLatestOkGet);
KEYVALUE_P("Latest erroneous get", AssimilatorLatestErrorGet);
KEYVALUE_P("Latest successful put", AssimilatorLatestOkPut);
KEYVALUE_P("Latest erroneous put", AssimilatorLatestErrorPut);
KEYVALUE_P("Blobs read with OK", AssimilatorBlobsReadOk);
KEYVALUE_P("Blobs read with NODATA", AssimilatorBlobsReadNoData);
KEYVALUE_P("Blobs read with error", AssimilatorBlobsReadError);
KEYVALUE_P("Blobs put with OK", AssimilatorBlobsPutOk);
KEYVALUE_P("Blobs put with error", AssimilatorBlobsPutError);
KEYVALUE_P("Copy iteration", Assimilator.CopyIteration);
KEYVALUE_P("Bytes to copy", Assimilator.BytesToCopy);
KEYVALUE_P("Bytes already copied", Assimilator.BytesCopied);
KEYVALUE_P("Copy speed, bytes per second", Assimilator.CopySpeed);
KEYVALUE_P("Copy time remaining", Assimilator.CopyTimeRemaining);
KEYVALUE_P("Last read blob id", Assimilator.LastReadBlobId);
KEYVALUE_P("Latest successful get", Assimilator.LatestOkGet);
KEYVALUE_P("Latest erroneous get", Assimilator.LatestErrorGet);
KEYVALUE_P("Latest successful put", Assimilator.LatestOkPut);
KEYVALUE_P("Latest erroneous put", Assimilator.LatestErrorPut);
KEYVALUE_P("Blobs read with OK", Assimilator.BlobsReadOk);
KEYVALUE_P("Blobs read with NODATA", Assimilator.BlobsReadNoData);
KEYVALUE_P("Blobs read with error", Assimilator.BlobsReadError);
KEYVALUE_P("Blobs put with OK", Assimilator.BlobsPutOk);
KEYVALUE_P("Blobs put with error", Assimilator.BlobsPutError);
})
}
}
Expand Down

0 comments on commit 92161f3

Please sign in to comment.