Skip to content

Commit

Permalink
Fix leaking blobs via using patching (#1639)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored and alexvru committed Feb 7, 2024
1 parent f5e8d85 commit 273e018
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
26 changes: 17 additions & 9 deletions ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
TStackVec<bool, TypicalDisksInSubring> ReceivedResponseFlags;
TStackVec<bool, TypicalDisksInSubring> EmptyResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ErrorResponseFlags;
TStackVec<bool, TypicalDisksInSubring> ForceStopFlags;
TBlobStorageGroupInfo::TVDiskIds VDisks;

bool UseVPatch = false;
Expand Down Expand Up @@ -332,8 +333,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
}

void Handle(TEvBlobStorage::TEvVPatchResult::TPtr &ev) {
ReceivedResults++;
NKikimrBlobStorage::TEvVPatchResult &record = ev->Get()->Record;

Y_ABORT_UNLESS(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();
if (ForceStopFlags[subgroupIdx]) {
return; // ignore force stop response
}
ReceivedResults++;

PullOutStatusFlagsAndFressSpace(record);
Y_ABORT_UNLESS(record.HasStatus());
NKikimrProto::EReplyStatus status = record.GetStatus();
Expand All @@ -342,9 +350,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
errorReason = record.GetErrorReason();
}

Y_ABORT_UNLESS(record.HasCookie());
ui8 subgroupIdx = record.GetCookie();

PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA23, "Received VPatchResult",
(Status, status),
(SubgroupIdx, (ui32)subgroupIdx),
Expand Down Expand Up @@ -413,15 +418,16 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
void SendStopDiffs() {
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA18, "Send stop diffs");
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchDiff>> events;
for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) {
if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) {
for (ui32 subgroupIdx = 0; subgroupIdx < VDisks.size(); ++subgroupIdx) {
if (!ErrorResponseFlags[subgroupIdx] && !EmptyResponseFlags[subgroupIdx] && ReceivedResponseFlags[subgroupIdx]) {
std::unique_ptr<TEvBlobStorage::TEvVPatchDiff> ev = std::make_unique<TEvBlobStorage::TEvVPatchDiff>(
OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx);
OriginalId, PatchedId, VDisks[subgroupIdx], 0, Deadline, subgroupIdx);
ev->SetForceEnd();
ForceStopFlags[subgroupIdx] = true;
events.emplace_back(std::move(ev));
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message",
(VDiskIdxInSubgroup, vdiskIdx),
(VDiskId, VDisks[vdiskIdx]));
(VDiskIdxInSubgroup, subgroupIdx),
(VDiskId, VDisks[subgroupIdx]));
}
}
SendToQueues(events, false);
Expand Down Expand Up @@ -495,6 +501,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA20, "Send TEvVPatchDiff",
(VDiskIdxInSubgroup, idxInSubgroup),
(PatchedVDiskIdxInSubgroup, patchedIdxInSubgroup),
(PartId, (ui64)partPlacement.PartId),
(DiffsForPart, diffsForPart.size()),
(ParityPlacements, parityPlacements.size()),
(WaitedXorDiffs, waitedXorDiffs));
Expand Down Expand Up @@ -586,6 +593,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor<TBlob
ReceivedResponseFlags.assign(VDisks.size(), false);
ErrorResponseFlags.assign(VDisks.size(), false);
EmptyResponseFlags.assign(VDisks.size(), false);
ForceStopFlags.assign(VDisks.size(), false);

TDeque<std::unique_ptr<TEvBlobStorage::TEvVPatchStart>> events;

