From f5e8d85d5970a5a18c9f4e369de9df56ba656728 Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Wed, 7 Feb 2024 10:40:11 +0300 Subject: [PATCH 1/2] Fix scrubbing and flapping unittest (#1640) --- ydb/core/blobstorage/backpressure/queue.h | 4 + .../queue_backpressure_client.cpp | 6 +- .../backpressure/queue_backpressure_server.h | 4 +- .../blobstorage/dsproxy/dsproxy_get_impl.cpp | 5 +- .../blobstorage/pdisk/mock/pdisk_mock.cpp | 9 ++ ydb/core/blobstorage/pdisk/mock/pdisk_mock.h | 1 + .../blobstorage/ut_blobstorage/scrub_fast.cpp | 152 +++++++++++++----- .../vdisk/scrub/blob_recovery_impl.h | 4 +- .../vdisk/scrub/blob_recovery_process.cpp | 111 ++++++++----- .../vdisk/scrub/blob_recovery_request.cpp | 9 +- .../scrub/restore_corrupted_blob_actor.cpp | 8 + 11 files changed, 217 insertions(+), 96 deletions(-) diff --git a/ydb/core/blobstorage/backpressure/queue.h b/ydb/core/blobstorage/backpressure/queue.h index 5ba7a745a73c..f7e49e66805d 100644 --- a/ydb/core/blobstorage/backpressure/queue.h +++ b/ydb/core/blobstorage/backpressure/queue.h @@ -180,6 +180,10 @@ class TBlobStorageQueue { return Queues.InFlight.size(); } + ui64 GetInFlightCost() const { + return InFlightCost; + } + void UpdateCostModel(TInstant now, const NKikimrBlobStorage::TVDiskCostSettings& settings, const TBlobStorageGroupType& type); void InvalidateCosts(); diff --git a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp index b3c9d1d33c4d..f60b2d78a0ac 100644 --- a/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp +++ b/ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp @@ -365,7 +365,11 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped= cost); Cost -= cost; --InFlight; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 003a4e1a777d..87cdf96a2bf9 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -293,12 +293,13 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDequeSetId(ReaderTabletData->Id); msg->SetGeneration(ReaderTabletData->Generation); } - R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << get.OrderNumber - << " vget# " << vget->ToString()); } for (auto& vget : gets) { if (vget) { + R_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " + << Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID())) + << " vget# " << vget->ToString()); outVGets.push_back(std::move(vget)); ++RequestIndex; } diff --git a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp index f057bb46d5df..90969f236cbe 100644 --- a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp +++ b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp @@ -207,6 +207,11 @@ struct TPDiskMockState::TImpl { } } + bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) { + const ui64 chunkBegin = ui64(chunkIdx) * ChunkSize; + return static_cast(Corrupted & TIntervalSet{chunkBegin + begin, chunkBegin + end}); + } + std::set GetChunks() { std::set res; for (auto& [ownerId, owner] : Owners) { @@ -290,6 +295,10 @@ void TPDiskMockState::SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool Impl->SetCorruptedArea(chunkIdx, begin, end, enabled); } +bool TPDiskMockState::HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end) { + return Impl->HasCorruptedArea(chunkIdx, begin, end); +} + std::set TPDiskMockState::GetChunks() { return Impl->GetChunks(); } diff --git a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h index c22944034760..2198b0615c34 100644 --- a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h +++ b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.h @@ -26,6 +26,7 @@ namespace NKikimr { ~TPDiskMockState(); void SetCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end, bool enabled); + bool HasCorruptedArea(ui32 chunkIdx, ui32 begin, ui32 end); std::set GetChunks(); TMaybe GetOwnerRound(const TVDiskID& vDiskId) const; ui32 GetChunkSize() const; diff --git a/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp b/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp index 4ceb6595d927..2e24b5318d41 100644 --- a/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/scrub_fast.cpp @@ -16,60 +16,125 @@ void Test() { TString data = TString::Uninitialized(8_MB); memset(data.Detach(), 'X', data.size()); - TLogoBlobID id(1, 1, 1, 0, data.size(), 0); - - { // write data to group - TActorId sender = runtime->AllocateEdgeActor(1); - runtime->WrapInActorContext(sender, [&] { - SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max())); - }); - auto res = env.WaitForEdgeActorEvent(sender); - UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); - } - auto checkReadable = [&](NKikimrProto::EReplyStatus status) { - TActorId sender = runtime->AllocateEdgeActor(1); - runtime->WrapInActorContext(sender, [&] { - SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), - NKikimrBlobStorage::EGetHandleClass::FastRead)); - }); - auto res = env.WaitForEdgeActorEvent(sender); - UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); - UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1); - auto& r = res->Get()->Responses[0]; - UNIT_ASSERT_VALUES_EQUAL(r.Status, status); - if (status == NKikimrProto::OK) { - UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data); + for (ui32 step = 1; step < 100; ++step) { + TLogoBlobID id(1, 1, step, 0, data.size(), 0); + + { // write data to group + TActorId sender = runtime->AllocateEdgeActor(1); + runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvPut(id, data, TInstant::Max())); + }); + auto res = env.WaitForEdgeActorEvent(sender); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); } - }; - checkReadable(NKikimrProto::OK); + auto checkReadable = [&] { + TActorId sender = runtime->AllocateEdgeActor(1); + runtime->WrapInActorContext(sender, [&] { + SendToBSProxy(sender, info->GroupID, new TEvBlobStorage::TEvGet(id, 0, 0, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead)); + }); + auto res = env.WaitForEdgeActorEvent(sender); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->ResponseSz, 1); + auto& r = res->Get()->Responses[0]; + UNIT_ASSERT_VALUES_EQUAL(r.Status, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(r.Buffer.ConvertToString(), data); + + ui32 partsMask = 0; + for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) { + const TVDiskID& vdiskId = info->GetVDiskId(i); + env.WithQueueId(vdiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, [&](TActorId queueId) { + const TActorId sender = runtime->AllocateEdgeActor(1); + auto ev = TEvBlobStorage::TEvVGet::CreateExtremeDataQuery(vdiskId, TInstant::Max(), + NKikimrBlobStorage::EGetHandleClass::FastRead); + ev->AddExtremeQuery(id, 0, 0); + runtime->Send(new IEventHandle(queueId, sender, ev.release()), sender.NodeId()); + auto reply = env.WaitForEdgeActorEvent(sender); + auto& record = reply->Get()->Record; + UNIT_ASSERT_VALUES_EQUAL(record.GetStatus(), NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(record.ResultSize(), 1); + for (const auto& result : record.GetResult()) { + if (result.GetStatus() == NKikimrProto::OK) { + const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(result.GetBlobID()); + UNIT_ASSERT(id.PartId()); + const ui32 partIdx = id.PartId() - 1; + const ui32 mask = 1 << partIdx; + UNIT_ASSERT(!(partsMask & mask)); + partsMask |= mask; + } else { + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), NKikimrProto::NODATA); + } + } + }); + } + UNIT_ASSERT_VALUES_EQUAL(partsMask, (1 << info->Type.TotalPartCount()) - 1); + }; - for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) { - const TActorId vdiskActorId = info->GetActorId(i); + checkReadable(); - ui32 nodeId, pdiskId; - std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId); - auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId)); - Y_ABORT_UNLESS(it != env.PDiskMockStates.end()); + ui32 mask = 0; - const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId()); - env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId()); - auto res = env.WaitForEdgeActorEvent(sender); + for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) { + const TActorId vdiskActorId = info->GetActorId(i); - for (auto& item : res->Get()->Layout) { - using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult; - if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob) { - const TDiskPart& part = item.Location; - it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size, true); - break; + ui32 nodeId, pdiskId; + std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId); + auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId)); + Y_ABORT_UNLESS(it != env.PDiskMockStates.end()); + + const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId()); + env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId()); + auto res = env.WaitForEdgeActorEvent(sender); + + for (auto& item : res->Get()->Layout) { + using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult; + if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) { + const TDiskPart& part = item.Location; + mask |= 1 << i; + it->second->SetCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + 1 + RandomNumber(part.Size), true); + break; + } } + + checkReadable(); } - checkReadable(NKikimrProto::OK); - } + env.Sim(TDuration::Seconds(60)); - env.Sim(TDuration::Seconds(60)); + for (ui32 i = 0; i < info->GetTotalVDisksNum(); ++i) { + if (~mask >> i & 1) { + continue; + } + + const TActorId vdiskActorId = info->GetActorId(i); + + ui32 nodeId, pdiskId; + std::tie(nodeId, pdiskId, std::ignore) = DecomposeVDiskServiceId(vdiskActorId); + auto it = env.PDiskMockStates.find(std::make_pair(nodeId, pdiskId)); + Y_ABORT_UNLESS(it != env.PDiskMockStates.end()); + + const TActorId sender = runtime->AllocateEdgeActor(vdiskActorId.NodeId()); + env.Runtime->Send(new IEventHandle(vdiskActorId, sender, new TEvBlobStorage::TEvCaptureVDiskLayout), sender.NodeId()); + auto res = env.WaitForEdgeActorEvent(sender); + + bool anyPartReadable = false; + + for (auto& item : res->Get()->Layout) { + using T = TEvBlobStorage::TEvCaptureVDiskLayoutResult; + if (item.Database == T::EDatabase::LogoBlobs && item.RecordType == T::ERecordType::HugeBlob && item.BlobId.FullID() == id) { + const TDiskPart& part = item.Location; + anyPartReadable = !it->second->HasCorruptedArea(part.ChunkIdx, part.Offset, part.Offset + part.Size); + if (anyPartReadable) { + break; + } + } + } + + UNIT_ASSERT(anyPartReadable); + } + } } Y_UNIT_TEST_SUITE(ScrubFast) { @@ -77,3 +142,4 @@ Y_UNIT_TEST_SUITE(ScrubFast) { Test(); } } + diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h index 0e9022168f2c..cf645679aad6 100644 --- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h +++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_impl.h @@ -99,15 +99,15 @@ namespace NKikimr { // a map to fill upon receiving VGet result struct TPerBlobInfo { - const TInstant Deadline; std::weak_ptr Context; TEvRecoverBlobResult::TItem *Item; // item to update ui32 BlobReplyCounter = 0; // number of unreplied queries for this blob }; std::unordered_multimap> VGetResultMap; + std::set> GetsInFlight; void AddBlobQuery(const TLogoBlobID& id, NMatrix::TVectorType needed, const std::shared_ptr& context, TEvRecoverBlobResult::TItem *item); - void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize); + void AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup); void SendPendingQueries(); void Handle(TEvBlobStorage::TEvVGetResult::TPtr ev); NKikimrProto::EReplyStatus ProcessItemData(TEvRecoverBlobResult::TItem& item); diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp index 24a1d5dd087a..378c689fc4f3 100644 --- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_process.cpp @@ -7,46 +7,56 @@ namespace NKikimr { STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS32, VDISKP(LogPrefix, "AddBlobQuery"), (SelfId, SelfId()), (Id, id), (Needed, needed), (RequestId, context->RequestId)); const TInstant deadline = context->Iterator->first; - const TBlobStorageGroupType& gtype = Info->Type; TBlobStorageGroupInfo::TOrderNums nums; Info->GetTopology().PickSubgroup(id.Hash(), nums); ui32 blobReplyCounter = 0; for (ui32 i = 0; i < nums.size(); ++i) { const TVDiskID& vdiskId = Info->GetVDiskId(i); // obtain VDisk - if (TVDiskIdShort(vdiskId) == VCtx->ShortSelfVDisk) { - continue; + if (TVDiskIdShort(vdiskId) != VCtx->ShortSelfVDisk) { + AddExtremeQuery(vdiskId, id, deadline, i); + ++blobReplyCounter; } + } + VGetResultMap.emplace(id, TPerBlobInfo{context, item, blobReplyCounter}); + } + + void TBlobRecoveryActor::AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 idxInSubgroup) { + const auto [_, inserted] = GetsInFlight.emplace(vdiskId, id); + + ui32 worstReplySize = 0; + if (inserted) { + const TBlobStorageGroupType& gtype = Info->Type; switch (TIngress::IngressMode(gtype)) { case TIngress::EMode::GENERIC: - ui32 maxSize; - maxSize = 0; if (gtype.GetErasure() == TBlobStorageGroupType::ErasureMirror3dc) { - maxSize += gtype.PartSize(TLogoBlobID(id, i % 3 + 1)); + worstReplySize = gtype.PartSize(TLogoBlobID(id, idxInSubgroup % 3 + 1)); } else { for (ui32 k = 0; k < gtype.TotalPartCount(); ++k) { - maxSize += i >= gtype.TotalPartCount() || k == i ? gtype.PartSize(TLogoBlobID(id, k + 1)) : 0; + worstReplySize += idxInSubgroup >= gtype.TotalPartCount() || k == idxInSubgroup + ? gtype.PartSize(TLogoBlobID(id, k + 1)) : 0; } } - AddExtremeQuery(vdiskId, id, deadline, maxSize); break; case TIngress::EMode::MIRROR3OF4: - AddExtremeQuery(vdiskId, id, deadline, gtype.PartSize(TLogoBlobID(id, 1)) + - gtype.PartSize(TLogoBlobID(id, 2))); + for (ui32 i = 0; i < 2; ++i) { + if (idxInSubgroup % 2 == i || idxInSubgroup >= 4) { + worstReplySize += gtype.PartSize(TLogoBlobID(id, i + 1)); + } + } break; } - ++blobReplyCounter; } - VGetResultMap.emplace(id, TPerBlobInfo{context->Iterator->first, context, item, blobReplyCounter}); - } - void TBlobRecoveryActor::AddExtremeQuery(const TVDiskID& vdiskId, const TLogoBlobID& id, TInstant deadline, ui32 worstReplySize) { STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS33, VDISKP(LogPrefix, "AddExtremeQuery"), (SelfId, SelfId()), - (VDiskId, vdiskId), (Id, id), (WorstReplySize, worstReplySize)); + (VDiskId, vdiskId), (Id, id), (WorstReplySize, worstReplySize), (AlreadyInFlight, !inserted)); + if (!inserted) { // the request is already in flight + return; + } TQuery& query = Queries[vdiskId]; - const ui32 maxReplySize = 10000000; // FIXME + const ui32 maxReplySize = 32_MB; if (query.VGet && query.WorstReplySize + worstReplySize > maxReplySize) { // send the request on overflow query.Pending.push_back(std::move(query.VGet)); query.WorstReplySize = 0; @@ -79,35 +89,51 @@ namespace NKikimr { STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS35, VDISKP(LogPrefix, "received TEvVGetResult"), (SelfId, SelfId()), (Msg, ev->Get()->ToString())); + const TInstant now = TActivationContext::Now(); const auto& record = ev->Get()->Record; + const TVDiskID vdiskId = VDiskIDFromVDiskID(record.GetVDiskID()); + std::unordered_map> rerequest; + std::unordered_set done; + for (const auto& res : record.GetResult()) { const TLogoBlobID& id = LogoBlobIDFromLogoBlobID(res.GetBlobID()); const TLogoBlobID& fullId = id.FullID(); // whole blob id - auto r = VGetResultMap.equal_range(fullId); - for (auto it = r.first; it != r.second; ) { + done.insert(fullId); + const NKikimrProto::EReplyStatus status = res.GetStatus(); + auto [begin, end] = VGetResultMap.equal_range(fullId); + for (auto it = begin; it != end; ) { TPerBlobInfo& info = it->second; if (auto context = info.Context.lock()) { // context acquired, request is still intact - auto& item = *info.Item; // only here we can access item, after obtaining context pointer - TRope data = ev->Get()->GetBlobData(res); - bool update = false; - if (res.GetStatus() == NKikimrProto::OK && data) { - item.SetPartData(id, std::move(data)); - update = true; - } - const bool term = !--info.BlobReplyCounter; - if (item.Status == NKikimrProto::UNKNOWN && (term || update)) { - const NKikimrProto::EReplyStatus prevStatus = std::exchange(item.Status, ProcessItemData(item)); - if (item.Status == NKikimrProto::UNKNOWN && term) { // not enough parts to fulfill request - item.Status = NKikimrProto::NODATA; + if (status == NKikimrProto::DEADLINE && now < context->Iterator->first) { + auto& deadline = rerequest[fullId]; + deadline = Max(deadline, context->Iterator->first); + } else { + auto& item = *info.Item; // only here we can access item, after obtaining context pointer + TRope data = ev->Get()->GetBlobData(res); + bool update = false; + if (res.GetStatus() == NKikimrProto::OK && data) { + item.SetPartData(id, std::move(data)); + update = true; + } + const bool term = !--info.BlobReplyCounter; + if (item.Status == NKikimrProto::UNKNOWN && (term || update)) { + const NKikimrProto::EReplyStatus prevStatus = std::exchange(item.Status, ProcessItemData(item)); + if (item.Status == NKikimrProto::UNKNOWN && term) { // not enough parts to fulfill request + item.Status = NKikimrProto::NODATA; + } + STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS36, VDISKP(LogPrefix, "processing item"), + (SelfId, SelfId()), (RequestId, context->RequestId), (Id, id), + (Status, res.GetStatus()), (Last, term), (DataUpdated, update), + (EntryStatus, prevStatus), (ExitStatus, item.Status)); + } + if (item.Status != NKikimrProto::UNKNOWN && !--context->NumUnrespondedBlobs) { // request fully completed + context->SendResult(SelfId()); + InFlight.erase(context->Iterator); + } + if (term) { // this was the last reply for current blob + it = VGetResultMap.erase(it); + continue; } - STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS36, VDISKP(LogPrefix, "processing item"), - (SelfId, SelfId()), (RequestId, context->RequestId), (Id, id), - (Status, res.GetStatus()), (Last, term), (DataUpdated, update), - (EntryStatus, prevStatus), (ExitStatus, item.Status)); - } - if (item.Status != NKikimrProto::UNKNOWN && !--context->NumUnrespondedBlobs) { - context->SendResult(SelfId()); - InFlight.erase(context->Iterator); } ++it; } else { // request deadlined or canceled, we erase it from the map @@ -115,6 +141,15 @@ namespace NKikimr { } } } + + for (const auto& id : done) { + const size_t n = GetsInFlight.erase(std::make_tuple(vdiskId, id)); + Y_DEBUG_ABORT_UNLESS(n == 1); + } + for (const auto& [id, deadline] : rerequest) { + AddExtremeQuery(vdiskId, id, deadline, Info->GetTopology().GetIdxInSubgroup(vdiskId, id.Hash())); + } + SendPendingQueries(); } NKikimrProto::EReplyStatus TBlobRecoveryActor::ProcessItemData(TEvRecoverBlobResult::TItem& item) { diff --git a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_request.cpp b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_request.cpp index fcfbd6ed9f3a..e86edcacbbb9 100644 --- a/ydb/core/blobstorage/vdisk/scrub/blob_recovery_request.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/blob_recovery_request.cpp @@ -49,14 +49,9 @@ namespace NKikimr { } InFlight.erase(InFlight.begin(), it); - TInstant deadline = TInstant::Max(); // next deadline - if (it != InFlight.end()) { - deadline = it->first; - } - // reschedule timer - if (deadline != TInstant::Max()) { - Schedule(deadline, new TEvents::TEvWakeup); + if (it != InFlight.end()) { + Schedule(it->first, new TEvents::TEvWakeup); } else { WakeupScheduled = false; } diff --git a/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp b/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp index 87b1113d5f2e..8957d7a998da 100644 --- a/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp +++ b/ydb/core/blobstorage/vdisk/scrub/restore_corrupted_blob_actor.cpp @@ -196,6 +196,8 @@ namespace NKikimr { } void IssueQuery() { + STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS00, VDISKP(LogPrefix, "IssueQuery"), (SelfId, SelfId())); + std::unique_ptr ev; for (size_t i = 0; i < Items.size(); ++i) { const auto& item = Items[i]; @@ -205,6 +207,8 @@ namespace NKikimr { ev->Deadline = Deadline; } ev->Items.emplace_back(item.BlobId, TStackVec(item.Parts), item.PartsMask, item.Needed, TDiskPart(), i); + STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS17, VDISKP(LogPrefix, "IssueQuery item"), (SelfId, SelfId()), + (BlobId, item.BlobId), (PartsMask, item.PartsMask), (Needed, item.Needed)); } } if (ev) { @@ -222,6 +226,8 @@ namespace NKikimr { } void Handle(TEvRecoverBlobResult::TPtr ev) { + STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS24, VDISKP(LogPrefix, "Handle(TEvRecoverBlobResult)"), (SelfId, SelfId())); + for (auto& item : ev->Get()->Items) { auto& myItem = Items[item.Cookie]; Y_ABORT_UNLESS(myItem.Status == NKikimrProto::UNKNOWN); @@ -233,6 +239,8 @@ namespace NKikimr { IssueWrite(myItem, item.Cookie); } } + STLOG(PRI_DEBUG, BS_VDISK_SCRUB, VDS43, VDISKP(LogPrefix, "Handle(TEvRecoverBlobResult) item"), + (SelfId, SelfId()), (BlobId, item.BlobId), (Status, item.Status), (PartsMask, item.PartsMask)); } for (const auto& item : Items) { if (item.Status == NKikimrProto::UNKNOWN) { From 273e018c909a6207257b6930e577cd7422b67c9e Mon Sep 17 00:00:00 2001 From: kruall Date: Wed, 7 Feb 2024 12:15:08 +0300 Subject: [PATCH 2/2] Fix leaking blobs via using patching (#1639) --- .../blobstorage/dsproxy/dsproxy_patch.cpp | 26 ++++++++++++------- .../blobstorage/vdisk/common/vdisk_events.h | 13 ++++++++-- .../skeleton/blobstorage_skeletonfront.cpp | 4 ++- .../vdisk/skeleton/skeleton_vpatch_actor.cpp | 11 +++++--- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index 131112c799d5..e3429b5f3de4 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -68,6 +68,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor ReceivedResponseFlags; TStackVec EmptyResponseFlags; TStackVec ErrorResponseFlags; + TStackVec ForceStopFlags; TBlobStorageGroupInfo::TVDiskIds VDisks; bool UseVPatch = false; @@ -332,8 +333,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGet()->Record; + + Y_ABORT_UNLESS(record.HasCookie()); + ui8 subgroupIdx = record.GetCookie(); + if (ForceStopFlags[subgroupIdx]) { + return; // ignore force stop response + } + ReceivedResults++; + PullOutStatusFlagsAndFressSpace(record); Y_ABORT_UNLESS(record.HasStatus()); NKikimrProto::EReplyStatus status = record.GetStatus(); @@ -342,9 +350,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor> events; - for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) { - if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) { + for (ui32 subgroupIdx = 0; subgroupIdx < VDisks.size(); ++subgroupIdx) { + if (!ErrorResponseFlags[subgroupIdx] && !EmptyResponseFlags[subgroupIdx] && ReceivedResponseFlags[subgroupIdx]) { std::unique_ptr ev = std::make_unique( - OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx); + OriginalId, PatchedId, VDisks[subgroupIdx], 0, Deadline, subgroupIdx); ev->SetForceEnd(); + ForceStopFlags[subgroupIdx] = true; events.emplace_back(std::move(ev)); PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message", - (VDiskIdxInSubgroup, vdiskIdx), - (VDiskId, VDisks[vdiskIdx])); + (VDiskIdxInSubgroup, subgroupIdx), + (VDiskId, VDisks[subgroupIdx])); } } SendToQueues(events, false); @@ -495,6 +501,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor> events; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index f33888e534a9..dfc24c096ff9 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -366,10 +366,12 @@ namespace NKikimr { : public TEventLocal { TVMsgContext Ctx; std::unique_ptr Event; + bool DoNotResend; - TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr event) + TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr event, bool doNotResend = false) : Ctx(ctx) , Event(std::move(event)) + , DoNotResend(doNotResend) { Y_DEBUG_ABORT_UNLESS(Ctx.ExtQueueId != NKikimrBlobStorage::EVDiskQueueId::Unknown); Y_DEBUG_ABORT_UNLESS(Ctx.IntQueueId != NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown); @@ -468,6 +470,9 @@ namespace NKikimr { TActorIDPtr SkeletonFrontIDPtr; THPTimer ExecutionTimer; + protected: + bool DoNotResendFromSkeletonFront = false; + public: TEvVResultBaseWithQoSPB() = default; @@ -526,7 +531,7 @@ namespace NKikimr { byteSize, this->ToString().data()); if (SkeletonFrontIDPtr && MsgCtx.IntQueueId != NKikimrBlobStorage::IntUnknown) { - ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev))); + ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev), DoNotResendFromSkeletonFront)); } else { TActivationContext::Send(ev.release()); } @@ -2182,6 +2187,10 @@ namespace NKikimr { Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare); } + void SetForceEndResponse() { + DoNotResendFromSkeletonFront = true; + } + void MakeError(NKikimrProto::EReplyStatus status, const TString &errorReason, const NKikimrBlobStorage::TEvVPatchDiff &request) { diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 238a69e391c5..290ac985d8a8 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -1632,7 +1632,9 @@ namespace NKikimr { extQueue.Completed(ctx, msgCtx, event); TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId); intQueue.Completed(ctx, msgCtx, *this, id); - TActivationContext::Send(event.release()); + if (!ev->Get()->DoNotResend) { + TActivationContext::Send(event.release()); + } } void ChangeGeneration(const TVDiskID& vdiskId, const TIntrusivePtr& info, diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp index 8aaec57b68b4..a3587f75ef24 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp @@ -291,7 +291,7 @@ namespace NKikimr::NPrivate { } } - void SendVPatchResult(NKikimrProto::EReplyStatus status) + void SendVPatchResult(NKikimrProto::EReplyStatus status, bool forceEnd = false) { STLOG(PRI_INFO, BS_VDISK_PATCH, BSVSP07, VDiskLogPrefix << " TEvVPatch: send patch result;", @@ -308,6 +308,9 @@ namespace NKikimr::NPrivate { } AddMark((status == NKikimrProto::OK ? "Patch ends with OK" : "Patch ends witn NOT OK")); CurrentEventTrace = nullptr; + if (forceEnd) { + ResultEvent->SetForceEndResponse(); + } SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), Cookie); } @@ -501,7 +504,7 @@ namespace NKikimr::NPrivate { Cookie = ev->Cookie; CurrentEventTrace = ev->Get()->VDiskSkeletonTrace; AddMark("Error: HandleError TEvVPatchDiff"); - SendVPatchResult(NKikimrProto::ERROR); + SendVPatchResult(NKikimrProto::ERROR, ev->Get()->IsForceEnd()); } void HandleForceEnd(TEvBlobStorage::TEvVPatchDiff::TPtr &ev) { @@ -509,7 +512,7 @@ namespace NKikimr::NPrivate { SendVPatchFoundParts(NKikimrProto::ERROR); if (forceEnd) { AddMark("Force end"); - SendVPatchResult(NKikimrProto::OK); + SendVPatchResult(NKikimrProto::OK, true); } else { AddMark("Force end by error"); SendVPatchResult(NKikimrProto::ERROR); @@ -566,7 +569,7 @@ namespace NKikimr::NPrivate { if (forceEnd) { AddMark("Force end"); - SendVPatchResult(NKikimrProto::OK); + SendVPatchResult(NKikimrProto::OK, true); NotifySkeletonAboutDying(); Become(&TThis::ErrorState); return;