diff --git a/ydb/core/blob_depot/assimilator.cpp b/ydb/core/blob_depot/assimilator.cpp index a85de4f9f7b4..4f5f8179486c 100644 --- a/ydb/core/blob_depot/assimilator.cpp +++ b/ydb/core/blob_depot/assimilator.cpp @@ -94,6 +94,7 @@ namespace NKikimr::NBlobDepot { Become(&TThis::StateFunc); Action(); + UpdateBytesCopiedQ(); } void TAssimilator::PassAway() { @@ -116,8 +117,10 @@ namespace NKikimr::NBlobDepot { hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); hFunc(TEvBlobStorage::TEvControllerGroupDecommittedResponse, Handle); cFunc(TEvPrivate::EvResume, Action); + cFunc(TEvPrivate::EvResumeScanDataForPlanning, HandleResumeScanDataForPlanning); cFunc(TEvPrivate::EvResumeScanDataForCopying, HandleResumeScanDataForCopying); fFunc(TEvPrivate::EvTxComplete, HandleTxComplete); + cFunc(TEvPrivate::EvUpdateBytesCopiedQ, UpdateBytesCopiedQ); cFunc(TEvents::TSystem::Poison, PassAway); default: @@ -134,7 +137,11 @@ namespace NKikimr::NBlobDepot { if (Self->DecommitState < EDecommitState::BlobsFinished) { SendAssimilateRequest(); } else if (Self->DecommitState < EDecommitState::BlobsCopied) { - ScanDataForCopying(); + if (PlanningComplete) { + ScanDataForCopying(); + } else { + ScanDataForPlanning(); + } } else if (Self->DecommitState == EDecommitState::BlobsCopied) { Y_ABORT_UNLESS(!PipeId); CreatePipe(); @@ -283,7 +290,61 @@ namespace NKikimr::NBlobDepot { } } + void TAssimilator::ScanDataForPlanning() { + if (ResumeScanDataForPlanningInFlight) { + return; + } + + THPTimer timer; + ui32 numItems = 0; + bool timeout = false; + + if (!LastPlanScannedKey) { + ++Self->Assimilator.CopyIteration; + Self->Assimilator.BytesToCopy = 0; + } + + TData::TScanRange range{ + LastPlanScannedKey ? TData::TKey(*LastPlanScannedKey) : TData::TKey::Min(), + TData::TKey::Max(), + }; + Self->Data->ScanRange(range, nullptr, nullptr, [&](const TData::TKey& key, const TData::TValue& value) { + if (++numItems == 1000) { + numItems = 0; + if (TDuration::Seconds(timer.Passed()) >= TDuration::MilliSeconds(1)) { + timeout = true; + return false; + } + } + if (value.GoingToAssimilate) { + Self->Assimilator.BytesToCopy += key.GetBlobId().BlobSize(); + } + LastPlanScannedKey.emplace(key.GetBlobId()); + return true; + }); + + if (timeout) { + ResumeScanDataForPlanningInFlight = true; + TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForPlanning, 0, SelfId(), {}, nullptr, 0)); + return; + } + + ActionInProgress = false; + PlanningComplete = true; + Action(); + } + + void TAssimilator::HandleResumeScanDataForPlanning() { + Y_ABORT_UNLESS(ResumeScanDataForPlanningInFlight); + ResumeScanDataForPlanningInFlight = false; + ScanDataForPlanning(); + } + void TAssimilator::ScanDataForCopying() { + if (ResumeScanDataForCopyingInFlight) { + return; + } + STLOG(PRI_DEBUG, BLOB_DEPOT, BDT54, "TAssimilator::ScanDataForCopying", (Id, Self->GetLogId()), (LastScannedKey, LastScannedKey), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size())); @@ -324,11 +385,8 @@ namespace NKikimr::NBlobDepot { (EntriesToProcess, EntriesToProcess), (Timeout, timeout), (NumGetsUnprocessed, GetIdToUnprocessedPuts.size())); if (timeout) { // timeout hit, reschedule work - if (!ResumeScanDataForCopyingInFlight) { - TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForCopying, 0, SelfId(), {}, nullptr, 0)); - ResumeScanDataForCopyingInFlight = true; - } - break; + TActivationContext::Send(new IEventHandle(TEvPrivate::EvResumeScanDataForCopying, 0, SelfId(), {}, nullptr, 0)); + ResumeScanDataForCopyingInFlight = true; } else if (!ScanQ.empty()) { using TQuery = TEvBlobStorage::TEvGet::TQuery; const ui32 sz = ScanQ.size(); @@ -345,15 +403,18 @@ namespace NKikimr::NBlobDepot { GetIdToUnprocessedPuts.try_emplace(getId); ScanQ.clear(); TotalSize = 0; - } else if (!GetIdToUnprocessedPuts.empty()) { // there are some unprocessed get queries, still have to wait - break; + continue; + } else if (!GetIdToUnprocessedPuts.empty()) { + // there are some unprocessed get queries, still have to wait } else if (!EntriesToProcess) { // we have finished scanning the whole table without any entries, copying is done OnCopyDone(); - break; } else { // we have finished scanning, but we have replicated some data, restart scanning to ensure that nothing left LastScannedKey.reset(); - EntriesToProcess = false; + LastPlanScannedKey.reset(); + EntriesToProcess = PlanningComplete = ActionInProgress = false; + Action(); } + break; } } @@ -365,7 +426,7 @@ namespace NKikimr::NBlobDepot { void TAssimilator::Handle(TEvBlobStorage::TEvGetResult::TPtr ev) { auto& msg = *ev->Get(); - (msg.Status == NKikimrProto::OK ? Self->AssimilatorLatestOkGet : Self->AssimilatorLatestOkPut) = TInstant::Now(); + (msg.Status == NKikimrProto::OK ? Self->Assimilator.LatestOkGet : Self->Assimilator.LatestErrorGet) = TInstant::Now(); const auto it = GetIdToUnprocessedPuts.find(ev->Cookie); Y_ABORT_UNLESS(it != GetIdToUnprocessedPuts.end()); ui32 getBytes = 0; @@ -389,26 +450,25 @@ namespace NKikimr::NBlobDepot { ++it->second; } getBytes += resp.Id.BlobSize(); - ++Self->AssimilatorBlobsReadOk; + ++Self->Assimilator.BlobsReadOk; } else if (resp.Status == NKikimrProto::NODATA) { Self->Data->ExecuteTxCommitAssimilatedBlob(NKikimrProto::NODATA, TBlobSeqId(), TData::TKey(resp.Id), TEvPrivate::EvTxComplete, SelfId(), it->first); ++it->second; - ++Self->AssimilatorBlobsReadNoData; + ++Self->Assimilator.BlobsReadNoData; + Self->Assimilator.BytesToCopy -= resp.Id.BlobSize(); } else { - ++Self->AssimilatorBlobsReadError; + ++Self->Assimilator.BlobsReadError; continue; } - Self->AssimilatorLastReadBlobId = resp.Id; + Self->Assimilator.LastReadBlobId = resp.Id; } if (getBytes) { Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_DECOMMIT_GET_BYTES] += getBytes; } if (!it->second) { GetIdToUnprocessedPuts.erase(it); - if (!ResumeScanDataForCopyingInFlight) { - ScanDataForCopying(); - } + ScanDataForCopying(); } } @@ -417,20 +477,20 @@ namespace NKikimr::NBlobDepot { Y_ABORT_UNLESS(it != GetIdToUnprocessedPuts.end()); if (!--it->second) { GetIdToUnprocessedPuts.erase(it); - if (!ResumeScanDataForCopyingInFlight) { - ScanDataForCopying(); - } + ScanDataForCopying(); } } void TAssimilator::Handle(TEvBlobStorage::TEvPutResult::TPtr ev) { auto& msg = *ev->Get(); - (msg.Status == NKikimrProto::OK ? Self->AssimilatorLatestOkPut : Self->AssimilatorLatestErrorPut) = TInstant::Now(); + (msg.Status == NKikimrProto::OK ? Self->Assimilator.LatestOkPut : Self->Assimilator.LatestErrorPut) = TInstant::Now(); if (msg.Status == NKikimrProto::OK) { Self->TabletCounters->Cumulative()[NKikimrBlobDepot::COUNTER_DECOMMIT_PUT_OK_BYTES] += msg.Id.BlobSize(); - ++Self->AssimilatorBlobsPutOk; + ++Self->Assimilator.BlobsPutOk; + Self->Assimilator.BytesToCopy -= msg.Id.BlobSize(); + Self->Assimilator.BytesCopied += msg.Id.BlobSize(); } else { - ++Self->AssimilatorBlobsPutError; + ++Self->Assimilator.BlobsPutError; } const auto it = PutIdToKey.find(ev->Cookie); Y_ABORT_UNLESS(it != PutIdToKey.end()); @@ -554,7 +614,7 @@ namespace NKikimr::NBlobDepot { } void TAssimilator::UpdateAssimilatorPosition() const { - Self->AssimilatorPosition = TStringBuilder() + Self->Assimilator.Position = TStringBuilder() << "SkipBlocksUpTo# " << (SkipBlocksUpTo ? ToString(*SkipBlocksUpTo) : "") << Endl << "SkipBarriersUpTo# " << (SkipBarriersUpTo ? TString(TStringBuilder() << std::get<0>(*SkipBarriersUpTo) << ':' << (int)std::get<1>(*SkipBarriersUpTo)) @@ -562,6 +622,33 @@ namespace NKikimr::NBlobDepot { << "SkipBlobsUpTo# " << (SkipBlobsUpTo ? SkipBlobsUpTo->ToString() : ""); } + void TAssimilator::UpdateBytesCopiedQ() { + while (BytesCopiedQ.size() >= 3) { + BytesCopiedQ.pop_front(); + } + BytesCopiedQ.emplace_back(TActivationContext::Monotonic(), Self->Assimilator.BytesCopied); + + Self->Assimilator.CopySpeed = 0; + Self->Assimilator.CopyTimeRemaining = TDuration::Max(); + + if (BytesCopiedQ.size() > 1) { + const auto& [frontTs, frontBytes] = BytesCopiedQ.front(); + const auto& [backTs, backBytes] = BytesCopiedQ.back(); + const TDuration deltaTs = backTs - frontTs; + const ui64 deltaBytes = backBytes - frontBytes; + if (deltaTs != TDuration::Zero()) { + Self->Assimilator.CopySpeed = deltaBytes * 1'000'000 / deltaTs.MicroSeconds(); + } + if (deltaBytes) { + Self->Assimilator.CopyTimeRemaining = TDuration::MicroSeconds(Self->Assimilator.BytesToCopy * + deltaTs.MicroSeconds() / deltaBytes); + } + } + + TActivationContext::Schedule(TDuration::Seconds(1), new IEventHandle(TEvPrivate::EvUpdateBytesCopiedQ, 0, + SelfId(), {}, nullptr, 0)); + } + void TBlobDepot::TData::ExecuteTxCommitAssimilatedBlob(NKikimrProto::EReplyStatus status, TBlobSeqId blobSeqId, TData::TKey key, ui32 notifyEventType, TActorId parentId, ui64 cookie, bool keep, bool doNotKeep) { Self->Execute(std::make_unique(Self, status, blobSeqId, std::move(key), diff --git a/ydb/core/blob_depot/assimilator.h b/ydb/core/blob_depot/assimilator.h index bd23ea8f86f6..d0e757e5eaee 100644 --- a/ydb/core/blob_depot/assimilator.h +++ b/ydb/core/blob_depot/assimilator.h @@ -10,8 +10,10 @@ namespace NKikimr::NBlobDepot { struct TEvPrivate { enum { EvResume = EventSpaceBegin(TEvents::ES_PRIVATE), + EvResumeScanDataForPlanning, EvResumeScanDataForCopying, EvTxComplete, + EvUpdateBytesCopiedQ, }; }; @@ -42,6 +44,12 @@ namespace NKikimr::NBlobDepot { bool ActionInProgress = false; bool ResumeScanDataForCopyingInFlight = false; + std::optional LastPlanScannedKey; + bool PlanningComplete = false; + bool ResumeScanDataForPlanningInFlight = false; + + std::deque> BytesCopiedQ; + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BLOB_DEPOT_ASSIMILATOR_ACTOR; @@ -62,6 +70,8 @@ namespace NKikimr::NBlobDepot { void Action(); void SendAssimilateRequest(); void Handle(TEvBlobStorage::TEvAssimilateResult::TPtr ev); + void ScanDataForPlanning(); + void HandleResumeScanDataForPlanning(); void ScanDataForCopying(); void HandleResumeScanDataForCopying(); void Handle(TEvBlobStorage::TEvGetResult::TPtr ev); @@ -74,6 +84,7 @@ namespace NKikimr::NBlobDepot { void Handle(TEvBlobStorage::TEvControllerGroupDecommittedResponse::TPtr ev); TString SerializeAssimilatorState() const; void UpdateAssimilatorPosition() const; + void UpdateBytesCopiedQ(); }; } // NKikimrBlobDepot::NBlobDepot diff --git a/ydb/core/blob_depot/blob_depot_tablet.h b/ydb/core/blob_depot/blob_depot_tablet.h index 726ebafc598e..1b59fc81ec00 100644 --- a/ydb/core/blob_depot/blob_depot_tablet.h +++ b/ydb/core/blob_depot/blob_depot_tablet.h @@ -290,17 +290,24 @@ namespace NKikimr::NBlobDepot { TActorId GroupAssimilatorId; EDecommitState DecommitState = EDecommitState::Default; std::optional AssimilatorState; - TString AssimilatorPosition; - TInstant AssimilatorLatestErrorGet; - TInstant AssimilatorLatestOkGet; - TInstant AssimilatorLatestErrorPut; - TInstant AssimilatorLatestOkPut; - TLogoBlobID AssimilatorLastReadBlobId; - ui64 AssimilatorBlobsReadOk = 0; - ui64 AssimilatorBlobsReadNoData = 0; - ui64 AssimilatorBlobsReadError = 0; - ui64 AssimilatorBlobsPutOk = 0; - ui64 AssimilatorBlobsPutError = 0; + struct { + TString Position; + TInstant LatestErrorGet; + TInstant LatestOkGet; + TInstant LatestErrorPut; + TInstant LatestOkPut; + TLogoBlobID LastReadBlobId; + ui64 BytesToCopy = 0; + ui64 BytesCopied = 0; + ui64 CopySpeed = 0; + TDuration CopyTimeRemaining = TDuration::Max(); + ui64 BlobsReadOk = 0; + ui64 BlobsReadNoData = 0; + ui64 BlobsReadError = 0; + ui64 BlobsPutOk = 0; + ui64 BlobsPutError = 0; + ui32 CopyIteration = 0; + } Assimilator; class TGroupAssimilator; diff --git a/ydb/core/blob_depot/mon_main.cpp b/ydb/core/blob_depot/mon_main.cpp index 051be4245bf3..eb6b4d74fa00 100644 --- a/ydb/core/blob_depot/mon_main.cpp +++ b/ydb/core/blob_depot/mon_main.cpp @@ -456,19 +456,24 @@ namespace NKikimr::NBlobDepot { KEYVALUE_P("Now", TInstant::Now()); KEYVALUE_P("Decommit state", DecommitState); KEYVALUE_P("Assimilator state", GroupAssimilatorId ? "running" : "stopped"); - KEYVALUE_P("Assimilator position", TStringBuilder() << "
" << AssimilatorPosition << "
");
+                            KEYVALUE_P("Assimilator position", TStringBuilder() << "
" << Assimilator.Position << "
");
                             KEYVALUE_P("Last assimilated blob id", Data->LastAssimilatedBlobId ?
                                 Data->LastAssimilatedBlobId->ToString() : "");
-                            KEYVALUE_P("Last read blob id", AssimilatorLastReadBlobId);
-                            KEYVALUE_P("Latest successful get", AssimilatorLatestOkGet);
-                            KEYVALUE_P("Latest erroneous get", AssimilatorLatestErrorGet);
-                            KEYVALUE_P("Latest successful put", AssimilatorLatestOkPut);
-                            KEYVALUE_P("Latest erroneous put", AssimilatorLatestErrorPut);
-                            KEYVALUE_P("Blobs read with OK", AssimilatorBlobsReadOk);
-                            KEYVALUE_P("Blobs read with NODATA", AssimilatorBlobsReadNoData);
-                            KEYVALUE_P("Blobs read with error", AssimilatorBlobsReadError);
-                            KEYVALUE_P("Blobs put with OK", AssimilatorBlobsPutOk);
-                            KEYVALUE_P("Blobs put with error", AssimilatorBlobsPutError);
+                            KEYVALUE_P("Copy iteration", Assimilator.CopyIteration);
+                            KEYVALUE_P("Bytes to copy", Assimilator.BytesToCopy);
+                            KEYVALUE_P("Bytes already copied", Assimilator.BytesCopied);
+                            KEYVALUE_P("Copy speed, bytes per second", Assimilator.CopySpeed);
+                            KEYVALUE_P("Copy time remaining", Assimilator.CopyTimeRemaining);
+                            KEYVALUE_P("Last read blob id", Assimilator.LastReadBlobId);
+                            KEYVALUE_P("Latest successful get", Assimilator.LatestOkGet);
+                            KEYVALUE_P("Latest erroneous get", Assimilator.LatestErrorGet);
+                            KEYVALUE_P("Latest successful put", Assimilator.LatestOkPut);
+                            KEYVALUE_P("Latest erroneous put", Assimilator.LatestErrorPut);
+                            KEYVALUE_P("Blobs read with OK", Assimilator.BlobsReadOk);
+                            KEYVALUE_P("Blobs read with NODATA", Assimilator.BlobsReadNoData);
+                            KEYVALUE_P("Blobs read with error", Assimilator.BlobsReadError);
+                            KEYVALUE_P("Blobs put with OK", Assimilator.BlobsPutOk);
+                            KEYVALUE_P("Blobs put with error", Assimilator.BlobsPutError);
                         })
                     }
                 }