Expand Down
13 changes: 11 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,12 @@ namespace NKikimr {
: public TEventLocal<TEvVDiskRequestCompleted, TEvBlobStorage::EvVDiskRequestCompleted> {
TVMsgContext Ctx;
std::unique_ptr<IEventHandle> Event;
bool DoNotResend;

TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event)
TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr<IEventHandle> event, bool doNotResend = false)
: Ctx(ctx)
, Event(std::move(event))
, DoNotResend(doNotResend)
{
Y_DEBUG_ABORT_UNLESS(Ctx.ExtQueueId != NKikimrBlobStorage::EVDiskQueueId::Unknown);
Y_DEBUG_ABORT_UNLESS(Ctx.IntQueueId != NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown);
Expand Down Expand Up @@ -468,6 +470,9 @@ namespace NKikimr {
TActorIDPtr SkeletonFrontIDPtr;
THPTimer ExecutionTimer;

protected:
bool DoNotResendFromSkeletonFront = false;

public:
TEvVResultBaseWithQoSPB() = default;

Expand Down Expand Up @@ -526,7 +531,7 @@ namespace NKikimr {
byteSize, this->ToString().data());

if (SkeletonFrontIDPtr && MsgCtx.IntQueueId != NKikimrBlobStorage::IntUnknown) {
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev)));
ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev), DoNotResendFromSkeletonFront));
} else {
TActivationContext::Send(ev.release());
}
Expand Down Expand Up @@ -2182,6 +2187,10 @@ namespace NKikimr {
Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare);
}

void SetForceEndResponse() {
DoNotResendFromSkeletonFront = true;
}

void MakeError(NKikimrProto::EReplyStatus status, const TString &errorReason,
const NKikimrBlobStorage::TEvVPatchDiff &request)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1632,7 +1632,9 @@ namespace NKikimr {
extQueue.Completed(ctx, msgCtx, event);
TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId);
intQueue.Completed(ctx, msgCtx, *this, id);
TActivationContext::Send(event.release());
if (!ev->Get()->DoNotResend) {
TActivationContext::Send(event.release());
}
}

void ChangeGeneration(const TVDiskID& vdiskId, const TIntrusivePtr<TBlobStorageGroupInfo>& info,
Expand Down
11 changes: 7 additions & 4 deletions ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ namespace NKikimr::NPrivate {
}
}

void SendVPatchResult(NKikimrProto::EReplyStatus status)
void SendVPatchResult(NKikimrProto::EReplyStatus status, bool forceEnd = false)
{
STLOG(PRI_INFO, BS_VDISK_PATCH, BSVSP07,
VDiskLogPrefix << " TEvVPatch: send patch result;",
Expand All @@ -308,6 +308,9 @@ namespace NKikimr::NPrivate {
}
AddMark((status == NKikimrProto::OK ? "Patch ends with OK" : "Patch ends witn NOT OK"));
CurrentEventTrace = nullptr;
if (forceEnd) {
ResultEvent->SetForceEndResponse();
}
SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), Cookie);
}

Expand Down Expand Up @@ -501,15 +504,15 @@ namespace NKikimr::NPrivate {
Cookie = ev->Cookie;
CurrentEventTrace = ev->Get()->VDiskSkeletonTrace;
AddMark("Error: HandleError TEvVPatchDiff");
SendVPatchResult(NKikimrProto::ERROR);
SendVPatchResult(NKikimrProto::ERROR, ev->Get()->IsForceEnd());
}

void HandleForceEnd(TEvBlobStorage::TEvVPatchDiff::TPtr &ev) {
bool forceEnd = ev->Get()->IsForceEnd();
SendVPatchFoundParts(NKikimrProto::ERROR);
if (forceEnd) {
AddMark("Force end");
SendVPatchResult(NKikimrProto::OK);
SendVPatchResult(NKikimrProto::OK, true);
} else {
AddMark("Force end by error");
SendVPatchResult(NKikimrProto::ERROR);
Expand Down Expand Up @@ -566,7 +569,7 @@ namespace NKikimr::NPrivate {

if (forceEnd) {
AddMark("Force end");
SendVPatchResult(NKikimrProto::OK);
SendVPatchResult(NKikimrProto::OK, true);
NotifySkeletonAboutDying();
Become(&TThis::ErrorState);
return;
Expand Down

0 comments on commit 273e018

Please sign in to comment.